You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/12/04 16:04:06 UTC
[11/17] storm git commit: Blobstore API STORM-876
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0da88b0..f093ce5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -175,6 +175,7 @@
<java_jmx.version>0.3.1</java_jmx.version>
<compojure.version>1.1.3</compojure.version>
<hiccup.version>0.3.6</hiccup.version>
+ <commons-compress.version>1.4.1</commons-compress.version>
<commons-io.version>2.4</commons-io.version>
<commons-lang.version>2.5</commons-lang.version>
<commons-exec.version>1.1</commons-exec.version>
@@ -355,6 +356,11 @@
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ <version>${commons-compress.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
<artifactId>commons-exec</artifactId>
<version>${commons-exec.version}</version>
</dependency>
@@ -469,7 +475,12 @@
<artifactId>curator-client</artifactId>
<version>${curator.version}</version>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-test</artifactId>
+ <version>${curator.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
@@ -659,6 +670,7 @@
<version>${thrift.version}</version>
<scope>compile</scope>
</dependency>
+ <!-- used by examples/storm-starter -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
@@ -818,6 +830,8 @@
<exclude>**/.idea/**</exclude>
<!-- module specific testing artifacts -->
<exclude>**/metastore_db/**</exclude>
+ <!-- anything written into build should be ignored -->
+ <exclude>**/build/**</exclude>
<!-- exclude CHANGELOG, VERSION, AND TODO files -->
<exclude>**/CHANGELOG.md</exclude>
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index 083cdca..72c4a3a 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -36,6 +36,17 @@
</properties>
<dependencies>
+ <!--Hadoop Mini Cluster cannot use log4j2 bridge,
+ Surefire has a way to exclude the conflicting log4j API jar
+ from the classpath, classpathDependencyExcludes, but it didn't work in practice.
+ This is here as a work around to place it at the beginning of the classpath
+ even though maven does not officially support ordering of the classpath.-->
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>1.2.17</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
@@ -140,6 +151,10 @@
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
<artifactId>commons-exec</artifactId>
<scope>compile</scope>
</dependency>
@@ -178,7 +193,6 @@
</exclusion>
</exclusions>
</dependency>
-
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
@@ -193,7 +207,20 @@
</exclusion>
</exclusions>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-test</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
@@ -433,6 +460,7 @@
<include>org.yaml:snakeyaml</include>
<include>org.jgrapht:jgrapht-core</include>
<include>org.apache.commons:commons-exec</include>
+ <include>org.apache.commons:commons-compress</include>
<include>commons-io:commons-io</include>
<include>commons-codec:commons-codec</include>
<include>commons-fileupload:commons-fileupload</include>
@@ -574,6 +602,10 @@
<shadedPattern>org.apache.storm.shade.org.apache.commons.io</shadedPattern>
</relocation>
<relocation>
+ <pattern>org.apache.commons.compress</pattern>
+ <shadedPattern>org.apache.storm.shade.org.apache.commons.compress</shadedPattern>
+ </relocation>
+ <relocation>
<pattern>org.apache.commons.codec</pattern>
<shadedPattern>org.apache.storm.shade.org.apache.commons.codec</shadedPattern>
</relocation>
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/clj/backtype/storm/blobstore.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/blobstore.clj b/storm-core/src/clj/backtype/storm/blobstore.clj
new file mode 100644
index 0000000..936f4b5
--- /dev/null
+++ b/storm-core/src/clj/backtype/storm/blobstore.clj
@@ -0,0 +1,28 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns backtype.storm.blobstore
+ (:import [backtype.storm.utils Utils])
+ (:import [backtype.storm.blobstore ClientBlobStore])
+ (:use [backtype.storm config]))
+
+;; Evaluates `body` with `client-sym` bound to the client returned by
+;; Utils/getClientBlobStore for the configuration read via read-storm-config.
+;; The client's .shutdown is invoked in a `finally`, so it runs whether `body`
+;; returns normally or throws.
+;; NOTE(review): the ^ClientBlobStore hint is attached to the (unquote ...) form
+;; and is presumably dropped during syntax-quote expansion -- confirm.
+(defmacro with-configured-blob-client
+ [client-sym & body]
+ `(let [conf# (read-storm-config)
+ ^ClientBlobStore ~client-sym (Utils/getClientBlobStore conf#)]
+ (try
+ ~@body
+ (finally (.shutdown ~client-sym)))))
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/clj/backtype/storm/cluster.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/cluster.clj b/storm-core/src/clj/backtype/storm/cluster.clj
index 35aa8c8..ebe4955 100644
--- a/storm-core/src/clj/backtype/storm/cluster.clj
+++ b/storm-core/src/clj/backtype/storm/cluster.clj
@@ -45,18 +45,14 @@
(log-debug "Creating cluster state: " (.toString clazz))
(or (.mkState state-instance conf auth-conf acls context)
nil)))
-
(defprotocol StormClusterState
(assignments [this callback])
(assignment-info [this storm-id callback])
(assignment-info-with-version [this storm-id callback])
(assignment-version [this storm-id callback])
- ;returns topologyIds under /stormroot/code-distributor
- (code-distributor [this callback])
- ;returns lits of nimbusinfos under /stormroot/code-distributor/storm-id
- (code-distributor-info [this storm-id])
-
+ ;returns key information under /storm/blobstore/key
+ (blobstore-info [this blob-key])
;returns list of nimbus summaries stored under /stormroot/nimbuses/<nimbus-ids> -> <data>
(nimbuses [this])
;adds the NimbusSummary to /stormroot/nimbuses/nimbus-id
@@ -90,9 +86,14 @@
(update-storm! [this storm-id new-elems])
(remove-storm-base! [this storm-id])
(set-assignment! [this storm-id info])
- ;adds nimbusinfo under /stormroot/code-distributor/storm-id
- (setup-code-distributor! [this storm-id info])
+ ;; sets up information related to key consisting of nimbus
+ ;; host:port and version info of the blob
+ (setup-blobstore! [this key nimbusInfo versionInfo])
+ (active-keys [this])
+ (blobstore [this callback])
(remove-storm! [this storm-id])
+ (remove-blobstore-key! [this blob-key])
+ (remove-key-version! [this blob-key])
(report-error [this storm-id component-id node port error])
(errors [this storm-id component-id])
(last-error [this storm-id component-id])
@@ -107,7 +108,9 @@
(def WORKERBEATS-ROOT "workerbeats")
(def BACKPRESSURE-ROOT "backpressure")
(def ERRORS-ROOT "errors")
-(def CODE-DISTRIBUTOR-ROOT "code-distributor")
+(def BLOBSTORE-ROOT "blobstore")
+; Stores the latest update sequence for a blob
+(def BLOBSTORE-MAX-KEY-SEQUENCE-NUMBER-ROOT "blobstoremaxkeysequencenumber")
(def NIMBUSES-ROOT "nimbuses")
(def CREDENTIALS-ROOT "credentials")
(def LOGCONFIG-ROOT "logconfigs")
@@ -119,7 +122,9 @@
(def WORKERBEATS-SUBTREE (str "/" WORKERBEATS-ROOT))
(def BACKPRESSURE-SUBTREE (str "/" BACKPRESSURE-ROOT))
(def ERRORS-SUBTREE (str "/" ERRORS-ROOT))
-(def CODE-DISTRIBUTOR-SUBTREE (str "/" CODE-DISTRIBUTOR-ROOT))
+;; Blobstore subtree /storm/blobstore
+(def BLOBSTORE-SUBTREE (str "/" BLOBSTORE-ROOT))
+(def BLOBSTORE-MAX-KEY-SEQUENCE-NUMBER-SUBTREE (str "/" BLOBSTORE-MAX-KEY-SEQUENCE-NUMBER-ROOT))
(def NIMBUSES-SUBTREE (str "/" NIMBUSES-ROOT))
(def CREDENTIALS-SUBTREE (str "/" CREDENTIALS-ROOT))
(def LOGCONFIG-SUBTREE (str "/" LOGCONFIG-ROOT))
@@ -133,9 +138,13 @@
[id]
(str ASSIGNMENTS-SUBTREE "/" id))
-(defn code-distributor-path
- [id]
- (str CODE-DISTRIBUTOR-SUBTREE "/" id))
+;; Returns the path for `key` directly under BLOBSTORE-SUBTREE.
+(defn blobstore-path
+ [key]
+ (str BLOBSTORE-SUBTREE "/" key))
+
+;; Returns the path for `key` directly under
+;; BLOBSTORE-MAX-KEY-SEQUENCE-NUMBER-SUBTREE.
+(defn blobstore-max-key-sequence-number-path
+ [key]
+ (str BLOBSTORE-MAX-KEY-SEQUENCE-NUMBER-SUBTREE "/" key))
(defn nimbus-path
[id]
@@ -244,7 +253,7 @@
backpressure-callback (atom {}) ;; we want to reigister a topo directory getChildren callback for all workers of this dir
assignments-callback (atom nil)
storm-base-callback (atom {})
- code-distributor-callback (atom nil)
+ blobstore-callback (atom nil)
credentials-callback (atom {})
log-config-callback (atom {})
state-id (.register
@@ -259,14 +268,14 @@
(issue-map-callback! assignment-version-callback (first args))
(issue-map-callback! assignment-info-with-version-callback (first args))))
SUPERVISORS-ROOT (issue-callback! supervisors-callback)
- CODE-DISTRIBUTOR-ROOT (issue-callback! code-distributor-callback)
+ BLOBSTORE-ROOT (issue-callback! blobstore-callback) ;; callback register for blobstore
STORMS-ROOT (issue-map-callback! storm-base-callback (first args))
CREDENTIALS-ROOT (issue-map-callback! credentials-callback (first args))
LOGCONFIG-ROOT (issue-map-callback! log-config-callback (first args))
BACKPRESSURE-ROOT (issue-map-callback! backpressure-callback (first args))
;; this should never happen
(exit-process! 30 "Unknown callback for subtree " subtree args)))))]
- (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE CODE-DISTRIBUTOR-SUBTREE NIMBUSES-SUBTREE
+ (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE BLOBSTORE-SUBTREE NIMBUSES-SUBTREE
LOGCONFIG-SUBTREE]]
(.mkdirs cluster-state p acls))
(reify
@@ -299,13 +308,13 @@
(swap! assignment-version-callback assoc storm-id callback))
(.get_version cluster-state (assignment-path storm-id) (not-nil? callback)))
- (code-distributor
+ ;; blobstore state
+ (blobstore
[this callback]
(when callback
- (reset! code-distributor-callback callback))
- (do
- (.sync_path cluster-state CODE-DISTRIBUTOR-SUBTREE)
- (.get_children cluster-state CODE-DISTRIBUTOR-SUBTREE (not-nil? callback))))
+ (reset! blobstore-callback callback))
+ (.sync_path cluster-state BLOBSTORE-SUBTREE)
+ (.get_children cluster-state BLOBSTORE-SUBTREE (not-nil? callback)))
(nimbuses
[this]
@@ -327,18 +336,29 @@
(.set_ephemeral_node cluster-state (nimbus-path nimbus-id) (Utils/serialize nimbus-summary) acls))
- (code-distributor-info
- [this storm-id]
- (map (fn [nimbus-info] (NimbusInfo/parse nimbus-info))
- (let [path (code-distributor-path storm-id)]
- (do
- (.sync_path cluster-state path)
- (.get_children cluster-state path false)))))
+ (setup-blobstore!
+   [this key nimbusInfo versionInfo]
+   ;; Creates the ephemeral node <blobstore-path key>/<host:port>-<versionInfo>
+   ;; for the given nimbus, after making sure the key's parent node exists.
+   (let [key-path (blobstore-path key)
+         host-port (.toHostPortString nimbusInfo)
+         path (str key-path "/" host-port "-" versionInfo)]
+     ;; log-message concatenates its arguments, so the label needs its own
+     ;; trailing space to keep it separate from the path.
+     (log-message "setup-path " path)
+     (.mkdirs cluster-state key-path acls)
+     ;we delete the node first to ensure the node gets created as part of this session only.
+     (.delete_node_blobstore cluster-state key-path host-port)
+     (.set_ephemeral_node cluster-state path nil acls)))
+
+ (blobstore-info
+   [this blob-key]
+   ;; Syncs the key's node first, then returns its children; `false` is
+   ;; passed as the last argument to get_children.
+   (let [key-path (blobstore-path blob-key)]
+     (-> (doto cluster-state (.sync_path key-path))
+         (.get_children key-path false))))
(active-storms
[this]
(.get_children cluster-state STORMS-SUBTREE false))
+ ;; Returns the children of BLOBSTORE-SUBTREE as reported by get_children.
+ (active-keys
+ [this]
+ (.get_children cluster-state BLOBSTORE-SUBTREE false))
+
(heartbeat-storms
[this]
(.get_worker_hb_children cluster-state WORKERBEATS-SUBTREE false))
@@ -526,18 +546,18 @@
(let [thrift-assignment (thriftify-assignment info)]
(.set_data cluster-state (assignment-path storm-id) (Utils/serialize thrift-assignment) acls)))
- (setup-code-distributor!
- [this storm-id nimbusInfo]
- (let [path (str (code-distributor-path storm-id) "/" (.toHostPortString nimbusInfo))]
- (.mkdirs cluster-state (code-distributor-path storm-id) acls)
- ;we delete the node first to ensure the node gets created as part of this session only.
- (.delete_node cluster-state path)
- (.set_ephemeral_node cluster-state path nil acls)))
+ ;; Deletes the node at (blobstore-path blob-key), logging the key at debug level.
+ (remove-blobstore-key!
+ [this blob-key]
+ (log-debug "removing key" blob-key)
+ (.delete_node cluster-state (blobstore-path blob-key)))
+
+ ;; Deletes the node at (blobstore-max-key-sequence-number-path blob-key).
+ (remove-key-version!
+ [this blob-key]
+ (.delete_node cluster-state (blobstore-max-key-sequence-number-path blob-key)))
(remove-storm!
[this storm-id]
(.delete_node cluster-state (assignment-path storm-id))
- (.delete_node cluster-state (code-distributor-path storm-id))
(.delete_node cluster-state (credentials-path storm-id))
(.delete_node cluster-state (log-config-path storm-id))
(.delete_node cluster-state (profiler-config-path storm-id))
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/clj/backtype/storm/cluster_state/zookeeper_state_factory.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/cluster_state/zookeeper_state_factory.clj b/storm-core/src/clj/backtype/storm/cluster_state/zookeeper_state_factory.clj
index ff942db..fa36240 100644
--- a/storm-core/src/clj/backtype/storm/cluster_state/zookeeper_state_factory.clj
+++ b/storm-core/src/clj/backtype/storm/cluster_state/zookeeper_state_factory.clj
@@ -149,6 +149,10 @@
[this path]
(zk/sync-path zk-writer path))
+ ;; Delegates to zk/delete-node-blobstore using the zk-writer client.
+ (delete-node-blobstore
+ [this path nimbus-host-port-info]
+ (zk/delete-node-blobstore zk-writer path nimbus-host-port-info))
+
(close
[this]
(reset! active false)
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/clj/backtype/storm/command/blobstore.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/blobstore.clj b/storm-core/src/clj/backtype/storm/command/blobstore.clj
new file mode 100644
index 0000000..ae7f919
--- /dev/null
+++ b/storm-core/src/clj/backtype/storm/command/blobstore.clj
@@ -0,0 +1,162 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns backtype.storm.command.blobstore
+ (:import [java.io InputStream OutputStream]
+ [backtype.storm.generated SettableBlobMeta AccessControl AuthorizationException
+ KeyNotFoundException]
+ [backtype.storm.blobstore BlobStoreAclHandler])
+ (:use [backtype.storm config]
+ [clojure.string :only [split]]
+ [clojure.tools.cli :only [cli]]
+ [clojure.java.io :only [copy input-stream output-stream]]
+ [backtype.storm blobstore log util])
+ (:gen-class))
+
+(defn update-blob-from-stream
+  "Copy the contents of `in` over the blob stored under `key`.
+  If copying or closing fails, the exception is logged, the pending write is
+  cancelled, and the exception is rethrown."
+  [key ^InputStream in]
+  (with-configured-blob-client blobstore
+    (let [blob-out (.updateBlob blobstore key)]
+      (try
+        (copy in blob-out)
+        (.close blob-out)
+        (catch Exception ex
+          (log-message ex)
+          (.cancel blob-out)
+          (throw ex))))))
+
+(defn create-blob-from-stream
+  "Create a blob under `key` with metadata `meta`, filling it from `in`.
+  If copying or closing fails, the pending write is cancelled and the
+  exception is rethrown."
+  [key ^InputStream in ^SettableBlobMeta meta]
+  (with-configured-blob-client blobstore
+    (let [blob-out (.createBlob blobstore key meta)]
+      (try
+        (copy in blob-out)
+        (.close blob-out)
+        (catch Exception ex
+          (.cancel blob-out)
+          (throw ex))))))
+
+(defn read-blob
+  "Copy the blob stored under `key` into `out`.
+  The blob's input stream is closed afterwards; `out` is not closed here."
+  [key ^OutputStream out]
+  (with-configured-blob-client blobstore
+    (with-open [blob-in (.getBlob blobstore key)]
+      (copy blob-in out))))
+
+(defn as-access-control
+ "Convert a parameter to an AccessControl object"
+ [param]
+ ;; `param` is coerced with `str` before being handed to the parser.
+ (BlobStoreAclHandler/parseAccessControl (str param)))
+
+(defn as-acl
+  "Split `param` on commas and convert each piece with as-access-control.
+  Returns a lazy sequence of the converted entries."
+  [param]
+  (->> (split param #",")
+       (map as-access-control)))
+
+;; Renders an AccessControl via BlobStoreAclHandler/accessControlToString.
+(defn access-control-str
+ [^AccessControl acl]
+ (BlobStoreAclHandler/accessControlToString acl))
+
+(defn read-cli
+  "Handle the `cat` command: write the blob named by the first positional
+  argument to the file given with -f/--file, or to System/out when no file
+  is given."
+  [args]
+  (let [[opts positional _] (cli args ["-f" "--file" :default nil])
+        file (:file opts)
+        key (first positional)]
+    (if file
+      (with-open [f (output-stream file)]
+        (read-blob key f))
+      (read-blob key System/out))))
+
+(defn update-cli
+  "Handle the `update` command: replace the blob named by the first
+  positional argument with the contents of the -f/--file file, or of
+  System/in when no file is given, then log success."
+  [args]
+  (let [[opts positional _] (cli args ["-f" "--file" :default nil])
+        file (:file opts)
+        key (first positional)]
+    (if file
+      (with-open [f (input-stream file)]
+        (update-blob-from-stream key f))
+      (update-blob-from-stream key System/in))
+    (log-message "Successfully updated " key)))
+
+(defn create-cli
+  "Handle the `create` command: validate the key name, then create the blob
+  from the -f/--file file (or System/in) using the ACL from -a/--acl and the
+  replication factor from -r/--replication-factor (default -1)."
+  [args]
+  (let [[{:keys [file acl replication-factor]} [key] _]
+        (cli args
+             ["-f" "--file" :default nil]
+             ["-a" "--acl" :default [] :parse-fn as-acl]
+             ["-r" "--replication-factor" :default -1 :parse-fn parse-int])
+        blob-meta (SettableBlobMeta. acl)]
+    (.set_replication_factor blob-meta replication-factor)
+    (validate-key-name! key)
+    (log-message "Creating " key " with ACL " (pr-str (map access-control-str acl)))
+    (if file
+      (with-open [f (input-stream file)]
+        (create-blob-from-stream key f blob-meta))
+      (create-blob-from-stream key System/in blob-meta))
+    (log-message "Successfully created " key)))
+
+(defn delete-cli
+  "Handle the `delete` command: delete each blob whose key appears in `args`,
+  logging every deletion."
+  [args]
+  (with-configured-blob-client blobstore
+    (doseq [blob-key args]
+      (.deleteBlob blobstore blob-key)
+      (log-message "deleted " blob-key))))
+
+(defn list-cli
+  "Handle the `list` command: log key, version and ACL for each key in
+  `args`, or for every key returned by .listKeys when `args` is empty.
+  Authorization and key-not-found failures are reported only for keys that
+  were named explicitly; when listing everything they are silently skipped."
+  [args]
+  (with-configured-blob-client blobstore
+    (let [explicit? (boolean (seq args))
+          blob-keys (if explicit? args (iterator-seq (.listKeys blobstore)))]
+      (doseq [blob-key blob-keys]
+        (try
+          (let [blob-meta (.getBlobMeta blobstore blob-key)
+                version (.get_version blob-meta)
+                acl (.get_acl (.get_settable blob-meta))]
+            (log-message blob-key " " version " " (pr-str (map access-control-str acl))))
+          (catch AuthorizationException ae
+            (when explicit? (log-error "ACCESS DENIED to key: " blob-key)))
+          (catch KeyNotFoundException knf
+            (when explicit? (log-error blob-key " NOT FOUND"))))))))
+
+(defn set-acl-cli
+  "Handle the `set-acl` command: write the ACL given with -s/--set onto the
+  blob named by the first positional argument. When no ACL is supplied the
+  blob's current ACL is written back unchanged."
+  [args]
+  (let [[{set-acl :set} [key] _]
+        (cli args ["-s" "--set" :default [] :parse-fn as-acl])]
+    (with-configured-blob-client blobstore
+      (let [meta (.getBlobMeta blobstore key)
+            acl (.get_acl (.get_settable meta))
+            ;; The option defaults to [], and an empty collection is truthy
+            ;; in Clojure, so test with `seq`; otherwise omitting -s would
+            ;; replace the existing ACL with an empty one.
+            new-acl (if (seq set-acl) set-acl acl)
+            new-meta (SettableBlobMeta. new-acl)]
+        (log-message "Setting ACL for " key " to " (pr-str (map access-control-str new-acl)))
+        (.setBlobMeta blobstore key new-meta)))))
+
+(defn rep-cli
+  "Handle the `replication` command.
+  `--read <key>` logs and returns the blob's current replication factor.
+  `--update -r <n> <key>` sets the replication factor and returns the value
+  reported by the blob store; it throws RuntimeException when -r is missing.
+  Any other sub-command throws RuntimeException naming the sub-command."
+  [args]
+  (let [sub-command (first args)
+        new-args (rest args)]
+    (with-configured-blob-client blobstore
+      (condp = sub-command
+        "--read" (let [key (first new-args)
+                       blob-replication (.getBlobReplication blobstore key)]
+                   (log-message "Current replication factor " blob-replication)
+                   blob-replication)
+        "--update" (let [[{replication-factor :replication-factor} [key] _]
+                         (cli new-args ["-r" "--replication-factor" :parse-fn parse-int])]
+                     (if (nil? replication-factor)
+                       (throw (RuntimeException. (str "Please set the replication factor")))
+                       (let [blob-replication (.updateBlobReplication blobstore key replication-factor)]
+                         (log-message "Replication factor is set to " blob-replication)
+                         blob-replication)))
+        ;; condp has no :else keyword: a trailing unpaired expression is the
+        ;; default. Written as `:else expr` it would be an ordinary clause
+        ;; that only matches when sub-command equals the keyword :else.
+        (throw (RuntimeException. (str sub-command " is not a supported blobstore command")))))))
+
+(defn -main
+  "Entry point of the blobstore command: dispatches on the first argument
+  (cat, create, update, delete, list, set-acl, replication) and passes the
+  remaining arguments to the matching handler. Throws RuntimeException naming
+  the command when it is not one of the supported ones."
+  [& args]
+  (let [command (first args)
+        new-args (rest args)]
+    (condp = command
+      "cat" (read-cli new-args)
+      "create" (create-cli new-args)
+      "update" (update-cli new-args)
+      "delete" (delete-cli new-args)
+      "list" (list-cli new-args)
+      "set-acl" (set-acl-cli new-args)
+      "replication" (rep-cli new-args)
+      ;; condp takes its default as a trailing unpaired expression; an
+      ;; `:else expr` pair would never match a string command.
+      (throw (RuntimeException. (str command " is not a supported blobstore command"))))))
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/clj/backtype/storm/config.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/config.clj b/storm-core/src/clj/backtype/storm/config.clj
index 94b66c3..1617a3b 100644
--- a/storm-core/src/clj/backtype/storm/config.clj
+++ b/storm-core/src/clj/backtype/storm/config.clj
@@ -107,6 +107,18 @@
(FileUtils/forceMkdir (File. ret))
ret))
+(defn master-stormjar-key
+  "Returns the blob key for a topology's jar: `topology-id` followed by
+  \"-stormjar.jar\"."
+  [topology-id]
+  (str topology-id "-stormjar.jar"))
+
+(defn master-stormcode-key
+  "Returns the blob key for a topology's serialized code: `topology-id`
+  followed by \"-stormcode.ser\"."
+  [topology-id]
+  (str topology-id "-stormcode.ser"))
+
+(defn master-stormconf-key
+  "Returns the blob key for a topology's serialized configuration:
+  `topology-id` followed by \"-stormconf.ser\"."
+  [topology-id]
+  (str topology-id "-stormconf.ser"))
+
(defn master-stormdist-root
([conf]
(str (master-local-dir conf) file-path-separator "stormdist"))
@@ -119,6 +131,10 @@
(FileUtils/forceMkdir (File. ret))
ret ))
+(defn read-supervisor-storm-conf-given-path
+  "Reads the compressed JSON configuration stored in the file at
+  `stormconf-path` and merges it on top of `conf`, so values from the file
+  win on conflicting keys."
+  [conf stormconf-path]
+  (let [raw-bytes (FileUtils/readFileToByteArray (File. stormconf-path))
+        topology-conf (clojurify-structure (Utils/fromCompressedJsonConf raw-bytes))]
+    (merge conf topology-conf)))
+
(defn master-storm-metafile-path [stormroot ]
(str stormroot file-path-separator "storm-code-distributor.meta"))
@@ -197,7 +213,7 @@
(let [stormroot (supervisor-stormdist-root conf storm-id)
conf-path (supervisor-stormconf-path stormroot)
topology-path (supervisor-stormcode-path stormroot)]
- (merge conf (clojurify-structure (Utils/fromCompressedJsonConf (FileUtils/readFileToByteArray (File. conf-path)))))))
+ (read-supervisor-storm-conf-given-path conf conf-path)))
(defn read-supervisor-topology
[conf storm-id]
@@ -221,7 +237,11 @@
nil
)))
-
+(defn get-id-from-blob-key
+  "Returns the part of `key` that precedes a trailing \"-stormjar.jar\",
+  \"-stormcode.ser\" or \"-stormconf.ser\" suffix, or nil when `key` has none
+  of those suffixes."
+  [key]
+  (when-let [[_ id] (re-find #"^(.*)((-stormjar\.jar)|(-stormcode\.ser)|(-stormconf\.ser))$" key)]
+    id))
+
(defn set-worker-user! [conf worker-id user]
(log-message "SET worker-user " worker-id " " user)
(let [file (worker-user-file conf worker-id)]
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index 71d4654..a53ff82 100644
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@ -15,14 +15,22 @@
;; limitations under the License.
(ns backtype.storm.daemon.nimbus
(:import [org.apache.thrift.server THsHaServer THsHaServer$Args])
+ (:import [backtype.storm.generated KeyNotFoundException])
+ (:import [backtype.storm.blobstore LocalFsBlobStore])
(:import [org.apache.thrift.protocol TBinaryProtocol TBinaryProtocol$Factory])
(:import [org.apache.thrift.exception])
(:import [org.apache.thrift.transport TNonblockingServerTransport TNonblockingServerSocket])
(:import [org.apache.commons.io FileUtils])
+ (:import [javax.security.auth Subject])
+ (:import [backtype.storm.security.auth NimbusPrincipal])
(:import [java.nio ByteBuffer]
[java.util Collections List HashMap]
[backtype.storm.generated NimbusSummary])
- (:import [java.io FileNotFoundException File FileOutputStream])
+ (:import [java.nio ByteBuffer]
+ [java.util Collections List HashMap ArrayList Iterator])
+ (:import [backtype.storm.blobstore AtomicOutputStream BlobStoreAclHandler
+ InputStreamWithMeta KeyFilter KeySequenceNumber BlobSynchronizer])
+ (:import [java.io File FileOutputStream FileInputStream])
(:import [java.net InetAddress])
(:import [java.nio.channels Channels WritableByteChannel])
(:import [backtype.storm.security.auth ThriftServer ThriftConnectionType ReqContext AuthUtils])
@@ -31,12 +39,12 @@
Cluster Topologies SchedulerAssignment SchedulerAssignmentImpl DefaultScheduler ExecutorDetails])
(:import [backtype.storm.nimbus NimbusInfo])
(:import [backtype.storm.utils TimeCacheMap TimeCacheMap$ExpiredCallback Utils ThriftTopologyUtils
- BufferFileInputStream])
+ BufferFileInputStream BufferInputStream])
(:import [backtype.storm.generated NotAliveException AlreadyAliveException StormTopology ErrorInfo
ExecutorInfo InvalidTopologyException Nimbus$Iface Nimbus$Processor SubmitOptions TopologyInitialStatus
KillOptions RebalanceOptions ClusterSummary SupervisorSummary TopologySummary TopologyInfo TopologyHistoryInfo
- ExecutorSummary AuthorizationException GetInfoOptions NumErrorsChoice
- ComponentPageInfo TopologyPageInfo LogConfig LogLevel LogLevelAction
+ ExecutorSummary AuthorizationException GetInfoOptions NumErrorsChoice SettableBlobMeta ReadableBlobMeta
+ BeginDownloadResult ListBlobsResult ComponentPageInfo TopologyPageInfo LogConfig LogLevel LogLevelAction
ProfileRequest ProfileAction NodeInfo])
(:import [backtype.storm.daemon Shutdownable])
(:use [backtype.storm util config log timer zookeeper local-state])
@@ -47,6 +55,7 @@
(:require [clojure.set :as set])
(:import [backtype.storm.daemon.common StormBase Assignment])
(:use [backtype.storm.daemon common])
+ (:use [backtype.storm config])
(:import [org.apache.zookeeper data.ACL ZooDefs$Ids ZooDefs$Perms])
(:import [backtype.storm.utils VersionInfo])
(:require [clj-time.core :as time])
@@ -112,8 +121,7 @@
scheduler
))
-(defmulti mk-code-distributor cluster-mode)
-(defmulti sync-code cluster-mode)
+(defmulti blob-sync cluster-mode)
(defnk is-leader [nimbus :throw-exception true]
(let [leader-elector (:leader-elector nimbus)]
@@ -126,6 +134,25 @@
[(first ZooDefs$Ids/CREATOR_ALL_ACL)
(ACL. (bit-or ZooDefs$Perms/READ ZooDefs$Perms/CREATE) ZooDefs$Ids/ANYONE_ID_UNSAFE)])
+(defn mk-blob-cache-map
+  "Builds a TimeCacheMap using NIMBUS-BLOBSTORE-EXPIRATION-SECS from `conf`
+  as its timeout. The expiration callback cancels the expired entry's value
+  when it is an AtomicOutputStream and closes it otherwise."
+  [conf]
+  (let [expiration-secs (int (conf NIMBUS-BLOBSTORE-EXPIRATION-SECS))
+        on-expire (reify TimeCacheMap$ExpiredCallback
+                    (expire [this id stream]
+                      (if (instance? AtomicOutputStream stream)
+                        (.cancel stream)
+                        (.close stream))))]
+    (TimeCacheMap. expiration-secs on-expire)))
+
+(defn mk-bloblist-cache-map
+  "Builds a TimeCacheMap using NIMBUS-BLOBSTORE-EXPIRATION-SECS from `conf`
+  as its timeout, without an expiration callback."
+  [conf]
+  (let [expiration-secs (int (conf NIMBUS-BLOBSTORE-EXPIRATION-SECS))]
+    (TimeCacheMap. expiration-secs)))
+
(defn create-tology-action-notifier [conf]
(when-not (clojure.string/blank? (conf NIMBUS-TOPOLOGY-ACTION-NOTIFIER-PLUGIN))
(let [instance (new-instance (conf NIMBUS-TOPOLOGY-ACTION-NOTIFIER-PLUGIN))]
@@ -153,6 +180,10 @@
:heartbeats-cache (atom {})
:downloaders (file-cache-map conf)
:uploaders (file-cache-map conf)
+ :blob-store (Utils/getNimbusBlobStore conf (NimbusInfo/fromConf conf))
+ :blob-downloaders (mk-blob-cache-map conf)
+ :blob-uploaders (mk-blob-cache-map conf)
+ :blob-listers (mk-bloblist-cache-map conf)
:uptime (uptime-computer)
:validator (new-instance (conf NIMBUS-TOPOLOGY-VALIDATOR))
:timer (mk-timer :kill-fn (fn [t]
@@ -161,7 +192,6 @@
))
:scheduler (mk-scheduler conf inimbus)
:leader-elector (zk-leader-elector conf)
- :code-distributor (mk-code-distributor conf)
:id->sched-status (atom {})
:node-id->resources (atom {}) ;;resources of supervisors
:id->resources (atom {}) ;;resources of topologies
@@ -175,22 +205,44 @@
(defn inbox [nimbus]
(master-inbox (:conf nimbus)))
-(defn- read-storm-conf [conf storm-id]
- (let [stormroot (master-stormdist-root conf storm-id)]
- (merge conf
- (clojurify-structure
- (Utils/fromCompressedJsonConf
- (FileUtils/readFileToByteArray
- (File. (master-stormconf-path stormroot))))))))
+(defn- get-subject
+  "Returns the subject of the current ReqContext."
+  []
+  (.subject (ReqContext/context)))
+
+;; Subject obtained from get-subject.
+;; NOTE(review): as a `def`, (get-subject) is evaluated once when this
+;; namespace is loaded, not per request -- confirm that a load-time subject
+;; is what callers of user-subject expect.
+(def user-subject
+ (get-subject))
+
+(defn- read-storm-conf
+  "Reads the topology configuration blob for `storm-id` from `blob-store`
+  (as user-subject), decompresses it and converts it to Clojure data.
+  `conf` is accepted but not used by this function."
+  [conf storm-id blob-store]
+  (-> blob-store
+      (.readBlob (master-stormconf-key storm-id) user-subject)
+      (Utils/fromCompressedJsonConf)
+      (clojurify-structure)))
(declare delay-event)
(declare mk-assignments)
+(defn get-nimbus-subject
+  "Returns a new Subject whose principal set contains one NimbusPrincipal."
+  []
+  (let [subject (Subject.)]
+    (.add (.getPrincipals subject) (NimbusPrincipal.))
+    subject))
+
+;; Subject built by get-nimbus-subject; as a `def` it is created once when
+;; this namespace is loaded.
+(def nimbus-subject
+ (get-nimbus-subject))
+
+(defn- get-key-list-from-id
+  "Returns the blob keys belonging to topology `id`: code and conf keys in
+  local mode, and code, jar and conf keys otherwise."
+  [conf id]
+  (let [code-key (master-stormcode-key id)
+        jar-key (master-stormjar-key id)
+        conf-key (master-stormconf-key id)]
+    (log-debug "set keys id = " id "set = " #{code-key jar-key conf-key})
+    (if (local-mode? conf)
+      [code-key conf-key]
+      [code-key jar-key conf-key])))
+
(defn kill-transition [nimbus storm-id]
(fn [kill-time]
(let [delay (if kill-time
kill-time
- (get (read-storm-conf (:conf nimbus) storm-id)
+ (get (read-storm-conf (:conf nimbus) storm-id (:blob-store nimbus))
TOPOLOGY-MESSAGE-TIMEOUT-SECS))]
(delay-event nimbus
storm-id
@@ -205,7 +257,7 @@
(fn [time num-workers executor-overrides]
(let [delay (if time
time
- (get (read-storm-conf (:conf nimbus) storm-id)
+ (get (read-storm-conf (:conf nimbus) storm-id (:blob-store nimbus))
TOPOLOGY-MESSAGE-TIMEOUT-SECS))]
(delay-event nimbus
storm-id
@@ -250,6 +302,10 @@
(log-message "Killing topology: " storm-id)
(.remove-storm! (:storm-cluster-state nimbus)
storm-id)
+ (when (instance? LocalFsBlobStore (:blob-store nimbus))
+ (doseq [blob-key (get-key-list-from-id (:conf nimbus) storm-id)]
+ (.remove-blobstore-key! (:storm-cluster-state nimbus) blob-key)
+ (.remove-key-version! (:storm-cluster-state nimbus) blob-key)))
nil)
}
:rebalancing {:startup (fn [] (delay-event nimbus
@@ -391,53 +447,99 @@
[(.getNodeId slot) (.getPort slot)]
)))
+(defn- get-version-for-key
+  "Builds a KeySequenceNumber for key and nimbus-host-port-info and returns the
+  sequence number it reports for conf."
+  [key nimbus-host-port-info conf]
+  (-> (KeySequenceNumber. key nimbus-host-port-info)
+      (.getKeySequenceNumber conf)))
+
+(defn get-key-seq-from-blob-store
+  "Returns a seq over the keys listed by blob-store."
+  [blob-store]
+  (-> blob-store
+      (.listKeys)
+      (iterator-seq)))
+
(defn- setup-storm-code [nimbus conf storm-id tmp-jar-location storm-conf topology]
- (let [stormroot (master-stormdist-root conf storm-id)]
- (log-message "nimbus file location:" stormroot)
- (FileUtils/forceMkdir (File. stormroot))
- (FileUtils/cleanDirectory (File. stormroot))
- (setup-jar conf tmp-jar-location stormroot)
- (FileUtils/writeByteArrayToFile (File. (master-stormcode-path stormroot)) (Utils/serialize topology))
- (FileUtils/writeByteArrayToFile (File. (master-stormconf-path stormroot)) (Utils/toCompressedJsonConf storm-conf))
- (if (:code-distributor nimbus) (.upload (:code-distributor nimbus) stormroot storm-id))
- ))
+  ;; Stores the topology jar (when there is one), the compressed conf and the
+  ;; serialized topology as blobs. For a LocalFsBlobStore each key is also
+  ;; registered in the cluster state together with its version.
+  ;; The blobs are created with the subject of the current request context,
+  ;; looked up per call, not with the subject captured at namespace load.
+  (let [subject (get-subject)
+        storm-cluster-state (:storm-cluster-state nimbus)
+        blob-store (:blob-store nimbus)
+        nimbus-host-port-info (:nimbus-host-port-info nimbus)
+        ;; Registers blob-key in the cluster state; no-op for other blob stores.
+        register-key! (fn [blob-key]
+                        (when (instance? LocalFsBlobStore blob-store)
+                          (.setup-blobstore! storm-cluster-state blob-key nimbus-host-port-info
+                                             (get-version-for-key blob-key nimbus-host-port-info conf))))]
+    (when tmp-jar-location ;;in local mode there is no jar
+      (let [jar-key (master-stormjar-key storm-id)]
+        (.createBlob blob-store jar-key (FileInputStream. tmp-jar-location) (SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject)
+        (register-key! jar-key)))
+    (let [conf-key (master-stormconf-key storm-id)]
+      (.createBlob blob-store conf-key (Utils/toCompressedJsonConf storm-conf) (SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject)
+      (register-key! conf-key))
+    (let [code-key (master-stormcode-key storm-id)]
+      (.createBlob blob-store code-key (Utils/serialize topology) (SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject)
+      (register-key! code-key))))
+
+
+(defn- read-storm-topology
+  "Reads the serialized topology blob of storm-id from blob-store and
+  deserializes it into a StormTopology. The blob is read with the subject of
+  the current request context (looked up on every call) instead of the subject
+  captured when the namespace was loaded."
+  [storm-id blob-store]
+  (Utils/deserialize
+    (.readBlob blob-store (master-stormcode-key storm-id) (get-subject)) StormTopology))
+
+(defn get-blob-replication-count
+  "Returns the replication that nimbus' blob store reports for blob-key when
+  asked as nimbus-subject, or nil when nimbus has no blob store."
+  [blob-key nimbus]
+  (when-let [blob-store (:blob-store nimbus)]
+    (.getBlobReplication blob-store blob-key nimbus-subject)))
(defn- wait-for-desired-code-replication [nimbus conf storm-id]
(let [min-replication-count (conf TOPOLOGY-MIN-REPLICATION-COUNT)
max-replication-wait-time (conf TOPOLOGY-MAX-REPLICATION-WAIT-TIME-SEC)
- total-wait-time (atom 0)
- current-replication-count (atom (if (:code-distributor nimbus) (.getReplicationCount (:code-distributor nimbus) storm-id) 0))]
- (if (:code-distributor nimbus)
- (while (and (> min-replication-count @current-replication-count)
- (or (= -1 max-replication-wait-time)
- (< @total-wait-time max-replication-wait-time)))
+      ;; Polls once per second until the jar, code and conf blobs all reach
+      ;; min-replication-count, or until max-replication-wait-time seconds have
+      ;; passed (a negative max wait means wait indefinitely).
+      ;; In local mode there is no jar blob, so its counter starts out satisfied
+      ;; and is never refreshed.
+      current-replication-count-jar (if (not (local-mode? conf))
+                                      (atom (get-blob-replication-count (master-stormjar-key storm-id) nimbus))
+                                      (atom min-replication-count))
+      current-replication-count-code (atom (get-blob-replication-count (master-stormcode-key storm-id) nimbus))
+      current-replication-count-conf (atom (get-blob-replication-count (master-stormconf-key storm-id) nimbus))
+      total-wait-time (atom 0)]
+  (if (:blob-store nimbus)
+    (while (and
+             (or (> min-replication-count @current-replication-count-jar)
+                 (> min-replication-count @current-replication-count-code)
+                 (> min-replication-count @current-replication-count-conf))
+             (or (neg? max-replication-wait-time)
+                 (< @total-wait-time max-replication-wait-time)))
(sleep-secs 1)
(log-debug "waiting for desired replication to be achieved.
min-replication-count = " min-replication-count " max-replication-wait-time = " max-replication-wait-time
- "current-replication-count = " @current-replication-count " total-wait-time " @total-wait-time)
+                 ;; Label and value must travel together; outside local mode only.
+                 (if (not (local-mode? conf))
+                   (str "current-replication-count for jar key = " @current-replication-count-jar)
+                   "")
+                 "current-replication-count for code key = " @current-replication-count-code
+                 "current-replication-count for conf key = " @current-replication-count-conf
+                 " total-wait-time " @total-wait-time)
(swap! total-wait-time inc)
- (reset! current-replication-count (.getReplicationCount (:code-distributor nimbus) storm-id))))
- (if (< min-replication-count @current-replication-count)
- (log-message "desired replication count " min-replication-count " achieved,
- current-replication-count" @current-replication-count)
- (log-message "desired replication count of " min-replication-count " not achieved but we have hit the max wait time "
- max-replication-wait-time " so moving on with replication count = " @current-replication-count)
- )))
-
-(defn- read-storm-topology [conf storm-id]
- (let [stormroot (master-stormdist-root conf storm-id)]
- (Utils/deserialize
- (FileUtils/readFileToByteArray
- (File. (master-stormcode-path stormroot))
- ) StormTopology)))
+      ;; Only the jar lookup is skipped in local mode; code and conf are
+      ;; refreshed on every iteration.
+      (if (not (local-mode? conf))
+        (reset! current-replication-count-jar (get-blob-replication-count (master-stormjar-key storm-id) nimbus)))
+      (reset! current-replication-count-code (get-blob-replication-count (master-stormcode-key storm-id) nimbus))
+      (reset! current-replication-count-conf (get-blob-replication-count (master-stormconf-key storm-id) nimbus))))
+  ;; The target is met when every counter is at least min-replication-count.
+  (if (and (<= min-replication-count @current-replication-count-conf)
+           (<= min-replication-count @current-replication-count-code)
+           (<= min-replication-count @current-replication-count-jar))
+    (log-message "desired replication count " min-replication-count " achieved, "
+                 "current-replication-count for conf key = " @current-replication-count-conf ", "
+                 "current-replication-count for code key = " @current-replication-count-code ", "
+                 "current-replication-count for jar key = " @current-replication-count-jar)
+    (log-message "desired replication count of " min-replication-count " not achieved but we have hit the max wait time "
+                 max-replication-wait-time " so moving on with replication count for conf key = " @current-replication-count-conf
+                 " for code key = " @current-replication-count-code "for jar key = " @current-replication-count-jar))))
+
+(defn- read-storm-topology-as-nimbus
+  "Reads the serialized topology blob of storm-id as nimbus-subject and
+  deserializes it into a StormTopology."
+  [storm-id blob-store]
+  (let [code-key (master-stormcode-key storm-id)
+        serialized (.readBlob blob-store code-key nimbus-subject)]
+    (Utils/deserialize serialized StormTopology)))
(declare compute-executor->component)
+(defn read-storm-conf-as-nimbus
+  "Reads the compressed JSON conf blob of storm-id as nimbus-subject and
+  returns it as a Clojure structure."
+  [storm-id blob-store]
+  (let [conf-key (master-stormconf-key storm-id)
+        compressed (.readBlob blob-store conf-key nimbus-subject)]
+    (clojurify-structure (Utils/fromCompressedJsonConf compressed))))
+
(defn read-topology-details [nimbus storm-id]
(let [conf (:conf nimbus)
+ blob-store (:blob-store nimbus)
storm-base (.storm-base (:storm-cluster-state nimbus) storm-id nil)
- topology-conf (read-storm-conf conf storm-id)
- topology (read-storm-topology conf storm-id)
+ topology-conf (read-storm-conf-as-nimbus storm-id blob-store)
+ topology (read-storm-topology-as-nimbus storm-id blob-store)
executor->component (->> (compute-executor->component nimbus storm-id)
(map-key (fn [[start-task end-task]]
(ExecutorDetails. (int start-task) (int end-task)))))]
@@ -530,10 +632,11 @@
(defn- compute-executors [nimbus storm-id]
(let [conf (:conf nimbus)
+ blob-store (:blob-store nimbus)
storm-base (.storm-base (:storm-cluster-state nimbus) storm-id nil)
component->executors (:component->executors storm-base)
- storm-conf (read-storm-conf conf storm-id)
- topology (read-storm-topology conf storm-id)
+ storm-conf (read-storm-conf-as-nimbus storm-id blob-store)
+ topology (read-storm-topology-as-nimbus storm-id blob-store)
task->component (storm-task-info topology storm-conf)]
(->> (storm-task-info topology storm-conf)
reverse-map
@@ -546,9 +649,10 @@
(defn- compute-executor->component [nimbus storm-id]
(let [conf (:conf nimbus)
+ blob-store (:blob-store nimbus)
executors (compute-executors nimbus storm-id)
- topology (read-storm-topology conf storm-id)
- storm-conf (read-storm-conf conf storm-id)
+ topology (read-storm-topology-as-nimbus storm-id blob-store)
+ storm-conf (read-storm-conf-as-nimbus storm-id blob-store)
task->component (storm-task-info topology storm-conf)
executor->component (into {} (for [executor executors
:let [start-task (first executor)
@@ -838,7 +942,7 @@
)))
worker->resources (get new-assigned-worker->resources topology-id)]]
{topology-id (Assignment.
- (master-stormdist-root conf topology-id)
+ (conf STORM-LOCAL-DIR)
(select-keys all-node->host all-nodes)
executor->node+port
start-times
@@ -875,8 +979,9 @@
{:pre [(#{:active :inactive} topology-initial-status)]}
(let [storm-cluster-state (:storm-cluster-state nimbus)
conf (:conf nimbus)
- storm-conf (read-storm-conf conf storm-id)
- topology (system-topology! storm-conf (read-storm-topology conf storm-id))
+ blob-store (:blob-store nimbus)
+ storm-conf (read-storm-conf conf storm-id blob-store)
+ topology (system-topology! storm-conf (read-storm-topology storm-id blob-store))
num-executors (->> (all-components topology) (map-val num-start-executors))]
(log-message "Activating " storm-name ": " storm-id)
(.activate-storm! storm-cluster-state
@@ -935,17 +1040,15 @@
([nimbus storm-name storm-conf operation]
(check-authorization! nimbus storm-name storm-conf operation (ReqContext/context))))
-(defn code-ids [conf]
- (-> conf
- master-stormdist-root
- read-dir-contents
- set
- ))
+(defn code-ids
+  "Returns the set of values obtained by passing blob-store's keys through a
+  KeyFilter that applies get-id-from-blob-key."
+  [blob-store]
+  (->> (reify KeyFilter
+         (filter [this key] (get-id-from-blob-key key)))
+       (.filterAndListKeys blob-store)
+       set))
-(defn cleanup-storm-ids [conf storm-cluster-state]
+(defn cleanup-storm-ids [conf storm-cluster-state blob-store]
(let [heartbeat-ids (set (.heartbeat-storms storm-cluster-state))
error-ids (set (.error-topologies storm-cluster-state))
- code-ids (code-ids conf)
+ code-ids (code-ids blob-store)
assigned-ids (set (.active-storms storm-cluster-state))]
(set/difference (set/union heartbeat-ids error-ids code-ids) assigned-ids)
))
@@ -1006,22 +1109,35 @@
TOPOLOGY-EVENTLOGGER-EXECUTORS (total-conf TOPOLOGY-EVENTLOGGER-EXECUTORS)
TOPOLOGY-MAX-TASK-PARALLELISM (total-conf TOPOLOGY-MAX-TASK-PARALLELISM)})))
+(defn blob-rm-key
+  "Deletes key from blob-store as nimbus-subject and, for a LocalFsBlobStore,
+  removes the key from storm-cluster-state. Any Exception is logged instead of
+  being rethrown."
+  [blob-store key storm-cluster-state]
+  (try
+    (.deleteBlob blob-store key nimbus-subject)
+    (when (instance? LocalFsBlobStore blob-store)
+      (.remove-blobstore-key! storm-cluster-state key))
+    (catch Exception e
+      (log-message "Exception" e))))
+
+(defn blob-rm-topology-keys
+  "Removes the jar, conf and code blobs of topology id, in that order."
+  [id blob-store storm-cluster-state]
+  (let [remove-key! (fn [blob-key]
+                      (blob-rm-key blob-store blob-key storm-cluster-state))]
+    (remove-key! (master-stormjar-key id))
+    (remove-key! (master-stormconf-key id))
+    (remove-key! (master-stormcode-key id))))
+
(defn do-cleanup [nimbus]
(if (is-leader nimbus :throw-exception false)
(let [storm-cluster-state (:storm-cluster-state nimbus)
conf (:conf nimbus)
- submit-lock (:submit-lock nimbus)]
+ submit-lock (:submit-lock nimbus)
+ blob-store (:blob-store nimbus)]
(let [to-cleanup-ids (locking submit-lock
- (cleanup-storm-ids conf storm-cluster-state))]
+ (cleanup-storm-ids conf storm-cluster-state blob-store))]
(when-not (empty? to-cleanup-ids)
(doseq [id to-cleanup-ids]
(log-message "Cleaning up " id)
- (if (:code-distributor nimbus) (.cleanup (:code-distributor nimbus) id))
(.teardown-heartbeats! storm-cluster-state id)
(.teardown-topology-errors! storm-cluster-state id)
(rmr (master-stormdist-root conf id))
- (swap! (:heartbeats-cache nimbus) dissoc id))
- )))
+ (blob-rm-topology-keys id blob-store storm-cluster-state)
+ (swap! (:heartbeats-cache nimbus) dissoc id)))))
(log-message "not a leader, skipping cleanup")))
(defn- file-older-than? [now seconds file]
@@ -1036,8 +1152,7 @@
(if (.delete f)
(log-message "Cleaning inbox ... deleted: " (.getName f))
;; This should never happen
- (log-error "Cleaning inbox ... error deleting: " (.getName f))
- ))))
+ (log-error "Cleaning inbox ... error deleting: " (.getName f))))))
(defn clean-topology-history
"Deletes topologies from history older than minutes."
@@ -1051,25 +1166,34 @@
(ls-topo-hist! topo-history-state new-history))))
(defn cleanup-corrupt-topologies! [nimbus]
- (if (is-leader nimbus :throw-exception false)
- (let [storm-cluster-state (:storm-cluster-state nimbus)
- code-ids (set (code-ids (:conf nimbus)))
- active-topologies (set (.active-storms storm-cluster-state))
- corrupt-topologies (set/difference active-topologies code-ids)]
- (doseq [corrupt corrupt-topologies]
- (log-message "Corrupt topology " corrupt " has state on zookeeper but doesn't have a local dir on Nimbus. Cleaning up...")
- (.remove-storm! storm-cluster-state corrupt)
- )))
- (log-message "not a leader, skipping cleanup-corrupt-topologies"))
-
-;;setsup code distributor entries for all current topologies for which code is available locally.
-(defn setup-code-distributor [nimbus]
(let [storm-cluster-state (:storm-cluster-state nimbus)
- locally-available-storm-ids (set (code-ids (:conf nimbus)))
+ blob-store (:blob-store nimbus)
+ code-ids (set (code-ids blob-store))
active-topologies (set (.active-storms storm-cluster-state))
- locally-available-active-storm-ids (set/intersection locally-available-storm-ids active-topologies)]
- (doseq [storm-id locally-available-active-storm-ids]
- (.setup-code-distributor! storm-cluster-state storm-id (:nimbus-host-port-info nimbus)))))
+ corrupt-topologies (set/difference active-topologies code-ids)]
+ (doseq [corrupt corrupt-topologies]
+ (log-message "Corrupt topology " corrupt " has state on zookeeper but doesn't have a local dir on Nimbus. Cleaning up...")
+ (.remove-storm! storm-cluster-state corrupt)
+ (if (instance? LocalFsBlobStore blob-store)
+ (doseq [blob-key (get-key-list-from-id (:conf nimbus) corrupt)]
+ (.remove-blobstore-key! storm-cluster-state blob-key))))))
+
+(defn setup-blobstore
+  "Sets up blobstore state for all current keys.
+  Blobs held locally whose keys are not among the cluster state's active keys
+  are deleted as nimbus-subject; every local key that is active is registered
+  in the cluster state for this nimbus together with its version."
+  [nimbus]
+  (let [storm-cluster-state (:storm-cluster-state nimbus)
+        blob-store (:blob-store nimbus)
+        local-set-of-keys (set (get-key-seq-from-blob-store blob-store))
+        all-keys (set (.active-keys storm-cluster-state))
+        locally-available-active-keys (set/intersection local-set-of-keys all-keys)
+        keys-to-delete (set/difference local-set-of-keys all-keys)
+        conf (:conf nimbus)
+        nimbus-host-port-info (:nimbus-host-port-info nimbus)]
+    (log-debug "Deleting keys not on the zookeeper" keys-to-delete)
+    (doseq [key keys-to-delete]
+      (.deleteBlob blob-store key nimbus-subject))
+    (log-debug "Creating list of key entries for blobstore inside zookeeper" all-keys "local" locally-available-active-keys)
+    (doseq [key locally-available-active-keys]
+      (.setup-blobstore! storm-cluster-state key nimbus-host-port-info (get-version-for-key key nimbus-host-port-info conf)))))
(defn- get-errors [storm-cluster-state storm-id component-id]
(->> (.errors storm-cluster-state storm-id component-id)
@@ -1102,26 +1226,26 @@
(catch Exception e
(throw (AuthorizationException. (str "Invalid file path: " file-path))))))
-(defn try-read-storm-conf [conf storm-id]
+(defn try-read-storm-conf
+ [conf storm-id blob-store]
(try-cause
- (read-storm-conf conf storm-id)
- (catch FileNotFoundException e
- (throw (NotAliveException. (str storm-id))))
- )
-)
+ (read-storm-conf-as-nimbus storm-id blob-store)
+ (catch KeyNotFoundException e
+ (throw (NotAliveException. (str storm-id))))))
-(defn try-read-storm-conf-from-name [conf storm-name nimbus]
+(defn try-read-storm-conf-from-name
+ [conf storm-name nimbus]
(let [storm-cluster-state (:storm-cluster-state nimbus)
+ blob-store (:blob-store nimbus)
id (get-storm-id storm-cluster-state storm-name)]
- (try-read-storm-conf conf id)))
+ (try-read-storm-conf conf id blob-store)))
-(defn try-read-storm-topology [conf storm-id]
+(defn try-read-storm-topology
+ [storm-id blob-store]
(try-cause
- (read-storm-topology conf storm-id)
- (catch FileNotFoundException e
- (throw (NotAliveException. (str storm-id))))
- )
-)
+ (read-storm-topology-as-nimbus storm-id blob-store)
+ (catch KeyNotFoundException e
+ (throw (NotAliveException. (str storm-id))))))
(defn add-topology-to-history-log
[storm-id nimbus topology-conf]
@@ -1166,6 +1290,7 @@
(defn renew-credentials [nimbus]
(if (is-leader nimbus :throw-exception false)
(let [storm-cluster-state (:storm-cluster-state nimbus)
+ blob-store (:blob-store nimbus)
renewers (:cred-renewers nimbus)
update-lock (:cred-update-lock nimbus)
assigned-ids (set (.active-storms storm-cluster-state))]
@@ -1173,7 +1298,7 @@
(doseq [id assigned-ids]
(locking update-lock
(let [orig-creds (.credentials storm-cluster-state id nil)
- topology-conf (try-read-storm-conf (:conf nimbus) id)]
+ topology-conf (try-read-storm-conf (:conf nimbus) id blob-store)]
(if orig-creds
(let [new-creds (HashMap. orig-creds)]
(doseq [renewer renewers]
@@ -1210,22 +1335,40 @@
(.set_reset_log_level_timeout_epoch log-config (coerce/to-long timeout))
(.unset_reset_log_level_timeout_epoch log-config))))
+(defmethod blob-sync :distributed [conf nimbus]
+  ;; Runs only when this nimbus is not the leader: hands the local key set and
+  ;; the key set read from the cluster state to a BlobSynchronizer and lets it
+  ;; sync. Reading the cluster-state keys passes a callback that re-runs
+  ;; blob-sync.
+  (when-not (is-leader nimbus :throw-exception false)
+    (let [storm-cluster-state (:storm-cluster-state nimbus)
+          blob-store (:blob-store nimbus)
+          local-keys (set (get-key-seq-from-blob-store blob-store))
+          zk-keys (set (.blobstore storm-cluster-state (fn [] (blob-sync conf nimbus))))]
+      (log-debug "blob-sync " "blob-store-keys " local-keys "zookeeper-keys " zk-keys)
+      (-> (doto (BlobSynchronizer. blob-store conf)
+            (.setNimbusInfo (:nimbus-host-port-info nimbus))
+            (.setBlobStoreKeySet local-keys)
+            (.setZookeeperKeySet zk-keys))
+          (.syncBlobs)))))
+
+;; Nothing to synchronize in local mode.
+(defmethod blob-sync :local [conf nimbus]
+  nil)
+
(defserverfn service-handler [conf inimbus]
(.prepare inimbus conf (master-inimbus-dir conf))
(log-message "Starting Nimbus with conf " conf)
(let [nimbus (nimbus-data conf inimbus)
+ blob-store (:blob-store nimbus)
principal-to-local (AuthUtils/GetPrincipalToLocalPlugin conf)
admin-users (or (.get conf NIMBUS-ADMINS) [])
get-common-topo-info
(fn [^String storm-id operation]
(let [storm-cluster-state (:storm-cluster-state nimbus)
- topology-conf (try-read-storm-conf conf storm-id)
+ topology-conf (try-read-storm-conf conf storm-id blob-store)
storm-name (topology-conf TOPOLOGY-NAME)
_ (check-authorization! nimbus
storm-name
topology-conf
operation)
- topology (try-read-storm-topology conf storm-id)
+ topology (try-read-storm-topology storm-id blob-store)
task->component (storm-task-info topology topology-conf)
base (.storm-base storm-cluster-state storm-id nil)
launch-time-secs (if base (:launch-time-secs base)
@@ -1264,10 +1407,11 @@
(.addToLeaderLockQueue (:leader-elector nimbus))
(cleanup-corrupt-topologies! nimbus)
- (setup-code-distributor nimbus)
+ (when (instance? LocalFsBlobStore blob-store)
+ ;register call back for blob-store
+ (.blobstore (:storm-cluster-state nimbus) (fn [] (blob-sync conf nimbus)))
+ (setup-blobstore nimbus))
- ;register call back for code-distributor
- (.code-distributor (:storm-cluster-state nimbus) (fn [] (sync-code conf nimbus)))
(when (is-leader nimbus :throw-exception false)
(doseq [storm-id (.active-storms (:storm-cluster-state nimbus))]
(transition! nimbus storm-id :startup)))
@@ -1278,31 +1422,27 @@
(when-not (conf NIMBUS-DO-NOT-REASSIGN)
(locking (:submit-lock nimbus)
(mk-assignments nimbus)))
- (do-cleanup nimbus)
- ))
+ (do-cleanup nimbus)))
;; Schedule Nimbus inbox cleaner
(schedule-recurring (:timer nimbus)
0
(conf NIMBUS-CLEANUP-INBOX-FREQ-SECS)
(fn []
- (clean-inbox (inbox nimbus) (conf NIMBUS-INBOX-JAR-EXPIRATION-SECS))
- ))
+ (clean-inbox (inbox nimbus) (conf NIMBUS-INBOX-JAR-EXPIRATION-SECS))))
+ ;; Schedule nimbus code sync thread to sync code from other nimbuses.
+ (if (instance? LocalFsBlobStore blob-store)
+ (schedule-recurring (:timer nimbus)
+ 0
+ (conf NIMBUS-CODE-SYNC-FREQ-SECS)
+ (fn []
+ (blob-sync conf nimbus))))
;; Schedule topology history cleaner
(when-let [interval (conf LOGVIEWER-CLEANUP-INTERVAL-SECS)]
(schedule-recurring (:timer nimbus)
0
(conf LOGVIEWER-CLEANUP-INTERVAL-SECS)
(fn []
- (clean-topology-history (conf LOGVIEWER-CLEANUP-AGE-MINS) nimbus)
- )))
- ;;schedule nimbus code sync thread to sync code from other nimbuses.
- (schedule-recurring (:timer nimbus)
- 0
- (conf NIMBUS-CODE-SYNC-FREQ-SECS)
- (fn []
- (sync-code conf nimbus)
- ))
-
+ (clean-topology-history (conf LOGVIEWER-CLEANUP-AGE-MINS) nimbus))))
(schedule-recurring (:timer nimbus)
0
(conf NIMBUS-CREDENTIAL-RENEW-FREQ-SECS)
@@ -1349,10 +1489,11 @@
principal (.principal req)
submitter-principal (if principal (.toString principal))
submitter-user (.toLocal principal-to-local principal)
+ system-user (System/getProperty "user.name")
topo-acl (distinct (remove nil? (conj (.get storm-conf-submitted TOPOLOGY-USERS) submitter-principal, submitter-user)))
storm-conf (-> storm-conf-submitted
(assoc TOPOLOGY-SUBMITTER-PRINCIPAL (if submitter-principal submitter-principal ""))
- (assoc TOPOLOGY-SUBMITTER-USER (if submitter-user submitter-user "")) ;Don't let the user set who we launch as
+ (assoc TOPOLOGY-SUBMITTER-USER (if submitter-user submitter-user system-user)) ;Don't let the user set who we launch as
(assoc TOPOLOGY-USERS topo-acl)
(assoc STORM-ZOOKEEPER-SUPERACL (.get conf STORM-ZOOKEEPER-SUPERACL)))
storm-conf (if (Utils/isZkAuthenticationConfiguredStormServer conf)
@@ -1380,8 +1521,8 @@
(check-storm-active! nimbus storm-name false)
;;cred-update-lock is not needed here because creds are being added for the first time.
(.set-credentials! storm-cluster-state storm-id credentials storm-conf)
- (setup-storm-code nimbus conf storm-id uploadedJarLocation storm-conf topology)
- (.setup-code-distributor! storm-cluster-state storm-id (:nimbus-host-port-info nimbus))
+ (log-message "uploadedJar " uploadedJarLocation)
+ (setup-storm-code nimbus conf storm-id uploadedJarLocation total-storm-conf topology)
(wait-for-desired-code-replication nimbus total-storm-conf storm-id)
(.setup-heartbeats! storm-cluster-state storm-id)
(.setup-backpressure! storm-cluster-state storm-id)
@@ -1456,7 +1597,7 @@
(mark! nimbus:num-debug-calls)
(let [storm-cluster-state (:storm-cluster-state nimbus)
storm-id (get-storm-id storm-cluster-state storm-name)
- topology-conf (try-read-storm-conf conf storm-id)
+ topology-conf (try-read-storm-conf conf storm-id blob-store)
;; make sure samplingPct is within bounds.
spct (Math/max (Math/min samplingPct 100.0) 0.0)
;; while disabling we retain the sampling pct.
@@ -1475,7 +1616,7 @@
(^void setWorkerProfiler
[this ^String id ^ProfileRequest profileRequest]
(mark! nimbus:num-setWorkerProfiler-calls)
- (let [topology-conf (try-read-storm-conf conf id)
+ (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
storm-name (topology-conf TOPOLOGY-NAME)
_ (check-authorization! nimbus storm-name topology-conf "setWorkerProfiler")
storm-cluster-state (:storm-cluster-state nimbus)]
@@ -1506,7 +1647,7 @@
(^void setLogConfig [this ^String id ^LogConfig log-config-msg]
(mark! nimbus:num-setLogConfig-calls)
- (let [topology-conf (try-read-storm-conf conf id)
+ (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
storm-name (topology-conf TOPOLOGY-NAME)
_ (check-authorization! nimbus storm-name topology-conf "setLogConfig")
storm-cluster-state (:storm-cluster-state nimbus)
@@ -1534,7 +1675,7 @@
(mark! nimbus:num-uploadNewCredentials-calls)
(let [storm-cluster-state (:storm-cluster-state nimbus)
storm-id (get-storm-id storm-cluster-state storm-name)
- topology-conf (try-read-storm-conf conf storm-id)
+ topology-conf (try-read-storm-conf conf storm-id blob-store)
creds (when credentials (.get_creds credentials))]
(check-authorization! nimbus storm-name topology-conf "uploadNewCredentials")
(locking (:cred-update-lock nimbus) (.set-credentials! storm-cluster-state storm-id creds topology-conf))))
@@ -1607,7 +1748,7 @@
(^LogConfig getLogConfig [this ^String id]
(mark! nimbus:num-getLogConfig-calls)
- (let [topology-conf (try-read-storm-conf conf id)
+ (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
storm-name (topology-conf TOPOLOGY-NAME)
_ (check-authorization! nimbus storm-name topology-conf "getLogConfig")
storm-cluster-state (:storm-cluster-state nimbus)
@@ -1616,24 +1757,24 @@
(^String getTopologyConf [this ^String id]
(mark! nimbus:num-getTopologyConf-calls)
- (let [topology-conf (try-read-storm-conf conf id)
+ (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
storm-name (topology-conf TOPOLOGY-NAME)]
(check-authorization! nimbus storm-name topology-conf "getTopologyConf")
(to-json topology-conf)))
(^StormTopology getTopology [this ^String id]
(mark! nimbus:num-getTopology-calls)
- (let [topology-conf (try-read-storm-conf conf id)
+ (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
storm-name (topology-conf TOPOLOGY-NAME)]
(check-authorization! nimbus storm-name topology-conf "getTopology")
- (system-topology! topology-conf (try-read-storm-topology conf id))))
+ ;; try-read-storm-topology now takes [storm-id blob-store]; the old
+ ;; (conf id) argument list would pass conf as the id and id as the store.
+ (system-topology! topology-conf (try-read-storm-topology id (:blob-store nimbus)))))
(^StormTopology getUserTopology [this ^String id]
(mark! nimbus:num-getUserTopology-calls)
- (let [topology-conf (try-read-storm-conf conf id)
+ (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
storm-name (topology-conf TOPOLOGY-NAME)]
(check-authorization! nimbus storm-name topology-conf "getUserTopology")
- (try-read-storm-topology topology-conf id)))
+ (try-read-storm-topology id blob-store)))
(^ClusterSummary getClusterInfo [this]
(mark! nimbus:num-getClusterInfo-calls)
@@ -1668,42 +1809,39 @@
(.set_isLeader nimbus-summary (and (= leader-host (.get_host nimbus-summary)) (= leader-port (.get_port nimbus-summary))))))
topology-summaries (dofor [[id base] bases :when base]
- (let [assignment (.assignment-info storm-cluster-state id nil)
- topo-summ (TopologySummary. id
- (:storm-name base)
- (->> (:executor->node+port assignment)
- keys
- (mapcat executor-id->tasks)
- count)
- (->> (:executor->node+port assignment)
- keys
- count)
- (->> (:executor->node+port assignment)
- vals
- set
- count)
- (time-delta (:launch-time-secs base))
- (extract-status-str base))]
- (when-let [owner (:owner base)] (.set_owner topo-summ owner))
- (when-let [sched-status (.get @(:id->sched-status nimbus) id)] (.set_sched_status topo-summ sched-status))
- (when-let [resources (.get @(:id->resources nimbus) id)]
- (.set_requested_memonheap topo-summ (get resources 0))
- (.set_requested_memoffheap topo-summ (get resources 1))
- (.set_requested_cpu topo-summ (get resources 2))
- (.set_assigned_memonheap topo-summ (get resources 3))
- (.set_assigned_memoffheap topo-summ (get resources 4))
- (.set_assigned_cpu topo-summ (get resources 5)))
- (.set_replication_count topo-summ (if (:code-distributor nimbus)
- (.getReplicationCount (:code-distributor nimbus) id)
- 1))
- topo-summ
- ))
+ (let [assignment (.assignment-info storm-cluster-state id nil)
+ topo-summ (TopologySummary. id
+ (:storm-name base)
+ (->> (:executor->node+port assignment)
+ keys
+ (mapcat executor-id->tasks)
+ count)
+ (->> (:executor->node+port assignment)
+ keys
+ count)
+ (->> (:executor->node+port assignment)
+ vals
+ set
+ count)
+ (time-delta (:launch-time-secs base))
+ (extract-status-str base))]
+ (when-let [owner (:owner base)] (.set_owner topo-summ owner))
+ (when-let [sched-status (.get @(:id->sched-status nimbus) id)] (.set_sched_status topo-summ sched-status))
+ (when-let [resources (.get @(:id->resources nimbus) id)]
+ (.set_requested_memonheap topo-summ (get resources 0))
+ (.set_requested_memoffheap topo-summ (get resources 1))
+ (.set_requested_cpu topo-summ (get resources 2))
+ (.set_assigned_memonheap topo-summ (get resources 3))
+ (.set_assigned_memoffheap topo-summ (get resources 4))
+ (.set_assigned_cpu topo-summ (get resources 5)))
+ (.set_replication_count topo-summ (get-blob-replication-count (master-stormcode-key id) nimbus))
+ topo-summ))
ret (ClusterSummary. supervisor-summaries
topology-summaries
nimbuses)
_ (.set_nimbus_uptime_secs ret nimbus-uptime)]
ret))
-
+
(^TopologyInfo getTopologyInfoWithOpts [this ^String storm-id ^GetInfoOptions options]
(mark! nimbus:num-getTopologyInfoWithOpts-calls)
(let [{:keys [storm-name
@@ -1763,10 +1901,8 @@
(.set_assigned_cpu topo-info (get resources 5)))
(when-let [component->debug (:component->debug base)]
(.set_component_debug topo-info (map-val converter/thriftify-debugoptions component->debug)))
- (.set_replication_count topo-info (if (:code-distributor nimbus) (.getReplicationCount (:code-distributor nimbus) storm-id) 1))
-
- topo-info
- ))
+ (.set_replication_count topo-info (get-blob-replication-count (master-stormcode-key storm-id) nimbus))
+ topo-info))
(^TopologyInfo getTopologyInfo [this ^String topology-id]
(mark! nimbus:num-getTopologyInfo-calls)
@@ -1774,6 +1910,157 @@
topology-id
(doto (GetInfoOptions.) (.set_num_err_choice NumErrorsChoice/ALL))))
+ (^String beginCreateBlob [this
+                           ^String blob-key
+                           ^SettableBlobMeta blob-meta]
+   ;; Starts creating blob-key with blob-meta and stores what createBlob
+   ;; returns in :blob-uploaders under a fresh session id; returns that id as
+   ;; a string. The blob is created with the subject of the current request
+   ;; context, looked up per call, as the sibling setBlobMeta/deleteBlob do.
+   (let [session-id (uuid)]
+     (.put (:blob-uploaders nimbus)
+           session-id
+           (.createBlob (:blob-store nimbus) blob-key blob-meta (get-subject)))
+     (log-message "Created blob for " blob-key
+                  " with session id " session-id)
+     (str session-id)))
+
+ (^String beginUpdateBlob [this ^String blob-key]
+   ;; Opens an update stream for blob-key and stores it in :blob-uploaders
+   ;; under a fresh session id; returns that id as a string. The update is
+   ;; opened with the subject of the current request context, looked up per
+   ;; call, rather than the subject captured at namespace load.
+   (let [^AtomicOutputStream os (.updateBlob (:blob-store nimbus)
+                                             blob-key (get-subject))
+         session-id (uuid)]
+     (.put (:blob-uploaders nimbus) session-id os)
+     (log-message "Created upload session for " blob-key
+                  " with id " session-id)
+     (str session-id)))
+
+ (^void createStateInZookeeper [this ^String blob-key]
+   ;; For a LocalFsBlobStore, registers blob-key for this nimbus in the cluster
+   ;; state together with the key's version; otherwise only logs.
+   (let [{:keys [storm-cluster-state blob-store nimbus-host-port-info conf]} nimbus]
+     (when (instance? LocalFsBlobStore blob-store)
+       (.setup-blobstore! storm-cluster-state blob-key nimbus-host-port-info
+                          (get-version-for-key blob-key nimbus-host-port-info conf)))
+     (log-debug "Created state in zookeeper" storm-cluster-state blob-store nimbus-host-port-info)))
+
+ (^void uploadBlobChunk [this ^String session ^ByteBuffer blob-chunk]
+   ;; Writes the remaining bytes of blob-chunk to the stream registered for
+   ;; session, then re-puts the stream under the same session.
+   (let [uploaders (:blob-uploaders nimbus)
+         ^AtomicOutputStream os (.get uploaders session)]
+     (if-not os
+       (throw-runtime "Blob for session "
+                      session
+                      " does not exist (or timed out)")
+       (let [start (+ (.arrayOffset blob-chunk) (.position blob-chunk))]
+         (.write os (.array blob-chunk) start (.remaining blob-chunk))
+         (.put uploaders session os)))))
+
+ (^void finishBlobUpload [this ^String session]
+   ;; Closes the stream registered for session and drops the session.
+   (let [uploaders (:blob-uploaders nimbus)
+         ^AtomicOutputStream os (.get uploaders session)]
+     (if-not os
+       (throw-runtime "Blob for session "
+                      session
+                      " does not exist (or timed out)")
+       (do
+         (.close os)
+         (log-message "Finished uploading blob for session "
+                      session
+                      ". Closing session.")
+         (.remove uploaders session)))))
+
+ (^void cancelBlobUpload [this ^String session]
+   ;; Cancels the stream registered for session and drops the session.
+   (let [uploaders (:blob-uploaders nimbus)
+         ^AtomicOutputStream os (.get uploaders session)]
+     (if-not os
+       (throw-runtime "Blob for session "
+                      session
+                      " does not exist (or timed out)")
+       (do
+         (.cancel os)
+         (log-message "Canceled uploading blob for session "
+                      session
+                      ". Closing session.")
+         (.remove uploaders session)))))
+
+ (^ReadableBlobMeta getBlobMeta [this ^String blob-key]
+   ;; Returns the blob store's metadata for blob-key, requested with the
+   ;; subject of the current request context (looked up per call) rather than
+   ;; the subject captured at namespace load.
+   (let [^ReadableBlobMeta ret (.getBlobMeta (:blob-store nimbus)
+                                             blob-key (get-subject))]
+     ret))
+
+ (^void setBlobMeta [this ^String blob-key ^SettableBlobMeta blob-meta]
+   ;; Applies blob-meta to blob-key as the subject of the current request context.
+   (let [subject (.subject (ReqContext/context))]
+     (.setBlobMeta (:blob-store nimbus) blob-key blob-meta subject)))
+
+ (^BeginDownloadResult beginBlobDownload [this ^String blob-key]
+   ;; Opens blob-key for reading, wraps the stream in a BufferInputStream
+   ;; (buffer size from STORM-BLOBSTORE-INPUTSTREAM-BUFFER-SIZE-BYTES, default
+   ;; 65536) stored in :blob-downloaders under a fresh session id, and returns
+   ;; the blob's version, session id and data size. The blob is opened with
+   ;; the subject of the current request context, looked up per call, rather
+   ;; than the subject captured at namespace load.
+   (let [^InputStreamWithMeta is (.getBlob (:blob-store nimbus)
+                                           blob-key (get-subject))
+         session-id (uuid)
+         ret (BeginDownloadResult. (.getVersion is) (str session-id))]
+     (.set_data_size ret (.getFileLength is))
+     (.put (:blob-downloaders nimbus) session-id (BufferInputStream. is (Utils/getInt (conf STORM-BLOBSTORE-INPUTSTREAM-BUFFER-SIZE-BYTES) (int 65536))))
+     (log-message "Created download session for " blob-key
+                  " with id " session-id)
+     ret))
+
+ (^ByteBuffer downloadBlobChunk [this ^String session]
+   ;; Reads the next chunk from the stream registered for session and returns
+   ;; it wrapped in a ByteBuffer. An empty chunk closes the stream and drops
+   ;; the session. Throws RuntimeException when the session is unknown.
+   (let [downloaders (:blob-downloaders nimbus)
+         ^BufferInputStream is (.get downloaders session)]
+     (when-not is
+       ;; RuntimeException has no (String, String) constructor, so the message
+       ;; has to be joined into one string before it is passed in.
+       (throw (RuntimeException.
+                (str "Could not find input stream for session " session))))
+     (let [ret (.read is)]
+       (.put downloaders session is)
+       (when (empty? ret)
+         (.close is)
+         (.remove downloaders session))
+       (log-debug "Sending " (alength ret) " bytes")
+       (ByteBuffer/wrap ret))))
+
+ (^void deleteBlob [this ^String blob-key]
+   ;; Deletes blob-key as the subject of the current request context; for a
+   ;; LocalFsBlobStore also removes the key and its version from the cluster state.
+   (let [subject (.subject (ReqContext/context))
+         cluster-state (:storm-cluster-state nimbus)]
+     (.deleteBlob (:blob-store nimbus) blob-key subject)
+     (when (instance? LocalFsBlobStore blob-store)
+       (.remove-blobstore-key! cluster-state blob-key)
+       (.remove-key-version! cluster-state blob-key))
+     (log-message "Deleted blob for key " blob-key)))
+
+ (^ListBlobsResult listBlobs [this ^String session]
+   ;; Returns the next page (at most 100 keys) of the blob store's key listing.
+   ;; A blank session starts a new listing under a fresh session id; otherwise
+   ;; the iterator stored for session is resumed. An empty result means the
+   ;; listing is finished and the session has been dropped.
+   (let [listers (:blob-listers nimbus)
+         ^Iterator keys-it (if (clojure.string/blank? session)
+                             (.listKeys (:blob-store nimbus))
+                             (.get listers session))
+         _ (or keys-it (throw-runtime "Blob list for session "
+                                      session
+                                      " does not exist (or timed out)"))
+
+         ;; Create a new session id if the user gave an empty session string.
+         ;; This is the use case when the user wishes to list blobs
+         ;; starting from the beginning.
+         session (if (clojure.string/blank? session)
+                   (let [new-session (uuid)]
+                     (log-message "Creating new session for downloading list " new-session)
+                     new-session)
+                   session)]
+     (if-not (.hasNext keys-it)
+       (do
+         (.remove listers session)
+         (log-message "No more blobs to list for session " session)
+         ;; A blank result communicates that there are no more blobs.
+         (ListBlobsResult. (ArrayList. 0) (str session)))
+       (let [^List list-chunk (ArrayList.)]
+         ;; Pull keys straight from the iterator. Going through
+         ;; (take 100 (iterator-seq keys-it)) can realize a whole chunk past
+         ;; the 100th element, and those extra keys would be consumed from the
+         ;; stored iterator without ever being returned.
+         (loop [remaining 100] ;; Limit to next 100 keys
+           (when (and (pos? remaining) (.hasNext keys-it))
+             (.add list-chunk (.next keys-it))
+             (recur (dec remaining))))
+         (log-message session " downloading " (.size list-chunk) " entries")
+         (.put listers session keys-it)
+         (ListBlobsResult. list-chunk (str session))))))
+
+ (^int getBlobReplication [this ^String blob-key]
+   ;; Asks the blob store for blob-key's replication as the subject of the
+   ;; current request context.
+   (let [subject (.subject (ReqContext/context))]
+     (.getBlobReplication (:blob-store nimbus) blob-key subject)))
+
+ (^int updateBlobReplication [this ^String blob-key ^int replication]
+   ;; Asks the blob store to update blob-key's replication as the subject of
+   ;; the current request context and returns the store's answer.
+   (let [subject (.subject (ReqContext/context))]
+     (.updateBlobReplication (:blob-store nimbus) blob-key replication subject)))
+
(^TopologyPageInfo getTopologyPageInfo
[this ^String topo-id ^String window ^boolean include-sys?]
(mark! nimbus:num-getTopologyPageInfo-calls)
@@ -1807,9 +2094,8 @@
(.set_status (extract-status-str (:base info)))
(.set_uptime_secs (time-delta (:launch-time-secs info)))
(.set_topology_conf (to-json (try-read-storm-conf conf
- topo-id)))
- (.set_replication_count
- (.getReplicationCount (:code-distributor nimbus) topo-id)))
+ topo-id (:blob-store nimbus))))
+ (.set_replication_count (get-blob-replication-count (master-stormcode-key topo-id) nimbus)))
(when-let [debug-options
(get-in info [:base :component->debug topo-id])]
(.set_debug_options
@@ -1889,60 +2175,14 @@
(.disconnect (:storm-cluster-state nimbus))
(.cleanup (:downloaders nimbus))
(.cleanup (:uploaders nimbus))
+ (.shutdown (:blob-store nimbus))
(.close (:leader-elector nimbus))
- (if (:code-distributor nimbus) (.close (:code-distributor nimbus) (:conf nimbus)))
(when (:nimbus-topology-action-notifier nimbus) (.cleanup (:nimbus-topology-action-notifier nimbus)))
- (log-message "Shut down master")
- )
+ (log-message "Shut down master"))
DaemonCommon
(waiting? [this]
(timer-waiting? (:timer nimbus))))))
-;; Instantiates the class named by STORM-CODE-DISTRIBUTOR-CLASS in conf,
-;; calls .prepare on it with conf, and returns the prepared instance.
-(defmethod mk-code-distributor :distributed [conf]
-  (doto (new-instance (conf STORM-CODE-DISTRIBUTOR-CLASS))
-    (.prepare conf)))
-
-(defmethod mk-code-distributor :local [conf] ; The :local dispatch creates no code distributor.
- nil) ; callers receive nil
-
-;; Fetches the code for storm-id from the nimbus at host:port into a fresh
-;; temporary directory, then moves that directory into place as the
-;; topology's stormdist root (replacing any existing one) and records this
-;; nimbus's host/port info for the topology in the cluster state.
-(defn download-code [conf nimbus storm-id host port]
-  (let [staging-path (str (master-tmp-dir conf) file-path-separator (uuid))
-        cluster-state (:storm-cluster-state nimbus)
-        dist-path (master-stormdist-root conf storm-id)
-        remote-meta (master-storm-metafile-path dist-path)
-        local-meta (master-storm-metafile-path staging-path)
-        distributor (:code-distributor nimbus)
-        dist-dir (File. dist-path)]
-    (FileUtils/forceMkdir (File. staging-path))
-    ;; The meta file is pulled from the remote host first; the code
-    ;; distributor (when one is configured) is then handed that file.
-    (Utils/downloadFromHost conf remote-meta local-meta host port)
-    (when distributor
-      (.download distributor storm-id (File. local-meta)))
-    ;; Clear any existing copy so the staged directory can be moved into place.
-    (when (.exists dist-dir)
-      (FileUtils/forceDelete dist-dir))
-    (FileUtils/moveDirectory (File. staging-path) dist-dir)
-    (.setup-code-distributor! cluster-state storm-id (:nimbus-host-port-info nimbus))))
-
-(defmethod sync-code :distributed [conf nimbus] ; Downloads code for topologies listed in cluster state but absent from this host's code-ids, and updates this nimbus's membership in the leader lock queue.
- (let [storm-cluster-state (:storm-cluster-state nimbus)
- active-topologies (set (.code-distributor storm-cluster-state (fn [] (sync-code conf nimbus)))) ; the callback re-invokes sync-code; presumably registered as a watch on the cluster state -- confirm
- missing-topologies (set/difference active-topologies (set (code-ids (:conf nimbus))))] ; ids returned above that are not among the local code-ids
- (if (not (empty? missing-topologies))
- (do
- (.removeFromLeaderLockQueue (:leader-elector nimbus)) ; leave the leader lock queue while any topology code is missing
- (doseq [missing missing-topologies]
- (log-message "missing topology " missing " has state on zookeeper but doesn't have a local dir on this host.")
- (let [nimbuses-with-missing (.code-distributor-info storm-cluster-state missing)]
- (log-message "trying to download missing topology code from " (clojure.string/join "," nimbuses-with-missing))
- (doseq [nimbus-host-port nimbuses-with-missing]
- (when-not (contains? (code-ids (:conf nimbus)) missing) ; skip the remaining hosts once the code is present locally
- (try
- (download-code conf nimbus missing (.getHost nimbus-host-port) (.getPort nimbus-host-port))
- (catch Exception e (log-error e "Exception while trying to syn-code for missing topology" missing))))))))) ; a failed download is logged and the next host is tried
-
- (if (empty? (set/difference active-topologies (set (code-ids (:conf nimbus))))) ; recomputed after the download attempts above
- (.addToLeaderLockQueue (:leader-elector nimbus))))) ; rejoin the queue only when nothing is still missing
-
-(defmethod sync-code :local [conf nimbus] ; The :local dispatch performs no code sync.
- nil) ; callers receive nil
-
(defn launch-server! [conf nimbus]
(validate-distributed-mode! conf)
(let [service-handler (service-handler conf nimbus)