Posted to commits@storm.apache.org by bo...@apache.org on 2016/12/03 15:50:05 UTC

[05/10] storm git commit: STORM-1281: LocalCluster, testing4j and testing.clj to java

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/LocalCluster.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/LocalCluster.java b/storm-core/src/jvm/org/apache/storm/LocalCluster.java
new file mode 100644
index 0000000..3b18ac9
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/LocalCluster.java
@@ -0,0 +1,824 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.UnaryOperator;
+
+import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.cluster.ClusterStateContext;
+import org.apache.storm.cluster.ClusterUtils;
+import org.apache.storm.cluster.IStateStorage;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.Acker;
+import org.apache.storm.daemon.DaemonCommon;
+import org.apache.storm.daemon.Shutdownable;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.nimbus.Nimbus;
+import org.apache.storm.daemon.nimbus.Nimbus.StandaloneINimbus;
+import org.apache.storm.daemon.supervisor.ReadClusterState;
+import org.apache.storm.daemon.supervisor.StandaloneSupervisor;
+import org.apache.storm.daemon.supervisor.Supervisor;
+import org.apache.storm.executor.LocalExecutor;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.ClusterSummary;
+import org.apache.storm.generated.Credentials;
+import org.apache.storm.generated.KillOptions;
+import org.apache.storm.generated.Nimbus.Processor;
+import org.apache.storm.generated.NimbusSummary;
+import org.apache.storm.generated.RebalanceOptions;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.SubmitOptions;
+import org.apache.storm.generated.TopologyInfo;
+import org.apache.storm.messaging.IContext;
+import org.apache.storm.messaging.local.Context;
+import org.apache.storm.nimbus.ILeaderElector;
+import org.apache.storm.scheduler.INimbus;
+import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.security.auth.IGroupMappingServiceProvider;
+import org.apache.storm.security.auth.ThriftConnectionType;
+import org.apache.storm.security.auth.ThriftServer;
+import org.apache.storm.task.IBolt;
+import org.apache.storm.testing.InProcessZookeeper;
+import org.apache.storm.testing.NonRichBoltTracker;
+import org.apache.storm.testing.TmpPath;
+import org.apache.storm.testing.TrackedTopology;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.RegisteredGlobalState;
+import org.apache.storm.utils.StormCommonInstaller;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.apache.storm.utils.Utils;
+import org.apache.thrift.TException;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A stand-alone Storm cluster that runs inside a single process.
+ * It is intended to be used for testing, both internally for
+ * Apache Storm itself and for people building Storm topologies.
+ * 
+ * LocalCluster is AutoCloseable, so if you are using it in tests you can
+ * use a try-with-resources block to be sure it is shut down.
+ * 
+ * try (LocalCluster cluster = new LocalCluster()) {
+ *     // Do some tests
+ * }
+ * // The cluster has been shut down.
+ */
+public class LocalCluster implements ILocalCluster {
+    private static final Logger LOG = LoggerFactory.getLogger(LocalCluster.class);
+    
+    private static ThriftServer startNimbusDaemon(Map<String, Object> conf, Nimbus nimbus) {
+        ThriftServer ret = new ThriftServer(conf, new Processor<>(nimbus), ThriftConnectionType.NIMBUS);
+        LOG.info("Starting Nimbus server...");
+        new Thread(() -> ret.serve()).start();
+        return ret;
+    }
+    
+    /**
+     * Simple way to configure a LocalCluster to meet your needs.
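+     *
+     * A minimal usage sketch (the specific values here are illustrative):
+     *
+     * try (LocalCluster cluster = new LocalCluster.Builder()
+     *         .withSupervisors(1)
+     *         .withPortsPerSupervisor(2)
+     *         .withSimulatedTime()
+     *         .build()) {
+     *     // Run tests against the cluster
+     * }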
+     */
+    public static class Builder {
+        private int supervisors = 2;
+        private int portsPerSupervisor = 3;
+        private Map<String, Object> daemonConf = new HashMap<>();
+        private INimbus inimbus = null;
+        private IGroupMappingServiceProvider groupMapper = null;
+        private int supervisorSlotPortMin = 1024;
+        private boolean nimbusDaemon = false;
+        private UnaryOperator<Nimbus> nimbusWrapper = null;
+        private BlobStore store = null;
+        private IStormClusterState clusterState = null;
+        private ILeaderElector leaderElector = null;
+        private String trackId = null;
+        private boolean simulateTime = false;
+        
+        /**
+         * Set the number of supervisors the cluster should have.
+         */
+        public Builder withSupervisors(int supervisors) {
+            if (supervisors < 0) {
+                throw new IllegalArgumentException("supervisors cannot be negative");
+            }
+            this.supervisors = supervisors;
+            return this;
+        }
+        
+        /**
+         * Set the number of slots/ports each supervisor should have.
+         */
+        public Builder withPortsPerSupervisor(int portsPerSupervisor) {
+            if (portsPerSupervisor < 0) {
+                throw new IllegalArgumentException("supervisor ports cannot be negative");
+            }
+            this.portsPerSupervisor = portsPerSupervisor;
+            return this;
+        }
+        
+        /**
+         * Set the base config that the daemons should use.
+         */
+        public Builder withDaemonConf(Map<String, Object> conf) {
+            if (conf != null) {
+                this.daemonConf = new HashMap<>(conf);
+            }
+            return this;
+        }
+        
+        /**
+         * Add a single key/value config to the daemon conf.
+         */
+        public Builder withDaemonConf(String key, Object value) {
+            this.daemonConf.put(key, value);
+            return this;
+        }
+        
+        /**
+         * Override the INimbus instance that nimbus will use.
+         */
+        public Builder withINimbus(INimbus inimbus) {
+            this.inimbus = inimbus;
+            return this;
+        }
+        
+        /**
+         * Override the code that maps users to groups for authorization.
+         */
+        public Builder withGroupMapper(IGroupMappingServiceProvider groupMapper) {
+            this.groupMapper = groupMapper;
+            return this;
+        }
+        
+        /**
+         * When assigning ports to worker slots, start at minPort.
+         */
+        public Builder withSupervisorSlotPortMin(Number minPort) {
+            int port = 1024;
+            if (minPort == null) {
+                LOG.warn("Number is null... {}", minPort);
+            } else {
+                port = minPort.intValue();
+            }
+            if (port <= 0) {
+                throw new IllegalArgumentException("port must be positive");
+            }
+            this.supervisorSlotPortMin = port;
+            return this;
+        }
+        
+        /**
+         * Have the local nimbus actually launch a thrift server.  This is intended to
+         * be used mostly for internal storm testing. 
+         */
+        public Builder withNimbusDaemon() {
+            return withNimbusDaemon(true);
+        }
+
+        /**
+         * If nimbusDaemon is true the local nimbus will launch a thrift server.  This is intended to
+         * be used mostly for internal storm testing. 
+         */
+        public Builder withNimbusDaemon(Boolean nimbusDaemon) {
+            if (nimbusDaemon == null) {
+                nimbusDaemon = false;
+                LOG.warn("nimbusDaemon is null");
+            }
+            this.nimbusDaemon = nimbusDaemon;
+            return this;
+        }
+        
+        /**
+         * Turn on simulated time in the cluster.  This allows someone to simulate long periods of
+         * time for timeouts, etc., when testing nimbus/supervisors themselves.  NOTE: this only
+         * works for code that uses the {@link org.apache.storm.utils.Time} class for time management,
+         * so it will not work in all cases.
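+         *
+         * A sketch of how this is typically used together with advanceClusterTime:
+         *
+         * try (LocalCluster cluster = new LocalCluster.Builder().withSimulatedTime().build()) {
+         *     cluster.advanceClusterTime(30); // advance simulated time by 30 seconds
+         * }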
+         */
+        public Builder withSimulatedTime() {
+            return withSimulatedTime(true);
+        }
+
+        /**
+         * Turn on simulated time in the cluster.  This allows someone to simulate long periods of
+         * time for timeouts, etc., when testing nimbus/supervisors themselves.  NOTE: this only
+         * works for code that uses the {@link org.apache.storm.utils.Time} class for time management,
+         * so it will not work in all cases.
+         */
+        public Builder withSimulatedTime(boolean simulateTime) {
+            this.simulateTime = simulateTime;
+            return this;
+        }
+        
+        /**
+         * Before nimbus is created/used, call nimbusWrapper on it first and use the
+         * result instead.  This is intended for internal testing only, and is here to
+         * allow a mocking framework to spy on the Nimbus class.
+         */
+        public Builder withNimbusWrapper(UnaryOperator<Nimbus> nimbusWrapper) {
+            this.nimbusWrapper = nimbusWrapper;
+            return this;
+        }
+        
+        /**
+         * Use the given blobstore instead of the one in the config.
+         * This is intended mostly for internal testing with Mocks.
+         */
+        public Builder withBlobStore(BlobStore store) {
+            this.store = store;
+            return this;
+        }
+        
+        /**
+         * Use the given clusterState instead of the one in the config.
+         * This is intended mostly for internal testing with Mocks.
+         */
+        public Builder withClusterState(IStormClusterState clusterState) {
+            this.clusterState = clusterState;
+            return this;
+        }
+        
+        /**
+         * Use the given leaderElector instead of the one in the config.
+         * This is intended mostly for internal testing with Mocks.
+         */
+        public Builder withLeaderElector(ILeaderElector leaderElector) {
+            this.leaderElector = leaderElector;
+            return this;
+        }
+        
+        /**
+         * A tracked cluster can run tracked topologies.
+         * See {@link org.apache.storm.testing.TrackedTopology} for more information
+         * on tracked topologies.
+         * @param trackId an arbitrary unique id that is used to keep track of tracked topologies 
+         */
+        public Builder withTracked(String trackId) {
+            this.trackId = trackId;
+            return this;
+        }
+        
+        /**
+         * A tracked cluster can run tracked topologies.
+         * See {@link org.apache.storm.testing.TrackedTopology} for more information
+         * on tracked topologies.
+         */
+        public Builder withTracked() {
+            this.trackId = Utils.uuid();
+            return this;
+        }
+        
+        /**
+         * @return the LocalCluster
+         * @throws Exception on any one of many different errors.
+         * This is intended for testing so yes it is ugly and throws Exception...
+         */
+        public LocalCluster build() throws Exception {
+            return new LocalCluster(this);
+        }
+    }
+    
+    private static class TrackedStormCommon extends StormCommon {
+        private final String id;
+        public TrackedStormCommon(String id) {
+            this.id = id;
+        }
+        
+        @Override
+        public IBolt makeAckerBoltImpl() {
+            return new NonRichBoltTracker(new Acker(), id);
+        }
+    }
+    
+    private final Nimbus nimbus;
+    //This is very private and does not need to be exposed
+    private final AtomicInteger portCounter;
+    private final Map<String, Object> daemonConf;
+    private final List<Supervisor> supervisors;
+    private final IStateStorage state;
+    private final IStormClusterState clusterState;
+    private final List<TmpPath> tmpDirs;
+    private final InProcessZookeeper zookeeper;
+    private final IContext sharedContext;
+    private final ThriftServer thriftServer;
+    private final String trackId;
+    private final StormCommonInstaller commonInstaller;
+    private final SimulatedTime time;
+    
+    /**
+     * Create a default LocalCluster 
+     * @throws Exception on any error
+     */
+    public LocalCluster() throws Exception {
+        this(new Builder().withDaemonConf(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, true));
+    }
+    
+    /**
+     * Create a LocalCluster that connects to an existing Zookeeper instance
+     * @param zkHost the host for ZK
+     * @param zkPort the port for ZK
+     * @throws Exception on any error
+     */
+    public LocalCluster(String zkHost, Long zkPort) throws Exception {
+        this(new Builder().withDaemonConf(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, true)
+                .withDaemonConf(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(zkHost))
+                .withDaemonConf(Config.STORM_ZOOKEEPER_PORT, zkPort));
+    }
+    
+    @SuppressWarnings("deprecation")
+    private LocalCluster(Builder builder) throws Exception {
+        if (builder.simulateTime) {
+            time = new SimulatedTime();
+        } else {
+            time = null;
+        }
+        this.trackId = builder.trackId;
+        if (trackId != null) {
+            ConcurrentHashMap<String, AtomicInteger> metrics = new ConcurrentHashMap<>();
+            metrics.put("spout-emitted", new AtomicInteger(0));
+            metrics.put("transferred", new AtomicInteger(0));
+            metrics.put("processed", new AtomicInteger(0));
+            this.commonInstaller = new StormCommonInstaller(new TrackedStormCommon(this.trackId));
+            LOG.warn("Adding tracked metrics for ID {}", this.trackId);
+            RegisteredGlobalState.setState(this.trackId, metrics);
+            LocalExecutor.setTrackId(this.trackId);
+        } else {
+            this.commonInstaller = null;
+        }
+        
+        this.tmpDirs = new ArrayList<>();
+        this.supervisors = new ArrayList<>();
+        TmpPath nimbusTmp = new TmpPath();
+        this.tmpDirs.add(nimbusTmp);
+        Map<String, Object> conf = ConfigUtils.readStormConfig();
+        conf.put(Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS, true);
+        conf.put(Config.ZMQ_LINGER_MILLIS, 0);
+        conf.put(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, false);
+        conf.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 50);
+        conf.put(Config.STORM_CLUSTER_MODE, "local");
+        conf.put(Config.BLOBSTORE_SUPERUSER, System.getProperty("user.name"));
+        conf.put(Config.BLOBSTORE_DIR, nimbusTmp.getPath());
+        
+        InProcessZookeeper zookeeper = null;
+        if (!builder.daemonConf.containsKey(Config.STORM_ZOOKEEPER_SERVERS)) {
+            zookeeper = new InProcessZookeeper();
+            conf.put(Config.STORM_ZOOKEEPER_PORT, zookeeper.getPort());
+            conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList("localhost"));
+        }
+        this.zookeeper = zookeeper;
+        conf.putAll(builder.daemonConf);
+        this.daemonConf = new HashMap<>(conf);
+        
+        this.portCounter = new AtomicInteger(builder.supervisorSlotPortMin);
+        ClusterStateContext cs = new ClusterStateContext();
+        this.state = ClusterUtils.mkStateStorage(this.daemonConf, null, null, cs);
+        if (builder.clusterState == null) {
+            clusterState = ClusterUtils.mkStormClusterState(this.daemonConf, null, cs);
+        } else {
+            this.clusterState = builder.clusterState;
+        }
+        //Set it for nimbus only
+        conf.put(Config.STORM_LOCAL_DIR, nimbusTmp.getPath());
+        Nimbus nimbus = new Nimbus(conf, builder.inimbus == null ? new StandaloneINimbus() : builder.inimbus, 
+                this.getClusterState(), null, builder.store, builder.leaderElector, builder.groupMapper);
+        if (builder.nimbusWrapper != null) {
+            nimbus = builder.nimbusWrapper.apply(nimbus);
+        }
+        this.nimbus = nimbus;
+        this.nimbus.launchServer();
+        IContext context = null;
+        if (!Utils.getBoolean(this.daemonConf.get(Config.STORM_LOCAL_MODE_ZMQ), false)) {
+            context = new Context();
+            context.prepare(this.daemonConf);
+        }
+        this.sharedContext = context;
+        this.thriftServer = builder.nimbusDaemon ? startNimbusDaemon(this.daemonConf, this.nimbus) : null;
+        
+        for (int i = 0; i < builder.supervisors; i++) {
+            addSupervisor(builder.portsPerSupervisor, null, null);
+        }
+        
+        //Wait for a leader to be elected (or topology submission can be rejected)
+        try {
+            long timeoutAfter = System.currentTimeMillis() + 10_000;
+            while (!hasLeader()) {
+                if (System.currentTimeMillis() > timeoutAfter) {
+                    throw new IllegalStateException("Timed out waiting for nimbus to become the leader");
+                }
+                Thread.sleep(1);
+            }
+        } catch (Exception e) {
+            //Ignore any exceptions; we might be doing a test for authentication 
+        }
+    }
+    
+    private boolean hasLeader() throws AuthorizationException, TException {
+        ClusterSummary summary = getNimbus().getClusterInfo();
+        if (summary.is_set_nimbuses()) {
+            for (NimbusSummary sum: summary.get_nimbuses()) {
+                if (sum.is_isLeader()) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    /**
+     * @return Nimbus itself so you can interact with it directly, if needed.
+     */
+    public Nimbus getNimbus() {
+        return nimbus;
+    }
+    
+    /**
+     * @return the base config for the daemons.
+     */
+    public Map<String, Object> getDaemonConf() {
+        return new HashMap<>(daemonConf);
+    }
+    
+    public static final KillOptions KILL_NOW = new KillOptions();
+    static {
+        KILL_NOW.set_wait_secs(0);
+    }
+    
+    /**
+     * When running a topology locally for tests, it is helpful to be sure
+     * that the topology is dead before the test exits.  This is an AutoCloseable
+     * topology that not only gives you access to the compiled StormTopology
+     * but also will kill the topology when it closes.
+     * 
+     * try (LocalTopology testTopo = cluster.submitTopology("testing", ...)) {
+     *   // Run Some test
+     * }
+     * // The topology has been killed
+     */
+    public class LocalTopology extends StormTopology implements ILocalTopology {
+        private static final long serialVersionUID = 6145919776650637748L;
+        private final String topoName;
+        
+        public LocalTopology(String topoName, StormTopology topo) {
+            super(topo);
+            this.topoName = topoName;
+        }
+
+        @Override
+        public void close() throws TException {
+            killTopologyWithOpts(topoName, KILL_NOW);
+        }
+    }
+    
+    @Override
+    public LocalTopology submitTopology(String topologyName, Map<String, Object> conf, StormTopology topology)
+            throws TException {
+        if (!Utils.isValidConf(conf)) {
+            throw new IllegalArgumentException("Topology conf is not json-serializable");
+        }
+        getNimbus().submitTopology(topologyName, null, JSONValue.toJSONString(conf), topology);
+        
+        ISubmitterHook hook = (ISubmitterHook) Utils.getConfiguredClass(conf, Config.STORM_TOPOLOGY_SUBMISSION_NOTIFIER_PLUGIN);
+        if (hook != null) {
+            TopologyInfo topologyInfo = Utils.getTopologyInfo(topologyName, null, conf);
+            try {
+                hook.notify(topologyInfo, conf, topology);
+            } catch (IllegalAccessException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        return new LocalTopology(topologyName, topology);
+    }
+
+    @Override
+    public LocalTopology submitTopologyWithOpts(String topologyName, Map<String, Object> conf, StormTopology topology, SubmitOptions submitOpts)
+            throws TException {
+        if (!Utils.isValidConf(conf)) {
+            throw new IllegalArgumentException("Topology conf is not json-serializable");
+        }
+        getNimbus().submitTopologyWithOpts(topologyName, null, JSONValue.toJSONString(conf), topology, submitOpts);
+        return new LocalTopology(topologyName, topology);
+    }
+
+    @Override
+    public LocalTopology submitTopology(String topologyName, Map<String, Object> conf, TrackedTopology topology)
+            throws TException {
+        submitTopology(topologyName, conf, topology.getTopology());
+        return new LocalTopology(topologyName, topology.getTopology());
+    }
+
+    @Override
+    public LocalTopology submitTopologyWithOpts(String topologyName, Map<String, Object> conf, TrackedTopology topology, SubmitOptions submitOpts)
+            throws TException {
+        submitTopologyWithOpts(topologyName, conf, topology.getTopology(), submitOpts);
+        return new LocalTopology(topologyName, topology.getTopology());
+    }
+    
+    @Override
+    public void uploadNewCredentials(String topologyName, Credentials creds) throws TException {
+        getNimbus().uploadNewCredentials(topologyName, creds);
+    }
+
+    @Override
+    public void killTopology(String topologyName) throws TException {
+        getNimbus().killTopology(topologyName);
+    }
+
+    @Override
+    public void killTopologyWithOpts(String name, KillOptions options) throws TException {
+        getNimbus().killTopologyWithOpts(name, options);
+    }
+
+
+    @Override
+    public void activate(String topologyName) throws TException {
+        getNimbus().activate(topologyName);
+    }
+
+
+    @Override
+    public void deactivate(String topologyName) throws TException {
+        getNimbus().deactivate(topologyName);
+    }
+
+
+    @Override
+    public void rebalance(String name, RebalanceOptions options) throws TException {
+        getNimbus().rebalance(name, options);
+    }
+
+    @Override
+    public void shutdown() {
+        try {
+            close();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+
+    @Override
+    public String getTopologyConf(String id) throws TException {
+        return getNimbus().getTopologyConf(id);
+    }
+
+
+    @Override
+    public StormTopology getTopology(String id) throws TException {
+        return getNimbus().getTopology(id);
+    }
+
+
+    @Override
+    public ClusterSummary getClusterInfo() throws TException {
+        return getNimbus().getClusterInfo();
+    }
+
+
+    @Override
+    public TopologyInfo getTopologyInfo(String id) throws TException {
+        return getNimbus().getTopologyInfo(id);
+    }
+
+    @Override
+    public synchronized void close() throws Exception {
+        nimbus.shutdown();
+        if (thriftServer != null) {
+            LOG.info("shutting down thrift server");
+            try {
+                thriftServer.stop();
+            } catch (Exception e) {
+                LOG.info("failed to stop thrift", e);
+            }
+        }
+        if (state != null) {
+            state.close();
+        }
+        if (getClusterState() != null) {
+            getClusterState().disconnect();
+        }
+        for (Supervisor s: supervisors) {
+            s.shutdownAllWorkers(null, ReadClusterState.THREAD_DUMP_ON_ERROR);
+            s.close();
+        }
+        ProcessSimulator.killAllProcesses();
+        if (zookeeper != null) {
+            LOG.info("Shutting down in process zookeeper");
+            zookeeper.close();
+            LOG.info("Done shutting down in process zookeeper");
+        }
+        
+        for (TmpPath p: tmpDirs) {
+            p.close();
+        }
+        
+        if (this.trackId != null) {
+            LOG.warn("Clearing tracked metrics for ID {}", this.trackId);
+            LocalExecutor.clearTrackId();
+            RegisteredGlobalState.clearState(this.trackId);
+        }
+        
+        if (this.commonInstaller != null) {
+            this.commonInstaller.close();
+        }
+        
+        if (time != null) {
+            time.close();
+        }
+    }
+    
+    /**
+     * Get a specific Supervisor.  This is intended mostly for internal testing.
+     * @param id the id of the supervisor
+     */
+    public synchronized Supervisor getSupervisor(String id) {
+        for (Supervisor s: supervisors) {
+            if (id.equals(s.getId())) {
+                return s;
+            }
+        }
+        return null;
+    }
+    
+    /**
+     * Kill a specific supervisor.  This is intended mostly for internal testing.
+     * @param id the id of the supervisor
+     */
+    public synchronized void killSupervisor(String id) {
+        for (Iterator<Supervisor> it = supervisors.iterator(); it.hasNext();) {
+            Supervisor s = it.next();
+            if (id.equals(s.getId())) {
+                it.remove();
+                s.close();
+                //tmpDir will be handled separately
+                return;
+            }
+        }
+    }
+    
+    /**
+     * Add another supervisor to the cluster.  This is intended mostly for internal testing.
+     */
+    public Supervisor addSupervisor() throws Exception {
+        return addSupervisor(null, null, null);
+    }
+
+    /**
+     * Add another supervisor to the cluster.  This is intended mostly for internal testing.
+     * @param ports the number of ports/slots the supervisor should have
+     */
+    public Supervisor addSupervisor(Number ports) throws Exception {
+        return addSupervisor(ports, null, null);
+    }
+    
+    /**
+     * Add another supervisor to the cluster.  This is intended mostly for internal testing.
+     * @param ports the number of ports/slots the supervisor should have
+     * @param id the id of the new supervisor, so you can find it later.
+     */
+    public Supervisor addSupervisor(Number ports, String id) throws Exception {
+        return addSupervisor(ports, null, id);
+    }
+    
+    /**
+     * Add another supervisor to the cluster.  This is intended mostly for internal testing.
+     * @param ports the number of ports/slots the supervisor should have
+     * @param conf any config values that should be added/overwritten in the daemon conf of the cluster.
+     * @param id the id of the new supervisor, so you can find it later.
+     */
+    public synchronized Supervisor addSupervisor(Number ports, Map<String, Object> conf, String id) throws Exception {
+        if (ports == null) {
+            ports = 2;
+        }
+        TmpPath tmpDir = new TmpPath();
+        tmpDirs.add(tmpDir);
+        
+        List<Integer> portNumbers = new ArrayList<>(ports.intValue());
+        for (int i = 0; i < ports.intValue(); i++) {
+            portNumbers.add(portCounter.getAndIncrement());
+        }
+        
+        Map<String, Object> superConf = new HashMap<>(daemonConf);
+        if (conf != null) {
+            superConf.putAll(conf);
+        }
+        superConf.put(Config.STORM_LOCAL_DIR, tmpDir.getPath());
+        superConf.put(Config.SUPERVISOR_SLOTS_PORTS, portNumbers);
+        
+        final String superId = id == null ? Utils.uuid() : id;
+        ISupervisor isuper = new StandaloneSupervisor() {
+            @Override
+            public String generateSupervisorId() {
+                return superId;
+            }
+        };
+        if (!ConfigUtils.isLocalMode(superConf)) {
+            throw new IllegalArgumentException("Cannot start server in distributed mode!");
+        }
+        
+        Supervisor s = new Supervisor(superConf, sharedContext, isuper);
+        s.launch();
+        supervisors.add(s);
+        return s;
+    }
+    
+    private boolean areAllSupervisorsWaiting() {
+        boolean ret = true;
+        for (Supervisor s: supervisors) {
+            ret = ret && s.isWaiting();
+        }
+        return ret;
+    }
+    
+    private static boolean areAllWorkersWaiting() {
+        boolean ret = true;
+        for (Shutdownable s: ProcessSimulator.getAllProcessHandles()) {
+            if (s instanceof DaemonCommon) {
+                ret = ret && ((DaemonCommon)s).isWaiting();
+            }
+        }
+        return ret;
+    }
+    
+    /**
+     * Wait for the cluster to be idle.  This is intended to be used with
+     * simulated time and is for internal testing.
+     * @throws InterruptedException if interrupted while waiting.
+     * @throws AssertionError if the cluster did not come to an idle point within
+     * the timeout.
+     */
+    public void waitForIdle() throws InterruptedException {
+        waitForIdle(Testing.TEST_TIMEOUT_MS);
+    }
+    
+    /**
+     * Wait for the cluster to be idle.  This is intended to be used with
+     * simulated time and is for internal testing.
+     * @param timeoutMs the number of ms to wait before throwing an error.
+     * @throws InterruptedException if interrupted while waiting.
+     * @throws AssertionError if the cluster did not come to an idle point within
+     * the timeout.
+     */
+    public void waitForIdle(long timeoutMs) throws InterruptedException {
+        Random rand = ThreadLocalRandom.current();
+        //wait until all workers, supervisors, and nimbus are waiting
+        final long endTime = System.currentTimeMillis() + timeoutMs;
+        while (!(nimbus.isWaiting() &&
+                areAllSupervisorsWaiting() &&
+                areAllWorkersWaiting())) {
+            if (System.currentTimeMillis() >= endTime) {
+                LOG.info("Cluster was not idle in {} ms", timeoutMs);
+                LOG.info(Utils.threadDump());
+                throw new AssertionError("Test timed out (" + timeoutMs + "ms) cluster not idle");
+            }
+            Thread.sleep(rand.nextInt(20));
+        }
+    }
+    
+    @Override
+    public void advanceClusterTime(int secs) throws InterruptedException {
+        advanceClusterTime(secs, 1);
+    }
+    
+    @Override
+    public void advanceClusterTime(int secs, int incSecs) throws InterruptedException {
+        for (int amountLeft = secs; amountLeft > 0; amountLeft -= incSecs) {
+            int diff = Math.min(incSecs, amountLeft);
+            Time.advanceTimeSecs(diff);
+            waitForIdle();
+        }
+    }
+
+    @Override
+    public IStormClusterState getClusterState() {
+        return clusterState;
+    }
+    
+    @Override
+    public String getTrackedId() {
+        return trackId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/LocalDRPC.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/LocalDRPC.java b/storm-core/src/jvm/org/apache/storm/LocalDRPC.java
index ccdf634..e3f34d1 100644
--- a/storm-core/src/jvm/org/apache/storm/LocalDRPC.java
+++ b/storm-core/src/jvm/org/apache/storm/LocalDRPC.java
@@ -17,24 +17,30 @@
  */
 package org.apache.storm;
 
+import java.util.Map;
+
 import org.apache.storm.daemon.DrpcServer;
 import org.apache.storm.generated.AuthorizationException;
 import org.apache.storm.generated.DRPCExecutionException;
 import org.apache.storm.generated.DRPCRequest;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.ServiceRegistry;
-import org.apache.storm.utils.Utils;
 import org.apache.thrift.TException;
 
-import java.util.Map;
-
+/**
+ * A local way to test DRPC.
+ * 
+ * try (LocalDRPC drpc = new LocalDRPC()) {
+ *   // Do tests
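+ *   // For example (assuming a topology serving "my-func" was submitted elsewhere):
+ *   // String result = drpc.execute("my-func", "some args");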
+ * }
+ */
 public class LocalDRPC implements ILocalDRPC {
 
     private final DrpcServer handler;
     private final String serviceId;
 
     public LocalDRPC() {
-        Map conf = ConfigUtils.readStormConfig();
+        Map<String, Object> conf = ConfigUtils.readStormConfig();
         handler = new DrpcServer(conf);
         serviceId = ServiceRegistry.registerService(handler);
     }
@@ -61,12 +67,17 @@ public class LocalDRPC implements ILocalDRPC {
 
     @Override
     public void shutdown() {
-        ServiceRegistry.unregisterService(this.serviceId);
-        this.handler.close();
+        close();
     }
 
     @Override
     public DRPCRequest fetchRequest(String functionName) throws AuthorizationException, TException {
         return handler.fetchRequest(functionName);
     }
+
+    @Override
+    public void close() {
+        ServiceRegistry.unregisterService(this.serviceId);
+        this.handler.close();
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/Testing.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Testing.java b/storm-core/src/jvm/org/apache/storm/Testing.java
new file mode 100644
index 0000000..be90ae8
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/Testing.java
@@ -0,0 +1,716 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.KillOptions;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.StreamInfo;
+import org.apache.storm.scheduler.INimbus;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.testing.CompletableSpout;
+import org.apache.storm.testing.CompleteTopologyParam;
+import org.apache.storm.testing.FixedTuple;
+import org.apache.storm.testing.FixedTupleSpout;
+import org.apache.storm.testing.MkClusterParam;
+import org.apache.storm.testing.MkTupleParam;
+import org.apache.storm.testing.MockedSources;
+import org.apache.storm.testing.TestJob;
+import org.apache.storm.testing.TrackedTopology;
+import org.apache.storm.testing.TupleCaptureBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.RegisteredGlobalState;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.apache.storm.utils.Utils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A utility that helps with testing topologies, Bolts and Spouts.
+ */
+public class Testing {
+    private static final Logger LOG = LoggerFactory.getLogger(Testing.class);
+    /**
+     * The default amount of wall time that should be spent waiting for 
+     * specific conditions to happen.  Default is 10 seconds unless
+     * the environment variable STORM_TEST_TIMEOUT_MS is set.
+     */
+    public static final int TEST_TIMEOUT_MS;
+    static {
+        int timeout = 10_000;
+        try {
+            timeout = Integer.parseInt(System.getenv("STORM_TEST_TIMEOUT_MS"));
+        } catch (Exception e) {
+            //Ignored, will go with default timeout
+        }
+        TEST_TIMEOUT_MS = timeout;
+    }
+    
+    /**
+     * Simply produces a boolean to see if a specific state is true or false.
+     */
+    public static interface Condition {
+        public boolean exec();
+    }
+    
+    /**
+     * Continue to execute body repeatedly while condition is true, or until
+     * TEST_TIMEOUT_MS has passed.
+     * @param condition keep looping while this is true
+     * @param body what to run in the loop
+     * @throws AssertionError if the loop timed out.
+     */
+    public static void whileTimeout(Condition condition, Runnable body) {
+        whileTimeout(TEST_TIMEOUT_MS, condition, body);
+    }
+    
+    /**
+     * Continue to execute body repeatedly while condition is true, or until
+     * timeoutMs has passed.
+     * @param timeoutMs the number of ms to wait before timing out.
+     * @param condition keep looping while this is true
+     * @param body what to run in the loop
+     * @throws AssertionError if the loop timed out.
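+     *
+     * A minimal sketch (the condition and body here are illustrative):
+     *
+     * AtomicInteger attempts = new AtomicInteger(0);
+     * Testing.whileTimeout(5_000,
+     *         () -> attempts.get() < 3,       // keep looping while this is true
+     *         () -> attempts.incrementAndGet());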
+     */
+    public static void whileTimeout(long timeoutMs, Condition condition, Runnable body) {
+        long endTime = System.currentTimeMillis() + timeoutMs;
+        LOG.debug("Looping until {}", condition);
+        while (condition.exec()) {
+            if (System.currentTimeMillis() > endTime) {
+                LOG.info("Condition {} not met in {} ms", condition, timeoutMs);
+                LOG.info(Utils.threadDump());
+                throw new AssertionError("Test timed out (" + timeoutMs + "ms) " + condition);
+            }
+            body.run();
+        }
+        LOG.debug("Condition met {}", condition);
+    }
+    
+    /**
+     * Convenience method for data.stream().allMatch(pred).
+     */
+    public static <T> boolean isEvery(Collection<T> data, Predicate<T> pred) {
+        return data.stream().allMatch(pred);
+    }
+    
+    /**
+     * Run with simulated time
+     * @deprecated use ``` 
+     * try (Time.SimulatedTime time = new Time.SimulatedTime()) {
+     *  ...
+     * }
+     * ```
+     * @param code what to run
+     */
+    @Deprecated
+    public static void withSimulatedTime(Runnable code) {
+        try (SimulatedTime st = new SimulatedTime()) {
+            code.run();
+        }
+    }
+    
+    private static LocalCluster cluster(MkClusterParam param, boolean simulated) throws Exception {
+        return cluster(param, null, simulated);
+    }
+    
+    private static LocalCluster cluster(MkClusterParam param) throws Exception {
+        return cluster(param, null, false);
+    }
+    
+    private static LocalCluster cluster(MkClusterParam param, String id, boolean simulated) throws Exception {
+        Integer supervisors = param.getSupervisors();
+        if (supervisors == null) {
+            supervisors = 2;
+        }
+        Integer ports = param.getPortsPerSupervisor();
+        if (ports == null) {
+            ports = 3;
+        }
+        Map<String, Object> conf = param.getDaemonConf();
+        if (conf == null) {
+            conf = new HashMap<>();
+        }
+        return new LocalCluster.Builder()
+                .withSupervisors(supervisors)
+                .withPortsPerSupervisor(ports)
+                .withDaemonConf(conf)
+                .withNimbusDaemon(param.isNimbusDaemon())
+                .withTracked(id)
+                .withSimulatedTime(simulated)
+                .build();
+    }
+    
+    /**
+     * Run with a local cluster
+     * @deprecated use ``` 
+     * try (LocalCluster cluster = new LocalCluster()) {
+     *  ...
+     * }
+     * ```
+     * @param code what to run
+     */
+    @Deprecated
+    public static void withLocalCluster(TestJob code) {
+        withLocalCluster(new MkClusterParam(), code);
+    }
+    
+    /**
+     * Run with a local cluster
+     * @deprecated use ``` 
+     * try (LocalCluster cluster = new LocalCluster.Builder()....build()) {
+     *  ...
+     * }
+     * ```
+     * @param param configs to set in the cluster
+     * @param code what to run
+     */
+    @Deprecated
+    public static void withLocalCluster(MkClusterParam param, TestJob code) {
+        try (LocalCluster lc = cluster(param)) {
+            code.run(lc);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+    
+    /**
+     * Run with a local cluster
+     * @deprecated use ``` 
+     * try (LocalCluster cluster = new LocalCluster.Builder()....build()) {
+     *  ...
+     * }
+     * ```
+     * @param clusterConf some configs to set in the cluster
+     */
+    @Deprecated
+    public static ILocalCluster getLocalCluster(Map<String, Object> clusterConf) {
+        @SuppressWarnings("unchecked")
+        Map<String, Object> conf = (Map<String, Object>) clusterConf.get("daemon-conf");
+        if (conf == null) {
+            conf = new HashMap<>();
+        }
+        Number supervisors = (Number) clusterConf.getOrDefault("supervisors", 2);
+        Number ports = (Number) clusterConf.getOrDefault("ports-per-supervisor", 3);
+        INimbus inimbus = (INimbus) clusterConf.get("inimbus");
+        Number portMin = (Number) clusterConf.getOrDefault("supervisor-slot-port-min", 1024);
+        Boolean nimbusDaemon = (Boolean) clusterConf.getOrDefault("nimbus-daemon", false);
+        try {
+            return new LocalCluster.Builder()
+                    .withSupervisors(supervisors.intValue())
+                    .withDaemonConf(conf)
+                    .withPortsPerSupervisor(ports.intValue())
+                    .withINimbus(inimbus)
+                    .withSupervisorSlotPortMin(portMin)
+                    .withNimbusDaemon(nimbusDaemon)
+                    .build();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+    
+    /**
+     * Run with a local cluster
+     * @deprecated use ``` 
+     * try (LocalCluster cluster = new LocalCluster.Builder().withSimulatedTime().build()) {
+     *  ...
+     * }
+     * ```
+     * @param code what to run
+     */
+    @Deprecated
+    public static void withSimulatedTimeLocalCluster(TestJob code) {
+        withSimulatedTimeLocalCluster(new MkClusterParam(), code);
+    }
+    
+    /**
+     * Run with a local cluster
+     * @deprecated use ``` 
+     * try (LocalCluster cluster = new LocalCluster.Builder().withSimulatedTime()....build()) {
+     *  ...
+     * }
+     * ```
+     * @param param configs to set in the cluster
+     * @param code what to run
+     */
+    @Deprecated
+    public static void withSimulatedTimeLocalCluster(MkClusterParam param, TestJob code) {
+        try (LocalCluster lc = cluster(param, true)) {
+            code.run(lc);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+    
+    /**
+     * Run with a local cluster
+     * @deprecated use ``` 
+     * try (LocalCluster cluster = new LocalCluster.Builder().withTracked().build()) {
+     *  ...
+     * }
+     * ```
+     * @param code what to run
+     */
+    @Deprecated
+    public static void withTrackedCluster(TestJob code) {
+        withTrackedCluster(new MkClusterParam(), code);
+    }
+    
+    /**
+     * In a tracked topology some metrics are tracked.  This provides a way to get those metrics.
+     * This is intended mostly for internal testing.
+     * @param id the id of the tracked cluster
+     * @param key the name of the metric to get.
+     * @return the metric
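+     *
+     * For example, with a tracked cluster (the metric names come from the tracked-metrics
+     * map populated by LocalCluster, e.g. "spout-emitted"):
+     *
+     * int emitted = Testing.globalAmt(cluster.getTrackedId(), "spout-emitted");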
+     */
+    @SuppressWarnings("unchecked")
+    @Deprecated
+    public static int globalAmt(String id, String key) {
+        LOG.warn("Reading tracked metrics for ID {}", id);
+        return ((ConcurrentHashMap<String, AtomicInteger>)RegisteredGlobalState.getState(id)).get(key).get();
+    }
+    
+    /**
+     * Run with a local tracked cluster
+     * @deprecated use ``` 
+     * try (LocalCluster cluster = new LocalCluster.Builder().withTracked()....build()) {
+     *  ...
+     * }
+     * ```
+     * @param param configs to set in the cluster
+     * @param code what to run
+     */
+    @Deprecated
+    public static void withTrackedCluster(MkClusterParam param, TestJob code) {
+        try (LocalCluster lc = cluster(param, Utils.uuid(), true)) {
+            code.run(lc);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+    
+    /**
+     * A topology that has all messages captured and can be read later on.
+     * This is intended mostly for internal testing.
+     * @param <T> the type of the topology (tracked or regular)
+     */
+    public static final class CapturedTopology<T> {
+        public final T topology;
+        /**
+         * a Bolt that will hold all of the captured data.
+         */
+        public final TupleCaptureBolt capturer;
+        
+        public CapturedTopology(T topology, TupleCaptureBolt capturer) {
+            this.topology = topology;
+            this.capturer = capturer;
+        }
+    }
+    
+    /**
+     * Track and capture a topology.
+     * This is intended mostly for internal testing.
+     */
+    public static CapturedTopology<TrackedTopology> trackAndCaptureTopology(ILocalCluster cluster, StormTopology topology) {
+        CapturedTopology<StormTopology> captured = captureTopology(topology);
+        return new CapturedTopology<>(new TrackedTopology(captured.topology, cluster), captured.capturer);
+    }
+    
+    /**
+     * Rewrites a topology so that all the tuples flowing through it are captured
+     * @param topology the topology to rewrite
+     * @return the modified topology and a new Bolt that can retrieve the
+     * captured tuples.
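+     *
+     * A minimal sketch (the cluster, conf, and topology name are assumed to exist):
+     *
+     * CapturedTopology<StormTopology> cap = Testing.captureTopology(topology);
+     * cluster.submitTopology("capture-test", conf, cap.topology);
+     * // ... after the topology has run ...
+     * Map<String, List<FixedTuple>> emitted = cap.capturer.getAndRemoveResults();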
+     */
+    public static CapturedTopology<StormTopology> captureTopology(StormTopology topology) {
+        topology = topology.deepCopy(); //Don't modify the original
+        
+        TupleCaptureBolt capturer = new TupleCaptureBolt();
+        Map<GlobalStreamId, Grouping> captureBoltInputs = new HashMap<>();
+        for (Map.Entry<String, SpoutSpec> spoutEntry : topology.get_spouts().entrySet()) {
+            String id = spoutEntry.getKey();
+            for (Entry<String, StreamInfo> streamEntry : spoutEntry.getValue().get_common().get_streams().entrySet()) {
+                String stream = streamEntry.getKey();
+                StreamInfo info = streamEntry.getValue();
+                if (info.is_direct()) {
+                    captureBoltInputs.put(new GlobalStreamId(id, stream), Thrift.prepareDirectGrouping());
+                } else {
+                    captureBoltInputs.put(new GlobalStreamId(id, stream), Thrift.prepareGlobalGrouping());
+                }
+            }
+        }
+        
+        for (Entry<String, Bolt> boltEntry : topology.get_bolts().entrySet()) {
+            String id = boltEntry.getKey();
+            for (Entry<String, StreamInfo> streamEntry : boltEntry.getValue().get_common().get_streams().entrySet()) {
+                String stream = streamEntry.getKey();
+                StreamInfo info = streamEntry.getValue();
+                if (info.is_direct()) {
+                    captureBoltInputs.put(new GlobalStreamId(id, stream), Thrift.prepareDirectGrouping());
+                } else {
+                    captureBoltInputs.put(new GlobalStreamId(id, stream), Thrift.prepareGlobalGrouping());
+                }
+            }
+        }
+        topology.put_to_bolts(Utils.uuid(), new Bolt(Thrift.serializeComponentObject(capturer),
+                Thrift.prepareComponentCommon(captureBoltInputs, new HashMap<>(), null)));
+        return new CapturedTopology<>(topology, capturer);
+    }
+    
+    /**
+     * Run a topology to completion capturing all of the messages that are emitted.  This only works when all of the spouts are
+     * instances of {@link org.apache.storm.testing.CompletableSpout}
+     * @param cluster the cluster to submit the topology to
+     * @param topology the topology itself
+     * @return a map of the component to the list of tuples it emitted. 
+     * @throws InterruptedException
+     * @throws TException on any error from nimbus.
+     */
+    public static Map<String, List<FixedTuple>> completeTopology(ILocalCluster cluster, StormTopology topology) throws InterruptedException, TException {
+        return completeTopology(cluster, topology, new CompleteTopologyParam());
+    }
+    
+    /**
+     * Run a topology to completion capturing all of the messages that are emitted.  This only works when all of the spouts are
+     * instances of {@link org.apache.storm.testing.CompletableSpout} or are overwritten by MockedSources in param
+     * @param cluster the cluster to submit the topology to
+     * @param topology the topology itself
+     * @param param parameters to describe how to complete a topology.
+     * @return a map of the component to the list of tuples it emitted. 
+     * @throws InterruptedException
+     * @throws TException on any error from nimbus.
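+     *
+     * A typical usage sketch (the spout id "spout" and bolt id "bolt" are illustrative):
+     *
+     * MockedSources mocked = new MockedSources();
+     * mocked.addMockData("spout", new Values("a"), new Values("b"));
+     * CompleteTopologyParam param = new CompleteTopologyParam();
+     * param.setMockedSources(mocked);
+     * Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, topology, param);
+     * List<List<Object>> boltTuples = Testing.readTuples(results, "bolt");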
+     */
+    public static Map<String, List<FixedTuple>> completeTopology(ILocalCluster cluster, StormTopology topology,
+            CompleteTopologyParam param) throws TException, InterruptedException {
+        Map<String, List<FixedTuple>> ret = null;
+        IStormClusterState state = cluster.getClusterState();
+        CapturedTopology<StormTopology> capTopo = captureTopology(topology);
+        topology = capTopo.topology;
+        String topoName = param.getTopologyName();
+        if (topoName == null) {
+            topoName = "topologytest-" + Utils.uuid();
+        }
+
+        Map<String, SpoutSpec> spouts = topology.get_spouts();
+        MockedSources ms = param.getMockedSources();
+        if (ms != null) {
+            for (Entry<String, List<FixedTuple>> mocked: ms.getData().entrySet()) {
+                FixedTupleSpout newSpout = new FixedTupleSpout(mocked.getValue());
+                spouts.get(mocked.getKey()).set_spout_object(Thrift.serializeComponentObject(newSpout));
+            }
+        }
+        List<Object> spoutObjects = spouts.values().stream().
+                map((spec) -> Thrift.deserializeComponentObject(spec.get_spout_object())).collect(Collectors.toList());
+        
+        for (Object o: spoutObjects) {
+            if (!(o instanceof CompletableSpout)) {
+                throw new RuntimeException("Cannot complete topology unless every spout is a CompletableSpout (or mocked to be); failed by " + o);
+            }
+        }
+        
+        for (Object spout: spoutObjects) {
+            ((CompletableSpout)spout).startup();
+        }
+        
+        cluster.submitTopology(topoName, param.getStormConf(), topology);
+        
+        if (Time.isSimulating()) {
+            cluster.advanceClusterTime(11);
+        }
+        
+        String topoId = state.getTopoId(topoName).get();
+        //Give the topology time to come up; the wait for the spouts to complete happens below
+        simulateWait(cluster);
+        Integer timeoutMs = param.getTimeoutMs();
+        if (timeoutMs == null) {
+            timeoutMs = TEST_TIMEOUT_MS;
+        }
+        whileTimeout(timeoutMs,
+                () -> !isEvery(spoutObjects, (o) -> ((CompletableSpout)o).isExhausted()),
+                () -> {
+                    try {
+                        simulateWait(cluster);
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+
+        KillOptions killOpts = new KillOptions();
+        killOpts.set_wait_secs(0);
+        cluster.killTopologyWithOpts(topoName, killOpts);
+        
+        whileTimeout(timeoutMs,
+                () -> state.assignmentInfo(topoId, null) != null,
+                () -> {
+                    try {
+                        simulateWait(cluster);
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+        
+        if (param.getCleanupState()) {
+            for (Object o : spoutObjects) {
+                ((CompletableSpout)o).clean();
+            }
+            ret = capTopo.capturer.getAndRemoveResults();
+        } else {
+            ret = capTopo.capturer.getAndClearResults();
+        }
+        
+        return ret;
+    }
+    
+    /**
+     * If using simulated time, simulate waiting for 10 seconds.  This is intended for internal testing only.
+     */
+    public static void simulateWait(ILocalCluster cluster) throws InterruptedException {
+        if (Time.isSimulating()) {
+            cluster.advanceClusterTime(10);
+            Thread.sleep(100);
+        }
+    }
+    
+    /**
+     * Get all of the tuples from a given component on the default stream
+     * @param results the results of running a completed topology
+     * @param componentId the id of the component to look at
+     * @return a list of the tuple values.
+     */
+    public static List<List<Object>> readTuples(Map<String, List<FixedTuple>> results, String componentId) {
+        return readTuples(results, componentId, Utils.DEFAULT_STREAM_ID);
+    }
+    
+    /**
+     * Get all of the tuples from a given component on a given stream
+     * @param results the results of running a completed topology
+     * @param componentId the id of the component to look at
+     * @param streamId the id of the stream to look for.
+     * @return a list of the tuple values.
+     */
+    public static List<List<Object>> readTuples(Map<String, List<FixedTuple>> results, String componentId, String streamId) {
+        List<List<Object>> ret = new ArrayList<>();
+        List<FixedTuple> streamResult = results.get(componentId);
+        if (streamResult != null) {
+            for (FixedTuple tuple: streamResult) {
+                if (streamId.equals(tuple.stream)) {
+                    ret.add(tuple.values);
+                }
+            }
+        }
+        return ret;
+    }
+    
+    /**
+     * Create a tracked topology.
+     * @deprecated use {@link org.apache.storm.testing.TrackedTopology} directly.
+     */
+    @Deprecated
+    public static TrackedTopology mkTrackedTopology(ILocalCluster cluster, StormTopology topology) {
+        return new TrackedTopology(topology, cluster);
+    }
+    
+    /**
+     * Simulated time wait for a tracked topology.  This is intended for internal testing
+     */
+    public static void trackedWait(CapturedTopology<TrackedTopology> topo) {
+        topo.topology.trackedWait();
+    }
+    
+    /**
+     * Simulated time wait for a tracked topology.  This is intended for internal testing
+     */
+    public static void trackedWait(CapturedTopology<TrackedTopology> topo, Integer amt) {
+        topo.topology.trackedWait(amt);
+    }
+    
+    /**
+     * Simulated time wait for a tracked topology.  This is intended for internal testing
+     */
+    public static void trackedWait(CapturedTopology<TrackedTopology> topo, Integer amt, Integer timeoutMs) {
+        topo.topology.trackedWait(amt, timeoutMs);
+    }
+    
+    /**
+     * Simulated time wait for a tracked topology.  This is intended for internal testing
+     */
+    public static void trackedWait(TrackedTopology topo) {
+        topo.trackedWait();
+    }
+    
+    /**
+     * Simulated time wait for a tracked topology.  This is intended for internal testing
+     */
+    public static void trackedWait(TrackedTopology topo, Integer amt) {
+        topo.trackedWait(amt);
+    }
+    
+    /**
+     * Simulated time wait for a tracked topology.  This is intended for internal testing
+     */
+    public static void trackedWait(TrackedTopology topo, Integer amt, Integer timeoutMs) {
+        topo.trackedWait(amt, timeoutMs);
+    }
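+
+    // Rough usage sketch: wrap the topology for tracking, submit it as usual,
+    // then block until the emitted tuples have been fully processed (the names
+    // and counts here are illustrative):
+    //
+    //   TrackedTopology tracked = new TrackedTopology(topology, cluster);
+    //   // ... submit and activate the tracked topology ...
+    //   Testing.trackedWait(tracked, 1, 60 * 1000);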
+    
+    /**
+     * Simulated time wait for a cluster.  This is intended for internal testing
+     */
+    public static void advanceClusterTime(ILocalCluster cluster, Integer secs) throws InterruptedException {
+        advanceClusterTime(cluster, secs, 1);
+    }
+    
+    /**
+     * Simulated time wait for a cluster.  This is intended for internal testing
+     */
+    public static void advanceClusterTime(ILocalCluster cluster, Integer secs, Integer step) throws InterruptedException {
+        cluster.advanceClusterTime(secs, step);
+    }
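+
+    // Under simulated time the test thread advances the clock explicitly rather
+    // than sleeping, e.g. (sketch):
+    //
+    //   if (Time.isSimulating()) {
+    //       Testing.advanceClusterTime(cluster, 30, 5); // 30 simulated seconds in 5 second steps
+    //   }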
+    
+    /**
+     * Count how many times each element appears in the Collection
+     * @param c a collection of values
+     * @return a map of the unique values in c to the count of those values.
+     */
+    public static <T> Map<T, Integer> multiset(Collection<T> c) {
+        Map<T, Integer> ret = new HashMap<>();
+        for (T t: c) {
+            Integer i = ret.get(t);
+            if (i == null) {
+                i = 0;
+            }
+            i += 1;
+            ret.put(t, i);
+        }
+        return ret;
+    }
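+
+    // e.g. multiset(Arrays.asList("a", "b", "a")) returns {a=2, b=1}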
+    
+    @SuppressWarnings("rawtypes")
+    private static void printRec(Object o, String prefix) {
+        if (o instanceof Collection) {
+            LOG.info("{} {} ({}) [",prefix,o, o.getClass());
+            for (Object sub: (Collection)o) {
+                printRec(sub, prefix + "  ");
+            }
+            LOG.info("{} ]",prefix);
+        } else if (o instanceof Map) {
+            Map<?,?> m = (Map<?,?>)o;
+            LOG.info("{} {} ({}) {",prefix,o, o.getClass());
+            for (Map.Entry<?, ?> entry: m.entrySet()) {
+                printRec(entry.getKey(), prefix + "  ");
+                LOG.info("{} ->", prefix);
+                printRec(entry.getValue(), prefix + "  ");
+            }
+            LOG.info("{} }",prefix);
+        } else {
+            LOG.info("{} {} ({})", prefix, o, o.getClass());
+        }
+    }
+    
+    /**
+     * Check if two collections are equivalent ignoring the order of elements.
+     */
+    public static <T> boolean multiseteq(Collection<T> a, Collection<T> b) {
+        boolean ret = multiset(a).equals(multiset(b));
+        if (!ret) {
+            printRec(multiset(a), "MS-A:");
+            printRec(multiset(b), "MS-B:");
+        }
+        return ret;
+    }
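+
+    // Typical order-insensitive assertion in a test (sketch; "spout" is a
+    // hypothetical component id):
+    //
+    //   assertTrue(Testing.multiseteq(expected, Testing.readTuples(results, "spout")));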
+    
+    /**
+     * Create a {@link org.apache.storm.tuple.Tuple} for use with testing
+     * @param values the values to appear in the tuple
+     */
+    public static Tuple testTuple(List<Object> values) {
+        return testTuple(values, new MkTupleParam());
+    }
+    
+    /**
+     * Create a {@link org.apache.storm.tuple.Tuple} for use with testing
+     * @param values the values to appear in the tuple
+     * @param param parameters describing more details about the tuple
+     */
+    public static Tuple testTuple(List<Object> values, MkTupleParam param) {
+        String stream = param.getStream();
+        if (stream == null) {
+            stream = Utils.DEFAULT_STREAM_ID;
+        }
+        
+        String component = param.getComponent();
+        if (component == null) {
+            component = "component";
+        }
+        
+        int task = 1;
+        
+        List<String> fields = param.getFields();
+        if (fields == null) {
+            fields = new ArrayList<>(values.size());
+            for (int i = 1; i <= values.size(); i++) {
+                fields.add("field"+i);
+            }
+        }
+
+        Map<Integer, String> taskToComp = new HashMap<>();
+        taskToComp.put(task, component);
+        Map<String, Map<String, Fields>> compToStreamToFields = new HashMap<>();
+        Map<String, Fields> streamToFields = new HashMap<>();
+        streamToFields.put(stream, new Fields(fields));
+        compToStreamToFields.put(component, streamToFields);
+        
+        TopologyContext context = new TopologyContext(null,
+                ConfigUtils.readStormConfig(),
+                taskToComp,
+                null,
+                compToStreamToFields,
+                "test-storm-id",
+                null,
+                null,
+                1,
+                null,
+                null,
+                new HashMap<>(),
+                new HashMap<>(),
+                new HashMap<>(),
+                new HashMap<>(),
+                new AtomicBoolean(false));
+        return new TupleImpl(context, values, 1, stream);
+    }
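+
+    // A small sketch of building a tuple for a bolt unit test (the stream and
+    // component names are illustrative; see MkTupleParam for the setters assumed here):
+    //
+    //   MkTupleParam param = new MkTupleParam();
+    //   param.setStream("stream1");
+    //   param.setComponent("spout");
+    //   Tuple t = Testing.testTuple(Arrays.asList("hello", 1), param);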
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/Thrift.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Thrift.java b/storm-core/src/jvm/org/apache/storm/Thrift.java
index cde822f..9ed8bb2 100644
--- a/storm-core/src/jvm/org/apache/storm/Thrift.java
+++ b/storm-core/src/jvm/org/apache/storm/Thrift.java
@@ -278,7 +278,7 @@ public class Thrift {
 
     public static SpoutSpec prepareSerializedSpoutDetails(IRichSpout spout, Map<String, StreamInfo> outputs) {
         return new SpoutSpec(ComponentObject.serialized_java
-                (Utils.javaSerialize(spout)), prepareComponentCommon(new HashMap(), outputs, null, null));
+                (Utils.javaSerialize(spout)), prepareComponentCommon(new HashMap<>(), outputs, null, null));
     }
 
     public static Bolt prepareSerializedBoltDetails(Map<GlobalStreamId, Grouping> inputs, IBolt bolt, Map<String, StreamInfo> outputs,

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java b/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java
index c0b78d3..c550cfe 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java
@@ -30,6 +30,7 @@ import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.ZookeeperAuthInfo;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.thrift.transport.TTransportException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,7 +49,8 @@ public class BlobStoreUtils {
     private static final String BLOB_DEPENDENCIES_PREFIX = "dep-";
     private static final Logger LOG = LoggerFactory.getLogger(BlobStoreUtils.class);
 
-    public static CuratorFramework createZKClient(Map conf) {
+    public static CuratorFramework createZKClient(Map<String, Object> conf) {
+        @SuppressWarnings("unchecked")
         List<String> zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
         Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
         ZookeeperAuthInfo zkAuthInfo = new ZookeeperAuthInfo(conf);
@@ -106,7 +108,7 @@ public class BlobStoreUtils {
     }
 
     // Download missing blobs from potential nimbodes
-    public static boolean downloadMissingBlob(Map conf, BlobStore blobStore, String key, Set<NimbusInfo> nimbusInfos)
+    public static boolean downloadMissingBlob(Map<String, Object> conf, BlobStore blobStore, String key, Set<NimbusInfo> nimbusInfos)
             throws TTransportException {
         NimbusClient client;
         ReadableBlobMeta rbm;
@@ -156,7 +158,7 @@ public class BlobStoreUtils {
     }
 
     // Download updated blobs from potential nimbodes
-    public static boolean downloadUpdatedBlob(Map conf, BlobStore blobStore, String key, Set<NimbusInfo> nimbusInfos)
+    public static boolean downloadUpdatedBlob(Map<String, Object> conf, BlobStore blobStore, String key, Set<NimbusInfo> nimbusInfos)
             throws TTransportException {
         NimbusClient client;
         ClientBlobStore remoteBlobStore;
@@ -215,17 +217,17 @@ public class BlobStoreUtils {
         return keyList;
     }
 
-    public static void createStateInZookeeper(Map conf, String key, NimbusInfo nimbusInfo) throws TTransportException {
+    public static void createStateInZookeeper(Map<String, Object> conf, String key, NimbusInfo nimbusInfo) throws TTransportException {
         ClientBlobStore cb = new NimbusBlobStore();
         cb.setClient(conf, new NimbusClient(conf, nimbusInfo.getHost(), nimbusInfo.getPort(), null));
         cb.createStateInZookeeper(key);
     }
 
-    public static void updateKeyForBlobStore (Map conf, BlobStore blobStore, CuratorFramework zkClient, String key, NimbusInfo nimbusDetails) {
+    public static void updateKeyForBlobStore (Map<String, Object> conf, BlobStore blobStore, CuratorFramework zkClient, String key, NimbusInfo nimbusDetails) {
         try {
             // Most of clojure tests currently try to access the blobs using getBlob. Since, updateKeyForBlobStore
             // checks for updating the correct version of the blob as a part of nimbus ha before performing any
-            // operation on it, there is a neccessity to stub several test cases to ignore this method. It is a valid
+            // operation on it, there is a necessity to stub several test cases to ignore this method. It is a valid
             // trade off to return if nimbusDetails which include the details of the current nimbus host port data are
             // not initialized as a part of the test. Moreover, this applies to only local blobstore when used along with
             // nimbus ha.
@@ -238,6 +240,7 @@ public class BlobStoreUtils {
                 return;
             }
             stateInfo = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + "/" + key);
+
             LOG.debug("StateInfo for update {}", stateInfo);
             Set<NimbusInfo> nimbusInfoList = getNimbodesWithLatestSequenceNumberOfBlob(zkClient, key);
 
@@ -252,6 +255,9 @@ public class BlobStoreUtils {
                 LOG.debug("Updating state inside zookeeper for an update");
                 createStateInZookeeper(conf, key, nimbusDetails);
             }
+        } catch (NoNodeException e) {
+            //race condition with a delete
+            return;
         } catch (Exception exp) {
             throw new RuntimeException(exp);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java b/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java
index c2c62bd..8df981b 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java
@@ -26,6 +26,7 @@ import org.apache.storm.generated.ReadableBlobMeta;
 
 import org.apache.storm.nimbus.NimbusInfo;
 import org.apache.storm.utils.Utils;
+import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.curator.framework.CuratorFramework;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -270,7 +271,12 @@ public class LocalFsBlobStore extends BlobStore {
         if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + key) == null) {
             return 0;
         }
-        replicationCount = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + key).size();
+        try {
+            replicationCount = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + key).size();
+        } catch (NoNodeException e) {
+            //Race with delete
+            //If it is not here the replication is 0 
+        }
         return replicationCount;
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java b/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
index 443e471..704c9e5 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
@@ -53,6 +53,12 @@ public interface IStormClusterState {
 
     public List<String> activeStorms();
 
+    /**
+     * Get a storm base for a topology
+     * @param stormId the id of the topology
+     * @param callback something to call if the data changes (best effort)
+     * @return the StormBase or null if it is not alive.
+     */
     public StormBase stormBase(String stormId, Runnable callback);
 
     public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port);
@@ -183,8 +189,10 @@ public interface IStormClusterState {
         Map<String, StormBase> stormBases = new HashMap<>();
         for (String topologyId : activeStorms()) {
             StormBase base = stormBase(topologyId, null);
-            stormBases.put(topologyId, base);
+            if (base != null) { //race condition with delete
+                stormBases.put(topologyId, base);
+            }
         }
         return stormBases;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java
index 63d39f8..65c7416 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -749,9 +749,11 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     
     private static String extractStatusStr(StormBase base) {
         String ret = null;
-        TopologyStatus status = base.get_status();
-        if (status != null) {
-            ret = status.name().toUpperCase();
+        if (base != null) {
+            TopologyStatus status = base.get_status();
+            if (status != null) {
+                ret = status.name().toUpperCase();
+            }
         }
         return ret;
     }
@@ -1308,18 +1310,15 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
                 minReplicationCount, confCount, codeCount, jarCount);
     }
     
-    private TopologyDetails readTopologyDetails(String topoId) throws NotAliveException, KeyNotFoundException, AuthorizationException, IOException, InvalidTopologyException {
-        StormBase base = stormClusterState.stormBase(topoId, null);
-        if (base == null) {
-            if (topoId == null) {
-                throw new NullPointerException();
-            }
-            throw new NotAliveException(topoId);
-        }
+    private TopologyDetails readTopologyDetails(String topoId, StormBase base) throws KeyNotFoundException,
+      AuthorizationException, IOException, InvalidTopologyException {
+        assert (base != null);
+        assert (topoId != null);
+        
         BlobStore store = blobStore;
         Map<String, Object> topoConf = readTopoConfAsNimbus(topoId, store);
         StormTopology topo = readStormTopologyAsNimbus(topoId, store);
-        Map<List<Integer>, String> rawExecToComponent = computeExecutorToComponent(topoId);
+        Map<List<Integer>, String> rawExecToComponent = computeExecutorToComponent(topoId, base);
         Map<ExecutorDetails, String> executorsToComponent = new HashMap<>();
         for (Entry<List<Integer>, String> entry: rawExecToComponent.entrySet()) {
             List<Integer> execs = entry.getKey();
@@ -1380,9 +1379,10 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         return ret;
     }
     
-    private List<List<Integer>> computeExecutors(String topoId) throws KeyNotFoundException, AuthorizationException, IOException, InvalidTopologyException {
+    private List<List<Integer>> computeExecutors(String topoId, StormBase base) throws KeyNotFoundException, AuthorizationException, IOException, InvalidTopologyException {
         BlobStore store = blobStore;
-        StormBase base = stormClusterState.stormBase(topoId, null);
+        assert (base != null);
+
         Map<String, Integer> compToExecutors = base.get_component_executors();
         Map<String, Object> topoConf = readTopoConfAsNimbus(topoId, store);
         StormTopology topology = readStormTopologyAsNimbus(topoId, store);
@@ -1405,9 +1405,9 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         return ret;
     }
     
-    private Map<List<Integer>, String> computeExecutorToComponent(String topoId) throws KeyNotFoundException, AuthorizationException, InvalidTopologyException, IOException {
+    private Map<List<Integer>, String> computeExecutorToComponent(String topoId, StormBase base) throws KeyNotFoundException, AuthorizationException, InvalidTopologyException, IOException {
         BlobStore store = blobStore;
-        List<List<Integer>> executors = computeExecutors(topoId);
+        List<List<Integer>> executors = computeExecutors(topoId, base);
         StormTopology topology = readStormTopologyAsNimbus(topoId, store);
         Map<String, Object> topoConf = readTopoConfAsNimbus(topoId, store);
         Map<Integer, String> taskToComponent = StormCommon.stormTaskInfo(topology, topoConf);
@@ -1418,11 +1418,12 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         return ret;
     }
     
-    private Map<String, Set<List<Integer>>> computeTopologyToExecutors(Collection<String> topoIds) throws KeyNotFoundException, AuthorizationException, InvalidTopologyException, IOException {
+    private Map<String, Set<List<Integer>>> computeTopologyToExecutors(Map<String, StormBase> bases) throws KeyNotFoundException, AuthorizationException, InvalidTopologyException, IOException {
         Map<String, Set<List<Integer>>> ret = new HashMap<>();
-        if (topoIds != null) {
-            for (String topoId: topoIds) {
-                ret.put(topoId, new HashSet<>(computeExecutors(topoId)));
+        if (bases != null) {
+            for (Entry<String, StormBase> entry: bases.entrySet()) {
+                String topoId = entry.getKey();
+                ret.put(topoId, new HashSet<>(computeExecutors(topoId, entry.getValue())));
             }
         }
         return ret;
@@ -1579,8 +1580,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     }
     
     private Map<String, SchedulerAssignment> computeNewSchedulerAssignments(Map<String, Assignment> existingAssignments,
-            Topologies topologies, String scratchTopologyId) throws KeyNotFoundException, AuthorizationException, InvalidTopologyException, IOException {
-        Map<String, Set<List<Integer>>> topoToExec = computeTopologyToExecutors(existingAssignments.keySet());
+            Topologies topologies, Map<String, StormBase> bases, String scratchTopologyId) throws KeyNotFoundException, AuthorizationException, InvalidTopologyException, IOException {
+        Map<String, Set<List<Integer>>> topoToExec = computeTopologyToExecutors(bases);
         
         updateAllHeartbeats(existingAssignments, topoToExec);
 
@@ -1641,12 +1642,12 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         return cluster.getAssignments();
     }
     
-    private TopologyResources getResourcesForTopology(String topoId) throws NotAliveException, AuthorizationException, InvalidTopologyException, IOException {
+    private TopologyResources getResourcesForTopology(String topoId, StormBase base) throws NotAliveException, AuthorizationException, InvalidTopologyException, IOException {
         TopologyResources ret = idToResources.get().get(topoId);
         if (ret == null) {
             try {
                 IStormClusterState state = stormClusterState;
-                TopologyDetails details = readTopologyDetails(topoId);
+                TopologyDetails details = readTopologyDetails(topoId, base);
                 double sumOnHeap = 0.0;
                 double sumOffHeap = 0.0;
                 double sumCPU = 0.0;
@@ -1720,10 +1721,17 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
 
         IStormClusterState state = stormClusterState;
         //read all the topologies
-        List<String> topologyIds = state.activeStorms();
+        Map<String, StormBase> bases = state.topologyBases();
         Map<String, TopologyDetails> tds = new HashMap<>();
-        for (String id: topologyIds) {
-            tds.put(id, readTopologyDetails(id));
+        for (Iterator<Entry<String, StormBase>> it = bases.entrySet().iterator(); it.hasNext(); ) {
+            Entry<String, StormBase> entry = it.next();
+            String id = entry.getKey();
+            try {
+                tds.put(id, readTopologyDetails(id, entry.getValue()));
+            } catch (KeyNotFoundException e) {
+                //A race happened and it is probably not running
+                it.remove();
+            }
         }
         Topologies topologies = new Topologies(tds);
         List<String> assignedTopologyIds = state.assignments(null);
@@ -1737,7 +1745,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             }
         }
         // make the new assignments for topologies
-        Map<String, SchedulerAssignment> newSchedulerAssignments = computeNewSchedulerAssignments(existingAssignments, topologies, scratchTopoId);
+        Map<String, SchedulerAssignment> newSchedulerAssignments = computeNewSchedulerAssignments(existingAssignments, topologies, bases, scratchTopoId);
         Map<String, Map<List<Long>, List<Object>>> topologyToExecutorToNodePort = computeNewTopoToExecToNodePort(newSchedulerAssignments, existingAssignments);
         for (String id: assignedTopologyIds) {
             if (!topologyToExecutorToNodePort.containsKey(id)) {
@@ -1754,8 +1762,10 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             Map<List<Long>, List<Object>> execToNodePort = entry.getValue();
             Assignment existingAssignment = existingAssignments.get(topoId);
             Set<String> allNodes = new HashSet<>();
-            for (List<Object> nodePort: execToNodePort.values()) {
-                allNodes.add((String) nodePort.get(0));
+            if (execToNodePort != null) {
+                for (List<Object> nodePort: execToNodePort.values()) {
+                    allNodes.add((String) nodePort.get(0));
+                }
             }
             Map<String, String> allNodeHost = new HashMap<>();
             if (existingAssignment != null) {
@@ -2247,7 +2257,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             if (status != null) {
                 summary.set_sched_status(status);
             }
-            TopologyResources resources = getResourcesForTopology(topoId);
+            TopologyResources resources = getResourcesForTopology(topoId, base);
             if (resources != null) {
                 summary.set_requested_memonheap(resources.getRequestedMemOnHeap());
                 summary.set_requested_memoffheap(resources.getRequestedMemOffHeap());
@@ -3271,6 +3281,9 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         try {
             getTopologyInfoWithOptsCalls.mark();
             CommonTopoInfo common = getCommonTopoInfo(topoId, "getTopologyInfo");
+            if (common.base == null) {
+                throw new NotAliveException(topoId);
+            }
             IStormClusterState state = stormClusterState;
             NumErrorsChoice numErrChoice = OR(options.get_num_err_choice(), NumErrorsChoice.ALL);
             Map<String, List<ErrorInfo>> errors = new HashMap<>();
@@ -3332,7 +3345,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             if (schedStatus != null) {
                 topoInfo.set_sched_status(schedStatus);
             }
-            TopologyResources resources = getResourcesForTopology(topoId);
+            TopologyResources resources = getResourcesForTopology(topoId, common.base);
             if (resources != null) {
                 topoInfo.set_requested_memonheap(resources.getRequestedMemOnHeap());
                 topoInfo.set_requested_memoffheap(resources.getRequestedMemOffHeap());
@@ -3370,27 +3383,32 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             StormTopology topology = common.topology;
             Map<String, Object> topoConf = common.topoConf;
             StormBase base = common.base;
-            if (assignment == null) {
+            if (base == null) {
                 throw new NotAliveException(topoId);
             }
-            Map<List<Long>, NodeInfo> execToNodeInfo = assignment.get_executor_node_port();
-            Map<String, String> nodeToHost = assignment.get_node_host();
             Map<WorkerSlot, WorkerResources> workerToResources = getWorkerResourcesForTopology(topoId);
+            List<WorkerSummary> workerSummaries = null;
             Map<List<Long>, List<Object>> exec2NodePort = new HashMap<>();
-            for (Entry<List<Long>, NodeInfo> entry: execToNodeInfo.entrySet()) {
-                NodeInfo ni = entry.getValue();
-                List<Object> nodePort = Arrays.asList(ni.get_node(), ni.get_port_iterator().next());
-                exec2NodePort.put(entry.getKey(), nodePort);
+            if (assignment != null) {
+                Map<List<Long>, NodeInfo> execToNodeInfo = assignment.get_executor_node_port();
+                Map<String, String> nodeToHost = assignment.get_node_host();
+                for (Entry<List<Long>, NodeInfo> entry: execToNodeInfo.entrySet()) {
+                    NodeInfo ni = entry.getValue();
+                    List<Object> nodePort = Arrays.asList(ni.get_node(), ni.get_port_iterator().next());
+                    exec2NodePort.put(entry.getKey(), nodePort);
+                }
+                
+                workerSummaries = StatsUtil.aggWorkerStats(topoId,
+                        topoName,
+                        taskToComp,
+                        beats,
+                        exec2NodePort,
+                        nodeToHost,
+                        workerToResources,
+                        includeSys,
+                        true); //this is the topology page, so we know the user is authorized
             }
-            List<WorkerSummary> workerSummaries = StatsUtil.aggWorkerStats(topoId,
-                    topoName,
-                    taskToComp,
-                    beats,
-                    exec2NodePort,
-                    nodeToHost,
-                    workerToResources,
-                    includeSys,
-                    true); //this is the topology page, so we know the user is authorized
+
             TopologyPageInfo topoPageInfo = StatsUtil.aggTopoExecsStats(topoId,
                     exec2NodePort,
                     taskToComp,
@@ -3412,7 +3430,9 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
                 commonStats.set_resources_map(setResourcesDefaultIfNotSet(boltResources, entry.getKey(), topoConf));
             }
             
-            topoPageInfo.set_workers(workerSummaries);
+            if (workerSummaries != null) {
+                topoPageInfo.set_workers(workerSummaries);
+            }
             if (base.is_set_owner()) {
                 topoPageInfo.set_owner(base.get_owner());
             }
@@ -3420,7 +3440,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             if (schedStatus != null) {
                 topoPageInfo.set_sched_status(schedStatus);
             }
-            TopologyResources resources = getResourcesForTopology(topoId);
+            TopologyResources resources = getResourcesForTopology(topoId, base);
             if (resources != null) {
                 topoPageInfo.set_requested_memonheap(resources.getRequestedMemOnHeap());
                 topoPageInfo.set_requested_memoffheap(resources.getRequestedMemOffHeap());
@@ -3525,6 +3545,9 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         try {
             getComponentPageInfoCalls.mark();
             CommonTopoInfo info = getCommonTopoInfo(topoId, "getComponentPageInfo");
+            if (info.base == null) {
+                throw new NotAliveException(topoId);
+            }
             StormTopology topology = info.topology;
             Map<String, Object> topoConf = info.topoConf;
             Assignment assignment = info.assignment;
@@ -3557,7 +3580,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             compPageInfo.set_topology_name(info.topoName);
             compPageInfo.set_errors(stormClusterState.errors(topoId, componentId));
             compPageInfo.set_topology_status(extractStatusStr(info.base));
-            if (info.base != null && info.base.is_set_component_debug()) {
+            if (info.base.is_set_component_debug()) {
                 DebugOptions debug = info.base.get_component_debug().get(componentId);
                 if (debug != null) {
                     compPageInfo.set_debug_options(debug);

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/daemon/supervisor/LocalContainer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/LocalContainer.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/LocalContainer.java
index 2dbefcb..6297dda 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/LocalContainer.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/LocalContainer.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.util.Map;
 
 import org.apache.storm.ProcessSimulator;
-import org.apache.storm.daemon.Shutdownable;
 import org.apache.storm.daemon.worker.Worker;
 import org.apache.storm.generated.LocalAssignment;
 import org.apache.storm.generated.ProfileRequest;
@@ -30,9 +29,6 @@ import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import clojure.java.api.Clojure;
-import clojure.lang.IFn;
-
 public class LocalContainer extends Container {
     private static final Logger LOG = LoggerFactory.getLogger(LocalContainer.class);
     private volatile boolean _isAlive = false;

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/testing/CompletableSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/testing/CompletableSpout.java b/storm-core/src/jvm/org/apache/storm/testing/CompletableSpout.java
new file mode 100644
index 0000000..e9a61f5
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/testing/CompletableSpout.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.testing;
+
+public interface CompletableSpout {
+    /**
+     * Check whether the spout is done emitting and completing its tuples.
+     * @return true if all the tuples have been completed else false.
+     */
+    public boolean isExhausted();
+
+    /**
+     * Cleanup any global state kept
+     */
+    default public void clean() {
+        //NOOP
+    }
+    
+    /**
+     * Prepare the spout (globally) before starting the topology
+     */
+    default public void startup() {
+        //NOOP
+    }
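+
+    // Loose sketch of the expected contract (hypothetical fields, not part of
+    // this interface): a spout fed a fixed list of tuples might implement
+    //
+    //   public boolean isExhausted() {
+    //       return pending.isEmpty() && acked + failed == emitted;
+    //   }
+    //
+    // so that helpers like completeTopology can poll it while advancing simulated time.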
+}