You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by zh...@apache.org on 2016/01/28 18:49:45 UTC

[09/23] storm git commit: Update calls to supervisor-isupervisor-dir

Update calls to supervisor-isupervisor-dir


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3d2796d2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3d2796d2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3d2796d2

Branch: refs/heads/master
Commit: 3d2796d2fbd7ae182d7414db985cf7de167fcd82
Parents: f350f1a
Author: zhuol <zh...@yahoo-inc.com>
Authored: Thu Jan 14 15:25:47 2016 -0600
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Wed Jan 20 09:55:24 2016 -0600

----------------------------------------------------------------------
 .../org/apache/storm/command/healthcheck.clj    |  2 +-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  | 56 ++++++++++----------
 .../clj/org/apache/storm/daemon/supervisor.clj  | 16 +++---
 .../jvm/org/apache/storm/utils/ConfigUtils.java |  4 ++
 4 files changed, 41 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3d2796d2/storm-core/src/clj/org/apache/storm/command/healthcheck.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/healthcheck.clj b/storm-core/src/clj/org/apache/storm/command/healthcheck.clj
index f64be92..138c7d8 100644
--- a/storm-core/src/clj/org/apache/storm/command/healthcheck.clj
+++ b/storm-core/src/clj/org/apache/storm/command/healthcheck.clj
@@ -62,7 +62,7 @@
       (finally (.interrupt interrupter-thread)))))
 
 (defn health-check [conf]
-  (let [health-dir (absolute-healthcheck-dir conf)
+  (let [health-dir (ConfigUtils/absoluteHealthCheckDir conf)
         health-files (file-seq (io/file health-dir))
         health-scripts (filter #(and (.canExecute %)
                                      (not (.isDirectory %)))

http://git-wip-us.apache.org/repos/asf/storm/blob/3d2796d2/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 86e93b4..9f4423a 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -206,7 +206,7 @@
      }))
 
 (defn inbox [nimbus]
-  (master-inbox (:conf nimbus)))
+  (ConfigUtils/masterInbox (:conf nimbus)))
 
 (defn- get-subject
   []
@@ -216,7 +216,7 @@
 (defn- read-storm-conf [conf storm-id blob-store]
   (clojurify-structure
     (Utils/fromCompressedJsonConf
-      (.readBlob blob-store (master-stormconf-key storm-id) (get-subject)))))
+      (.readBlob blob-store (ConfigUtils/masterStormConfKey storm-id) (get-subject)))))
 
 (declare delay-event)
 (declare mk-assignments)
