You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/02/23 17:39:12 UTC

[1/6] storm git commit: STORM-130: Supervisor getting killed due to java.io.FileNotFoundException: File '../stormconf.ser' does not exist.

Repository: storm
Updated Branches:
  refs/heads/master 4b6e43b81 -> ffb1816a9


STORM-130: Supervisor getting killed due to java.io.FileNotFoundException: File '../stormconf.ser' does not exist.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6b061aaa
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6b061aaa
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6b061aaa

Branch: refs/heads/master
Commit: 6b061aaab5a39e4b95670e3f0590bb48de4375fd
Parents: a115c9d
Author: Sriharsha Chintalapani <ma...@harsha.io>
Authored: Thu Jan 29 10:22:20 2015 -0800
Committer: Sriharsha Chintalapani <ma...@harsha.io>
Committed: Thu Jan 29 10:22:20 2015 -0800

----------------------------------------------------------------------
 .../clj/backtype/storm/daemon/supervisor.clj    | 58 +++++++++++++-------
 1 file changed, 37 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/6b061aaa/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index a5d5aef..bb1c959 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -54,10 +54,10 @@
                       {sid nil})))
            (apply merge)
            (filter-val not-nil?))]
-          
+
       {:assignments (into {} (for [[k v] new-assignments] [k (:data v)]))
        :versions new-assignments})))
