You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/08/24 15:52:02 UTC
[17/50] [abbrv] storm git commit: STORM-166: renaming code distributor.
STORM-166: renaming code distributor.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c1e8782f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c1e8782f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c1e8782f
Branch: refs/heads/master
Commit: c1e8782f7cfb3524b6c0d51fe48669fd09b87527
Parents: 58667be
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Fri Jan 9 13:52:33 2015 -0500
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Fri Jan 9 13:52:33 2015 -0500
----------------------------------------------------------------------
.../src/clj/backtype/storm/daemon/nimbus.clj | 24 ++++++++++----------
.../clj/backtype/storm/daemon/supervisor.clj | 16 ++++++-------
.../test/clj/backtype/storm/nimbus_test.clj | 2 +-
.../storm/security/auth/nimbus_auth_test.clj | 1 -
.../test/clj/backtype/storm/supervisor_test.clj | 2 +-
5 files changed, 22 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/c1e8782f/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index 1f93be6..d1f5c31 100644
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@ -62,7 +62,7 @@
scheduler
))
-(defmulti mk-bt-tracker cluster-mode)
+(defmulti mk-code-distributor cluster-mode)
(defmulti sync-code cluster-mode)
(defnk is-leader [nimbus :throw-exception true]
@@ -100,7 +100,7 @@
))
:scheduler (mk-scheduler conf inimbus)
:leader-elector (zk-leader-elector conf)
- :bt-tracker (mk-bt-tracker conf)
+ :bt-tracker (mk-code-distributor conf)
:id->sched-status (atom {})
:cred-renewers (AuthUtils/GetCredentialRenewers conf)
:nimbus-autocred-plugins (AuthUtils/getNimbusAutoCredPlugins conf)
@@ -336,15 +336,15 @@
(setup-jar conf tmp-jar-location stormroot)
(FileUtils/writeByteArrayToFile (File. (master-stormcode-path stormroot)) (Utils/serialize topology))
(FileUtils/writeByteArrayToFile (File. (master-stormconf-path stormroot)) (Utils/serialize storm-conf))
- (if (:bt-tracker nimbus) (.upload (:bt-tracker nimbus) stormroot storm-id))
+ (if (:code-distributor nimbus) (.upload (:code-distributor nimbus) stormroot storm-id))
))
(defn- wait-for-desired-code-replication [nimbus conf storm-id]
(let [min-replication-count (conf NIMBUS-MIN-REPLICATION-COUNT)
max-replication-wait-time (conf NIMBUS-MAX-REPLICATION-WAIT-TIME-SEC)
total-wait-time (atom 0)
- current-replication-count (atom (if (:bt-tracker nimbus) (.getReplicationCount (:bt-tracker nimbus) storm-id) 0))]
- (if (:bt-tracker nimbus)
+ current-replication-count (atom (if (:code-distributor nimbus) (.getReplicationCount (:code-distributor nimbus) storm-id) 0))]
+ (if (:code-distributor nimbus)
(while (and (> min-replication-count @current-replication-count)
(or (= -1 max-replication-wait-time)
(< @total-wait-time max-replication-wait-time)))
@@ -353,7 +353,7 @@
min-replication-count = " min-replication-count " max-replication-wait-time = " max-replication-wait-time
"current-replication-count = " @current-replication-count " total-wait-time " @total-wait-time)
(swap! total-wait-time inc)
- (reset! current-replication-count (.getReplicationCount (:bt-tracker nimbus) storm-id))))
+ (reset! current-replication-count (.getReplicationCount (:code-distributor nimbus) storm-id))))
(if (< min-replication-count @current-replication-count)
(log-message "desired replication count " min-replication-count " achieved,
current-replication-count" @current-replication-count)
@@ -897,7 +897,7 @@
(when-not (empty? to-cleanup-ids)
(doseq [id to-cleanup-ids]
(log-message "Cleaning up " id)
- (if (:bt-tracker nimbus) (.cleanup (:bt-tracker nimbus) id))
+ (if (:code-distributor nimbus) (.cleanup (:code-distributor nimbus) id))
(.teardown-heartbeats! storm-cluster-state id)
(.teardown-topology-errors! storm-cluster-state id)
(rmr (master-stormdist-root conf id))
@@ -1363,19 +1363,19 @@
(.cleanup (:downloaders nimbus))
(.cleanup (:uploaders nimbus))
(.close (:leader-elector nimbus))
- (if (:bt-tracker nimbus) (.close (:bt-tracker nimbus) (:conf nimbus)))
+ (if (:code-distributor nimbus) (.close (:code-distributor nimbus) (:conf nimbus)))
(log-message "Shut down master")
)
DaemonCommon
(waiting? [this]
(timer-waiting? (:timer nimbus))))))
-(defmethod mk-bt-tracker :distributed [conf]
+(defmethod mk-code-distributor :distributed [conf]
(let [code-distributor (new-instance (conf STORM-CODE-DISTRIBUTOR-CLASS))]
(.prepare code-distributor conf)
code-distributor))
-(defmethod mk-bt-tracker :local [conf]
+(defmethod mk-code-distributor :local [conf]
nil)
(defn download-code [conf nimbus storm-id host port]
@@ -1386,8 +1386,8 @@
local-meta-file-path (master-storm-metafile-path tmp-root)]
(FileUtils/forceMkdir (File. tmp-root))
(Utils/downloadFromHost conf remote-meta-file-path local-meta-file-path host port)
- (if (:bt-tracker nimbus)
- (.download (:bt-tracker nimbus) storm-id (File. local-meta-file-path)))
+ (if (:code-distributor nimbus)
+ (.download (:code-distributor nimbus) storm-id (File. local-meta-file-path)))
(if (.exists (File. storm-root)) (FileUtils/forceDelete (File. storm-root)))
(FileUtils/moveDirectory (File. tmp-root) (File. storm-root))
(.setup-code-distributor! storm-cluster-state storm-id (:nimbus-host-port-info nimbus))))
http://git-wip-us.apache.org/repos/asf/storm/blob/c1e8782f/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index 3fcf7eb..ceb098e 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -32,7 +32,7 @@
(defmulti download-storm-code cluster-mode)
(defmulti launch-worker (fn [supervisor & _] (cluster-mode (:conf supervisor))))
-(defmulti mk-bt-tracker cluster-mode)
+(defmulti mk-code-distributor cluster-mode)
;; used as part of a map from port to this
(defrecord LocalAssignment [storm-id executors])
@@ -302,7 +302,7 @@
))
:assignment-versions (atom {})
:sync-retry (atom 0)
- :bt-tracker (mk-bt-tracker conf)
+ :bt-tracker (mk-code-distributor conf)
})
(defn sync-processes [supervisor]
@@ -342,8 +342,8 @@
". State: " state
", Heartbeat: " (pr-str heartbeat))
(shutdown-worker supervisor id)
- (if (:bt-tracker supervisor)
- (.cleanup (:bt-tracker supervisor) id))
+ (if (:code-distributor supervisor)
+ (.cleanup (:code-distributor supervisor) id))
))
(doseq [id (vals new-worker-ids)]
@@ -552,8 +552,8 @@
supervisor-meta-file-path (supervisor-storm-metafile-path tmproot)]
(FileUtils/forceMkdir (File. tmproot))
(Utils/downloadFromMaster conf master-meta-file-path supervisor-meta-file-path)
- (if (:bt-tracker supervisor)
- (.download (:bt-tracker supervisor) storm-id (File. supervisor-meta-file-path)))
+ (if (:code-distributor supervisor)
+ (.download (:code-distributor supervisor) storm-id (File. supervisor-meta-file-path)))
(extract-dir-from-jar (supervisor-stormjar-path tmproot) RESOURCES-SUBDIR tmproot)
(if (.exists (File. stormroot)) (FileUtils/forceDelete (File. stormroot)))
(FileUtils/moveDirectory (File. tmproot) (File. stormroot))
@@ -587,7 +587,7 @@
(storm-conf TOPOLOGY-USERS)))))}]
(write-log-metadata-to-yaml-file! storm-id port data conf)))
-(defmethod mk-bt-tracker :distributed [conf]
+(defmethod mk-code-distributor :distributed [conf]
(let [code-distributor (new-instance (conf STORM-CODE-DISTRIBUTOR-CLASS))]
(.prepare code-distributor conf)
code-distributor))
@@ -719,7 +719,7 @@
)
)))
-(defmethod mk-bt-tracker :local [conf] nil)
+(defmethod mk-code-distributor :local [conf] nil)
(defmethod launch-worker
:local [supervisor storm-id port worker-id]
http://git-wip-us.apache.org/repos/asf/storm/blob/c1e8782f/storm-core/test/clj/backtype/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/nimbus_test.clj b/storm-core/test/clj/backtype/storm/nimbus_test.clj
index 1a94049..c937aaa 100644
--- a/storm-core/test/clj/backtype/storm/nimbus_test.clj
+++ b/storm-core/test/clj/backtype/storm/nimbus_test.clj
@@ -1245,7 +1245,7 @@
uptime-computer nil
new-instance nil
mk-timer nil
- nimbus/mk-bt-tracker nil
+ nimbus/mk-code-distributor nil
zk-leader-elector nil
nimbus/mk-scheduler nil]
(nimbus/nimbus-data auth-conf fake-inimbus)
http://git-wip-us.apache.org/repos/asf/storm/blob/c1e8782f/storm-core/test/clj/backtype/storm/security/auth/nimbus_auth_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/security/auth/nimbus_auth_test.clj b/storm-core/test/clj/backtype/storm/security/auth/nimbus_auth_test.clj
index a776693..2787461 100644
--- a/storm-core/test/clj/backtype/storm/security/auth/nimbus_auth_test.clj
+++ b/storm-core/test/clj/backtype/storm/security/auth/nimbus_auth_test.clj
@@ -48,7 +48,6 @@
nimbus-server (ThriftServer. (:daemon-conf cluster-map)
(Nimbus$Processor. (:nimbus cluster-map))
ThriftConnectionType/NIMBUS)]
- (Thread/sleep 2000)
(.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.stop nimbus-server))))
(.start (Thread. #(.serve nimbus-server)))
(wait-for-condition #(.isServing nimbus-server))
http://git-wip-us.apache.org/repos/asf/storm/blob/c1e8782f/storm-core/test/clj/backtype/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/supervisor_test.clj b/storm-core/test/clj/backtype/storm/supervisor_test.clj
index 6b4328a..9328769 100644
--- a/storm-core/test/clj/backtype/storm/supervisor_test.clj
+++ b/storm-core/test/clj/backtype/storm/supervisor_test.clj
@@ -481,7 +481,7 @@
cluster/mk-storm-cluster-state nil
supervisor-state nil
local-hostname nil
- supervisor/mk-bt-tracker nil
+ supervisor/mk-code-distributor nil
mk-timer nil]
(supervisor/supervisor-data auth-conf nil fake-isupervisor)
(verify-call-times-for cluster/mk-storm-cluster-state 1)