Posted to commits@storm.apache.org by bo...@apache.org on 2015/12/04 16:04:05 UTC

[10/17] storm git commit: Blobstore API STORM-876

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index d659d57..e066269 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -14,19 +14,21 @@
 ;; See the License for the specific language governing permissions and
 ;; limitations under the License.
 (ns backtype.storm.daemon.supervisor
-  (:import [java.io OutputStreamWriter BufferedWriter IOException])
+  (:import [java.io File IOException FileOutputStream])
   (:import [backtype.storm.scheduler ISupervisor]
            [backtype.storm.utils LocalState Time Utils]
            [backtype.storm.daemon Shutdownable]
            [backtype.storm Constants]
            [java.net JarURLConnection]
            [java.net URI]
-           [org.apache.commons.io FileUtils]
-           [java.io File])
+           [org.apache.commons.io FileUtils])
   (:use [backtype.storm config util log timer local-state])
+  (:import [backtype.storm.generated AuthorizationException KeyNotFoundException WorkerResources])
   (:import [backtype.storm.utils VersionInfo])
+  (:import [java.nio.file Files StandardCopyOption])
   (:import [backtype.storm Config])
   (:import [backtype.storm.generated WorkerResources ProfileAction])
+  (:import [backtype.storm.localizer LocalResource])
   (:use [backtype.storm.daemon common])
   (:require [backtype.storm.command [healthcheck :as healthcheck]])
   (:require [backtype.storm.daemon [worker :as worker]]
@@ -44,7 +46,6 @@
 
 (defmulti download-storm-code cluster-mode)
 (defmulti launch-worker (fn [supervisor & _] (cluster-mode (:conf supervisor))))
-(defmulti mk-code-distributor cluster-mode)
 
 (defprotocol SupervisorDaemon
   (get-id [this])
@@ -238,20 +239,21 @@
 (defn- rmr-as-user
   "Launches a process owned by the given user that deletes the given path
   recursively.  Throws RuntimeException if the directory is not removed."
-  [conf id user path]
-  (worker-launcher-and-wait conf
-                            user
-                            ["rmr" path]
-                            :log-prefix (str "rmr " id))
-  (if (exists-file? path)
-    (throw (RuntimeException. (str path " was not deleted")))))
-
-(defn try-cleanup-worker [conf id user]
+  [conf id path]
+  (let [user (Utils/getFileOwner path)]
+    (worker-launcher-and-wait conf
+      user
+      ["rmr" path]
+      :log-prefix (str "rmr " id))
+    (if (exists-file? path)
+      (throw (RuntimeException. (str path " was not deleted"))))))
+
+(defn try-cleanup-worker [conf id]
   (try
     (if (.exists (File. (worker-root conf id)))
       (do
         (if (conf SUPERVISOR-RUN-WORKER-AS-USER)
-          (rmr-as-user conf id user (worker-root conf id))
+          (rmr-as-user conf id (worker-root conf id))
           (do
             (rmr (worker-heartbeats-root conf id))
             ;; this avoids a race condition with worker or subprocess writing pid around same time
@@ -290,11 +292,11 @@
         (worker-launcher-and-wait conf user ["signal" pid "9"] :log-prefix (str "kill -9 " pid))
         (force-kill-process pid))
       (if as-user
-        (rmr-as-user conf id user (worker-pid-path conf id pid))
+        (rmr-as-user conf id (worker-pid-path conf id pid))
         (try
           (rmpath (worker-pid-path conf id pid))
           (catch Exception e)))) ;; on windows, the supervisor may still holds the lock on the worker directory
-    (try-cleanup-worker conf id user))
+    (try-cleanup-worker conf id))
   (log-message "Shut down " (:supervisor-id supervisor) ":" id))
 
 (def SUPERVISOR-ZK-ACLS
@@ -326,16 +328,62 @@
                                          (log-error t "Error when processing event")
                                          (exit-process! 20 "Error when processing an event")
                                          ))
+   :blob-update-timer (mk-timer :kill-fn (defn blob-update-timer
+                                           [t]
+                                           (log-error t "Error when processing event")
+                                           (exit-process! 20 "Error when processing an event"))
+                                :timer-name "blob-update-timer")
+   :localizer (Utils/createLocalizer conf (supervisor-local-dir conf))
    :assignment-versions (atom {})
    :sync-retry (atom 0)
-   :code-distributor (mk-code-distributor conf)
    :download-lock (Object.)
    :stormid->profiler-actions (atom {})
    })
 
+(defn required-topo-files-exist?
+  [conf storm-id]
+  (let [stormroot (supervisor-stormdist-root conf storm-id)
+        stormjarpath (supervisor-stormjar-path stormroot)
+        stormcodepath (supervisor-stormcode-path stormroot)
+        stormconfpath (supervisor-stormconf-path stormroot)]
+    (and (every? exists-file? [stormroot stormconfpath stormcodepath])
+         (or (local-mode? conf)
+             (exists-file? stormjarpath)))))
+
+(defn get-worker-assignment-helper-msg
+  [assignment supervisor port id]
+  (str (pr-str assignment) " for this supervisor " (:supervisor-id supervisor) " on port "
+    port " with id " id))
+
+(defn get-valid-new-worker-ids
+  [conf supervisor reassign-executors new-worker-ids]
+  (into {}
+    (remove nil?
+      (dofor [[port assignment] reassign-executors]
+        (let [id (new-worker-ids port)
+              storm-id (:storm-id assignment)
+              ^WorkerResources resources (:resources assignment)
+              mem-onheap (.get_mem_on_heap resources)]
+          ;; Check that the required topology files exist before launching the worker
+          (if (required-topo-files-exist? conf storm-id)
+            (do
+              (log-message "Launching worker with assignment "
+                (get-worker-assignment-helper-msg assignment supervisor port id))
+              (local-mkdirs (worker-pids-root conf id))
+              (local-mkdirs (worker-heartbeats-root conf id))
+              (launch-worker supervisor
+                (:storm-id assignment)
+                port
+                id
+                mem-onheap)
+              [id port])
+            (do
+              (log-message "Missing topology storm code, so can't launch worker with assignment "
+                (get-worker-assignment-helper-msg assignment supervisor port id))
+              nil)))))))
+
 (defn sync-processes [supervisor]
   (let [conf (:conf supervisor)
-        download-lock (:download-lock supervisor)
         ^LocalState local-state (:local-state supervisor)
         storm-cluster-state (:storm-cluster-state supervisor)
         assigned-executors (defaulted (ls-local-assignments local-state) {})
@@ -349,8 +397,7 @@
         new-worker-ids (into
                         {}
                         (for [port (keys reassign-executors)]
-                          [port (uuid)]))
-        ]
+                          [port (uuid)]))]
     ;; 1. to kill are those in allocated that are dead or disallowed
     ;; 2. kill the ones that should be dead
     ;;     - read pids, kill -9 and individually remove file
@@ -371,67 +418,14 @@
          ". Current supervisor time: " now
          ". State: " state
          ", Heartbeat: " (pr-str heartbeat))
-        (shutdown-worker supervisor id)
-        (if (:code-distributor supervisor)
-          (.cleanup (:code-distributor supervisor) id))
-        ))
-
-    (doseq [id (vals new-worker-ids)]
-      (local-mkdirs (worker-pids-root conf id))
-      (local-mkdirs (worker-heartbeats-root conf id)))
-    (ls-approved-workers! local-state
-          (merge
-           (select-keys (ls-approved-workers local-state)
-                        (keys keepers))
-           (zipmap (vals new-worker-ids) (keys new-worker-ids))
-           ))
-
-    ;; check storm topology code dir exists before launching workers
-    (doseq [[port assignment] reassign-executors]
-      (let [downloaded-storm-ids (set (read-downloaded-storm-ids conf))
-            storm-id (:storm-id assignment)
-            cached-assignment-info @(:assignment-versions supervisor)
-            assignment-info (if (and (not-nil? cached-assignment-info) (contains? cached-assignment-info storm-id ))
-                              (get cached-assignment-info storm-id)
-                              (.assignment-info-with-version storm-cluster-state storm-id nil))
-	    storm-code-map (read-storm-code-locations assignment-info)
-            master-code-dir (if (contains? storm-code-map :data) (storm-code-map :data))
-            stormroot (supervisor-stormdist-root conf storm-id)]
-        (if-not (or (contains? downloaded-storm-ids storm-id) (.exists (File. stormroot)) (nil? master-code-dir))
-          (download-storm-code conf storm-id master-code-dir supervisor download-lock))
-        ))
-
-    (wait-for-workers-launch
-     conf
-     (dofor [[port assignment] reassign-executors]
-            (let [id (new-worker-ids port)
-                  storm-id (:storm-id assignment)
-                  ^WorkerResources resources (:resources assignment)
-                  mem-onheap (.get_mem_on_heap resources)]
-              (try
-                (log-message "Launching worker with assignment "
-                             (pr-str assignment)
-                             " for this supervisor "
-                             (:supervisor-id supervisor)
-                             " on port "
-                             port
-                             " with id "
-                             id
-                             )
-                (launch-worker supervisor
-                               (:storm-id assignment)
-                               port
-                               id
-                               mem-onheap)
-                (mark! supervisor:num-workers-launched)
-                (catch java.io.FileNotFoundException e
-                  (log-message "Unable to launch worker due to "
-                               (.getMessage e)))
-                (catch java.io.IOException e
-                  (log-message "Unable to launch worker due to "
-                               (.getMessage e))))
-         id)))
-    ))
+        (shutdown-worker supervisor id)))
+    (let [valid-new-worker-ids (get-valid-new-worker-ids conf supervisor reassign-executors new-worker-ids)]
+      (ls-approved-workers! local-state
+                        (merge
+                          (select-keys (ls-approved-workers local-state)
+                            (keys keepers))
+                          valid-new-worker-ids))
+      (wait-for-workers-launch conf (keys valid-new-worker-ids)))))
 
 (defn assigned-storm-ids-from-port-assignments [assignment]
   (->> assignment
@@ -454,10 +448,80 @@
       (shutdown-worker supervisor id))
     ))
 
+(defn get-blob-localname
+  "Given the blob information, gets the localname field if it exists,
+  else returns the default value passed in."
+  [blob-info defaultValue]
+  (or (get blob-info "localname") defaultValue))
+
+(defn should-uncompress-blob?
+  "Given the blob information, returns the value of the uncompress field,
+  handling both string and boolean values; returns false if it is not specified."
+  [blob-info]
+  (Boolean. (get blob-info "uncompress")))
+
+(defn remove-blob-references
+  "Remove a reference to a blob when it is no longer needed."
+  [localizer storm-id conf]
+  (let [storm-conf (read-supervisor-storm-conf conf storm-id)
+        blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
+        user (storm-conf TOPOLOGY-SUBMITTER-USER)
+        topo-name (storm-conf TOPOLOGY-NAME)]
+    (if blobstore-map
+      (doseq [[k, v] blobstore-map]
+        (.removeBlobReference localizer
+          k
+          user
+          topo-name
+          (should-uncompress-blob? v))))))
+
+(defn blobstore-map-to-localresources
+  "Returns a list of LocalResources based on the blobstore-map passed in."
+  [blobstore-map]
+  (if blobstore-map
+    (for [[k, v] blobstore-map] (LocalResource. k (should-uncompress-blob? v)))
+    ()))
+
+(defn add-blob-references
+  "For each of the downloaded topologies, adds references to the blobs that the topologies are
+  using. This is used to reconstruct the cache on restart."
+  [localizer storm-id conf]
+  (let [storm-conf (read-supervisor-storm-conf conf storm-id)
+        blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
+        user (storm-conf TOPOLOGY-SUBMITTER-USER)
+        topo-name (storm-conf TOPOLOGY-NAME)
+        localresources (blobstore-map-to-localresources blobstore-map)]
+    (if blobstore-map
+      (.addReferences localizer localresources user topo-name))))
+
+(defn rm-topo-files
+  [conf storm-id localizer rm-blob-refs?]
+  (let [path (supervisor-stormdist-root conf storm-id)]
+    (try
+      (if rm-blob-refs?
+        (remove-blob-references localizer storm-id conf))
+      (if (conf SUPERVISOR-RUN-WORKER-AS-USER)
+        (rmr-as-user conf storm-id path)
+        (rmr (supervisor-stormdist-root conf storm-id)))
+      (catch Exception e
+        (log-message e (str "Exception removing: " storm-id))))))
+
+(defn verify-downloaded-files
+  "Checks that the required topology files exist, to avoid crashing the supervisor.
+   Also makes sure there is no need for locking."
+  [conf localizer assigned-storm-ids all-downloaded-storm-ids]
+  (remove nil?
+    (into #{}
+      (for [storm-id all-downloaded-storm-ids
+            :when (contains? assigned-storm-ids storm-id)]
+        (when-not (required-topo-files-exist? conf storm-id)
+          (log-debug "Files not present in topology directory")
+          (rm-topo-files conf storm-id localizer false)
+          storm-id)))))
+
 (defn mk-synchronize-supervisor [supervisor sync-processes event-manager processes-event-manager]
   (fn this []
     (let [conf (:conf supervisor)
-          download-lock (:download-lock supervisor)
           storm-cluster-state (:storm-cluster-state supervisor)
           ^ISupervisor isupervisor (:isupervisor supervisor)
           ^LocalState local-state (:local-state supervisor)
@@ -468,7 +532,7 @@
            versions :versions}
           (assignments-snapshot storm-cluster-state sync-callback assignment-versions)
           storm-code-map (read-storm-code-locations assignments-snapshot)
-          downloaded-storm-ids (set (read-downloaded-storm-ids conf))
+          all-downloaded-storm-ids (set (read-downloaded-storm-ids conf))
           existing-assignment (ls-local-assignments local-state)
           all-assignment (read-assignments assignments-snapshot
                                            (:assignment-id supervisor)
@@ -476,14 +540,20 @@
                                            (:sync-retry supervisor))
           new-assignment (->> all-assignment
                               (filter-key #(.confirmAssigned isupervisor %)))
-          assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)]
+          assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)
+          localizer (:localizer supervisor)
+          checked-downloaded-storm-ids (set (verify-downloaded-files conf localizer assigned-storm-ids all-downloaded-storm-ids))
+          downloaded-storm-ids (set/difference all-downloaded-storm-ids checked-downloaded-storm-ids)]
+
       (log-debug "Synchronizing supervisor")
       (log-debug "Storm code map: " storm-code-map)
-      (log-debug "Downloaded storm ids: " downloaded-storm-ids)
       (log-debug "All assignment: " all-assignment)
       (log-debug "New assignment: " new-assignment)
-      (log-debug "Storm Ids Profiler Actions" storm-id->profiler-actions)
-
+      (log-debug "Assigned Storm Ids " assigned-storm-ids)
+      (log-debug "All Downloaded Ids " all-downloaded-storm-ids)
+      (log-debug "Checked Downloaded Ids " checked-downloaded-storm-ids)
+      (log-debug "Downloaded Ids " downloaded-storm-ids)
+      (log-debug "Storm Ids Profiler Actions " storm-id->profiler-actions)
       ;; download code first
       ;; This might take awhile
       ;;   - should this be done separately from usual monitoring?
@@ -491,7 +561,9 @@
       (doseq [[storm-id master-code-dir] storm-code-map]
         (when (and (not (downloaded-storm-ids storm-id))
                    (assigned-storm-ids storm-id))
-          (download-storm-code conf storm-id master-code-dir supervisor download-lock)))
+          (log-message "Downloading code for storm id " storm-id)
+          (download-storm-code conf storm-id master-code-dir localizer)
+          (log-message "Finished downloading code for storm id " storm-id)))
 
       (log-debug "Writing new assignment "
                  (pr-str new-assignment))
@@ -510,22 +582,52 @@
       ;; synchronize-supervisor doesn't try to launch workers for which the
       ;; resources don't exist
       (if on-windows? (shutdown-disallowed-workers supervisor))
-      (doseq [storm-id downloaded-storm-ids]
+      (doseq [storm-id all-downloaded-storm-ids]
         (when-not (storm-code-map storm-id)
           (log-message "Removing code for storm id "
                        storm-id)
-          (try
-            (rmr (supervisor-stormdist-root conf storm-id))
-            (catch Exception e (log-message (.getMessage e))))
-          ))
-      (.add processes-event-manager sync-processes)
-      )))
+          (rm-topo-files conf storm-id localizer true)))
+      (.add processes-event-manager sync-processes))))
 
 (defn mk-supervisor-capacities
   [conf]
   {Config/SUPERVISOR_MEMORY_CAPACITY_MB (double (conf SUPERVISOR-MEMORY-CAPACITY-MB))
    Config/SUPERVISOR_CPU_CAPACITY (double (conf SUPERVISOR-CPU-CAPACITY))})
 
+(defn update-blobs-for-topology!
+  "Update each blob listed in the topology configuration if the latest version of the blob
+   has not been downloaded."
+  [conf storm-id localizer]
+  (let [storm-conf (read-supervisor-storm-conf conf storm-id)
+        blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
+        user (storm-conf TOPOLOGY-SUBMITTER-USER)
+        localresources (blobstore-map-to-localresources blobstore-map)]
+    (try
+      (.updateBlobs localizer localresources user)
+      (catch AuthorizationException authExp
+        (log-error authExp))
+      (catch KeyNotFoundException knf
+        (log-error knf)))))
+
+(defn update-blobs-for-all-topologies-fn
+  "Returns a function that downloads all blobs listed in the topology configuration for all topologies assigned
+  to this supervisor, and creates version files with a suffix. The returned function is intended to be run periodically
+  by a timer, created elsewhere."
+  [supervisor]
+  (fn []
+    (try
+      (let [conf (:conf supervisor)
+            downloaded-storm-ids (set (read-downloaded-storm-ids conf))
+            new-assignment @(:curr-assignment supervisor)
+            assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)]
+        (doseq [topology-id downloaded-storm-ids]
+          (let [storm-root (supervisor-stormdist-root conf topology-id)]
+            (when (assigned-storm-ids topology-id)
+              (log-debug "Checking blob updates for storm topology id " topology-id " with target_dir: " storm-root)
+              (update-blobs-for-topology! conf topology-id (:localizer supervisor))))))
+      (catch Exception e
+        (log-error e "Error updating blobs, will retry again later")))))
+
 (defn jvm-cmd [cmd]
   (let [java-home (.get (System/getenv) "JAVA_HOME")]
     (if (nil? java-home)
@@ -650,6 +752,8 @@
         [event-manager processes-event-manager :as managers] [(event/event-manager false) (event/event-manager false)]
         sync-processes (partial sync-processes supervisor)
         synchronize-supervisor (mk-synchronize-supervisor supervisor sync-processes event-manager processes-event-manager)
+        synchronize-blobs-fn (update-blobs-for-all-topologies-fn supervisor)
+        downloaded-storm-ids (set (read-downloaded-storm-ids conf))
         run-profiler-actions-fn (mk-run-profiler-actions-for-all-topologies supervisor)
         heartbeat-fn (fn [] (.supervisor-heartbeat!
                                (:storm-cluster-state supervisor)
@@ -671,6 +775,12 @@
                         0
                         (conf SUPERVISOR-HEARTBEAT-FREQUENCY-SECS)
                         heartbeat-fn)
+    (doseq [storm-id downloaded-storm-ids]
+      (add-blob-references (:localizer supervisor) storm-id
+        conf))
+    ;; do this after adding the references so we don't try to clean things being used
+    (.startCleaner (:localizer supervisor))
+
     (when (conf SUPERVISOR-ENABLE)
       ;; This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up
       ;; to date even if callbacks don't all work exactly right
@@ -679,6 +789,13 @@
                           0
                           (conf SUPERVISOR-MONITOR-FREQUENCY-SECS)
                           (fn [] (.add processes-event-manager sync-processes)))
+
+      ;; Blob update thread. Starts after a 30 second delay and runs every 30 seconds.
+      (schedule-recurring (:blob-update-timer supervisor)
+                          30
+                          30
+                          (fn [] (.add event-manager synchronize-blobs-fn)))
+
       (schedule-recurring (:event-timer supervisor)
                           (* 60 5)
                           (* 60 5)
@@ -689,6 +806,7 @@
                                        (doseq [id ids]
                                          (shutdown-worker supervisor id))
                                        (throw (RuntimeException. "Supervisor failed health check. Exiting.")))))))
+
       ;; Launch a thread that runs profiler commands. Starts after a 30 second delay and runs every 30 seconds.
       (schedule-recurring (:event-timer supervisor)
                           30
@@ -702,8 +820,10 @@
                (reset! (:active supervisor) false)
                (cancel-timer (:heartbeat-timer supervisor))
                (cancel-timer (:event-timer supervisor))
+               (cancel-timer (:blob-update-timer supervisor))
                (.shutdown event-manager)
                (.shutdown processes-event-manager)
+               (.shutdown (:localizer supervisor))
                (.disconnect (:storm-cluster-state supervisor)))
      SupervisorDaemon
      (get-conf [this]
@@ -728,29 +848,92 @@
   (.shutdown supervisor)
   )
 
-(defn setup-storm-code-dir [conf storm-conf dir]
+(defn setup-storm-code-dir
+  [conf storm-conf dir]
  (if (conf SUPERVISOR-RUN-WORKER-AS-USER)
   (worker-launcher-and-wait conf (storm-conf TOPOLOGY-SUBMITTER-USER) ["code-dir" dir] :log-prefix (str "setup conf for " dir))))
 
+(defn setup-blob-permission
+  [conf storm-conf path]
+  (if (conf SUPERVISOR-RUN-WORKER-AS-USER)
+    (worker-launcher-and-wait conf (storm-conf TOPOLOGY-SUBMITTER-USER) ["blob" path] :log-prefix (str "setup blob permissions for " path))))
+
+(defn download-blobs-for-topology!
+  "Download all blobs listed in the topology configuration for a given topology."
+  [conf stormconf-path localizer tmproot]
+  (let [storm-conf (read-supervisor-storm-conf-given-path conf stormconf-path)
+        blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
+        user (storm-conf TOPOLOGY-SUBMITTER-USER)
+        topo-name (storm-conf TOPOLOGY-NAME)
+        user-dir (.getLocalUserFileCacheDir localizer user)
+        localresources (blobstore-map-to-localresources blobstore-map)]
+    (when localresources
+      (when-not (.exists user-dir)
+        (FileUtils/forceMkdir user-dir)
+        (setup-blob-permission conf storm-conf (.toString user-dir)))
+      (try
+        (let [localized-resources (.getBlobs localizer localresources user topo-name user-dir)]
+          (setup-blob-permission conf storm-conf (.toString user-dir))
+          (doseq [local-rsrc localized-resources]
+            (let [rsrc-file-path (File. (.getFilePath local-rsrc))
+                  key-name (.getName rsrc-file-path)
+                  blob-symlink-target-name (.getName (File. (.getCurrentSymlinkPath local-rsrc)))
+                  symlink-name (get-blob-localname (get blobstore-map key-name) key-name)]
+              (create-symlink! tmproot (.getParent rsrc-file-path) symlink-name
+                blob-symlink-target-name))))
+        (catch AuthorizationException authExp
+          (log-error authExp))
+        (catch KeyNotFoundException knf
+          (log-error knf))))))
+
+(defn get-blob-file-names
+  [blobstore-map]
+  (if blobstore-map
+    (for [[k, data] blobstore-map]
+      (get-blob-localname data k))))
+
+(defn download-blobs-for-topology-succeed?
+  "Checks whether all blobs have been downloaded for the given topology."
+  [stormconf-path target-dir]
+  (let [storm-conf (clojurify-structure (Utils/fromCompressedJsonConf (FileUtils/readFileToByteArray (File. stormconf-path))))
+        blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
+        file-names (get-blob-file-names blobstore-map)]
+    (if-not (empty? file-names)
+      (every? #(Utils/checkFileExists target-dir %) file-names)
+      true)))
+
 ;; distributed implementation
 (defmethod download-storm-code
-    :distributed [conf storm-id master-code-dir supervisor download-lock]
-    ;; Downloading to permanent location is atomic
-    (let [tmproot (str (supervisor-tmp-dir conf) file-path-separator (uuid))
-          stormroot (supervisor-stormdist-root conf storm-id)
-          master-meta-file-path (master-storm-metafile-path master-code-dir)
-          supervisor-meta-file-path (supervisor-storm-metafile-path tmproot)]
-      (locking download-lock
-        (log-message "Downloading code for storm id " storm-id " from " master-code-dir)
-        (FileUtils/forceMkdir (File. tmproot))
-        (Utils/downloadFromMaster conf master-meta-file-path supervisor-meta-file-path)
-        (if (:code-distributor supervisor)
-          (.download (:code-distributor supervisor) storm-id (File. supervisor-meta-file-path)))
-        (extract-dir-from-jar (supervisor-stormjar-path tmproot) RESOURCES-SUBDIR tmproot)
-        (if (.exists (File. stormroot)) (FileUtils/forceDelete (File. stormroot)))
-        (FileUtils/moveDirectory (File. tmproot) (File. stormroot))
-        (setup-storm-code-dir conf (read-supervisor-storm-conf conf storm-id) stormroot)
-        (log-message "Finished downloading code for storm id " storm-id " from " master-code-dir))))
+  :distributed [conf storm-id master-code-dir localizer]
+  ;; Downloading to permanent location is atomic
+  (let [tmproot (str (supervisor-tmp-dir conf) file-path-separator (uuid))
+        stormroot (supervisor-stormdist-root conf storm-id)
+        blobstore (Utils/getClientBlobStoreForSupervisor conf)]
+    (FileUtils/forceMkdir (File. tmproot))
+    (if-not on-windows?
+      (Utils/restrictPermissions tmproot)
+      (if (conf SUPERVISOR-RUN-WORKER-AS-USER)
+        (throw-runtime (str "ERROR: Windows doesn't support setting the correct permissions"))))
+    (Utils/downloadResourcesAsSupervisor (master-stormjar-key storm-id)
+      (supervisor-stormjar-path tmproot) blobstore)
+    (Utils/downloadResourcesAsSupervisor (master-stormcode-key storm-id)
+      (supervisor-stormcode-path tmproot) blobstore)
+    (Utils/downloadResourcesAsSupervisor (master-stormconf-key storm-id)
+      (supervisor-stormconf-path tmproot) blobstore)
+    (.shutdown blobstore)
+    (extract-dir-from-jar (supervisor-stormjar-path tmproot) RESOURCES-SUBDIR tmproot)
+    (download-blobs-for-topology! conf (supervisor-stormconf-path tmproot) localizer
+      tmproot)
+    (if (download-blobs-for-topology-succeed? (supervisor-stormconf-path tmproot) tmproot)
+      (do
+        (log-message "Successfully downloaded blob resources for storm-id " storm-id)
+        (FileUtils/forceMkdir (File. stormroot))
+        (Files/move (.toPath (File. tmproot)) (.toPath (File. stormroot))
+          (doto (make-array StandardCopyOption 1) (aset 0 StandardCopyOption/ATOMIC_MOVE)))
+        (setup-storm-code-dir conf (read-supervisor-storm-conf conf storm-id) stormroot))
+      (do
+        (log-message "Failed to download blob resources for storm-id " storm-id)
+        (rmr tmproot)))))
 
 (defn write-log-metadata-to-yaml-file! [storm-id port data conf]
   (let [file (get-log-metadata-file conf storm-id port)]
@@ -781,11 +964,6 @@
                                              (storm-conf TOPOLOGY-USERS)))))}]
     (write-log-metadata-to-yaml-file! storm-id port data conf)))
 
-(defmethod mk-code-distributor :distributed [conf]
-  (let [code-distributor (new-instance (conf STORM-CODE-DISTRIBUTOR-CLASS))]
-    (.prepare code-distributor conf)
-    code-distributor))
-
 (defn jlp [stormroot conf]
   (let [resource-root (str stormroot File/separator RESOURCES-SUBDIR)
         os (clojure.string/replace (System/getProperty "os.name") #"\s+" "_")
@@ -811,6 +989,21 @@
       :else (-> value sub-fn (clojure.string/split #"\s+")))))
 
 
+(defn create-blobstore-links
+  "Create symlinks in the worker launch directory for all blobs."
+  [conf storm-id worker-id]
+  (let [stormroot (supervisor-stormdist-root conf storm-id)
+        storm-conf (read-supervisor-storm-conf conf storm-id)
+        workerroot (worker-root conf worker-id)
+        blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
+        blob-file-names (get-blob-file-names blobstore-map)
+        resource-file-names (cons RESOURCES-SUBDIR blob-file-names)]
+    (log-message "Creating symlinks for worker-id: " worker-id " storm-id: "
+      storm-id " for files(" (count resource-file-names) "): " (pr-str resource-file-names))
+    (create-symlink! workerroot stormroot RESOURCES-SUBDIR)
+    (doseq [file-name blob-file-names]
+      (create-symlink! workerroot stormroot file-name file-name))))
+
 (defn create-artifacts-link
   "Create a symlink from the worker directory to its port artifacts directory"
   [conf storm-id port worker-id]
@@ -913,6 +1106,7 @@
                        (add-dead-worker worker-id))
             worker-dir (worker-root conf worker-id)]
         (remove-dead-worker worker-id)
+        (create-blobstore-links conf storm-id worker-id)
         (if run-worker-as-user
           (worker-launcher conf user ["worker" worker-dir (write-script worker-dir command :environment topology-worker-environment)] :log-prefix log-prefix :exit-code-callback callback :directory (File. worker-dir))
           (launch-process command :environment topology-worker-environment :log-prefix log-prefix :exit-code-callback callback :directory (File. worker-dir)))
@@ -927,31 +1121,31 @@
        first ))
 
 (defmethod download-storm-code
-    :local [conf storm-id master-code-dir supervisor download-lock]
-    (let [stormroot (supervisor-stormdist-root conf storm-id)]
-      (locking download-lock
-            (FileUtils/copyDirectory (File. master-code-dir) (File. stormroot))
-            (let [classloader (.getContextClassLoader (Thread/currentThread))
-                  resources-jar (resources-jar)
-                  url (.getResource classloader RESOURCES-SUBDIR)
-                  target-dir (str stormroot file-path-separator RESOURCES-SUBDIR)]
-              (cond
-               resources-jar
-               (do
-                 (log-message "Extracting resources from jar at " resources-jar " to " target-dir)
-                 (extract-dir-from-jar resources-jar RESOURCES-SUBDIR stormroot))
-               url
-               (do
-                 (log-message "Copying resources at " (URI. (str url)) " to " target-dir)
-                 (if (= (.getProtocol url) "jar" )
-                   (extract-dir-from-jar (.getFile (.getJarFileURL (.openConnection url))) RESOURCES-SUBDIR stormroot)
-                   (FileUtils/copyDirectory (File. (.getPath (URI. (str url)))) (File. target-dir)))
-                 )
-               )
-              )
-            )))
-
-(defmethod mk-code-distributor :local [conf] nil)
+  :local [conf storm-id master-code-dir localizer]
+  (let [tmproot (str (supervisor-tmp-dir conf) file-path-separator (uuid))
+        stormroot (supervisor-stormdist-root conf storm-id)
+        blob-store (Utils/getNimbusBlobStore conf master-code-dir nil)]
+    (try
+      (FileUtils/forceMkdir (File. tmproot))
+      (.readBlobTo blob-store (master-stormcode-key storm-id) (FileOutputStream. (supervisor-stormcode-path tmproot)) nil)
+      (.readBlobTo blob-store (master-stormconf-key storm-id) (FileOutputStream. (supervisor-stormconf-path tmproot)) nil)
+      (finally
+        (.shutdown blob-store)))
+    (FileUtils/moveDirectory (File. tmproot) (File. stormroot))
+    (setup-storm-code-dir conf (read-supervisor-storm-conf conf storm-id) stormroot)
+    (let [classloader (.getContextClassLoader (Thread/currentThread))
+          resources-jar (resources-jar)
+          url (.getResource classloader RESOURCES-SUBDIR)
+          target-dir (str stormroot file-path-separator RESOURCES-SUBDIR)]
+      (cond
+        resources-jar
+        (do
+          (log-message "Extracting resources from jar at " resources-jar " to " target-dir)
+          (extract-dir-from-jar resources-jar RESOURCES-SUBDIR stormroot))
+        url
+        (do
+          (log-message "Copying resources at " (str url) " to " target-dir)
+          (FileUtils/copyDirectory (File. (.getFile url)) (File. target-dir)))))))
 
 (defmethod launch-worker
     :local [supervisor storm-id port worker-id mem-onheap]

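For context, the :distributed download-storm-code above stages the jar, code, and
conf under a uniquely named temporary directory and only then publishes it with a
single atomic rename (Files/move with ATOMIC_MOVE), so a crash or a concurrent
reader never observes a half-downloaded stormroot. A minimal Java sketch of the
same pattern, with placeholder paths and a placeholder fetch step (not part of
this patch):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardCopyOption;
    import java.util.UUID;

    public class AtomicPublish {
        // Stage into tmpBase/<uuid>, then atomically rename into finalDir.
        public static void downloadAtomically(Path tmpBase, Path finalDir) throws IOException {
            Path tmproot = tmpBase.resolve(UUID.randomUUID().toString());
            Files.createDirectories(tmproot);

            // ... fetch jar/code/conf into tmproot here (placeholder) ...

            // Atomic within one filesystem; fails rather than exposing partial state.
            Files.move(tmproot, finalDir, StandardCopyOption.ATOMIC_MOVE);
        }
    }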
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/clj/backtype/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/testing.clj b/storm-core/src/clj/backtype/storm/testing.clj
index 2c98b07..c552519 100644
--- a/storm-core/src/clj/backtype/storm/testing.clj
+++ b/storm-core/src/clj/backtype/storm/testing.clj
@@ -126,7 +126,8 @@
                             ZMQ-LINGER-MILLIS 0
                             TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false
                             TOPOLOGY-TRIDENT-BATCH-EMIT-INTERVAL-MILLIS 50
-                            STORM-CLUSTER-MODE "local"}
+                            STORM-CLUSTER-MODE "local"
+                            BLOBSTORE-SUPERUSER (System/getProperty "user.name")}
                            (if-not (contains? daemon-conf STORM-ZOOKEEPER-SERVERS)
                              {STORM-ZOOKEEPER-PORT zk-port
                               STORM-ZOOKEEPER-SERVERS ["localhost"]})
@@ -628,7 +629,7 @@
           track-id (-> tracked-topology :cluster ::track-id)
           waiting? (fn []
                      (or (not= target (global-amt track-id "spout-emitted"))
-                         (not= (global-amt track-id "transferred")                                 
+                         (not= (global-amt track-id "transferred")
                                (global-amt track-id "processed"))))]
       (while-timeout timeout-ms (waiting?)
                      ;; (println "Spout emitted: " (global-amt track-id "spout-emitted"))

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/clj/backtype/storm/util.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/util.clj b/storm-core/src/clj/backtype/storm/util.clj
index 9b22e70..cbe5bf9 100644
--- a/storm-core/src/clj/backtype/storm/util.clj
+++ b/storm-core/src/clj/backtype/storm/util.clj
@@ -22,6 +22,8 @@
   (:import [backtype.storm Config])
   (:import [backtype.storm.utils Time Container ClojureTimerTask Utils
             MutableObject MutableInt])
+  (:import [backtype.storm.security.auth NimbusPrincipal])
+  (:import [javax.security.auth Subject])
   (:import [java.util UUID Random ArrayList List Collections])
   (:import [java.util.zip ZipFile])
   (:import [java.util.concurrent.locks ReentrantReadWriteLock])
@@ -1099,7 +1101,19 @@
     (assoc coll k (apply str (repeat (count (coll k)) "#")))
     coll))
 
-(defn log-thrift-access [request-id remoteAddress principal operation]
+(defn log-thrift-access
+  [request-id remoteAddress principal operation]
   (doto
     (ThriftAccessLogger.)
     (.log (str "Request ID: " request-id " access from: " remoteAddress " principal: " principal " operation: " operation))))
+
+(def DISALLOWED-KEY-NAME-STRS #{"/" "." ":" "\\"})
+
+(defn validate-key-name!
+  [name]
+  (if (some #(.contains name %) DISALLOWED-KEY-NAME-STRS)
+    (throw (RuntimeException.
+             (str "Key name cannot contain any of the following: " (pr-str DISALLOWED-KEY-NAME-STRS))))
+    (if (clojure.string/blank? name)
+      (throw (RuntimeException.
+               "Key name cannot be blank")))))

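The validate-key-name! helper rejects blank names and names containing any of the
strings in DISALLOWED-KEY-NAME-STRS. An illustrative Java equivalent of the same
check (the patch itself performs this validation in Clojure and, for blob keys, in
BlobStore.validateKey):

    import java.util.Arrays;
    import java.util.List;

    public final class KeyNameValidator {
        private static final List<String> DISALLOWED = Arrays.asList("/", ".", ":", "\\");

        public static void validateKeyName(String name) {
            if (name == null || name.trim().isEmpty()) {
                throw new RuntimeException("Key name cannot be blank");
            }
            for (String bad : DISALLOWED) {
                if (name.contains(bad)) {
                    throw new RuntimeException(
                        "Key name cannot contain any of the following: " + DISALLOWED);
                }
            }
        }
    }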
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/clj/backtype/storm/zookeeper.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/zookeeper.clj b/storm-core/src/clj/backtype/storm/zookeeper.clj
index 26def33..c91ffa4 100644
--- a/storm-core/src/clj/backtype/storm/zookeeper.clj
+++ b/storm-core/src/clj/backtype/storm/zookeeper.clj
@@ -114,6 +114,7 @@
       (try-cause  (.. zk (delete) (deletingChildrenIfNeeded) (forPath (normalize-path path)))
                   (catch KeeperException$NoNodeException e
                     ;; do nothing
+                    (log-message "Ignoring NoNodeException: " e)
                   )
                   (catch Exception e (throw (wrap-in-runtime e)))))))
 
@@ -129,7 +130,6 @@
           ))
       )))
 
-
 (defn sync-path
   [^CuratorFramework zk ^String path]
   (try
@@ -186,6 +186,19 @@
       (.. zk (getChildren) (forPath (normalize-path path))))
     (catch Exception e (throw (wrap-in-runtime e)))))
 
+(defn delete-node-blobstore
+  "Deletes the state inside ZooKeeper for the keys under parent-path
+   whose names start with the given nimbus host-port information."
+  [^CuratorFramework zk ^String parent-path ^String host-port-info]
+  (let [parent-path (normalize-path parent-path)
+        child-path-list (if (exists-node? zk parent-path false)
+                          (into [] (get-children zk parent-path false))
+                          [])]
+    (doseq [child child-path-list]
+      (when (.startsWith child host-port-info)
+        (log-debug "delete-node " "child" child)
+        (delete-node zk (str parent-path "/" child))))))
+
 (defn set-data
   [^CuratorFramework zk ^String path ^bytes data]
   (try
@@ -232,22 +245,10 @@
 (defn leader-latch-listener-impl
   "Leader latch listener that will be invoked when we either gain or lose leadership"
   [conf zk leader-latch]
-  (let [hostname (.getCanonicalHostName (InetAddress/getLocalHost))
-        STORMS-ROOT (str (conf STORM-ZOOKEEPER-ROOT) "/storms")]
+  (let [hostname (.getCanonicalHostName (InetAddress/getLocalHost))]
     (reify LeaderLatchListener
       (^void isLeader[this]
-        (log-message (str hostname " gained leadership, checking if it has all the topology code locally."))
-        (let [active-topology-ids (set (get-children zk STORMS-ROOT false))
-              local-topology-ids (set (.list (File. (master-stormdist-root conf))))
-              diff-topology (first (set-delta active-topology-ids local-topology-ids))]
-        (log-message "active-topology-ids [" (clojure.string/join "," active-topology-ids)
-                          "] local-topology-ids [" (clojure.string/join "," local-topology-ids)
-                          "] diff-topology [" (clojure.string/join "," diff-topology) "]")
-        (if (empty? diff-topology)
-          (log-message "Accepting leadership, all active topology found localy.")
-          (do
-            (log-message "code for all active topologies not available locally, giving up leadership.")
-            (.close leader-latch)))))
+        (log-message (str hostname " gained leadership")))
       (^void notLeader[this]
         (log-message (str hostname " lost leadership."))))))
 

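Read as plain Curator calls, delete-node-blobstore lists the children of
parent-path and deletes every child whose name starts with the given host-port
prefix. A hedged Java sketch assuming a started CuratorFramework client (the
class and method names here are made up):

    import java.util.List;
    import org.apache.curator.framework.CuratorFramework;

    public class BlobstoreZkCleanup {
        public static void deleteNodeBlobstore(CuratorFramework zk, String parentPath,
                                               String hostPortInfo) throws Exception {
            if (zk.checkExists().forPath(parentPath) == null) {
                return; // nothing to clean up
            }
            List<String> children = zk.getChildren().forPath(parentPath);
            for (String child : children) {
                if (child.startsWith(hostPortInfo)) {
                    // same recursive delete used elsewhere in this file
                    zk.delete().deletingChildrenIfNeeded().forPath(parentPath + "/" + child);
                }
            }
        }
    }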
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 89422f6..b663dcb 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -1056,6 +1056,122 @@ public class Config extends HashMap<String, Object> {
     public static final String SUPERVISOR_SLOTS_PORTS = "supervisor.slots.ports";
 
     /**
+     * What blobstore implementation the supervisor should use.
+     */
+    @isString
+    public static final String SUPERVISOR_BLOBSTORE = "supervisor.blobstore.class";
+
+    /**
+     * The distributed cache target size in MB. This is a soft limit to the size of the distributed
+     * cache contents.
+     */
+    @isPositiveNumber
+    @isInteger
+    public static final String SUPERVISOR_LOCALIZER_CACHE_TARGET_SIZE_MB = "supervisor.localizer.cache.target.size.mb";
+
+    /**
+     * The distributed cache cleanup interval. Controls how often it scans to attempt to clean up
+     * anything over the cache target size.
+     */
+    @isPositiveNumber
+    @isInteger
+    public static final String SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS = "supervisor.localizer.cleanup.interval.ms";
+
+    /**
+     * What blobstore implementation the storm client should use.
+     */
+    @isString
+    public static final String CLIENT_BLOBSTORE = "client.blobstore.class";
+
+    /**
+     * What blobstore download parallelism the supervisor should use.
+     */
+    @isPositiveNumber
+    @isInteger
+    public static final String SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT = "supervisor.blobstore.download.thread.count";
+
+    /**
+     * Maximum number of retries a supervisor is allowed to make for downloading a blob.
+     */
+    @isPositiveNumber
+    @isInteger
+    public static final String SUPERVISOR_BLOBSTORE_DOWNLOAD_MAX_RETRIES = "supervisor.blobstore.download.max_retries";
+
+    /**
+     * The blobstore superuser has read/write/admin permissions on all blobs. This is the
+     * user running the blobstore.
+     */
+    @isString
+    public static final String BLOBSTORE_SUPERUSER = "blobstore.superuser";
+
+    /**
+     * What directory to use for the blobstore. The directory is expected to be an
+     * absolute path when using the HDFS blobstore; for LocalFsBlobStore it can be either
+     * absolute or relative.
+     */
+    @isString
+    public static final String BLOBSTORE_DIR = "blobstore.dir";
+
+    /**
+     * What buffer size to use for the blobstore uploads.
+     */
+    @isPositiveNumber
+    @isInteger
+    public static final String STORM_BLOBSTORE_INPUTSTREAM_BUFFER_SIZE_BYTES = "storm.blobstore.inputstream.buffer.size.bytes";
+
+    /**
+     * Enable the blobstore cleaner. Certain blobstores may only want to run the cleaner
+     * on one daemon. Currently Nimbus handles setting this.
+     */
+    @isBoolean
+    public static final String BLOBSTORE_CLEANUP_ENABLE = "blobstore.cleanup.enable";
+
+    /**
+     * Principal for nimbus/supervisor to use to access secure HDFS for the blobstore.
+     */
+    @isString
+    public static final String BLOBSTORE_HDFS_PRINCIPAL = "blobstore.hdfs.principal";
+
+    /**
+     * Keytab for nimbus/supervisor to use to access secure HDFS for the blobstore.
+     */
+    @isString
+    public static final String BLOBSTORE_HDFS_KEYTAB = "blobstore.hdfs.keytab";
+
+    /**
+     * Sets the replication factor for a blob in the HDFS blobstore implementation.
+     */
+    @isPositiveNumber
+    @isInteger
+    public static final String STORM_BLOBSTORE_REPLICATION_FACTOR = "storm.blobstore.replication.factor";
+
+    /**
+     * What blobstore implementation nimbus should use.
+     */
+    @isString
+    public static final String NIMBUS_BLOBSTORE = "nimbus.blobstore.class";
+
+    /**
+     * During operations with the blob store via the master, how long a connection
+     * may be idle before nimbus considers it dead and drops the session and any
+     * associated connections.
+     */
+    @isPositiveNumber
+    @isInteger
+    public static final String NIMBUS_BLOBSTORE_EXPIRATION_SECS = "nimbus.blobstore.expiration.secs";
+
+    /**
+     * A map from blobstore keys to the local file name and uncompress flag for each blob
+     * the worker will have access to in its launch directory. Both localname and the
+     * uncompress flag are optional; the key is used as the local name if localname is not
+     * specified. Each topology can have a different map of blobs.  Example: topology.blobstore.map: {"blobstorekey" :
+     * {"localname": "myblob", "uncompress": false}, "blobstorearchivekey" :
+     * {"localname": "myarchive", "uncompress": true}}
+     */
+    @CustomValidator(validatorClass = MapOfStringToMapOfStringToObjectValidator.class)
+    public static final String TOPOLOGY_BLOBSTORE_MAP = "topology.blobstore.map";
+
+    /**
      * A number representing the maximum number of workers any single topology can acquire.
      */
     @isInteger
@@ -1847,13 +1963,6 @@ public class Config extends HashMap<String, Object> {
     public static final String TOPOLOGY_DISRUPTOR_BATCH_TIMEOUT_MILLIS="topology.disruptor.batch.timeout.millis";
 
     /**
-     * Which implementation of {@link backtype.storm.codedistributor.ICodeDistributor} should be used by storm for code
-     * distribution.
-     */
-    @isString
-    public static final String STORM_CODE_DISTRIBUTOR_CLASS = "storm.codedistributor.class";
-
-    /**
      * Minimum number of nimbus hosts where the code must be replicated before leader nimbus
      * is allowed to perform topology activation tasks like setting up heartbeats/assignments
      * and marking the topology as active. default is 0.

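To make the TOPOLOGY_BLOBSTORE_MAP shape concrete, here is a hypothetical snippet
that builds the map from the Javadoc example and puts it into a Config before
topology submission (the blob keys and local names are made up):

    import java.util.HashMap;
    import java.util.Map;
    import backtype.storm.Config;

    public class BlobstoreMapExample {
        public static Config confWithBlobs() {
            Map<String, Map<String, Object>> blobs = new HashMap<>();

            Map<String, Object> blob = new HashMap<>();
            blob.put("localname", "myblob");     // symlink name in the worker dir
            blob.put("uncompress", false);
            blobs.put("blobstorekey", blob);

            Map<String, Object> archive = new HashMap<>();
            archive.put("localname", "myarchive");
            archive.put("uncompress", true);     // unpacked after download
            blobs.put("blobstorearchivekey", archive);

            Config conf = new Config();
            conf.put(Config.TOPOLOGY_BLOBSTORE_MAP, blobs);
            return conf;
        }
    }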
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/blobstore/AtomicOutputStream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/AtomicOutputStream.java b/storm-core/src/jvm/backtype/storm/blobstore/AtomicOutputStream.java
new file mode 100644
index 0000000..f35b7a7
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/blobstore/AtomicOutputStream.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * An output stream where all of the data is committed on close,
+ * or can be canceled with cancel.
+ */
+public abstract class AtomicOutputStream extends OutputStream {
+    /**
+     * Cancel all of the writes associated with this stream and close it.
+     */ 
+    public abstract void cancel() throws IOException;
+}

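A minimal, hypothetical subclass illustrating the commit-on-close / cancel
contract (not part of this patch): write to a temporary file, rename into place
on close, and delete the temporary file on cancel.

    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;

    public class TmpFileAtomicOutputStream extends AtomicOutputStream {
        private final File tmp;
        private final File dst;
        private final FileOutputStream out;

        public TmpFileAtomicOutputStream(File dst) throws IOException {
            this.dst = dst;
            this.tmp = new File(dst.getParentFile(), dst.getName() + ".tmp");
            this.out = new FileOutputStream(tmp);
        }

        @Override
        public void write(int b) throws IOException {
            out.write(b);
        }

        @Override
        public void close() throws IOException {
            out.close();
            // commit: nothing is visible at dst until the rename succeeds
            if (!tmp.renameTo(dst)) {
                throw new IOException("Could not commit " + tmp + " to " + dst);
            }
        }

        @Override
        public void cancel() throws IOException {
            out.close();
            tmp.delete(); // abandon the write; dst is never touched
        }
    }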
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/blobstore/BlobKeySequenceInfo.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/BlobKeySequenceInfo.java b/storm-core/src/jvm/backtype/storm/blobstore/BlobKeySequenceInfo.java
new file mode 100644
index 0000000..53cfa15
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/blobstore/BlobKeySequenceInfo.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+public class BlobKeySequenceInfo {
+    private String nimbusHostPort;
+    private String sequenceNumber;
+
+    public void setNimbusHostPort(String nimbusHostPort) {
+        this.nimbusHostPort = nimbusHostPort;
+    }
+
+    public void setSequenceNumber(String sequenceNumber) {
+        this.sequenceNumber = sequenceNumber;
+    }
+
+    public String getNimbusHostPort() {
+        return nimbusHostPort;
+    }
+
+    public String getSequenceNumber() {
+        return sequenceNumber;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/blobstore/BlobStore.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/BlobStore.java b/storm-core/src/jvm/backtype/storm/blobstore/BlobStore.java
new file mode 100644
index 0000000..a714b76
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/blobstore/BlobStore.java
@@ -0,0 +1,445 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import javax.security.auth.Subject;
+
+import backtype.storm.nimbus.NimbusInfo;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.daemon.Shutdownable;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.generated.KeyNotFoundException;
+import backtype.storm.generated.KeyAlreadyExistsException;
+import backtype.storm.generated.ReadableBlobMeta;
+import backtype.storm.generated.SettableBlobMeta;
+
+/**
+ * Provides a way to store blobs that can be downloaded.
+ * Blobs must be able to be uploaded and listed from Nimbus,
+ * and downloaded from the Supervisors. It is a key-value based
+ * store, where the key is a string and the value is the blob data.
+ *
+ * ACL checking must take place against the provided subject.
+ * If the blob store does not support security, it must validate
+ * that all ACLs set are always WORLD with all permissions.
+ *
+ * The users can upload their blobs through the blob store command
+ * line. The command line also allows us to update and delete blobs.
+ *
+ * Modifying the replication factor only works for HdfsBlobStore
+ * as for the LocalFsBlobStore the replication depends on
+ * the number of Nimbus nodes available.
+ */
+public abstract class BlobStore implements Shutdownable {
+    private static final Logger LOG = LoggerFactory.getLogger(BlobStore.class);
+    private static final Pattern KEY_PATTERN = Pattern.compile("^[\\w \\t\\.:_-]+$");
+    protected static final String BASE_BLOBS_DIR_NAME = "blobs";
+
+    /**
+     * Allows us to initialize the blob store
+     * @param conf The storm configuration
+     * @param baseDir The directory path to store the blobs
+     * @param nimbusInfo Contains the nimbus host, port and leadership information.
+     */
+    public abstract void prepare(Map conf, String baseDir, NimbusInfo nimbusInfo);
+
+    /**
+     * Creates the blob.
+     * @param key Key for the blob.
+     * @param meta Metadata which contains the acls information
+     * @param who Is the subject creating the blob.
+     * @return AtomicOutputStream returns a stream into which the data
+     * can be written.
+     * @throws AuthorizationException
+     * @throws KeyAlreadyExistsException
+     */
+    public abstract AtomicOutputStream createBlob(String key, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyAlreadyExistsException;
+
+    /**
+     * Updates the blob data.
+     * @param key Key for the blob.
+     * @param who Is the subject having the write privilege for the blob.
+     * @return AtomicOutputStream returns a stream into which the data
+     * can be written.
+     * @throws AuthorizationException
+     * @throws KeyNotFoundException
+     */
+    public abstract AtomicOutputStream updateBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException;
+
+    /**
+     * Gets the current version of metadata for a blob
+     * to be viewed by the user or downloaded by the supervisor.
+     * @param key Key for the blob.
+     * @param who Is the subject having the read privilege for the blob.
+     * @return ReadableBlobMeta containing the metadata, including
+     * the version, for the blob.
+     * @throws AuthorizationException
+     * @throws KeyNotFoundException
+     */
+    public abstract ReadableBlobMeta getBlobMeta(String key, Subject who) throws AuthorizationException, KeyNotFoundException;
+
+    /**
+     * Sets the metadata with renewed acls for the blob.
+     * @param key Key for the blob.
+     * @param meta Metadata which contains the updated
+     * acls information.
+     * @param who Is the subject having the write privilege for the blob.
+     * @throws AuthorizationException
+     * @throws KeyNotFoundException
+     */
+    public abstract void setBlobMeta(String key, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyNotFoundException;
+
+    /**
+     * Deletes the blob data and metadata.
+     * @param key Key for the blob.
+     * @param who Is the subject having write privilege for the blob.
+     * @throws AuthorizationException
+     * @throws KeyNotFoundException
+     */
+    public abstract void deleteBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException;
+
+    /**
+     * Gets an InputStream for reading the blob's contents.
+     * @param key Key for the blob.
+     * @param who Is the subject having the read privilege for the blob.
+     * @return InputStreamWithMeta, which additionally carries the
+     * file length and version information.
+     * @throws AuthorizationException
+     * @throws KeyNotFoundException
+     */
+    public abstract InputStreamWithMeta getBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException;
+
+    /**
+     * Returns an iterator over all of the
+     * keys currently available in the blob store.
+     * @return Iterator<String>
+     */
+    public abstract Iterator<String> listKeys();
+
+    /**
+     * Gets the replication factor of the blob.
+     * @param key Key for the blob.
+     * @param who Is the subject having the read privilege for the blob.
+     * @return the replication factor for the blob.
+     * @throws Exception
+     */
+    public abstract int getBlobReplication(String key, Subject who) throws Exception;
+
+    /**
+     * Modifies the replication factor of the blob.
+     * @param key Key for the blob.
+     * @param replication The replication factor to
+     * set for the blob.
+     * @param who The subject having the update privilege for the blob.
+     * @return int the updated replication factor for the blob.
+     * @throws AuthorizationException
+     * @throws KeyNotFoundException
+     * @throws IOException
+     */
+    public abstract int updateBlobReplication(String key, int replication, Subject who) throws AuthorizationException, KeyNotFoundException, IOException;
+
+    /**
+     * Filters keys based on the KeyFilter
+     * passed as the argument.
+     * @param filter KeyFilter which maps each key to a result,
+     * or to null if the key should be skipped.
+     * @param <R> The type the filter maps keys to.
+     * @return Set of the non-null filter results.
+     */
+    public <R> Set<R> filterAndListKeys(KeyFilter<R> filter) {
+        Set<R> ret = new HashSet<R>();
+        Iterator<String> keys = listKeys();
+        while (keys.hasNext()) {
+            String key = keys.next();
+            R filtered = filter.filter(key);
+            if (filtered != null) {
+                ret.add(filtered);
+            }
+        }
+        return ret;
+    }
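+
+    // An illustrative usage sketch (the caller "store" is hypothetical, and
+    // this assumes KeyFilter is the single-method interface its use above
+    // implies): map each key with a "meta-" prefix to its suffix, skipping
+    // all other keys.
+    //
+    //   Set<String> suffixes = store.filterAndListKeys(new KeyFilter<String>() {
+    //       public String filter(String key) {
+    //           return key.startsWith("meta-") ? key.substring("meta-".length()) : null;
+    //       }
+    //   });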
+
+    /**
+     * Validates the key, checking for potentially harmful patterns.
+     * @param key Key for the blob.
+     */
+    public static final void validateKey(String key) {
+        if (StringUtils.isEmpty(key) || "..".equals(key) || ".".equals(key) || !KEY_PATTERN.matcher(key).matches()) {
+            LOG.error("'{}' does not appear to be valid {}", key, KEY_PATTERN);
+            throw new IllegalArgumentException(key+" does not appear to be a valid blob key");
+        }
+    }
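+
+    // For illustration: validateKey(".."), validateKey(".") and
+    // validateKey("") all throw IllegalArgumentException, as does any key
+    // rejected by KEY_PATTERN, guarding against path-traversal style keys.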
+
+    /**
+     * Wrapper around createBlob that writes the given
+     * byte array as the blob data.
+     * @param key Key for the blob.
+     * @param data Byte data that needs to be uploaded.
+     * @param meta Metadata which contains the ACL information.
+     * @param who The subject creating the blob.
+     * @throws AuthorizationException
+     * @throws KeyAlreadyExistsException
+     * @throws IOException
+     */
+    public void createBlob(String key, byte[] data, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyAlreadyExistsException, IOException {
+        AtomicOutputStream out = null;
+        try {
+            out = createBlob(key, meta, who);
+            out.write(data);
+            out.close();
+            out = null;
+        } finally {
+            if (out != null) {
+                out.cancel();
+            }
+        }
+    }
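+
+    // An illustrative sketch ("store" and "who" are hypothetical; this
+    // assumes SettableBlobMeta exposes a constructor taking the ACL list):
+    // create a world-accessible blob from an in-memory byte array.
+    //
+    //   SettableBlobMeta meta = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
+    //   store.createBlob("my-key", "hello".getBytes(), meta, who);
+    //
+    // If any write fails, the AtomicOutputStream is cancelled, so a
+    // partially written blob is never committed.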
+
+    /**
+     * Wrapper around createBlob that streams the blob
+     * data from an InputStream.
+     * @param key Key for the blob.
+     * @param in InputStream from which the data is read to be
+     * written as the blob data.
+     * @param meta Metadata which contains the ACL information.
+     * @param who The subject creating the blob.
+     * @throws AuthorizationException
+     * @throws KeyAlreadyExistsException
+     * @throws IOException
+     */
+    public void createBlob(String key, InputStream in, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyAlreadyExistsException, IOException {
+        AtomicOutputStream out = null;
+        try {
+            out = createBlob(key, meta, who);
+            byte[] buffer = new byte[2048];
+            int len = 0;
+            while ((len = in.read(buffer)) > 0) {
+                out.write(buffer, 0, len);
+            }
+            out.close();
+        } catch (AuthorizationException | IOException | RuntimeException e) {
+            if (out != null) {
+                out.cancel();
+            }
+            // Rethrow after cancelling so callers see the failure instead of
+            // a silently missing blob.
+            throw e;
+        } finally {
+            in.close();
+        }
+    }
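+
+    // Streaming variant sketch (hypothetical caller and path): upload a file
+    // without buffering it fully in memory; the input stream is closed
+    // whether or not the upload succeeds.
+    //
+    //   store.createBlob("my-key", new FileInputStream("/tmp/some.jar"), meta, who);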
+
+    /**
+     * Reads the blob from the blob store
+     * and writes it into the output stream.
+     * @param key Key for the blob.
+     * @param out Output stream into which the blob data is written.
+     * @param who The subject having the read
+     * privilege for the blob.
+     * @throws IOException
+     * @throws KeyNotFoundException
+     * @throws AuthorizationException
+     */
+    public void readBlobTo(String key, OutputStream out, Subject who) throws IOException, KeyNotFoundException, AuthorizationException {
+        InputStreamWithMeta in = getBlob(key, who);
+        if (in == null) {
+            throw new IOException("Could not find " + key);
+        }
+        byte[] buffer = new byte[2048];
+        int len = 0;
+        try {
+            while ((len = in.read(buffer)) > 0) {
+                out.write(buffer, 0, len);
+            }
+        } finally {
+            in.close();
+            out.flush();
+        }
+    }
+
+    /**
+     * Wrapper around readBlobTo which reads
+     * the entire blob into a byte array.
+     * @param key Key for the blob.
+     * @param who The subject having
+     * the read privilege for the blob.
+     * @return byte[] containing the blob data.
+     * @throws IOException
+     * @throws KeyNotFoundException
+     * @throws AuthorizationException
+     */
+    public byte[] readBlob(String key, Subject who) throws IOException, KeyNotFoundException, AuthorizationException {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        readBlobTo(key, out, who);
+        return out.toByteArray();
+    }
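+
+    // Round-trip sketch (illustrative; "store", "meta" and "who" as in the
+    // sketches above): bytes written through the createBlob wrapper come
+    // back unchanged.
+    //
+    //   store.createBlob("greeting", "hello".getBytes(), meta, who);
+    //   byte[] data = store.readBlob("greeting", who); // equals "hello".getBytes()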
+
+    /**
+     * Output stream implementation used for writing the
+     * metadata and the blob data to the underlying store.
+     */
+    protected class BlobStoreFileOutputStream extends AtomicOutputStream {
+        private BlobStoreFile part;
+        private OutputStream out;
+
+        public BlobStoreFileOutputStream(BlobStoreFile part) throws IOException {
+            this.part = part;
+            this.out = part.getOutputStream();
+        }
+
+        @Override
+        public void close() throws IOException {
+            try {
+                //close means commit
+                out.close();
+                part.commit();
+            } catch (IOException | RuntimeException e) {
+                cancel();
+                throw e;
+            }
+        }
+
+        @Override
+        public void cancel() throws IOException {
+            try {
+                out.close();
+            } finally {
+                part.cancel();
+            }
+        }
+
+        @Override
+        public void write(int b) throws IOException {
+            out.write(b);
+        }
+
+        @Override
+        public void write(byte[] b) throws IOException {
+            out.write(b);
+        }
+
+        @Override
+        public void write(byte[] b, int offset, int len) throws IOException {
+            out.write(b, offset, len);
+        }
+    }
+
+    /**
+     * Input stream implementation used for reading
+     * both the metadata containing the ACL information
+     * and the blob data.
+     */
+    protected class BlobStoreFileInputStream extends InputStreamWithMeta {
+        private BlobStoreFile part;
+        private InputStream in;
+
+        public BlobStoreFileInputStream(BlobStoreFile part) throws IOException {
+            this.part = part;
+            this.in = part.getInputStream();
+        }
+
+        @Override
+        public long getVersion() throws IOException {
+            return part.getModTime();
+        }
+
+        @Override
+        public int read() throws IOException {
+            return in.read();
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) throws IOException {
+            return in.read(b, off, len);
+        }
+
+        @Override
+        public int read(byte[] b) throws IOException {
+            return in.read(b);
+        }
+
+        @Override
+        public int available() throws IOException {
+            return in.available();
+        }
+
+        @Override
+        public long getFileLength() throws IOException {
+            return part.getFileLength();
+        }
+    }
+
+    /**
+     * Iterator used by blob store implementations to list blobs:
+     * it returns only the keys matching a given prefix, with
+     * that prefix stripped off.
+     */
+    public static class KeyTranslationIterator implements Iterator<String> {
+        private Iterator<String> it = null;
+        private String next = null;
+        private String prefix = null;
+
+        public KeyTranslationIterator(Iterator<String> it, String prefix) throws IOException {
+            this.it = it;
+            this.prefix = prefix;
+            primeNext();
+        }
+
+        private void primeNext() {
+            next = null;
+            while (it.hasNext()) {
+                String tmp = it.next();
+                if (tmp.startsWith(prefix)) {
+                    next = tmp.substring(prefix.length());
+                    return;
+                }
+            }
+        }
+
+        @Override
+        public boolean hasNext() {
+            return next != null;
+        }
+
+        @Override
+        public String next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            String current = next;
+            primeNext();
+            return current;
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException("Delete Not Supported");
+        }
+    }
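+
+    // Behavior sketch for KeyTranslationIterator (illustrative): with
+    // underlying keys ["meta-a", "data-a", "meta-b"] and prefix "meta-",
+    // the iterator yields "a" then "b"; non-matching keys are skipped and
+    // the prefix is stripped from those that match.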
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreAclHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreAclHandler.java b/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreAclHandler.java
new file mode 100644
index 0000000..c0c4e5c
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreAclHandler.java
@@ -0,0 +1,399 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import backtype.storm.Config;
+import backtype.storm.generated.AccessControl;
+import backtype.storm.generated.AccessControlType;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.generated.SettableBlobMeta;
+import backtype.storm.security.auth.AuthUtils;
+import backtype.storm.security.auth.IPrincipalToLocal;
+import backtype.storm.security.auth.NimbusPrincipal;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Provides common handling of ACLs for blob stores.
+ * Also contains some static utility functions related to blob stores.
+ */
+public class BlobStoreAclHandler {
+    public static final Logger LOG = LoggerFactory.getLogger(BlobStoreAclHandler.class);
+    private final IPrincipalToLocal _ptol;
+
+    public static final int READ = 0x01;
+    public static final int WRITE = 0x02;
+    public static final int ADMIN = 0x04;
+    public static final List<AccessControl> WORLD_EVERYTHING =
+            Arrays.asList(new AccessControl(AccessControlType.OTHER, READ | WRITE | ADMIN));
+    public static final List<AccessControl> DEFAULT = new ArrayList<AccessControl>();
+    private Set<String> _supervisors;
+    private Set<String> _admins;
+
+    public BlobStoreAclHandler(Map conf) {
+        _ptol = AuthUtils.GetPrincipalToLocalPlugin(conf);
+        _supervisors = new HashSet<String>();
+        _admins = new HashSet<String>();
+        if (conf.containsKey(Config.NIMBUS_SUPERVISOR_USERS)) {
+            _supervisors.addAll((List<String>)conf.get(Config.NIMBUS_SUPERVISOR_USERS));
+        }
+        if (conf.containsKey(Config.NIMBUS_ADMINS)) {
+            _admins.addAll((List<String>)conf.get(Config.NIMBUS_ADMINS));
+        }
+    }
+
+    private static AccessControlType parseACLType(String type) {
+        if ("other".equalsIgnoreCase(type) || "o".equalsIgnoreCase(type)) {
+            return AccessControlType.OTHER;
+        } else if ("user".equalsIgnoreCase(type) || "u".equalsIgnoreCase(type)) {
+            return AccessControlType.USER;
+        }
+        throw new IllegalArgumentException(type+" is not a valid access control type");
+    }
+
+    private static int parseAccess(String access) {
+        int ret = 0;
+        for (char c: access.toCharArray()) {
+            if ('r' == c) {
+                ret = ret | READ;
+            } else if ('w' == c) {
+                ret = ret | WRITE;
+            } else if ('a' == c) {
+                ret = ret | ADMIN;
+            } else if ('-' == c) {
+                //ignored
+            } else {
+                throw new IllegalArgumentException("");
+            }
+        }
+        return ret;
+    }
+
+    public static AccessControl parseAccessControl(String str) {
+        String[] parts = str.split(":");
+        String type = "other";
+        String name = "";
+        String access = "-";
+        if (parts.length > 3) {
+            throw new IllegalArgumentException("Don't know how to parse "+str+" into an ACL value");
+        } else if (parts.length == 1) {
+            type = "other";
+            name = "";
+            access = parts[0];
+        } else if (parts.length == 2) {
+            type = "user";
+            name = parts[0];
+            access = parts[1];
+        } else if (parts.length == 3) {
+            type = parts[0];
+            name = parts[1];
+            access = parts[2];
+        }
+        AccessControl ret = new AccessControl();
+        ret.set_type(parseACLType(type));
+        ret.set_name(name);
+        ret.set_access(parseAccess(access));
+        return ret;
+    }
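+
+    // Examples of the accepted forms (per the parsing above):
+    //   parseAccessControl("rwa")       -> OTHER with READ|WRITE|ADMIN
+    //   parseAccessControl("alice:rw")  -> USER "alice" with READ|WRITE
+    //   parseAccessControl("u:bob:r--") -> USER "bob" with READ
+    // Strings with more than three ':'-separated parts are rejected.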
+
+    private static String accessToString(int access) {
+        StringBuilder ret = new StringBuilder();
+        ret.append(((access & READ) > 0) ? "r" : "-");
+        ret.append(((access & WRITE) > 0) ? "w" : "-");
+        ret.append(((access & ADMIN) > 0) ? "a" : "-");
+        return ret.toString();
+    }
+
+    public static String accessControlToString(AccessControl ac) {
+        StringBuilder ret = new StringBuilder();
+        switch(ac.get_type()) {
+            case OTHER:
+                ret.append("o");
+                break;
+            case USER:
+                ret.append("u");
+                break;
+            default:
+                throw new IllegalArgumentException("Don't know what a type of "+ac.get_type()+" means ");
+        }
+        ret.append(":");
+        if (ac.is_set_name()) {
+            ret.append(ac.get_name());
+        }
+        ret.append(":");
+        ret.append(accessToString(ac.get_access()));
+        return ret.toString();
+    }
+
+    public static void validateSettableACLs(String key, List<AccessControl> acls) throws AuthorizationException {
+        Set<String> aclUsers = new HashSet<>();
+        List<String> duplicateUsers = new ArrayList<>();
+        for (AccessControl acl : acls) {
+            String aclUser = acl.get_name();
+            if (!StringUtils.isEmpty(aclUser) && !aclUsers.add(aclUser)) {
+                LOG.error("'{}' user can't appear more than once in the ACLs", aclUser);
+                duplicateUsers.add(aclUser);
+            }
+        }
+        if (!duplicateUsers.isEmpty()) {
+            String errorMessage = "user " + Arrays.toString(duplicateUsers.toArray())
+                    + " can't appear more than once in the ACLs for key [" + key + "].";
+            throw new AuthorizationException(errorMessage);
+        }
+    }
+
+    private Set<String> constructUserFromPrincipals(Subject who) {
+        Set<String> user = new HashSet<String>();
+        if (who != null) {
+            for (Principal p : who.getPrincipals()) {
+                user.add(_ptol.toLocal(p));
+            }
+        }
+        return user;
+    }
+
+    private boolean isAdmin(Subject who) {
+        Set<String> user = constructUserFromPrincipals(who);
+        for (String u : user) {
+            if (_admins.contains(u)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private boolean isReadOperation(int operation) {
+        return operation == READ;
+    }
+
+    private boolean isSupervisor(Subject who, int operation) {
+        Set<String> user = constructUserFromPrincipals(who);
+        if (isReadOperation(operation)) {
+            for (String u : user) {
+                if (_supervisors.contains(u)) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    private boolean isNimbus(Subject who) {
+        Set<Principal> principals;
+        boolean isNimbusInstance = false;
+        if (who != null) {
+            principals = who.getPrincipals();
+            for (Principal principal : principals) {
+                if (principal instanceof NimbusPrincipal) {
+                    isNimbusInstance = true;
+                }
+            }
+        }
+        return isNimbusInstance;
+    }
+
+    public boolean checkForValidUsers(Subject who, int mask) {
+        return isNimbus(who) || isAdmin(who) || isSupervisor(who, mask);
+    }
+
+    /**
+     * The user should be able to see the metadata if and only if they have any of READ, WRITE, or ADMIN
+     */
+    public void validateUserCanReadMeta(List<AccessControl> acl, Subject who, String key) throws AuthorizationException {
+        hasAnyPermissions(acl, (READ|WRITE|ADMIN), who, key);
+    }
+
+    /**
+     * Validates if the user has any of the permissions
+     * mentioned in the mask.
+     * @param acl ACL for the key.
+     * @param mask mask holds the cumulative value of
+     * READ = 1, WRITE = 2 or ADMIN = 4 permissions.
+     * mask = 1 implies READ privilege.
+     * mask = 5 implies READ and ADMIN privileges.
+     * @param who The user against whom the permissions
+     * are validated for a key using the ACL and the mask.
+     * @param key Key used to identify the blob.
+     * @throws AuthorizationException
+     */
+    public void hasAnyPermissions(List<AccessControl> acl, int mask, Subject who, String key) throws AuthorizationException {
+        Set<String> user = constructUserFromPrincipals(who);
+        LOG.debug("user {}", user);
+        if (checkForValidUsers(who, mask)) {
+            return;
+        }
+        for (AccessControl ac : acl) {
+            int allowed = getAllowed(ac, user);
+            LOG.debug(" user: {} allowed: {} key: {}", user, allowed, key);
+            if ((allowed & mask) > 0) {
+                return;
+            }
+        }
+        throw new AuthorizationException(
+                user + " does not have access to " + key);
+    }
+
+    /**
+     * Validates if the user has at least the set of permissions
+     * mentioned in the mask.
+     * @param acl ACL for the key.
+     * @param mask mask holds the cumulative value of
+     * READ = 1, WRITE = 2 or ADMIN = 4 permissions.
+     * mask = 1 implies READ privilege.
+     * mask = 5 implies READ and ADMIN privileges.
+     * @param who The user against whom the permissions
+     * are validated for a key using the ACL and the mask.
+     * @param key Key used to identify the blob.
+     * @throws AuthorizationException
+     */
+    public void hasPermissions(List<AccessControl> acl, int mask, Subject who, String key) throws AuthorizationException {
+        Set<String> user = constructUserFromPrincipals(who);
+        LOG.debug("user {}", user);
+        if (checkForValidUsers(who, mask)) {
+            return;
+        }
+        for (AccessControl ac : acl) {
+            int allowed = getAllowed(ac, user);
+            mask = ~allowed & mask;
+            LOG.debug(" user: {} allowed: {} disallowed: {} key: {}", user, allowed, mask, key);
+        }
+        if (mask == 0) {
+            return;
+        }
+        throw new AuthorizationException(
+                user + " does not have " + namedPerms(mask) + " access to " + key);
+    }
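+
+    // Worked example of the mask semantics (illustrative): with
+    // mask = READ|ADMIN (5) and an ACL granting the user only READ (1),
+    // hasAnyPermissions passes since (allowed & mask) == 1 > 0, while
+    // hasPermissions throws since (~allowed & mask) leaves ADMIN (4) unmet.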
+
+    public void normalizeSettableBlobMeta(String key, SettableBlobMeta meta, Subject who, int opMask) {
+        meta.set_acl(normalizeSettableACLs(key, meta.get_acl(), who, opMask));
+    }
+
+    private String namedPerms(int mask) {
+        StringBuilder b = new StringBuilder();
+        b.append("[");
+        if ((mask & READ) > 0) {
+            b.append("READ ");
+        }
+        if ((mask & WRITE) > 0) {
+            b.append("WRITE ");
+        }
+        if ((mask & ADMIN) > 0) {
+            b.append("ADMIN ");
+        }
+        b.append("]");
+        return b.toString();
+    }
+
+    private int getAllowed(AccessControl ac, Set<String> users) {
+        switch (ac.get_type()) {
+            case OTHER:
+                return ac.get_access();
+            case USER:
+                if (users.contains(ac.get_name())) {
+                    return ac.get_access();
+                }
+                return 0;
+            default:
+                return 0;
+        }
+    }
+
+    private List<AccessControl> removeBadACLs(List<AccessControl> accessControls) {
+        List<AccessControl> resultAcl = new ArrayList<AccessControl>();
+        for (AccessControl control : accessControls) {
+            if (control.get_type().equals(AccessControlType.OTHER) && (control.get_access() == 0)) {
+                LOG.debug("Removing invalid blobstore world ACL {}",
+                        BlobStoreAclHandler.accessControlToString(control));
+                continue;
+            }
+            resultAcl.add(control);
+        }
+        return resultAcl;
+    }
+
+    private final List<AccessControl> normalizeSettableACLs(String key, List<AccessControl> acls, Subject who,
+                                                            int opMask) {
+        List<AccessControl> cleanAcls = removeBadACLs(acls);
+        Set<String> userNames = getUserNamesFromSubject(who);
+        for (String user : userNames) {
+            fixACLsForUser(cleanAcls, user, opMask);
+        }
+        if ((who == null || userNames.isEmpty()) && !worldEverything(acls)) {
+            cleanAcls.addAll(BlobStoreAclHandler.WORLD_EVERYTHING);
+            LOG.debug("Access Control for key {} is normalized to world everything {}", key, cleanAcls);
+            if (!acls.isEmpty()) {
+                LOG.warn("Access control for blob with key {} is normalized to WORLD_EVERYTHING", key);
+            }
+        }
+        return cleanAcls;
+    }
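+
+    // Normalization sketch (illustrative): for a subject resolving to local
+    // user "alice" with opMask = READ|WRITE|ADMIN, an empty ACL list becomes
+    // [u:alice:rwa]; with no principals at all it falls back to
+    // WORLD_EVERYTHING (o::rwa) so the blob remains accessible.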
+
+    private boolean worldEverything(List<AccessControl> acls) {
+        boolean isWorldEverything = false;
+        for (AccessControl acl : acls) {
+            if (acl.get_type() == AccessControlType.OTHER && acl.get_access() == (READ|WRITE|ADMIN)) {
+                isWorldEverything = true;
+                break;
+            }
+        }
+        return isWorldEverything;
+    }
+
+    private void fixACLsForUser(List<AccessControl> acls, String user, int mask) {
+        boolean foundUserACL = false;
+        for (AccessControl control : acls) {
+            if (control.get_type() == AccessControlType.USER && control.get_name().equals(user)) {
+                int currentAccess = control.get_access();
+                if ((currentAccess & mask) != mask) {
+                    control.set_access(currentAccess | mask);
+                }
+                foundUserACL = true;
+                break;
+            }
+        }
+        if (!foundUserACL) {
+            AccessControl userACL = new AccessControl();
+            userACL.set_type(AccessControlType.USER);
+            userACL.set_name(user);
+            userACL.set_access(mask);
+            acls.add(userACL);
+        }
+    }
+
+    private Set<String> getUserNamesFromSubject(Subject who) {
+        Set<String> user = new HashSet<String>();
+        if (who != null) {
+            for (Principal p : who.getPrincipals()) {
+                user.add(_ptol.toLocal(p));
+            }
+        }
+        return user;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreFile.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreFile.java b/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreFile.java
new file mode 100644
index 0000000..22ccf97
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreFile.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import backtype.storm.generated.SettableBlobMeta;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.regex.Pattern;
+
+/**
+ * Provides a base implementation for creating a blob store based on file-backed storage.
+ */
+public abstract class BlobStoreFile {
+    public static final Logger LOG = LoggerFactory.getLogger(BlobStoreFile.class);
+
+    protected static final String TMP_EXT = ".tmp";
+    protected static final Pattern TMP_NAME_PATTERN = Pattern.compile("^\\d+\\" + TMP_EXT + "$");
+    protected static final String BLOBSTORE_DATA_FILE = "data";
+
+    public abstract void delete() throws IOException;
+    public abstract String getKey();
+    public abstract boolean isTmp();
+    public abstract void setMetadata(SettableBlobMeta meta);
+    public abstract SettableBlobMeta getMetadata();
+    public abstract long getModTime() throws IOException;
+    public abstract InputStream getInputStream() throws IOException;
+    public abstract OutputStream getOutputStream() throws IOException;
+    public abstract void commit() throws IOException;
+    public abstract void cancel() throws IOException;
+    public abstract long getFileLength() throws IOException;
+}