@@ -234,10 +234,10 @@
 
 (defn- get-key-list-from-id
   [conf id]
-  (log-debug "set keys id = " id "set = " #{(master-stormcode-key id) (master-stormjar-key id) (master-stormconf-key id)})
+  (log-debug "set keys id = " id "set = " #{(ConfigUtils/masterStormCodeKey id) (ConfigUtils/masterStormJarKey id) (ConfigUtils/masterStormConfKey id)})
   (if (ConfigUtils/isLocalMode conf)
-    [(master-stormcode-key id) (master-stormconf-key id)]
-    [(master-stormcode-key id) (master-stormjar-key id) (master-stormconf-key id)]))
+    [(ConfigUtils/masterStormCodeKey id) (ConfigUtils/masterStormConfKey id)]
+    [(ConfigUtils/masterStormCodeKey id) (ConfigUtils/masterStormJarKey id) (ConfigUtils/masterStormConfKey id)]))
 
 (defn kill-transition [nimbus storm-id]
   (fn [kill-time]
@@ -460,9 +460,9 @@
   (let [subject (get-subject)
         storm-cluster-state (:storm-cluster-state nimbus)
         blob-store (:blob-store nimbus)
-        jar-key (master-stormjar-key storm-id)
-        code-key (master-stormcode-key storm-id)
-        conf-key (master-stormconf-key storm-id)
+        jar-key (ConfigUtils/masterStormJarKey storm-id)
+        code-key (ConfigUtils/masterStormCodeKey storm-id)
+        conf-key (ConfigUtils/masterStormConfKey storm-id)
         nimbus-host-port-info (:nimbus-host-port-info nimbus)]
     (when tmp-jar-location ;;in local mode there is no jar
       (.createBlob blob-store jar-key (FileInputStream. tmp-jar-location) (SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject)
@@ -477,7 +477,7 @@
 
 (defn- read-storm-topology [storm-id blob-store]
   (Utils/deserialize
-    (.readBlob blob-store (master-stormcode-key storm-id) (get-subject)) StormTopology))
+    (.readBlob blob-store (ConfigUtils/masterStormCodeKey storm-id) (get-subject)) StormTopology))
 
 (defn get-blob-replication-count
   [blob-key nimbus]
@@ -489,10 +489,10 @@
   (let [min-replication-count (conf TOPOLOGY-MIN-REPLICATION-COUNT)
         max-replication-wait-time (conf TOPOLOGY-MAX-REPLICATION-WAIT-TIME-SEC)
         current-replication-count-jar (if (not (ConfigUtils/isLocalMode conf))
-                                        (atom (get-blob-replication-count (master-stormjar-key storm-id) nimbus))
+                                        (atom (get-blob-replication-count (ConfigUtils/masterStormJarKey storm-id) nimbus))
                                         (atom min-replication-count))
-        current-replication-count-code (atom (get-blob-replication-count (master-stormcode-key storm-id) nimbus))
-        current-replication-count-conf (atom (get-blob-replication-count (master-stormconf-key storm-id) nimbus))
+        current-replication-count-code (atom (get-blob-replication-count (ConfigUtils/masterStormCodeKey storm-id) nimbus))
+        current-replication-count-conf (atom (get-blob-replication-count (ConfigUtils/masterStormConfKey storm-id) nimbus))
         total-wait-time (atom 0)]
     (if (:blob-store nimbus)
       (while (and
@@ -510,9 +510,9 @@
           " total-wait-time " @total-wait-time)
         (swap! total-wait-time inc)
         (if (not (ConfigUtils/isLocalMode conf))
-          (reset! current-replication-count-conf  (get-blob-replication-count (master-stormconf-key storm-id) nimbus)))
-        (reset! current-replication-count-code  (get-blob-replication-count (master-stormcode-key storm-id) nimbus))
-        (reset! current-replication-count-jar  (get-blob-replication-count (master-stormjar-key storm-id) nimbus))))
+          (reset! current-replication-count-conf  (get-blob-replication-count (ConfigUtils/masterStormConfKey storm-id) nimbus)))
+        (reset! current-replication-count-code  (get-blob-replication-count (ConfigUtils/masterStormCodeKey storm-id) nimbus))
+        (reset! current-replication-count-jar  (get-blob-replication-count (ConfigUtils/masterStormJarKey storm-id) nimbus))))
     (if (and (< min-replication-count @current-replication-count-conf)
              (< min-replication-count @current-replication-count-code)
              (< min-replication-count @current-replication-count-jar))
@@ -526,14 +526,14 @@
 
 (defn- read-storm-topology-as-nimbus [storm-id blob-store]
   (Utils/deserialize
-    (.readBlob blob-store (master-stormcode-key storm-id) nimbus-subject) StormTopology))
+    (.readBlob blob-store (ConfigUtils/masterStormCodeKey storm-id) nimbus-subject) StormTopology))
 
 (declare compute-executor->component)
 
 (defn read-storm-conf-as-nimbus [storm-id blob-store]
   (clojurify-structure
     (Utils/fromCompressedJsonConf
-      (.readBlob blob-store (master-stormconf-key storm-id) nimbus-subject))))
+      (.readBlob blob-store (ConfigUtils/masterStormConfKey storm-id) nimbus-subject))))
 
 (defn read-topology-details [nimbus storm-id]
   (let [blob-store (:blob-store nimbus)
@@ -1116,9 +1116,9 @@
       (log-message "Exception" e))))
 
 (defn blob-rm-topology-keys [id blob-store storm-cluster-state]
-  (blob-rm-key blob-store (master-stormjar-key id) storm-cluster-state)
-  (blob-rm-key blob-store (master-stormconf-key id) storm-cluster-state)
-  (blob-rm-key blob-store (master-stormcode-key id) storm-cluster-state))
+  (blob-rm-key blob-store (ConfigUtils/masterStormJarKey id) storm-cluster-state)
+  (blob-rm-key blob-store (ConfigUtils/masterStormConfKey id) storm-cluster-state)
+  (blob-rm-key blob-store (ConfigUtils/masterStormCodeKey id) storm-cluster-state))
 
 (defn do-cleanup [nimbus]
   (if (is-leader nimbus :throw-exception false)
@@ -1133,7 +1133,7 @@
             (log-message "Cleaning up " id)
             (.teardown-heartbeats! storm-cluster-state id)
             (.teardown-topology-errors! storm-cluster-state id)
-            (rmr (master-stormdist-root conf id))
+            (rmr (ConfigUtils/masterStormDistRoot conf id))
             (blob-rm-topology-keys id blob-store storm-cluster-state)
             (swap! (:heartbeats-cache nimbus) dissoc id)))))
     (log-message "not a leader, skipping cleanup")))
