You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text version).
Posted to commits@storm.apache.org by pt...@apache.org on 2015/02/24 21:46:34 UTC
[1/2] storm git commit: STORM-130: Supervisor getting killed due to java.io.FileNotFoundException: File '../stormconf.ser' does not exist.
Repository: storm
Updated Branches:
refs/heads/0.9.3-branch 1570fa7a9 -> a1e5893e1
STORM-130: Supervisor getting killed due to java.io.FileNotFoundException: File '../stormconf.ser' does not exist.
Signed-off-by: P. Taylor Goetz <pt...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/98f4e619
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/98f4e619
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/98f4e619
Branch: refs/heads/0.9.3-branch
Commit: 98f4e619d54052b73a309d23ab7214953e4c7774
Parents: 1570fa7
Author: Sriharsha Chintalapani <ma...@harsha.io>
Authored: Thu Feb 5 10:08:05 2015 -0800
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Tue Feb 24 15:17:33 2015 -0500
----------------------------------------------------------------------
.../clj/backtype/storm/daemon/supervisor.clj | 158 +++++++++++--------
1 file changed, 92 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/98f4e619/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index 820379e..582c3df 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -39,9 +39,9 @@
(defn- assignments-snapshot [storm-cluster-state callback assignment-versions]
(let [storm-ids (.assignments storm-cluster-state callback)]
- (let [new-assignments
+ (let [new-assignments
(->>
- (dofor [sid storm-ids]
+ (dofor [sid storm-ids]
(let [recorded-version (:version (get assignment-versions sid))]
(if-let [assignment-version (.assignment-version storm-cluster-state sid callback)]
(if (= assignment-version recorded-version)
@@ -50,10 +50,10 @@
{sid nil})))
(apply merge)
(filter-val not-nil?))]
-
+
{:assignments (into {} (for [[k v] new-assignments] [k (:data v)]))
:versions new-assignments})))
-
+
(defn- read-my-executors [assignments-snapshot storm-id assignment-id]
(let [assignment (get assignments-snapshot storm-id)
my-executors (filter (fn [[_ [node _]]] (= node assignment-id))
@@ -144,7 +144,7 @@
)))
(defn- wait-for-worker-launch [conf id start-time]
- (let [state (worker-state conf id)]
+ (let [state (worker-state conf id)]
(loop []
(let [hb (.get state LS-WORKER-HEARTBEAT)]
(when (and
@@ -220,11 +220,14 @@
))
:assignment-versions (atom {})
:sync-retry (atom 0)
+ :download-lock (Object.)
})
(defn sync-processes [supervisor]
(let [conf (:conf supervisor)
+ download-lock (:download-lock supervisor)
^LocalState local-state (:local-state supervisor)
+ storm-cluster-state (:storm-cluster-state supervisor)
assigned-executors (defaulted (.get local-state LS-LOCAL-ASSIGNMENTS) {})
now (current-time-secs)
allocated (read-allocated-workers supervisor assigned-executors now)
@@ -247,7 +250,7 @@
;; 5. create local dir for worker id
;; 5. launch new workers (give worker-id, port, and supervisor-id)
;; 6. wait for workers launch
-
+
(log-debug "Syncing processes")
(log-debug "Assigned executors: " assigned-executors)
(log-debug "Allocated: " allocated)
@@ -268,25 +271,45 @@
(keys keepers))
(zipmap (vals new-worker-ids) (keys new-worker-ids))
))
+ ;; check storm topology code dir exists before launching workers
+ (doseq [[port assignment] reassign-executors]
+ (let [downloaded-storm-ids (set (read-downloaded-storm-ids conf))
+ storm-id (:storm-id assignment)
+ cached-assignment-info @ (:assignment-versions supervisor)
+ assignment-info (if (and (not-nil? cached-assignment-info) (contains? cached-assignment-info storm-id))
+ (get cached-assignment-info storm-id)
+ (.assignment-info-with-version storm-cluster-state storm-id nil))
+ storm-code-map (read-storm-code-locations assignment-info)
+ master-code-dir (if (contains? storm-code-map :data) (storm-code-map :data))
+ stormroot (supervisor-stormdist-root conf storm-id)]
+ (if-not (or (contains? downloaded-storm-ids storm-id) (.exists (File. stormroot)) (nil? master-code-dir))
+ (download-storm-code conf storm-id master-code-dir download-lock))))
+
(wait-for-workers-launch
conf
(dofor [[port assignment] reassign-executors]
- (let [id (new-worker-ids port)]
- (log-message "Launching worker with assignment "
- (pr-str assignment)
- " for this supervisor "
- (:supervisor-id supervisor)
- " on port "
- port
- " with id "
- id
- )
- (launch-worker supervisor
- (:storm-id assignment)
- port
- id)
- id)))
- ))
+ (let [id (new-worker-ids port)]
+ (try
+ (log-message "Launching worker with assignment "
+ (pr-str assignment)
+ " for this supervisor "
+ (:supervisor-id supervisor)
+ " on port "
+ port
+ " with id "
+ id)
+ (launch-worker supervisor
+ (:storm-id assignment)
+ port
+ id)
+ (catch java.io.FileNotFoundException e
+ (log-message "Unable to launch worker due to "
+ (.getMessage e)))
+ (catch java.io.IOException e
+ (log-message "Unable to launch worker due to "
+ (.getMessage e))))
+ id))
+ )))
(defn assigned-storm-ids-from-port-assignments [assignment]
(->> assignment
@@ -312,13 +335,14 @@
(defn mk-synchronize-supervisor [supervisor sync-processes event-manager processes-event-manager]
(fn this []
(let [conf (:conf supervisor)
+ download-lock (:download-lock supervisor)
storm-cluster-state (:storm-cluster-state supervisor)
^ISupervisor isupervisor (:isupervisor supervisor)
^LocalState local-state (:local-state supervisor)
sync-callback (fn [& ignored] (.add event-manager this))
assignment-versions @(:assignment-versions supervisor)
- {assignments-snapshot :assignments versions :versions} (assignments-snapshot
- storm-cluster-state sync-callback
+ {assignments-snapshot :assignments versions :versions} (assignments-snapshot
+ storm-cluster-state sync-callback
assignment-versions)
storm-code-map (read-storm-code-locations assignments-snapshot)
downloaded-storm-ids (set (read-downloaded-storm-ids conf))
@@ -336,7 +360,7 @@
(log-debug "Downloaded storm ids: " downloaded-storm-ids)
(log-debug "All assignment: " all-assignment)
(log-debug "New assignment: " new-assignment)
-
+
;; download code first
;; This might take awhile
;; - should this be done separately from usual monitoring?
@@ -344,16 +368,7 @@
(doseq [[storm-id master-code-dir] storm-code-map]
(when (and (not (downloaded-storm-ids storm-id))
(assigned-storm-ids storm-id))
- (log-message "Downloading code for storm id "
- storm-id
- " from "
- master-code-dir)
- (download-storm-code conf storm-id master-code-dir)
- (log-message "Finished downloading code for storm id "
- storm-id
- " from "
- master-code-dir)
- ))
+ (download-storm-code conf storm-id master-code-dir download-lock)))
(log-debug "Writing new assignment "
(pr-str new-assignment))
@@ -389,7 +404,7 @@
(.prepare isupervisor conf (supervisor-isupervisor-dir conf))
(FileUtils/cleanDirectory (File. (supervisor-tmp-dir conf)))
(let [supervisor (supervisor-data conf shared-context isupervisor)
- [event-manager processes-event-manager :as managers] [(event/event-manager false) (event/event-manager false)]
+ [event-manager processes-event-manager :as managers] [(event/event-manager false) (event/event-manager false)]
sync-processes (partial sync-processes supervisor)
synchronize-supervisor (mk-synchronize-supervisor supervisor sync-processes event-manager processes-event-manager)
heartbeat-fn (fn [] (.supervisor-heartbeat!
@@ -452,27 +467,37 @@
;; distributed implementation
(defmethod download-storm-code
- :distributed [conf storm-id master-code-dir]
- ;; Downloading to permanent location is atomic
- (let [tmproot (str (supervisor-tmp-dir conf) file-path-separator (uuid))
- stormroot (supervisor-stormdist-root conf storm-id)]
+ :distributed [conf storm-id master-code-dir download-lock]
+ ;; Downloading to permanent location is atomic
+ (let [tmproot (str (supervisor-tmp-dir conf) file-path-separator (uuid))
+ stormroot (supervisor-stormdist-root conf storm-id)]
+ (locking download-lock
+ (log-message "Downloading code for storm id "
+ storm-id
+ " from "
+ master-code-dir)
+
(FileUtils/forceMkdir (File. tmproot))
-
+
(Utils/downloadFromMaster conf (master-stormjar-path master-code-dir) (supervisor-stormjar-path tmproot))
(Utils/downloadFromMaster conf (master-stormcode-path master-code-dir) (supervisor-stormcode-path tmproot))
(Utils/downloadFromMaster conf (master-stormconf-path master-code-dir) (supervisor-stormconf-path tmproot))
(extract-dir-from-jar (supervisor-stormjar-path tmproot) RESOURCES-SUBDIR tmproot)
(FileUtils/moveDirectory (File. tmproot) (File. stormroot))
- ))
+ (log-message "Finished downloading code for storm id "
+ storm-id
+ " from "
+ master-code-dir))
+ ))
(defn jlp [stormroot conf]
(let [resource-root (str stormroot File/separator RESOURCES-SUBDIR)
os (clojure.string/replace (System/getProperty "os.name") #"\s+" "_")
arch (System/getProperty "os.arch")
arch-resource-root (str resource-root File/separator os "-" arch)]
- (str arch-resource-root File/pathSeparator resource-root File/pathSeparator (conf JAVA-LIBRARY-PATH))))
+ (str arch-resource-root File/pathSeparator resource-root File/pathSeparator (conf JAVA-LIBRARY-PATH))))
-(defn substitute-childopts
+(defn substitute-childopts
"Generates runtime childopts by replacing keys with topology-id, worker-id, port"
[value worker-id topology-id port]
(let [replacement-map {"%ID%" (str port)
@@ -481,7 +506,7 @@
"%WORKER-PORT%" (str port)}
sub-fn #(reduce (fn [string entry]
(apply clojure.string/replace string entry))
- %
+ %
replacement-map)]
(cond
(nil? value) nil
@@ -558,27 +583,28 @@
first ))
(defmethod download-storm-code
- :local [conf storm-id master-code-dir]
- (let [stormroot (supervisor-stormdist-root conf storm-id)]
- (FileUtils/copyDirectory (File. master-code-dir) (File. stormroot))
- (let [classloader (.getContextClassLoader (Thread/currentThread))
- resources-jar (resources-jar)
- url (.getResource classloader RESOURCES-SUBDIR)
- target-dir (str stormroot file-path-separator RESOURCES-SUBDIR)]
- (cond
- resources-jar
- (do
- (log-message "Extracting resources from jar at " resources-jar " to " target-dir)
- (extract-dir-from-jar resources-jar RESOURCES-SUBDIR stormroot))
- url
- (do
- (log-message "Copying resources at " (URI. (str url)) " to " target-dir)
- (if (= (.getProtocol url) "jar" )
- (extract-dir-from-jar (.getFile (.getJarFileURL (.openConnection url))) RESOURCES-SUBDIR stormroot)
- (FileUtils/copyDirectory (File. (.getPath (URI. (str url)))) (File. target-dir)))
- )
- )
- )))
+ :local [conf storm-id master-code-dir download-lock]
+ (let [stormroot (supervisor-stormdist-root conf storm-id)]
+ (locking download-lock
+ (FileUtils/copyDirectory (File. master-code-dir) (File. stormroot))
+ (let [classloader (.getContextClassLoader (Thread/currentThread))
+ resources-jar (resources-jar)
+ url (.getResource classloader RESOURCES-SUBDIR)
+ target-dir (str stormroot file-path-separator RESOURCES-SUBDIR)]
+ (cond
+ resources-jar
+ (do
+ (log-message "Extracting resources from jar at " resources-jar " to " target-dir)
+ (extract-dir-from-jar resources-jar RESOURCES-SUBDIR stormroot))
+ url
+ (do
+ (log-message "Copying resources at " (URI. (str url)) " to " target-dir)
+ (if (= (.getProtocol url) "jar" )
+ (extract-dir-from-jar (.getFile (.getJarFileURL (.openConnection url))) RESOURCES-SUBDIR stormroot)
+ (FileUtils/copyDirectory (File. (.getPath (URI. (str url)))) (File. target-dir)))
+ )
+ )
+ ))))
(defmethod launch-worker
:local [supervisor storm-id port worker-id]
[2/2] storm git commit: update changelog for STORM-130
Posted by pt...@apache.org.
update changelog for STORM-130
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a1e5893e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a1e5893e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a1e5893e
Branch: refs/heads/0.9.3-branch
Commit: a1e5893e1b94c224d39fedf11583b216c21351c8
Parents: 98f4e61
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Tue Feb 24 15:46:12 2015 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Tue Feb 24 15:46:12 2015 -0500
----------------------------------------------------------------------
CHANGELOG.md | 3 +++
1 file changed, 3 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/a1e5893e/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b7a7f41..5839b7a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,6 @@
+## 0.9.4
+ * STORM-130: Supervisor getting killed due to java.io.FileNotFoundException: File '../stormconf.ser' does not exist.
+
## 0.9.3-rc2
* STORM-558: change "swap!" to "reset!" to fix assignment-versions in supervisor
* STORM-555: Storm json response should set charset to UTF-8