You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/12/04 16:04:06 UTC

[11/17] storm git commit: Blobstore API STORM- 876

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0da88b0..f093ce5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -175,6 +175,7 @@
         <java_jmx.version>0.3.1</java_jmx.version>
         <compojure.version>1.1.3</compojure.version>
         <hiccup.version>0.3.6</hiccup.version>
+        <commons-compress.version>1.4.1</commons-compress.version>
         <commons-io.version>2.4</commons-io.version>
         <commons-lang.version>2.5</commons-lang.version>
         <commons-exec.version>1.1</commons-exec.version>
@@ -355,6 +356,11 @@
             </dependency>
             <dependency>
                 <groupId>org.apache.commons</groupId>
+                <artifactId>commons-compress</artifactId>
+                <version>${commons-compress.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.commons</groupId>
                 <artifactId>commons-exec</artifactId>
                 <version>${commons-exec.version}</version>
             </dependency>
@@ -469,7 +475,12 @@
                 <artifactId>curator-client</artifactId>
                 <version>${curator.version}</version>
             </dependency>
-
+            <dependency>
+                <groupId>org.apache.curator</groupId>
+                <artifactId>curator-test</artifactId>
+                <version>${curator.version}</version>
+                <scope>test</scope>
+            </dependency>
             <dependency>
                 <groupId>com.googlecode.json-simple</groupId>
                 <artifactId>json-simple</artifactId>
@@ -659,6 +670,7 @@
                 <version>${thrift.version}</version>
                 <scope>compile</scope>
             </dependency>
+			<!-- used by examples/storm-starter -->
             <dependency>
                 <groupId>junit</groupId>
                 <artifactId>junit</artifactId>
@@ -818,6 +830,8 @@
                         <exclude>**/.idea/**</exclude>
                         <!-- module specific testing artifacts -->
                         <exclude>**/metastore_db/**</exclude>
+                        <!-- anything written into build should be ignored -->
+                        <exclude>**/build/**</exclude>
 
                         <!-- exclude CHANGELOG, VERSION, AND TODO files -->
                         <exclude>**/CHANGELOG.md</exclude>

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index 083cdca..72c4a3a 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -36,6 +36,17 @@
     </properties>
 
     <dependencies>
+        <!--Hadoop Mini Cluster cannot use log4j2 bridge,
+            Surefire has a way to exclude the conflicting log4j API jar
+            from the classpath, classpathDependencyExcludes, but it didn't work in practice.
+            This is here as a work around to place it at the beginning of the classpath
+            even though maven does not officially support ordering of the classpath.-->
+        <dependency>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+            <version>1.2.17</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>com.esotericsoftware.kryo</groupId>
             <artifactId>kryo</artifactId>
@@ -140,6 +151,10 @@
         </dependency>
         <dependency>
             <groupId>org.apache.commons</groupId>
+            <artifactId>commons-compress</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
             <artifactId>commons-exec</artifactId>
             <scope>compile</scope>
         </dependency>
@@ -178,7 +193,6 @@
                 </exclusion>
             </exclusions>
         </dependency>
-
         <dependency>
             <groupId>org.apache.curator</groupId>
             <artifactId>curator-recipes</artifactId>
@@ -193,7 +207,20 @@
                 </exclusion>
             </exclusions>
         </dependency>
-
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-test</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
         <dependency>
             <groupId>com.googlecode.json-simple</groupId>
             <artifactId>json-simple</artifactId>
@@ -433,6 +460,7 @@
                             <include>org.yaml:snakeyaml</include>
                             <include>org.jgrapht:jgrapht-core</include>
                             <include>org.apache.commons:commons-exec</include>
+                            <include>org.apache.commons:commons-compress</include>
                             <include>commons-io:commons-io</include>
                             <include>commons-codec:commons-codec</include>
                             <include>commons-fileupload:commons-fileupload</include>
@@ -574,6 +602,10 @@
                           <shadedPattern>org.apache.storm.shade.org.apache.commons.io</shadedPattern>
                         </relocation>
                         <relocation>
+                            <pattern>org.apache.commons.compress</pattern>
+                            <shadedPattern>org.apache.storm.shade.org.apache.commons.compress</shadedPattern>
+                        </relocation>
+                        <relocation>
                           <pattern>org.apache.commons.codec</pattern>
                           <shadedPattern>org.apache.storm.shade.org.apache.commons.codec</shadedPattern>
                         </relocation>

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/clj/backtype/storm/blobstore.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/blobstore.clj b/storm-core/src/clj/backtype/storm/blobstore.clj
new file mode 100644
index 0000000..936f4b5
--- /dev/null
+++ b/storm-core/src/clj/backtype/storm/blobstore.clj
@@ -0,0 +1,28 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns backtype.storm.blobstore
+  (:import [backtype.storm.utils Utils])
+  (:import [backtype.storm.blobstore ClientBlobStore])
+  (:use [backtype.storm config]))
+
+(defmacro with-configured-blob-client
+  [client-sym & body]
+  `(let [conf# (read-storm-config)
+         ^ClientBlobStore ~client-sym (Utils/getClientBlobStore conf#)]
+     (try
+       ~@body
+       (finally (.shutdown ~client-sym)))))

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/clj/backtype/storm/cluster.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/cluster.clj b/storm-core/src/clj/backtype/storm/cluster.clj
index 35aa8c8..ebe4955 100644
--- a/storm-core/src/clj/backtype/storm/cluster.clj
+++ b/storm-core/src/clj/backtype/storm/cluster.clj
@@ -45,18 +45,14 @@
     (log-debug "Creating cluster state: " (.toString clazz))
     (or (.mkState state-instance conf auth-conf acls context)
         nil)))
-  
 
 (defprotocol StormClusterState
   (assignments [this callback])
   (assignment-info [this storm-id callback])
   (assignment-info-with-version [this storm-id callback])
   (assignment-version [this storm-id callback])
-  ;returns topologyIds under /stormroot/code-distributor
-  (code-distributor [this callback])
-  ;returns lits of nimbusinfos under /stormroot/code-distributor/storm-id
-  (code-distributor-info [this storm-id])
-
+  ;returns key information under /storm/blobstore/key
+  (blobstore-info [this blob-key])
   ;returns list of nimbus summaries stored under /stormroot/nimbuses/<nimbus-ids> -> <data>
   (nimbuses [this])
   ;adds the NimbusSummary to /stormroot/nimbuses/nimbus-id
@@ -90,9 +86,14 @@
   (update-storm! [this storm-id new-elems])
   (remove-storm-base! [this storm-id])
   (set-assignment! [this storm-id info])
-  ;adds nimbusinfo under /stormroot/code-distributor/storm-id
-  (setup-code-distributor! [this storm-id info])
+  ;; sets up information related to key consisting of nimbus
+  ;; host:port and version info of the blob
+  (setup-blobstore! [this key nimbusInfo versionInfo])
+  (active-keys [this])
+  (blobstore [this callback])
   (remove-storm! [this storm-id])
+  (remove-blobstore-key! [this blob-key])
+  (remove-key-version! [this blob-key])
   (report-error [this storm-id component-id node port error])
   (errors [this storm-id component-id])
   (last-error [this storm-id component-id])
@@ -107,7 +108,9 @@
 (def WORKERBEATS-ROOT "workerbeats")
 (def BACKPRESSURE-ROOT "backpressure")
 (def ERRORS-ROOT "errors")
-(def CODE-DISTRIBUTOR-ROOT "code-distributor")
+(def BLOBSTORE-ROOT "blobstore")
+; Stores the latest update sequence for a blob
+(def BLOBSTORE-MAX-KEY-SEQUENCE-NUMBER-ROOT "blobstoremaxkeysequencenumber")
 (def NIMBUSES-ROOT "nimbuses")
 (def CREDENTIALS-ROOT "credentials")
 (def LOGCONFIG-ROOT "logconfigs")
@@ -119,7 +122,9 @@
 (def WORKERBEATS-SUBTREE (str "/" WORKERBEATS-ROOT))
 (def BACKPRESSURE-SUBTREE (str "/" BACKPRESSURE-ROOT))
 (def ERRORS-SUBTREE (str "/" ERRORS-ROOT))
-(def CODE-DISTRIBUTOR-SUBTREE (str "/" CODE-DISTRIBUTOR-ROOT))
+;; Blobstore subtree /storm/blobstore
+(def BLOBSTORE-SUBTREE (str "/" BLOBSTORE-ROOT))
+(def BLOBSTORE-MAX-KEY-SEQUENCE-NUMBER-SUBTREE (str "/" BLOBSTORE-MAX-KEY-SEQUENCE-NUMBER-ROOT))
 (def NIMBUSES-SUBTREE (str "/" NIMBUSES-ROOT))
 (def CREDENTIALS-SUBTREE (str "/" CREDENTIALS-ROOT))
 (def LOGCONFIG-SUBTREE (str "/" LOGCONFIG-ROOT))
@@ -133,9 +138,13 @@
   [id]
   (str ASSIGNMENTS-SUBTREE "/" id))
 
-(defn code-distributor-path
-  [id]
-  (str CODE-DISTRIBUTOR-SUBTREE "/" id))
+(defn blobstore-path
+  [key]
+  (str BLOBSTORE-SUBTREE "/" key))
+
+(defn blobstore-max-key-sequence-number-path
+  [key]
+  (str BLOBSTORE-MAX-KEY-SEQUENCE-NUMBER-SUBTREE "/" key))
 
 (defn nimbus-path
   [id]
@@ -244,7 +253,7 @@
         backpressure-callback (atom {})   ;; we want to reigister a topo directory getChildren callback for all workers of this dir
         assignments-callback (atom nil)
         storm-base-callback (atom {})
-        code-distributor-callback (atom nil)
+        blobstore-callback (atom nil)
         credentials-callback (atom {})
         log-config-callback (atom {})
         state-id (.register
@@ -259,14 +268,14 @@
                                                (issue-map-callback! assignment-version-callback (first args))
                                                (issue-map-callback! assignment-info-with-version-callback (first args))))
                          SUPERVISORS-ROOT (issue-callback! supervisors-callback)
-                         CODE-DISTRIBUTOR-ROOT (issue-callback! code-distributor-callback)
+                         BLOBSTORE-ROOT (issue-callback! blobstore-callback) ;; callback register for blobstore
                          STORMS-ROOT (issue-map-callback! storm-base-callback (first args))
                          CREDENTIALS-ROOT (issue-map-callback! credentials-callback (first args))
                          LOGCONFIG-ROOT (issue-map-callback! log-config-callback (first args))
                          BACKPRESSURE-ROOT (issue-map-callback! backpressure-callback (first args))
                          ;; this should never happen
                          (exit-process! 30 "Unknown callback for subtree " subtree args)))))]
-    (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE CODE-DISTRIBUTOR-SUBTREE NIMBUSES-SUBTREE
+    (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE BLOBSTORE-SUBTREE NIMBUSES-SUBTREE
                LOGCONFIG-SUBTREE]]
       (.mkdirs cluster-state p acls))
     (reify
@@ -299,13 +308,13 @@
           (swap! assignment-version-callback assoc storm-id callback))
         (.get_version cluster-state (assignment-path storm-id) (not-nil? callback)))
 
-      (code-distributor
+      ;; blobstore state
+      (blobstore
         [this callback]
         (when callback
-          (reset! code-distributor-callback callback))
-        (do
-          (.sync_path cluster-state CODE-DISTRIBUTOR-SUBTREE)
-          (.get_children cluster-state CODE-DISTRIBUTOR-SUBTREE (not-nil? callback))))
+          (reset! blobstore-callback callback))
+        (.sync_path cluster-state BLOBSTORE-SUBTREE)
+        (.get_children cluster-state BLOBSTORE-SUBTREE (not-nil? callback)))
 
       (nimbuses
         [this]
@@ -327,18 +336,29 @@
         
         (.set_ephemeral_node cluster-state (nimbus-path nimbus-id) (Utils/serialize nimbus-summary) acls))
 
-      (code-distributor-info
-        [this storm-id]
-        (map (fn [nimbus-info] (NimbusInfo/parse nimbus-info))
-          (let [path (code-distributor-path storm-id)]
-            (do
-              (.sync_path cluster-state path)
-              (.get_children cluster-state path false)))))
+      (setup-blobstore!
+        [this key nimbusInfo versionInfo]
+        (let [path (str (blobstore-path key) "/" (.toHostPortString nimbusInfo) "-" versionInfo)]
+          (log-message "setup-path" path)
+          (.mkdirs cluster-state (blobstore-path key) acls)
+          ;we delete the node first to ensure the node gets created as part of this session only.
+          (.delete_node_blobstore cluster-state (str (blobstore-path key)) (.toHostPortString nimbusInfo))
+          (.set_ephemeral_node cluster-state path nil acls)))
+
+      (blobstore-info
+        [this blob-key]
+        (let [path (blobstore-path blob-key)]
+          (.sync_path cluster-state path)
+          (.get_children cluster-state path false)))
 
       (active-storms
         [this]
         (.get_children cluster-state STORMS-SUBTREE false))
 
+      (active-keys
+        [this]
+        (.get_children cluster-state BLOBSTORE-SUBTREE false))
+
       (heartbeat-storms
         [this]
         (.get_worker_hb_children cluster-state WORKERBEATS-SUBTREE false))
@@ -526,18 +546,18 @@
         (let [thrift-assignment (thriftify-assignment info)]
           (.set_data cluster-state (assignment-path storm-id) (Utils/serialize thrift-assignment) acls)))
 
-      (setup-code-distributor!
-        [this storm-id nimbusInfo]
-        (let [path (str (code-distributor-path storm-id) "/" (.toHostPortString nimbusInfo))]
-        (.mkdirs cluster-state (code-distributor-path storm-id) acls)
-        ;we delete the node first to ensure the node gets created as part of this session only.
-        (.delete_node cluster-state path)
-        (.set_ephemeral_node cluster-state path nil acls)))
+      (remove-blobstore-key!
+        [this blob-key]
+        (log-debug "removing key" blob-key)
+        (.delete_node cluster-state (blobstore-path blob-key)))
+
+      (remove-key-version!
+        [this blob-key]
+        (.delete_node cluster-state (blobstore-max-key-sequence-number-path blob-key)))
 
       (remove-storm!
         [this storm-id]
         (.delete_node cluster-state (assignment-path storm-id))
-        (.delete_node cluster-state (code-distributor-path storm-id))
         (.delete_node cluster-state (credentials-path storm-id))
         (.delete_node cluster-state (log-config-path storm-id))
         (.delete_node cluster-state (profiler-config-path storm-id))

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/clj/backtype/storm/cluster_state/zookeeper_state_factory.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/cluster_state/zookeeper_state_factory.clj b/storm-core/src/clj/backtype/storm/cluster_state/zookeeper_state_factory.clj
index ff942db..fa36240 100644
--- a/storm-core/src/clj/backtype/storm/cluster_state/zookeeper_state_factory.clj
+++ b/storm-core/src/clj/backtype/storm/cluster_state/zookeeper_state_factory.clj
@@ -149,6 +149,10 @@
        [this path]
        (zk/sync-path zk-writer path))
 
+      (delete-node-blobstore
+        [this path nimbus-host-port-info]
+        (zk/delete-node-blobstore zk-writer path nimbus-host-port-info))
+
      (close
        [this]
        (reset! active false)

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/clj/backtype/storm/command/blobstore.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/blobstore.clj b/storm-core/src/clj/backtype/storm/command/blobstore.clj
new file mode 100644
index 0000000..ae7f919
--- /dev/null
+++ b/storm-core/src/clj/backtype/storm/command/blobstore.clj
@@ -0,0 +1,162 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns backtype.storm.command.blobstore
+  (:import [java.io InputStream OutputStream]
+           [backtype.storm.generated SettableBlobMeta AccessControl AuthorizationException
+            KeyNotFoundException]
+           [backtype.storm.blobstore BlobStoreAclHandler])
+  (:use [backtype.storm config]
+        [clojure.string :only [split]]
+        [clojure.tools.cli :only [cli]]
+        [clojure.java.io :only [copy input-stream output-stream]]
+        [backtype.storm blobstore log util])
+  (:gen-class))
+
+(defn update-blob-from-stream
+  "Update a blob in the blob store from an InputStream"
+  [key ^InputStream in]
+  (with-configured-blob-client blobstore
+    (let [out (.updateBlob blobstore key)]
+      (try 
+        (copy in out)
+        (.close out)
+        (catch Exception e
+          (log-message e)
+          (.cancel out)
+          (throw e))))))
+
+(defn create-blob-from-stream
+  "Create a blob in the blob store from an InputStream"
+  [key ^InputStream in ^SettableBlobMeta meta]
+  (with-configured-blob-client blobstore
+    (let [out (.createBlob blobstore key meta)]
+      (try 
+        (copy in out)
+        (.close out)
+        (catch Exception e
+          (.cancel out)
+          (throw e))))))
+
+(defn read-blob
+  "Read a blob in the blob store and write to an OutputStream"
+  [key ^OutputStream out]
+  (with-configured-blob-client blobstore
+    (with-open [in (.getBlob blobstore key)]
+      (copy in out))))
+
+(defn as-access-control
+  "Convert a parameter to an AccessControl object"
+  [param]
+  (BlobStoreAclHandler/parseAccessControl (str param)))
+
+(defn as-acl
+  [param]
+  (map as-access-control (split param #",")))
+
+(defn access-control-str
+  [^AccessControl acl]
+  (BlobStoreAclHandler/accessControlToString acl))
+
+(defn read-cli [args]
+  (let [[{file :file} [key] _] (cli args ["-f" "--file" :default nil])]
+    (if file
+      (with-open [f (output-stream file)]
+        (read-blob key f))
+      (read-blob key System/out))))
+
+(defn update-cli [args]
+  (let [[{file :file} [key] _] (cli args ["-f" "--file" :default nil])]
+    (if file
+      (with-open [f (input-stream file)]
+        (update-blob-from-stream key f))
+      (update-blob-from-stream key System/in))
+    (log-message "Successfully updated " key)))
+
+(defn create-cli [args]
+  (let [[{file :file acl :acl replication-factor :replication-factor} [key] _] (cli args ["-f" "--file" :default nil]
+                                                  ["-a" "--acl" :default [] :parse-fn as-acl]
+                                                  ["-r" "--replication-factor" :default -1 :parse-fn parse-int])
+        meta (doto (SettableBlobMeta. acl)
+                   (.set_replication_factor replication-factor))]
+    (validate-key-name! key)
+    (log-message "Creating " key " with ACL " (pr-str (map access-control-str acl)))
+    (if file
+      (with-open [f (input-stream file)]
+        (create-blob-from-stream key f meta))
+      (create-blob-from-stream key System/in meta))
+    (log-message "Successfully created " key)))
+
+(defn delete-cli [args]
+  (with-configured-blob-client blobstore
+    (doseq [key args]
+      (.deleteBlob blobstore key)
+      (log-message "deleted " key))))
+
+(defn list-cli [args]
+  (with-configured-blob-client blobstore
+    (let [keys (if (empty? args) (iterator-seq (.listKeys blobstore)) args)]
+      (doseq [key keys]
+        (try
+          (let [meta (.getBlobMeta blobstore key)
+                version (.get_version meta)
+                acl (.get_acl (.get_settable meta))]
+            (log-message key " " version " " (pr-str (map access-control-str acl))))
+          (catch AuthorizationException ae
+            (if-not (empty? args) (log-error "ACCESS DENIED to key: " key)))
+          (catch KeyNotFoundException knf
+            (if-not (empty? args) (log-error key " NOT FOUND"))))))))
+
+(defn set-acl-cli [args]
+  (let [[{set-acl :set} [key] _]
+           (cli args ["-s" "--set" :default [] :parse-fn as-acl])]
+    (with-configured-blob-client blobstore
+      (let [meta (.getBlobMeta blobstore key)
+            acl (.get_acl (.get_settable meta))
+            new-acl (if set-acl set-acl acl)
+            new-meta (SettableBlobMeta. new-acl)]
+        (log-message "Setting ACL for " key " to " (pr-str (map access-control-str new-acl)))
+        (.setBlobMeta blobstore key new-meta)))))
+
+(defn rep-cli [args]
+  (let [sub-command (first args)
+        new-args (rest args)]
+    (with-configured-blob-client blobstore
+      (condp = sub-command
+      "--read" (let [key (first new-args)
+                     blob-replication (.getBlobReplication blobstore key)]
+                 (log-message "Current replication factor " blob-replication)
+                 blob-replication)
+      "--update" (let [[{replication-factor :replication-factor} [key] _]
+                        (cli new-args ["-r" "--replication-factor" :parse-fn parse-int])]
+                   (if (nil? replication-factor)
+                     (throw (RuntimeException. (str "Please set the replication factor")))
+                     (let [blob-replication (.updateBlobReplication blobstore key replication-factor)]
+                       (log-message "Replication factor is set to " blob-replication)
+                       blob-replication)))
+      :else (throw (RuntimeException. (str sub-command " is not a supported blobstore command")))))))
+
+(defn -main [& args]
+  (let [command (first args)
+        new-args (rest args)]
+    (condp = command
+      "cat" (read-cli new-args)
+      "create" (create-cli new-args)
+      "update" (update-cli new-args)
+      "delete" (delete-cli new-args)
+      "list" (list-cli new-args)
+      "set-acl" (set-acl-cli new-args)
+      "replication" (rep-cli new-args)
+      :else (throw (RuntimeException. (str command " is not a supported blobstore command"))))))

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/clj/backtype/storm/config.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/config.clj b/storm-core/src/clj/backtype/storm/config.clj
index 94b66c3..1617a3b 100644
--- a/storm-core/src/clj/backtype/storm/config.clj
+++ b/storm-core/src/clj/backtype/storm/config.clj
@@ -107,6 +107,18 @@
     (FileUtils/forceMkdir (File. ret))
     ret))
 
+(defn master-stormjar-key
+  [topology-id]
+  (str topology-id "-stormjar.jar"))
+
+(defn master-stormcode-key
+  [topology-id]
+  (str topology-id "-stormcode.ser"))
+
+(defn master-stormconf-key
+  [topology-id]
+  (str topology-id "-stormconf.ser"))
+
 (defn master-stormdist-root
   ([conf]
    (str (master-local-dir conf) file-path-separator "stormdist"))
@@ -119,6 +131,10 @@
     (FileUtils/forceMkdir (File. ret))
     ret ))
 
+(defn read-supervisor-storm-conf-given-path
+  [conf stormconf-path]
+  (merge conf (clojurify-structure (Utils/fromCompressedJsonConf (FileUtils/readFileToByteArray (File. stormconf-path))))))
+
 (defn master-storm-metafile-path [stormroot ]
   (str stormroot file-path-separator "storm-code-distributor.meta"))
 
@@ -197,7 +213,7 @@
   (let [stormroot (supervisor-stormdist-root conf storm-id)
         conf-path (supervisor-stormconf-path stormroot)
         topology-path (supervisor-stormcode-path stormroot)]
-    (merge conf (clojurify-structure (Utils/fromCompressedJsonConf (FileUtils/readFileToByteArray (File. conf-path)))))))
+    (read-supervisor-storm-conf-given-path conf conf-path)))
 
 (defn read-supervisor-topology
   [conf storm-id]
@@ -221,7 +237,11 @@
     nil
     )))
 
-  
+(defn get-id-from-blob-key
+  [key]
+  (if-let [groups (re-find #"^(.*)((-stormjar\.jar)|(-stormcode\.ser)|(-stormconf\.ser))$" key)]
+    (nth groups 1)))
+
 (defn set-worker-user! [conf worker-id user]
   (log-message "SET worker-user " worker-id " " user)
   (let [file (worker-user-file conf worker-id)]

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index 71d4654..a53ff82 100644
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@ -15,14 +15,22 @@
 ;; limitations under the License.
 (ns backtype.storm.daemon.nimbus
   (:import [org.apache.thrift.server THsHaServer THsHaServer$Args])
+  (:import [backtype.storm.generated KeyNotFoundException])
+  (:import [backtype.storm.blobstore LocalFsBlobStore])
   (:import [org.apache.thrift.protocol TBinaryProtocol TBinaryProtocol$Factory])
   (:import [org.apache.thrift.exception])
   (:import [org.apache.thrift.transport TNonblockingServerTransport TNonblockingServerSocket])
   (:import [org.apache.commons.io FileUtils])
+  (:import [javax.security.auth Subject])
+  (:import [backtype.storm.security.auth NimbusPrincipal])
   (:import [java.nio ByteBuffer]
            [java.util Collections List HashMap]
            [backtype.storm.generated NimbusSummary])
-  (:import [java.io FileNotFoundException File FileOutputStream])
+  (:import [java.nio ByteBuffer]
+           [java.util Collections List HashMap ArrayList Iterator])
+  (:import [backtype.storm.blobstore AtomicOutputStream BlobStoreAclHandler
+            InputStreamWithMeta KeyFilter KeySequenceNumber BlobSynchronizer])
+  (:import [java.io File FileOutputStream FileInputStream])
   (:import [java.net InetAddress])
   (:import [java.nio.channels Channels WritableByteChannel])
   (:import [backtype.storm.security.auth ThriftServer ThriftConnectionType ReqContext AuthUtils])
@@ -31,12 +39,12 @@
             Cluster Topologies SchedulerAssignment SchedulerAssignmentImpl DefaultScheduler ExecutorDetails])
   (:import [backtype.storm.nimbus NimbusInfo])
   (:import [backtype.storm.utils TimeCacheMap TimeCacheMap$ExpiredCallback Utils ThriftTopologyUtils
-            BufferFileInputStream])
+            BufferFileInputStream BufferInputStream])
   (:import [backtype.storm.generated NotAliveException AlreadyAliveException StormTopology ErrorInfo
             ExecutorInfo InvalidTopologyException Nimbus$Iface Nimbus$Processor SubmitOptions TopologyInitialStatus
             KillOptions RebalanceOptions ClusterSummary SupervisorSummary TopologySummary TopologyInfo TopologyHistoryInfo
-            ExecutorSummary AuthorizationException GetInfoOptions NumErrorsChoice
-            ComponentPageInfo TopologyPageInfo LogConfig LogLevel LogLevelAction
+            ExecutorSummary AuthorizationException GetInfoOptions NumErrorsChoice SettableBlobMeta ReadableBlobMeta
+            BeginDownloadResult ListBlobsResult ComponentPageInfo TopologyPageInfo LogConfig LogLevel LogLevelAction
             ProfileRequest ProfileAction NodeInfo])
   (:import [backtype.storm.daemon Shutdownable])
   (:use [backtype.storm util config log timer zookeeper local-state])
@@ -47,6 +55,7 @@
   (:require [clojure.set :as set])
   (:import [backtype.storm.daemon.common StormBase Assignment])
   (:use [backtype.storm.daemon common])
+  (:use [backtype.storm config])
   (:import [org.apache.zookeeper data.ACL ZooDefs$Ids ZooDefs$Perms])
   (:import [backtype.storm.utils VersionInfo])
   (:require [clj-time.core :as time])
@@ -112,8 +121,7 @@
     scheduler
     ))
 
-(defmulti mk-code-distributor cluster-mode)
-(defmulti sync-code cluster-mode)
+(defmulti blob-sync cluster-mode)
 
 (defnk is-leader [nimbus :throw-exception true]
   (let [leader-elector (:leader-elector nimbus)]
@@ -126,6 +134,25 @@
   [(first ZooDefs$Ids/CREATOR_ALL_ACL) 
    (ACL. (bit-or ZooDefs$Perms/READ ZooDefs$Perms/CREATE) ZooDefs$Ids/ANYONE_ID_UNSAFE)])
 
+(defn mk-blob-cache-map
+  "Constructs a TimeCacheMap instance with a blob store timeout whose
+  expiration callback invokes cancel on the value held by an expired entry when
+  that value is an AtomicOutputStream and calls close otherwise."
+  [conf]
+  (TimeCacheMap.
+    (int (conf NIMBUS-BLOBSTORE-EXPIRATION-SECS))
+    (reify TimeCacheMap$ExpiredCallback
+      (expire [this id stream]
+        (if (instance? AtomicOutputStream stream)
+          (.cancel stream)
+          (.close stream))))))
+
+(defn mk-bloblist-cache-map
+  "Constructs a TimeCacheMap instance with a blobstore timeout and no callback
+  function."
+  [conf]
+  (TimeCacheMap. (int (conf NIMBUS-BLOBSTORE-EXPIRATION-SECS))))
+
 (defn create-tology-action-notifier [conf]
   (when-not (clojure.string/blank? (conf NIMBUS-TOPOLOGY-ACTION-NOTIFIER-PLUGIN))
     (let [instance (new-instance (conf NIMBUS-TOPOLOGY-ACTION-NOTIFIER-PLUGIN))]
@@ -153,6 +180,10 @@
      :heartbeats-cache (atom {})
      :downloaders (file-cache-map conf)
      :uploaders (file-cache-map conf)
+     :blob-store (Utils/getNimbusBlobStore conf (NimbusInfo/fromConf conf))
+     :blob-downloaders (mk-blob-cache-map conf)
+     :blob-uploaders (mk-blob-cache-map conf)
+     :blob-listers (mk-bloblist-cache-map conf)
      :uptime (uptime-computer)
      :validator (new-instance (conf NIMBUS-TOPOLOGY-VALIDATOR))
      :timer (mk-timer :kill-fn (fn [t]
@@ -161,7 +192,6 @@
                                  ))
      :scheduler (mk-scheduler conf inimbus)
      :leader-elector (zk-leader-elector conf)
-     :code-distributor (mk-code-distributor conf)
      :id->sched-status (atom {})
      :node-id->resources (atom {}) ;;resources of supervisors
      :id->resources (atom {}) ;;resources of topologies
@@ -175,22 +205,44 @@
 (defn inbox [nimbus]
   (master-inbox (:conf nimbus)))
 
-(defn- read-storm-conf [conf storm-id]
-  (let [stormroot (master-stormdist-root conf storm-id)]
-    (merge conf
-       (clojurify-structure
-         (Utils/fromCompressedJsonConf
-           (FileUtils/readFileToByteArray
-             (File. (master-stormconf-path stormroot))))))))
+(defn- get-subject []
+  (let [req (ReqContext/context)]
+    (.subject req)))
+
+(def user-subject
+  (get-subject))
+
+(defn- read-storm-conf [conf storm-id blob-store]
+  (clojurify-structure
+    (Utils/fromCompressedJsonConf
+      (.readBlob blob-store (master-stormconf-key storm-id) user-subject))))
 
 (declare delay-event)
 (declare mk-assignments)
 
+(defn get-nimbus-subject
+  []
+  (let [subject (Subject.)
+        principal (NimbusPrincipal.)
+        principals (.getPrincipals subject)]
+    (.add principals principal)
+    subject))
+
+(def nimbus-subject
+  (get-nimbus-subject))
+
+(defn- get-key-list-from-id
+  [conf id]
+  (log-debug "set keys id = " id "set = " #{(master-stormcode-key id) (master-stormjar-key id) (master-stormconf-key id)})
+  (if (local-mode? conf)
+    [(master-stormcode-key id) (master-stormconf-key id)]
+    [(master-stormcode-key id) (master-stormjar-key id) (master-stormconf-key id)]))
+
 (defn kill-transition [nimbus storm-id]
   (fn [kill-time]
     (let [delay (if kill-time
                   kill-time
-                  (get (read-storm-conf (:conf nimbus) storm-id)
+                  (get (read-storm-conf (:conf nimbus) storm-id (:blob-store nimbus))
                        TOPOLOGY-MESSAGE-TIMEOUT-SECS))]
       (delay-event nimbus
                    storm-id
@@ -205,7 +257,7 @@
   (fn [time num-workers executor-overrides]
     (let [delay (if time
                   time
-                  (get (read-storm-conf (:conf nimbus) storm-id)
+                  (get (read-storm-conf (:conf nimbus) storm-id (:blob-store nimbus))
                        TOPOLOGY-MESSAGE-TIMEOUT-SECS))]
       (delay-event nimbus
                    storm-id
@@ -250,6 +302,10 @@
                       (log-message "Killing topology: " storm-id)
                       (.remove-storm! (:storm-cluster-state nimbus)
                                       storm-id)
+                      (when (instance? LocalFsBlobStore (:blob-store nimbus))
+                        (doseq [blob-key (get-key-list-from-id (:conf nimbus) storm-id)]
+                          (.remove-blobstore-key! (:storm-cluster-state nimbus) blob-key)
+                          (.remove-key-version! (:storm-cluster-state nimbus) blob-key)))
                       nil)
             }
    :rebalancing {:startup (fn [] (delay-event nimbus
@@ -391,53 +447,99 @@
       [(.getNodeId slot) (.getPort slot)]
       )))
 
+(defn- get-version-for-key [key nimbus-host-port-info conf]
+  (let [version (KeySequenceNumber. key nimbus-host-port-info)]
+    (.getKeySequenceNumber version conf)))
+
+(defn get-key-seq-from-blob-store [blob-store]
+  (let [key-iter (.listKeys blob-store)]
+    (iterator-seq key-iter)))
+
 (defn- setup-storm-code [nimbus conf storm-id tmp-jar-location storm-conf topology]
-  (let [stormroot (master-stormdist-root conf storm-id)]
-   (log-message "nimbus file location:" stormroot)
-   (FileUtils/forceMkdir (File. stormroot))
-   (FileUtils/cleanDirectory (File. stormroot))
-   (setup-jar conf tmp-jar-location stormroot)
-   (FileUtils/writeByteArrayToFile (File. (master-stormcode-path stormroot)) (Utils/serialize topology))
-   (FileUtils/writeByteArrayToFile (File. (master-stormconf-path stormroot)) (Utils/toCompressedJsonConf storm-conf))
-   (if (:code-distributor nimbus) (.upload (:code-distributor nimbus) stormroot storm-id))
-   ))
+  (let [subject user-subject
+        storm-cluster-state (:storm-cluster-state nimbus)
+        blob-store (:blob-store nimbus)
+        jar-key (master-stormjar-key storm-id)
+        code-key (master-stormcode-key storm-id)
+        conf-key (master-stormconf-key storm-id)
+        nimbus-host-port-info (:nimbus-host-port-info nimbus)]
+    (when tmp-jar-location ;;in local mode there is no jar
+      (.createBlob blob-store jar-key (FileInputStream. tmp-jar-location) (SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject)
+      (if (instance? LocalFsBlobStore blob-store)
+        (.setup-blobstore! storm-cluster-state jar-key nimbus-host-port-info (get-version-for-key jar-key nimbus-host-port-info conf))))
+    (.createBlob blob-store conf-key (Utils/toCompressedJsonConf storm-conf) (SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject)
+    (if (instance? LocalFsBlobStore blob-store)
+      (.setup-blobstore! storm-cluster-state conf-key nimbus-host-port-info (get-version-for-key conf-key nimbus-host-port-info conf)))
+    (.createBlob blob-store code-key (Utils/serialize topology) (SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject)
+    (if (instance? LocalFsBlobStore blob-store)
+      (.setup-blobstore! storm-cluster-state code-key nimbus-host-port-info (get-version-for-key code-key nimbus-host-port-info conf)))))
+
+(defn- read-storm-topology [storm-id blob-store]
+  (Utils/deserialize
+    (.readBlob blob-store (master-stormcode-key storm-id) user-subject) StormTopology))
+
+(defn get-blob-replication-count
+  [blob-key nimbus]
+  (if (:blob-store nimbus)
+        (-> (:blob-store nimbus)
+          (.getBlobReplication  blob-key nimbus-subject))))
 
 (defn- wait-for-desired-code-replication [nimbus conf storm-id]
   (let [min-replication-count (conf TOPOLOGY-MIN-REPLICATION-COUNT)
         max-replication-wait-time (conf TOPOLOGY-MAX-REPLICATION-WAIT-TIME-SEC)
-        total-wait-time (atom 0)
-        current-replication-count (atom (if (:code-distributor nimbus) (.getReplicationCount (:code-distributor nimbus) storm-id) 0))]
-  (if (:code-distributor nimbus)
-    (while (and (> min-replication-count @current-replication-count)
-             (or (= -1 max-replication-wait-time)
-               (< @total-wait-time max-replication-wait-time)))
+        current-replication-count-jar (if (not (local-mode? conf))
+                                        (atom (get-blob-replication-count (master-stormjar-key storm-id) nimbus))
+                                        (atom min-replication-count))
+        current-replication-count-code (atom (get-blob-replication-count (master-stormcode-key storm-id) nimbus))
+        current-replication-count-conf (atom (get-blob-replication-count (master-stormconf-key storm-id) nimbus))
+        total-wait-time (atom 0)]
+    (if (:blob-store nimbus)
+      (while (and
+               (or (> min-replication-count @current-replication-count-jar)
+                   (> min-replication-count @current-replication-count-code)
+                   (> min-replication-count @current-replication-count-conf))
+               (or (neg? max-replication-wait-time)
+                   (< @total-wait-time max-replication-wait-time)))
         (sleep-secs 1)
         (log-debug "waiting for desired replication to be achieved.
           min-replication-count = " min-replication-count  " max-replication-wait-time = " max-replication-wait-time
-          "current-replication-count = " @current-replication-count " total-wait-time " @total-wait-time)
+          (if (not (local-mode? conf))"current-replication-count for jar key = " @current-replication-count-jar)
+          "current-replication-count for code key = " @current-replication-count-code
+          "current-replication-count for conf key = " @current-replication-count-conf
+          " total-wait-time " @total-wait-time)
         (swap! total-wait-time inc)
-        (reset! current-replication-count  (.getReplicationCount (:code-distributor nimbus) storm-id))))
-  (if (< min-replication-count @current-replication-count)
-    (log-message "desired replication count "  min-replication-count " achieved,
-      current-replication-count" @current-replication-count)
-    (log-message "desired replication count of "  min-replication-count " not achieved but we have hit the max wait time "
-      max-replication-wait-time " so moving on with replication count = " @current-replication-count)
-    )))
-
-(defn- read-storm-topology [conf storm-id]
-  (let [stormroot (master-stormdist-root conf storm-id)]
-    (Utils/deserialize
-      (FileUtils/readFileToByteArray
-        (File. (master-stormcode-path stormroot))
-        ) StormTopology)))
+        (if (not (local-mode? conf))
+          (reset! current-replication-count-conf  (get-blob-replication-count (master-stormconf-key storm-id) nimbus)))
+        (reset! current-replication-count-code  (get-blob-replication-count (master-stormcode-key storm-id) nimbus))
+        (reset! current-replication-count-jar  (get-blob-replication-count (master-stormjar-key storm-id) nimbus))))
+    (if (and (< min-replication-count @current-replication-count-conf)
+             (< min-replication-count @current-replication-count-code)
+             (< min-replication-count @current-replication-count-jar))
+      (log-message "desired replication count of "  min-replication-count " not achieved but we have hit the max wait time "
+        max-replication-wait-time " so moving on with replication count for conf key = " @current-replication-count-conf
+        " for code key = " @current-replication-count-code "for jar key = " @current-replication-count-jar)
+      (log-message "desired replication count "  min-replication-count " achieved, "
+        "current-replication-count for conf key = " @current-replication-count-conf ", "
+        "current-replication-count for code key = " @current-replication-count-code ", "
+        "current-replication-count for jar key = " @current-replication-count-jar))))
+
+(defn- read-storm-topology-as-nimbus [storm-id blob-store]
+  (Utils/deserialize
+    (.readBlob blob-store (master-stormcode-key storm-id) nimbus-subject) StormTopology))
 
 (declare compute-executor->component)
 
+(defn read-storm-conf-as-nimbus [storm-id blob-store]
+  (clojurify-structure
+    (Utils/fromCompressedJsonConf
+      (.readBlob blob-store (master-stormconf-key storm-id) nimbus-subject))))
+
 (defn read-topology-details [nimbus storm-id]
   (let [conf (:conf nimbus)
+        blob-store (:blob-store nimbus)
         storm-base (.storm-base (:storm-cluster-state nimbus) storm-id nil)
-        topology-conf (read-storm-conf conf storm-id)
-        topology (read-storm-topology conf storm-id)
+        topology-conf (read-storm-conf-as-nimbus storm-id blob-store)
+        topology (read-storm-topology-as-nimbus storm-id blob-store)
         executor->component (->> (compute-executor->component nimbus storm-id)
                                  (map-key (fn [[start-task end-task]]
                                             (ExecutorDetails. (int start-task) (int end-task)))))]
@@ -530,10 +632,11 @@
 
 (defn- compute-executors [nimbus storm-id]
   (let [conf (:conf nimbus)
+        blob-store (:blob-store nimbus)
         storm-base (.storm-base (:storm-cluster-state nimbus) storm-id nil)
         component->executors (:component->executors storm-base)
-        storm-conf (read-storm-conf conf storm-id)
-        topology (read-storm-topology conf storm-id)
+        storm-conf (read-storm-conf-as-nimbus storm-id blob-store)
+        topology (read-storm-topology-as-nimbus storm-id blob-store)
         task->component (storm-task-info topology storm-conf)]
     (->> (storm-task-info topology storm-conf)
          reverse-map
@@ -546,9 +649,10 @@
 
 (defn- compute-executor->component [nimbus storm-id]
   (let [conf (:conf nimbus)
+        blob-store (:blob-store nimbus)
         executors (compute-executors nimbus storm-id)
-        topology (read-storm-topology conf storm-id)
-        storm-conf (read-storm-conf conf storm-id)
+        topology (read-storm-topology-as-nimbus storm-id blob-store)
+        storm-conf (read-storm-conf-as-nimbus storm-id blob-store)
         task->component (storm-task-info topology storm-conf)
         executor->component (into {} (for [executor executors
                                            :let [start-task (first executor)
@@ -838,7 +942,7 @@
                                                                         )))
                                               worker->resources (get new-assigned-worker->resources topology-id)]]
                                    {topology-id (Assignment.
-                                                 (master-stormdist-root conf topology-id)
+                                                 (conf STORM-LOCAL-DIR)
                                                  (select-keys all-node->host all-nodes)
                                                  executor->node+port
                                                  start-times
@@ -875,8 +979,9 @@
   {:pre [(#{:active :inactive} topology-initial-status)]}
   (let [storm-cluster-state (:storm-cluster-state nimbus)
         conf (:conf nimbus)
-        storm-conf (read-storm-conf conf storm-id)
-        topology (system-topology! storm-conf (read-storm-topology conf storm-id))
+        blob-store (:blob-store nimbus)
+        storm-conf (read-storm-conf conf storm-id blob-store)
+        topology (system-topology! storm-conf (read-storm-topology storm-id blob-store))
         num-executors (->> (all-components topology) (map-val num-start-executors))]
     (log-message "Activating " storm-name ": " storm-id)
     (.activate-storm! storm-cluster-state
@@ -935,17 +1040,15 @@
   ([nimbus storm-name storm-conf operation]
      (check-authorization! nimbus storm-name storm-conf operation (ReqContext/context))))
 
-(defn code-ids [conf]
-  (-> conf
-      master-stormdist-root
-      read-dir-contents
-      set
-      ))
+(defn code-ids [blob-store]
+  (let [to-id (reify KeyFilter
+                (filter [this key] (get-id-from-blob-key key)))]
+    (set (.filterAndListKeys blob-store to-id))))
 
-(defn cleanup-storm-ids [conf storm-cluster-state]
+(defn cleanup-storm-ids [conf storm-cluster-state blob-store]
   (let [heartbeat-ids (set (.heartbeat-storms storm-cluster-state))
         error-ids (set (.error-topologies storm-cluster-state))
-        code-ids (code-ids conf)
+        code-ids (code-ids blob-store)
         assigned-ids (set (.active-storms storm-cluster-state))]
     (set/difference (set/union heartbeat-ids error-ids code-ids) assigned-ids)
     ))
@@ -1006,22 +1109,35 @@
             TOPOLOGY-EVENTLOGGER-EXECUTORS (total-conf TOPOLOGY-EVENTLOGGER-EXECUTORS)
             TOPOLOGY-MAX-TASK-PARALLELISM (total-conf TOPOLOGY-MAX-TASK-PARALLELISM)})))
 
+(defn blob-rm-key [blob-store key storm-cluster-state]
+  (try
+    (.deleteBlob blob-store key nimbus-subject)
+    (if (instance? LocalFsBlobStore blob-store)
+      (.remove-blobstore-key! storm-cluster-state key))
+    (catch Exception e
+      (log-message "Exception" e))))
+
+(defn blob-rm-topology-keys [id blob-store storm-cluster-state]
+  (blob-rm-key blob-store (master-stormjar-key id) storm-cluster-state)
+  (blob-rm-key blob-store (master-stormconf-key id) storm-cluster-state)
+  (blob-rm-key blob-store (master-stormcode-key id) storm-cluster-state))
+
 (defn do-cleanup [nimbus]
   (if (is-leader nimbus :throw-exception false)
     (let [storm-cluster-state (:storm-cluster-state nimbus)
           conf (:conf nimbus)
-          submit-lock (:submit-lock nimbus)]
+          submit-lock (:submit-lock nimbus)
+          blob-store (:blob-store nimbus)]
       (let [to-cleanup-ids (locking submit-lock
-                             (cleanup-storm-ids conf storm-cluster-state))]
+                             (cleanup-storm-ids conf storm-cluster-state blob-store))]
         (when-not (empty? to-cleanup-ids)
           (doseq [id to-cleanup-ids]
             (log-message "Cleaning up " id)
-            (if (:code-distributor nimbus) (.cleanup (:code-distributor nimbus) id))
             (.teardown-heartbeats! storm-cluster-state id)
             (.teardown-topology-errors! storm-cluster-state id)
             (rmr (master-stormdist-root conf id))
-            (swap! (:heartbeats-cache nimbus) dissoc id))
-          )))
+            (blob-rm-topology-keys id blob-store storm-cluster-state)
+            (swap! (:heartbeats-cache nimbus) dissoc id)))))
     (log-message "not a leader, skipping cleanup")))
 
 (defn- file-older-than? [now seconds file]
@@ -1036,8 +1152,7 @@
       (if (.delete f)
         (log-message "Cleaning inbox ... deleted: " (.getName f))
         ;; This should never happen
-        (log-error "Cleaning inbox ... error deleting: " (.getName f))
-        ))))
+        (log-error "Cleaning inbox ... error deleting: " (.getName f))))))
 
 (defn clean-topology-history
   "Deletes topologies from history older than minutes."
@@ -1051,25 +1166,34 @@
       (ls-topo-hist! topo-history-state new-history))))
 
 (defn cleanup-corrupt-topologies! [nimbus]
-  (if (is-leader nimbus :throw-exception false)
-    (let [storm-cluster-state (:storm-cluster-state nimbus)
-          code-ids (set (code-ids (:conf nimbus)))
-          active-topologies (set (.active-storms storm-cluster-state))
-          corrupt-topologies (set/difference active-topologies code-ids)]
-      (doseq [corrupt corrupt-topologies]
-        (log-message "Corrupt topology " corrupt " has state on zookeeper but doesn't have a local dir on Nimbus. Cleaning up...")
-        (.remove-storm! storm-cluster-state corrupt)
-        )))
-  (log-message "not a leader, skipping cleanup-corrupt-topologies"))
-
-;;setsup code distributor entries for all current topologies for which code is available locally.
-(defn setup-code-distributor [nimbus]
   (let [storm-cluster-state (:storm-cluster-state nimbus)
-        locally-available-storm-ids (set (code-ids (:conf nimbus)))
+        blob-store (:blob-store nimbus)
+        code-ids (set (code-ids blob-store))
         active-topologies (set (.active-storms storm-cluster-state))
-        locally-available-active-storm-ids (set/intersection locally-available-storm-ids active-topologies)]
-    (doseq [storm-id locally-available-active-storm-ids]
-      (.setup-code-distributor! storm-cluster-state storm-id (:nimbus-host-port-info nimbus)))))
+        corrupt-topologies (set/difference active-topologies code-ids)]
+    (doseq [corrupt corrupt-topologies]
+      (log-message "Corrupt topology " corrupt " has state on zookeeper but doesn't have a local dir on Nimbus. Cleaning up...")
+      (.remove-storm! storm-cluster-state corrupt)
+      (if (instance? LocalFsBlobStore blob-store)
+        (doseq [blob-key (get-key-list-from-id (:conf nimbus) corrupt)]
+          (.remove-blobstore-key! storm-cluster-state blob-key))))))
+
+(defn setup-blobstore [nimbus]
+  "Sets up blobstore state for all current keys."
+  (let [storm-cluster-state (:storm-cluster-state nimbus)
+        blob-store (:blob-store nimbus)
+        local-set-of-keys (set (get-key-seq-from-blob-store blob-store))
+        all-keys (set (.active-keys storm-cluster-state))
+        locally-available-active-keys (set/intersection local-set-of-keys all-keys)
+        keys-to-delete (set/difference local-set-of-keys all-keys)
+        conf (:conf nimbus)
+        nimbus-host-port-info (:nimbus-host-port-info nimbus)]
+    (log-debug "Deleting keys not on the zookeeper" keys-to-delete)
+    (doseq [key keys-to-delete]
+      (.deleteBlob blob-store key nimbus-subject))
+    (log-debug "Creating list of key entries for blobstore inside zookeeper" all-keys "local" locally-available-active-keys)
+    (doseq [key locally-available-active-keys]
+      (.setup-blobstore! storm-cluster-state key (:nimbus-host-port-info nimbus) (get-version-for-key key nimbus-host-port-info conf)))))
 
 (defn- get-errors [storm-cluster-state storm-id component-id]
   (->> (.errors storm-cluster-state storm-id component-id)
@@ -1102,26 +1226,26 @@
     (catch Exception e
       (throw (AuthorizationException. (str "Invalid file path: " file-path))))))
 
-(defn try-read-storm-conf [conf storm-id]
+(defn try-read-storm-conf
+  [conf storm-id blob-store]
   (try-cause
-    (read-storm-conf conf storm-id)
-    (catch FileNotFoundException e
-       (throw (NotAliveException. (str storm-id))))
-  )
-)
+    (read-storm-conf-as-nimbus storm-id blob-store)
+    (catch KeyNotFoundException e
+      (throw (NotAliveException. (str storm-id))))))
 
-(defn try-read-storm-conf-from-name [conf storm-name nimbus]
+(defn try-read-storm-conf-from-name
+  [conf storm-name nimbus]
   (let [storm-cluster-state (:storm-cluster-state nimbus)
+        blob-store (:blob-store nimbus)
         id (get-storm-id storm-cluster-state storm-name)]
-   (try-read-storm-conf conf id)))
+    (try-read-storm-conf conf id blob-store)))
 
-(defn try-read-storm-topology [conf storm-id]
+(defn try-read-storm-topology
+  [storm-id blob-store]
   (try-cause
-    (read-storm-topology conf storm-id)
-    (catch FileNotFoundException e
-       (throw (NotAliveException. (str storm-id))))
-  )
-)
+    (read-storm-topology-as-nimbus storm-id blob-store)
+    (catch KeyNotFoundException e
+      (throw (NotAliveException. (str storm-id))))))
 
 (defn add-topology-to-history-log
   [storm-id nimbus topology-conf]
@@ -1166,6 +1290,7 @@
 (defn renew-credentials [nimbus]
   (if (is-leader nimbus :throw-exception false)
     (let [storm-cluster-state (:storm-cluster-state nimbus)
+          blob-store (:blob-store nimbus)
           renewers (:cred-renewers nimbus)
           update-lock (:cred-update-lock nimbus)
           assigned-ids (set (.active-storms storm-cluster-state))]
@@ -1173,7 +1298,7 @@
         (doseq [id assigned-ids]
           (locking update-lock
             (let [orig-creds (.credentials storm-cluster-state id nil)
-                  topology-conf (try-read-storm-conf (:conf nimbus) id)]
+                  topology-conf (try-read-storm-conf (:conf nimbus) id blob-store)]
               (if orig-creds
                 (let [new-creds (HashMap. orig-creds)]
                   (doseq [renewer renewers]
@@ -1210,22 +1335,40 @@
      (.set_reset_log_level_timeout_epoch log-config (coerce/to-long timeout))
      (.unset_reset_log_level_timeout_epoch log-config))))
 
+(defmethod blob-sync :distributed [conf nimbus]
+  (if (not (is-leader nimbus :throw-exception false))
+    (let [storm-cluster-state (:storm-cluster-state nimbus)
+          nimbus-host-port-info (:nimbus-host-port-info nimbus)
+          blob-store-key-set (set (get-key-seq-from-blob-store (:blob-store nimbus)))
+          zk-key-set (set (.blobstore storm-cluster-state (fn [] (blob-sync conf nimbus))))]
+      (log-debug "blob-sync " "blob-store-keys " blob-store-key-set "zookeeper-keys " zk-key-set)
+      (let [sync-blobs (doto
+                          (BlobSynchronizer. (:blob-store nimbus) conf)
+                          (.setNimbusInfo nimbus-host-port-info)
+                          (.setBlobStoreKeySet blob-store-key-set)
+                          (.setZookeeperKeySet zk-key-set))]
+        (.syncBlobs sync-blobs)))))
+
+(defmethod blob-sync :local [conf nimbus]
+  nil)
+
 (defserverfn service-handler [conf inimbus]
   (.prepare inimbus conf (master-inimbus-dir conf))
   (log-message "Starting Nimbus with conf " conf)
   (let [nimbus (nimbus-data conf inimbus)
+        blob-store (:blob-store nimbus)
         principal-to-local (AuthUtils/GetPrincipalToLocalPlugin conf)
         admin-users (or (.get conf NIMBUS-ADMINS) [])
         get-common-topo-info
           (fn [^String storm-id operation]
             (let [storm-cluster-state (:storm-cluster-state nimbus)
-                  topology-conf (try-read-storm-conf conf storm-id)
+                  topology-conf (try-read-storm-conf conf storm-id blob-store)
                   storm-name (topology-conf TOPOLOGY-NAME)
                   _ (check-authorization! nimbus
                                           storm-name
                                           topology-conf
                                           operation)
-                  topology (try-read-storm-topology conf storm-id)
+                  topology (try-read-storm-topology storm-id blob-store)
                   task->component (storm-task-info topology topology-conf)
                   base (.storm-base storm-cluster-state storm-id nil)
                   launch-time-secs (if base (:launch-time-secs base)
@@ -1264,10 +1407,11 @@
 
     (.addToLeaderLockQueue (:leader-elector nimbus))
     (cleanup-corrupt-topologies! nimbus)
-    (setup-code-distributor nimbus)
+    (when (instance? LocalFsBlobStore blob-store)
+      ;register call back for blob-store
+      (.blobstore (:storm-cluster-state nimbus) (fn [] (blob-sync conf nimbus)))
+      (setup-blobstore nimbus))
 
-    ;register call back for code-distributor
-    (.code-distributor (:storm-cluster-state nimbus) (fn [] (sync-code conf nimbus)))
     (when (is-leader nimbus :throw-exception false)
       (doseq [storm-id (.active-storms (:storm-cluster-state nimbus))]
         (transition! nimbus storm-id :startup)))
@@ -1278,31 +1422,27 @@
                           (when-not (conf NIMBUS-DO-NOT-REASSIGN)
                             (locking (:submit-lock nimbus)
                               (mk-assignments nimbus)))
-                          (do-cleanup nimbus)
-                          ))
+                          (do-cleanup nimbus)))
     ;; Schedule Nimbus inbox cleaner
     (schedule-recurring (:timer nimbus)
                         0
                         (conf NIMBUS-CLEANUP-INBOX-FREQ-SECS)
                         (fn []
-                          (clean-inbox (inbox nimbus) (conf NIMBUS-INBOX-JAR-EXPIRATION-SECS))
-                          ))
+                          (clean-inbox (inbox nimbus) (conf NIMBUS-INBOX-JAR-EXPIRATION-SECS))))
+    ;; Schedule nimbus code sync thread to sync code from other nimbuses.
+    (if (instance? LocalFsBlobStore blob-store)
+      (schedule-recurring (:timer nimbus)
+                          0
+                          (conf NIMBUS-CODE-SYNC-FREQ-SECS)
+                          (fn []
+                            (blob-sync conf nimbus))))
     ;; Schedule topology history cleaner
     (when-let [interval (conf LOGVIEWER-CLEANUP-INTERVAL-SECS)]
       (schedule-recurring (:timer nimbus)
         0
         (conf LOGVIEWER-CLEANUP-INTERVAL-SECS)
         (fn []
-          (clean-topology-history (conf LOGVIEWER-CLEANUP-AGE-MINS) nimbus)
-          )))
-    ;;schedule nimbus code sync thread to sync code from other nimbuses.
-    (schedule-recurring (:timer nimbus)
-      0
-      (conf NIMBUS-CODE-SYNC-FREQ-SECS)
-      (fn []
-        (sync-code conf nimbus)
-        ))
-
+          (clean-topology-history (conf LOGVIEWER-CLEANUP-AGE-MINS) nimbus))))
     (schedule-recurring (:timer nimbus)
                         0
                         (conf NIMBUS-CREDENTIAL-RENEW-FREQ-SECS)
@@ -1349,10 +1489,11 @@
                 principal (.principal req)
                 submitter-principal (if principal (.toString principal))
                 submitter-user (.toLocal principal-to-local principal)
+                system-user (System/getProperty "user.name")
                 topo-acl (distinct (remove nil? (conj (.get storm-conf-submitted TOPOLOGY-USERS) submitter-principal, submitter-user)))
                 storm-conf (-> storm-conf-submitted
                                (assoc TOPOLOGY-SUBMITTER-PRINCIPAL (if submitter-principal submitter-principal ""))
-                               (assoc TOPOLOGY-SUBMITTER-USER (if submitter-user submitter-user "")) ;Don't let the user set who we launch as
+                               (assoc TOPOLOGY-SUBMITTER-USER (if submitter-user submitter-user system-user)) ;Don't let the user set who we launch as
                                (assoc TOPOLOGY-USERS topo-acl)
                                (assoc STORM-ZOOKEEPER-SUPERACL (.get conf STORM-ZOOKEEPER-SUPERACL)))
                 storm-conf (if (Utils/isZkAuthenticationConfiguredStormServer conf)
@@ -1380,8 +1521,8 @@
               (check-storm-active! nimbus storm-name false)
               ;;cred-update-lock is not needed here because creds are being added for the first time.
               (.set-credentials! storm-cluster-state storm-id credentials storm-conf)
-              (setup-storm-code nimbus conf storm-id uploadedJarLocation storm-conf topology)
-              (.setup-code-distributor! storm-cluster-state storm-id (:nimbus-host-port-info nimbus))
+              (log-message "uploadedJar " uploadedJarLocation)
+              (setup-storm-code nimbus conf storm-id uploadedJarLocation total-storm-conf topology)
               (wait-for-desired-code-replication nimbus total-storm-conf storm-id)
               (.setup-heartbeats! storm-cluster-state storm-id)
               (.setup-backpressure! storm-cluster-state storm-id)
@@ -1456,7 +1597,7 @@
         (mark! nimbus:num-debug-calls)
         (let [storm-cluster-state (:storm-cluster-state nimbus)
               storm-id (get-storm-id storm-cluster-state storm-name)
-              topology-conf (try-read-storm-conf conf storm-id)
+              topology-conf (try-read-storm-conf conf storm-id blob-store)
               ;; make sure samplingPct is within bounds.
               spct (Math/max (Math/min samplingPct 100.0) 0.0)
               ;; while disabling we retain the sampling pct.
@@ -1475,7 +1616,7 @@
       (^void setWorkerProfiler
         [this ^String id ^ProfileRequest profileRequest]
         (mark! nimbus:num-setWorkerProfiler-calls)
-        (let [topology-conf (try-read-storm-conf conf id)
+        (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
               storm-name (topology-conf TOPOLOGY-NAME)
               _ (check-authorization! nimbus storm-name topology-conf "setWorkerProfiler")
               storm-cluster-state (:storm-cluster-state nimbus)]
@@ -1506,7 +1647,7 @@
 
       (^void setLogConfig [this ^String id ^LogConfig log-config-msg]
         (mark! nimbus:num-setLogConfig-calls)
-        (let [topology-conf (try-read-storm-conf conf id)
+        (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
               storm-name (topology-conf TOPOLOGY-NAME)
               _ (check-authorization! nimbus storm-name topology-conf "setLogConfig")
               storm-cluster-state (:storm-cluster-state nimbus)
@@ -1534,7 +1675,7 @@
         (mark! nimbus:num-uploadNewCredentials-calls)
         (let [storm-cluster-state (:storm-cluster-state nimbus)
               storm-id (get-storm-id storm-cluster-state storm-name)
-              topology-conf (try-read-storm-conf conf storm-id)
+              topology-conf (try-read-storm-conf conf storm-id blob-store)
               creds (when credentials (.get_creds credentials))]
           (check-authorization! nimbus storm-name topology-conf "uploadNewCredentials")
           (locking (:cred-update-lock nimbus) (.set-credentials! storm-cluster-state storm-id creds topology-conf))))
@@ -1607,7 +1748,7 @@
 
       (^LogConfig getLogConfig [this ^String id]
         (mark! nimbus:num-getLogConfig-calls)
-        (let [topology-conf (try-read-storm-conf conf id)
+        (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
               storm-name (topology-conf TOPOLOGY-NAME)
               _ (check-authorization! nimbus storm-name topology-conf "getLogConfig")
              storm-cluster-state (:storm-cluster-state nimbus)
@@ -1616,24 +1757,24 @@
 
       (^String getTopologyConf [this ^String id]
         (mark! nimbus:num-getTopologyConf-calls)
-        (let [topology-conf (try-read-storm-conf conf id)
+        (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
               storm-name (topology-conf TOPOLOGY-NAME)]
               (check-authorization! nimbus storm-name topology-conf "getTopologyConf")
               (to-json topology-conf)))
 
       (^StormTopology getTopology [this ^String id]
         (mark! nimbus:num-getTopology-calls)
-        (let [topology-conf (try-read-storm-conf conf id)
+        (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
               storm-name (topology-conf TOPOLOGY-NAME)]
               (check-authorization! nimbus storm-name topology-conf "getTopology")
               (system-topology! topology-conf (try-read-storm-topology conf id))))
 
       (^StormTopology getUserTopology [this ^String id]
         (mark! nimbus:num-getUserTopology-calls)
-        (let [topology-conf (try-read-storm-conf conf id)
+        (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
               storm-name (topology-conf TOPOLOGY-NAME)]
               (check-authorization! nimbus storm-name topology-conf "getUserTopology")
-              (try-read-storm-topology topology-conf id)))
+              (try-read-storm-topology id blob-store)))
 
       (^ClusterSummary getClusterInfo [this]
         (mark! nimbus:num-getClusterInfo-calls)
@@ -1668,42 +1809,39 @@
                     (.set_isLeader nimbus-summary (and (= leader-host (.get_host nimbus-summary)) (= leader-port (.get_port nimbus-summary))))))
 
               topology-summaries (dofor [[id base] bases :when base]
-	                                  (let [assignment (.assignment-info storm-cluster-state id nil)
-                                                topo-summ (TopologySummary. id
-                                                            (:storm-name base)
-                                                            (->> (:executor->node+port assignment)
-                                                                 keys
-                                                                 (mapcat executor-id->tasks)
-                                                                 count) 
-                                                            (->> (:executor->node+port assignment)
-                                                                 keys
-                                                                 count)                                                            
-                                                            (->> (:executor->node+port assignment)
-                                                                 vals
-                                                                 set
-                                                                 count)
-                                                            (time-delta (:launch-time-secs base))
-                                                            (extract-status-str base))]
-                                               (when-let [owner (:owner base)] (.set_owner topo-summ owner))
-                                               (when-let [sched-status (.get @(:id->sched-status nimbus) id)] (.set_sched_status topo-summ sched-status))
-                                               (when-let [resources (.get @(:id->resources nimbus) id)]
-                                                 (.set_requested_memonheap topo-summ (get resources 0))
-                                                 (.set_requested_memoffheap topo-summ (get resources 1))
-                                                 (.set_requested_cpu topo-summ (get resources 2))
-                                                 (.set_assigned_memonheap topo-summ (get resources 3))
-                                                 (.set_assigned_memoffheap topo-summ (get resources 4))
-                                                 (.set_assigned_cpu topo-summ (get resources 5)))
-                                               (.set_replication_count topo-summ (if (:code-distributor nimbus)
-                                                                                   (.getReplicationCount (:code-distributor nimbus) id)
-                                                                                   1))
-                                               topo-summ
-                                          ))
+                                   (let [assignment (.assignment-info storm-cluster-state id nil)
+                                         topo-summ (TopologySummary. id
+                                                     (:storm-name base)
+                                                     (->> (:executor->node+port assignment)
+                                                       keys
+                                                       (mapcat executor-id->tasks)
+                                                       count)
+                                                     (->> (:executor->node+port assignment)
+                                                       keys
+                                                       count)
+                                                     (->> (:executor->node+port assignment)
+                                                       vals
+                                                       set
+                                                       count)
+                                                     (time-delta (:launch-time-secs base))
+                                                     (extract-status-str base))]
+                                     (when-let [owner (:owner base)] (.set_owner topo-summ owner))
+                                     (when-let [sched-status (.get @(:id->sched-status nimbus) id)] (.set_sched_status topo-summ sched-status))
+                                     (when-let [resources (.get @(:id->resources nimbus) id)]
+                                       (.set_requested_memonheap topo-summ (get resources 0))
+                                       (.set_requested_memoffheap topo-summ (get resources 1))
+                                       (.set_requested_cpu topo-summ (get resources 2))
+                                       (.set_assigned_memonheap topo-summ (get resources 3))
+                                       (.set_assigned_memoffheap topo-summ (get resources 4))
+                                       (.set_assigned_cpu topo-summ (get resources 5)))
+                                     (.set_replication_count topo-summ (get-blob-replication-count (master-stormcode-key id) nimbus))
+                                     topo-summ))
               ret (ClusterSummary. supervisor-summaries
                                    topology-summaries
                                    nimbuses)
               _ (.set_nimbus_uptime_secs ret nimbus-uptime)]
               ret))
-      
+
       (^TopologyInfo getTopologyInfoWithOpts [this ^String storm-id ^GetInfoOptions options]
         (mark! nimbus:num-getTopologyInfoWithOpts-calls)
         (let [{:keys [storm-name
@@ -1763,10 +1901,8 @@
               (.set_assigned_cpu topo-info (get resources 5)))
             (when-let [component->debug (:component->debug base)]
               (.set_component_debug topo-info (map-val converter/thriftify-debugoptions component->debug)))
-            (.set_replication_count topo-info (if (:code-distributor nimbus) (.getReplicationCount (:code-distributor nimbus) storm-id) 1))
-
-            topo-info
-          ))
+            (.set_replication_count topo-info (get-blob-replication-count (master-stormcode-key storm-id) nimbus))
+          topo-info))
 
       (^TopologyInfo getTopologyInfo [this ^String topology-id]
         (mark! nimbus:num-getTopologyInfo-calls)
@@ -1774,6 +1910,157 @@
                                   topology-id
                                   (doto (GetInfoOptions.) (.set_num_err_choice NumErrorsChoice/ALL))))
 
+      (^String beginCreateBlob [this
+                                ^String blob-key
+                                ^SettableBlobMeta blob-meta]
+        (let [session-id (uuid)]
+          (.put (:blob-uploaders nimbus)
+            session-id
+            (.createBlob (:blob-store nimbus) blob-key blob-meta user-subject))
+          (log-message "Created blob for " blob-key
+            " with session id " session-id)
+          (str session-id)))
+
+      (^String beginUpdateBlob [this ^String blob-key]
+        (let [^AtomicOutputStream os (.updateBlob (:blob-store nimbus)
+                                       blob-key user-subject)]
+          (let [session-id (uuid)]
+            (.put (:blob-uploaders nimbus) session-id os)
+            (log-message "Created upload session for " blob-key
+              " with id " session-id)
+            (str session-id))))
+
+      (^void createStateInZookeeper [this ^String blob-key]
+        (let [storm-cluster-state (:storm-cluster-state nimbus)
+              blob-store (:blob-store nimbus)
+              nimbus-host-port-info (:nimbus-host-port-info nimbus)
+              conf (:conf nimbus)]
+          (if (instance? LocalFsBlobStore blob-store)
+              (.setup-blobstore! storm-cluster-state blob-key nimbus-host-port-info (get-version-for-key blob-key nimbus-host-port-info conf)))
+          (log-debug "Created state in zookeeper" storm-cluster-state blob-store nimbus-host-port-info)))
+
+      (^void uploadBlobChunk [this ^String session ^ByteBuffer blob-chunk]
+        (let [uploaders (:blob-uploaders nimbus)]
+          (if-let [^AtomicOutputStream os (.get uploaders session)]
+            (let [chunk-array (.array blob-chunk)
+                  remaining (.remaining blob-chunk)
+                  array-offset (.arrayOffset blob-chunk)
+                  position (.position blob-chunk)]
+              (.write os chunk-array (+ array-offset position) remaining)
+              (.put uploaders session os))
+            (throw-runtime "Blob for session "
+              session
+              " does not exist (or timed out)"))))
+
+      (^void finishBlobUpload [this ^String session]
+        (if-let [^AtomicOutputStream os (.get (:blob-uploaders nimbus) session)]
+          (do
+            (.close os)
+            (log-message "Finished uploading blob for session "
+              session
+              ". Closing session.")
+            (.remove (:blob-uploaders nimbus) session))
+          (throw-runtime "Blob for session "
+            session
+            " does not exist (or timed out)")))
+
+      (^void cancelBlobUpload [this ^String session]
+        (if-let [^AtomicOutputStream os (.get (:blob-uploaders nimbus) session)]
+          (do
+            (.cancel os)
+            (log-message "Canceled uploading blob for session "
+              session
+              ". Closing session.")
+            (.remove (:blob-uploaders nimbus) session))
+          (throw-runtime "Blob for session "
+            session
+            " does not exist (or timed out)")))
+
+      (^ReadableBlobMeta getBlobMeta [this ^String blob-key]
+        (let [^ReadableBlobMeta ret (.getBlobMeta (:blob-store nimbus)
+                                      blob-key user-subject)]
+          ret))
+
+      (^void setBlobMeta [this ^String blob-key ^SettableBlobMeta blob-meta]
+        (->> (ReqContext/context)
+          (.subject)
+          (.setBlobMeta (:blob-store nimbus) blob-key blob-meta)))
+
+      (^BeginDownloadResult beginBlobDownload [this ^String blob-key]
+        (let [^InputStreamWithMeta is (.getBlob (:blob-store nimbus)
+                                        blob-key user-subject)]
+          (let [session-id (uuid)
+                ret (BeginDownloadResult. (.getVersion is) (str session-id))]
+            (.set_data_size ret (.getFileLength is))
+            (.put (:blob-downloaders nimbus) session-id (BufferInputStream. is (Utils/getInt (conf STORM-BLOBSTORE-INPUTSTREAM-BUFFER-SIZE-BYTES) (int 65536))))
+            (log-message "Created download session for " blob-key
+              " with id " session-id)
+            ret)))
+
+      (^ByteBuffer downloadBlobChunk [this ^String session]
+        (let [downloaders (:blob-downloaders nimbus)
+              ^BufferInputStream is (.get downloaders session)]
+          (when-not is
+            (throw (RuntimeException.
+                     "Could not find input stream for session " session)))
+          (let [ret (.read is)]
+            (.put downloaders session is)
+            (when (empty? ret)
+              (.close is)
+              (.remove downloaders session))
+            (log-debug "Sending " (alength ret) " bytes")
+            (ByteBuffer/wrap ret))))
+
+      (^void deleteBlob [this ^String blob-key]
+        (let [subject (->> (ReqContext/context)
+                           (.subject))]
+          (.deleteBlob (:blob-store nimbus) blob-key subject)
+          (when (instance? LocalFsBlobStore blob-store)
+            (.remove-blobstore-key! (:storm-cluster-state nimbus) blob-key)
+            (.remove-key-version! (:storm-cluster-state nimbus) blob-key))
+          (log-message "Deleted blob for key " blob-key)))
+
+      (^ListBlobsResult listBlobs [this ^String session]
+        (let [listers (:blob-listers nimbus)
+              ^Iterator keys-it (if (clojure.string/blank? session)
+                                  (.listKeys (:blob-store nimbus))
+                                  (.get listers session))
+              _ (or keys-it (throw-runtime "Blob list for session "
+                              session
+                              " does not exist (or timed out)"))
+
+              ;; Create a new session id if the user gave an empty session string.
+              ;; This is the use case when the user wishes to list blobs
+              ;; starting from the beginning.
+              session (if (clojure.string/blank? session)
+                        (let [new-session (uuid)]
+                          (log-message "Creating new session for downloading list " new-session)
+                          new-session)
+                        session)]
+          (if-not (.hasNext keys-it)
+            (do
+              (.remove listers session)
+              (log-message "No more blobs to list for session " session)
+              ;; A blank result communicates that there are no more blobs.
+              (ListBlobsResult. (ArrayList. 0) (str session)))
+            (let [^List list-chunk (->> keys-it
+                                     (iterator-seq)
+                                     (take 100) ;; Limit to next 100 keys
+                                     (ArrayList.))]
+              (log-message session " downloading " (.size list-chunk) " entries")
+              (.put listers session keys-it)
+              (ListBlobsResult. list-chunk (str session))))))
+
+      (^int getBlobReplication [this ^String blob-key]
+        (->> (ReqContext/context)
+          (.subject)
+          (.getBlobReplication (:blob-store nimbus) blob-key)))
+
+      (^int updateBlobReplication [this ^String blob-key ^int replication]
+        (->> (ReqContext/context)
+          (.subject)
+          (.updateBlobReplication (:blob-store nimbus) blob-key replication)))
+
       (^TopologyPageInfo getTopologyPageInfo
         [this ^String topo-id ^String window ^boolean include-sys?]
         (mark! nimbus:num-getTopologyPageInfo-calls)
@@ -1807,9 +2094,8 @@
             (.set_status (extract-status-str (:base info)))
             (.set_uptime_secs (time-delta (:launch-time-secs info)))
             (.set_topology_conf (to-json (try-read-storm-conf conf
-                                                              topo-id)))
-            (.set_replication_count
-              (.getReplicationCount (:code-distributor nimbus) topo-id)))
+                                                              topo-id (:blob-store nimbus))))
+            (.set_replication_count (get-blob-replication-count (master-stormcode-key topo-id) nimbus)))
           (when-let [debug-options
                      (get-in info [:base :component->debug topo-id])]
             (.set_debug_options
@@ -1889,60 +2175,14 @@
         (.disconnect (:storm-cluster-state nimbus))
         (.cleanup (:downloaders nimbus))
         (.cleanup (:uploaders nimbus))
+        (.shutdown (:blob-store nimbus))
         (.close (:leader-elector nimbus))
-        (if (:code-distributor nimbus) (.close (:code-distributor nimbus) (:conf nimbus)))
         (when (:nimbus-topology-action-notifier nimbus) (.cleanup (:nimbus-topology-action-notifier nimbus)))
-        (log-message "Shut down master")
-        )
+        (log-message "Shut down master"))
       DaemonCommon
       (waiting? [this]
         (timer-waiting? (:timer nimbus))))))
 
-(defmethod mk-code-distributor :distributed [conf]
-  (let [code-distributor (new-instance (conf STORM-CODE-DISTRIBUTOR-CLASS))]
-    (.prepare code-distributor conf)
-    code-distributor))
-
-(defmethod mk-code-distributor :local [conf]
-  nil)
-
-(defn download-code [conf nimbus storm-id host port]
-  (let [tmp-root (str (master-tmp-dir conf) file-path-separator (uuid))
-        storm-cluster-state (:storm-cluster-state nimbus)
-        storm-root (master-stormdist-root conf storm-id)
-        remote-meta-file-path (master-storm-metafile-path storm-root)
-        local-meta-file-path (master-storm-metafile-path tmp-root)]
-    (FileUtils/forceMkdir (File. tmp-root))
-    (Utils/downloadFromHost conf remote-meta-file-path local-meta-file-path host port)
-    (if (:code-distributor nimbus)
-      (.download (:code-distributor nimbus) storm-id (File. local-meta-file-path)))
-    (if (.exists (File. storm-root)) (FileUtils/forceDelete (File. storm-root)))
-    (FileUtils/moveDirectory (File. tmp-root) (File. storm-root))
-    (.setup-code-distributor! storm-cluster-state storm-id (:nimbus-host-port-info nimbus))))
-
-(defmethod sync-code :distributed [conf nimbus]
-  (let [storm-cluster-state (:storm-cluster-state nimbus)
-        active-topologies (set (.code-distributor storm-cluster-state (fn [] (sync-code conf nimbus))))
-        missing-topologies (set/difference active-topologies (set (code-ids (:conf nimbus))))]
-    (if (not (empty? missing-topologies))
-      (do
-        (.removeFromLeaderLockQueue (:leader-elector nimbus))
-        (doseq [missing missing-topologies]
-          (log-message "missing topology " missing " has state on zookeeper but doesn't have a local dir on this host.")
-          (let [nimbuses-with-missing (.code-distributor-info storm-cluster-state missing)]
-            (log-message "trying to download missing topology code from " (clojure.string/join "," nimbuses-with-missing))
-            (doseq [nimbus-host-port nimbuses-with-missing]
-              (when-not (contains? (code-ids (:conf nimbus)) missing)
-                (try
-                  (download-code conf nimbus missing (.getHost nimbus-host-port) (.getPort nimbus-host-port))
-                  (catch Exception e (log-error e "Exception while trying to syn-code for missing topology" missing)))))))))
-
-    (if (empty? (set/difference active-topologies (set (code-ids (:conf nimbus)))))
-      (.addToLeaderLockQueue (:leader-elector nimbus)))))
-
-(defmethod sync-code :local [conf nimbus]
-  nil)
-
 (defn launch-server! [conf nimbus]
   (validate-distributed-mode! conf)
   (let [service-handler (service-handler conf nimbus)