You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/08/24 15:52:02 UTC

[17/50] [abbrv] storm git commit: STORM-166: renaming code distributor.

STORM-166: renaming code distributor.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c1e8782f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c1e8782f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c1e8782f

Branch: refs/heads/master
Commit: c1e8782f7cfb3524b6c0d51fe48669fd09b87527
Parents: 58667be
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Fri Jan 9 13:52:33 2015 -0500
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Fri Jan 9 13:52:33 2015 -0500

----------------------------------------------------------------------
 .../src/clj/backtype/storm/daemon/nimbus.clj    | 24 ++++++++++----------
 .../clj/backtype/storm/daemon/supervisor.clj    | 16 ++++++-------
 .../test/clj/backtype/storm/nimbus_test.clj     |  2 +-
 .../storm/security/auth/nimbus_auth_test.clj    |  1 -
 .../test/clj/backtype/storm/supervisor_test.clj |  2 +-
 5 files changed, 22 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c1e8782f/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index 1f93be6..d1f5c31 100644
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@ -62,7 +62,7 @@
     scheduler
     ))
 
-(defmulti mk-bt-tracker cluster-mode)
+(defmulti mk-code-distributor cluster-mode)
 (defmulti sync-code cluster-mode)
 
 (defnk is-leader [nimbus :throw-exception true]
@@ -100,7 +100,7 @@
                                  ))
      :scheduler (mk-scheduler conf inimbus)
      :leader-elector (zk-leader-elector conf)
-     :bt-tracker (mk-bt-tracker conf)
+     :code-distributor (mk-code-distributor conf)
      :id->sched-status (atom {})
      :cred-renewers (AuthUtils/GetCredentialRenewers conf)
      :nimbus-autocred-plugins (AuthUtils/getNimbusAutoCredPlugins conf)
@@ -336,15 +336,15 @@
    (setup-jar conf tmp-jar-location stormroot)
    (FileUtils/writeByteArrayToFile (File. (master-stormcode-path stormroot)) (Utils/serialize topology))
    (FileUtils/writeByteArrayToFile (File. (master-stormconf-path stormroot)) (Utils/serialize storm-conf))
-   (if (:bt-tracker nimbus) (.upload (:bt-tracker nimbus) stormroot storm-id))
+   (if (:code-distributor nimbus) (.upload (:code-distributor nimbus) stormroot storm-id))
    ))
 
 (defn- wait-for-desired-code-replication [nimbus conf storm-id]
   (let [min-replication-count (conf NIMBUS-MIN-REPLICATION-COUNT)
         max-replication-wait-time (conf NIMBUS-MAX-REPLICATION-WAIT-TIME-SEC)
         total-wait-time (atom 0)
-        current-replication-count (atom (if (:bt-tracker nimbus) (.getReplicationCount (:bt-tracker nimbus) storm-id) 0))]
-  (if (:bt-tracker nimbus)
+        current-replication-count (atom (if (:code-distributor nimbus) (.getReplicationCount (:code-distributor nimbus) storm-id) 0))]
+  (if (:code-distributor nimbus)
     (while (and (> min-replication-count @current-replication-count)
              (or (= -1 max-replication-wait-time)
                (< @total-wait-time max-replication-wait-time)))
@@ -353,7 +353,7 @@
           min-replication-count = " min-replication-count  " max-replication-wait-time = " max-replication-wait-time
           "current-replication-count = " @current-replication-count " total-wait-time " @total-wait-time)
         (swap! total-wait-time inc)
-        (reset! current-replication-count  (.getReplicationCount (:bt-tracker nimbus) storm-id))))
+        (reset! current-replication-count  (.getReplicationCount (:code-distributor nimbus) storm-id))))
   (if (< min-replication-count @current-replication-count)
     (log-message "desired replication count "  min-replication-count " achieved,
       current-replication-count" @current-replication-count)
@@ -897,7 +897,7 @@
         (when-not (empty? to-cleanup-ids)
           (doseq [id to-cleanup-ids]
             (log-message "Cleaning up " id)
-            (if (:bt-tracker nimbus) (.cleanup (:bt-tracker nimbus) id))
+            (if (:code-distributor nimbus) (.cleanup (:code-distributor nimbus) id))
             (.teardown-heartbeats! storm-cluster-state id)
             (.teardown-topology-errors! storm-cluster-state id)
             (rmr (master-stormdist-root conf id))
@@ -1363,19 +1363,19 @@
         (.cleanup (:downloaders nimbus))
         (.cleanup (:uploaders nimbus))
         (.close (:leader-elector nimbus))
-        (if (:bt-tracker nimbus) (.close (:bt-tracker nimbus) (:conf nimbus)))
+        (if (:code-distributor nimbus) (.close (:code-distributor nimbus) (:conf nimbus)))
         (log-message "Shut down master")
         )
       DaemonCommon
       (waiting? [this]
         (timer-waiting? (:timer nimbus))))))
 
