Posted to commits@storm.apache.org by bo...@apache.org on 2016/03/10 15:33:53 UTC

[06/10] storm git commit: 1. Merge remote-tracking branch 'upstream/master'

1. Merge remote-tracking branch 'upstream/master'

Conflicts:
	storm-core/src/clj/org/apache/storm/daemon/common.clj
	storm-core/src/clj/org/apache/storm/daemon/drpc.clj
	storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
	storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
	storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
	storm-core/src/clj/org/apache/storm/ui/core.clj
	storm-core/test/clj/org/apache/storm/nimbus_test.clj

2. Update according to review comments


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ee4fa721
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ee4fa721
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ee4fa721

Branch: refs/heads/master
Commit: ee4fa721310cb901df5a9f82d346f9cfbc26ffa2
Parents: eca27bc b477939
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Wed Mar 9 20:00:03 2016 +0800
Committer: basti.lj <ba...@alibaba-inc.com>
Committed: Wed Mar 9 20:00:03 2016 +0800

----------------------------------------------------------------------
 CHANGELOG.md                                    |  10 +
 .../travis/print-errors-from-test-reports.py    |   4 +
 .../src/clj/org/apache/storm/MockAutoCred.clj   |  58 -----
 .../src/clj/org/apache/storm/daemon/drpc.clj    |  30 +--
 .../clj/org/apache/storm/daemon/executor.clj    |  20 +-
 .../clj/org/apache/storm/daemon/logviewer.clj   |  26 +--
 .../src/clj/org/apache/storm/daemon/nimbus.clj  | 155 ++++++-------
 .../clj/org/apache/storm/daemon/supervisor.clj  |  27 ++-
 .../src/clj/org/apache/storm/daemon/worker.clj  |   2 +-
 storm-core/src/clj/org/apache/storm/ui/core.clj |  86 ++++----
 .../src/clj/org/apache/storm/ui/helpers.clj     |  10 +-
 .../storm/cluster/StormClusterStateImpl.java    |   7 +-
 .../org/apache/storm/daemon/StormCommon.java    | 215 ++++++-------------
 .../storm/daemon/metrics/MetricsUtils.java      |   2 +-
 .../jvm/org/apache/storm/drpc/DRPCSpout.java    |   2 +
 .../storm/metric/StormMetricsRegistry.java      |  84 ++++++++
 .../auth/AbstractSaslClientCallbackHandler.java |  76 +++++++
 .../auth/AbstractSaslServerCallbackHandler.java |  94 ++++++++
 .../auth/digest/ClientCallbackHandler.java      |  60 +-----
 .../auth/digest/ServerCallbackHandler.java      |  61 +-----
 .../auth/plain/PlainClientCallbackHandler.java  |  31 +++
 .../auth/plain/PlainSaslTransportPlugin.java    |  71 ++++++
 .../auth/plain/PlainServerCallbackHandler.java  |  55 +++++
 .../security/auth/plain/SaslPlainServer.java    | 158 ++++++++++++++
 .../jvm/org/apache/storm/utils/ConfigUtils.java |  10 +
 .../test/clj/org/apache/storm/nimbus_test.clj   |  10 +-
 .../clj/org/apache/storm/supervisor_test.clj    |   6 +
 .../test/jvm/org/apache/storm/MockAutoCred.java |  75 +++++++
 28 files changed, 953 insertions(+), 492 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ee4fa721/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/ee4fa721/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 0f95e28,9ff93f8..f055d8c
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@@ -31,7 -31,7 +31,7 @@@
    (:import [org.apache.storm.utils Utils ConfigUtils TupleUtils MutableObject RotatingMap RotatingMap$ExpiredCallback MutableLong Time DisruptorQueue WorkerBackpressureThread DisruptorBackpressureCallback])
    (:import [com.lmax.disruptor InsufficientCapacityException])
    (:import [org.apache.storm.serialization KryoTupleSerializer])
-   (:import [org.apache.storm.daemon Shutdownable StormCommon])
 -  (:import [org.apache.storm.daemon Shutdownable])
++  (:import [org.apache.storm.daemon Shutdownable StormCommon Acker])
    (:import [org.apache.storm.metric.api IMetric IMetricsConsumer$TaskInfo IMetricsConsumer$DataPoint StateMetric])
    (:import [org.apache.storm Config Constants])
    (:import [org.apache.storm.cluster ClusterStateContext DaemonType StormClusterStateImpl ClusterUtils])
@@@ -536,17 -536,17 +536,17 @@@
                                      (throw (RuntimeException. (str "Fatal error, mismatched task ids: " task-id " " stored-task-id))))
                                    (let [time-delta (if start-time-ms (Time/deltaMs start-time-ms))]
                                      (condp = stream-id
-                                       StormCommon/ACKER_ACK_STREAM_ID (ack-spout-msg executor-data (get task-datas task-id)
 -                                      ACKER-ACK-STREAM-ID (ack-spout-msg executor-data (get task-datas task-id)
--                                                                         spout-id tuple-finished-info time-delta id)
-                                       StormCommon/ACKER_FAIL_STREAM_ID (fail-spout-msg executor-data (get task-datas task-id)
 -                                      ACKER-FAIL-STREAM-ID (fail-spout-msg executor-data (get task-datas task-id)
++                                      Acker/ACKER_ACK_STREAM_ID (ack-spout-msg executor-data (get task-datas task-id)
++                                                                               spout-id tuple-finished-info time-delta id)
++                                      Acker/ACKER_FAIL_STREAM_ID (fail-spout-msg executor-data (get task-datas task-id)
                                                                             spout-id tuple-finished-info time-delta "FAIL-STREAM" id)
                                        )))
                                  ;; TODO: on failure, emit tuple to failure stream
                                  ))))
          receive-queue (:receive-queue executor-data)
          event-handler (mk-task-receiver executor-data tuple-action-fn)
-         has-ackers? (clojurify-structure (StormCommon/hasAckers storm-conf))
-         has-eventloggers? (clojurify-structure (StormCommon/hasEventLoggers storm-conf))
 -        has-ackers? (has-ackers? storm-conf)
 -        has-eventloggers? (has-eventloggers? storm-conf)
++        has-ackers? (StormCommon/hasAckers storm-conf)
++        has-eventloggers? (StormCommon/hasEventLoggers storm-conf)
          emitted-count (MutableLong. 0)
          empty-emit-streak (MutableLong. 0)
          spout-transfer-fn (fn []
@@@ -587,7 -587,7 +587,7 @@@
                                                                                           :values (if debug? values nil)}
                                                                                          (if (sampler) (System/currentTimeMillis))])
                                                                   (task/send-unanchored task-data
-                                                                                        StormCommon/ACKER_INIT_STREAM_ID
 -                                                                                       ACKER-INIT-STREAM-ID
++                                                                                       Acker/ACKER_INIT_STREAM_ID
                                                                                         [root-id (Utils/bitXorVals out-ids) task-id]))
                                                                 (when message-id
                                                                   (ack-spout-msg executor-data task-data message-id
@@@ -742,7 -742,7 +742,7 @@@
                                                                 (.getSourceComponent tuple)
                                                                 (.getSourceStreamId tuple)
                                                                 delta)))))))
-         has-eventloggers? (clojurify-structure (StormCommon/hasEventLoggers storm-conf))
 -        has-eventloggers? (has-eventloggers? storm-conf)
++        has-eventloggers? (StormCommon/hasEventLoggers storm-conf)
          bolt-transfer-fn (fn []
                             ;; If topology was started in inactive state, don't call prepare bolt until it's activated first.
                             (while (not @(:storm-active-atom executor-data))
@@@ -803,7 -803,7 +803,7 @@@
                                                    ack-val (.getAckVal tuple)]
                                                (fast-map-iter [[root id] (.. tuple getMessageId getAnchorsToIds)]
                                                               (task/send-unanchored task-data
-                                                                                    StormCommon/ACKER_ACK_STREAM_ID
 -                                                                                   ACKER-ACK-STREAM-ID
++                                                                                   Acker/ACKER_ACK_STREAM_ID
                                                                                     [root (bit-xor id ack-val)])))
                                              (let [delta (tuple-time-delta! tuple)
                                                    debug? (= true (storm-conf TOPOLOGY-DEBUG))]
@@@ -818,7 -818,7 +818,7 @@@
                                            (^void fail [this ^Tuple tuple]
                                              (fast-list-iter [root (.. tuple getMessageId getAnchors)]
                                                              (task/send-unanchored task-data
-                                                                                   StormCommon/ACKER_FAIL_STREAM_ID
 -                                                                                  ACKER-FAIL-STREAM-ID
++                                                                                  Acker/ACKER_FAIL_STREAM_ID
                                                                                    [root]))
                                              (let [delta (tuple-time-delta! tuple)
                                                    debug? (= true (storm-conf TOPOLOGY-DEBUG))]
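
A note on the hunks above: the acker control streams the spout callback dispatches on now live as constants on org.apache.storm.daemon.Acker rather than StormCommon, and the booleans returned by StormCommon/hasAckers no longer need clojurify-structure. A minimal Java sketch of the same stream-id dispatch, assuming only the Acker constants shown in the diff (handleAck/handleFail are hypothetical stand-ins for ack-spout-msg / fail-spout-msg):

    // Dispatch an incoming acker tuple by its stream id, as the Clojure
    // condp above does.
    import org.apache.storm.daemon.Acker;

    public class AckerDispatchSketch {
        static void dispatch(String streamId, Object tupleInfo) {
            if (Acker.ACKER_ACK_STREAM_ID.equals(streamId)) {
                handleAck(tupleInfo);   // message tree fully acked
            } else if (Acker.ACKER_FAIL_STREAM_ID.equals(streamId)) {
                handleFail(tupleInfo);  // message tree failed
            }
        }
        static void handleAck(Object tupleInfo) {}   // hypothetical
        static void handleFail(Object tupleInfo) {}  // hypothetical
    }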

http://git-wip-us.apache.org/repos/asf/storm/blob/ee4fa721/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/ee4fa721/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 673f15d,0af12a2..e6fd0a2
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@@ -1636,9 -1639,9 +1635,9 @@@
            (notify-topology-action-listener nimbus storm-name operation)))
  
        (debug [this storm-name component-id enable? samplingPct]
-         (mark! nimbus:num-debug-calls)
+         (.mark nimbus:num-debug-calls)
          (let [storm-cluster-state (:storm-cluster-state nimbus)
 -              storm-id (get-storm-id storm-cluster-state storm-name)
 +              storm-id (StormCommon/getStormId storm-cluster-state storm-name)
                topology-conf (try-read-storm-conf conf storm-id blob-store)
                ;; make sure samplingPct is within bounds.
                spct (Math/max (Math/min samplingPct 100.0) 0.0)
@@@ -1715,9 -1718,9 +1714,9 @@@
              (.setTopologyLogConfig storm-cluster-state id merged-log-config)))
  
        (uploadNewCredentials [this storm-name credentials]
-         (mark! nimbus:num-uploadNewCredentials-calls)
+         (.mark nimbus:num-uploadNewCredentials-calls)
          (let [storm-cluster-state (:storm-cluster-state nimbus)
 -              storm-id (get-storm-id storm-cluster-state storm-name)
 +              storm-id (StormCommon/getStormId storm-cluster-state storm-name)
                topology-conf (try-read-storm-conf conf storm-id blob-store)
                creds (when credentials (.get_creds credentials))]
            (check-authorization! nimbus storm-name topology-conf "uploadNewCredentials")
@@@ -1811,10 -1814,10 +1810,10 @@@
          (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
                storm-name (topology-conf TOPOLOGY-NAME)]
                (check-authorization! nimbus storm-name topology-conf "getTopology")
 -              (system-topology! topology-conf (try-read-storm-topology id (:blob-store nimbus)))))
 +              (StormCommon/systemTopology topology-conf (try-read-storm-topology id (:blob-store nimbus)))))
  
        (^StormTopology getUserTopology [this ^String id]
-         (mark! nimbus:num-getUserTopology-calls)
+         (.mark nimbus:num-getUserTopology-calls)
          (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
                storm-name (topology-conf TOPOLOGY-NAME)]
                (check-authorization! nimbus storm-name topology-conf "getUserTopology")
@@@ -2181,21 -2184,22 +2180,22 @@@
                comp-page-info
                (converter/thriftify-debugoptions debug-options)))
            ;; Add the event logger details.
-           (let [component->tasks (clojurify-structure (Utils/reverseMap (:task->component info)))
-                 eventlogger-tasks (sort (get component->tasks
-                                              StormCommon/EVENTLOGGER_COMPONENT_ID))
-                 ;; Find the task the events from this component route to.
-                 task-index (mod (TupleUtils/listHashCode [component-id])
-                                 (count eventlogger-tasks))
-                 task-id (nth eventlogger-tasks task-index)
-                 eventlogger-exec (first (filter (fn [[start stop]]
-                                                   (between? task-id start stop))
-                                                 (keys executor->host+port)))
-                 [host port] (get executor->host+port eventlogger-exec)]
-             (if (and host port)
-               (doto comp-page-info
-                 (.set_eventlog_host host)
-                 (.set_eventlog_port port))))
+           (let [component->tasks (clojurify-structure (Utils/reverseMap (:task->component info)))]
 -            (if (contains? component->tasks EVENTLOGGER-COMPONENT-ID)
++            (if (contains? component->tasks StormCommon/EVENTLOGGER_COMPONENT_ID)
+               (let [eventlogger-tasks (sort (get component->tasks
 -                                                 EVENTLOGGER-COMPONENT-ID))
++                                                 StormCommon/EVENTLOGGER_COMPONENT_ID))
+                     ;; Find the task the events from this component route to.
+                     task-index (mod (TupleUtils/listHashCode [component-id])
+                                     (count eventlogger-tasks))
+                     task-id (nth eventlogger-tasks task-index)
+                     eventlogger-exec (first (filter (fn [[start stop]]
+                                                       (between? task-id start stop))
+                                                     (keys executor->host+port)))
+                     [host port] (get executor->host+port eventlogger-exec)]
+                 (if (and host port)
+                   (doto comp-page-info
+                     (.set_eventlog_host host)
+                     (.set_eventlog_port port))))))
            comp-page-info))
  
        (^TopologyHistoryInfo getTopologyHistory [this ^String user]
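
The refactored hunk above now checks that the topology actually contains an event logger before computing a route; the route itself hashes the component id onto the sorted list of event-logger tasks. A hedged Java sketch of that arithmetic, using TupleUtils/listHashCode as the diff does (its exact generic signature is assumed; the task list would come from component->tasks for StormCommon.EVENTLOGGER_COMPONENT_ID):

    // Pick which event-logger task receives events from a component,
    // mirroring the Clojure hunk above.
    import org.apache.storm.utils.TupleUtils;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class EventLoggerRouteSketch {
        static int routeTask(String componentId, List<Integer> eventloggerTasks) {
            Collections.sort(eventloggerTasks);  // (sort ...)
            int h = TupleUtils.listHashCode(Arrays.<Object>asList(componentId));
            int n = eventloggerTasks.size();
            int index = ((h % n) + n) % n;       // Clojure's mod is never negative
            return eventloggerTasks.get(index);
        }
    }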

http://git-wip-us.apache.org/repos/asf/storm/blob/ee4fa721/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index 20cf7f2,3220728..fd8f6c9
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@@ -1318,11 -1333,12 +1332,12 @@@
    [supervisor]
    (log-message "Starting supervisor for storm version '" STORM-VERSION "'")
    (let [conf (clojurify-structure (ConfigUtils/readStormConfig))]
 -    (validate-distributed-mode! conf)
 +    (StormCommon/validateDistributedMode conf)
      (let [supervisor (mk-supervisor conf nil supervisor)]
        (Utils/addShutdownHookWithForceKillIn1Sec #(.shutdown supervisor)))
-     (defgauge supervisor:num-slots-used-gauge #(count (my-worker-ids conf)))
-     (StormCommon/startMetricsReporters conf)))
+     (def supervisor:num-slots-used-gauge (StormMetricsRegistry/registerGauge "supervisor:num-slots-used-gauge"
+                                            #(count (my-worker-ids conf))))
+     (StormMetricsRegistry/startMetricsReporters conf)))
  
  (defn standalone-supervisor []
    (let [conf-atom (atom nil)
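
This hunk replaces the Clojure defgauge macro with the StormMetricsRegistry class added by this commit (see the diffstat). A minimal Java sketch of the same wiring; the gauge argument's exact type (Clojure IFn, Callable, or a codahale Gauge) is not visible from the diff, so a Callable is assumed here:

    // Register a slots-used gauge and start the configured reporters,
    // as the supervisor hunk above does from Clojure. countMyWorkerIds
    // is a hypothetical stand-in for (count (my-worker-ids conf)).
    import java.util.Map;
    import java.util.concurrent.Callable;
    import org.apache.storm.metric.StormMetricsRegistry;

    public class SupervisorMetricsSketch {
        static void start(final Map conf) {
            StormMetricsRegistry.registerGauge("supervisor:num-slots-used-gauge",
                new Callable<Integer>() {
                    @Override
                    public Integer call() {
                        return countMyWorkerIds(conf);
                    }
                });
            StormMetricsRegistry.startMetricsReporters(conf);
        }
        static Integer countMyWorkerIds(Map conf) { return 0; }  // hypothetical
    }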

http://git-wip-us.apache.org/repos/asf/storm/blob/ee4fa721/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/worker.clj
index e1b0185,92ba807..6d115ce
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@@ -254,9 -254,6 +254,9 @@@
          (log-error e "Error when processing event")
          (Utils/exitProcess 20 "Error when processing an event")))))
  
 +(defn executor->tasks [executor-id]
-   clojurify-structure (StormCommon/executorIdToTasks executor-id))
++  (StormCommon/executorIdToTasks executor-id))
 +
  (defn worker-data [conf mq-context storm-id assignment-id port worker-id storm-conf state-store storm-cluster-state]
    (let [assignment-versions (atom {})
          executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port assignment-versions))
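
The fix above drops a stray clojurify-structure call (the removed line was also missing its opening paren) and illustrates what an executor id is: a [start-task end-task] pair that StormCommon.executorIdToTasks, added later in this commit, expands into the inclusive task range. A small usage sketch:

    // An executor id [2 4] covers tasks 2, 3 and 4.
    import org.apache.storm.daemon.StormCommon;
    import java.util.Arrays;
    import java.util.List;

    public class ExecutorIdSketch {
        public static void main(String[] args) {
            List<Integer> tasks = StormCommon.executorIdToTasks(Arrays.asList(2L, 4L));
            System.out.println(tasks);  // [2, 3, 4]
        }
    }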

http://git-wip-us.apache.org/repos/asf/storm/blob/ee4fa721/storm-core/src/clj/org/apache/storm/ui/core.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/ui/core.clj
index d24fc14,e1ab71f..c1ea340
--- a/storm-core/src/clj/org/apache/storm/ui/core.clj
+++ b/storm-core/src/clj/org/apache/storm/ui/core.clj
@@@ -23,9 -23,12 +23,10 @@@
    (:use [hiccup core page-helpers])
    (:use [org.apache.storm config util log stats converter])
    (:use [org.apache.storm.ui helpers])
 -  (:use [org.apache.storm.daemon [common :only [ACKER-COMPONENT-ID ACKER-INIT-STREAM-ID ACKER-ACK-STREAM-ID
 -                                              ACKER-FAIL-STREAM-ID mk-authorization-handler]]])
    (:import [org.apache.storm.utils Time]
             [org.apache.storm.generated NimbusSummary]
-            [org.apache.storm.ui UIHelpers IConfigurator FilterConfiguration])
+            [org.apache.storm.ui UIHelpers IConfigurator FilterConfiguration]
+            [org.apache.storm.metric StormMetricsRegistry])
    (:use [clojure.string :only [blank? lower-case trim split]])
    (:import [org.apache.storm.generated ExecutorSpecificStats
              ExecutorStats ExecutorSummary ExecutorInfo TopologyInfo SpoutStats BoltStats
@@@ -48,10 -51,8 +49,9 @@@
              [compojure.handler :as handler]
              [ring.util.response :as resp]
              [org.apache.storm.internal [thrift :as thrift]])
-   (:require [metrics.meters :refer [defmeter mark!]])
    (:import [org.apache.commons.lang StringEscapeUtils])
 -  (:import [org.apache.logging.log4j Level])
 +  (:import [org.apache.logging.log4j Level]
-            (org.apache.storm.daemon StormCommon))
++           (org.apache.storm.daemon StormCommon Acker))
    (:import [org.eclipse.jetty.server Server])
    (:gen-class))
  
@@@ -114,9 -115,9 +114,9 @@@
  (defn is-ack-stream
    [stream]
    (let [acker-streams
-         [StormCommon/ACKER_INIT_STREAM_ID
-          StormCommon/ACKER_ACK_STREAM_ID
-          StormCommon/ACKER_FAIL_STREAM_ID]]
 -        [ACKER-INIT-STREAM-ID
 -         ACKER-ACK-STREAM-ID
 -         ACKER-FAIL-STREAM-ID]]
++        [Acker/ACKER_INIT_STREAM_ID
++         Acker/ACKER_ACK_STREAM_ID
++         Acker/ACKER_FAIL_STREAM_ID]]
      (every? #(not= %1 stream) acker-streams)))
  
  (defn spout-summary?
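
Despite its name, is-ack-stream above returns true only when the stream matches none of the three acker streams, so as a filter it keeps ordinary streams. The equivalent Java check, using the Acker constants the diff switches to:

    // True for ordinary streams, false for the three acker control streams,
    // matching (every? #(not= %1 stream) acker-streams) above.
    import org.apache.storm.daemon.Acker;
    import java.util.Arrays;
    import java.util.List;

    public class AckStreamFilterSketch {
        static final List<String> ACKER_STREAMS = Arrays.asList(
            Acker.ACKER_INIT_STREAM_ID,
            Acker.ACKER_ACK_STREAM_ID,
            Acker.ACKER_FAIL_STREAM_ID);

        static boolean isNotAckerStream(String stream) {
            return !ACKER_STREAMS.contains(stream);
        }
    }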

http://git-wip-us.apache.org/repos/asf/storm/blob/ee4fa721/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
index b586422,0000000..7c7b3c2
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
@@@ -1,604 -1,0 +1,523 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.storm.daemon;
 +
 +import com.codahale.metrics.MetricRegistry;
 +import org.apache.storm.Config;
 +import org.apache.storm.Constants;
 +import org.apache.storm.Thrift;
 +import org.apache.storm.cluster.IStormClusterState;
 +import org.apache.storm.daemon.metrics.MetricsUtils;
 +import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
 +import org.apache.storm.generated.*;
 +import org.apache.storm.generated.StormBase;
 +import org.apache.storm.metric.EventLoggerBolt;
 +import org.apache.storm.metric.MetricsConsumerBolt;
 +import org.apache.storm.metric.SystemBolt;
 +import org.apache.storm.security.auth.IAuthorizer;
 +import org.apache.storm.task.IBolt;
 +import org.apache.storm.testing.NonRichBoltTracker;
 +import org.apache.storm.utils.ConfigUtils;
 +import org.apache.storm.utils.IPredicate;
 +import org.apache.storm.utils.ThriftTopologyUtils;
 +import org.apache.storm.utils.Utils;
 +import org.json.simple.JSONValue;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.util.*;
 +
 +public class StormCommon {
 +    // A singleton instance allows us to mock delegated static methods in our
 +    // tests by subclassing.
 +    private static StormCommon _instance = new StormCommon();
 +
 +    /**
 +     * Provide an instance of this class for delegates to use.  To mock out
 +     * delegated methods, provide an instance of a subclass that overrides the
 +     * implementation of the delegated method.
 +     * @param common a StormCommon instance
 +     * @return the previously set instance
 +     */
 +    public static StormCommon setInstance(StormCommon common) {
 +        StormCommon oldInstance = _instance;
 +        _instance = common;
 +        return oldInstance;
 +    }
 +
 +    private static final Logger LOG = LoggerFactory.getLogger(StormCommon.class);
 +
-     public static final String ACKER_COMPONENT_ID = Acker.ACKER_COMPONENT_ID;
-     public static final String ACKER_INIT_STREAM_ID = Acker.ACKER_INIT_STREAM_ID;
-     public static final String ACKER_ACK_STREAM_ID = Acker.ACKER_ACK_STREAM_ID;
-     public static final String ACKER_FAIL_STREAM_ID = Acker.ACKER_FAIL_STREAM_ID;
- 
 +    public static final String SYSTEM_STREAM_ID = "__system";
 +
 +    public static final String EVENTLOGGER_COMPONENT_ID = "__eventlogger";
 +    public static final String EVENTLOGGER_STREAM_ID = "__eventlog";
 +
-     public static void startMetricsReporter(PreparableReporter report, Map conf) {
-         report.prepare(new MetricRegistry(), conf);
-         report.start();
-         LOG.info("Started statistics report plugin...");
-     }
- 
-     public static void startMetricsReporters(Map conf) {
-         List<PreparableReporter> reporters = MetricsUtils.getPreparableReporters(conf);
-         for (PreparableReporter reporter : reporters) {
-             startMetricsReporter(reporter, conf);
-         }
-     }
- 
-     public static String getTopologyNameById(String topologyId) {
-         String topologyName = null;
-         try {
-             topologyName = topologyIdToName(topologyId);
-         } catch (InvalidTopologyException e) {
-             LOG.error("Invalid topologyId=" + topologyId);
-         }
-         return topologyName;
-     }
- 
-     /**
-      * Convert topologyId to topologyName. TopologyId = topoloygName-counter-timeStamp
-      *
-      * @param topologyId
-      * @return
-      */
-     public static String topologyIdToName(String topologyId) throws InvalidTopologyException {
-         String ret = null;
-         int index = topologyId.lastIndexOf('-');
-         if (index != -1 && index > 2) {
-             index = topologyId.lastIndexOf('-', index - 1);
-             if (index != -1 && index > 0)
-                 ret = topologyId.substring(0, index);
-             else
-                 throw new InvalidTopologyException(topologyId + " is not a valid topologyId");
-         } else
-             throw new InvalidTopologyException(topologyId + " is not a valid topologyId");
-         return ret;
-     }
- 
-     public static String getStormId(IStormClusterState stormClusterState, final String topologyName) {
++    public static String getStormId(final IStormClusterState stormClusterState, final String topologyName) {
 +        List<String> activeTopologys = stormClusterState.activeStorms();
 +        IPredicate pred = new IPredicate<String>() {
 +            @Override
 +            public boolean test(String obj) {
-                 return obj != null ? getTopologyNameById(obj).equals(topologyName) : false;
++                String name = stormClusterState.stormBase(obj, null).get_name();
++                return name.equals(topologyName);
 +            }
 +        };
 +        return Utils.findOne(pred, activeTopologys);
 +    }
 +
 +    public static Map<String, StormBase> topologyBases(IStormClusterState stormClusterState) {
 +        return _instance.topologyBasesImpl(stormClusterState);
 +    }
 +
 +    protected Map<String, StormBase> topologyBasesImpl(IStormClusterState stormClusterState) {
 +        List<String> activeTopologys = stormClusterState.activeStorms();
 +        Map<String, StormBase> stormBases = new HashMap<String, StormBase>();
-         if (activeTopologys != null) {
-             for (String topologyId : activeTopologys) {
-                 StormBase base = stormClusterState.stormBase(topologyId, null);
-                 if (base != null) {
-                     stormBases.put(topologyId, base);
-                 }
-             }
++        for (String topologyId : activeTopologys) {
++            StormBase base = stormClusterState.stormBase(topologyId, null);
++            stormBases.put(topologyId, base);
 +        }
 +        return stormBases;
 +    }
 +
 +    public static void validateDistributedMode(Map conf) {
 +        if (ConfigUtils.isLocalMode(conf)) {
 +            throw new IllegalArgumentException("Cannot start server in local mode!");
 +        }
 +    }
 +
 +    private static void validateIds(StormTopology topology) throws InvalidTopologyException {
 +        List<String> componentIds = new ArrayList<String>();
 +
 +        for (StormTopology._Fields field : Thrift.getTopologyFields()) {
 +            if (ThriftTopologyUtils.isWorkerHook(field) == false) {
 +                Object value = topology.getFieldValue(field);
 +                if (value != null) {
 +                    Map<String, Object> componentMap = (Map<String, Object>) value;
 +                    componentIds.addAll(componentMap.keySet());
 +
 +                    for (String id : componentMap.keySet()) {
 +                        if (Utils.isSystemId(id)) {
 +                            throw new InvalidTopologyException(id + " is not a valid component id.");
 +                        }
 +                    }
 +                    for (Object componentObj : componentMap.values()) {
 +                        ComponentCommon common = getComponentCommon(componentObj);
 +                        Set<String> streamIds = common.get_streams().keySet();
 +                        for (String id : streamIds) {
 +                            if (Utils.isSystemId(id)) {
 +                                throw new InvalidTopologyException(id + " is not a valid stream id.");
 +                            }
 +                        }
 +                    }
 +                }
 +            }
 +        }
 +
 +        List<String> offending = Utils.getRepeat(componentIds);
 +        if (offending.isEmpty() == false) {
 +            throw new InvalidTopologyException("Duplicate component ids: " + offending);
 +        }
 +    }
 +
 +    private static boolean isEmptyInputs(ComponentCommon common) {
-         if (common == null) {
-             return true;
-         } else if (common.get_inputs() == null) {
++        if (common.get_inputs() == null) {
 +            return true;
 +        } else {
 +            return common.get_inputs().isEmpty();
 +        }
 +    }
 +
 +    public static Map<String, Object> allComponents(StormTopology topology) {
 +        Map<String, Object> components = new HashMap<String, Object>();
 +        List<StormTopology._Fields> topologyFields = Arrays.asList(Thrift.getTopologyFields());
 +        for (StormTopology._Fields field : topologyFields) {
 +            if (ThriftTopologyUtils.isWorkerHook(field) == false) {
 +                components.putAll(((Map) topology.getFieldValue(field)));
 +            }
 +        }
 +        return components;
 +    }
 +
 +    public static Map componentConf(Object component) {
 +        Map<Object, Object> conf = new HashMap<Object, Object>();
 +        ComponentCommon common = getComponentCommon(component);
-         if (common != null) {
-             String jconf = common.get_json_conf();
-             if (jconf != null) {
-                 conf.putAll((Map<Object, Object>) JSONValue.parse(jconf));
-             }
++        String jconf = common.get_json_conf();
++        if (jconf != null) {
++            conf.putAll((Map<Object, Object>) JSONValue.parse(jconf));
 +        }
 +        return conf;
 +    }
 +
 +    public static void validateBasic(StormTopology topology) throws InvalidTopologyException {
 +        validateIds(topology);
 +
-         List<StormTopology._Fields> spoutFields = Arrays.asList(Thrift.getSpoutFields());
-         for (StormTopology._Fields field : spoutFields) {
++        for (StormTopology._Fields field : Thrift.getSpoutFields()) {
 +            Map<String, Object> spoutComponents = (Map<String, Object>) topology.getFieldValue(field);
 +            if (spoutComponents != null) {
 +                for (Object obj : spoutComponents.values()) {
 +                    ComponentCommon common = getComponentCommon(obj);
 +                    if (isEmptyInputs(common) == false) {
 +                        throw new InvalidTopologyException("May not declare inputs for a spout");
 +                    }
 +                }
 +            }
 +        }
 +
 +        Map<String, Object> componentMap = allComponents(topology);
 +        for (Object componentObj : componentMap.values()) {
 +            Map conf = componentConf(componentObj);
 +            ComponentCommon common = getComponentCommon(componentObj);
-             if (common != null) {
-                 int parallelismHintNum = Thrift.getParallelismHint(common);
-                 Integer taskNum = Utils.getInt(conf.get(Config.TOPOLOGY_TASKS), 0);
-                 if (taskNum > 0 && parallelismHintNum <= 0) {
-                     throw new InvalidTopologyException("Number of executors must be greater than 0 when number of tasks is greater than 0");
-                 }
++            int parallelismHintNum = Thrift.getParallelismHint(common);
++            Integer taskNum = Utils.getInt(conf.get(Config.TOPOLOGY_TASKS), 0);
++            if (taskNum > 0 && parallelismHintNum <= 0) {
++                throw new InvalidTopologyException("Number of executors must be greater than 0 when number of tasks is greater than 0");
 +            }
 +        }
 +    }
 +
 +    private static Set<String> getStreamOutputFields(Map<String, StreamInfo> streams) {
 +        Set<String> outputFields = new HashSet<String>();
-         if (streams != null) {
-             for (StreamInfo streamInfo : streams.values()) {
-                 outputFields.addAll(streamInfo.get_output_fields());
-             }
++        for (StreamInfo streamInfo : streams.values()) {
++            outputFields.addAll(streamInfo.get_output_fields());
 +        }
 +        return outputFields;
 +    }
 +
 +    public static void validateStructure(StormTopology topology) throws InvalidTopologyException {
 +        Map<String, Object> componentMap = allComponents(topology);
 +        for (Map.Entry<String, Object> entry : componentMap.entrySet()) {
 +            String componentId = entry.getKey();
 +            ComponentCommon common = getComponentCommon(entry.getValue());
-             if (common != null) {
-                 Map<GlobalStreamId, Grouping> inputs = common.get_inputs();
-                 for (Map.Entry<GlobalStreamId, Grouping> input : inputs.entrySet()) {
-                     String sourceStreamId = input.getKey().get_streamId();
-                     String sourceComponentId = input.getKey().get_componentId();
-                     if(componentMap.keySet().contains(sourceComponentId) == false) {
-                         throw new InvalidTopologyException("Component: [" + componentId + "] subscribes from non-existent component [" + sourceComponentId + "]");
-                     }
++            Map<GlobalStreamId, Grouping> inputs = common.get_inputs();
++            for (Map.Entry<GlobalStreamId, Grouping> input : inputs.entrySet()) {
++                String sourceStreamId = input.getKey().get_streamId();
++                String sourceComponentId = input.getKey().get_componentId();
++                if(componentMap.keySet().contains(sourceComponentId) == false) {
++                    throw new InvalidTopologyException("Component: [" + componentId + "] subscribes from non-existent component [" + sourceComponentId + "]");
++                }
 +
-                     ComponentCommon sourceComponent = getComponentCommon(componentMap.get(sourceComponentId));
-                     if (sourceComponent == null || sourceComponent.get_streams().containsKey(sourceStreamId) == false) {
-                         throw new InvalidTopologyException("Component: [" + componentId + "] subscribes from non-existent stream: " +
-                                 "[" + sourceStreamId + "] of component [" + sourceComponentId + "]");
-                     }
++                ComponentCommon sourceComponent = getComponentCommon(componentMap.get(sourceComponentId));
++                if (sourceComponent.get_streams().containsKey(sourceStreamId) == false) {
++                    throw new InvalidTopologyException("Component: [" + componentId + "] subscribes from non-existent stream: " +
++                            "[" + sourceStreamId + "] of component [" + sourceComponentId + "]");
++                }
 +
-                     Grouping grouping = input.getValue();
-                     if (Thrift.groupingType(grouping) == Grouping._Fields.FIELDS) {
-                         List<String> fields = grouping.get_fields();
-                         Map<String, StreamInfo> streams = sourceComponent.get_streams();
-                         Set<String> sourceOutputFields = getStreamOutputFields(streams);
-                         if (sourceOutputFields.containsAll(fields) == false) {
-                             throw new InvalidTopologyException("Component: [" + componentId + "] subscribes from stream: [" + sourceStreamId  +"] of component " +
-                                     "[" + sourceComponentId + "] + with non-existent fields: " + fields);
-                         }
++                Grouping grouping = input.getValue();
++                if (Thrift.groupingType(grouping) == Grouping._Fields.FIELDS) {
++                    List<String> fields = new ArrayList<String>(grouping.get_fields());
++                    Map<String, StreamInfo> streams = sourceComponent.get_streams();
++                    Set<String> sourceOutputFields = getStreamOutputFields(streams);
++                    fields.removeAll(sourceOutputFields);
++                    if (fields.size() != 0) {
++                        throw new InvalidTopologyException("Component: [" + componentId + "] subscribes from stream: [" + sourceStreamId  +"] of component " +
++                                "[" + sourceComponentId + "] + with non-existent fields: " + fields);
 +                    }
 +                }
 +            }
 +        }
 +    }
 +
 +    public static Map<GlobalStreamId, Grouping> ackerInputs(StormTopology topology) {
 +        Map<GlobalStreamId, Grouping> inputs = new HashMap<GlobalStreamId, Grouping>();
 +        Set<String> boltIds = topology.get_bolts().keySet();
 +        Set<String> spoutIds = topology.get_spouts().keySet();
 +
 +        for(String id : spoutIds) {
-             inputs.put(Utils.getGlobalStreamId(id, ACKER_INIT_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("id")));
++            inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_INIT_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("id")));
 +        }
 +
 +        for(String id : boltIds) {
-             inputs.put(Utils.getGlobalStreamId(id, ACKER_ACK_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("id")));
-             inputs.put(Utils.getGlobalStreamId(id, ACKER_FAIL_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("id")));
++            inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_ACK_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("id")));
++            inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_FAIL_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("id")));
 +        }
 +        return inputs;
 +    }
 +
 +    public static IBolt makeAckerBolt() {
 +        return _instance.makeAckerBoltImpl();
 +    }
 +    public IBolt makeAckerBoltImpl() {
 +        return new Acker();
 +    }
 +
 +    public static void addAcker(Map conf, StormTopology topology) {
 +        int ackerNum = Utils.getInt(conf.get(Config.TOPOLOGY_ACKER_EXECUTORS), Utils.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
 +        Map<GlobalStreamId, Grouping> inputs = ackerInputs(topology);
 +
 +        Map<String, StreamInfo> outputStreams = new HashMap<String, StreamInfo>();
-         outputStreams.put(ACKER_ACK_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id")));
-         outputStreams.put(ACKER_FAIL_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id")));
++        outputStreams.put(Acker.ACKER_ACK_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id")));
++        outputStreams.put(Acker.ACKER_FAIL_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id")));
 +
 +        Map<String, Object> ackerConf = new HashMap<String, Object>();
 +        ackerConf.put(Config.TOPOLOGY_TASKS, ackerNum);
 +        ackerConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Utils.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
 +
 +        Bolt acker = Thrift.prepareSerializedBoltDetails(inputs, makeAckerBolt(), outputStreams, ackerNum, ackerConf);
 +
 +        for(Bolt bolt : topology.get_bolts().values()) {
 +            ComponentCommon common = bolt.get_common();
-             common.put_to_streams(ACKER_ACK_STREAM_ID, Thrift.outputFields(Arrays.asList("id", "ack-val")));
-             common.put_to_streams(ACKER_FAIL_STREAM_ID, Thrift.outputFields(Arrays.asList("id")));
++            common.put_to_streams(Acker.ACKER_ACK_STREAM_ID, Thrift.outputFields(Arrays.asList("id", "ack-val")));
++            common.put_to_streams(Acker.ACKER_FAIL_STREAM_ID, Thrift.outputFields(Arrays.asList("id")));
 +        }
 +
 +        for (SpoutSpec spout : topology.get_spouts().values()) {
 +            ComponentCommon common = spout.get_common();
 +            Map spoutConf = componentConf(spout);
 +            spoutConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Utils.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
 +            common.set_json_conf(JSONValue.toJSONString(spoutConf));
-             common.put_to_streams(ACKER_INIT_STREAM_ID, Thrift.outputFields(Arrays.asList("id", "init-val", "spout-task")));
-             common.put_to_inputs(Utils.getGlobalStreamId(ACKER_COMPONENT_ID, ACKER_ACK_STREAM_ID), Thrift.prepareDirectGrouping());
-             common.put_to_inputs(Utils.getGlobalStreamId(ACKER_COMPONENT_ID, ACKER_FAIL_STREAM_ID), Thrift.prepareDirectGrouping());
++            common.put_to_streams(Acker.ACKER_INIT_STREAM_ID, Thrift.outputFields(Arrays.asList("id", "init-val", "spout-task")));
++            common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_ACK_STREAM_ID), Thrift.prepareDirectGrouping());
++            common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_FAIL_STREAM_ID), Thrift.prepareDirectGrouping());
 +        }
 +
-         topology.put_to_bolts(ACKER_COMPONENT_ID, acker);
++        topology.put_to_bolts(Acker.ACKER_COMPONENT_ID, acker);
 +    }
 +
 +    public static ComponentCommon getComponentCommon(Object component) {
-         if (component == null) {
-             return null;
-         }
- 
 +        ComponentCommon common = null;
 +        if (component instanceof StateSpoutSpec) {
 +            common = ((StateSpoutSpec) component).get_common();
 +        } else if (component instanceof SpoutSpec) {
 +            common = ((SpoutSpec) component).get_common();
 +        } else if (component instanceof Bolt) {
 +            common = ((Bolt) component).get_common();
 +        }
 +        return common;
 +    }
 +
 +    public static void addMetricStreams(StormTopology topology) {
 +        for (Object component : allComponents(topology).values()) {
 +            ComponentCommon common = getComponentCommon(component);
-             if (common != null) {
-                 StreamInfo streamInfo = Thrift.outputFields(Arrays.asList("task-info", "data-points"));
-                 common.put_to_streams(Constants.METRICS_STREAM_ID, streamInfo);
-             }
++            StreamInfo streamInfo = Thrift.outputFields(Arrays.asList("task-info", "data-points"));
++            common.put_to_streams(Constants.METRICS_STREAM_ID, streamInfo);
 +        }
 +    }
 +
 +    public static void addSystemStreams(StormTopology topology) {
 +        for (Object component : allComponents(topology).values()) {
 +            ComponentCommon common = getComponentCommon(component);
-             if (common != null) {
-                 StreamInfo streamInfo = Thrift.outputFields(Arrays.asList("event"));
-                 common.put_to_streams(SYSTEM_STREAM_ID, streamInfo);
-             }
++            StreamInfo streamInfo = Thrift.outputFields(Arrays.asList("event"));
++            common.put_to_streams(SYSTEM_STREAM_ID, streamInfo);
 +        }
 +    }
 +
 +    public static List<String> eventLoggerBoltFields() {
 +        List<String> fields = Arrays.asList(EventLoggerBolt.FIELD_COMPONENT_ID, EventLoggerBolt.FIELD_MESSAGE_ID, EventLoggerBolt.FIELD_TS,
 +                EventLoggerBolt.FIELD_VALUES);
 +        return fields;
 +    }
 +
 +    public static Map<GlobalStreamId, Grouping> eventLoggerInputs(StormTopology topology) {
 +        Map<GlobalStreamId, Grouping> inputs = new HashMap<GlobalStreamId, Grouping>();
 +        Set<String> allIds = new HashSet<String>();
 +        allIds.addAll(topology.get_bolts().keySet());
 +        allIds.addAll(topology.get_spouts().keySet());
 +
 +        for(String id : allIds) {
 +            inputs.put(Utils.getGlobalStreamId(id, EVENTLOGGER_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("component-id")));
 +        }
 +        return inputs;
 +    }
 +
 +    public static void addEventLogger(Map conf, StormTopology topology) {
 +        Integer numExecutors = Utils.getInt(conf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS), Utils.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
 +        HashMap<String, Object> componentConf = new HashMap<String, Object>();
 +        componentConf.put(Config.TOPOLOGY_TASKS, numExecutors);
 +        componentConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Utils.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
 +        Bolt eventLoggerBolt = Thrift.prepareSerializedBoltDetails(eventLoggerInputs(topology), new EventLoggerBolt(), null, numExecutors, componentConf);
 +
 +        for(Object component : allComponents(topology).values()) {
 +            ComponentCommon common = getComponentCommon(component);
-             if (common != null) {
-                 common.put_to_streams(EVENTLOGGER_STREAM_ID, Thrift.outputFields(eventLoggerBoltFields()));
-             }
++            common.put_to_streams(EVENTLOGGER_STREAM_ID, Thrift.outputFields(eventLoggerBoltFields()));
 +        }
 +        topology.put_to_bolts(EVENTLOGGER_COMPONENT_ID, eventLoggerBolt);
 +    }
 +
 +    public static Map<String, Bolt> metricsConsumerBoltSpecs(Map conf, StormTopology topology) {
 +        Map<String, Bolt> metricsConsumerBolts = new HashMap<String, Bolt>();
 +
 +        Set<String> componentIdsEmitMetrics = new HashSet<String>();
 +        componentIdsEmitMetrics.addAll(allComponents(topology).keySet());
 +        componentIdsEmitMetrics.add(Constants.SYSTEM_COMPONENT_ID);
 +
 +        Map<GlobalStreamId, Grouping> inputs = new HashMap<GlobalStreamId, Grouping>();
 +        for (String componentId : componentIdsEmitMetrics) {
 +            inputs.put(Utils.getGlobalStreamId(componentId, Constants.METRICS_STREAM_ID), Thrift.prepareShuffleGrouping());
 +        }
 +
 +        List<Map<String, Object>> registerInfo = (List<Map<String, Object>>) conf.get(Config.TOPOLOGY_METRICS_CONSUMER_REGISTER);
 +        if (registerInfo != null) {
 +            Map<String, Integer> classOccurrencesMap = new HashMap<String, Integer>();
 +            for (Map<String, Object> info : registerInfo) {
 +                String className = (String) info.get("class");
 +                Object argument = info.get("argument");
 +                Integer phintNum = Utils.getInt(info.get("parallelism.hint"), 1);
 +                Map<String, Object> metricsConsumerConf = new HashMap<String, Object>();
 +                metricsConsumerConf.put(Config.TOPOLOGY_TASKS, phintNum);
 +                Bolt metricsConsumerBolt = Thrift.prepareSerializedBoltDetails(inputs, new MetricsConsumerBolt(className, argument), null, phintNum, metricsConsumerConf);
 +
 +                String id = className;
 +                if (classOccurrencesMap.containsKey(className)) {
 +                    // e.g. [\"a\", \"b\", \"a\"]) => [\"a\", \"b\", \"a#2\"]"
 +                    int occurrenceNum = classOccurrencesMap.get(className);
 +                    occurrenceNum++;
 +                    classOccurrencesMap.put(className, occurrenceNum);
 +                    id = Constants.METRICS_COMPONENT_ID_PREFIX + className + "#" + occurrenceNum;
 +                } else {
 +                    classOccurrencesMap.put(className, 1);
 +                }
 +                metricsConsumerBolts.put(id, metricsConsumerBolt);
 +            }
 +        }
 +        return metricsConsumerBolts;
 +    }
 +
 +    public static void addMetricComponents(Map conf, StormTopology topology) {
 +        Map<String, Bolt> metricsConsumerBolts = metricsConsumerBoltSpecs(conf, topology);
 +        for (Map.Entry<String, Bolt> entry : metricsConsumerBolts.entrySet()) {
 +            topology.put_to_bolts(entry.getKey(), entry.getValue());
 +        }
 +    }
 +
 +    public static void addSystemComponents(Map conf, StormTopology topology) {
 +        Map<String, StreamInfo> outputStreams = new HashMap<String, StreamInfo>();
 +        outputStreams.put(Constants.SYSTEM_TICK_STREAM_ID, Thrift.outputFields(Arrays.asList("rate_secs")));
 +        outputStreams.put(Constants.METRICS_TICK_STREAM_ID, Thrift.outputFields(Arrays.asList("interval")));
 +        outputStreams.put(Constants.CREDENTIALS_CHANGED_STREAM_ID, Thrift.outputFields(Arrays.asList("creds")));
 +
 +        Map<String, Object> boltConf = new HashMap<String, Object>();
 +        boltConf.put(Config.TOPOLOGY_TASKS, 0);
 +
 +        Bolt systemBoltSpec = Thrift.prepareSerializedBoltDetails(null, new SystemBolt(), outputStreams, 0, boltConf);
 +        topology.put_to_bolts(Constants.SYSTEM_COMPONENT_ID, systemBoltSpec);
 +    }
 +
 +    public static StormTopology systemTopology(Map stormConf, StormTopology topology) throws InvalidTopologyException {
 +        return _instance.systemTopologyImpl(stormConf, topology);
 +    }
 +
 +    protected StormTopology systemTopologyImpl(Map stormConf, StormTopology topology) throws InvalidTopologyException {
 +        validateBasic(topology);
 +
 +        StormTopology ret = topology.deepCopy();
 +        addAcker(stormConf, ret);
 +        addEventLogger(stormConf, ret);
 +        addMetricComponents(stormConf, ret);
 +        addSystemComponents(stormConf, ret);
 +        addMetricStreams(ret);
 +        addSystemStreams(ret);
 +
 +        validateStructure(ret);
 +
 +        return ret;
 +    }
 +
 +    public static boolean hasAckers(Map stormConf) {
 +        Object ackerNum = stormConf.get(Config.TOPOLOGY_ACKER_EXECUTORS);
 +        if (ackerNum == null || Utils.getInt(ackerNum) > 0) {
 +            return true;
 +        } else {
 +            return false;
 +        }
 +    }
 +
 +    public static boolean hasEventLoggers(Map stormConf) {
 +        Object eventLoggerNum = stormConf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS);
 +        if (eventLoggerNum == null || Utils.getInt(eventLoggerNum) > 0) {
 +            return true;
 +        } else {
 +            return false;
 +        }
 +    }
 +
 +    public static int numStartExecutors(Object component) throws InvalidTopologyException {
 +        ComponentCommon common = getComponentCommon(component);
-         if (common == null) {
-             throw new InvalidTopologyException("unknown component type " + component.getClass().getName());
-         }
-         int parallelismHintNum = Thrift.getParallelismHint(common);
-         return parallelismHintNum;
++        return Thrift.getParallelismHint(common);
 +    }
 +
-     public static Map stormTaskInfo(StormTopology userTopology, Map stormConf) throws InvalidTopologyException {
++    public static Map<Integer, String> stormTaskInfo(StormTopology userTopology, Map stormConf) throws InvalidTopologyException {
 +        return _instance.stormTaskInfoImpl(userTopology, stormConf);
 +    }
 +    /*
 +     * Returns map from task -> componentId
 +     */
-     protected Map stormTaskInfoImpl(StormTopology userTopology, Map stormConf) throws InvalidTopologyException {
++    protected Map<Integer, String> stormTaskInfoImpl(StormTopology userTopology, Map stormConf) throws InvalidTopologyException {
 +        Map<Integer, String> taskIdToComponentId = new HashMap<Integer, String>();
 +
 +        StormTopology systemTopology = systemTopology(stormConf, userTopology);
 +        Map<String, Object> components = allComponents(systemTopology);
 +        Map<String, Integer> componentIdToTaskNum = new TreeMap<String, Integer>();
 +        for (Map.Entry<String, Object> entry : components.entrySet()) {
 +            Map conf = componentConf(entry.getValue());
 +            Object taskNum = conf.get(Config.TOPOLOGY_TASKS);
-             if (taskNum != null) {
-                 componentIdToTaskNum.put(entry.getKey(), Utils.getInt(taskNum));
-             }
++            componentIdToTaskNum.put(entry.getKey(), Utils.getInt(taskNum));
 +        }
 +
 +        int taskId = 1;
 +        for (Map.Entry<String, Integer> entry : componentIdToTaskNum.entrySet()) {
 +            String componentId = entry.getKey();
 +            Integer taskNum = entry.getValue();
 +            while (taskNum > 0) {
 +                taskIdToComponentId.put(taskId, componentId);
 +                taskNum--;
 +                taskId++;
 +            }
 +        }
 +        return taskIdToComponentId;
 +    }
 +
 +    public static List<Integer> executorIdToTasks(List<Long> executorId) {
 +        List<Integer> taskIds = new ArrayList<Integer>();
 +        int taskId = executorId.get(0).intValue();
 +        while (taskId <= executorId.get(1).intValue()) {
 +            taskIds.add(taskId);
 +            taskId++;
 +        }
 +        return taskIds;
 +    }
 +
 +    public static Map<Integer, NodeInfo> taskToNodeport(Map<List<Long>, NodeInfo> executorToNodeport) {
 +        Map<Integer, NodeInfo> tasksToNodeport = new HashMap<Integer, NodeInfo>();
 +        for (Map.Entry<List<Long>, NodeInfo> entry : executorToNodeport.entrySet()) {
 +            List<Integer> taskIds = executorIdToTasks(entry.getKey());
 +            for (Integer taskId : taskIds) {
 +                tasksToNodeport.put(taskId, entry.getValue());
 +            }
 +        }
 +        return tasksToNodeport;
 +    }
 +
-     public static IAuthorizer mkAuthorizationHandler(String klassName, Map conf) {
++    public static IAuthorizer mkAuthorizationHandler(String klassName, Map conf) throws IllegalAccessException, InstantiationException, ClassNotFoundException {
 +        return _instance.mkAuthorizationHandlerImpl(klassName, conf);
 +    }
 +
-     protected IAuthorizer mkAuthorizationHandlerImpl(String klassName, Map conf) {
++    protected IAuthorizer mkAuthorizationHandlerImpl(String klassName, Map conf) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
 +        IAuthorizer aznHandler = null;
-         try {
-             if (klassName != null) {
-                 Class aznClass = Class.forName(klassName);
-                 if (aznClass != null) {
-                     aznHandler = (IAuthorizer) aznClass.newInstance();
-                     if (aznHandler != null) {
-                         aznHandler.prepare(conf);
-                     }
-                     LOG.debug("authorization class name:{}, class:{}, handler:{}",klassName, aznClass, aznHandler);
++        if (klassName != null) {
++            Class aznClass = Class.forName(klassName);
++            if (aznClass != null) {
++                aznHandler = (IAuthorizer) aznClass.newInstance();
++                if (aznHandler != null) {
++                    aznHandler.prepare(conf);
 +                }
++                LOG.debug("authorization class name:{}, class:{}, handler:{}",klassName, aznClass, aznHandler);
 +            }
-         } catch (Exception e) {
-             LOG.error("Failed to make authorization handler, klassName:{}", klassName);
 +        }
 +
 +        return aznHandler;
 +    }
 +}
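
One detail of the new StormCommon.java worth spelling out: stormTaskInfoImpl assigns task ids by walking component ids in the TreeMap's sorted order, numbering from 1, so the task -> component mapping is deterministic for a given system topology. A worked example with hypothetical task counts:

    // The task-id assignment loop from stormTaskInfoImpl, with made-up
    // component task counts. A TreeMap is used for the result here only
    // so the printed order is stable.
    import java.util.Map;
    import java.util.TreeMap;

    public class TaskInfoSketch {
        public static void main(String[] args) {
            Map<String, Integer> componentIdToTaskNum = new TreeMap<String, Integer>();
            componentIdToTaskNum.put("__acker", 1);  // hypothetical counts
            componentIdToTaskNum.put("bolt1", 2);
            componentIdToTaskNum.put("spout1", 1);

            Map<Integer, String> taskIdToComponentId = new TreeMap<Integer, String>();
            int taskId = 1;
            for (Map.Entry<String, Integer> entry : componentIdToTaskNum.entrySet()) {
                for (int n = entry.getValue(); n > 0; n--) {
                    taskIdToComponentId.put(taskId++, entry.getKey());
                }
            }
            System.out.println(taskIdToComponentId);
            // {1=__acker, 2=bolt1, 3=bolt1, 4=spout1}
        }
    }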

http://git-wip-us.apache.org/repos/asf/storm/blob/ee4fa721/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/org/apache/storm/nimbus_test.clj
index b63ac1f,fb000da..c32480f
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@@ -22,10 -22,10 +22,10 @@@
              TestAggregatesCounter TestPlannerSpout TestPlannerBolt]
             [org.apache.storm.nimbus InMemoryTopologyActionNotifier]
             [org.apache.storm.generated GlobalStreamId]
-            [org.apache.storm Thrift])
+            [org.apache.storm Thrift MockAutoCred])
    (:import [org.apache.storm.testing.staticmocking MockedZookeeper])
    (:import [org.apache.storm.scheduler INimbus])
 -  (:import [org.mockito Mockito])
 +  (:import [org.mockito Mockito Matchers])
    (:import [org.mockito.exceptions.base MockitoAssertionError])
    (:import [org.apache.storm.nimbus ILeaderElector NimbusInfo])
    (:import [org.apache.storm.testing.staticmocking MockedCluster])
@@@ -36,14 -36,14 +36,14 @@@
    (:import [java.util HashMap])
    (:import [java.io File])
    (:import [org.apache.storm.utils Time Utils Utils$UptimeComputer ConfigUtils IPredicate]
 -           [org.apache.storm.utils.staticmocking ConfigUtilsInstaller UtilsInstaller])
 +           [org.apache.storm.utils.staticmocking ConfigUtilsInstaller UtilsInstaller CommonInstaller])
    (:import [org.apache.storm.zookeeper Zookeeper])
 -  (:import [org.apache.commons.io FileUtils]
 -           [org.json.simple JSONValue])
 +  (:import [org.apache.commons.io FileUtils])
 +  (:import [org.json.simple JSONValue])
 +  (:import [org.apache.storm.daemon StormCommon])
    (:import [org.apache.storm.cluster StormClusterStateImpl ClusterStateContext ClusterUtils])
-   (:use [org.apache.storm testing MockAutoCred util config log converter])
+   (:use [org.apache.storm testing util config log converter])
 -  (:use [org.apache.storm.daemon common])
 -  (:require [conjure.core])
 +    (:require [conjure.core] [org.apache.storm.daemon.worker :as worker])
  
    (:use [conjure core]))
  

http://git-wip-us.apache.org/repos/asf/storm/blob/ee4fa721/storm-core/test/clj/org/apache/storm/supervisor_test.clj
----------------------------------------------------------------------