You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/08/24 15:52:26 UTC
[41/50] [abbrv] storm git commit: Changed the code-distributor
entries to ephemeral nodes. Added code to ensure nimbus sets up the correct
code-distributor entries on startup.
Changed the code-distributor entries to ephemeral nodes. Added code to ensure nimbus sets up the correct code-distributor entries on startup.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3f66ffd7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3f66ffd7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3f66ffd7
Branch: refs/heads/master
Commit: 3f66ffd73086681c43a7f16342cab5cd944f25f4
Parents: 3e20823
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Wed Apr 8 16:52:12 2015 -0700
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Wed Aug 12 09:28:27 2015 -0700
----------------------------------------------------------------------
storm-core/src/clj/backtype/storm/cluster.clj | 5 ++++-
storm-core/src/clj/backtype/storm/daemon/nimbus.clj | 11 +++++++++++
2 files changed, 15 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/3f66ffd7/storm-core/src/clj/backtype/storm/cluster.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/cluster.clj b/storm-core/src/clj/backtype/storm/cluster.clj
index f75648a..15af1b0 100644
--- a/storm-core/src/clj/backtype/storm/cluster.clj
+++ b/storm-core/src/clj/backtype/storm/cluster.clj
@@ -483,8 +483,11 @@
(setup-code-distributor!
[this storm-id nimbusInfo]
+ (let [path (str (code-distributor-path storm-id) "/" (.toHostPortString nimbusInfo))]
(mkdirs cluster-state (code-distributor-path storm-id) acls)
- (mkdirs cluster-state (str (code-distributor-path storm-id) "/" (.toHostPortString nimbusInfo)) acls))
+ ;we delete the node first to ensure the node gets created as part of this session only.
+ (delete-node cluster-state path)
+ (set-ephemeral-node cluster-state path nil acls)))
(remove-storm!
[this storm-id]
http://git-wip-us.apache.org/repos/asf/storm/blob/3f66ffd7/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index 35154d3..5d9a038 100644
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@ -957,6 +957,15 @@
)))
(log-message "not a leader, skipping cleanup-corrupt-topologies"))
+;; Sets up code-distributor entries for all current topologies for which code is available locally.
+(defn setup-code-distributor [nimbus]
+ (let [storm-cluster-state (:storm-cluster-state nimbus)
+ locally-available-storm-ids (set (code-ids (:conf nimbus)))
+ active-topologies (set (.active-storms storm-cluster-state))
+ locally-available-active-storm-ids (set/intersection locally-available-storm-ids active-topologies)]
+ (doseq [storm-id locally-available-active-storm-ids]
+ (.setup-code-distributor! storm-cluster-state storm-id (:nimbus-host-port-info nimbus)))))
+
(defn- get-errors [storm-cluster-state storm-id component-id]
(->> (.errors storm-cluster-state storm-id component-id)
(map #(doto (ErrorInfo. (:error %) (:time-secs %))
@@ -1074,6 +1083,8 @@
(.addToLeaderLockQueue (:leader-elector nimbus))
(cleanup-corrupt-topologies! nimbus)
+ (setup-code-distributor nimbus)
+
;register call back for code-distributor
(.code-distributor (:storm-cluster-state nimbus) (fn [] (sync-code conf nimbus)))
(when (is-leader nimbus :throw-exception false)