You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/12/01 20:11:56 UTC
[4/8] storm git commit: STORM-1276: line for line translation of
nimbus to java
http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
index ca60c02..60f5a1b 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -16,7 +16,6 @@
(ns org.apache.storm.testing
(:require [org.apache.storm.daemon
- [nimbus :as nimbus]
[local-supervisor :as local-supervisor]
[common :as common]])
(:import [org.apache.commons.io FileUtils]
@@ -33,6 +32,7 @@
(:import [org.apache.storm.utils Time Utils IPredicate RegisteredGlobalState ConfigUtils LocalState StormCommonInstaller])
(:import [org.apache.storm.tuple Fields Tuple TupleImpl])
(:import [org.apache.storm.task TopologyContext])
+ (:import [org.apache.storm.nimbus ILeaderElector NimbusInfo])
(:import [org.apache.storm.generated GlobalStreamId Bolt KillOptions])
(:import [org.apache.storm.testing FeederSpout FixedTupleSpout FixedTuple
TupleCaptureBolt SpoutTracker BoltTracker NonRichBoltTracker
@@ -49,9 +49,10 @@
(:import [org.apache.storm Config])
(:import [org.apache.storm.generated StormTopology])
(:import [org.apache.storm.task TopologyContext]
- (org.apache.storm.messaging IContext)
+ [org.apache.storm.messaging IContext]
[org.json.simple JSONValue]
- (org.apache.storm.daemon StormCommon Acker DaemonCommon))
+ [org.apache.storm.daemon.nimbus Nimbus Nimbus$StandaloneINimbus]
+ [org.apache.storm.daemon StormCommon Acker DaemonCommon])
(:import [org.apache.storm.cluster ZKStateStorage ClusterStateContext StormClusterStateImpl ClusterUtils])
(:use [org.apache.storm util config log local-state-converter converter])
(:use [org.apache.storm.internal thrift]))
@@ -167,11 +168,60 @@
(let [val (atom (dec start-val))]
(fn [] (swap! val inc)))))
+(defnk mock-leader-elector [:is-leader true :leader-name "test-host" :leader-port 9999]
+ (let [leader-address (NimbusInfo. leader-name leader-port true)]
+ (reify ILeaderElector
+ (prepare [this conf] true)
+ (isLeader [this] is-leader)
+ (addToLeaderLockQueue [this] true)
+ (getLeader [this] leader-address)
+ (getAllNimbuses [this] `(leader-address))
+ (close [this] true))))
+
+(defn mk-nimbus
+ [conf inimbus blob-store leader-elector group-mapper cluster-state]
+ (Nimbus. conf inimbus cluster-state nil blob-store leader-elector group-mapper))
+
+(defnk mk-mocked-nimbus
+ [:daemon-conf {} :inimbus nil :blob-store nil :cluster-state nil
+ :leader-elector nil :group-mapper nil :nimbus-daemon false :mk-nimbus mk-nimbus]
+ (let [zk-tmp (local-temp-path)
+ [zk-port zk-handle] (if-not cluster-state
+ (Zookeeper/mkInprocessZookeeper zk-tmp nil))
+ leader-elector (or leader-elector (if zk-handle leader-elector (mock-leader-elector)))
+ nimbus-tmp (local-temp-path)
+ daemon-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
+ {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true
+ ZMQ-LINGER-MILLIS 0
+ TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false
+ TOPOLOGY-TRIDENT-BATCH-EMIT-INTERVAL-MILLIS 50
+ STORM-CLUSTER-MODE "local"
+ BLOBSTORE-SUPERUSER (System/getProperty "user.name")
+ BLOBSTORE-DIR nimbus-tmp}
+ (if-not cluster-state
+ {STORM-ZOOKEEPER-PORT zk-port
+ STORM-ZOOKEEPER-SERVERS ["localhost"]})
+ daemon-conf)
+ nimbus (mk-nimbus
+ (assoc daemon-conf STORM-LOCAL-DIR nimbus-tmp)
+ (if inimbus inimbus (Nimbus$StandaloneINimbus.))
+ blob-store
+ leader-elector
+ group-mapper
+ cluster-state)
+ _ (.launchServer nimbus)
+ nimbus-thrift-server (if nimbus-daemon (start-nimbus-daemon daemon-conf nimbus) nil)]
+ {:nimbus nimbus
+ :daemon-conf daemon-conf
+ :tmp-dirs (atom [nimbus-tmp zk-tmp])
+ :nimbus-thrift-server nimbus-thrift-server
+ :zookeeper (if (not-nil? zk-handle) zk-handle)}))
+
;; returns map containing cluster info
;; local dir is always overridden in maps
;; can customize the supervisors (except for ports) by passing in map for :supervisors parameter
;; if need to customize amt of ports more, can use add-supervisor calls afterwards
-(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :inimbus nil :supervisor-slot-port-min 1024 :nimbus-daemon false]
+(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :inimbus nil :group-mapper nil :supervisor-slot-port-min 1024 :nimbus-daemon false]
(let [zk-tmp (local-temp-path)
[zk-port zk-handle] (if-not (contains? daemon-conf STORM-ZOOKEEPER-SERVERS)
(Zookeeper/mkInprocessZookeeper zk-tmp nil))
@@ -189,9 +239,14 @@
STORM-ZOOKEEPER-SERVERS ["localhost"]})
daemon-conf)
port-counter (mk-counter supervisor-slot-port-min)
- nimbus (nimbus/service-handler
- (assoc daemon-conf STORM-LOCAL-DIR nimbus-tmp)
- (if inimbus inimbus (nimbus/standalone-nimbus)))
+ nimbus (mk-nimbus
+ (assoc daemon-conf STORM-LOCAL-DIR nimbus-tmp)
+ (if inimbus inimbus (Nimbus$StandaloneINimbus.))
+ nil
+ nil
+ group-mapper
+ nil)
+ _ (.launchServer nimbus)
context (mk-shared-context daemon-conf)
nimbus-thrift-server (if nimbus-daemon (start-nimbus-daemon daemon-conf nimbus) nil)
cluster-map {:nimbus nimbus
@@ -242,11 +297,12 @@
(.stop (:nimbus-thrift-server cluster-map))
(catch Exception e (log-message "failed to stop thrift")))
))
- (.close (:state cluster-map))
- (.disconnect (:storm-cluster-state cluster-map))
- (doseq [s @(:supervisors cluster-map)]
- (.shutdownAllWorkers s nil ReadClusterState/THREAD_DUMP_ON_ERROR)
- (.close s))
+ (if (:state cluster-map) (.close (:state cluster-map)))
+ (if (:storm-cluster-state cluster-map) (.disconnect (:storm-cluster-state cluster-map)))
+ (if (:supervisors cluster-map)
+ (doseq [s @(:supervisors cluster-map)]
+ (.shutdownAllWorkers s nil ReadClusterState/THREAD_DUMP_ON_ERROR)
+ (.close s)))
(ProcessSimulator/killAllProcesses)
(if (not-nil? (:zookeeper cluster-map))
(do
@@ -315,6 +371,21 @@
([cluster-map secs]
(advance-cluster-time cluster-map secs 1)))
+(defmacro with-mocked-nimbus
+ [[nimbus-sym & args] & body]
+ `(let [~nimbus-sym (mk-mocked-nimbus ~@args)]
+ (try
+ ~@body
+ (catch Throwable t#
+ (log-error t# "Error in cluster")
+ (throw t#))
+ (finally
+ (let [keep-waiting?# (atom true)
+ f# (future (while @keep-waiting?# (simulate-wait ~nimbus-sym)))]
+ (kill-local-storm-cluster ~nimbus-sym)
+ (reset! keep-waiting?# false)
+ @f#)))))
+
(defmacro with-local-cluster
[[cluster-sym & args] & body]
`(let [~cluster-sym (mk-local-storm-cluster ~@args)]
@@ -356,48 +427,6 @@
(throw (IllegalArgumentException. "Topology conf is not json-serializable")))
(.submitTopologyWithOpts nimbus storm-name nil (JSONValue/toJSONString conf) topology submit-opts))
-(defn mocked-convert-assignments-to-worker->resources [storm-cluster-state storm-name worker->resources]
- (fn [existing-assignments]
- (let [topology-id (StormCommon/getStormId storm-cluster-state storm-name)
- existing-assignments (into {} (for [[tid assignment] existing-assignments]
- {tid (:worker->resources assignment)}))
- new-assignments (assoc existing-assignments topology-id worker->resources)]
- new-assignments)))
-
-(defn mocked-compute-new-topology->executor->node+port [storm-cluster-state storm-name executor->node+port]
- (fn [new-scheduler-assignments existing-assignments]
- (let [topology-id (StormCommon/getStormId storm-cluster-state storm-name)
- existing-assignments (into {} (for [[tid assignment] existing-assignments]
- {tid (:executor->node+port assignment)}))
- new-assignments (assoc existing-assignments topology-id executor->node+port)]
- new-assignments)))
-
-(defn mocked-compute-new-scheduler-assignments []
- (fn [nimbus existing-assignments topologies scratch-topology-id]
- existing-assignments))
-
-(defn submit-mocked-assignment
- [nimbus storm-cluster-state storm-name conf topology task->component executor->node+port worker->resources]
- (let [fake-common (proxy [StormCommon] []
- (stormTaskInfoImpl [_] task->component))]
- (with-open [- (StormCommonInstaller. fake-common)]
- (with-var-roots [nimbus/compute-new-scheduler-assignments (mocked-compute-new-scheduler-assignments)
- nimbus/convert-assignments-to-worker->resources (mocked-convert-assignments-to-worker->resources
- storm-cluster-state
- storm-name
- worker->resources)
- nimbus/compute-new-topology->executor->node+port (mocked-compute-new-topology->executor->node+port
- storm-cluster-state
- storm-name
- executor->node+port)]
- (submit-local-topology nimbus storm-name conf topology)))))
-
-(defn find-worker-id
- [supervisor-conf port]
- (let [supervisor-state (ConfigUtils/supervisorState supervisor-conf)
- worker->port (.getApprovedWorkers ^LocalState supervisor-state)]
- (first ((clojurify-structure (Utils/reverseMap worker->port)) port))))
-
(defn find-worker-port
[supervisor-conf worker-id]
(let [supervisor-state (ConfigUtils/supervisorState supervisor-conf)
http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/clj/org/apache/storm/testing4j.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing4j.clj b/storm-core/src/clj/org/apache/storm/testing4j.clj
index b2acffc..41f8258 100644
--- a/storm-core/src/clj/org/apache/storm/testing4j.clj
+++ b/storm-core/src/clj/org/apache/storm/testing4j.clj
@@ -18,7 +18,6 @@
(:require [org.apache.storm [LocalCluster :as LocalCluster]])
(:import [org.apache.storm Config ILocalCluster LocalCluster])
(:import [org.apache.storm.generated StormTopology])
- (:import [org.apache.storm.daemon nimbus])
(:import [org.apache.storm.testing TestJob MockedSources TrackedTopology
MkClusterParam CompleteTopologyParam MkTupleParam])
(:import [org.apache.storm.utils Utils])
http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/clj/org/apache/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/ui/core.clj b/storm-core/src/clj/org/apache/storm/ui/core.clj
index 52e4059..5465b92 100644
--- a/storm-core/src/clj/org/apache/storm/ui/core.clj
+++ b/storm-core/src/clj/org/apache/storm/ui/core.clj
@@ -888,9 +888,9 @@
"stream" (.get_streamId s)
"executeLatency" (StatsUtil/floatStr (.get_execute_latency_ms bas))
"processLatency" (StatsUtil/floatStr (.get_process_latency_ms bas))
- "executed" (Utils/nullToZero (.get_executed bas))
- "acked" (Utils/nullToZero (.get_acked cas))
- "failed" (Utils/nullToZero (.get_failed cas))}))
+ "executed" (if-let [e (.get_executed bas)] e 0)
+ "acked" (if-let [a (.get_acked cas)] a 0)
+ "failed" (if-let [f (.get_failed cas)] f 0)}))
(defmulti unpack-comp-output-stat
(fn [[_ ^ComponentAggregateStats s]] (.get_type s)))
@@ -899,8 +899,8 @@
[[stream-id ^ComponentAggregateStats stats]]
(let [^CommonAggregateStats cas (.get_common_stats stats)]
{"stream" stream-id
- "emitted" (Utils/nullToZero (.get_emitted cas))
- "transferred" (Utils/nullToZero (.get_transferred cas))}))
+ "emitted" (if-let [e (.get_emitted cas)] e 0)
+ "transferred" (if-let [t (.get_transferred cas)] t 0)}))
(defmethod unpack-comp-output-stat ComponentType/SPOUT
[[stream-id ^ComponentAggregateStats stats]]
@@ -908,11 +908,11 @@
^SpecificAggregateStats spec-s (.get_specific_stats stats)
^SpoutAggregateStats spout-s (.get_spout spec-s)]
{"stream" stream-id
- "emitted" (Utils/nullToZero (.get_emitted cas))
- "transferred" (Utils/nullToZero (.get_transferred cas))
+ "emitted" (if-let [e (.get_emitted cas)] e 0)
+ "transferred" (if-let [t (.get_transferred cas)] t 0)
"completeLatency" (StatsUtil/floatStr (.get_complete_latency_ms spout-s))
- "acked" (Utils/nullToZero (.get_acked cas))
- "failed" (Utils/nullToZero (.get_failed cas))}))
+ "acked" (if-let [a (.get_acked cas)] a 0)
+ "failed" (if-let [f (.get_failed cas)] f 0)}))
(defmulti unpack-comp-exec-stat
(fn [_ _ ^ComponentAggregateStats cas] (.get_type (.get_stats ^ExecutorAggregateStats cas))))
@@ -935,14 +935,14 @@
"uptimeSeconds" uptime
"host" host
"port" port
- "emitted" (Utils/nullToZero (.get_emitted cas))
- "transferred" (Utils/nullToZero (.get_transferred cas))
+ "emitted" (if-let [e (.get_emitted cas)] e 0)
+ "transferred" (if-let [t (.get_transferred cas)] t 0)
"capacity" (StatsUtil/floatStr (Utils/nullToZero (.get_capacity bas)))
"executeLatency" (StatsUtil/floatStr (.get_execute_latency_ms bas))
- "executed" (Utils/nullToZero (.get_executed bas))
+ "executed" (if-let [e (.get_executed bas)] e 0)
"processLatency" (StatsUtil/floatStr (.get_process_latency_ms bas))
- "acked" (Utils/nullToZero (.get_acked cas))
- "failed" (Utils/nullToZero (.get_failed cas))
+ "acked" (if-let [a (.get_acked cas)] a 0)
+ "failed" (if-let [f (.get_failed cas)] f 0)
"workerLogLink" (worker-log-link host port topology-id secure?)}))
(defmethod unpack-comp-exec-stat ComponentType/SPOUT
@@ -963,11 +963,11 @@
"uptimeSeconds" uptime
"host" host
"port" port
- "emitted" (Utils/nullToZero (.get_emitted cas))
- "transferred" (Utils/nullToZero (.get_transferred cas))
+ "emitted" (if-let [em (.get_emitted cas)] em 0)
+ "transferred" (if-let [t (.get_transferred cas)] t 0)
"completeLatency" (StatsUtil/floatStr (.get_complete_latency_ms sas))
- "acked" (Utils/nullToZero (.get_acked cas))
- "failed" (Utils/nullToZero (.get_failed cas))
+ "acked" (if-let [ack (.get_acked cas)] ack 0)
+ "failed" (if-let [f (.get_failed cas)] f 0)
"workerLogLink" (worker-log-link host port topology-id secure?)}))
(defmulti unpack-component-page-info
http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/clj/org/apache/storm/util.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/util.clj b/storm-core/src/clj/org/apache/storm/util.clj
index 016fe55..bd76022 100644
--- a/storm-core/src/clj/org/apache/storm/util.clj
+++ b/storm-core/src/clj/org/apache/storm/util.clj
@@ -25,7 +25,7 @@
MutableObject])
(:import [org.apache.storm.security.auth NimbusPrincipal])
(:import [javax.security.auth Subject])
- (:import [java.util UUID Random ArrayList List Collections])
+ (:import [java.util UUID Random ArrayList List Collections Set])
(:import [java.util.zip ZipFile])
(:import [java.util.concurrent.locks ReentrantReadWriteLock])
(:import [java.util.concurrent Semaphore])
@@ -117,7 +117,9 @@
~@body
false
(catch Throwable t#
- (Utils/exceptionCauseIsInstanceOf ~klass t#))))
+ (let [tc# (Utils/exceptionCauseIsInstanceOf ~klass t#)]
+ (if (not tc#) (log-error t# "Exception did not match " ~klass))
+ tc#))))
(defmacro forcat
[[args aseq] & body]
@@ -145,14 +147,16 @@
(defn clojurify-structure
[s]
- (prewalk (fn [x]
+ (if s
+ (prewalk (fn [x]
(cond (instance? Map x) (into {} x)
(instance? List x) (vec x)
+ (instance? Set x) (into #{} x)
;; (Boolean. false) does not evaluate to false in an if.
;; This fixes that.
(instance? Boolean x) (boolean x)
true x))
- s))
+ s)))
; move this func form convert.clj due to cyclic load dependency
(defn clojurify-error [^ErrorInfo error]
(if error
http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java
index de96d40..4f05261 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -18,6 +18,7 @@
package org.apache.storm;
import org.apache.storm.container.ResourceIsolationInterface;
+import org.apache.storm.nimbus.ITopologyActionNotifierPlugin;
import org.apache.storm.scheduler.resource.strategies.eviction.IEvictionStrategy;
import org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy;
import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;
@@ -694,9 +695,9 @@ public class Config extends HashMap<String, Object> {
/**
* FQCN of a class that implements {@code I} @see org.apache.storm.nimbus.ITopologyActionNotifierPlugin for details.
*/
+ @isImplementationOfClass(implementsClass = ITopologyActionNotifierPlugin.class)
public static final String NIMBUS_TOPOLOGY_ACTION_NOTIFIER_PLUGIN = "nimbus.topology.action.notifier.plugin.class";
- public static final Object NIMBUS_TOPOLOGY_ACTION_NOTIFIER_PLUGIN_SCHEMA = String.class;
-
+
/**
* Storm UI binds to this host/interface.
*/
http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/jvm/org/apache/storm/blobstore/BlobStore.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/BlobStore.java b/storm-core/src/jvm/org/apache/storm/blobstore/BlobStore.java
index 14879b4..1c10c40 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/BlobStore.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/BlobStore.java
@@ -31,6 +31,8 @@ import java.util.regex.Pattern;
import javax.security.auth.Subject;
import org.apache.storm.nimbus.NimbusInfo;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,6 +43,7 @@ import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.generated.KeyAlreadyExistsException;
import org.apache.storm.generated.ReadableBlobMeta;
import org.apache.storm.generated.SettableBlobMeta;
+import org.apache.storm.generated.StormTopology;
/**
* Provides a way to store blobs that can be downloaded.
@@ -304,6 +307,41 @@ public abstract class BlobStore implements Shutdownable {
}
/**
+ * Helper method to read a stored topology
+ * @param topoId the id of the topology to read
+ * @param who who to read it as
+ * @return the deserialized topology.
+ * @throws IOException on any error while reading the blob.
+ * @throws AuthorizationException if who is not allowed to read the blob
+ * @throws KeyNotFoundException if the blob could not be found
+ */
+ public StormTopology readTopology(String topoId, Subject who) throws KeyNotFoundException, AuthorizationException, IOException {
+ return Utils.deserialize(readBlob(ConfigUtils.masterStormCodeKey(topoId), who), StormTopology.class);
+ }
+
+ /**
+ * Helper method to read a stored topology config
+ * @param topoId the id of the topology whose conf we are reading
+ * @param who who we are reading this as
+ * @return the deserialized config
+ * @throws KeyNotFoundException if the blob could not be found
+ * @throws AuthorizationException if who is not allowed to read the blob
+ * @throws IOException on any error while reading the blob.
+ */
+ public Map<String, Object> readTopologyConf(String topoId, Subject who) throws KeyNotFoundException, AuthorizationException, IOException {
+ return Utils.fromCompressedJsonConf(readBlob(ConfigUtils.masterStormConfKey(topoId), who));
+ }
+
+ private static final KeyFilter<String> TO_TOPO_ID = (key) -> ConfigUtils.getIdFromBlobKey(key);
+ /**
+ * @return a set of all of the topology ids with special data stored in the
+ * blob store.
+ */
+ public Set<String> storedTopoIds() {
+ return filterAndListKeys(TO_TOPO_ID);
+ }
+
+ /**
* Output stream implementation used for reading the
* metadata and data information.
*/
http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java b/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
index a6f07ed..443e471 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
@@ -17,12 +17,24 @@
*/
package org.apache.storm.cluster;
-import org.apache.storm.generated.*;
-import org.apache.storm.nimbus.NimbusInfo;
-
import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
+
+import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.ClusterWorkerHeartbeat;
+import org.apache.storm.generated.Credentials;
+import org.apache.storm.generated.ErrorInfo;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LogConfig;
+import org.apache.storm.generated.NimbusSummary;
+import org.apache.storm.generated.NodeInfo;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.generated.StormBase;
+import org.apache.storm.generated.SupervisorInfo;
+import org.apache.storm.nimbus.NimbusInfo;
public interface IStormClusterState {
public List<String> assignments(Runnable callback);
@@ -35,7 +47,7 @@ public interface IStormClusterState {
public List<String> blobstoreInfo(String blobKey);
- public List nimbuses();
+ public List<NimbusSummary> nimbuses();
public void addNimbusHost(String nimbusId, NimbusSummary nimbusSummary);
@@ -117,10 +129,62 @@ public interface IStormClusterState {
public ErrorInfo lastError(String stormId, String componentId);
- public void setCredentials(String stormId, Credentials creds, Map topoConf) throws NoSuchAlgorithmException;
+ public void setCredentials(String stormId, Credentials creds, Map<String, Object> topoConf) throws NoSuchAlgorithmException;
public Credentials credentials(String stormId, Runnable callback);
public void disconnect();
-
-}
+
+ /**
+ * @return All of the supervisors with the ID as the key
+ */
+ default Map<String, SupervisorInfo> allSupervisorInfo() {
+ return allSupervisorInfo(null);
+ }
+
+ /**
+ * @param callback be alerted if the list of supervisors change
+ * @return All of the supervisors with the ID as the key
+ */
+ default Map<String, SupervisorInfo> allSupervisorInfo(Runnable callback) {
+ Map<String, SupervisorInfo> ret = new HashMap<>();
+ for (String id: supervisors(callback)) {
+ ret.put(id, supervisorInfo(id));
+ }
+ return ret;
+ }
+
+ /**
+ * Get a topology ID from the name of a topology
+ * @param topologyName the name of the topology to look for
+ * @return the id of the topology or null if it is not alive.
+ */
+ default Optional<String> getTopoId(final String topologyName) {
+ String ret = null;
+ for (String topoId: activeStorms()) {
+ String name = stormBase(topoId, null).get_name();
+ if (topologyName.equals(name)) {
+ ret = topoId;
+ break;
+ }
+ }
+ return Optional.ofNullable(ret);
+ }
+
+ default Map<String, Assignment> topologyAssignments() {
+ Map<String, Assignment> ret = new HashMap<>();
+ for (String topoId: assignments(null)) {
+ ret.put(topoId, assignmentInfo(topoId, null));
+ }
+ return ret;
+ }
+
+ default Map<String, StormBase> topologyBases() {
+ Map<String, StormBase> stormBases = new HashMap<>();
+ for (String topologyId : activeStorms()) {
+ StormBase base = stormBase(topologyId, null);
+ stormBases.put(topologyId, base);
+ }
+ return stormBases;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
index 2fbb6c2..971b426 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@@ -194,7 +194,7 @@ public class StormClusterStateImpl implements IStormClusterState {
}
@Override
- public List nimbuses() {
+ public List<NimbusSummary> nimbuses() {
List<NimbusSummary> nimbusSummaries = new ArrayList<>();
List<String> nimbusIds = stateStorage.get_children(ClusterUtils.NIMBUSES_SUBTREE, false);
for (String nimbusId : nimbusIds) {
@@ -698,7 +698,7 @@ public class StormClusterStateImpl implements IStormClusterState {
}
@Override
- public void setCredentials(String stormId, Credentials creds, Map topoConf) throws NoSuchAlgorithmException {
+ public void setCredentials(String stormId, Credentials creds, Map<String, Object> topoConf) throws NoSuchAlgorithmException {
List<ACL> aclList = ClusterUtils.mkTopoOnlyAcls(topoConf);
String path = ClusterUtils.credentialsPath(stormId);
stateStorage.set_data(path, Utils.serialize(creds), aclList);
http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
index 70a91f3..874b607 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
@@ -56,6 +56,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
@@ -94,31 +95,9 @@ public class StormCommon {
public static final String TOPOLOGY_METRICS_CONSUMER_EXPAND_MAP_TYPE = "expandMapType";
public static final String TOPOLOGY_METRICS_CONSUMER_METRIC_NAME_SEPARATOR = "metricNameSeparator";
- @SuppressWarnings("unchecked")
+ @Deprecated
public static String getStormId(final IStormClusterState stormClusterState, final String topologyName) {
- List<String> activeTopologys = stormClusterState.activeStorms();
- IPredicate pred = new IPredicate<String>() {
- @Override
- public boolean test(String obj) {
- String name = stormClusterState.stormBase(obj, null).get_name();
- return name.equals(topologyName);
- }
- };
- return Utils.findOne(pred, activeTopologys);
- }
-
- public static Map<String, StormBase> topologyBases(IStormClusterState stormClusterState) {
- return _instance.topologyBasesImpl(stormClusterState);
- }
-
- protected Map<String, StormBase> topologyBasesImpl(IStormClusterState stormClusterState) {
- List<String> activeTopologys = stormClusterState.activeStorms();
- Map<String, StormBase> stormBases = new HashMap<>();
- for (String topologyId : activeTopologys) {
- StormBase base = stormClusterState.stormBase(topologyId, null);
- stormBases.put(topologyId, base);
- }
- return stormBases;
+ return stormClusterState.getTopoId(topologyName).get();
}
public static void validateDistributedMode(Map conf) {
@@ -181,13 +160,13 @@ public class StormCommon {
}
@SuppressWarnings("unchecked")
- public static Map componentConf(Object component) {
+ public static Map<String, Object> componentConf(Object component) {
try {
- Map<Object, Object> conf = new HashMap<>();
+ Map<String, Object> conf = new HashMap<>();
ComponentCommon common = getComponentCommon(component);
String jconf = common.get_json_conf();
if (jconf != null) {
- conf.putAll((Map<Object, Object>) JSONValue.parseWithException(jconf));
+ conf.putAll((Map<String, Object>) JSONValue.parseWithException(jconf));
}
return conf;
} catch (Exception e) {