You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text export).
Posted to commits@storm.apache.org by pt...@apache.org on 2015/02/24 21:46:34 UTC

[1/2] storm git commit: STORM-130: Supervisor getting killed due to java.io.FileNotFoundException: File '../stormconf.ser' does not exist.

Repository: storm
Updated Branches:
  refs/heads/0.9.3-branch 1570fa7a9 -> a1e5893e1


STORM-130: Supervisor getting killed due to java.io.FileNotFoundException: File '../stormconf.ser' does not exist.

Signed-off-by: P. Taylor Goetz <pt...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/98f4e619
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/98f4e619
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/98f4e619

Branch: refs/heads/0.9.3-branch
Commit: 98f4e619d54052b73a309d23ab7214953e4c7774
Parents: 1570fa7
Author: Sriharsha Chintalapani <ma...@harsha.io>
Authored: Thu Feb 5 10:08:05 2015 -0800
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Tue Feb 24 15:17:33 2015 -0500

----------------------------------------------------------------------
 .../clj/backtype/storm/daemon/supervisor.clj    | 158 +++++++++++--------
 1 file changed, 92 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/98f4e619/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index 820379e..582c3df 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -39,9 +39,9 @@
 
 (defn- assignments-snapshot [storm-cluster-state callback assignment-versions]
   (let [storm-ids (.assignments storm-cluster-state callback)]
-    (let [new-assignments 
+    (let [new-assignments
           (->>
-           (dofor [sid storm-ids] 
+           (dofor [sid storm-ids]
                   (let [recorded-version (:version (get assignment-versions sid))]
                     (if-let [assignment-version (.assignment-version storm-cluster-state sid callback)]
                       (if (= assignment-version recorded-version)
@@ -50,10 +50,10 @@
                       {sid nil})))
            (apply merge)
            (filter-val not-nil?))]
-          
+
       {:assignments (into {} (for [[k v] new-assignments] [k (:data v)]))
        :versions new-assignments})))
-  
+
 (defn- read-my-executors [assignments-snapshot storm-id assignment-id]
   (let [assignment (get assignments-snapshot storm-id)
         my-executors (filter (fn [[_ [node _]]] (= node assignment-id))
@@ -144,7 +144,7 @@
      )))
 
 (defn- wait-for-worker-launch [conf id start-time]
-  (let [state (worker-state conf id)]    
+  (let [state (worker-state conf id)]
     (loop []
       (let [hb (.get state LS-WORKER-HEARTBEAT)]
         (when (and
@@ -220,11 +220,14 @@
                                ))
    :assignment-versions (atom {})
    :sync-retry (atom 0)
+   :download-lock (Object.)
    })
 
 (defn sync-processes [supervisor]
   (let [conf (:conf supervisor)
+        download-lock (:download-lock supervisor)
         ^LocalState local-state (:local-state supervisor)
+        storm-cluster-state (:storm-cluster-state supervisor)
         assigned-executors (defaulted (.get local-state LS-LOCAL-ASSIGNMENTS) {})
         now (current-time-secs)
         allocated (read-allocated-workers supervisor assigned-executors now)
@@ -247,7 +250,7 @@
     ;; 5. create local dir for worker id
     ;; 5. launch new workers (give worker-id, port, and supervisor-id)
     ;; 6. wait for workers launch
-  
+
     (log-debug "Syncing processes")
     (log-debug "Assigned executors: " assigned-executors)
     (log-debug "Allocated: " allocated)
@@ -268,25 +271,45 @@
                         (keys keepers))
            (zipmap (vals new-worker-ids) (keys new-worker-ids))
            ))
+    ;; check storm topology code dir exists before launching workers
+    (doseq [[port assignment] reassign-executors]
+      (let [downloaded-storm-ids (set (read-downloaded-storm-ids conf))
+            storm-id (:storm-id assignment)
+            cached-assignment-info @ (:assignment-versions supervisor)
+            assignment-info (if (and (not-nil? cached-assignment-info) (contains? cached-assignment-info storm-id))
+                              (get cached-assignment-info storm-id)
+                              (.assignment-info-with-version storm-cluster-state storm-id nil))
+            storm-code-map (read-storm-code-locations assignment-info)
+            master-code-dir (if (contains? storm-code-map :data) (storm-code-map :data))
+            stormroot (supervisor-stormdist-root conf storm-id)]
+        (if-not (or (contains? downloaded-storm-ids storm-id) (.exists (File. stormroot)) (nil? master-code-dir))
+          (download-storm-code conf storm-id master-code-dir download-lock))))
+
     (wait-for-workers-launch
      conf
      (dofor [[port assignment] reassign-executors]
-       (let [id (new-worker-ids port)]
-         (log-message "Launching worker with assignment "
-                      (pr-str assignment)
-                      " for this supervisor "
-                      (:supervisor-id supervisor)
-                      " on port "
-                      port
-                      " with id "
-                      id
-                      )
-         (launch-worker supervisor
-                        (:storm-id assignment)
-                        port
-                        id)
-         id)))
-    ))
+            (let [id (new-worker-ids port)]
+              (try
+                (log-message "Launching worker with assignment "
+                             (pr-str assignment)
+                             " for this supervisor "
+                             (:supervisor-id supervisor)
+                             " on port "
+                             port
+                             " with id "
+                             id)
+                (launch-worker supervisor
+                               (:storm-id assignment)
+                               port
+                               id)
+                (catch java.io.FileNotFoundException e
+                  (log-message "Unable to launch worker due to "
+                               (.getMessage e)))
+                (catch java.io.IOException e
+                  (log-message "Unable to launch worker due to "
+                               (.getMessage e))))
+              id))
+     )))
 
 (defn assigned-storm-ids-from-port-assignments [assignment]
   (->> assignment
@@ -312,13 +335,14 @@
 (defn mk-synchronize-supervisor [supervisor sync-processes event-manager processes-event-manager]
   (fn this []
     (let [conf (:conf supervisor)
+          download-lock (:download-lock supervisor)
           storm-cluster-state (:storm-cluster-state supervisor)
           ^ISupervisor isupervisor (:isupervisor supervisor)
           ^LocalState local-state (:local-state supervisor)
           sync-callback (fn [& ignored] (.add event-manager this))
           assignment-versions @(:assignment-versions supervisor)
-          {assignments-snapshot :assignments versions :versions}  (assignments-snapshot 
-                                                                   storm-cluster-state sync-callback 
+          {assignments-snapshot :assignments versions :versions}  (assignments-snapshot
+                                                                   storm-cluster-state sync-callback
                                                                    assignment-versions)
           storm-code-map (read-storm-code-locations assignments-snapshot)
           downloaded-storm-ids (set (read-downloaded-storm-ids conf))
@@ -336,7 +360,7 @@
       (log-debug "Downloaded storm ids: " downloaded-storm-ids)
       (log-debug "All assignment: " all-assignment)
       (log-debug "New assignment: " new-assignment)
-      
+
       ;; download code first
       ;; This might take awhile
       ;;   - should this be done separately from usual monitoring?
@@ -344,16 +368,7 @@
       (doseq [[storm-id master-code-dir] storm-code-map]
         (when (and (not (downloaded-storm-ids storm-id))
                    (assigned-storm-ids storm-id))
-          (log-message "Downloading code for storm id "
-             storm-id
-             " from "
-             master-code-dir)
-          (download-storm-code conf storm-id master-code-dir)
-          (log-message "Finished downloading code for storm id "
-             storm-id
-             " from "
-             master-code-dir)
-          ))
+          (download-storm-code conf storm-id master-code-dir download-lock)))
 
       (log-debug "Writing new assignment "
                  (pr-str new-assignment))
@@ -389,7 +404,7 @@
   (.prepare isupervisor conf (supervisor-isupervisor-dir conf))
   (FileUtils/cleanDirectory (File. (supervisor-tmp-dir conf)))
   (let [supervisor (supervisor-data conf shared-context isupervisor)
-        [event-manager processes-event-manager :as managers] [(event/event-manager false) (event/event-manager false)]                         
+        [event-manager processes-event-manager :as managers] [(event/event-manager false) (event/event-manager false)]
         sync-processes (partial sync-processes supervisor)
         synchronize-supervisor (mk-synchronize-supervisor supervisor sync-processes event-manager processes-event-manager)
         heartbeat-fn (fn [] (.supervisor-heartbeat!
@@ -452,27 +467,37 @@
 ;; distributed implementation
 
 (defmethod download-storm-code
-    :distributed [conf storm-id master-code-dir]
-    ;; Downloading to permanent location is atomic
-    (let [tmproot (str (supervisor-tmp-dir conf) file-path-separator (uuid))
-          stormroot (supervisor-stormdist-root conf storm-id)]
+  :distributed [conf storm-id master-code-dir download-lock]
+  ;; Downloading to permanent location is atomic
+  (let [tmproot (str (supervisor-tmp-dir conf) file-path-separator (uuid))
+        stormroot (supervisor-stormdist-root conf storm-id)]
+    (locking download-lock
+      (log-message "Downloading code for storm id "
+                   storm-id
+                   " from "
+                   master-code-dir)
+
       (FileUtils/forceMkdir (File. tmproot))
-      
+
       (Utils/downloadFromMaster conf (master-stormjar-path master-code-dir) (supervisor-stormjar-path tmproot))
       (Utils/downloadFromMaster conf (master-stormcode-path master-code-dir) (supervisor-stormcode-path tmproot))
       (Utils/downloadFromMaster conf (master-stormconf-path master-code-dir) (supervisor-stormconf-path tmproot))
       (extract-dir-from-jar (supervisor-stormjar-path tmproot) RESOURCES-SUBDIR tmproot)
       (FileUtils/moveDirectory (File. tmproot) (File. stormroot))
-      ))
+      (log-message "Finished downloading code for storm id "
+                   storm-id
+                   " from "
+                   master-code-dir))
+    ))
 
 (defn jlp [stormroot conf]
   (let [resource-root (str stormroot File/separator RESOURCES-SUBDIR)
         os (clojure.string/replace (System/getProperty "os.name") #"\s+" "_")
         arch (System/getProperty "os.arch")
         arch-resource-root (str resource-root File/separator os "-" arch)]
-    (str arch-resource-root File/pathSeparator resource-root File/pathSeparator (conf JAVA-LIBRARY-PATH)))) 
+    (str arch-resource-root File/pathSeparator resource-root File/pathSeparator (conf JAVA-LIBRARY-PATH))))
 
-(defn substitute-childopts 
+(defn substitute-childopts
   "Generates runtime childopts by replacing keys with topology-id, worker-id, port"
   [value worker-id topology-id port]
   (let [replacement-map {"%ID%"          (str port)
@@ -481,7 +506,7 @@
                          "%WORKER-PORT%" (str port)}
         sub-fn #(reduce (fn [string entry]
                           (apply clojure.string/replace string entry))
-                        % 
+                        %
                         replacement-map)]
     (cond
       (nil? value) nil
@@ -558,27 +583,28 @@
        first ))
 
 (defmethod download-storm-code
-    :local [conf storm-id master-code-dir]
-  (let [stormroot (supervisor-stormdist-root conf storm-id)]
-      (FileUtils/copyDirectory (File. master-code-dir) (File. stormroot))
-      (let [classloader (.getContextClassLoader (Thread/currentThread))
-            resources-jar (resources-jar)
-            url (.getResource classloader RESOURCES-SUBDIR)
-            target-dir (str stormroot file-path-separator RESOURCES-SUBDIR)]
-            (cond
-              resources-jar
-              (do
-                (log-message "Extracting resources from jar at " resources-jar " to " target-dir)
-                (extract-dir-from-jar resources-jar RESOURCES-SUBDIR stormroot))
-              url
-              (do
-                (log-message "Copying resources at " (URI. (str url)) " to " target-dir)
-                (if (= (.getProtocol url) "jar" )
-                    (extract-dir-from-jar (.getFile (.getJarFileURL (.openConnection url))) RESOURCES-SUBDIR stormroot)
-                    (FileUtils/copyDirectory (File. (.getPath (URI. (str url)))) (File. target-dir)))
-                )
-              )
-            )))
+    :local [conf storm-id master-code-dir download-lock]
+    (let [stormroot (supervisor-stormdist-root conf storm-id)]
+      (locking download-lock
+        (FileUtils/copyDirectory (File. master-code-dir) (File. stormroot))
+        (let [classloader (.getContextClassLoader (Thread/currentThread))
+              resources-jar (resources-jar)
+              url (.getResource classloader RESOURCES-SUBDIR)
+              target-dir (str stormroot file-path-separator RESOURCES-SUBDIR)]
+          (cond
+           resources-jar
+           (do
+             (log-message "Extracting resources from jar at " resources-jar " to " target-dir)
+             (extract-dir-from-jar resources-jar RESOURCES-SUBDIR stormroot))
+           url
+           (do
+             (log-message "Copying resources at " (URI. (str url)) " to " target-dir)
+             (if (= (.getProtocol url) "jar" )
+               (extract-dir-from-jar (.getFile (.getJarFileURL (.openConnection url))) RESOURCES-SUBDIR stormroot)
+               (FileUtils/copyDirectory (File. (.getPath (URI. (str url)))) (File. target-dir)))
+             )
+           )
+          ))))
 
 (defmethod launch-worker
     :local [supervisor storm-id port worker-id]


[2/2] storm git commit: update changelog for STORM-130

Posted by pt...@apache.org.
update changelog for STORM-130


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a1e5893e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a1e5893e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a1e5893e

Branch: refs/heads/0.9.3-branch
Commit: a1e5893e1b94c224d39fedf11583b216c21351c8
Parents: 98f4e61
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Tue Feb 24 15:46:12 2015 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Tue Feb 24 15:46:12 2015 -0500

----------------------------------------------------------------------
 CHANGELOG.md | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a1e5893e/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b7a7f41..5839b7a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,6 @@
+## 0.9.4
+ * STORM-130: Supervisor getting killed due to java.io.FileNotFoundException: File '../stormconf.ser' does not exist.
+
 ## 0.9.3-rc2
  * STORM-558: change "swap!" to "reset!" to fix assignment-versions in supervisor
  * STORM-555: Storm json response should set charset to UTF-8