You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/08/24 15:52:26 UTC

[41/50] [abbrv] storm git commit: Changed the code-distributor entries to ephemeral nodes. Added code to ensure nimbus sets up the correct code-distributor entries on startup.

Changed the code-distributor entries to ephemeral nodes. Added code to ensure nimbus sets up the correct code-distributor entries on startup.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3f66ffd7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3f66ffd7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3f66ffd7

Branch: refs/heads/master
Commit: 3f66ffd73086681c43a7f16342cab5cd944f25f4
Parents: 3e20823
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Wed Apr 8 16:52:12 2015 -0700
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Wed Aug 12 09:28:27 2015 -0700

----------------------------------------------------------------------
 storm-core/src/clj/backtype/storm/cluster.clj       |  5 ++++-
 storm-core/src/clj/backtype/storm/daemon/nimbus.clj | 11 +++++++++++
 2 files changed, 15 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3f66ffd7/storm-core/src/clj/backtype/storm/cluster.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/cluster.clj b/storm-core/src/clj/backtype/storm/cluster.clj
index f75648a..15af1b0 100644
--- a/storm-core/src/clj/backtype/storm/cluster.clj
+++ b/storm-core/src/clj/backtype/storm/cluster.clj
@@ -483,8 +483,11 @@
 
       (setup-code-distributor!
         [this storm-id nimbusInfo]
+        (let [path (str (code-distributor-path storm-id) "/" (.toHostPortString nimbusInfo))]
         (mkdirs cluster-state (code-distributor-path storm-id) acls)
-        (mkdirs cluster-state (str (code-distributor-path storm-id) "/" (.toHostPortString nimbusInfo)) acls))
+        ;we delete the node first to ensure the node gets created as part of this session only.
+        (delete-node cluster-state path)
+        (set-ephemeral-node cluster-state path nil acls)))
 
       (remove-storm!
         [this storm-id]

http://git-wip-us.apache.org/repos/asf/storm/blob/3f66ffd7/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index 35154d3..5d9a038 100644
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@ -957,6 +957,15 @@
         )))
   (log-message "not a leader, skipping cleanup-corrupt-topologies"))
 
+;; Sets up code-distributor entries for all current topologies for which code is available locally.
+(defn setup-code-distributor [nimbus]
+  (let [storm-cluster-state (:storm-cluster-state nimbus)
+        locally-available-storm-ids (set (code-ids (:conf nimbus)))
+        active-topologies (set (.active-storms storm-cluster-state))
+        locally-available-active-storm-ids (set/intersection locally-available-storm-ids active-topologies)]
+    (doseq [storm-id locally-available-active-storm-ids]
+      (.setup-code-distributor! storm-cluster-state storm-id (:nimbus-host-port-info nimbus)))))
+
 (defn- get-errors [storm-cluster-state storm-id component-id]
   (->> (.errors storm-cluster-state storm-id component-id)
        (map #(doto (ErrorInfo. (:error %) (:time-secs %))
@@ -1074,6 +1083,8 @@
 
     (.addToLeaderLockQueue (:leader-elector nimbus))
     (cleanup-corrupt-topologies! nimbus)
+    (setup-code-distributor nimbus)
+
     ;register call back for code-distributor
     (.code-distributor (:storm-cluster-state nimbus) (fn [] (sync-code conf nimbus)))
     (when (is-leader nimbus :throw-exception false)