You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/12/03 15:50:04 UTC

[04/10] storm git commit: STORM-1281: LocalCluster, testing4j and testing.clj to java

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/testing/CompleteTopologyParam.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/testing/CompleteTopologyParam.java b/storm-core/src/jvm/org/apache/storm/testing/CompleteTopologyParam.java
index c124d58..00d0e57 100644
--- a/storm-core/src/jvm/org/apache/storm/testing/CompleteTopologyParam.java
+++ b/storm-core/src/jvm/org/apache/storm/testing/CompleteTopologyParam.java
@@ -17,7 +17,10 @@
  */
 package org.apache.storm.testing;
 
+import java.util.Map;
+
 import org.apache.storm.Config;
+import org.apache.storm.Testing;
 
 /**
  * The param class for the `Testing.completeTopology`.
@@ -26,15 +29,15 @@ public class CompleteTopologyParam {
 	/**
 	 * The mocked spout sources
 	 */
-	private MockedSources mockedSources;
+	private MockedSources mockedSources = new MockedSources();
 	/**
 	 * the config for the topology when it was submitted to the cluster
 	 */
-	private Config stormConf;
+	private Map<String, Object> stormConf = new Config();
 	/**
 	 * whether cleanup the state?
 	 */
-	private Boolean cleanupState;
+	private boolean cleanupState = true;
 	/**
 	 * the topology name you want to submit to the cluster
 	 */