@@ -1218,7 +1218,7 @@
 (defn check-file-access [conf file-path]
   (log-debug "check file access:" file-path)
   (try
-    (if (not= (.getCanonicalFile (File. (master-stormdist-root conf)))
+    (if (not= (.getCanonicalFile (File. (ConfigUtils/masterStormDistRoot conf)))
           (-> (File. file-path) .getCanonicalFile .getParentFile .getParentFile))
       (throw (AuthorizationException. (str "Invalid file path: " file-path))))
     (catch Exception e
@@ -1351,7 +1351,7 @@
   nil)
 
 (defserverfn service-handler [conf inimbus]
-  (.prepare inimbus conf (master-inimbus-dir conf))
+  (.prepare inimbus conf (ConfigUtils/masterInimbusDir conf))
   (log-message "Starting Nimbus with conf " conf)
   (let [nimbus (nimbus-data conf inimbus)
         blob-store (:blob-store nimbus)
@@ -1835,7 +1835,7 @@
                                        (.set_assigned_memonheap topo-summ (get resources 3))
                                        (.set_assigned_memoffheap topo-summ (get resources 4))
                                        (.set_assigned_cpu topo-summ (get resources 5)))
-                                     (.set_replication_count topo-summ (get-blob-replication-count (master-stormcode-key id) nimbus))
+                                     (.set_replication_count topo-summ (get-blob-replication-count (ConfigUtils/masterStormCodeKey id) nimbus))
                                      topo-summ))
               ret (ClusterSummary. supervisor-summaries
                                    topology-summaries
@@ -1902,7 +1902,7 @@
               (.set_assigned_cpu topo-info (get resources 5)))
             (when-let [component->debug (:component->debug base)]
               (.set_component_debug topo-info (map-val converter/thriftify-debugoptions component->debug)))
-            (.set_replication_count topo-info (get-blob-replication-count (master-stormcode-key storm-id) nimbus))
+            (.set_replication_count topo-info (get-blob-replication-count (ConfigUtils/masterStormCodeKey storm-id) nimbus))
           topo-info))
 
       (^TopologyInfo getTopologyInfo [this ^String topology-id]
@@ -2096,7 +2096,7 @@
             (.set_uptime_secs (time-delta (:launch-time-secs info)))
             (.set_topology_conf (to-json (try-read-storm-conf conf
                                                               topo-id (:blob-store nimbus))))
-            (.set_replication_count (get-blob-replication-count (master-stormcode-key topo-id) nimbus)))
+            (.set_replication_count (get-blob-replication-count (ConfigUtils/masterStormCodeKey topo-id) nimbus)))
           (when-let [debug-options
                      (get-in info [:base :component->debug topo-id])]
             (.set_debug_options
@@ -2215,7 +2215,7 @@
                (throw
                 (IllegalArgumentException.
                  (str tmp-jar-location " to copy to " stormroot " does not exist!"))))
-             (FileUtils/copyFile src-file (File. (master-stormjar-path stormroot)))
+             (FileUtils/copyFile src-file (File. (ConfigUtils/masterStormJarPath stormroot)))
              ))
 
 ;; local implementation
@@ -2227,7 +2227,7 @@
 (defn -launch [nimbus]
   (let [conf (merge
                (clojurify-structure (ConfigUtils/readStormConfig))
-               (read-yaml-config "storm-cluster-auth.yaml" false))]
+               (clojurify-structure (ConfigUtils/readYamlConfig "storm-cluster-auth.yaml" false)))]
   (launch-server! conf nimbus)))
 
 (defn standalone-nimbus []

http://git-wip-us.apache.org/repos/asf/storm/blob/3d2796d2/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index 2c492e3..838a98f 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -338,7 +338,7 @@
                                            (log-error t "Error when processing event")
                                            (exit-process! 20 "Error when processing a event"))
                                 :timer-name "blob-update-timer")
-   :localizer (Utils/createLocalizer conf (supervisor-local-dir conf))
+   :localizer (Utils/createLocalizer conf (ConfigUtils/supervisorLocalDir conf))
    :assignment-versions (atom {})
    :sync-retry (atom 0)
    :download-lock (Object.)