-(defmethod mk-bt-tracker :distributed [conf]
+(defmethod mk-code-distributor :distributed [conf]
   (let [code-distributor (new-instance (conf STORM-CODE-DISTRIBUTOR-CLASS))]
     (.prepare code-distributor conf)
     code-distributor))
 
-(defmethod mk-bt-tracker :local [conf]
+(defmethod mk-code-distributor :local [conf]
   nil)
 
 (defn download-code [conf nimbus storm-id host port]
@@ -1386,8 +1386,8 @@
         local-meta-file-path (master-storm-metafile-path tmp-root)]
     (FileUtils/forceMkdir (File. tmp-root))
     (Utils/downloadFromHost conf remote-meta-file-path local-meta-file-path host port)
-    (if (:bt-tracker nimbus)
-      (.download (:bt-tracker nimbus) storm-id (File. local-meta-file-path)))
+    (if (:code-distributor nimbus)
+      (.download (:code-distributor nimbus) storm-id (File. local-meta-file-path)))
     (if (.exists (File. storm-root)) (FileUtils/forceDelete (File. storm-root)))
     (FileUtils/moveDirectory (File. tmp-root) (File. storm-root))
     (.setup-code-distributor! storm-cluster-state storm-id (:nimbus-host-port-info nimbus))))

http://git-wip-us.apache.org/repos/asf/storm/blob/c1e8782f/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index 3fcf7eb..ceb098e 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -32,7 +32,7 @@
 
 (defmulti download-storm-code cluster-mode)
 (defmulti launch-worker (fn [supervisor & _] (cluster-mode (:conf supervisor))))
-(defmulti mk-bt-tracker cluster-mode)
+(defmulti mk-code-distributor cluster-mode)
 
 ;; used as part of a map from port to this
 (defrecord LocalAssignment [storm-id executors])
@@ -302,7 +302,7 @@
                                          ))
    :assignment-versions (atom {})
    :sync-retry (atom 0)
-   :bt-tracker (mk-bt-tracker conf)
+   :code-distributor (mk-code-distributor conf)
    })
 
 (defn sync-processes [supervisor]
@@ -342,8 +342,8 @@
          ". State: " state
          ", Heartbeat: " (pr-str heartbeat))
         (shutdown-worker supervisor id)
-        (if (:bt-tracker supervisor)
-          (.cleanup (:bt-tracker supervisor) id))
+        (if (:code-distributor supervisor)
+          (.cleanup (:code-distributor supervisor) id))
         ))
 
     (doseq [id (vals new-worker-ids)]
@@ -552,8 +552,8 @@
           supervisor-meta-file-path (supervisor-storm-metafile-path tmproot)]
       (FileUtils/forceMkdir (File. tmproot))
       (Utils/downloadFromMaster conf master-meta-file-path supervisor-meta-file-path)