@@ -43,29 +46,39 @@ public class CompleteTopologyParam {
 	/**
 	 * the timeout of topology you want to submit to the cluster
 	 */
-	private Integer timeoutMs;
+	private int timeoutMs = Testing.TEST_TIMEOUT_MS;
 
 	public MockedSources getMockedSources() {
 		return mockedSources;
 	}
 
 	public void setMockedSources(MockedSources mockedSources) {
+	    if (mockedSources == null) {
+	        mockedSources = new MockedSources();
+	    }
+	       
 		this.mockedSources = mockedSources;
 	}
 
-	public Config getStormConf() {
+	public Map<String, Object> getStormConf() {
 		return stormConf;
 	}
 
-	public void setStormConf(Config stormConf) {
+	public void setStormConf(Map<String, Object> stormConf) {
+	    if (stormConf == null) {
+	        stormConf = new Config();
+	    }
 		this.stormConf = stormConf;
 	}
 
-	public Boolean getCleanupState() {
+	public boolean getCleanupState() {
 		return cleanupState;
 	}
 
 	public void setCleanupState(Boolean cleanupState) {
+	    if (cleanupState == null) {
+	        cleanupState = true;
+	    }
 		this.cleanupState = cleanupState;
 	}
 
@@ -82,6 +95,9 @@ public class CompleteTopologyParam {
 	}
 
 	public void setTimeoutMs(Integer timeoutMs) {
+	    if (timeoutMs == null) {
+	        timeoutMs = Testing.TEST_TIMEOUT_MS;
+	    }
 		this.timeoutMs = timeoutMs;
 	}
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/testing/FeederSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/testing/FeederSpout.java b/storm-core/src/jvm/org/apache/storm/testing/FeederSpout.java
index 02872d0..7801ca0 100644
--- a/storm-core/src/jvm/org/apache/storm/testing/FeederSpout.java
+++ b/storm-core/src/jvm/org/apache/storm/testing/FeederSpout.java
@@ -37,6 +37,10 @@ public class FeederSpout extends BaseRichSpout {
     private SpoutOutputCollector _collector;
     private AckFailDelegate _ackFailDelegate;
 
+    public FeederSpout(List<String> outFields) {
+        this(new Fields(outFields));
+    }
+    
     public FeederSpout(Fields outFields) {
         _id = InprocMessaging.acquireNewPort();
         _outFields = outFields;

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/testing/FixedTupleSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/testing/FixedTupleSpout.java b/storm-core/src/jvm/org/apache/storm/testing/FixedTupleSpout.java
index 87fc52d..feac43e 100644
--- a/storm-core/src/jvm/org/apache/storm/testing/FixedTupleSpout.java
+++ b/storm-core/src/jvm/org/apache/storm/testing/FixedTupleSpout.java
@@ -30,7 +30,7 @@ import java.util.Map;
 import java.util.UUID;
 import static org.apache.storm.utils.Utils.get;
 
-public class FixedTupleSpout implements IRichSpout {
+public class FixedTupleSpout implements IRichSpout, CompletableSpout {
     private static final Map<String, Integer> acked = new HashMap<String, Integer>();
     private static final Map<String, Integer> failed = new HashMap<String, Integer>();
 
@@ -176,4 +176,9 @@ public class FixedTupleSpout implements IRichSpout {
     public Map<String, Object> getComponentConfiguration() {
         return null;
     }
+
+    @Override
+    public boolean isExhausted() {
+        return getSourceTuples().size() == getCompleted();
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/testing/InProcessZookeeper.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/testing/InProcessZookeeper.java b/storm-core/src/jvm/org/apache/storm/testing/InProcessZookeeper.java
new file mode 100644
index 0000000..dafde30
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/testing/InProcessZookeeper.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.testing;
+
+import java.util.List;
+
+import org.apache.storm.zookeeper.Zookeeper;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+
+/**
+ * A local Zookeeper instance available for testing.
+ * try (InProcessZookeeper zk = new InProcessZookeeper()) {
+ *   // Run Tests...
+ * }
+ */
+public class InProcessZookeeper implements AutoCloseable {
+    
+    private final TmpPath zkTmp;
+    private final NIOServerCnxnFactory zookeeper;
+    private final long zkPort;
+    
+    public InProcessZookeeper() throws Exception {
+        zkTmp = new TmpPath();
+        @SuppressWarnings("unchecked")
+        List<Object> portAndHandle = Zookeeper.mkInprocessZookeeper(zkTmp.getPath(), null);
+        zkPort = (Long) portAndHandle.get(0);
+        zookeeper = (NIOServerCnxnFactory) portAndHandle.get(1);
+    }
+    
+    /**
+     * @return the port ZK is listening on (localhost)
+     */
+    public long getPort() {
+        return zkPort;
+    }
+
+    @Override
+    public void close() throws Exception {
+        Zookeeper.shutdownInprocessZookeeper(zookeeper);
+        zkTmp.close();
+    }
+    
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/testing/MkClusterParam.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/testing/MkClusterParam.java b/storm-core/src/jvm/org/apache/storm/testing/MkClusterParam.java
index c3946fa..7e1e206 100644
--- a/storm-core/src/jvm/org/apache/storm/testing/MkClusterParam.java
+++ b/storm-core/src/jvm/org/apache/storm/testing/MkClusterParam.java
@@ -34,7 +34,7 @@ public class MkClusterParam {
 	/**
 	 * cluster config
 	 */
-	private Map daemonConf;
+	private Map<String, Object> daemonConf;
 
 	private Boolean nimbusDaemon;
 	
@@ -53,10 +53,10 @@ public class MkClusterParam {
 	public void setPortsPerSupervisor(Integer portsPerSupervisor) {
 		this.portsPerSupervisor = portsPerSupervisor;
 	}
-	public Map getDaemonConf() {
+	public Map<String, Object> getDaemonConf() {
 		return daemonConf;
 	}
-	public void setDaemonConf(Map daemonConf) {
+	public void setDaemonConf(Map<String, Object> daemonConf) {
 		this.daemonConf = daemonConf;
 	}
 	/**

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/testing/MkTupleParam.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/testing/MkTupleParam.java b/storm-core/src/jvm/org/apache/storm/testing/MkTupleParam.java
index ac3e1d3..93aba72 100644
--- a/storm-core/src/jvm/org/apache/storm/testing/MkTupleParam.java
+++ b/storm-core/src/jvm/org/apache/storm/testing/MkTupleParam.java
@@ -18,6 +18,7 @@
 package org.apache.storm.testing;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 public class MkTupleParam {
@@ -42,10 +43,16 @@ public class MkTupleParam {
 	public List<String> getFields() {
 		return fields;
 	}
+	
+	public void setFieldsList(List<String> fields) {
+	    if (fields != null) {
+	        this.fields = new ArrayList<>(fields);
+	    } else {
+	        this.fields = null;
+	    }
+	}
+	
 	public void setFields(String... fields) {
-		this.fields = new ArrayList<String>();
-		for (int i = 0; i < fields.length; i++) {
-			this.fields.add(fields[i]);
-		}
+		this.fields = new ArrayList<>(Arrays.asList(fields));
 	}
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/testing/MockLeaderElector.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/testing/MockLeaderElector.java b/storm-core/src/jvm/org/apache/storm/testing/MockLeaderElector.java
new file mode 100644
index 0000000..ade29a9
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/testing/MockLeaderElector.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.testing;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.nimbus.ILeaderElector;
+import org.apache.storm.nimbus.NimbusInfo;
+
+public class MockLeaderElector implements ILeaderElector {
+    private final boolean isLeader;
+    private final NimbusInfo leaderAddress;
+    
+    public MockLeaderElector() {
+        this(true, "test-host", 9999);
+    }
+
+    public MockLeaderElector(boolean isLeader) {
+        this(isLeader, "test-host", 9999);
+    }
+    
+    public MockLeaderElector(boolean isLeader, String host, int port) {
+        this.isLeader = isLeader;
+        this.leaderAddress = new NimbusInfo(host, port, true);
+    }
+
+    @Override
+    public void prepare(Map conf) {
+        //NOOP
+    }
+
+    @Override
+    public void addToLeaderLockQueue() throws Exception {
+        //NOOP
+    }
+
+    @Override
+    public void removeFromLeaderLockQueue() throws Exception {
+        //NOOP
+    }
+
+    @Override
+    public boolean isLeader() throws Exception {
+        return isLeader;
+    }
+
+    @Override
+    public NimbusInfo getLeader() {
+        return leaderAddress;
+    }
+
+    @Override
+    public List<NimbusInfo> getAllNimbuses() throws Exception {
+        return Arrays.asList(leaderAddress);
+    }
+
+    @Override
+    public void close() {
+        //NOOP
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/testing/MockedSources.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/testing/MockedSources.java b/storm-core/src/jvm/org/apache/storm/testing/MockedSources.java
index 1063ad2..61df4b6 100644
--- a/storm-core/src/jvm/org/apache/storm/testing/MockedSources.java
+++ b/storm-core/src/jvm/org/apache/storm/testing/MockedSources.java
@@ -31,6 +31,14 @@ public class MockedSources {
 	 */
     private Map<String, List<FixedTuple>> data = new HashMap<String, List<FixedTuple>>();
     
+    public MockedSources() {
+        //Empty
+    }
+    
+    public MockedSources(Map<String, List<FixedTuple>> data) {
+        this.data = new HashMap<>(data);
+    }
+    
     /**
      * add mock data for the spout.
      * 

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/testing/TestEventLogSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/testing/TestEventLogSpout.java b/storm-core/src/jvm/org/apache/storm/testing/TestEventLogSpout.java
index 0e04a6a..3f310e3 100644
--- a/storm-core/src/jvm/org/apache/storm/testing/TestEventLogSpout.java
+++ b/storm-core/src/jvm/org/apache/storm/testing/TestEventLogSpout.java
@@ -34,7 +34,7 @@ import org.apache.storm.topology.base.BaseRichSpout;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
 
-public class TestEventLogSpout extends BaseRichSpout {
+public class TestEventLogSpout extends BaseRichSpout implements CompletableSpout {
     public static Logger LOG = LoggerFactory.getLogger(TestEventLogSpout.class);
     
     private static final Map<String, Integer> acked = new HashMap<String, Integer>();
@@ -136,4 +136,9 @@ public class TestEventLogSpout extends BaseRichSpout {
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
         declarer.declare(new Fields("source", "eventId"));
     }
+
+    @Override
+    public boolean isExhausted() {
+        return completed();
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/testing/TmpPath.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/testing/TmpPath.java b/storm-core/src/jvm/org/apache/storm/testing/TmpPath.java
new file mode 100644
index 0000000..ccb1b2f
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/testing/TmpPath.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.testing;
+
+import java.io.File;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TmpPath implements AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(TmpPath.class);
+    
+    public static String localTempPath() {
+        StringBuilder ret = new StringBuilder().append(System.getProperty("java.io.tmpdir"));
+        if (!Utils.isOnWindows()) {
+            ret.append("/");
+        }
+        ret.append(Utils.uuid());
+        return ret.toString();
+    }
+    
+    private final File path;
+
+    public TmpPath() {
+        this(localTempPath());
+    }
+    
+    public TmpPath(String path) {
+        this.path = new File(path);
+    }
+
+    public String getPath() {
+        return path.getAbsolutePath();
+    }
+    
+    public File getFile() {
+        return path;
+    }
+    
+    @Override
+    public void close() throws Exception {
+        if (path.exists()) {
+            try {
+                FileUtils.forceDelete(path);
+            } catch (Exception e) {
+                //On Windows, the host process may still hold a lock on the log file, so deletion is best-effort
+                LOG.info(e.getMessage());
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/testing/TrackedTopology.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/testing/TrackedTopology.java b/storm-core/src/jvm/org/apache/storm/testing/TrackedTopology.java
index ff3a32c..00bd1e7 100644
--- a/storm-core/src/jvm/org/apache/storm/testing/TrackedTopology.java
+++ b/storm-core/src/jvm/org/apache/storm/testing/TrackedTopology.java
@@ -17,18 +17,119 @@
  */
 package org.apache.storm.testing;
 
-import java.util.HashMap;
-import java.util.Map;
+import static org.apache.storm.Testing.whileTimeout;
 
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.storm.ILocalCluster;
+import org.apache.storm.Testing;
+import org.apache.storm.Thrift;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.SpoutSpec;
 import org.apache.storm.generated.StormTopology;
-import clojure.lang.Keyword;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.utils.RegisteredGlobalState;
+import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class TrackedTopology extends HashMap{
-	public TrackedTopology(Map map) {
-		super(map);
+/**
+ * A tracked topology keeps metrics for every bolt and spout.
+ * This allows a test to know how many tuples have been fully processed.
+ * Metrics are tracked on a per cluster basis.  So only one tracked topology
+ * should be run in a tracked cluster to avoid conflicts.
+ */
+public class TrackedTopology {
+    private static final Logger LOG = LoggerFactory.getLogger(TrackedTopology.class);
+    private final StormTopology topology;
+    private final AtomicInteger lastSpoutCommit;
+    private final ILocalCluster cluster;
+    
+    /**
+     * Create a new topology to be tracked.
+     * @param origTopo the original topology.
+     * @param cluster a cluster that should have been launched with tracking enabled.
+     */
+	public TrackedTopology(StormTopology origTopo, ILocalCluster cluster) {
+	    LOG.warn("CLUSTER {} - {}", cluster, cluster.getTrackedId());
+	    this.cluster = cluster;
+	    lastSpoutCommit = new AtomicInteger(0);
+	    String id = cluster.getTrackedId();
+	    topology = origTopo.deepCopy();
+	    for (Bolt bolt: topology.get_bolts().values()) {
+	        IRichBolt obj = (IRichBolt) Thrift.deserializeComponentObject(bolt.get_bolt_object());
+	        bolt.set_bolt_object(Thrift.serializeComponentObject(new BoltTracker(obj, id)));
+	    }
+	    for (SpoutSpec spout: topology.get_spouts().values()) {
+	        IRichSpout obj = (IRichSpout) Thrift.deserializeComponentObject(spout.get_spout_object());
+	        spout.set_spout_object(Thrift.serializeComponentObject(new SpoutTracker(obj, id)));
+	    }
 	}
+
+    public StormTopology getTopology() {
+		return topology;
+    }
 	
-	public StormTopology getTopology() {
-		return (StormTopology)get(Keyword.intern("topology"));
+	public ILocalCluster getCluster() {
+	    return cluster;
 	}
+	
+	/**
+	 * Wait for 1 tuple to be fully processed
+	 */
+	public void trackedWait() {
+	    trackedWait(1, Testing.TEST_TIMEOUT_MS);
+	}
+	
+	/**
+	 * Wait for amt tuples to be fully processed.
+	 */
+	public void trackedWait(int amt) {
+	    trackedWait(amt, Testing.TEST_TIMEOUT_MS);
+	}
+	
+	/**
+	 * Wait for amt tuples to be fully processed, or until timeoutMs elapses.
+	 */
+	public void trackedWait(int amt, int timeoutMs) {
+	    final int target = amt + lastSpoutCommit.get();
+	    final String id = cluster.getTrackedId();
+	    Random rand = ThreadLocalRandom.current();
+	    whileTimeout(timeoutMs,
+	            () -> {
+	                int se = globalAmt(id, "spout-emitted");
+	                int transferred = globalAmt(id, "transferred");
+	                int processed = globalAmt(id, "processed");
+	                LOG.info("emitted {} target {} transferred {} processed {}", se, target, transferred, processed);
+	                return (target != se) || (transferred != processed);
+	            },
+	            () -> {
+	                Time.advanceTimeSecs(1);
+	                try {
+                        Thread.sleep(rand.nextInt(200));
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+	            });
+	    lastSpoutCommit.set(target);
+	}
+
+	/**
+     * Read a metric from the tracked cluster (NOT JUST THIS TOPOLOGY)
+     * @param key one of "spout-emitted", "processed", or "transferred"
+     * @return the amount of that metric
+     */
+	public int globalAmt(String key) {
+	    return globalAmt(cluster.getTrackedId(), key);
+	}
+
+    @SuppressWarnings("unchecked")
+    private static int globalAmt(String id, String key) {
+        LOG.warn("Reading tracked metrics for ID {}", id);
+        return ((ConcurrentHashMap<String, AtomicInteger>)RegisteredGlobalState.getState(id)).get(key).get();
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java b/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
index 40ede4c..a86036e 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
@@ -88,10 +88,10 @@ import static org.apache.storm.spout.CheckpointSpout.CHECKPOINT_STREAM_ID;
  * conf.put(Config.TOPOLOGY_WORKERS, 4);
  * conf.put(Config.TOPOLOGY_DEBUG, true);
  *
- * LocalCluster cluster = new LocalCluster();
- * cluster.submitTopology("mytopology", conf, builder.createTopology());
- * Utils.sleep(10000);
- * cluster.shutdown();
+ * try (LocalCluster cluster = new LocalCluster();
+ *      LocalTopology topo = cluster.submitTopology("mytopology", conf, builder.createTopology());){
+ *     Utils.sleep(10000);
+ * }
  * ```
  *
  * The pattern for `TopologyBuilder` is to map component ids to components using the setSpout

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/utils/Time.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Time.java b/storm-core/src/jvm/org/apache/storm/utils/Time.java
index 9b656ee..b6c48c1 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Time.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Time.java
@@ -26,36 +26,55 @@ import org.slf4j.LoggerFactory;
 
 
 public class Time {
-    public static final Logger LOG = LoggerFactory.getLogger(Time.class);
-    
+    private static final Logger LOG = LoggerFactory.getLogger(Time.class);
     private static AtomicBoolean simulating = new AtomicBoolean(false);
     private static AtomicLong autoAdvanceOnSleep = new AtomicLong(0);
-    //TODO: should probably use weak references here or something
     private static volatile Map<Thread, AtomicLong> threadSleepTimes;
     private static final Object sleepTimesLock = new Object();
+    private static AtomicLong simulatedCurrTimeMs;
     
-    private static AtomicLong simulatedCurrTimeMs; //should this be a thread local that's allowed to keep advancing?
-    
-    public static void startSimulating() {
-        synchronized(sleepTimesLock) {
-            simulating.set(true);
-            simulatedCurrTimeMs = new AtomicLong(0);
-            threadSleepTimes = new ConcurrentHashMap<>();
+    public static class SimulatedTime implements AutoCloseable {
+
+        public SimulatedTime() {
+            this(null);
+        }
+        
+        public SimulatedTime(Number ms) {
+            synchronized(Time.sleepTimesLock) {
+                Time.simulating.set(true);
+                Time.simulatedCurrTimeMs = new AtomicLong(0);
+                Time.threadSleepTimes = new ConcurrentHashMap<>();
+                if (ms != null) {
+                    Time.autoAdvanceOnSleep.set(ms.longValue());
+                }
+            }
+        }
+        
+        @Override
+        public void close() {
+            synchronized(Time.sleepTimesLock) {
+                Time.simulating.set(false);    
+                Time.autoAdvanceOnSleep.set(0);
+                Time.threadSleepTimes = null;
+            }
         }
     }
     
-    public static void startSimulatingAutoAdvanceOnSleep(long ms) {
-        synchronized(sleepTimesLock) {
-            startSimulating();
-            autoAdvanceOnSleep.set(ms);
+    @Deprecated
+    public static void startSimulating() {
+        synchronized(Time.sleepTimesLock) {
+            Time.simulating.set(true);
+            Time.simulatedCurrTimeMs = new AtomicLong(0);
+            Time.threadSleepTimes = new ConcurrentHashMap<>();
         }
     }
     
+    @Deprecated
     public static void stopSimulating() {
-        synchronized(sleepTimesLock) {
-            simulating.set(false);    
-            autoAdvanceOnSleep.set(0);
-            threadSleepTimes = null;
+        synchronized(Time.sleepTimesLock) {
+            Time.simulating.set(false);    
+            Time.autoAdvanceOnSleep.set(0);
+            Time.threadSleepTimes = null;
         }
     }
     
@@ -145,6 +164,10 @@ public class Time {
         LOG.debug("Advanced simulated time to {}", newTime);
     }
     
+    public static void advanceTimeSecs(long secs) {
+        advanceTime(secs * 1_000);
+    }
+    
     public static boolean isThreadWaiting(Thread t) {
         if(!simulating.get()) throw new IllegalStateException("Must be in simulation mode");
         AtomicLong time;

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
index 925f81b..9c54aed 100644
--- a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
@@ -15,24 +15,26 @@
 ;; limitations under the License.
 (ns integration.org.apache.storm.integration-test
   (:use [clojure test])
-  (:import [org.apache.storm Config Thrift])
+  (:import [org.apache.storm Testing LocalCluster$Builder Config Thrift])
   (:import [org.apache.storm.topology TopologyBuilder])
   (:import [org.apache.storm.generated InvalidTopologyException SubmitOptions TopologyInitialStatus RebalanceOptions])
-  (:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount
+  (:import [org.apache.storm.testing TrackedTopology MockedSources TestWordCounter TestWordSpout TestGlobalCount FeederSpout CompleteTopologyParam
             TestAggregatesCounter TestConfBolt AckFailMapTracker AckTracker TestPlannerSpout])
   (:import [org.apache.storm.utils Time])
   (:import [org.apache.storm.tuple Fields])
   (:import [org.apache.storm.cluster StormClusterStateImpl])
   (:use [org.apache.storm.internal clojure])
-  (:use [org.apache.storm testing config util])
+  (:use [org.apache.storm config util])
   (:import [org.apache.storm Thrift])
   (:import [org.apache.storm.utils Utils]) 
   (:import [org.apache.storm.daemon StormCommon]))
 
 (deftest test-basic-topology
   (doseq [zmq-on? [true false]]
-    (with-simulated-time-local-cluster [cluster :supervisors 4
-                                        :daemon-conf {STORM-LOCAL-MODE-ZMQ zmq-on?}]
+    (with-open [cluster (.build (doto (LocalCluster$Builder.)
+                                  (.withSimulatedTime)
+                                  (.withSupervisors 4)
+                                  (.withDaemonConf {STORM-LOCAL-MODE-ZMQ zmq-on?})))]
       (let [topology (Thrift/buildTopology
                       {"1" (Thrift/prepareSpoutDetails
                              (TestWordSpout. true) (Integer. 3))}
@@ -48,19 +50,20 @@
                              {(Utils/getGlobalStreamId "2" nil)
                               (Thrift/prepareGlobalGrouping)}
                              (TestAggregatesCounter.))})
-            results (complete-topology cluster
+            results (Testing/completeTopology cluster
                                        topology
-                                       :mock-sources {"1" [["nathan"] ["bob"] ["joey"] ["nathan"]]}
-                                       :storm-conf {TOPOLOGY-WORKERS 2
-                                                    TOPOLOGY-TESTING-ALWAYS-TRY-SERIALIZE true})]
-        (is (ms= [["nathan"] ["bob"] ["joey"] ["nathan"]]
-                 (read-tuples results "1")))
-        (is (ms= [["nathan" 1] ["nathan" 2] ["bob" 1] ["joey" 1]]
-                 (read-tuples results "2")))
+                                       (doto (CompleteTopologyParam.)
+                                         (.setMockedSources (MockedSources. {"1" [["nathan"] ["bob"] ["joey"] ["nathan"]]}))
+                                         (.setStormConf {TOPOLOGY-WORKERS 2
+                                                    TOPOLOGY-TESTING-ALWAYS-TRY-SERIALIZE true})))]
+        (is (Testing/multiseteq [["nathan"] ["bob"] ["joey"] ["nathan"]]
+                 (Testing/readTuples results "1")))
+        (is (Testing/multiseteq [["nathan" (int 1)] ["nathan" (int 2)] ["bob" (int 1)] ["joey" (int 1)]]
+                 (Testing/readTuples results "2")))
         (is (= [[1] [2] [3] [4]]
-               (read-tuples results "3")))
+               (Testing/readTuples results "3")))
         (is (= [[1] [2] [3] [4]]
-               (read-tuples results "4")))
+               (Testing/readTuples results "4")))
         ))))
 
 (defbolt emit-task-id ["tid"] {:prepare true}
@@ -73,7 +76,9 @@
         ))))
 
 (deftest test-multi-tasks-per-executor
-  (with-simulated-time-local-cluster [cluster :supervisors 4]
+  (with-open [cluster (.build (doto (LocalCluster$Builder.)
+                                  (.withSimulatedTime)
+                                (.withSupervisors 4)))]
     (let [topology (Thrift/buildTopology
                     {"1" (Thrift/prepareSpoutDetails (TestWordSpout. true))}
                     {"2" (Thrift/prepareBoltDetails
@@ -82,11 +87,12 @@
                            emit-task-id
                            (Integer. 3)
                            {TOPOLOGY-TASKS 6})})
-          results (complete-topology cluster
+          results (Testing/completeTopology cluster
                                      topology
-                                     :mock-sources {"1" [["a"]]})]
-      (is (ms= [[0] [1] [2] [3] [4] [5]]
-               (read-tuples results "2")))
+                                     (doto (CompleteTopologyParam.)
+                                       (.setMockedSources (MockedSources. {"1" [["a"]]}))))]
+      (is (Testing/multiseteq [[(int 0)] [(int 1)] [(int 2)] [(int 3)] [(int 4)] [(int 5)]]
+               (Testing/readTuples results "2")))
       )))
 
 (defbolt ack-every-other {} {:prepare true}
@@ -119,8 +125,11 @@
   (assert-loop #(.isFailed tracker %) ids))
 
 (deftest test-timeout
-  (with-simulated-time-local-cluster [cluster :daemon-conf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true}]
-    (let [feeder (feeder-spout ["field1"])
+  (with-open [cluster (.build (doto (LocalCluster$Builder.)
+                                (.withSimulatedTime)
+                                (.withSupervisors 4)
+                                (.withDaemonConf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true})))]
+    (let [feeder (FeederSpout. ["field1"])
           tracker (AckFailMapTracker.)
           _ (.setAckFailDelegate feeder tracker)
           topology (Thrift/buildTopology
@@ -128,18 +137,18 @@
                      {"2" (Thrift/prepareBoltDetails
                             {(Utils/getGlobalStreamId "1" nil)
                              (Thrift/prepareGlobalGrouping)} ack-every-other)})]
-      (submit-local-topology (:nimbus cluster)
+      (.submitTopology cluster
                              "timeout-tester"
                              {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10}
                              topology)
-      (advance-cluster-time cluster 11)
+      (.advanceClusterTime cluster 11)
       (.feed feeder ["a"] 1)
       (.feed feeder ["b"] 2)
       (.feed feeder ["c"] 3)
-      (advance-cluster-time cluster 9)
+      (.advanceClusterTime cluster 9)
       (assert-acked tracker 1 3)
       (is (not (.isFailed tracker 2)))
-      (advance-cluster-time cluster 12)
+      (.advanceClusterTime cluster 12)
       (assert-failed tracker 2)
       )))
 
@@ -158,8 +167,10 @@
         )))))
 
 (deftest test-reset-timeout
-  (with-simulated-time-local-cluster [cluster :daemon-conf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true}]
-    (let [feeder (feeder-spout ["field1"])
+  (with-open [cluster (.build (doto (LocalCluster$Builder.)
+                                (.withSimulatedTime)
+                                (.withDaemonConf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true})))]
+    (let [feeder (FeederSpout. ["field1"])
           tracker (AckFailMapTracker.)
           _ (.setAckFailDelegate feeder tracker)
           topology (Thrift/buildTopology
@@ -167,16 +178,16 @@
                      {"2" (Thrift/prepareBoltDetails 
                             {(Utils/getGlobalStreamId "1" nil)
                              (Thrift/prepareGlobalGrouping)} extend-timeout-twice)})]
-    (submit-local-topology (:nimbus cluster)
+    (.submitTopology cluster
                            "timeout-tester"
                            {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10}
                            topology)
-    (advance-cluster-time cluster 11)
+    (.advanceClusterTime cluster 11)
     (.feed feeder ["a"] 1)
-    (advance-cluster-time cluster 21)
+    (.advanceClusterTime cluster 21)
     (is (not (.isFailed tracker 1)))
     (is (not (.isAcked tracker 1)))
-    (advance-cluster-time cluster 5)
+    (.advanceClusterTime cluster 5)
     (assert-acked tracker 1)
     )))
 
@@ -214,15 +225,18 @@
 
 (defn try-complete-wc-topology [cluster topology]
   (try (do
-         (complete-topology cluster
+         (Testing/completeTopology cluster ;; result is discarded: the fn yields false if this call returns, true if it throws InvalidTopologyException
                             topology
-                            :mock-sources {"1" [["nathan"] ["bob"] ["joey"] ["nathan"]]}
-                            :storm-conf {TOPOLOGY-WORKERS 2})
+                            (doto (CompleteTopologyParam.) ;; doto returns the param object after both setters below have run
+                              (.setMockedSources (MockedSources. {"1" [["nathan"] ["bob"] ["joey"] ["nathan"]]})) ;; mock tuples keyed by "1"
+                              (.setStormConf {TOPOLOGY-WORKERS 2})))
          false)
        (catch InvalidTopologyException e true)))
 
 (deftest test-validate-topology-structure
-  (with-simulated-time-local-cluster [cluster :supervisors 4]
+  (with-open [cluster (.build (doto (LocalCluster$Builder.)
+                                (.withSupervisors 4)
+                                (.withSimulatedTime)))]
     (let [any-error1? (try-complete-wc-topology cluster (mk-validate-topology-1))
           any-error2? (try-complete-wc-topology cluster (mk-invalidate-topology-1))
           any-error3? (try-complete-wc-topology cluster (mk-invalidate-topology-2))
@@ -239,7 +253,8 @@
 
 (deftest test-system-stream
   ;; this test works because mocking a spout splits up the tuples evenly among the tasks
-  (with-simulated-time-local-cluster [cluster]
+  (with-open [cluster (.build (doto (LocalCluster$Builder.)
+                                (.withSimulatedTime)))]
       (let [topology (Thrift/buildTopology
                        {"1" (Thrift/prepareSpoutDetails
                               (TestWordSpout. true) (Integer. 3))}
@@ -249,17 +264,18 @@
                                (Utils/getGlobalStreamId "1" "__system")
                                (Thrift/prepareGlobalGrouping)}
                                identity-bolt (Integer. 1))})
-            results (complete-topology cluster
+            results (Testing/completeTopology cluster
                                        topology
-                                       :mock-sources {"1" [["a"] ["b"] ["c"]]}
-                                       :storm-conf {TOPOLOGY-WORKERS 2})]
-        (is (ms= [["a"] ["b"] ["c"] ["startup"] ["startup"] ["startup"]]
-                 (read-tuples results "2")))
+                                       (doto (CompleteTopologyParam.)
+                                         (.setMockedSources (MockedSources. {"1" [["a"] ["b"] ["c"]]}))
+                                         (.setStormConf {TOPOLOGY-WORKERS 2})))]
+        (is (Testing/multiseteq [["a"] ["b"] ["c"] ["startup"] ["startup"] ["startup"]]
+                 (Testing/readTuples results "2")))
         )))
 
 (defn ack-tracking-feeder [fields]
   (let [tracker (AckTracker.)]
-    [(doto (feeder-spout fields)
+    [(doto (FeederSpout. fields)
        (.setAckFailDelegate tracker))
      (fn [val]
        (is (= (.getNumAcks tracker) val))
@@ -293,12 +309,11 @@
   (ack! collector tuple))
 
 (deftest test-acking
-  (with-tracked-cluster [cluster]
+  (with-open [cluster (.build (.withSimulatedTime (.withTracked (LocalCluster$Builder. ))))]
     (let [[feeder1 checker1] (ack-tracking-feeder ["num"])
           [feeder2 checker2] (ack-tracking-feeder ["num"])
           [feeder3 checker3] (ack-tracking-feeder ["num"])
-          tracked (mk-tracked-topology
-                   cluster
+          tracked (TrackedTopology.
                    (Thrift/buildTopology
                      {"1" (Thrift/prepareSpoutDetails feeder1)
                       "2" (Thrift/prepareSpoutDetails feeder2)
@@ -331,42 +346,40 @@
                             {(Utils/getGlobalStreamId "8" nil)
                              (Thrift/prepareShuffleGrouping)}
                             ack-bolt)}
-                     ))]
-      (submit-local-topology (:nimbus cluster)
-                             "acking-test1"
-                             {}
-                             (:topology tracked))
-      (advance-cluster-time cluster 11)
+                     )
+                     cluster)]
+      (.submitTopology cluster
+                       "acking-test1"
+                       {}
+                       tracked)
+      (.advanceClusterTime cluster 11)
       (.feed feeder1 [1])
-      (tracked-wait tracked 1)
+      (Testing/trackedWait tracked (int 1))
       (checker1 0)
       (.feed feeder2 [1])
-      (tracked-wait tracked 1)
+      (Testing/trackedWait tracked (int 1))
       (checker1 1)
       (checker2 1)
       (.feed feeder1 [1])
-      (tracked-wait tracked 1)
+      (Testing/trackedWait tracked (int 1))
       (checker1 0)
       (.feed feeder1 [1])
-      (tracked-wait tracked 1)
+      (Testing/trackedWait tracked (int 1))
       (checker1 1)
       (.feed feeder3 [1])
-      (tracked-wait tracked 1)
+      (Testing/trackedWait tracked (int 1))
       (checker1 0)
       (checker3 0)
       (.feed feeder2 [1])
-      (tracked-wait tracked 1)
+      (Testing/trackedWait tracked (int 1))
       (checker1 1)
       (checker2 1)
-      (checker3 1)
-      
-      )))
+      (checker3 1))))
 
 (deftest test-ack-branching
-  (with-tracked-cluster [cluster]
+  (with-open [cluster (.build (.withSimulatedTime (.withTracked (LocalCluster$Builder. ))))]
     (let [[feeder checker] (ack-tracking-feeder ["num"])
-          tracked (mk-tracked-topology
-                   cluster
+          tracked (TrackedTopology.
                    (Thrift/buildTopology
                      {"1" (Thrift/prepareSpoutDetails feeder)}
                      {"2" (Thrift/prepareBoltDetails
@@ -382,19 +395,19 @@
                              (Thrift/prepareShuffleGrouping)
                              (Utils/getGlobalStreamId "3" nil)
                              (Thrift/prepareShuffleGrouping)}
-                             (agg-bolt 4))}))]
-      (submit-local-topology (:nimbus cluster)
-                             "test-acking2"
-                             {}
-                             (:topology tracked))
-      (advance-cluster-time cluster 11)
+                             (agg-bolt 4))})
+                    cluster)]
+      (.submitTopology cluster
+                       "test-acking2"
+                       {}
+                       tracked)
+      (.advanceClusterTime cluster 11)
       (.feed feeder [1])
-      (tracked-wait tracked 1)
+      (Testing/trackedWait tracked (int 1))
       (checker 0)
       (.feed feeder [1])
-      (tracked-wait tracked 1)
-      (checker 2)
-      )))
+      (Testing/trackedWait tracked (int 1))
+      (checker 2))))
 
 (defbolt dup-anchor ["num"]
   [tuple collector]
@@ -418,8 +431,10 @@
       )))
 
 (deftest test-submit-inactive-topology
-  (with-simulated-time-local-cluster [cluster :daemon-conf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true}]
-    (let [feeder (feeder-spout ["field1"])
+  (with-open [cluster (.build (doto (LocalCluster$Builder.)
+                                (.withSimulatedTime)
+                                (.withDaemonConf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true})))]
+    (let [feeder (FeederSpout. ["field1"])
           tracker (AckFailMapTracker.)
           _ (.setAckFailDelegate feeder tracker)
           topology (Thrift/buildTopology
@@ -432,28 +447,27 @@
       (reset! bolt-prepared? false)
       (reset! spout-opened? false)      
       
-      (submit-local-topology-with-opts (:nimbus cluster)
+      (.submitTopologyWithOpts cluster
         "test"
         {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10}
         topology
         (SubmitOptions. TopologyInitialStatus/INACTIVE))
-      (advance-cluster-time cluster 11)
+      (.advanceClusterTime cluster 11)
       (.feed feeder ["a"] 1)
-      (advance-cluster-time cluster 9)
+      (.advanceClusterTime cluster 9)
       (is (not @bolt-prepared?))
       (is (not @spout-opened?))        
-      (.activate (:nimbus cluster) "test")
+      (.activate (.getNimbus cluster) "test")
 
-      (advance-cluster-time cluster 12)
+      (.advanceClusterTime cluster 12)
       (assert-acked tracker 1)
       (is @bolt-prepared?)
       (is @spout-opened?))))
 
 (deftest test-acking-self-anchor
-  (with-tracked-cluster [cluster]
+  (with-open [cluster (.build (.withSimulatedTime (.withTracked (LocalCluster$Builder. ))))]
     (let [[feeder checker] (ack-tracking-feeder ["num"])
-          tracked (mk-tracked-topology
-                   cluster
+          tracked (TrackedTopology.
                    (Thrift/buildTopology
                      {"1" (Thrift/prepareSpoutDetails feeder)}
                      {"2" (Thrift/prepareBoltDetails
@@ -463,21 +477,21 @@
                       "3" (Thrift/prepareBoltDetails
                             {(Utils/getGlobalStreamId "2" nil)
                              (Thrift/prepareShuffleGrouping)}
-                            ack-bolt)}))]
-      (submit-local-topology (:nimbus cluster)
-                             "test"
-                             {}
-                             (:topology tracked))
-      (advance-cluster-time cluster 11)
+                            ack-bolt)})
+                   cluster)]
+      (.submitTopology cluster
+                       "test"
+                       {}
+                       tracked)
+      (.advanceClusterTime cluster 11)
       (.feed feeder [1])
-      (tracked-wait tracked 1)
+      (Testing/trackedWait tracked (int 1))
       (checker 1)
       (.feed feeder [1])
       (.feed feeder [1])
       (.feed feeder [1])
-      (tracked-wait tracked 3)
-      (checker 3)
-      )))
+      (Testing/trackedWait tracked (int 3))
+      (checker 3))))
 
 (defspout IncSpout ["word"]
   [conf context collector]
@@ -504,22 +518,25 @@
      )))
 
 (deftest test-kryo-decorators-config
-  (with-simulated-time-local-cluster [cluster
-                                      :daemon-conf {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true
-                                                    TOPOLOGY-KRYO-DECORATORS ["this-is-overriden"]}]
+  (with-open [cluster (.build (doto (LocalCluster$Builder.)
+                                (.withSimulatedTime)
+                                (.withDaemonConf {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true
+                                                  TOPOLOGY-KRYO-DECORATORS ["this-is-overriden"]})))]
     (let [builder (TopologyBuilder.)
           _ (.setSpout builder "1" (TestPlannerSpout. (Fields. ["conf"])))
           _ (-> builder (.setBolt "2" (TestConfBolt. {TOPOLOGY-KRYO-DECORATORS ["one" "two"]})) (.shuffleGrouping "1"))
-          results (complete-topology cluster
+          results (Testing/completeTopology cluster
                                      (.createTopology builder)
-                                     :storm-conf {TOPOLOGY-KRYO-DECORATORS ["one" "three"]}
-                                     :mock-sources {"1" [[TOPOLOGY-KRYO-DECORATORS]]})]
+                                     (doto (CompleteTopologyParam.)
+                                       (.setStormConf {TOPOLOGY-KRYO-DECORATORS ["one" "three"]})
+                                       (.setMockedSources (MockedSources. {"1" [[TOPOLOGY-KRYO-DECORATORS]]}))))]
       (is (= {"topology.kryo.decorators" (list "one" "two" "three")}
-             (->> (read-tuples results "2") (apply concat) (apply hash-map)))))))
+             (->> (Testing/readTuples results "2") (apply concat) (apply hash-map)))))))
 
 (deftest test-component-specific-config
-  (with-simulated-time-local-cluster [cluster
-                                      :daemon-conf {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true}]
+  (with-open [cluster (.build (doto (LocalCluster$Builder.)
+                                (.withSimulatedTime)
+                                (.withDaemonConf {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true})))]
     (let [builder (TopologyBuilder.)
           _ (.setSpout builder "1" (TestPlannerSpout. (Fields. ["conf"])))
           _ (-> builder
@@ -533,14 +550,15 @@
                 (.shuffleGrouping "1")
                 (.setMaxTaskParallelism (int 2))
                 (.addConfiguration "fake.config2" 987))
-          results (complete-topology cluster
+          results (Testing/completeTopology cluster
                                      (.createTopology builder)
-                                     :storm-conf {TOPOLOGY-KRYO-REGISTER [{"fake.type" "good.serializer", "fake.type3" "a.serializer3"}]}
-                                     :mock-sources {"1" [["fake.config"]
+                                     (doto (CompleteTopologyParam.)
+                                       (.setStormConf {TOPOLOGY-KRYO-REGISTER [{"fake.type" "good.serializer", "fake.type3" "a.serializer3"}]})
+                                        (.setMockedSources (MockedSources. {"1" [["fake.config"]
                                                          [TOPOLOGY-MAX-TASK-PARALLELISM]
                                                          [TOPOLOGY-MAX-SPOUT-PENDING]
                                                          ["fake.config2"]
-                                                         [TOPOLOGY-KRYO-REGISTER]]})]
+                                                         [TOPOLOGY-KRYO-REGISTER]]}))))]
       (is (= {"fake.config" 123
               "fake.config2" 987
               TOPOLOGY-MAX-TASK-PARALLELISM 2
@@ -548,7 +566,7 @@
               TOPOLOGY-KRYO-REGISTER {"fake.type" "good.serializer"
                                       "fake.type2" "a.serializer"
                                       "fake.type3" "a.serializer3"}}
-             (->> (read-tuples results "2")
+             (->> (Testing/readTuples results "2")
                   (apply concat)
                   (apply hash-map)))))))
 
@@ -582,7 +600,8 @@
         ))))
 
 (deftest test-hooks
-  (with-simulated-time-local-cluster [cluster]
+  (with-open [cluster (.build (doto (LocalCluster$Builder.)
+                                (.withSimulatedTime)))]
     (let [topology (Thrift/buildTopology
                      {"1" (Thrift/prepareSpoutDetails
                             (TestPlannerSpout. (Fields. ["conf"])))}
@@ -590,18 +609,19 @@
                             {(Utils/getGlobalStreamId "1" nil)
                              (Thrift/prepareShuffleGrouping)}
                             hooks-bolt)})
-          results (complete-topology cluster
+          results (Testing/completeTopology cluster
                                      topology
-                                     :mock-sources {"1" [[1]
+                                     (doto (CompleteTopologyParam.)
+                                       (.setMockedSources (MockedSources. {"1" [[1]
                                                          [1]
                                                          [1]
                                                          [1]
-                                                         ]})]
+                                                         ]}))))]
       (is (= [[0 0 0 0]
               [2 1 0 1]
               [4 1 1 2]
               [6 2 1 3]]
-             (read-tuples results "2")
+             (Testing/readTuples results "2")
              )))))
 
 (defbolt report-errors-bolt {}

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj b/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
index 342bba6..5b437c7 100644
--- a/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
@@ -15,7 +15,7 @@
 ;; limitations under the License.
 (ns integration.org.apache.storm.testing4j-test
   (:use [clojure.test])
-  (:use [org.apache.storm config testing util])
+  (:use [org.apache.storm config util])
   (:use [org.apache.storm.internal clojure])
   (:require [integration.org.apache.storm.integration-test :as it])
   (:require [org.apache.storm.internal.thrift :as thrift])
@@ -23,7 +23,7 @@
            [org.apache.storm.generated GlobalStreamId])
   (:import [org.apache.storm.tuple Values Tuple])
   (:import [org.apache.storm.utils Time Utils])
-  (:import [org.apache.storm.testing MkClusterParam TestJob MockedSources TestWordSpout
+  (:import [org.apache.storm.testing MkClusterParam TestJob MockedSources TestWordSpout FeederSpout
             TestWordCounter TestGlobalCount TestAggregatesCounter CompleteTopologyParam
             AckFailMapTracker MkTupleParam])
   (:import [org.apache.storm.utils Utils])
@@ -45,8 +45,7 @@
     (Testing/withLocalCluster mk-cluster-param (reify TestJob
                                                  (^void run [this ^ILocalCluster cluster]
                                                    (is (not (nil? cluster)))
-                                                   (is (not (nil? (.getState cluster))))
-                                                   (is (not (nil? (:nimbus (.getState cluster))))))))))
+                                                   (is (not (nil? (.getNimbus cluster)))))))))
 
 (deftest test-with-simulated-time-local-cluster
   (let [mk-cluster-param (doto (MkClusterParam.)
@@ -58,8 +57,7 @@
     (Testing/withSimulatedTimeLocalCluster mk-cluster-param (reify TestJob
                                                               (^void run [this ^ILocalCluster cluster]
                                                                 (is (not (nil? cluster)))
-                                                                (is (not (nil? (.getState cluster))))
-                                                                (is (not (nil? (:nimbus (.getState cluster)))))
+                                                                (is (not (nil? (.getNimbus cluster))))
                                                                 (is (Time/isSimulating)))))
     (is (not (Time/isSimulating)))))
 
@@ -96,8 +94,8 @@
                                                  complete-topology-param)]
            (is (Testing/multiseteq [["nathan"] ["bob"] ["joey"] ["nathan"]]
                            (Testing/readTuples results "1")))
-           (is (Testing/multiseteq [["nathan" 1] ["nathan" 2] ["bob" 1] ["joey" 1]]
-                           (read-tuples results "2")))
+           (is (Testing/multiseteq [["nathan" (int 1)] ["nathan" (int 2)] ["bob" (int 1)] ["joey" (int 1)]]
+                           (Testing/readTuples results "2")))
            (is (= [[1] [2] [3] [4]]
                   (Testing/readTuples results "3")))
            (is (= [[1] [2] [3] [4]]
@@ -143,7 +141,7 @@
                           "test-acking2"
                           (Config.)
                           (.getTopology tracked))
-         (advance-cluster-time (.getState cluster) 11)
+         (.advanceClusterTime cluster (int 11))
          (.feed feeder [1])
          (Testing/trackedWait tracked (int 1))
          (checker 0)
@@ -161,7 +159,7 @@
      mk-cluster-param
      (reify TestJob
        (^void run [this ^ILocalCluster cluster]
-         (let [feeder (feeder-spout ["field1"])
+         (let [feeder (FeederSpout. ["field1"])
                tracker (AckFailMapTracker.)
                _ (.setAckFailDelegate feeder tracker)
                topology (Thrift/buildTopology
@@ -195,7 +193,7 @@
       mk-cluster-param
       (reify TestJob
         (^void run [this ^ILocalCluster cluster]
-          (let [feeder (feeder-spout ["field1"])
+          (let [feeder (FeederSpout. ["field1"])
                 tracker (AckFailMapTracker.)
                 _ (.setAckFailDelegate feeder tracker)
                 topology (Thrift/buildTopology

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj b/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj
index c55903e..d55009b 100644
--- a/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj
@@ -15,18 +15,23 @@
 ;; limitations under the License.
 (ns integration.org.apache.storm.trident.integration-test
   (:use [clojure test])
-  (:require [org.apache.storm [testing :as t]])
+  (:import [org.apache.storm Testing LocalCluster$Builder LocalCluster LocalDRPC])
   (:import [org.apache.storm.trident.testing Split CountAsAggregator StringLength TrueFilter
-            MemoryMapState$Factory])
+            MemoryMapState$Factory FeederCommitterBatchSpout FeederBatchSpout])
   (:import [org.apache.storm.trident.state StateSpec])
-  (:import [org.apache.storm.trident.operation.impl CombinerAggStateUpdater]
+  (:import [org.apache.storm.trident TridentTopology]
+           [org.apache.storm.trident.operation.impl CombinerAggStateUpdater]
            [org.apache.storm.trident.operation BaseFunction]
+           [org.apache.storm.trident.operation.builtin Count Sum Equals MapGet Debug FilterNull FirstN TupleCollectionGet]
+           [org.apache.storm.tuple Fields]
            [org.json.simple.parser JSONParser]
+           [org.json.simple JSONValue]
            [org.apache.storm Config])
-  (:use [org.apache.storm.trident testing]
-        [org.apache.storm log util config]))
+  (:use [org.apache.storm log util config]))
 
-(bootstrap-imports)
+(defn exec-drpc [^LocalDRPC drpc function-name args] ;; runs DRPC function `function-name` with `args`; returns the parsed reply, or nil when .execute returns nil
+  (when-let [res (.execute drpc function-name args)] ;; when-let instead of a single-branch if-let: there is no else case
+    (clojurify-structure (JSONValue/parse res)))) ;; reply is parsed with JSONValue/parse, then passed through clojurify-structure
 
 (defmacro letlocals
   [& body]
@@ -41,58 +46,58 @@
        ~(first lexpr))))
 
 (deftest test-memory-map-get-tuples
-  (t/with-local-cluster [cluster]
-    (with-drpc [drpc]
+  (with-open [cluster (LocalCluster. )] ;; feeds sentences, then reads every [word count] pair back through the "all-tuples" DRPC stream
+    (with-open [drpc (LocalDRPC.)] ;; with-open closes drpc, then cluster, when the body exits
       (letlocals
         (bind topo (TridentTopology.))
-        (bind feeder (feeder-spout ["sentence"]))
+        (bind feeder (FeederBatchSpout. ["sentence"]))
         (bind word-counts
           (-> topo
               (.newStream "tester" feeder)
-              (.each (fields "sentence") (Split.) (fields "word"))
-              (.groupBy (fields "word"))
-              (.persistentAggregate (memory-map-state) (Count.) (fields "count"))
+              (.each (Fields. ["sentence"]) (Split.) (Fields. ["word"]))
+              (.groupBy (Fields. ["word"]))
+              (.persistentAggregate (MemoryMapState$Factory.) (Count.) (Fields. ["count"])) ;; per-"word" Count, stored via MemoryMapState$Factory
               (.parallelismHint 6)
               ))
         (-> topo
             (.newDRPCStream "all-tuples" drpc)
             (.broadcast)
-            (.stateQuery word-counts (fields "args") (TupleCollectionGet.) (fields "word" "count"))
-            (.project (fields "word" "count")))
-        (with-topology [cluster topo storm-topo]
-          (feed feeder [["hello the man said"] ["the"]])
+            (.stateQuery word-counts (Fields. ["args"]) (TupleCollectionGet.) (Fields. ["word" "count"]))
+            (.project (Fields. ["word" "count"]))) ;; the DRPC reply carries only "word" and "count"
+        (with-open [storm-topo (.submitTopology cluster "testing" {} (.build topo))] ;; submitted as "testing" with an empty conf; the returned value is closed when the body exits
+          (.feed feeder [["hello the man said"] ["the"]])
           (is (= #{["hello" 1] ["said" 1] ["the" 2] ["man" 1]}
                  (into #{} (exec-drpc drpc "all-tuples" "man"))))
-          (feed feeder [["the foo"]])
+          (.feed feeder [["the foo"]]) ;; second feed: "the" is expected to reach 3 and "foo" to appear
           (is (= #{["hello" 1] ["said" 1] ["the" 3] ["man" 1] ["foo" 1]}
                  (into #{} (exec-drpc drpc "all-tuples" "man")))))))))
 
 (deftest test-word-count
-  (t/with-local-cluster [cluster]
-    (with-drpc [drpc]
+  (with-open [cluster (LocalCluster. )]
+    (with-open [drpc (LocalDRPC.)]
       (letlocals
         (bind topo (TridentTopology.))
-        (bind feeder (feeder-spout ["sentence"]))
+        (bind feeder (FeederBatchSpout. ["sentence"]))
         (bind word-counts
           (-> topo
               (.newStream "tester" feeder)
-              (.each (fields "sentence") (Split.) (fields "word"))
-              (.groupBy (fields "word"))
-              (.persistentAggregate (memory-map-state) (Count.) (fields "count"))
+              (.each (Fields. ["sentence"]) (Split.) (Fields. ["word"]))
+              (.groupBy (Fields. ["word"]))
+              (.persistentAggregate (MemoryMapState$Factory.) (Count.) (Fields. ["count"]))
               (.parallelismHint 6)
               ))
         (-> topo
             (.newDRPCStream "words" drpc)
-            (.each (fields "args") (Split.) (fields "word"))
-            (.groupBy (fields "word"))
-            (.stateQuery word-counts (fields "word") (MapGet.) (fields "count"))
-            (.aggregate (fields "count") (Sum.) (fields "sum"))
-            (.project (fields "sum")))
-        (with-topology [cluster topo storm-topo]
-          (feed feeder [["hello the man said"] ["the"]])
+            (.each (Fields. ["args"]) (Split.) (Fields. ["word"]))
+            (.groupBy (Fields. ["word"]))
+            (.stateQuery word-counts (Fields. ["word"]) (MapGet.) (Fields. ["count"]))
+            (.aggregate (Fields. ["count"]) (Sum.) (Fields. ["sum"]))
+            (.project (Fields. ["sum"])))
+        (with-open [storm-topo (.submitTopology cluster "testing" {} (.build topo))]
+          (.feed feeder [["hello the man said"] ["the"]])
           (is (= [[2]] (exec-drpc drpc "words" "the")))
           (is (= [[1]] (exec-drpc drpc "words" "hello")))
-          (feed feeder [["the man on the moon"] ["where are you"]])
+          (.feed feeder [["the man on the moon"] ["where are you"]])
           (is (= [[4]] (exec-drpc drpc "words" "the")))
           (is (= [[2]] (exec-drpc drpc "words" "man")))
           (is (= [[8]] (exec-drpc drpc "words" "man where you the")))
@@ -101,56 +106,56 @@
 ;; this test reproduces a bug where committer spouts freeze processing when
 ;; there's at least one repartitioning after the spout
 (deftest test-word-count-committer-spout
-  (t/with-local-cluster [cluster]
-    (with-drpc [drpc]
+  (with-open [cluster (LocalCluster. )] ;; word count fed by a committer batch spout and queried through the "words" DRPC stream
+    (with-open [drpc (LocalDRPC.)] ;; with-open closes drpc, then cluster, when the body exits
       (letlocals
         (bind topo (TridentTopology.))
-        (bind feeder (feeder-committer-spout ["sentence"]))
+        (bind feeder (FeederCommitterBatchSpout. ["sentence"]))
         (.setWaitToEmit feeder false) ;;this causes lots of empty batches
         (bind word-counts
           (-> topo
               (.newStream "tester" feeder)
               (.parallelismHint 2)
-              (.each (fields "sentence") (Split.) (fields "word"))
-              (.groupBy (fields "word"))
-              (.persistentAggregate (memory-map-state) (Count.) (fields "count"))
+              (.each (Fields. ["sentence"]) (Split.) (Fields. ["word"]))
+              (.groupBy (Fields. ["word"]))
+              (.persistentAggregate (MemoryMapState$Factory.) (Count.) (Fields. ["count"])) ;; per-"word" Count, stored via MemoryMapState$Factory
               (.parallelismHint 6)
               ))
         (-> topo
             (.newDRPCStream "words" drpc)
-            (.each (fields "args") (Split.) (fields "word"))
-            (.groupBy (fields "word"))
-            (.stateQuery word-counts (fields "word") (MapGet.) (fields "count"))
-            (.aggregate (fields "count") (Sum.) (fields "sum"))
-            (.project (fields "sum")))
-        (with-topology [cluster topo storm-topo]
-          (feed feeder [["hello the man said"] ["the"]])
+            (.each (Fields. ["args"]) (Split.) (Fields. ["word"]))
+            (.groupBy (Fields. ["word"]))
+            (.stateQuery word-counts (Fields. ["word"]) (MapGet.) (Fields. ["count"]))
+            (.aggregate (Fields. ["count"]) (Sum.) (Fields. ["sum"])) ;; "sum" adds up the counts of the words in the DRPC argument
+            (.project (Fields. ["sum"])))
+        (with-open [storm-topo (.submitTopology cluster "testing" {} (.build topo))] ;; submitted as "testing" with an empty conf; the returned value is closed when the body exits
+          (.feed feeder [["hello the man said"] ["the"]])
           (is (= [[2]] (exec-drpc drpc "words" "the")))
           (is (= [[1]] (exec-drpc drpc "words" "hello")))
           (Thread/sleep 1000) ;; this is necessary to reproduce the bug where committer spouts freeze processing
-          (feed feeder [["the man on the moon"] ["where are you"]])
+          (.feed feeder [["the man on the moon"] ["where are you"]]) ;; fed after the sleep; the assertions below expect these counts to be included
           (is (= [[4]] (exec-drpc drpc "words" "the")))
           (is (= [[2]] (exec-drpc drpc "words" "man")))
           (is (= [[8]] (exec-drpc drpc "words" "man where you the")))
-          (feed feeder [["the the"]])
+          (.feed feeder [["the the"]])
           (is (= [[6]] (exec-drpc drpc "words" "the")))
-          (feed feeder [["the"]])
+          (.feed feeder [["the"]])
           (is (= [[7]] (exec-drpc drpc "words" "the")))
           )))))
 
 
 (deftest test-count-agg
-  (t/with-local-cluster [cluster]
-    (with-drpc [drpc]
+  (with-open [cluster (LocalCluster. )]
+    (with-open [drpc (LocalDRPC.)]
       (letlocals
         (bind topo (TridentTopology.))
         (-> topo
             (.newDRPCStream "numwords" drpc)
-            (.each (fields "args") (Split.) (fields "word"))
-            (.aggregate (CountAsAggregator.) (fields "count"))
+            (.each (Fields. ["args"]) (Split.) (Fields. ["word"]))
+            (.aggregate (CountAsAggregator.) (Fields. ["count"]))
             (.parallelismHint 2) ;;this makes sure batchGlobal is working correctly
-            (.project (fields "count")))
-        (with-topology [cluster topo storm-topo]
+            (.project (Fields. ["count"])))
+        (with-open [storm-topo (.submitTopology cluster "testing" {} (.build topo))]
           (doseq [i (range 100)]
             (is (= [[1]] (exec-drpc drpc "numwords" "the"))))
           (is (= [[0]] (exec-drpc drpc "numwords" "")))
@@ -158,76 +163,76 @@
           )))))
 
 (deftest test-split-merge
-  (t/with-local-cluster [cluster]
-    (with-drpc [drpc]
+  (with-open [cluster (LocalCluster. )]
+    (with-open [drpc (LocalDRPC.)]
       (letlocals
         (bind topo (TridentTopology.))
         (bind drpc-stream (-> topo (.newDRPCStream "splitter" drpc)))
         (bind s1
           (-> drpc-stream
-              (.each (fields "args") (Split.) (fields "word"))
-              (.project (fields "word"))))
+              (.each (Fields. ["args"]) (Split.) (Fields. ["word"]))
+              (.project (Fields. ["word"]))))
         (bind s2
           (-> drpc-stream
-              (.each (fields "args") (StringLength.) (fields "len"))
-              (.project (fields "len"))))
+              (.each (Fields. ["args"]) (StringLength.) (Fields. ["len"]))
+              (.project (Fields. ["len"]))))
 
         (.merge topo [s1 s2])
-        (with-topology [cluster topo storm-topo]
-          (is (t/ms= [[7] ["the"] ["man"]] (exec-drpc drpc "splitter" "the man")))
-          (is (t/ms= [[5] ["hello"]] (exec-drpc drpc "splitter" "hello")))
+        (with-open [storm-topo (.submitTopology cluster "testing" {} (.build topo))]
+          (is (Testing/multiseteq [[7] ["the"] ["man"]] (exec-drpc drpc "splitter" "the man")))
+          (is (Testing/multiseteq [[5] ["hello"]] (exec-drpc drpc "splitter" "hello")))
           )))))
 
 (deftest test-multiple-groupings-same-stream
-  (t/with-local-cluster [cluster]
-    (with-drpc [drpc]
+  (with-open [cluster (LocalCluster. )]
+    (with-open [drpc (LocalDRPC.)]
       (letlocals
         (bind topo (TridentTopology.))
         (bind drpc-stream (-> topo (.newDRPCStream "tester" drpc)
-                                   (.each (fields "args") (TrueFilter.))))
+                                   (.each (Fields. ["args"]) (TrueFilter.))))
         (bind s1
           (-> drpc-stream
-              (.groupBy (fields "args"))
-              (.aggregate (CountAsAggregator.) (fields "count"))))
+              (.groupBy (Fields. ["args"]))
+              (.aggregate (CountAsAggregator.) (Fields. ["count"]))))
         (bind s2
           (-> drpc-stream
-              (.groupBy (fields "args"))
-              (.aggregate (CountAsAggregator.) (fields "count"))))
+              (.groupBy (Fields. ["args"]))
+              (.aggregate (CountAsAggregator.) (Fields. ["count"]))))
 
         (.merge topo [s1 s2])
-        (with-topology [cluster topo storm-topo]
-          (is (t/ms= [["the" 1] ["the" 1]] (exec-drpc drpc "tester" "the")))
-          (is (t/ms= [["aaaaa" 1] ["aaaaa" 1]] (exec-drpc drpc "tester" "aaaaa")))
+        (with-open [storm-topo (.submitTopology cluster "testing" {} (.build topo))]
+          (is (Testing/multiseteq [["the" 1] ["the" 1]] (exec-drpc drpc "tester" "the")))
+          (is (Testing/multiseteq [["aaaaa" 1] ["aaaaa" 1]] (exec-drpc drpc "tester" "aaaaa")))
           )))))
 
 (deftest test-multi-repartition
-  (t/with-local-cluster [cluster]
-    (with-drpc [drpc]
+  (with-open [cluster (LocalCluster. )]
+    (with-open [drpc (LocalDRPC.)]
       (letlocals
         (bind topo (TridentTopology.))
         (bind drpc-stream (-> topo (.newDRPCStream "tester" drpc)
-                                   (.each (fields "args") (Split.) (fields "word"))
+                                   (.each (Fields. ["args"]) (Split.) (Fields. ["word"]))
                                    (.localOrShuffle)
                                    (.shuffle)
-                                   (.aggregate (CountAsAggregator.) (fields "count"))
+                                   (.aggregate (CountAsAggregator.) (Fields. ["count"]))
                                    ))
-        (with-topology [cluster topo storm-topo]
-          (is (t/ms= [[2]] (exec-drpc drpc "tester" "the man")))
-          (is (t/ms= [[1]] (exec-drpc drpc "tester" "aaa")))
+        (with-open [storm-topo (.submitTopology cluster "testing" {} (.build topo))]
+          (is (Testing/multiseteq [[2]] (exec-drpc drpc "tester" "the man")))
+          (is (Testing/multiseteq [[1]] (exec-drpc drpc "tester" "aaa")))
           )))))
 
 (deftest test-stream-projection-validation
-  (t/with-local-cluster [cluster]
+  (with-open [cluster (LocalCluster. )]
     (letlocals
-     (bind feeder (feeder-committer-spout ["sentence"]))
+     (bind feeder (FeederCommitterBatchSpout. ["sentence"]))
      (bind topo (TridentTopology.))
      ;; valid projection fields will not throw exceptions
      (bind word-counts
            (-> topo
                (.newStream "tester" feeder)
-               (.each (fields "sentence") (Split.) (fields "word"))
-               (.groupBy (fields "word"))
-               (.persistentAggregate (memory-map-state) (Count.) (fields "count"))
+               (.each (Fields. ["sentence"]) (Split.) (Fields. ["word"]))
+               (.groupBy (Fields. ["word"]))
+               (.persistentAggregate (MemoryMapState$Factory.) (Count.) (Fields. ["count"]))
                (.parallelismHint 6)
                ))
      (bind stream (-> topo
@@ -235,63 +240,63 @@
      ;; test .each
      (is (thrown? IllegalArgumentException
                   (-> stream
-                      (.each (fields "sentence1") (Split.) (fields "word")))))
+                      (.each (Fields. ["sentence1"]) (Split.) (Fields. ["word"])))))
      ;; test .groupBy
      (is (thrown? IllegalArgumentException
                   (-> stream
-                      (.each (fields "sentence") (Split.) (fields "word"))
-                      (.groupBy (fields "word1")))))
+                      (.each (Fields. ["sentence"]) (Split.) (Fields. ["word"]))
+                      (.groupBy (Fields. ["word1"])))))
      ;; test .aggregate
      (is (thrown? IllegalArgumentException
                   (-> stream
-                      (.each (fields "sentence") (Split.) (fields "word"))
-                      (.groupBy (fields "word"))
-                      (.aggregate (fields "word1") (Count.) (fields "count")))))
+                      (.each (Fields. ["sentence"]) (Split.) (Fields. ["word"]))
+                      (.groupBy (Fields. ["word"]))
+                      (.aggregate (Fields. ["word1"]) (Count.) (Fields. ["count"])))))
      ;; test .project
      (is (thrown? IllegalArgumentException
                   (-> stream
-                      (.project (fields "sentence1")))))
+                      (.project (Fields. ["sentence1"])))))
      ;; test .partitionBy
      (is (thrown? IllegalArgumentException
                   (-> stream
-                      (.partitionBy (fields "sentence1")))))
+                      (.partitionBy (Fields. ["sentence1"])))))
      ;; test .partitionAggregate
      (is (thrown? IllegalArgumentException
                   (-> stream
-                      (.each (fields "sentence") (Split.) (fields "word"))
-                      (.partitionAggregate (fields "word1") (Count.) (fields "count")))))
+                      (.each (Fields. ["sentence"]) (Split.) (Fields. ["word"]))
+                      (.partitionAggregate (Fields. ["word1"]) (Count.) (Fields. ["count"])))))
      ;; test .persistentAggregate
      (is (thrown? IllegalArgumentException
                   (-> stream
-                      (.each (fields "sentence") (Split.) (fields "word"))
-                      (.groupBy (fields "word"))
-                      (.persistentAggregate (StateSpec. (MemoryMapState$Factory.)) (fields "non-existent") (Count.) (fields "count")))))
+                      (.each (Fields. ["sentence"]) (Split.) (Fields. ["word"]))
+                      (.groupBy (Fields. ["word"]))
+                      (.persistentAggregate (StateSpec. (MemoryMapState$Factory.)) (Fields. ["non-existent"]) (Count.) (Fields. ["count"])))))
      ;; test .partitionPersist
      (is (thrown? IllegalArgumentException
                   (-> stream
-                      (.each (fields "sentence") (Split.) (fields "word"))
-                      (.groupBy (fields "word"))
+                      (.each (Fields. ["sentence"]) (Split.) (Fields. ["word"]))
+                      (.groupBy (Fields. ["word"]))
                       (.partitionPersist (StateSpec. (MemoryMapState$Factory.))
-                                         (fields "non-existent")
+                                         (Fields. ["non-existent"])
                                          (CombinerAggStateUpdater. (Count.))
-                                         (fields "count")))))
+                                         (Fields. ["count"])))))
      ;; test .stateQuery
-     (with-drpc [drpc]
+     (with-open [drpc (LocalDRPC.)]
        (is (thrown? IllegalArgumentException
                     (-> topo
                         (.newDRPCStream "words" drpc)
-                        (.each (fields "args") (Split.) (fields "word"))
-                        (.groupBy (fields "word"))
-                        (.stateQuery word-counts (fields "word1") (MapGet.) (fields "count"))))))
+                        (.each (Fields. ["args"]) (Split.) (Fields. ["word"]))
+                        (.groupBy (Fields. ["word"]))
+                        (.stateQuery word-counts (Fields. ["word1"]) (MapGet.) (Fields. ["count"]))))))
      )))
 
 
 (deftest test-set-component-resources
-  (t/with-local-cluster [cluster]
-    (with-drpc [drpc]
+  (with-open [cluster (LocalCluster. )]
+    (with-open [drpc (LocalDRPC.)]
       (letlocals
         (bind topo (TridentTopology.))
-        (bind feeder (feeder-spout ["sentence"]))
+        (bind feeder (FeederBatchSpout. ["sentence"]))
         (bind add-bang (proxy [BaseFunction] []
                          (execute [tuple collector]
                            (. collector emit (str (. tuple getString 0) "!")))))
@@ -304,18 +309,18 @@
               (parallelismHint 5)
               (setCPULoad 20)
               (setMemoryLoad 512 256)
-              (each (fields "sentence") (Split.) (fields "word"))
+              (each (Fields. ["sentence"]) (Split.) (Fields. ["word"]))
               (setCPULoad 10)
               (setMemoryLoad 512)
-              (each (fields "word") add-bang (fields "word!"))
+              (each (Fields. ["word"]) add-bang (Fields. ["word!"]))
               (parallelismHint 10)
               (setCPULoad 50)
               (setMemoryLoad 1024)
-              (groupBy (fields "word!"))
-              (persistentAggregate (memory-map-state) (Count.) (fields "count"))
+              (groupBy (Fields. ["word!"]))
+              (persistentAggregate (MemoryMapState$Factory.) (Count.) (Fields. ["count"]))
               (setCPULoad 100)
               (setMemoryLoad 2048)))
-        (with-topology [cluster topo storm-topo]
+        (with-open [storm-topo (.submitTopology cluster "testing" {} (.build topo))]
 
           (let [parse-fn (fn [[k v]]
                            [k (clojurify-structure (. (JSONParser.) parse (.. v get_common get_json_conf)))])
@@ -364,23 +369,4 @@
                          (get TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT))
                      100.0)))))))))
 
-;; (deftest test-split-merge
-;;   (t/with-local-cluster [cluster]
-;;     (with-drpc [drpc]
-;;       (letlocals
-;;         (bind topo (TridentTopology.))
-;;         (bind drpc-stream (-> topo (.newDRPCStream "splitter" drpc)))
-;;         (bind s1
-;;           (-> drpc-stream
-;;               (.each (fields "args") (Split.) (fields "word"))
-;;               (.project (fields "word"))))
-;;         (bind s2
-;;           (-> drpc-stream
-;;               (.each (fields "args") (StringLength.) (fields "len"))
-;;               (.project (fields "len"))))
-;;
-;;         (.merge topo [s1 s2])
-;;         (with-topology [cluster topo]
-;;           (is (t/ms= [[7] ["the"] ["man"]] (exec-drpc drpc "splitter" "the man")))
-;;           (is (t/ms= [[5] ["hello"]] (exec-drpc drpc "splitter" "hello")))
-;;           )))))
+

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/test/clj/org/apache/storm/clojure_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/clojure_test.clj b/storm-core/test/clj/org/apache/storm/clojure_test.clj
deleted file mode 100644
index 13fdeb7..0000000
--- a/storm-core/test/clj/org/apache/storm/clojure_test.clj
+++ /dev/null
@@ -1,159 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.clojure-test
-  (:use [clojure test])
-  (:import [org.apache.storm.testing TestWordSpout TestPlannerSpout]
-           [org.apache.storm.tuple Fields])
-  (:use [org.apache.storm testing config])
-  (:use [org.apache.storm.internal clojure])
-  (:use [org.apache.storm.daemon common])
-  (:require [org.apache.storm.internal [thrift :as thrift]])
-  (:import [org.apache.storm Thrift])
-  (:import [org.apache.storm.utils Utils]))
-
-(defbolt lalala-bolt1 ["word"] [[val :as tuple] collector]
-  (let [ret (str val "lalala")]
-    (emit-bolt! collector [ret] :anchor tuple)
-    (ack! collector tuple)
-    ))
-
-(defbolt lalala-bolt2 ["word"] {:prepare true}
-  [conf context collector]
-  (let [state (atom nil)]
-    (reset! state "lalala")
-    (bolt
-      (execute [tuple]
-        (let [ret (-> (.getValue tuple 0) (str @state))]
-                (emit-bolt! collector [ret] :anchor tuple)
-                (ack! collector tuple)
-                ))
-      )))
-      
-(defbolt lalala-bolt3 ["word"] {:prepare true :params [prefix]}
-  [conf context collector]
-  (let [state (atom nil)]
-    (bolt
-      (prepare [_ _ _]
-        (reset! state (str prefix "lalala")))
-      (execute [{val "word" :as tuple}]
-        (let [ret (-> (.getValue tuple 0) (str @state))]
-          (emit-bolt! collector [ret] :anchor tuple)
-          (ack! collector tuple)
-          )))
-    ))
-
-(deftest test-clojure-bolt
-  (with-simulated-time-local-cluster [cluster :supervisors 4]
-    (let [nimbus (:nimbus cluster)
-          topology (Thrift/buildTopology
-                      {"1" (Thrift/prepareSpoutDetails (TestWordSpout. false))}
-                      {"2" (Thrift/prepareBoltDetails
-                             {(Utils/getGlobalStreamId "1" nil)
-                              (Thrift/prepareShuffleGrouping)}
-                             lalala-bolt1)
-                       "3" (Thrift/prepareBoltDetails
-                             {(Utils/getGlobalStreamId "1" nil)
-                              (Thrift/prepareLocalOrShuffleGrouping)}
-                             lalala-bolt2)
-                       "4" (Thrift/prepareBoltDetails
-                             {(Utils/getGlobalStreamId "1" nil)
-                              (Thrift/prepareShuffleGrouping)}
-                             (lalala-bolt3 "_nathan_"))}
-                     )
-          results (complete-topology cluster
-                                     topology
-                                     :mock-sources {"1" [["david"]
-                                                       ["adam"]
-                                                       ]}
-                                     )]
-      (is (ms= [["davidlalala"] ["adamlalala"]] (read-tuples results "2")))
-      (is (ms= [["davidlalala"] ["adamlalala"]] (read-tuples results "3")))
-      (is (ms= [["david_nathan_lalala"] ["adam_nathan_lalala"]] (read-tuples results "4")))
-      )))
-
-(defbolt punctuator-bolt ["word" "period" "question" "exclamation"]
-  [tuple collector]
-  (if (= (:word tuple) "bar")
-    (do 
-      (emit-bolt! collector {:word "bar" :period "bar" :question "bar"
-                            "exclamation" "bar"})
-      (ack! collector tuple))
-    (let [ res (assoc tuple :period (str (:word tuple) "."))
-           res (assoc res :exclamation (str (:word tuple) "!"))
-           res (assoc res :question (str (:word tuple) "?")) ]
-      (emit-bolt! collector res)
-      (ack! collector tuple))))
-
-(deftest test-map-emit
-  (with-simulated-time-local-cluster [cluster :supervisors 4]
-    (let [topology (Thrift/buildTopology
-                      {"words" (Thrift/prepareSpoutDetails (TestWordSpout. false))}
-                      {"out" (Thrift/prepareBoltDetails
-                               {(Utils/getGlobalStreamId "words" nil)
-                                (Thrift/prepareShuffleGrouping)}
-                               punctuator-bolt)})
-          results (complete-topology cluster
-                                     topology
-                                     :mock-sources {"words" [["foo"] ["bar"]]}
-                                     )]
-      (is (ms= [["foo" "foo." "foo?" "foo!"]
-                ["bar" "bar" "bar" "bar"]] (read-tuples results "out"))))))
-
-(defbolt conf-query-bolt ["conf" "val"] {:prepare true :params [conf] :conf conf}
-  [conf context collector]
-  (bolt
-   (execute [tuple]
-            (let [name (.getValue tuple 0)
-                  val (if (= name "!MAX_MSG_TIMEOUT") (.maxTopologyMessageTimeout context) (get conf name))]
-              (emit-bolt! collector [name val] :anchor tuple)
-              (ack! collector tuple))
-            )))
-
-(deftest test-component-specific-config-clojure
-  (with-simulated-time-local-cluster [cluster]
-    (let [topology (Thrift/buildTopology
-                     {"1" (Thrift/prepareSpoutDetails
-                            (TestPlannerSpout. (Fields. ["conf"]))
-                            nil
-                            {TOPOLOGY-MESSAGE-TIMEOUT-SECS 40})}
-                     {"2" (Thrift/prepareBoltDetails
-                            {(Utils/getGlobalStreamId "1" nil)
-                             (Thrift/prepareShuffleGrouping)}
-                            (conf-query-bolt {"fake.config" 1
-                                              TOPOLOGY-MAX-TASK-PARALLELISM 2
-                                              TOPOLOGY-MAX-SPOUT-PENDING 10})
-                            nil
-                            {TOPOLOGY-MAX-SPOUT-PENDING 3})})
-          results (complete-topology cluster
-                                     topology
-                                     :topology-name "test123"
-                                     :storm-conf {TOPOLOGY-MAX-TASK-PARALLELISM 10
-                                                  TOPOLOGY-MESSAGE-TIMEOUT-SECS 30}
-                                     :mock-sources {"1" [["fake.config"]
-                                                         [TOPOLOGY-MAX-TASK-PARALLELISM]
-                                                         [TOPOLOGY-MAX-SPOUT-PENDING]
-                                                         ["!MAX_MSG_TIMEOUT"]
-                                                         [TOPOLOGY-NAME]
-                                                         ]})]
-      (is (= {"fake.config" 1
-              TOPOLOGY-MAX-TASK-PARALLELISM 2
-              TOPOLOGY-MAX-SPOUT-PENDING 3
-              "!MAX_MSG_TIMEOUT" 40
-              TOPOLOGY-NAME "test123"}
-             (->> (read-tuples results "2")
-                  (apply concat)
-                  (apply hash-map))
-             )))))