@@ -763,7 +763,7 @@
 ;; another thread launches events to restart any dead processes if necessary
 (defserverfn mk-supervisor [conf shared-context ^ISupervisor isupervisor]
   (log-message "Starting Supervisor with conf " conf)
-  (.prepare isupervisor conf (supervisor-isupervisor-dir conf))
+  (.prepare isupervisor conf (ConfigUtils/supervisorIsupervisorDir conf))
   (FileUtils/cleanDirectory (File. (supervisor-tmp-dir conf)))
   (let [supervisor (supervisor-data conf shared-context isupervisor)
         [event-manager processes-event-manager :as managers] [(event/event-manager false) (event/event-manager false)]
@@ -878,7 +878,7 @@
 (defn download-blobs-for-topology!
   "Download all blobs listed in the topology configuration for a given topology."
   [conf stormconf-path localizer tmproot]
-  (let [storm-conf (read-supervisor-storm-conf-given-path conf stormconf-path)
+  (let [storm-conf (ConfigUtils/readSupervisorStormConfGivenPath conf stormconf-path)
         blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
         user (storm-conf TOPOLOGY-SUBMITTER-USER)
         topo-name (storm-conf TOPOLOGY-NAME)
@@ -930,11 +930,11 @@
       (Utils/restrictPermissions tmproot)
       (if (conf SUPERVISOR-RUN-WORKER-AS-USER)
         (throw-runtime (str "ERROR: Windows doesn't implement setting the correct permissions"))))
-    (Utils/downloadResourcesAsSupervisor (master-stormjar-key storm-id)
+    (Utils/downloadResourcesAsSupervisor (ConfigUtils/masterStormJarKey storm-id)
       (supervisor-stormjar-path tmproot) blobstore)
-    (Utils/downloadResourcesAsSupervisor (master-stormcode-key storm-id)
+    (Utils/downloadResourcesAsSupervisor (ConfigUtils/masterStormCodeKey storm-id)
       (supervisor-stormcode-path tmproot) blobstore)
-    (Utils/downloadResourcesAsSupervisor (master-stormconf-key storm-id)
+    (Utils/downloadResourcesAsSupervisor (ConfigUtils/masterStormConfKey storm-id)
       (supervisor-stormconf-path tmproot) blobstore)
     (.shutdown blobstore)
     (extract-dir-from-jar (supervisor-stormjar-path tmproot) RESOURCES-SUBDIR tmproot)
@@ -1143,8 +1143,8 @@
         blob-store (Utils/getNimbusBlobStore conf master-code-dir nil)]
     (try
       (FileUtils/forceMkdir (File. tmproot))
-      (.readBlobTo blob-store (master-stormcode-key storm-id) (FileOutputStream. (supervisor-stormcode-path tmproot)) nil)
-      (.readBlobTo blob-store (master-stormconf-key storm-id) (FileOutputStream. (supervisor-stormconf-path tmproot)) nil)
+      (.readBlobTo blob-store (ConfigUtils/masterStormCodeKey storm-id) (FileOutputStream. (supervisor-stormcode-path tmproot)) nil)
+      (.readBlobTo blob-store (ConfigUtils/masterStormConfKey storm-id) (FileOutputStream. (supervisor-stormconf-path tmproot)) nil)
       (finally
         (.shutdown blob-store)))
     (FileUtils/moveDirectory (File. tmproot) (File. stormroot))

http://git-wip-us.apache.org/repos/asf/storm/blob/3d2796d2/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
index 4b8df65..3dd6d4e 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
@@ -232,6 +232,7 @@ public class ConfigUtils {
         return stormRoot + FILE_SEPARATOR + "tmp";
     }
 
+    /* Never used. TODO: delete it. */
     public static String masterTmpDir(Map conf) throws IOException {
         String ret = stormTmpPath(masterLocalDir(conf));
         FileUtils.forceMkdir(new File(ret));
@@ -244,6 +245,7 @@ public class ConfigUtils {
         return ret;
     }
 
+    /* Never used. TODO: may be deleted. */
     public static String masterStormMetaFilePath(String stormRoot) {
         return (stormRoot + FILE_SEPARATOR + "storm-code-distributor.meta");
     }
@@ -252,10 +254,12 @@ public class ConfigUtils {
         return (stormRoot + FILE_SEPARATOR + "stormjar.jar");
     }
 
+    /* Never used. TODO: may be deleted. */
     public static String masterStormCodePath(String stormRoot) {
         return (stormRoot + FILE_SEPARATOR + "stormcode.ser");
     }
 
+    /* Never used. TODO: may be deleted. */
     public static String masterStormConfPath(String stormRoot) {
         return (stormRoot + FILE_SEPARATOR + "stormconf.ser");
     }