You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/03/10 15:33:53 UTC
[06/10] storm git commit: 1. Merge remote-tracking branch 'upstream/master'
1. Merge remote-tracking branch 'upstream/master'
Conflicts:
storm-core/src/clj/org/apache/storm/daemon/common.clj
storm-core/src/clj/org/apache/storm/daemon/drpc.clj
storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
storm-core/src/clj/org/apache/storm/ui/core.clj
storm-core/test/clj/org/apache/storm/nimbus_test.clj
2. update according to review comments
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ee4fa721
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ee4fa721
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ee4fa721
Branch: refs/heads/master
Commit: ee4fa721310cb901df5a9f82d346f9cfbc26ffa2
Parents: eca27bc b477939
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Wed Mar 9 20:00:03 2016 +0800
Committer: basti.lj <ba...@alibaba-inc.com>
Committed: Wed Mar 9 20:00:03 2016 +0800
----------------------------------------------------------------------
CHANGELOG.md | 10 +
.../travis/print-errors-from-test-reports.py | 4 +
.../src/clj/org/apache/storm/MockAutoCred.clj | 58 -----
.../src/clj/org/apache/storm/daemon/drpc.clj | 30 +--
.../clj/org/apache/storm/daemon/executor.clj | 20 +-
.../clj/org/apache/storm/daemon/logviewer.clj | 26 +--
.../src/clj/org/apache/storm/daemon/nimbus.clj | 155 ++++++-------
.../clj/org/apache/storm/daemon/supervisor.clj | 27 ++-
.../src/clj/org/apache/storm/daemon/worker.clj | 2 +-
storm-core/src/clj/org/apache/storm/ui/core.clj | 86 ++++----
.../src/clj/org/apache/storm/ui/helpers.clj | 10 +-
.../storm/cluster/StormClusterStateImpl.java | 7 +-
.../org/apache/storm/daemon/StormCommon.java | 215 ++++++-------------
.../storm/daemon/metrics/MetricsUtils.java | 2 +-
.../jvm/org/apache/storm/drpc/DRPCSpout.java | 2 +
.../storm/metric/StormMetricsRegistry.java | 84 ++++++++
.../auth/AbstractSaslClientCallbackHandler.java | 76 +++++++
.../auth/AbstractSaslServerCallbackHandler.java | 94 ++++++++
.../auth/digest/ClientCallbackHandler.java | 60 +-----
.../auth/digest/ServerCallbackHandler.java | 61 +-----
.../auth/plain/PlainClientCallbackHandler.java | 31 +++
.../auth/plain/PlainSaslTransportPlugin.java | 71 ++++++
.../auth/plain/PlainServerCallbackHandler.java | 55 +++++
.../security/auth/plain/SaslPlainServer.java | 158 ++++++++++++++
.../jvm/org/apache/storm/utils/ConfigUtils.java | 10 +
.../test/clj/org/apache/storm/nimbus_test.clj | 10 +-
.../clj/org/apache/storm/supervisor_test.clj | 6 +
.../test/jvm/org/apache/storm/MockAutoCred.java | 75 +++++++
28 files changed, 953 insertions(+), 492 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/ee4fa721/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/ee4fa721/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 0f95e28,9ff93f8..f055d8c
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@@ -31,7 -31,7 +31,7 @@@
(:import [org.apache.storm.utils Utils ConfigUtils TupleUtils MutableObject RotatingMap RotatingMap$ExpiredCallback MutableLong Time DisruptorQueue WorkerBackpressureThread DisruptorBackpressureCallback])
(:import [com.lmax.disruptor InsufficientCapacityException])
(:import [org.apache.storm.serialization KryoTupleSerializer])
- (:import [org.apache.storm.daemon Shutdownable StormCommon])
- (:import [org.apache.storm.daemon Shutdownable])
++ (:import [org.apache.storm.daemon Shutdownable StormCommon Acker])
(:import [org.apache.storm.metric.api IMetric IMetricsConsumer$TaskInfo IMetricsConsumer$DataPoint StateMetric])
(:import [org.apache.storm Config Constants])
(:import [org.apache.storm.cluster ClusterStateContext DaemonType StormClusterStateImpl ClusterUtils])
@@@ -536,17 -536,17 +536,17 @@@
(throw (RuntimeException. (str "Fatal error, mismatched task ids: " task-id " " stored-task-id))))
(let [time-delta (if start-time-ms (Time/deltaMs start-time-ms))]
(condp = stream-id
- StormCommon/ACKER_ACK_STREAM_ID (ack-spout-msg executor-data (get task-datas task-id)
- ACKER-ACK-STREAM-ID (ack-spout-msg executor-data (get task-datas task-id)
-- spout-id tuple-finished-info time-delta id)
- StormCommon/ACKER_FAIL_STREAM_ID (fail-spout-msg executor-data (get task-datas task-id)
- ACKER-FAIL-STREAM-ID (fail-spout-msg executor-data (get task-datas task-id)
++ Acker/ACKER_ACK_STREAM_ID (ack-spout-msg executor-data (get task-datas task-id)
++ spout-id tuple-finished-info time-delta id)
++ Acker/ACKER_FAIL_STREAM_ID (fail-spout-msg executor-data (get task-datas task-id)
spout-id tuple-finished-info time-delta "FAIL-STREAM" id)
)))
;; TODO: on failure, emit tuple to failure stream
))))
receive-queue (:receive-queue executor-data)
event-handler (mk-task-receiver executor-data tuple-action-fn)
- has-ackers? (clojurify-structure (StormCommon/hasAckers storm-conf))
- has-eventloggers? (clojurify-structure (StormCommon/hasEventLoggers storm-conf))
- has-ackers? (has-ackers? storm-conf)
- has-eventloggers? (has-eventloggers? storm-conf)
++ has-ackers? (StormCommon/hasAckers storm-conf)
++ has-eventloggers? (StormCommon/hasEventLoggers storm-conf)
emitted-count (MutableLong. 0)
empty-emit-streak (MutableLong. 0)
spout-transfer-fn (fn []
@@@ -587,7 -587,7 +587,7 @@@
:values (if debug? values nil)}
(if (sampler) (System/currentTimeMillis))])
(task/send-unanchored task-data
- StormCommon/ACKER_INIT_STREAM_ID
- ACKER-INIT-STREAM-ID
++ Acker/ACKER_INIT_STREAM_ID
[root-id (Utils/bitXorVals out-ids) task-id]))
(when message-id
(ack-spout-msg executor-data task-data message-id
@@@ -742,7 -742,7 +742,7 @@@
(.getSourceComponent tuple)
(.getSourceStreamId tuple)
delta)))))))
- has-eventloggers? (clojurify-structure (StormCommon/hasEventLoggers storm-conf))
- has-eventloggers? (has-eventloggers? storm-conf)
++ has-eventloggers? (StormCommon/hasEventLoggers storm-conf)
bolt-transfer-fn (fn []
;; If topology was started in inactive state, don't call prepare bolt until it's activated first.
(while (not @(:storm-active-atom executor-data))
@@@ -803,7 -803,7 +803,7 @@@
ack-val (.getAckVal tuple)]
(fast-map-iter [[root id] (.. tuple getMessageId getAnchorsToIds)]
(task/send-unanchored task-data
- StormCommon/ACKER_ACK_STREAM_ID
- ACKER-ACK-STREAM-ID
++ Acker/ACKER_ACK_STREAM_ID
[root (bit-xor id ack-val)])))
(let [delta (tuple-time-delta! tuple)
debug? (= true (storm-conf TOPOLOGY-DEBUG))]
@@@ -818,7 -818,7 +818,7 @@@
(^void fail [this ^Tuple tuple]
(fast-list-iter [root (.. tuple getMessageId getAnchors)]
(task/send-unanchored task-data
- StormCommon/ACKER_FAIL_STREAM_ID
- ACKER-FAIL-STREAM-ID
++ Acker/ACKER_FAIL_STREAM_ID
[root]))
(let [delta (tuple-time-delta! tuple)
debug? (= true (storm-conf TOPOLOGY-DEBUG))]
http://git-wip-us.apache.org/repos/asf/storm/blob/ee4fa721/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/ee4fa721/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 673f15d,0af12a2..e6fd0a2
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@@ -1636,9 -1639,9 +1635,9 @@@
(notify-topology-action-listener nimbus storm-name operation)))
(debug [this storm-name component-id enable? samplingPct]
- (mark! nimbus:num-debug-calls)
+ (.mark nimbus:num-debug-calls)
(let [storm-cluster-state (:storm-cluster-state nimbus)
- storm-id (get-storm-id storm-cluster-state storm-name)
+ storm-id (StormCommon/getStormId storm-cluster-state storm-name)
topology-conf (try-read-storm-conf conf storm-id blob-store)
;; make sure samplingPct is within bounds.
spct (Math/max (Math/min samplingPct 100.0) 0.0)
@@@ -1715,9 -1718,9 +1714,9 @@@
(.setTopologyLogConfig storm-cluster-state id merged-log-config)))
(uploadNewCredentials [this storm-name credentials]
- (mark! nimbus:num-uploadNewCredentials-calls)
+ (.mark nimbus:num-uploadNewCredentials-calls)
(let [storm-cluster-state (:storm-cluster-state nimbus)
- storm-id (get-storm-id storm-cluster-state storm-name)
+ storm-id (StormCommon/getStormId storm-cluster-state storm-name)
topology-conf (try-read-storm-conf conf storm-id blob-store)
creds (when credentials (.get_creds credentials))]
(check-authorization! nimbus storm-name topology-conf "uploadNewCredentials")
@@@ -1811,10 -1814,10 +1810,10 @@@
(let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
storm-name (topology-conf TOPOLOGY-NAME)]
(check-authorization! nimbus storm-name topology-conf "getTopology")
- (system-topology! topology-conf (try-read-storm-topology id (:blob-store nimbus)))))
+ (StormCommon/systemTopology topology-conf (try-read-storm-topology id (:blob-store nimbus)))))
(^StormTopology getUserTopology [this ^String id]
- (mark! nimbus:num-getUserTopology-calls)
+ (.mark nimbus:num-getUserTopology-calls)
(let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
storm-name (topology-conf TOPOLOGY-NAME)]
(check-authorization! nimbus storm-name topology-conf "getUserTopology")
@@@ -2181,21 -2184,22 +2180,22 @@@
comp-page-info
(converter/thriftify-debugoptions debug-options)))
;; Add the event logger details.
- (let [component->tasks (clojurify-structure (Utils/reverseMap (:task->component info)))
- eventlogger-tasks (sort (get component->tasks
- StormCommon/EVENTLOGGER_COMPONENT_ID))
- ;; Find the task the events from this component route to.
- task-index (mod (TupleUtils/listHashCode [component-id])
- (count eventlogger-tasks))
- task-id (nth eventlogger-tasks task-index)
- eventlogger-exec (first (filter (fn [[start stop]]
- (between? task-id start stop))
- (keys executor->host+port)))
- [host port] (get executor->host+port eventlogger-exec)]
- (if (and host port)
- (doto comp-page-info
- (.set_eventlog_host host)
- (.set_eventlog_port port))))
+ (let [component->tasks (clojurify-structure (Utils/reverseMap (:task->component info)))]
- (if (contains? component->tasks EVENTLOGGER-COMPONENT-ID)
++ (if (contains? component->tasks StormCommon/EVENTLOGGER_COMPONENT_ID)
+ (let [eventlogger-tasks (sort (get component->tasks
- EVENTLOGGER-COMPONENT-ID))
++ StormCommon/EVENTLOGGER_COMPONENT_ID))
+ ;; Find the task the events from this component route to.
+ task-index (mod (TupleUtils/listHashCode [component-id])
+ (count eventlogger-tasks))
+ task-id (nth eventlogger-tasks task-index)
+ eventlogger-exec (first (filter (fn [[start stop]]
+ (between? task-id start stop))
+ (keys executor->host+port)))
+ [host port] (get executor->host+port eventlogger-exec)]
+ (if (and host port)
+ (doto comp-page-info
+ (.set_eventlog_host host)
+ (.set_eventlog_port port))))))
comp-page-info))
(^TopologyHistoryInfo getTopologyHistory [this ^String user]
http://git-wip-us.apache.org/repos/asf/storm/blob/ee4fa721/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index 20cf7f2,3220728..fd8f6c9
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@@ -1318,11 -1333,12 +1332,12 @@@
[supervisor]
(log-message "Starting supervisor for storm version '" STORM-VERSION "'")
(let [conf (clojurify-structure (ConfigUtils/readStormConfig))]
- (validate-distributed-mode! conf)
+ (StormCommon/validateDistributedMode conf)
(let [supervisor (mk-supervisor conf nil supervisor)]
(Utils/addShutdownHookWithForceKillIn1Sec #(.shutdown supervisor)))
- (defgauge supervisor:num-slots-used-gauge #(count (my-worker-ids conf)))
- (StormCommon/startMetricsReporters conf)))
+ (def supervisor:num-slots-used-gauge (StormMetricsRegistry/registerGauge "supervisor:num-slots-used-gauge"
+ #(count (my-worker-ids conf))))
+ (StormMetricsRegistry/startMetricsReporters conf)))
(defn standalone-supervisor []
(let [conf-atom (atom nil)
http://git-wip-us.apache.org/repos/asf/storm/blob/ee4fa721/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/worker.clj
index e1b0185,92ba807..6d115ce
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@@ -254,9 -254,6 +254,9 @@@
(log-error e "Error when processing event")
(Utils/exitProcess 20 "Error when processing an event")))))
+(defn executor->tasks [executor-id]
- clojurify-structure (StormCommon/executorIdToTasks executor-id))
++ (StormCommon/executorIdToTasks executor-id))
+
(defn worker-data [conf mq-context storm-id assignment-id port worker-id storm-conf state-store storm-cluster-state]
(let [assignment-versions (atom {})
executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port assignment-versions))
http://git-wip-us.apache.org/repos/asf/storm/blob/ee4fa721/storm-core/src/clj/org/apache/storm/ui/core.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/ui/core.clj
index d24fc14,e1ab71f..c1ea340
--- a/storm-core/src/clj/org/apache/storm/ui/core.clj
+++ b/storm-core/src/clj/org/apache/storm/ui/core.clj
@@@ -23,9 -23,12 +23,10 @@@
(:use [hiccup core page-helpers])
(:use [org.apache.storm config util log stats converter])
(:use [org.apache.storm.ui helpers])
- (:use [org.apache.storm.daemon [common :only [ACKER-COMPONENT-ID ACKER-INIT-STREAM-ID ACKER-ACK-STREAM-ID
- ACKER-FAIL-STREAM-ID mk-authorization-handler]]])
(:import [org.apache.storm.utils Time]
[org.apache.storm.generated NimbusSummary]
- [org.apache.storm.ui UIHelpers IConfigurator FilterConfiguration])
+ [org.apache.storm.ui UIHelpers IConfigurator FilterConfiguration]
+ [org.apache.storm.metric StormMetricsRegistry])
(:use [clojure.string :only [blank? lower-case trim split]])
(:import [org.apache.storm.generated ExecutorSpecificStats
ExecutorStats ExecutorSummary ExecutorInfo TopologyInfo SpoutStats BoltStats
@@@ -48,10 -51,8 +49,9 @@@
[compojure.handler :as handler]
[ring.util.response :as resp]
[org.apache.storm.internal [thrift :as thrift]])
- (:require [metrics.meters :refer [defmeter mark!]])
(:import [org.apache.commons.lang StringEscapeUtils])
- (:import [org.apache.logging.log4j Level])
+ (:import [org.apache.logging.log4j Level]
- (org.apache.storm.daemon StormCommon))
++ (org.apache.storm.daemon StormCommon Acker))
(:import [org.eclipse.jetty.server Server])
(:gen-class))
@@@ -114,9 -115,9 +114,9 @@@
(defn is-ack-stream
[stream]
(let [acker-streams
- [StormCommon/ACKER_INIT_STREAM_ID
- StormCommon/ACKER_ACK_STREAM_ID
- StormCommon/ACKER_FAIL_STREAM_ID]]
- [ACKER-INIT-STREAM-ID
- ACKER-ACK-STREAM-ID
- ACKER-FAIL-STREAM-ID]]
++ [Acker/ACKER_INIT_STREAM_ID
++ Acker/ACKER_ACK_STREAM_ID
++ Acker/ACKER_FAIL_STREAM_ID]]
(every? #(not= %1 stream) acker-streams)))
(defn spout-summary?
http://git-wip-us.apache.org/repos/asf/storm/blob/ee4fa721/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
index b586422,0000000..7c7b3c2
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
@@@ -1,604 -1,0 +1,523 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon;
+
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.Thrift;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.metrics.MetricsUtils;
+import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
+import org.apache.storm.generated.*;
+import org.apache.storm.generated.StormBase;
+import org.apache.storm.metric.EventLoggerBolt;
+import org.apache.storm.metric.MetricsConsumerBolt;
+import org.apache.storm.metric.SystemBolt;
+import org.apache.storm.security.auth.IAuthorizer;
+import org.apache.storm.task.IBolt;
+import org.apache.storm.testing.NonRichBoltTracker;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.IPredicate;
+import org.apache.storm.utils.ThriftTopologyUtils;
+import org.apache.storm.utils.Utils;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+public class StormCommon {
+ // A singleton instance allows us to mock delegated static methods in our
+ // tests by subclassing.
+ private static StormCommon _instance = new StormCommon();
+
+ /**
+ * Provide an instance of this class for delegates to use. To mock out
+ * delegated methods, provide an instance of a subclass that overrides the
+ * implementation of the delegated method.
+ * @param common a StormCommon instance
+ * @return the previously set instance
+ */
+ public static StormCommon setInstance(StormCommon common) {
+ StormCommon oldInstance = _instance;
+ _instance = common;
+ return oldInstance;
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(StormCommon.class);
+
- public static final String ACKER_COMPONENT_ID = Acker.ACKER_COMPONENT_ID;
- public static final String ACKER_INIT_STREAM_ID = Acker.ACKER_INIT_STREAM_ID;
- public static final String ACKER_ACK_STREAM_ID = Acker.ACKER_ACK_STREAM_ID;
- public static final String ACKER_FAIL_STREAM_ID = Acker.ACKER_FAIL_STREAM_ID;
-
+ public static final String SYSTEM_STREAM_ID = "__system";
+
+ public static final String EVENTLOGGER_COMPONENT_ID = "__eventlogger";
+ public static final String EVENTLOGGER_STREAM_ID = "__eventlog";
+
- public static void startMetricsReporter(PreparableReporter report, Map conf) {
- report.prepare(new MetricRegistry(), conf);
- report.start();
- LOG.info("Started statistics report plugin...");
- }
-
- public static void startMetricsReporters(Map conf) {
- List<PreparableReporter> reporters = MetricsUtils.getPreparableReporters(conf);
- for (PreparableReporter reporter : reporters) {
- startMetricsReporter(reporter, conf);
- }
- }
-
- public static String getTopologyNameById(String topologyId) {
- String topologyName = null;
- try {
- topologyName = topologyIdToName(topologyId);
- } catch (InvalidTopologyException e) {
- LOG.error("Invalid topologyId=" + topologyId);
- }
- return topologyName;
- }
-
- /**
- * Convert topologyId to topologyName. TopologyId = topoloygName-counter-timeStamp
- *
- * @param topologyId
- * @return
- */
- public static String topologyIdToName(String topologyId) throws InvalidTopologyException {
- String ret = null;
- int index = topologyId.lastIndexOf('-');
- if (index != -1 && index > 2) {
- index = topologyId.lastIndexOf('-', index - 1);
- if (index != -1 && index > 0)
- ret = topologyId.substring(0, index);
- else
- throw new InvalidTopologyException(topologyId + " is not a valid topologyId");
- } else
- throw new InvalidTopologyException(topologyId + " is not a valid topologyId");
- return ret;
- }
-
- public static String getStormId(IStormClusterState stormClusterState, final String topologyName) {
++ public static String getStormId(final IStormClusterState stormClusterState, final String topologyName) {
+ List<String> activeTopologys = stormClusterState.activeStorms();
+ IPredicate pred = new IPredicate<String>() {
+ @Override
+ public boolean test(String obj) {
- return obj != null ? getTopologyNameById(obj).equals(topologyName) : false;
++ String name = stormClusterState.stormBase(obj, null).get_name();
++ return name.equals(topologyName);
+ }
+ };
+ return Utils.findOne(pred, activeTopologys);
+ }
+
+ public static Map<String, StormBase> topologyBases(IStormClusterState stormClusterState) {
+ return _instance.topologyBasesImpl(stormClusterState);
+ }
+
+ protected Map<String, StormBase> topologyBasesImpl(IStormClusterState stormClusterState) {
+ List<String> activeTopologys = stormClusterState.activeStorms();
+ Map<String, StormBase> stormBases = new HashMap<String, StormBase>();
- if (activeTopologys != null) {
- for (String topologyId : activeTopologys) {
- StormBase base = stormClusterState.stormBase(topologyId, null);
- if (base != null) {
- stormBases.put(topologyId, base);
- }
- }
++ for (String topologyId : activeTopologys) {
++ StormBase base = stormClusterState.stormBase(topologyId, null);
++ stormBases.put(topologyId, base);
+ }
+ return stormBases;
+ }
+
+ public static void validateDistributedMode(Map conf) {
+ if (ConfigUtils.isLocalMode(conf)) {
+ throw new IllegalArgumentException("Cannot start server in local mode!");
+ }
+ }
+
+ private static void validateIds(StormTopology topology) throws InvalidTopologyException {
+ List<String> componentIds = new ArrayList<String>();
+
+ for (StormTopology._Fields field : Thrift.getTopologyFields()) {
+ if (ThriftTopologyUtils.isWorkerHook(field) == false) {
+ Object value = topology.getFieldValue(field);
+ if (value != null) {
+ Map<String, Object> componentMap = (Map<String, Object>) value;
+ componentIds.addAll(componentMap.keySet());
+
+ for (String id : componentMap.keySet()) {
+ if (Utils.isSystemId(id)) {
+ throw new InvalidTopologyException(id + " is not a valid component id.");
+ }
+ }
+ for (Object componentObj : componentMap.values()) {
+ ComponentCommon common = getComponentCommon(componentObj);
+ Set<String> streamIds = common.get_streams().keySet();
+ for (String id : streamIds) {
+ if (Utils.isSystemId(id)) {
+ throw new InvalidTopologyException(id + " is not a valid stream id.");
+ }
+ }
+ }
+ }
+ }
+ }
+
+ List<String> offending = Utils.getRepeat(componentIds);
+ if (offending.isEmpty() == false) {
+ throw new InvalidTopologyException("Duplicate component ids: " + offending);
+ }
+ }
+
+ private static boolean isEmptyInputs(ComponentCommon common) {
- if (common == null) {
- return true;
- } else if (common.get_inputs() == null) {
++ if (common.get_inputs() == null) {
+ return true;
+ } else {
+ return common.get_inputs().isEmpty();
+ }
+ }
+
+ public static Map<String, Object> allComponents(StormTopology topology) {
+ Map<String, Object> components = new HashMap<String, Object>();
+ List<StormTopology._Fields> topologyFields = Arrays.asList(Thrift.getTopologyFields());
+ for (StormTopology._Fields field : topologyFields) {
+ if (ThriftTopologyUtils.isWorkerHook(field) == false) {
+ components.putAll(((Map) topology.getFieldValue(field)));
+ }
+ }
+ return components;
+ }
+
+ public static Map componentConf(Object component) {
+ Map<Object, Object> conf = new HashMap<Object, Object>();
+ ComponentCommon common = getComponentCommon(component);
- if (common != null) {
- String jconf = common.get_json_conf();
- if (jconf != null) {
- conf.putAll((Map<Object, Object>) JSONValue.parse(jconf));
- }
++ String jconf = common.get_json_conf();
++ if (jconf != null) {
++ conf.putAll((Map<Object, Object>) JSONValue.parse(jconf));
+ }
+ return conf;
+ }
+
+ public static void validateBasic(StormTopology topology) throws InvalidTopologyException {
+ validateIds(topology);
+
- List<StormTopology._Fields> spoutFields = Arrays.asList(Thrift.getSpoutFields());
- for (StormTopology._Fields field : spoutFields) {
++ for (StormTopology._Fields field : Thrift.getSpoutFields()) {
+ Map<String, Object> spoutComponents = (Map<String, Object>) topology.getFieldValue(field);
+ if (spoutComponents != null) {
+ for (Object obj : spoutComponents.values()) {
+ ComponentCommon common = getComponentCommon(obj);
+ if (isEmptyInputs(common) == false) {
+ throw new InvalidTopologyException("May not declare inputs for a spout");
+ }
+ }
+ }
+ }
+
+ Map<String, Object> componentMap = allComponents(topology);
+ for (Object componentObj : componentMap.values()) {
+ Map conf = componentConf(componentObj);
+ ComponentCommon common = getComponentCommon(componentObj);
- if (common != null) {
- int parallelismHintNum = Thrift.getParallelismHint(common);
- Integer taskNum = Utils.getInt(conf.get(Config.TOPOLOGY_TASKS), 0);
- if (taskNum > 0 && parallelismHintNum <= 0) {
- throw new InvalidTopologyException("Number of executors must be greater than 0 when number of tasks is greater than 0");
- }
++ int parallelismHintNum = Thrift.getParallelismHint(common);
++ Integer taskNum = Utils.getInt(conf.get(Config.TOPOLOGY_TASKS), 0);
++ if (taskNum > 0 && parallelismHintNum <= 0) {
++ throw new InvalidTopologyException("Number of executors must be greater than 0 when number of tasks is greater than 0");
+ }
+ }
+ }
+
+ private static Set<String> getStreamOutputFields(Map<String, StreamInfo> streams) {
+ Set<String> outputFields = new HashSet<String>();
- if (streams != null) {
- for (StreamInfo streamInfo : streams.values()) {
- outputFields.addAll(streamInfo.get_output_fields());
- }
++ for (StreamInfo streamInfo : streams.values()) {
++ outputFields.addAll(streamInfo.get_output_fields());
+ }
+ return outputFields;
+ }
+
+ public static void validateStructure(StormTopology topology) throws InvalidTopologyException {
+ Map<String, Object> componentMap = allComponents(topology);
+ for (Map.Entry<String, Object> entry : componentMap.entrySet()) {
+ String componentId = entry.getKey();
+ ComponentCommon common = getComponentCommon(entry.getValue());
- if (common != null) {
- Map<GlobalStreamId, Grouping> inputs = common.get_inputs();
- for (Map.Entry<GlobalStreamId, Grouping> input : inputs.entrySet()) {
- String sourceStreamId = input.getKey().get_streamId();
- String sourceComponentId = input.getKey().get_componentId();
- if(componentMap.keySet().contains(sourceComponentId) == false) {
- throw new InvalidTopologyException("Component: [" + componentId + "] subscribes from non-existent component [" + sourceComponentId + "]");
- }
++ Map<GlobalStreamId, Grouping> inputs = common.get_inputs();
++ for (Map.Entry<GlobalStreamId, Grouping> input : inputs.entrySet()) {
++ String sourceStreamId = input.getKey().get_streamId();
++ String sourceComponentId = input.getKey().get_componentId();
++ if(componentMap.keySet().contains(sourceComponentId) == false) {
++ throw new InvalidTopologyException("Component: [" + componentId + "] subscribes from non-existent component [" + sourceComponentId + "]");
++ }
+
- ComponentCommon sourceComponent = getComponentCommon(componentMap.get(sourceComponentId));
- if (sourceComponent == null || sourceComponent.get_streams().containsKey(sourceStreamId) == false) {
- throw new InvalidTopologyException("Component: [" + componentId + "] subscribes from non-existent stream: " +
- "[" + sourceStreamId + "] of component [" + sourceComponentId + "]");
- }
++ ComponentCommon sourceComponent = getComponentCommon(componentMap.get(sourceComponentId));
++ if (sourceComponent.get_streams().containsKey(sourceStreamId) == false) {
++ throw new InvalidTopologyException("Component: [" + componentId + "] subscribes from non-existent stream: " +
++ "[" + sourceStreamId + "] of component [" + sourceComponentId + "]");
++ }
+
- Grouping grouping = input.getValue();
- if (Thrift.groupingType(grouping) == Grouping._Fields.FIELDS) {
- List<String> fields = grouping.get_fields();
- Map<String, StreamInfo> streams = sourceComponent.get_streams();
- Set<String> sourceOutputFields = getStreamOutputFields(streams);
- if (sourceOutputFields.containsAll(fields) == false) {
- throw new InvalidTopologyException("Component: [" + componentId + "] subscribes from stream: [" + sourceStreamId +"] of component " +
- "[" + sourceComponentId + "] + with non-existent fields: " + fields);
- }
++ Grouping grouping = input.getValue();
++ if (Thrift.groupingType(grouping) == Grouping._Fields.FIELDS) {
++ List<String> fields = new ArrayList<String>(grouping.get_fields());
++ Map<String, StreamInfo> streams = sourceComponent.get_streams();
++ Set<String> sourceOutputFields = getStreamOutputFields(streams);
++ fields.removeAll(sourceOutputFields);
++ if (fields.size() != 0) {
++ throw new InvalidTopologyException("Component: [" + componentId + "] subscribes from stream: [" + sourceStreamId +"] of component " +
++ "[" + sourceComponentId + "] + with non-existent fields: " + fields);
+ }
+ }
+ }
+ }
+ }
+
+ public static Map<GlobalStreamId, Grouping> ackerInputs(StormTopology topology) {
+ Map<GlobalStreamId, Grouping> inputs = new HashMap<GlobalStreamId, Grouping>();
+ Set<String> boltIds = topology.get_bolts().keySet();
+ Set<String> spoutIds = topology.get_spouts().keySet();
+
+ for(String id : spoutIds) {
- inputs.put(Utils.getGlobalStreamId(id, ACKER_INIT_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("id")));
++ inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_INIT_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("id")));
+ }
+
+ for(String id : boltIds) {
- inputs.put(Utils.getGlobalStreamId(id, ACKER_ACK_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("id")));
- inputs.put(Utils.getGlobalStreamId(id, ACKER_FAIL_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("id")));
++ inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_ACK_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("id")));
++ inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_FAIL_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("id")));
+ }
+ return inputs;
+ }
+
+ public static IBolt makeAckerBolt() {
+ return _instance.makeAckerBoltImpl();
+ }
+ public IBolt makeAckerBoltImpl() {
+ return new Acker();
+ }
+
+ public static void addAcker(Map conf, StormTopology topology) {
+ int ackerNum = Utils.getInt(conf.get(Config.TOPOLOGY_ACKER_EXECUTORS), Utils.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
+ Map<GlobalStreamId, Grouping> inputs = ackerInputs(topology);
+
+ Map<String, StreamInfo> outputStreams = new HashMap<String, StreamInfo>();
- outputStreams.put(ACKER_ACK_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id")));
- outputStreams.put(ACKER_FAIL_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id")));
++ outputStreams.put(Acker.ACKER_ACK_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id")));
++ outputStreams.put(Acker.ACKER_FAIL_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id")));
+
+ Map<String, Object> ackerConf = new HashMap<String, Object>();
+ ackerConf.put(Config.TOPOLOGY_TASKS, ackerNum);
+ ackerConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Utils.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
+
+ Bolt acker = Thrift.prepareSerializedBoltDetails(inputs, makeAckerBolt(), outputStreams, ackerNum, ackerConf);
+
+ for(Bolt bolt : topology.get_bolts().values()) {
+ ComponentCommon common = bolt.get_common();
- common.put_to_streams(ACKER_ACK_STREAM_ID, Thrift.outputFields(Arrays.asList("id", "ack-val")));
- common.put_to_streams(ACKER_FAIL_STREAM_ID, Thrift.outputFields(Arrays.asList("id")));
++ common.put_to_streams(Acker.ACKER_ACK_STREAM_ID, Thrift.outputFields(Arrays.asList("id", "ack-val")));
++ common.put_to_streams(Acker.ACKER_FAIL_STREAM_ID, Thrift.outputFields(Arrays.asList("id")));
+ }
+
+ for (SpoutSpec spout : topology.get_spouts().values()) {
+ ComponentCommon common = spout.get_common();
+ Map spoutConf = componentConf(spout);
+ spoutConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Utils.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
+ common.set_json_conf(JSONValue.toJSONString(spoutConf));
- common.put_to_streams(ACKER_INIT_STREAM_ID, Thrift.outputFields(Arrays.asList("id", "init-val", "spout-task")));
- common.put_to_inputs(Utils.getGlobalStreamId(ACKER_COMPONENT_ID, ACKER_ACK_STREAM_ID), Thrift.prepareDirectGrouping());
- common.put_to_inputs(Utils.getGlobalStreamId(ACKER_COMPONENT_ID, ACKER_FAIL_STREAM_ID), Thrift.prepareDirectGrouping());
++ common.put_to_streams(Acker.ACKER_INIT_STREAM_ID, Thrift.outputFields(Arrays.asList("id", "init-val", "spout-task")));
++ common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_ACK_STREAM_ID), Thrift.prepareDirectGrouping());
++ common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_FAIL_STREAM_ID), Thrift.prepareDirectGrouping());
+ }
+
- topology.put_to_bolts(ACKER_COMPONENT_ID, acker);
++ topology.put_to_bolts(Acker.ACKER_COMPONENT_ID, acker);
+ }
+
+ public static ComponentCommon getComponentCommon(Object component) {
- if (component == null) {
- return null;
- }
-
+ ComponentCommon common = null;
+ if (component instanceof StateSpoutSpec) {
+ common = ((StateSpoutSpec) component).get_common();
+ } else if (component instanceof SpoutSpec) {
+ common = ((SpoutSpec) component).get_common();
+ } else if (component instanceof Bolt) {
+ common = ((Bolt) component).get_common();
+ }
+ return common;
+ }
+
+ public static void addMetricStreams(StormTopology topology) {
+ for (Object component : allComponents(topology).values()) {
+ ComponentCommon common = getComponentCommon(component);
- if (common != null) {
- StreamInfo streamInfo = Thrift.outputFields(Arrays.asList("task-info", "data-points"));
- common.put_to_streams(Constants.METRICS_STREAM_ID, streamInfo);
- }
++ StreamInfo streamInfo = Thrift.outputFields(Arrays.asList("task-info", "data-points"));
++ common.put_to_streams(Constants.METRICS_STREAM_ID, streamInfo);
+ }
+ }
+
+ public static void addSystemStreams(StormTopology topology) {
+ for (Object component : allComponents(topology).values()) {
+ ComponentCommon common = getComponentCommon(component);
- if (common != null) {
- StreamInfo streamInfo = Thrift.outputFields(Arrays.asList("event"));
- common.put_to_streams(SYSTEM_STREAM_ID, streamInfo);
- }
++ StreamInfo streamInfo = Thrift.outputFields(Arrays.asList("event"));
++ common.put_to_streams(SYSTEM_STREAM_ID, streamInfo);
+ }
+ }
+
+ public static List<String> eventLoggerBoltFields() {
+ List<String> fields = Arrays.asList(EventLoggerBolt.FIELD_COMPONENT_ID, EventLoggerBolt.FIELD_MESSAGE_ID, EventLoggerBolt.FIELD_TS,
+ EventLoggerBolt.FIELD_VALUES);
+ return fields;
+ }
+
+ public static Map<GlobalStreamId, Grouping> eventLoggerInputs(StormTopology topology) {
+ Map<GlobalStreamId, Grouping> inputs = new HashMap<GlobalStreamId, Grouping>();
+ Set<String> allIds = new HashSet<String>();
+ allIds.addAll(topology.get_bolts().keySet());
+ allIds.addAll(topology.get_spouts().keySet());
+
+ for(String id : allIds) {
+ inputs.put(Utils.getGlobalStreamId(id, EVENTLOGGER_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("component-id")));
+ }
+ return inputs;
+ }
+
+ public static void addEventLogger(Map conf, StormTopology topology) {
+ Integer numExecutors = Utils.getInt(conf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS), Utils.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
+ HashMap<String, Object> componentConf = new HashMap<String, Object>();
+ componentConf.put(Config.TOPOLOGY_TASKS, numExecutors);
+ componentConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Utils.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
+ Bolt eventLoggerBolt = Thrift.prepareSerializedBoltDetails(eventLoggerInputs(topology), new EventLoggerBolt(), null, numExecutors, componentConf);
+
+ for(Object component : allComponents(topology).values()) {
+ ComponentCommon common = getComponentCommon(component);
- if (common != null) {
- common.put_to_streams(EVENTLOGGER_STREAM_ID, Thrift.outputFields(eventLoggerBoltFields()));
- }
++ common.put_to_streams(EVENTLOGGER_STREAM_ID, Thrift.outputFields(eventLoggerBoltFields()));
+ }
+ topology.put_to_bolts(EVENTLOGGER_COMPONENT_ID, eventLoggerBolt);
+ }
+
+ public static Map<String, Bolt> metricsConsumerBoltSpecs(Map conf, StormTopology topology) {
+ Map<String, Bolt> metricsConsumerBolts = new HashMap<String, Bolt>();
+
+ Set<String> componentIdsEmitMetrics = new HashSet<String>();
+ componentIdsEmitMetrics.addAll(allComponents(topology).keySet());
+ componentIdsEmitMetrics.add(Constants.SYSTEM_COMPONENT_ID);
+
+ Map<GlobalStreamId, Grouping> inputs = new HashMap<GlobalStreamId, Grouping>();
+ for (String componentId : componentIdsEmitMetrics) {
+ inputs.put(Utils.getGlobalStreamId(componentId, Constants.METRICS_STREAM_ID), Thrift.prepareShuffleGrouping());
+ }
+
+ List<Map<String, Object>> registerInfo = (List<Map<String, Object>>) conf.get(Config.TOPOLOGY_METRICS_CONSUMER_REGISTER);
+ if (registerInfo != null) {
+ Map<String, Integer> classOccurrencesMap = new HashMap<String, Integer>();
+ for (Map<String, Object> info : registerInfo) {
+ String className = (String) info.get("class");
+ Object argument = info.get("argument");
+ Integer phintNum = Utils.getInt(info.get("parallelism.hint"), 1);
+ Map<String, Object> metricsConsumerConf = new HashMap<String, Object>();
+ metricsConsumerConf.put(Config.TOPOLOGY_TASKS, phintNum);
+ Bolt metricsConsumerBolt = Thrift.prepareSerializedBoltDetails(inputs, new MetricsConsumerBolt(className, argument), null, phintNum, metricsConsumerConf);
+
+ String id = className;
+ if (classOccurrencesMap.containsKey(className)) {
+ // e.g. [\"a\", \"b\", \"a\"]) => [\"a\", \"b\", \"a#2\"]"
+ int occurrenceNum = classOccurrencesMap.get(className);
+ occurrenceNum++;
+ classOccurrencesMap.put(className, occurrenceNum);
+ id = Constants.METRICS_COMPONENT_ID_PREFIX + className + "#" + occurrenceNum;
+ } else {
+ classOccurrencesMap.put(className, 1);
+ }
+ metricsConsumerBolts.put(id, metricsConsumerBolt);
+ }
+ }
+ return metricsConsumerBolts;
+ }
+
+ public static void addMetricComponents(Map conf, StormTopology topology) {
+ Map<String, Bolt> metricsConsumerBolts = metricsConsumerBoltSpecs(conf, topology);
+ for (Map.Entry<String, Bolt> entry : metricsConsumerBolts.entrySet()) {
+ topology.put_to_bolts(entry.getKey(), entry.getValue());
+ }
+ }
+
+ public static void addSystemComponents(Map conf, StormTopology topology) {
+ Map<String, StreamInfo> outputStreams = new HashMap<String, StreamInfo>();
+ outputStreams.put(Constants.SYSTEM_TICK_STREAM_ID, Thrift.outputFields(Arrays.asList("rate_secs")));
+ outputStreams.put(Constants.METRICS_TICK_STREAM_ID, Thrift.outputFields(Arrays.asList("interval")));
+ outputStreams.put(Constants.CREDENTIALS_CHANGED_STREAM_ID, Thrift.outputFields(Arrays.asList("creds")));
+
+ Map<String, Object> boltConf = new HashMap<String, Object>();
+ boltConf.put(Config.TOPOLOGY_TASKS, 0);
+
+ Bolt systemBoltSpec = Thrift.prepareSerializedBoltDetails(null, new SystemBolt(), outputStreams, 0, boltConf);
+ topology.put_to_bolts(Constants.SYSTEM_COMPONENT_ID, systemBoltSpec);
+ }
+
+ public static StormTopology systemTopology(Map stormConf, StormTopology topology) throws InvalidTopologyException {
+ return _instance.systemTopologyImpl(stormConf, topology);
+ }
+
+ protected StormTopology systemTopologyImpl(Map stormConf, StormTopology topology) throws InvalidTopologyException {
+ validateBasic(topology);
+
+ StormTopology ret = topology.deepCopy();
+ addAcker(stormConf, ret);
+ addEventLogger(stormConf, ret);
+ addMetricComponents(stormConf, ret);
+ addSystemComponents(stormConf, ret);
+ addMetricStreams(ret);
+ addSystemStreams(ret);
+
+ validateStructure(ret);
+
+ return ret;
+ }
+
+ public static boolean hasAckers(Map stormConf) {
+ Object ackerNum = stormConf.get(Config.TOPOLOGY_ACKER_EXECUTORS);
+ if (ackerNum == null || Utils.getInt(ackerNum) > 0) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ public static boolean hasEventLoggers(Map stormConf) {
+ Object eventLoggerNum = stormConf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS);
+ if (eventLoggerNum == null || Utils.getInt(eventLoggerNum) > 0) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ public static int numStartExecutors(Object component) throws InvalidTopologyException {
+ ComponentCommon common = getComponentCommon(component);
- if (common == null) {
- throw new InvalidTopologyException("unknown component type " + component.getClass().getName());
- }
- int parallelismHintNum = Thrift.getParallelismHint(common);
- return parallelismHintNum;
++ return Thrift.getParallelismHint(common);
+ }
+
- public static Map stormTaskInfo(StormTopology userTopology, Map stormConf) throws InvalidTopologyException {
++ public static Map<Integer, String> stormTaskInfo(StormTopology userTopology, Map stormConf) throws InvalidTopologyException {
+ return _instance.stormTaskInfoImpl(userTopology, stormConf);
+ }
+ /*
+ * Returns map from task -> componentId
+ */
- protected Map stormTaskInfoImpl(StormTopology userTopology, Map stormConf) throws InvalidTopologyException {
++ protected Map<Integer, String> stormTaskInfoImpl(StormTopology userTopology, Map stormConf) throws InvalidTopologyException {
+ Map<Integer, String> taskIdToComponentId = new HashMap<Integer, String>();
+
+ StormTopology systemTopology = systemTopology(stormConf, userTopology);
+ Map<String, Object> components = allComponents(systemTopology);
+ Map<String, Integer> componentIdToTaskNum = new TreeMap<String, Integer>();
+ for (Map.Entry<String, Object> entry : components.entrySet()) {
+ Map conf = componentConf(entry.getValue());
+ Object taskNum = conf.get(Config.TOPOLOGY_TASKS);
- if (taskNum != null) {
- componentIdToTaskNum.put(entry.getKey(), Utils.getInt(taskNum));
- }
++ componentIdToTaskNum.put(entry.getKey(), Utils.getInt(taskNum));
+ }
+
+ int taskId = 1;
+ for (Map.Entry<String, Integer> entry : componentIdToTaskNum.entrySet()) {
+ String componentId = entry.getKey();
+ Integer taskNum = entry.getValue();
+ while (taskNum > 0) {
+ taskIdToComponentId.put(taskId, componentId);
+ taskNum--;
+ taskId++;
+ }
+ }
+ return taskIdToComponentId;
+ }
+
+ public static List<Integer> executorIdToTasks(List<Long> executorId) {
+ List<Integer> taskIds = new ArrayList<Integer>();
+ int taskId = executorId.get(0).intValue();
+ while (taskId <= executorId.get(1).intValue()) {
+ taskIds.add(taskId);
+ taskId++;
+ }
+ return taskIds;
+ }
+
+ public static Map<Integer, NodeInfo> taskToNodeport(Map<List<Long>, NodeInfo> executorToNodeport) {
+ Map<Integer, NodeInfo> tasksToNodeport = new HashMap<Integer, NodeInfo>();
+ for (Map.Entry<List<Long>, NodeInfo> entry : executorToNodeport.entrySet()) {
+ List<Integer> taskIds = executorIdToTasks(entry.getKey());
+ for (Integer taskId : taskIds) {
+ tasksToNodeport.put(taskId, entry.getValue());
+ }
+ }
+ return tasksToNodeport;
+ }
+
- public static IAuthorizer mkAuthorizationHandler(String klassName, Map conf) {
++ public static IAuthorizer mkAuthorizationHandler(String klassName, Map conf) throws IllegalAccessException, InstantiationException, ClassNotFoundException {
+ return _instance.mkAuthorizationHandlerImpl(klassName, conf);
+ }
+
- protected IAuthorizer mkAuthorizationHandlerImpl(String klassName, Map conf) {
++ protected IAuthorizer mkAuthorizationHandlerImpl(String klassName, Map conf) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+ IAuthorizer aznHandler = null;
- try {
- if (klassName != null) {
- Class aznClass = Class.forName(klassName);
- if (aznClass != null) {
- aznHandler = (IAuthorizer) aznClass.newInstance();
- if (aznHandler != null) {
- aznHandler.prepare(conf);
- }
- LOG.debug("authorization class name:{}, class:{}, handler:{}",klassName, aznClass, aznHandler);
++ if (klassName != null) {
++ Class aznClass = Class.forName(klassName);
++ if (aznClass != null) {
++ aznHandler = (IAuthorizer) aznClass.newInstance();
++ if (aznHandler != null) {
++ aznHandler.prepare(conf);
+ }
++ LOG.debug("authorization class name:{}, class:{}, handler:{}",klassName, aznClass, aznHandler);
+ }
- } catch (Exception e) {
- LOG.error("Failed to make authorization handler, klassName:{}", klassName);
+ }
+
+ return aznHandler;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/ee4fa721/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/org/apache/storm/nimbus_test.clj
index b63ac1f,fb000da..c32480f
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@@ -22,10 -22,10 +22,10 @@@
TestAggregatesCounter TestPlannerSpout TestPlannerBolt]
[org.apache.storm.nimbus InMemoryTopologyActionNotifier]
[org.apache.storm.generated GlobalStreamId]
- [org.apache.storm Thrift])
+ [org.apache.storm Thrift MockAutoCred])
(:import [org.apache.storm.testing.staticmocking MockedZookeeper])
(:import [org.apache.storm.scheduler INimbus])
- (:import [org.mockito Mockito])
+ (:import [org.mockito Mockito Matchers])
(:import [org.mockito.exceptions.base MockitoAssertionError])
(:import [org.apache.storm.nimbus ILeaderElector NimbusInfo])
(:import [org.apache.storm.testing.staticmocking MockedCluster])
@@@ -36,14 -36,14 +36,14 @@@
(:import [java.util HashMap])
(:import [java.io File])
(:import [org.apache.storm.utils Time Utils Utils$UptimeComputer ConfigUtils IPredicate]
- [org.apache.storm.utils.staticmocking ConfigUtilsInstaller UtilsInstaller])
+ [org.apache.storm.utils.staticmocking ConfigUtilsInstaller UtilsInstaller CommonInstaller])
(:import [org.apache.storm.zookeeper Zookeeper])
- (:import [org.apache.commons.io FileUtils]
- [org.json.simple JSONValue])
+ (:import [org.apache.commons.io FileUtils])
+ (:import [org.json.simple JSONValue])
+ (:import [org.apache.storm.daemon StormCommon])
(:import [org.apache.storm.cluster StormClusterStateImpl ClusterStateContext ClusterUtils])
- (:use [org.apache.storm testing MockAutoCred util config log converter])
+ (:use [org.apache.storm testing util config log converter])
- (:use [org.apache.storm.daemon common])
- (:require [conjure.core])
+ (:require [conjure.core] [org.apache.storm.daemon.worker :as worker])
(:use [conjure core]))
http://git-wip-us.apache.org/repos/asf/storm/blob/ee4fa721/storm-core/test/clj/org/apache/storm/supervisor_test.clj
----------------------------------------------------------------------