-  
+
 (defn- read-my-executors [assignments-snapshot storm-id assignment-id]
   (let [assignment (get assignments-snapshot storm-id)
         my-executors (filter (fn [[_ [node _]]] (= node assignment-id))
@@ -163,7 +163,7 @@
      )))
 
 (defn- wait-for-worker-launch [conf id start-time]
-  (let [state (worker-state conf id)]    
+  (let [state (worker-state conf id)]
     (loop []
       (let [hb (.get state LS-WORKER-HEARTBEAT)]
         (when (and
@@ -305,6 +305,7 @@
 (defn sync-processes [supervisor]
   (let [conf (:conf supervisor)
         ^LocalState local-state (:local-state supervisor)
+        storm-cluster-state (:storm-cluster-state supervisor)
         assigned-executors (defaulted (.get local-state LS-LOCAL-ASSIGNMENTS) {})
         now (current-time-secs)
         allocated (read-allocated-workers supervisor assigned-executors now)
@@ -327,7 +328,7 @@
     ;; 5. create local dir for worker id
     ;; 5. launch new workers (give worker-id, port, and supervisor-id)
     ;; 6. wait for workers launch
-  
+
     (log-debug "Syncing processes")
     (log-debug "Assigned executors: " assigned-executors)
     (log-debug "Allocated: " allocated)
@@ -349,6 +350,20 @@
                         (keys keepers))
            (zipmap (vals new-worker-ids) (keys new-worker-ids))
            ))
+
+    ;; check storm topology code dir exists before launching workers
+    (doseq [[port assignment] reassign-executors]
+      (let [storm-cluster-state (:storm-cluster-state supervisor)
+            downloaded-storm-ids (set (read-downloaded-storm-ids conf))
+            storm-id (:storm-id assignment)
+            assignment-info (.assignment-info-with-version storm-cluster-state storm-id nil)
+            storm-code-map (read-storm-code-locations assignment-info)
+            master-code-dir (if (contains? storm-code-map :data) (storm-code-map :data))
+            stormroot (supervisor-stormdist-root conf storm-id)]
+        (if-not (or (downloaded-storm-ids storm-id) (.exists (File. stormroot)))
+          (download-storm-code conf storm-id master-code-dir))
+        ))
+
     (wait-for-workers-launch
      conf
      (dofor [[port assignment] reassign-executors]
@@ -399,7 +414,7 @@
           sync-callback (fn [& ignored] (.add event-manager this))
           assignment-versions @(:assignment-versions supervisor)
           {assignments-snapshot :assignments versions :versions}  (assignments-snapshot 
-                                                                   storm-cluster-state sync-callback 
+                                                                   storm-cluster-state sync-callback
                                                                    assignment-versions)
           storm-code-map (read-storm-code-locations assignments-snapshot)
           downloaded-storm-ids (set (read-downloaded-storm-ids conf))
@@ -417,7 +432,7 @@
       (log-debug "Downloaded storm ids: " downloaded-storm-ids)
       (log-debug "All assignment: " all-assignment)
       (log-debug "New assignment: " new-assignment)
-      
+
       ;; download code first
       ;; This might take awhile
       ;;   - should this be done separately from usual monitoring?
@@ -425,16 +440,7 @@
       (doseq [[storm-id master-code-dir] storm-code-map]
         (when (and (not (downloaded-storm-ids storm-id))
                    (assigned-storm-ids storm-id))
-          (log-message "Downloading code for storm id "
-             storm-id
-             " from "
-             master-code-dir)
-          (download-storm-code conf storm-id master-code-dir)
-          (log-message "Finished downloading code for storm id "
-             storm-id
-             " from "
-             master-code-dir)
-          ))
+          (download-storm-code conf storm-id master-code-dir)))
 
       (log-debug "Writing new assignment "
                  (pr-str new-assignment))
@@ -470,7 +476,7 @@
   (.prepare isupervisor conf (supervisor-isupervisor-dir conf))
   (FileUtils/cleanDirectory (File. (supervisor-tmp-dir conf)))
   (let [supervisor (supervisor-data conf shared-context isupervisor)
-        [event-manager processes-event-manager :as managers] [(event/event-manager false) (event/event-manager false)]                         
+        [event-manager processes-event-manager :as managers] [(event/event-manager false) (event/event-manager false)]
         sync-processes (partial sync-processes supervisor)
         synchronize-supervisor (mk-synchronize-supervisor supervisor sync-processes event-manager processes-event-manager)
         heartbeat-fn (fn [] (.supervisor-heartbeat!
@@ -542,14 +548,24 @@
     ;; Downloading to permanent location is atomic
     (let [tmproot (str (supervisor-tmp-dir conf) file-path-separator (uuid))
           stormroot (supervisor-stormdist-root conf storm-id)]
+      (log-message "Downloading code for storm id "
+                   storm-id
+                   " from "
+                   master-code-dir)
       (FileUtils/forceMkdir (File. tmproot))
-      
+
       (Utils/downloadFromMaster conf (master-stormjar-path master-code-dir) (supervisor-stormjar-path tmproot))
       (Utils/downloadFromMaster conf (master-stormcode-path master-code-dir) (supervisor-stormcode-path tmproot))
       (Utils/downloadFromMaster conf (master-stormconf-path master-code-dir) (supervisor-stormconf-path tmproot))
       (extract-dir-from-jar (supervisor-stormjar-path tmproot) RESOURCES-SUBDIR tmproot)
-      (FileUtils/moveDirectory (File. tmproot) (File. stormroot))
-      (setup-storm-code-dir conf (read-supervisor-storm-conf conf storm-id) stormroot)      
+      (if-not (.exists (File. stormroot))
+        (FileUtils/moveDirectory (File. tmproot) (File. stormroot))
+        (FileUtils/deleteDirectory (File. tmproot)))
+      (setup-storm-code-dir conf (read-supervisor-storm-conf conf storm-id) stormroot)
+      (log-message "Finished downloading code for storm id "
+                   storm-id
+                   " from "
+                   master-code-dir)
     ))
 
 (defn write-log-metadata-to-yaml-file! [storm-id port data conf]
@@ -595,7 +611,7 @@
                          "%WORKER-PORT%" (str port)}
         sub-fn #(reduce (fn [string entry]
                           (apply clojure.string/replace string entry))
-                        % 
+                        %
                         replacement-map)]
     (cond
       (nil? value) nil


[3/6] storm git commit: STORM-130: Supervisor getting killed due to java.io.FileNotFoundException: File '../stormconf.ser' does not exist.

Posted by bo...@apache.org.
STORM-130: Supervisor getting killed due to java.io.FileNotFoundException: File '../stormconf.ser' does not exist.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8d5ac19a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8d5ac19a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8d5ac19a

Branch: refs/heads/master
Commit: 8d5ac19aceac9cf1bbee0a2cef20ee67d570085b
Parents: eaedc83
Author: Sriharsha Chintalapani <ma...@harsha.io>
Authored: Thu Jan 29 22:33:01 2015 -0800
Committer: Sriharsha Chintalapani <ma...@harsha.io>
Committed: Thu Jan 29 22:33:01 2015 -0800

----------------------------------------------------------------------
 storm-core/src/clj/backtype/storm/daemon/supervisor.clj | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/8d5ac19a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index fde53c4..e794710 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -358,12 +358,13 @@
       (let [downloaded-storm-ids (set (read-downloaded-storm-ids conf))
             storm-id (:storm-id assignment)
             cached-assignment-info @(:assignment-versions supervisor)
-            assignment-info (if (nil? cached-assignment-info) (.assignment-info-with-version storm-cluster-state storm-id nil)
-                                (get cached-assignment-info storm-id))
-            storm-code-map (read-storm-code-locations assignment-info)
+            assignment-info (if (and (not-nil? cached-assignment-info) (contains? cached-assignment-info storm-id ))
+                              (get cached-assignment-info storm-id)
+                              (.assignment-info-with-version storm-cluster-state storm-id nil)) 
+	    storm-code-map (read-storm-code-locations assignment-info)
             master-code-dir (if (contains? storm-code-map :data) (storm-code-map :data))
             stormroot (supervisor-stormdist-root conf storm-id)]
-        (if-not (or (downloaded-storm-ids storm-id) (.exists (File. stormroot)))
+        (if-not (or (contains? downloaded-storm-ids storm-id) (.exists (File. stormroot)) (nil? master-code-dir))
           (download-storm-code conf storm-id master-code-dir download-lock))
         ))
 


[4/6] storm git commit: STORM-130: Supervisor getting killed due to java.io.FileNotFoundException: File '../stormconf.ser' does not exist.

Posted by bo...@apache.org.
STORM-130: Supervisor getting killed due to java.io.FileNotFoundException: File '../stormconf.ser' does not exist.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2712ee9b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2712ee9b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2712ee9b

Branch: refs/heads/master
Commit: 2712ee9b8a4912f74acf33fe1cef7195890ab4fc
Parents: 8d5ac19
Author: Sriharsha Chintalapani <ma...@harsha.io>
Authored: Thu Feb 5 11:02:11 2015 -0800
Committer: Sriharsha Chintalapani <ma...@harsha.io>
Committed: Thu Feb 5 11:02:11 2015 -0800

----------------------------------------------------------------------
 .../clj/backtype/storm/daemon/supervisor.clj    | 43 ++++++++++++--------
 1 file changed, 25 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/2712ee9b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index e794710..526b057 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -43,7 +43,7 @@
 
 (defn- assignments-snapshot [storm-cluster-state callback assignment-versions]
   (let [storm-ids (.assignments storm-cluster-state callback)]
-    (let [new-assignments 
+    (let [new-assignments
           (->>
            (dofor [sid storm-ids]
                   (let [recorded-version (:version (get assignment-versions sid))]
@@ -263,7 +263,7 @@
         (worker-launcher-and-wait conf user ["signal" pid "9"] :log-prefix (str "kill -9 " pid))
         (force-kill-process pid))
       (if as-user
-        (rmr-as-user conf id user (worker-pid-path conf id pid)) 
+        (rmr-as-user conf id user (worker-pid-path conf id pid))
         (try
           (rmpath (worker-pid-path conf id pid))
           (catch Exception e)))) ;; on windows, the supervisor may still holds the lock on the worker directory
@@ -271,7 +271,7 @@
   (log-message "Shut down " (:supervisor-id supervisor) ":" id))
 
 (def SUPERVISOR-ZK-ACLS
-  [(first ZooDefs$Ids/CREATOR_ALL_ACL) 
+  [(first ZooDefs$Ids/CREATOR_ALL_ACL)
    (ACL. (bit-or ZooDefs$Perms/READ ZooDefs$Perms/CREATE) ZooDefs$Ids/ANYONE_ID_UNSAFE)])
 
 (defn supervisor-data [conf shared-context ^ISupervisor isupervisor]
@@ -360,7 +360,7 @@
             cached-assignment-info @(:assignment-versions supervisor)
             assignment-info (if (and (not-nil? cached-assignment-info) (contains? cached-assignment-info storm-id ))
                               (get cached-assignment-info storm-id)
-                              (.assignment-info-with-version storm-cluster-state storm-id nil)) 
+                              (.assignment-info-with-version storm-cluster-state storm-id nil))
 	    storm-code-map (read-storm-code-locations assignment-info)
             master-code-dir (if (contains? storm-code-map :data) (storm-code-map :data))
             stormroot (supervisor-stormdist-root conf storm-id)]
@@ -371,20 +371,27 @@
     (wait-for-workers-launch
      conf
      (dofor [[port assignment] reassign-executors]
-       (let [id (new-worker-ids port)]
-         (log-message "Launching worker with assignment "
-                      (pr-str assignment)
-                      " for this supervisor "
-                      (:supervisor-id supervisor)
-                      " on port "
-                      port
-                      " with id "
-                      id
-                      )
-         (launch-worker supervisor
-                        (:storm-id assignment)
-                        port
-                        id)
+            (let [id (new-worker-ids port)]
+              (try
+                (log-message "Launching worker with assignment "
+                             (pr-str assignment)
+                             " for this supervisor "
+                             (:supervisor-id supervisor)
+                             " on port "
+                             port
+                             " with id "
+                             id
+                             )
+                (launch-worker supervisor
+                               (:storm-id assignment)
+                               port
+                               id)
+                (catch java.io.FileNotFoundException e
+                  (log-message "Unable to launch worker due to "
+                               (.getMessage e)))
+                (catch java.io.IOException e
+                  (log-message "Unable to launch worker due to "
+                               (.getMessage e))))
          id)))
     ))
 


[6/6] storm git commit: Added STORM-130 to Changelog

Posted by bo...@apache.org.
Added STORM-130 to Changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ffb1816a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ffb1816a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ffb1816a

Branch: refs/heads/master
Commit: ffb1816a9eadbaea95746ec96d5771448ba6fd6d
Parents: 51e91f0
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Feb 23 10:38:32 2015 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Feb 23 10:38:32 2015 -0600

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ffb1816a/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 63f1594..91849b1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -53,6 +53,7 @@
  * STORM-673: Typo 'deamon' in security documentation
  * STORM-441: Remove bootstrap macro from Clojure codebase
  * STORM-609: Add storm-redis to storm external
+ * STORM-130: Supervisor getting killed due to java.io.FileNotFoundException: File '../stormconf.ser' does not exist.
 
 ## 0.9.3-rc2
  * STORM-558: change "swap!" to "reset!" to fix assignment-versions in supervisor


[2/6] storm git commit: STORM-130: Supervisor getting killed due to java.io.FileNotFoundException: File '../stormconf.ser' does not exist.

Posted by bo...@apache.org.
STORM-130: Supervisor getting killed due to java.io.FileNotFoundException: File '../stormconf.ser' does not exist.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/eaedc83c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/eaedc83c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/eaedc83c

Branch: refs/heads/master
Commit: eaedc83ccfa9cfb150d7ef3e3ba2c4cc5445cb60
Parents: 6b061aa
Author: Sriharsha Chintalapani <ma...@harsha.io>
Authored: Thu Jan 29 16:14:18 2015 -0800
Committer: Sriharsha Chintalapani <ma...@harsha.io>
Committed: Thu Jan 29 16:14:18 2015 -0800

----------------------------------------------------------------------
 .../clj/backtype/storm/daemon/supervisor.clj    | 111 ++++++++++---------
 1 file changed, 59 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/eaedc83c/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index bb1c959..fde53c4 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -45,7 +45,7 @@
   (let [storm-ids (.assignments storm-cluster-state callback)]
     (let [new-assignments 
           (->>
-           (dofor [sid storm-ids] 
+           (dofor [sid storm-ids]
                   (let [recorded-version (:version (get assignment-versions sid))]
                     (if-let [assignment-version (.assignment-version storm-cluster-state sid callback)]
                       (if (= assignment-version recorded-version)
@@ -149,7 +149,7 @@
                          (or (not (contains? approved-ids id))
                              (not (matches-an-assignment? hb assigned-executors)))
                            :disallowed
-                         (or 
+                         (or
                           (when (get (get-dead-workers) id)
                             (log-message "Worker Process " id " has died!")
                             true)
@@ -300,10 +300,12 @@
                                          ))
    :assignment-versions (atom {})
    :sync-retry (atom 0)
+   :download-lock (Object.)
    })
 
 (defn sync-processes [supervisor]
   (let [conf (:conf supervisor)
+        download-lock (:download-lock supervisor)
         ^LocalState local-state (:local-state supervisor)
         storm-cluster-state (:storm-cluster-state supervisor)
         assigned-executors (defaulted (.get local-state LS-LOCAL-ASSIGNMENTS) {})
@@ -353,15 +355,16 @@
 
     ;; check storm topology code dir exists before launching workers
     (doseq [[port assignment] reassign-executors]
-      (let [storm-cluster-state (:storm-cluster-state supervisor)
-            downloaded-storm-ids (set (read-downloaded-storm-ids conf))
+      (let [downloaded-storm-ids (set (read-downloaded-storm-ids conf))
             storm-id (:storm-id assignment)
-            assignment-info (.assignment-info-with-version storm-cluster-state storm-id nil)
+            cached-assignment-info @(:assignment-versions supervisor)
+            assignment-info (if (nil? cached-assignment-info) (.assignment-info-with-version storm-cluster-state storm-id nil)
+                                (get cached-assignment-info storm-id))
             storm-code-map (read-storm-code-locations assignment-info)
             master-code-dir (if (contains? storm-code-map :data) (storm-code-map :data))
             stormroot (supervisor-stormdist-root conf storm-id)]
         (if-not (or (downloaded-storm-ids storm-id) (.exists (File. stormroot)))
-          (download-storm-code conf storm-id master-code-dir))
+          (download-storm-code conf storm-id master-code-dir download-lock))
         ))
 
     (wait-for-workers-launch
@@ -408,14 +411,15 @@
 (defn mk-synchronize-supervisor [supervisor sync-processes event-manager processes-event-manager]
   (fn this []
     (let [conf (:conf supervisor)
+          download-lock (:download-lock supervisor)
           storm-cluster-state (:storm-cluster-state supervisor)
           ^ISupervisor isupervisor (:isupervisor supervisor)
           ^LocalState local-state (:local-state supervisor)
           sync-callback (fn [& ignored] (.add event-manager this))
           assignment-versions @(:assignment-versions supervisor)
-          {assignments-snapshot :assignments versions :versions}  (assignments-snapshot 
+          {assignments-snapshot :assignments versions :versions}  (assignments-snapshot
                                                                    storm-cluster-state sync-callback
-                                                                   assignment-versions)
+                                                                  assignment-versions)
           storm-code-map (read-storm-code-locations assignments-snapshot)
           downloaded-storm-ids (set (read-downloaded-storm-ids conf))
           existing-assignment (.get local-state LS-LOCAL-ASSIGNMENTS)
@@ -440,7 +444,7 @@
       (doseq [[storm-id master-code-dir] storm-code-map]
         (when (and (not (downloaded-storm-ids storm-id))
                    (assigned-storm-ids storm-id))
-          (download-storm-code conf storm-id master-code-dir)))
+          (download-storm-code conf storm-id master-code-dir download-lock)))
 
       (log-debug "Writing new assignment "
                  (pr-str new-assignment))
@@ -544,29 +548,30 @@
 
 ;; distributed implementation
 (defmethod download-storm-code
-    :distributed [conf storm-id master-code-dir]
+    :distributed [conf storm-id master-code-dir download-lock]
     ;; Downloading to permanent location is atomic
     (let [tmproot (str (supervisor-tmp-dir conf) file-path-separator (uuid))
           stormroot (supervisor-stormdist-root conf storm-id)]
-      (log-message "Downloading code for storm id "
-                   storm-id
-                   " from "
-                   master-code-dir)
-      (FileUtils/forceMkdir (File. tmproot))
-
-      (Utils/downloadFromMaster conf (master-stormjar-path master-code-dir) (supervisor-stormjar-path tmproot))
-      (Utils/downloadFromMaster conf (master-stormcode-path master-code-dir) (supervisor-stormcode-path tmproot))
-      (Utils/downloadFromMaster conf (master-stormconf-path master-code-dir) (supervisor-stormconf-path tmproot))
-      (extract-dir-from-jar (supervisor-stormjar-path tmproot) RESOURCES-SUBDIR tmproot)
-      (if-not (.exists (File. stormroot))
-        (FileUtils/moveDirectory (File. tmproot) (File. stormroot))
-        (FileUtils/deleteDirectory (File. tmproot)))
-      (setup-storm-code-dir conf (read-supervisor-storm-conf conf storm-id) stormroot)
-      (log-message "Finished downloading code for storm id "
-                   storm-id
-                   " from "
-                   master-code-dir)
-    ))
+      (locking download-lock
+            (log-message "Downloading code for storm id "
+                         storm-id
+                         " from "
+                         master-code-dir)
+            (FileUtils/forceMkdir (File. tmproot))
+
+            (Utils/downloadFromMaster conf (master-stormjar-path master-code-dir) (supervisor-stormjar-path tmproot))
+            (Utils/downloadFromMaster conf (master-stormcode-path master-code-dir) (supervisor-stormcode-path tmproot))
+            (Utils/downloadFromMaster conf (master-stormconf-path master-code-dir) (supervisor-stormconf-path tmproot))
+            (extract-dir-from-jar (supervisor-stormjar-path tmproot) RESOURCES-SUBDIR tmproot)
+            (if-not (.exists (File. stormroot))
+              (FileUtils/moveDirectory (File. tmproot) (File. stormroot))
+              (FileUtils/deleteDirectory (File. tmproot)))
+            (setup-storm-code-dir conf (read-supervisor-storm-conf conf storm-id) stormroot)
+            (log-message "Finished downloading code for storm id "
+                         storm-id
+                         " from "
+                         master-code-dir))
+      ))
 
 (defn write-log-metadata-to-yaml-file! [storm-id port data conf]
   (let [file (get-log-metadata-file storm-id port)]
@@ -600,9 +605,9 @@
         os (clojure.string/replace (System/getProperty "os.name") #"\s+" "_")
         arch (System/getProperty "os.arch")
         arch-resource-root (str resource-root File/separator os "-" arch)]
-    (str arch-resource-root File/pathSeparator resource-root File/pathSeparator (conf JAVA-LIBRARY-PATH)))) 
+    (str arch-resource-root File/pathSeparator resource-root File/pathSeparator (conf JAVA-LIBRARY-PATH))))
 
-(defn substitute-childopts 
+(defn substitute-childopts
   "Generates runtime childopts by replacing keys with topology-id, worker-id, port"
   [value worker-id topology-id port]
   (let [replacement-map {"%ID%"          (str port)
@@ -684,10 +689,10 @@
       (write-log-metadata! storm-conf user worker-id storm-id port conf)
       (set-worker-user! conf worker-id user)
       (let [log-prefix (str "Worker Process " worker-id)
-           callback (fn [exit-code] 
+           callback (fn [exit-code]
                           (log-message log-prefix " exited with code: " exit-code)
                           (add-dead-worker worker-id))]
-        (remove-dead-worker worker-id) 
+        (remove-dead-worker worker-id)
         (if run-worker-as-user
           (let [worker-dir (worker-root conf worker-id)]
             (worker-launcher conf user ["worker" worker-dir (write-script worker-dir command :environment topology-worker-environment)] :log-prefix log-prefix :exit-code-callback callback))
@@ -703,25 +708,27 @@
        first ))
 
 (defmethod download-storm-code
-    :local [conf storm-id master-code-dir]
-  (let [stormroot (supervisor-stormdist-root conf storm-id)]
-      (FileUtils/copyDirectory (File. master-code-dir) (File. stormroot))
-      (let [classloader (.getContextClassLoader (Thread/currentThread))
-            resources-jar (resources-jar)
-            url (.getResource classloader RESOURCES-SUBDIR)
-            target-dir (str stormroot file-path-separator RESOURCES-SUBDIR)]
-            (cond
-              resources-jar
-              (do
-                (log-message "Extracting resources from jar at " resources-jar " to " target-dir)
-                (extract-dir-from-jar resources-jar RESOURCES-SUBDIR stormroot))
-              url
-              (do
-                (log-message "Copying resources at " (URI. (str url)) " to " target-dir)
-                (if (= (.getProtocol url) "jar" )
-                    (extract-dir-from-jar (.getFile (.getJarFileURL (.openConnection url))) RESOURCES-SUBDIR stormroot)
-                    (FileUtils/copyDirectory (File. (.getPath (URI. (str url)))) (File. target-dir)))
-                )
+    :local [conf storm-id master-code-dir download-lock]
+    (let [stormroot (supervisor-stormdist-root conf storm-id)]
+      (locking download-lock
+            (FileUtils/copyDirectory (File. master-code-dir) (File. stormroot))
+            (let [classloader (.getContextClassLoader (Thread/currentThread))
+                  resources-jar (resources-jar)
+                  url (.getResource classloader RESOURCES-SUBDIR)
+                  target-dir (str stormroot file-path-separator RESOURCES-SUBDIR)]
+              (cond
+               resources-jar
+               (do
+                 (log-message "Extracting resources from jar at " resources-jar " to " target-dir)
+                 (extract-dir-from-jar resources-jar RESOURCES-SUBDIR stormroot))
+               url
+               (do
+                 (log-message "Copying resources at " (URI. (str url)) " to " target-dir)
+                 (if (= (.getProtocol url) "jar" )
+                   (extract-dir-from-jar (.getFile (.getJarFileURL (.openConnection url))) RESOURCES-SUBDIR stormroot)
+                   (FileUtils/copyDirectory (File. (.getPath (URI. (str url)))) (File. target-dir)))
+                 )
+               )
               )
             )))
 


[5/6] storm git commit: Merge branch 'STORM-130-V2' of https://github.com/harshach/incubator-storm into STORM-130

Posted by bo...@apache.org.
Merge branch 'STORM-130-V2' of https://github.com/harshach/incubator-storm into STORM-130

STORM-130: Supervisor getting killed due to java.io.FileNotFoundException: File '../stormconf.ser' does not exist.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/51e91f0a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/51e91f0a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/51e91f0a

Branch: refs/heads/master
Commit: 51e91f0ac2bbcdc7b7239f53082fbeeb32db721c
Parents: 4b6e43b 2712ee9
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Feb 23 10:31:58 2015 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Feb 23 10:31:58 2015 -0600

----------------------------------------------------------------------
 .../clj/backtype/storm/daemon/supervisor.clj    | 175 +++++++++++--------
 1 file changed, 103 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/51e91f0a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------