You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/02/23 17:39:12 UTC
[1/6] storm git commit: STORM-130: Supervisor getting killed due to
java.io.FileNotFoundException: File '../stormconf.ser' does not exist.
Repository: storm
Updated Branches:
refs/heads/master 4b6e43b81 -> ffb1816a9
STORM-130: Supervisor getting killed due to java.io.FileNotFoundException: File '../stormconf.ser' does not exist.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6b061aaa
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6b061aaa
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6b061aaa
Branch: refs/heads/master
Commit: 6b061aaab5a39e4b95670e3f0590bb48de4375fd
Parents: a115c9d
Author: Sriharsha Chintalapani <ma...@harsha.io>
Authored: Thu Jan 29 10:22:20 2015 -0800
Committer: Sriharsha Chintalapani <ma...@harsha.io>
Committed: Thu Jan 29 10:22:20 2015 -0800
----------------------------------------------------------------------
.../clj/backtype/storm/daemon/supervisor.clj | 58 +++++++++++++-------
1 file changed, 37 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/6b061aaa/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index a5d5aef..bb1c959 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -54,10 +54,10 @@
{sid nil})))
(apply merge)
(filter-val not-nil?))]
-
+
{:assignments (into {} (for [[k v] new-assignments] [k (:data v)]))
:versions new-assignments})))
-
+
(defn- read-my-executors [assignments-snapshot storm-id assignment-id]
(let [assignment (get assignments-snapshot storm-id)
my-executors (filter (fn [[_ [node _]]] (= node assignment-id))
@@ -163,7 +163,7 @@
)))
(defn- wait-for-worker-launch [conf id start-time]
- (let [state (worker-state conf id)]
+ (let [state (worker-state conf id)]
(loop []
(let [hb (.get state LS-WORKER-HEARTBEAT)]
(when (and
@@ -305,6 +305,7 @@
(defn sync-processes [supervisor]
(let [conf (:conf supervisor)
^LocalState local-state (:local-state supervisor)
+ storm-cluster-state (:storm-cluster-state supervisor)
assigned-executors (defaulted (.get local-state LS-LOCAL-ASSIGNMENTS) {})
now (current-time-secs)
allocated (read-allocated-workers supervisor assigned-executors now)
@@ -327,7 +328,7 @@
;; 5. create local dir for worker id
;; 5. launch new workers (give worker-id, port, and supervisor-id)
;; 6. wait for workers launch
-
+
(log-debug "Syncing processes")
(log-debug "Assigned executors: " assigned-executors)
(log-debug "Allocated: " allocated)
@@ -349,6 +350,20 @@
(keys keepers))
(zipmap (vals new-worker-ids) (keys new-worker-ids))
))
+
+ ;; check storm topology code dir exists before launching workers
+ (doseq [[port assignment] reassign-executors]
+ (let [storm-cluster-state (:storm-cluster-state supervisor)
+ downloaded-storm-ids (set (read-downloaded-storm-ids conf))
+ storm-id (:storm-id assignment)
+ assignment-info (.assignment-info-with-version storm-cluster-state storm-id nil)
+ storm-code-map (read-storm-code-locations assignment-info)
+ master-code-dir (if (contains? storm-code-map :data) (storm-code-map :data))
+ stormroot (supervisor-stormdist-root conf storm-id)]
+ (if-not (or (downloaded-storm-ids storm-id) (.exists (File. stormroot)))
+ (download-storm-code conf storm-id master-code-dir))
+ ))
+
(wait-for-workers-launch
conf
(dofor [[port assignment] reassign-executors]
@@ -399,7 +414,7 @@
sync-callback (fn [& ignored] (.add event-manager this))
assignment-versions @(:assignment-versions supervisor)
{assignments-snapshot :assignments versions :versions} (assignments-snapshot
- storm-cluster-state sync-callback
+ storm-cluster-state sync-callback
assignment-versions)
storm-code-map (read-storm-code-locations assignments-snapshot)
downloaded-storm-ids (set (read-downloaded-storm-ids conf))
@@ -417,7 +432,7 @@
(log-debug "Downloaded storm ids: " downloaded-storm-ids)
(log-debug "All assignment: " all-assignment)
(log-debug "New assignment: " new-assignment)
-
+
;; download code first
;; This might take awhile
;; - should this be done separately from usual monitoring?
@@ -425,16 +440,7 @@
(doseq [[storm-id master-code-dir] storm-code-map]
(when (and (not (downloaded-storm-ids storm-id))
(assigned-storm-ids storm-id))
- (log-message "Downloading code for storm id "
- storm-id
- " from "
- master-code-dir)
- (download-storm-code conf storm-id master-code-dir)
- (log-message "Finished downloading code for storm id "
- storm-id
- " from "
- master-code-dir)
- ))
+ (download-storm-code conf storm-id master-code-dir)))
(log-debug "Writing new assignment "
(pr-str new-assignment))
@@ -470,7 +476,7 @@
(.prepare isupervisor conf (supervisor-isupervisor-dir conf))
(FileUtils/cleanDirectory (File. (supervisor-tmp-dir conf)))
(let [supervisor (supervisor-data conf shared-context isupervisor)
- [event-manager processes-event-manager :as managers] [(event/event-manager false) (event/event-manager false)]
+ [event-manager processes-event-manager :as managers] [(event/event-manager false) (event/event-manager false)]
sync-processes (partial sync-processes supervisor)
synchronize-supervisor (mk-synchronize-supervisor supervisor sync-processes event-manager processes-event-manager)
heartbeat-fn (fn [] (.supervisor-heartbeat!
@@ -542,14 +548,24 @@
;; Downloading to permanent location is atomic
(let [tmproot (str (supervisor-tmp-dir conf) file-path-separator (uuid))
stormroot (supervisor-stormdist-root conf storm-id)]
+ (log-message "Downloading code for storm id "
+ storm-id
+ " from "
+ master-code-dir)
(FileUtils/forceMkdir (File. tmproot))
-
+
(Utils/downloadFromMaster conf (master-stormjar-path master-code-dir) (supervisor-stormjar-path tmproot))
(Utils/downloadFromMaster conf (master-stormcode-path master-code-dir) (supervisor-stormcode-path tmproot))
(Utils/downloadFromMaster conf (master-stormconf-path master-code-dir) (supervisor-stormconf-path tmproot))
(extract-dir-from-jar (supervisor-stormjar-path tmproot) RESOURCES-SUBDIR tmproot)
- (FileUtils/moveDirectory (File. tmproot) (File. stormroot))
- (setup-storm-code-dir conf (read-supervisor-storm-conf conf storm-id) stormroot)
+ (if-not (.exists (File. stormroot))
+ (FileUtils/moveDirectory (File. tmproot) (File. stormroot))
+ (FileUtils/deleteDirectory (File. tmproot)))
+ (setup-storm-code-dir conf (read-supervisor-storm-conf conf storm-id) stormroot)
+ (log-message "Finished downloading code for storm id "
+ storm-id
+ " from "
+ master-code-dir)
))
(defn write-log-metadata-to-yaml-file! [storm-id port data conf]
@@ -595,7 +611,7 @@
"%WORKER-PORT%" (str port)}
sub-fn #(reduce (fn [string entry]
(apply clojure.string/replace string entry))
- %
+ %
replacement-map)]
(cond
(nil? value) nil
[3/6] storm git commit: STORM-130: Supervisor getting killed due to
java.io.FileNotFoundException: File '../stormconf.ser' does not exist.
Posted by bo...@apache.org.
STORM-130: Supervisor getting killed due to java.io.FileNotFoundException: File '../stormconf.ser' does not exist.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8d5ac19a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8d5ac19a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8d5ac19a
Branch: refs/heads/master
Commit: 8d5ac19aceac9cf1bbee0a2cef20ee67d570085b
Parents: eaedc83
Author: Sriharsha Chintalapani <ma...@harsha.io>
Authored: Thu Jan 29 22:33:01 2015 -0800
Committer: Sriharsha Chintalapani <ma...@harsha.io>
Committed: Thu Jan 29 22:33:01 2015 -0800
----------------------------------------------------------------------
storm-core/src/clj/backtype/storm/daemon/supervisor.clj | 9 +++++----
1 file changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/8d5ac19a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index fde53c4..e794710 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -358,12 +358,13 @@
(let [downloaded-storm-ids (set (read-downloaded-storm-ids conf))
storm-id (:storm-id assignment)
cached-assignment-info @(:assignment-versions supervisor)
- assignment-info (if (nil? cached-assignment-info) (.assignment-info-with-version storm-cluster-state storm-id nil)
- (get cached-assignment-info storm-id))
- storm-code-map (read-storm-code-locations assignment-info)
+ assignment-info (if (and (not-nil? cached-assignment-info) (contains? cached-assignment-info storm-id ))
+ (get cached-assignment-info storm-id)
+ (.assignment-info-with-version storm-cluster-state storm-id nil))
+ storm-code-map (read-storm-code-locations assignment-info)
master-code-dir (if (contains? storm-code-map :data) (storm-code-map :data))
stormroot (supervisor-stormdist-root conf storm-id)]
- (if-not (or (downloaded-storm-ids storm-id) (.exists (File. stormroot)))
+ (if-not (or (contains? downloaded-storm-ids storm-id) (.exists (File. stormroot)) (nil? master-code-dir))
(download-storm-code conf storm-id master-code-dir download-lock))
))
[4/6] storm git commit: STORM-130: Supervisor getting killed due to
java.io.FileNotFoundException: File '../stormconf.ser' does not exist.
Posted by bo...@apache.org.
STORM-130: Supervisor getting killed due to java.io.FileNotFoundException: File '../stormconf.ser' does not exist.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2712ee9b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2712ee9b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2712ee9b
Branch: refs/heads/master
Commit: 2712ee9b8a4912f74acf33fe1cef7195890ab4fc
Parents: 8d5ac19
Author: Sriharsha Chintalapani <ma...@harsha.io>
Authored: Thu Feb 5 11:02:11 2015 -0800
Committer: Sriharsha Chintalapani <ma...@harsha.io>
Committed: Thu Feb 5 11:02:11 2015 -0800
----------------------------------------------------------------------
.../clj/backtype/storm/daemon/supervisor.clj | 43 ++++++++++++--------
1 file changed, 25 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/2712ee9b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index e794710..526b057 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -43,7 +43,7 @@
(defn- assignments-snapshot [storm-cluster-state callback assignment-versions]
(let [storm-ids (.assignments storm-cluster-state callback)]
- (let [new-assignments
+ (let [new-assignments
(->>
(dofor [sid storm-ids]
(let [recorded-version (:version (get assignment-versions sid))]
@@ -263,7 +263,7 @@
(worker-launcher-and-wait conf user ["signal" pid "9"] :log-prefix (str "kill -9 " pid))
(force-kill-process pid))
(if as-user
- (rmr-as-user conf id user (worker-pid-path conf id pid))
+ (rmr-as-user conf id user (worker-pid-path conf id pid))
(try
(rmpath (worker-pid-path conf id pid))
(catch Exception e)))) ;; on windows, the supervisor may still holds the lock on the worker directory
@@ -271,7 +271,7 @@
(log-message "Shut down " (:supervisor-id supervisor) ":" id))
(def SUPERVISOR-ZK-ACLS
- [(first ZooDefs$Ids/CREATOR_ALL_ACL)
+ [(first ZooDefs$Ids/CREATOR_ALL_ACL)
(ACL. (bit-or ZooDefs$Perms/READ ZooDefs$Perms/CREATE) ZooDefs$Ids/ANYONE_ID_UNSAFE)])
(defn supervisor-data [conf shared-context ^ISupervisor isupervisor]
@@ -360,7 +360,7 @@
cached-assignment-info @(:assignment-versions supervisor)
assignment-info (if (and (not-nil? cached-assignment-info) (contains? cached-assignment-info storm-id ))
(get cached-assignment-info storm-id)
- (.assignment-info-with-version storm-cluster-state storm-id nil))
+ (.assignment-info-with-version storm-cluster-state storm-id nil))
storm-code-map (read-storm-code-locations assignment-info)
master-code-dir (if (contains? storm-code-map :data) (storm-code-map :data))
stormroot (supervisor-stormdist-root conf storm-id)]
@@ -371,20 +371,27 @@
(wait-for-workers-launch
conf
(dofor [[port assignment] reassign-executors]
- (let [id (new-worker-ids port)]
- (log-message "Launching worker with assignment "
- (pr-str assignment)
- " for this supervisor "
- (:supervisor-id supervisor)
- " on port "
- port
- " with id "
- id
- )
- (launch-worker supervisor
- (:storm-id assignment)
- port
- id)
+ (let [id (new-worker-ids port)]
+ (try
+ (log-message "Launching worker with assignment "
+ (pr-str assignment)
+ " for this supervisor "
+ (:supervisor-id supervisor)
+ " on port "
+ port
+ " with id "
+ id
+ )
+ (launch-worker supervisor
+ (:storm-id assignment)
+ port
+ id)
+ (catch java.io.FileNotFoundException e
+ (log-message "Unable to launch worker due to "
+ (.getMessage e)))
+ (catch java.io.IOException e
+ (log-message "Unable to launch worker due to "
+ (.getMessage e))))
id)))
))
[6/6] storm git commit: Added STORM-130 to Changelog
Posted by bo...@apache.org.
Added STORM-130 to Changelog
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ffb1816a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ffb1816a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ffb1816a
Branch: refs/heads/master
Commit: ffb1816a9eadbaea95746ec96d5771448ba6fd6d
Parents: 51e91f0
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Feb 23 10:38:32 2015 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Feb 23 10:38:32 2015 -0600
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/ffb1816a/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 63f1594..91849b1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -53,6 +53,7 @@
* STORM-673: Typo 'deamon' in security documentation
* STORM-441: Remove bootstrap macro from Clojure codebase
* STORM-609: Add storm-redis to storm external
+ * STORM-130: Supervisor getting killed due to java.io.FileNotFoundException: File '../stormconf.ser' does not exist.
## 0.9.3-rc2
* STORM-558: change "swap!" to "reset!" to fix assignment-versions in supervisor
[2/6] storm git commit: STORM-130: Supervisor getting killed due to
java.io.FileNotFoundException: File '../stormconf.ser' does not exist.
Posted by bo...@apache.org.
STORM-130: Supervisor getting killed due to java.io.FileNotFoundException: File '../stormconf.ser' does not exist.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/eaedc83c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/eaedc83c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/eaedc83c
Branch: refs/heads/master
Commit: eaedc83ccfa9cfb150d7ef3e3ba2c4cc5445cb60
Parents: 6b061aa
Author: Sriharsha Chintalapani <ma...@harsha.io>
Authored: Thu Jan 29 16:14:18 2015 -0800
Committer: Sriharsha Chintalapani <ma...@harsha.io>
Committed: Thu Jan 29 16:14:18 2015 -0800
----------------------------------------------------------------------
.../clj/backtype/storm/daemon/supervisor.clj | 111 ++++++++++---------
1 file changed, 59 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/eaedc83c/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index bb1c959..fde53c4 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -45,7 +45,7 @@
(let [storm-ids (.assignments storm-cluster-state callback)]
(let [new-assignments
(->>
- (dofor [sid storm-ids]
+ (dofor [sid storm-ids]
(let [recorded-version (:version (get assignment-versions sid))]
(if-let [assignment-version (.assignment-version storm-cluster-state sid callback)]
(if (= assignment-version recorded-version)
@@ -149,7 +149,7 @@
(or (not (contains? approved-ids id))
(not (matches-an-assignment? hb assigned-executors)))
:disallowed
- (or
+ (or
(when (get (get-dead-workers) id)
(log-message "Worker Process " id " has died!")
true)
@@ -300,10 +300,12 @@
))
:assignment-versions (atom {})
:sync-retry (atom 0)
+ :download-lock (Object.)
})
(defn sync-processes [supervisor]
(let [conf (:conf supervisor)
+ download-lock (:download-lock supervisor)
^LocalState local-state (:local-state supervisor)
storm-cluster-state (:storm-cluster-state supervisor)
assigned-executors (defaulted (.get local-state LS-LOCAL-ASSIGNMENTS) {})
@@ -353,15 +355,16 @@
;; check storm topology code dir exists before launching workers
(doseq [[port assignment] reassign-executors]
- (let [storm-cluster-state (:storm-cluster-state supervisor)
- downloaded-storm-ids (set (read-downloaded-storm-ids conf))
+ (let [downloaded-storm-ids (set (read-downloaded-storm-ids conf))
storm-id (:storm-id assignment)
- assignment-info (.assignment-info-with-version storm-cluster-state storm-id nil)
+ cached-assignment-info @(:assignment-versions supervisor)
+ assignment-info (if (nil? cached-assignment-info) (.assignment-info-with-version storm-cluster-state storm-id nil)
+ (get cached-assignment-info storm-id))
storm-code-map (read-storm-code-locations assignment-info)
master-code-dir (if (contains? storm-code-map :data) (storm-code-map :data))
stormroot (supervisor-stormdist-root conf storm-id)]
(if-not (or (downloaded-storm-ids storm-id) (.exists (File. stormroot)))
- (download-storm-code conf storm-id master-code-dir))
+ (download-storm-code conf storm-id master-code-dir download-lock))
))
(wait-for-workers-launch
@@ -408,14 +411,15 @@
(defn mk-synchronize-supervisor [supervisor sync-processes event-manager processes-event-manager]
(fn this []
(let [conf (:conf supervisor)
+ download-lock (:download-lock supervisor)
storm-cluster-state (:storm-cluster-state supervisor)
^ISupervisor isupervisor (:isupervisor supervisor)
^LocalState local-state (:local-state supervisor)
sync-callback (fn [& ignored] (.add event-manager this))
assignment-versions @(:assignment-versions supervisor)
- {assignments-snapshot :assignments versions :versions} (assignments-snapshot
+ {assignments-snapshot :assignments versions :versions} (assignments-snapshot
storm-cluster-state sync-callback
- assignment-versions)
+ assignment-versions)
storm-code-map (read-storm-code-locations assignments-snapshot)
downloaded-storm-ids (set (read-downloaded-storm-ids conf))
existing-assignment (.get local-state LS-LOCAL-ASSIGNMENTS)
@@ -440,7 +444,7 @@
(doseq [[storm-id master-code-dir] storm-code-map]
(when (and (not (downloaded-storm-ids storm-id))
(assigned-storm-ids storm-id))
- (download-storm-code conf storm-id master-code-dir)))
+ (download-storm-code conf storm-id master-code-dir download-lock)))
(log-debug "Writing new assignment "
(pr-str new-assignment))
@@ -544,29 +548,30 @@
;; distributed implementation
(defmethod download-storm-code
- :distributed [conf storm-id master-code-dir]
+ :distributed [conf storm-id master-code-dir download-lock]
;; Downloading to permanent location is atomic
(let [tmproot (str (supervisor-tmp-dir conf) file-path-separator (uuid))
stormroot (supervisor-stormdist-root conf storm-id)]
- (log-message "Downloading code for storm id "
- storm-id
- " from "
- master-code-dir)
- (FileUtils/forceMkdir (File. tmproot))
-
- (Utils/downloadFromMaster conf (master-stormjar-path master-code-dir) (supervisor-stormjar-path tmproot))
- (Utils/downloadFromMaster conf (master-stormcode-path master-code-dir) (supervisor-stormcode-path tmproot))
- (Utils/downloadFromMaster conf (master-stormconf-path master-code-dir) (supervisor-stormconf-path tmproot))
- (extract-dir-from-jar (supervisor-stormjar-path tmproot) RESOURCES-SUBDIR tmproot)
- (if-not (.exists (File. stormroot))
- (FileUtils/moveDirectory (File. tmproot) (File. stormroot))
- (FileUtils/deleteDirectory (File. tmproot)))
- (setup-storm-code-dir conf (read-supervisor-storm-conf conf storm-id) stormroot)
- (log-message "Finished downloading code for storm id "
- storm-id
- " from "
- master-code-dir)
- ))
+ (locking download-lock
+ (log-message "Downloading code for storm id "
+ storm-id
+ " from "
+ master-code-dir)
+ (FileUtils/forceMkdir (File. tmproot))
+
+ (Utils/downloadFromMaster conf (master-stormjar-path master-code-dir) (supervisor-stormjar-path tmproot))
+ (Utils/downloadFromMaster conf (master-stormcode-path master-code-dir) (supervisor-stormcode-path tmproot))
+ (Utils/downloadFromMaster conf (master-stormconf-path master-code-dir) (supervisor-stormconf-path tmproot))
+ (extract-dir-from-jar (supervisor-stormjar-path tmproot) RESOURCES-SUBDIR tmproot)
+ (if-not (.exists (File. stormroot))
+ (FileUtils/moveDirectory (File. tmproot) (File. stormroot))
+ (FileUtils/deleteDirectory (File. tmproot)))
+ (setup-storm-code-dir conf (read-supervisor-storm-conf conf storm-id) stormroot)
+ (log-message "Finished downloading code for storm id "
+ storm-id
+ " from "
+ master-code-dir))
+ ))
(defn write-log-metadata-to-yaml-file! [storm-id port data conf]
(let [file (get-log-metadata-file storm-id port)]
@@ -600,9 +605,9 @@
os (clojure.string/replace (System/getProperty "os.name") #"\s+" "_")
arch (System/getProperty "os.arch")
arch-resource-root (str resource-root File/separator os "-" arch)]
- (str arch-resource-root File/pathSeparator resource-root File/pathSeparator (conf JAVA-LIBRARY-PATH))))
+ (str arch-resource-root File/pathSeparator resource-root File/pathSeparator (conf JAVA-LIBRARY-PATH))))
-(defn substitute-childopts
+(defn substitute-childopts
"Generates runtime childopts by replacing keys with topology-id, worker-id, port"
[value worker-id topology-id port]
(let [replacement-map {"%ID%" (str port)
@@ -684,10 +689,10 @@
(write-log-metadata! storm-conf user worker-id storm-id port conf)
(set-worker-user! conf worker-id user)
(let [log-prefix (str "Worker Process " worker-id)
- callback (fn [exit-code]
+ callback (fn [exit-code]
(log-message log-prefix " exited with code: " exit-code)
(add-dead-worker worker-id))]
- (remove-dead-worker worker-id)
+ (remove-dead-worker worker-id)
(if run-worker-as-user
(let [worker-dir (worker-root conf worker-id)]
(worker-launcher conf user ["worker" worker-dir (write-script worker-dir command :environment topology-worker-environment)] :log-prefix log-prefix :exit-code-callback callback))
@@ -703,25 +708,27 @@
first ))
(defmethod download-storm-code
- :local [conf storm-id master-code-dir]
- (let [stormroot (supervisor-stormdist-root conf storm-id)]
- (FileUtils/copyDirectory (File. master-code-dir) (File. stormroot))
- (let [classloader (.getContextClassLoader (Thread/currentThread))
- resources-jar (resources-jar)
- url (.getResource classloader RESOURCES-SUBDIR)
- target-dir (str stormroot file-path-separator RESOURCES-SUBDIR)]
- (cond
- resources-jar
- (do
- (log-message "Extracting resources from jar at " resources-jar " to " target-dir)
- (extract-dir-from-jar resources-jar RESOURCES-SUBDIR stormroot))
- url
- (do
- (log-message "Copying resources at " (URI. (str url)) " to " target-dir)
- (if (= (.getProtocol url) "jar" )
- (extract-dir-from-jar (.getFile (.getJarFileURL (.openConnection url))) RESOURCES-SUBDIR stormroot)
- (FileUtils/copyDirectory (File. (.getPath (URI. (str url)))) (File. target-dir)))
- )
+ :local [conf storm-id master-code-dir download-lock]
+ (let [stormroot (supervisor-stormdist-root conf storm-id)]
+ (locking download-lock
+ (FileUtils/copyDirectory (File. master-code-dir) (File. stormroot))
+ (let [classloader (.getContextClassLoader (Thread/currentThread))
+ resources-jar (resources-jar)
+ url (.getResource classloader RESOURCES-SUBDIR)
+ target-dir (str stormroot file-path-separator RESOURCES-SUBDIR)]
+ (cond
+ resources-jar
+ (do
+ (log-message "Extracting resources from jar at " resources-jar " to " target-dir)
+ (extract-dir-from-jar resources-jar RESOURCES-SUBDIR stormroot))
+ url
+ (do
+ (log-message "Copying resources at " (URI. (str url)) " to " target-dir)
+ (if (= (.getProtocol url) "jar" )
+ (extract-dir-from-jar (.getFile (.getJarFileURL (.openConnection url))) RESOURCES-SUBDIR stormroot)
+ (FileUtils/copyDirectory (File. (.getPath (URI. (str url)))) (File. target-dir)))
+ )
+ )
)
)))
[5/6] storm git commit: Merge branch 'STORM-130-V2' of
https://github.com/harshach/incubator-storm into STORM-130
Posted by bo...@apache.org.
Merge branch 'STORM-130-V2' of https://github.com/harshach/incubator-storm into STORM-130
STORM-130: Supervisor getting killed due to java.io.FileNotFoundException: File '../stormconf.ser' does not exist.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/51e91f0a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/51e91f0a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/51e91f0a
Branch: refs/heads/master
Commit: 51e91f0ac2bbcdc7b7239f53082fbeeb32db721c
Parents: 4b6e43b 2712ee9
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Feb 23 10:31:58 2015 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Feb 23 10:31:58 2015 -0600
----------------------------------------------------------------------
.../clj/backtype/storm/daemon/supervisor.clj | 175 +++++++++++--------
1 file changed, 103 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/51e91f0a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------