You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by zh...@apache.org on 2016/01/28 18:49:45 UTC
[09/23] storm git commit: Update callings to supervisor-isupervisor-dir
Update callings to supervisor-isupervisor-dir
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3d2796d2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3d2796d2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3d2796d2
Branch: refs/heads/master
Commit: 3d2796d2fbd7ae182d7414db985cf7de167fcd82
Parents: f350f1a
Author: zhuol <zh...@yahoo-inc.com>
Authored: Thu Jan 14 15:25:47 2016 -0600
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Wed Jan 20 09:55:24 2016 -0600
----------------------------------------------------------------------
.../org/apache/storm/command/healthcheck.clj | 2 +-
.../src/clj/org/apache/storm/daemon/nimbus.clj | 56 ++++++++++----------
.../clj/org/apache/storm/daemon/supervisor.clj | 16 +++---
.../jvm/org/apache/storm/utils/ConfigUtils.java | 4 ++
4 files changed, 41 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/3d2796d2/storm-core/src/clj/org/apache/storm/command/healthcheck.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/healthcheck.clj b/storm-core/src/clj/org/apache/storm/command/healthcheck.clj
index f64be92..138c7d8 100644
--- a/storm-core/src/clj/org/apache/storm/command/healthcheck.clj
+++ b/storm-core/src/clj/org/apache/storm/command/healthcheck.clj
@@ -62,7 +62,7 @@
(finally (.interrupt interrupter-thread)))))
(defn health-check [conf]
- (let [health-dir (absolute-healthcheck-dir conf)
+ (let [health-dir (ConfigUtils/absoluteHealthCheckDir conf)
health-files (file-seq (io/file health-dir))
health-scripts (filter #(and (.canExecute %)
(not (.isDirectory %)))
http://git-wip-us.apache.org/repos/asf/storm/blob/3d2796d2/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 86e93b4..9f4423a 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -206,7 +206,7 @@
}))
(defn inbox [nimbus]
- (master-inbox (:conf nimbus)))
+ (ConfigUtils/masterInbox (:conf nimbus)))
(defn- get-subject
[]
@@ -216,7 +216,7 @@
(defn- read-storm-conf [conf storm-id blob-store]
(clojurify-structure
(Utils/fromCompressedJsonConf
- (.readBlob blob-store (master-stormconf-key storm-id) (get-subject)))))
+ (.readBlob blob-store (ConfigUtils/masterStormConfKey storm-id) (get-subject)))))
(declare delay-event)
(declare mk-assignments)
@@ -234,10 +234,10 @@
(defn- get-key-list-from-id
[conf id]
- (log-debug "set keys id = " id "set = " #{(master-stormcode-key id) (master-stormjar-key id) (master-stormconf-key id)})
+ (log-debug "set keys id = " id "set = " #{(ConfigUtils/masterStormCodeKey id) (ConfigUtils/masterStormJarKey id) (ConfigUtils/masterStormConfKey id)})
(if (ConfigUtils/isLocalMode conf)
- [(master-stormcode-key id) (master-stormconf-key id)]
- [(master-stormcode-key id) (master-stormjar-key id) (master-stormconf-key id)]))
+ [(ConfigUtils/masterStormCodeKey id) (ConfigUtils/masterStormConfKey id)]
+ [(ConfigUtils/masterStormCodeKey id) (ConfigUtils/masterStormJarKey id) (ConfigUtils/masterStormConfKey id)]))
(defn kill-transition [nimbus storm-id]
(fn [kill-time]
@@ -460,9 +460,9 @@
(let [subject (get-subject)
storm-cluster-state (:storm-cluster-state nimbus)
blob-store (:blob-store nimbus)
- jar-key (master-stormjar-key storm-id)
- code-key (master-stormcode-key storm-id)
- conf-key (master-stormconf-key storm-id)
+ jar-key (ConfigUtils/masterStormJarKey storm-id)
+ code-key (ConfigUtils/masterStormCodeKey storm-id)
+ conf-key (ConfigUtils/masterStormConfKey storm-id)
nimbus-host-port-info (:nimbus-host-port-info nimbus)]
(when tmp-jar-location ;;in local mode there is no jar
(.createBlob blob-store jar-key (FileInputStream. tmp-jar-location) (SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject)
@@ -477,7 +477,7 @@
(defn- read-storm-topology [storm-id blob-store]
(Utils/deserialize
- (.readBlob blob-store (master-stormcode-key storm-id) (get-subject)) StormTopology))
+ (.readBlob blob-store (ConfigUtils/masterStormCodeKey storm-id) (get-subject)) StormTopology))
(defn get-blob-replication-count
[blob-key nimbus]
@@ -489,10 +489,10 @@
(let [min-replication-count (conf TOPOLOGY-MIN-REPLICATION-COUNT)
max-replication-wait-time (conf TOPOLOGY-MAX-REPLICATION-WAIT-TIME-SEC)
current-replication-count-jar (if (not (ConfigUtils/isLocalMode conf))
- (atom (get-blob-replication-count (master-stormjar-key storm-id) nimbus))
+ (atom (get-blob-replication-count (ConfigUtils/masterStormJarKey storm-id) nimbus))
(atom min-replication-count))
- current-replication-count-code (atom (get-blob-replication-count (master-stormcode-key storm-id) nimbus))
- current-replication-count-conf (atom (get-blob-replication-count (master-stormconf-key storm-id) nimbus))
+ current-replication-count-code (atom (get-blob-replication-count (ConfigUtils/masterStormCodeKey storm-id) nimbus))
+ current-replication-count-conf (atom (get-blob-replication-count (ConfigUtils/masterStormConfKey storm-id) nimbus))
total-wait-time (atom 0)]
(if (:blob-store nimbus)
(while (and
@@ -510,9 +510,9 @@
" total-wait-time " @total-wait-time)
(swap! total-wait-time inc)
(if (not (ConfigUtils/isLocalMode conf))
- (reset! current-replication-count-conf (get-blob-replication-count (master-stormconf-key storm-id) nimbus)))
- (reset! current-replication-count-code (get-blob-replication-count (master-stormcode-key storm-id) nimbus))
- (reset! current-replication-count-jar (get-blob-replication-count (master-stormjar-key storm-id) nimbus))))
+ (reset! current-replication-count-conf (get-blob-replication-count (ConfigUtils/masterStormConfKey storm-id) nimbus)))
+ (reset! current-replication-count-code (get-blob-replication-count (ConfigUtils/masterStormCodeKey storm-id) nimbus))
+ (reset! current-replication-count-jar (get-blob-replication-count (ConfigUtils/masterStormJarKey storm-id) nimbus))))
(if (and (< min-replication-count @current-replication-count-conf)
(< min-replication-count @current-replication-count-code)
(< min-replication-count @current-replication-count-jar))
@@ -526,14 +526,14 @@
(defn- read-storm-topology-as-nimbus [storm-id blob-store]
(Utils/deserialize
- (.readBlob blob-store (master-stormcode-key storm-id) nimbus-subject) StormTopology))
+ (.readBlob blob-store (ConfigUtils/masterStormCodeKey storm-id) nimbus-subject) StormTopology))
(declare compute-executor->component)
(defn read-storm-conf-as-nimbus [storm-id blob-store]
(clojurify-structure
(Utils/fromCompressedJsonConf
- (.readBlob blob-store (master-stormconf-key storm-id) nimbus-subject))))
+ (.readBlob blob-store (ConfigUtils/masterStormConfKey storm-id) nimbus-subject))))
(defn read-topology-details [nimbus storm-id]
(let [blob-store (:blob-store nimbus)
@@ -1116,9 +1116,9 @@
(log-message "Exception" e))))
(defn blob-rm-topology-keys [id blob-store storm-cluster-state]
- (blob-rm-key blob-store (master-stormjar-key id) storm-cluster-state)
- (blob-rm-key blob-store (master-stormconf-key id) storm-cluster-state)
- (blob-rm-key blob-store (master-stormcode-key id) storm-cluster-state))
+ (blob-rm-key blob-store (ConfigUtils/masterStormJarKey id) storm-cluster-state)
+ (blob-rm-key blob-store (ConfigUtils/masterStormConfKey id) storm-cluster-state)
+ (blob-rm-key blob-store (ConfigUtils/masterStormCodeKey id) storm-cluster-state))
(defn do-cleanup [nimbus]
(if (is-leader nimbus :throw-exception false)
@@ -1133,7 +1133,7 @@
(log-message "Cleaning up " id)
(.teardown-heartbeats! storm-cluster-state id)
(.teardown-topology-errors! storm-cluster-state id)
- (rmr (master-stormdist-root conf id))
+ (rmr (ConfigUtils/masterStormDistRoot conf id))
(blob-rm-topology-keys id blob-store storm-cluster-state)
(swap! (:heartbeats-cache nimbus) dissoc id)))))
(log-message "not a leader, skipping cleanup")))
@@ -1218,7 +1218,7 @@
(defn check-file-access [conf file-path]
(log-debug "check file access:" file-path)
(try
- (if (not= (.getCanonicalFile (File. (master-stormdist-root conf)))
+ (if (not= (.getCanonicalFile (File. (ConfigUtils/masterStormDistRoot conf)))
(-> (File. file-path) .getCanonicalFile .getParentFile .getParentFile))
(throw (AuthorizationException. (str "Invalid file path: " file-path))))
(catch Exception e
@@ -1351,7 +1351,7 @@
nil)
(defserverfn service-handler [conf inimbus]
- (.prepare inimbus conf (master-inimbus-dir conf))
+ (.prepare inimbus conf (ConfigUtils/masterInimbusDir conf))
(log-message "Starting Nimbus with conf " conf)
(let [nimbus (nimbus-data conf inimbus)
blob-store (:blob-store nimbus)
@@ -1835,7 +1835,7 @@
(.set_assigned_memonheap topo-summ (get resources 3))
(.set_assigned_memoffheap topo-summ (get resources 4))
(.set_assigned_cpu topo-summ (get resources 5)))
- (.set_replication_count topo-summ (get-blob-replication-count (master-stormcode-key id) nimbus))
+ (.set_replication_count topo-summ (get-blob-replication-count (ConfigUtils/masterStormCodeKey id) nimbus))
topo-summ))
ret (ClusterSummary. supervisor-summaries
topology-summaries
@@ -1902,7 +1902,7 @@
(.set_assigned_cpu topo-info (get resources 5)))
(when-let [component->debug (:component->debug base)]
(.set_component_debug topo-info (map-val converter/thriftify-debugoptions component->debug)))
- (.set_replication_count topo-info (get-blob-replication-count (master-stormcode-key storm-id) nimbus))
+ (.set_replication_count topo-info (get-blob-replication-count (ConfigUtils/masterStormCodeKey storm-id) nimbus))
topo-info))
(^TopologyInfo getTopologyInfo [this ^String topology-id]
@@ -2096,7 +2096,7 @@
(.set_uptime_secs (time-delta (:launch-time-secs info)))
(.set_topology_conf (to-json (try-read-storm-conf conf
topo-id (:blob-store nimbus))))
- (.set_replication_count (get-blob-replication-count (master-stormcode-key topo-id) nimbus)))
+ (.set_replication_count (get-blob-replication-count (ConfigUtils/masterStormCodeKey topo-id) nimbus)))
(when-let [debug-options
(get-in info [:base :component->debug topo-id])]
(.set_debug_options
@@ -2215,7 +2215,7 @@
(throw
(IllegalArgumentException.
(str tmp-jar-location " to copy to " stormroot " does not exist!"))))
- (FileUtils/copyFile src-file (File. (master-stormjar-path stormroot)))
+ (FileUtils/copyFile src-file (File. (ConfigUtils/masterStormJarPath stormroot)))
))
;; local implementation
@@ -2227,7 +2227,7 @@
(defn -launch [nimbus]
(let [conf (merge
(clojurify-structure (ConfigUtils/readStormConfig))
- (read-yaml-config "storm-cluster-auth.yaml" false))]
+ (clojurify-structure (ConfigUtils/readYamlConfig "storm-cluster-auth.yaml" false)))]
(launch-server! conf nimbus)))
(defn standalone-nimbus []
http://git-wip-us.apache.org/repos/asf/storm/blob/3d2796d2/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index 2c492e3..838a98f 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -338,7 +338,7 @@
(log-error t "Error when processing event")
(exit-process! 20 "Error when processing a event"))
:timer-name "blob-update-timer")
- :localizer (Utils/createLocalizer conf (supervisor-local-dir conf))
+ :localizer (Utils/createLocalizer conf (ConfigUtils/supervisorLocalDir conf))
:assignment-versions (atom {})
:sync-retry (atom 0)
:download-lock (Object.)
@@ -763,7 +763,7 @@
;; another thread launches events to restart any dead processes if necessary
(defserverfn mk-supervisor [conf shared-context ^ISupervisor isupervisor]
(log-message "Starting Supervisor with conf " conf)
- (.prepare isupervisor conf (supervisor-isupervisor-dir conf))
+ (.prepare isupervisor conf (ConfigUtils/supervisorIsupervisorDir conf))
(FileUtils/cleanDirectory (File. (supervisor-tmp-dir conf)))
(let [supervisor (supervisor-data conf shared-context isupervisor)
[event-manager processes-event-manager :as managers] [(event/event-manager false) (event/event-manager false)]
@@ -878,7 +878,7 @@
(defn download-blobs-for-topology!
"Download all blobs listed in the topology configuration for a given topology."
[conf stormconf-path localizer tmproot]
- (let [storm-conf (read-supervisor-storm-conf-given-path conf stormconf-path)
+ (let [storm-conf (ConfigUtils/readSupervisorStormConfGivenPath conf stormconf-path)
blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
user (storm-conf TOPOLOGY-SUBMITTER-USER)
topo-name (storm-conf TOPOLOGY-NAME)
@@ -930,11 +930,11 @@
(Utils/restrictPermissions tmproot)
(if (conf SUPERVISOR-RUN-WORKER-AS-USER)
(throw-runtime (str "ERROR: Windows doesn't implement setting the correct permissions"))))
- (Utils/downloadResourcesAsSupervisor (master-stormjar-key storm-id)
+ (Utils/downloadResourcesAsSupervisor (ConfigUtils/masterStormJarKey storm-id)
(supervisor-stormjar-path tmproot) blobstore)
- (Utils/downloadResourcesAsSupervisor (master-stormcode-key storm-id)
+ (Utils/downloadResourcesAsSupervisor (ConfigUtils/masterStormCodeKey storm-id)
(supervisor-stormcode-path tmproot) blobstore)
- (Utils/downloadResourcesAsSupervisor (master-stormconf-key storm-id)
+ (Utils/downloadResourcesAsSupervisor (ConfigUtils/masterStormConfKey storm-id)
(supervisor-stormconf-path tmproot) blobstore)
(.shutdown blobstore)
(extract-dir-from-jar (supervisor-stormjar-path tmproot) RESOURCES-SUBDIR tmproot)
@@ -1143,8 +1143,8 @@
blob-store (Utils/getNimbusBlobStore conf master-code-dir nil)]
(try
(FileUtils/forceMkdir (File. tmproot))
- (.readBlobTo blob-store (master-stormcode-key storm-id) (FileOutputStream. (supervisor-stormcode-path tmproot)) nil)
- (.readBlobTo blob-store (master-stormconf-key storm-id) (FileOutputStream. (supervisor-stormconf-path tmproot)) nil)
+ (.readBlobTo blob-store (ConfigUtils/masterStormCodeKey storm-id) (FileOutputStream. (supervisor-stormcode-path tmproot)) nil)
+ (.readBlobTo blob-store (ConfigUtils/masterStormConfKey storm-id) (FileOutputStream. (supervisor-stormconf-path tmproot)) nil)
(finally
(.shutdown blob-store)))
(FileUtils/moveDirectory (File. tmproot) (File. stormroot))
http://git-wip-us.apache.org/repos/asf/storm/blob/3d2796d2/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
index 4b8df65..3dd6d4e 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
@@ -232,6 +232,7 @@ public class ConfigUtils {
return stormRoot + FILE_SEPARATOR + "tmp";
}
+ /* Never get used TODO : delete it*/
public static String masterTmpDir(Map conf) throws IOException {
String ret = stormTmpPath(masterLocalDir(conf));
FileUtils.forceMkdir(new File(ret));
@@ -244,6 +245,7 @@ public class ConfigUtils {
return ret;
}
+ /* Never get used TODO : may delete it*/
public static String masterStormMetaFilePath(String stormRoot) {
return (stormRoot + FILE_SEPARATOR + "storm-code-distributor.meta");
}
@@ -252,10 +254,12 @@ public class ConfigUtils {
return (stormRoot + FILE_SEPARATOR + "stormjar.jar");
}
+ /* Never get used TODO : may delete it*/
public static String masterStormCodePath(String stormRoot) {
return (stormRoot + FILE_SEPARATOR + "stormcode.ser");
}
+ /* Never get used TODO : may delete it*/
public static String masterStormConfPath(String stormRoot) {
return (stormRoot + FILE_SEPARATOR + "stormconf.ser");
}