-      (if (:bt-tracker supervisor)
-        (.download (:bt-tracker supervisor) storm-id (File. supervisor-meta-file-path)))
+      (if (:code-distributor supervisor)
+        (.download (:code-distributor supervisor) storm-id (File. supervisor-meta-file-path)))
       (extract-dir-from-jar (supervisor-stormjar-path tmproot) RESOURCES-SUBDIR tmproot)
       (if (.exists (File. stormroot)) (FileUtils/forceDelete (File. stormroot)))
       (FileUtils/moveDirectory (File. tmproot) (File. stormroot))
@@ -587,7 +587,7 @@
                                              (storm-conf TOPOLOGY-USERS)))))}]
     (write-log-metadata-to-yaml-file! storm-id port data conf)))
 
-(defmethod mk-bt-tracker :distributed [conf]
+(defmethod mk-code-distributor :distributed [conf]
   (let [code-distributor (new-instance (conf STORM-CODE-DISTRIBUTOR-CLASS))]
     (.prepare code-distributor conf)
     code-distributor))
@@ -719,7 +719,7 @@
               )
             )))
 
-(defmethod mk-bt-tracker :local [conf] nil)
+(defmethod mk-code-distributor :local [conf] nil)
 
 (defmethod launch-worker
     :local [supervisor storm-id port worker-id]

http://git-wip-us.apache.org/repos/asf/storm/blob/c1e8782f/storm-core/test/clj/backtype/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/nimbus_test.clj b/storm-core/test/clj/backtype/storm/nimbus_test.clj
index 1a94049..c937aaa 100644
--- a/storm-core/test/clj/backtype/storm/nimbus_test.clj
+++ b/storm-core/test/clj/backtype/storm/nimbus_test.clj
@@ -1245,7 +1245,7 @@
                  uptime-computer nil
                  new-instance nil
                  mk-timer nil
-                 nimbus/mk-bt-tracker nil
+                 nimbus/mk-code-distributor nil
                  zk-leader-elector nil
                  nimbus/mk-scheduler nil]
         (nimbus/nimbus-data auth-conf fake-inimbus)

http://git-wip-us.apache.org/repos/asf/storm/blob/c1e8782f/storm-core/test/clj/backtype/storm/security/auth/nimbus_auth_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/security/auth/nimbus_auth_test.clj b/storm-core/test/clj/backtype/storm/security/auth/nimbus_auth_test.clj
index a776693..2787461 100644
--- a/storm-core/test/clj/backtype/storm/security/auth/nimbus_auth_test.clj
+++ b/storm-core/test/clj/backtype/storm/security/auth/nimbus_auth_test.clj
@@ -48,7 +48,6 @@
         nimbus-server (ThriftServer. (:daemon-conf cluster-map)
                                      (Nimbus$Processor. (:nimbus cluster-map)) 
                                      ThriftConnectionType/NIMBUS)]
-    (Thread/sleep 2000)
     (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.stop nimbus-server))))
     (.start (Thread. #(.serve nimbus-server)))
     (wait-for-condition #(.isServing nimbus-server))

http://git-wip-us.apache.org/repos/asf/storm/blob/c1e8782f/storm-core/test/clj/backtype/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/supervisor_test.clj b/storm-core/test/clj/backtype/storm/supervisor_test.clj
index 6b4328a..9328769 100644
--- a/storm-core/test/clj/backtype/storm/supervisor_test.clj
+++ b/storm-core/test/clj/backtype/storm/supervisor_test.clj
@@ -481,7 +481,7 @@
                  cluster/mk-storm-cluster-state nil
                  supervisor-state nil
                  local-hostname nil
-                 supervisor/mk-bt-tracker nil
+                 supervisor/mk-code-distributor nil
                  mk-timer nil]
         (supervisor/supervisor-data auth-conf nil fake-isupervisor)
         (verify-call-times-for cluster/mk-storm-cluster-state 1)