You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/12/03 15:50:07 UTC

[07/10] storm git commit: STORM-1281: LocalCluster, testing4j and testing.clj to java

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopologyNode.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopologyNode.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopologyNode.java
index 431b9d8..45014fc 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopologyNode.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopologyNode.java
@@ -19,6 +19,7 @@ package org.apache.storm.starter;
 
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.spout.ShellSpout;
 import org.apache.storm.task.ShellBolt;
@@ -110,12 +111,10 @@ public class WordCountTopologyNode {
     else {
       conf.setMaxTaskParallelism(3);
 
-      LocalCluster cluster = new LocalCluster();
-      cluster.submitTopology("word-count", conf, builder.createTopology());
-
-      Thread.sleep(10000);
-
-      cluster.shutdown();
+      try (LocalCluster cluster = new LocalCluster();
+           LocalTopology topo = cluster.submitTopology("word-count", conf, builder.createTopology());) {
+          Thread.sleep(10000);
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
index ba18f7c..70a23b8 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
@@ -20,6 +20,7 @@ package org.apache.storm.starter.trident;
 
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.hbase.trident.windowing.HBaseWindowsStoreFactory;
@@ -77,12 +78,10 @@ public class TridentHBaseWindowingStoreTopology {
         HBaseWindowsStoreFactory windowStoreFactory = new HBaseWindowsStoreFactory(new HashMap<String, Object>(), "window-state", "cf".getBytes("UTF-8"), "tuples".getBytes("UTF-8"));
 
         if (args.length == 0) {
-            LocalCluster cluster = new LocalCluster();
-            String topologyName = "wordCounterWithWindowing";
-            cluster.submitTopology(topologyName, conf, buildTopology(windowStoreFactory));
-            Utils.sleep(120 * 1000);
-            cluster.killTopology(topologyName);
-            cluster.shutdown();
+            try (LocalCluster cluster = new LocalCluster();
+                 LocalTopology topo = cluster.submitTopology("wordCounterWithWindowing", conf, buildTopology(windowStoreFactory));) {
+                Utils.sleep(120 * 1000);
+            }
             System.exit(0);
         } else {
             conf.setNumWorkers(3);

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
index c3661a2..5ddace8 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
@@ -19,6 +19,7 @@ package org.apache.storm.starter.trident;
 
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.LocalDRPC;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
@@ -108,14 +109,14 @@ public class TridentMapExample {
         Config conf = new Config();
         conf.setMaxSpoutPending(20);
         if (args.length == 0) {
-            LocalDRPC drpc = new LocalDRPC();
-            LocalCluster cluster = new LocalCluster();
-            cluster.submitTopology("wordCounter", conf, buildTopology(drpc));
-            for (int i = 0; i < 100; i++) {
-                System.out.println("DRPC RESULT: " + drpc.execute("words", "CAT THE DOG JUMPED"));
-                Thread.sleep(1000);
+            try (LocalDRPC drpc = new LocalDRPC();
+                 LocalCluster cluster = new LocalCluster();
+                 LocalTopology topo = cluster.submitTopology("wordCounter", conf, buildTopology(drpc));) {
+                for (int i = 0; i < 100; i++) {
+                    System.out.println("DRPC RESULT: " + drpc.execute("words", "CAT THE DOG JUMPED"));
+                    Thread.sleep(1000);
+                }
             }
-            cluster.shutdown();
         } else {
             conf.setNumWorkers(3);
             StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(null));

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java
index d985436..e5a775b 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java
@@ -19,6 +19,7 @@ package org.apache.storm.starter.trident;
 
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.starter.spout.RandomNumberGeneratorSpout;
@@ -113,10 +114,10 @@ public class TridentMinMaxOfDevicesTopology {
         Config conf = new Config();
         conf.setMaxSpoutPending(20);
         if (args.length == 0) {
-            LocalCluster cluster = new LocalCluster();
-            cluster.submitTopology("devices-topology", conf, topology);
-            Utils.sleep(60 * 1000);
-            cluster.shutdown();
+            try (LocalCluster cluster = new LocalCluster();
+                 LocalTopology topo = cluster.submitTopology("devices-topology", conf, topology);) {
+                Utils.sleep(60 * 1000);
+            }
             System.exit(0);
         } else {
             conf.setNumWorkers(3);

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java
index 192b198..947b64b 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java
@@ -19,6 +19,7 @@ package org.apache.storm.starter.trident;
 
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.trident.Stream;
@@ -94,10 +95,10 @@ public class TridentMinMaxOfVehiclesTopology {
         Config conf = new Config();
         conf.setMaxSpoutPending(20);
         if (args.length == 0) {
-            LocalCluster cluster = new LocalCluster();
-            cluster.submitTopology("vehicles-topology", conf, topology);
-            Utils.sleep(60 * 1000);
-            cluster.shutdown();
+            try (LocalCluster cluster = new LocalCluster();
+                 LocalTopology topo = cluster.submitTopology("vehicles-topology", conf, topology);) {
+                Utils.sleep(60 * 1000);
+            }
             System.exit(0);
         } else {
             conf.setNumWorkers(3);

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentReach.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentReach.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentReach.java
index 056b2b6..6533a4e 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentReach.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentReach.java
@@ -19,6 +19,7 @@ package org.apache.storm.starter.trident;
 
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.LocalDRPC;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.task.IMetricsContext;
@@ -136,21 +137,17 @@ public class TridentReach {
   }
 
   public static void main(String[] args) throws Exception {
-    LocalDRPC drpc = new LocalDRPC();
 
     Config conf = new Config();
-    LocalCluster cluster = new LocalCluster();
+    try (LocalDRPC drpc = new LocalDRPC();
+         LocalCluster cluster = new LocalCluster();
+         LocalTopology topo = cluster.submitTopology("reach", conf, buildTopology(drpc));) {
 
-    cluster.submitTopology("reach", conf, buildTopology(drpc));
+        Thread.sleep(2000);
 
-    Thread.sleep(2000);
-
-    System.out.println("REACH: " + drpc.execute("reach", "aaa"));
-    System.out.println("REACH: " + drpc.execute("reach", "foo.com/blog/1"));
-    System.out.println("REACH: " + drpc.execute("reach", "engineering.twitter.com/blog/5"));
-
-
-    cluster.shutdown();
-    drpc.shutdown();
+        System.out.println("REACH: " + drpc.execute("reach", "aaa"));
+        System.out.println("REACH: " + drpc.execute("reach", "foo.com/blog/1"));
+        System.out.println("REACH: " + drpc.execute("reach", "engineering.twitter.com/blog/5"));
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
index 5aec01d..c43b4b0 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
@@ -20,6 +20,7 @@ package org.apache.storm.starter.trident;
 
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.topology.base.BaseWindowedBolt;
@@ -83,10 +84,10 @@ public class TridentWindowingInmemoryStoreTopology {
             );
 
             for (WindowConfig windowConfig : list) {
-                LocalCluster cluster = new LocalCluster();
-                cluster.submitTopology("wordCounter", conf, buildTopology(mapState, windowConfig));
-                Utils.sleep(60 * 1000);
-                cluster.shutdown();
+                try (LocalCluster cluster = new LocalCluster();
+                     LocalTopology topo = cluster.submitTopology("wordCounter", conf, buildTopology(mapState, windowConfig));) {
+                    Utils.sleep(60 * 1000);
+                }
             }
             System.exit(0);
         } else {
@@ -94,5 +95,4 @@ public class TridentWindowingInmemoryStoreTopology {
             StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(mapState, SlidingCountWindow.of(1000, 100)));
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java
index ac46474..553c26f 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java
@@ -19,6 +19,7 @@ package org.apache.storm.starter.trident;
 
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.LocalDRPC;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
@@ -70,12 +71,13 @@ public class TridentWordCount {
     Config conf = new Config();
     conf.setMaxSpoutPending(20);
     if (args.length == 0) {
-      LocalDRPC drpc = new LocalDRPC();
-      LocalCluster cluster = new LocalCluster();
-      cluster.submitTopology("wordCounter", conf, buildTopology(drpc));
-      for (int i = 0; i < 100; i++) {
-        System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped"));
-        Thread.sleep(1000);
+      try (LocalDRPC drpc = new LocalDRPC();
+           LocalCluster cluster = new LocalCluster();
+           LocalTopology topo = cluster.submitTopology("wordCounter", conf, buildTopology(drpc));) {
+        for (int i = 0; i < 100; i++) {
+          System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped"));
+          Thread.sleep(1000);
+        }
       }
     }
     else {

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/examples/storm-starter/src/jvm/org/apache/storm/starter/util/StormRunner.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/util/StormRunner.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/util/StormRunner.java
index d7f2bf4..06ec4d7 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/util/StormRunner.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/util/StormRunner.java
@@ -19,6 +19,7 @@ package org.apache.storm.starter.util;
 
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.AlreadyAliveException;
 import org.apache.storm.generated.AuthorizationException;
@@ -34,11 +35,12 @@ public final class StormRunner {
 
   public static void runTopologyLocally(StormTopology topology, String topologyName, Config conf, int runtimeInSeconds)
       throws InterruptedException {
-    LocalCluster cluster = new LocalCluster();
-    cluster.submitTopology(topologyName, conf, topology);
-    Thread.sleep((long) runtimeInSeconds * MILLIS_IN_SEC);
-    cluster.killTopology(topologyName);
-    cluster.shutdown();
+    try (LocalCluster cluster = new LocalCluster();
+         LocalTopology topo = cluster.submitTopology(topologyName, conf, topology);) {
+      Thread.sleep((long) runtimeInSeconds * MILLIS_IN_SEC);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
   }
 
   public static void runTopologyRemotely(StormTopology topology, String topologyName, Config conf)

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java b/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java
index ba513dd..d073350 100644
--- a/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java
@@ -19,6 +19,7 @@ package storm.starter;
 
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.starter.spout.RandomIntegerSpout;
@@ -134,12 +135,10 @@ public class StatefulTopology {
             conf.setNumWorkers(1);
             StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
         } else {
-            LocalCluster cluster = new LocalCluster();
-            StormTopology topology = builder.createTopology();
-            cluster.submitTopology("test", conf, topology);
-            Utils.sleep(40000);
-            cluster.killTopology("test");
-            cluster.shutdown();
+            try (LocalCluster cluster = new LocalCluster();
+                 LocalTopology topology = cluster.submitTopology("test", conf, builder.createTopology());) {
+                Utils.sleep(40000);
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/examples/storm-starter/src/jvm/storm/starter/StatefulWindowingTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/StatefulWindowingTopology.java b/examples/storm-starter/src/jvm/storm/starter/StatefulWindowingTopology.java
index e18ef36..e43759c 100644
--- a/examples/storm-starter/src/jvm/storm/starter/StatefulWindowingTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/StatefulWindowingTopology.java
@@ -19,6 +19,7 @@ package storm.starter;
 
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.starter.bolt.PrinterBolt;
@@ -101,12 +102,10 @@ public class StatefulWindowingTopology {
             conf.setNumWorkers(1);
             StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
         } else {
-            LocalCluster cluster = new LocalCluster();
-            StormTopology topology = builder.createTopology();
-            cluster.submitTopology("test", conf, topology);
-            Utils.sleep(40000);
-            cluster.killTopology("test");
-            cluster.shutdown();
+            try (LocalCluster cluster = new LocalCluster();
+                 LocalTopology topo = cluster.submitTopology("test", conf, builder.createTopology());) {
+                Utils.sleep(40000);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/external/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java
index 5848d2f..982765f 100644
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java
@@ -19,6 +19,7 @@ package org.apache.storm.flux;
 
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.generated.SubmitOptions;
@@ -204,11 +205,11 @@ public class Flux {
                 } else {
                     cluster = new LocalCluster();
                 }
-                cluster.submitTopology(topologyName, conf, topology);
-
-                Utils.sleep(sleepTime);
-                cluster.killTopology(topologyName);
-                cluster.shutdown();
+                try (LocalTopology topo = cluster.submitTopology(topologyName, conf, topology)) {
+                    Utils.sleep(sleepTime);
+                } finally {
+                    cluster.shutdown();
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
index c1ceeba..5755de8 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
@@ -29,9 +29,8 @@ import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
 import org.apache.calcite.rel.type.RelDataTypeSystem;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.storm.Config;
-import org.apache.storm.ILocalCluster;
 import org.apache.storm.LocalCluster;
-import org.apache.storm.generated.StormTopology;
+import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.sql.TestUtils;
 import org.apache.storm.sql.compiler.TestCompilerUtils;
 import org.apache.storm.sql.runtime.ISqlTridentDataSource;
@@ -40,8 +39,10 @@ import org.apache.storm.trident.TridentTopology;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
 import org.apache.storm.utils.Utils;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.time.ZoneOffset;
@@ -57,6 +58,20 @@ public class TestPlanCompiler {
   private final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(
           RelDataTypeSystem.DEFAULT);
   private final DataContext dataContext = new StormDataContext();
+  private static LocalCluster cluster;
+
+  @BeforeClass
+  public static void staticSetup() throws Exception {
+    cluster = new LocalCluster();
+  }
+
+  @AfterClass
+  public static void staticCleanup() {
+    if (cluster!= null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
 
   @Before
   public void setUp() {
@@ -347,11 +362,8 @@ public class TestPlanCompiler {
     final Config conf = new Config();
     conf.setMaxSpoutPending(20);
 
-    ILocalCluster cluster = new LocalCluster();
-    StormTopology stormTopo = topo.build();
-    try {
-      Utils.setClassLoaderForJavaDeSerialize(proc.getClass().getClassLoader());
-      cluster.submitTopology("storm-sql", conf, stormTopo);
+    Utils.setClassLoaderForJavaDeSerialize(proc.getClass().getClassLoader());
+    try (LocalTopology stormTopo = cluster.submitTopology("storm-sql", conf, topo.build())) {
       waitForCompletion(1000 * 1000, new Callable<Boolean>() {
         @Override
         public Boolean call() throws Exception {
@@ -359,8 +371,10 @@ public class TestPlanCompiler {
         }
       });
     } finally {
+      while(cluster.getClusterInfo().get_topologies_size() > 0) {
+        Thread.sleep(10);
+      }
       Utils.resetClassLoaderForJavaDeSerialize();
-      cluster.shutdown();
     }
   }
 
@@ -375,4 +389,4 @@ public class TestPlanCompiler {
     Assert.assertTrue("Two lists are not same (even ignoring order)!\n"+ "Expected: " + expected + "\n" + "Actual: " + actual,
             CollectionUtils.isEqualCollection(expected, actual));
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTopology.java b/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTopology.java
index 88b2bf1..298353c 100644
--- a/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTopology.java
+++ b/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTopology.java
@@ -20,6 +20,7 @@ package org.apache.storm.druid;
 
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.druid.bolt.DruidBeamBolt;
 import org.apache.storm.druid.bolt.DruidBeamFactory;
@@ -69,12 +70,10 @@ public class SampleDruidBoltTopology {
         } else {
             conf.setMaxTaskParallelism(3);
 
-            LocalCluster cluster = new LocalCluster();
-            cluster.submitTopology("druid-test", conf, topologyBuilder.createTopology());
-
-            Thread.sleep(30000);
-
-            cluster.shutdown();
+            try (LocalCluster cluster = new LocalCluster();
+                 LocalTopology topo = cluster.submitTopology("druid-test", conf, topologyBuilder.createTopology());) {
+                Thread.sleep(30000);
+            }
             System.exit(0);
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTridentTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTridentTopology.java b/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTridentTopology.java
index 0e20ecd..3de18c1 100644
--- a/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTridentTopology.java
+++ b/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTridentTopology.java
@@ -20,6 +20,7 @@ package org.apache.storm.druid;
 
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.druid.bolt.DruidBeamFactory;
 import org.apache.storm.druid.bolt.ITupleDruidEventMapper;
@@ -78,12 +79,10 @@ public class SampleDruidBoltTridentTopology {
         } else {
             conf.setMaxTaskParallelism(3);
 
-            LocalCluster cluster = new LocalCluster();
-            cluster.submitTopology("druid-test", conf, tridentTopology.build());
-
-            Thread.sleep(30000);
-
-            cluster.shutdown();
+            try (LocalCluster cluster = new LocalCluster();
+                 LocalTopology topo = cluster.submitTopology("druid-test", conf, tridentTopology.build());) {
+                Thread.sleep(30000);
+            }
             System.exit(0);
         }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventCount.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventCount.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventCount.java
index 2c2261c..ae15634 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventCount.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventCount.java
@@ -20,6 +20,7 @@ package org.apache.storm.eventhubs.samples;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.topology.TopologyBuilder;
 
@@ -135,14 +136,12 @@ public class EventCount {
     } else {
       config.setMaxTaskParallelism(2);
 
-      LocalCluster localCluster = new LocalCluster();
-      localCluster.submitTopology("test", config, topology);
-
-      Thread.sleep(5000000);
-
-      localCluster.shutdown();
+      try (LocalCluster localCluster = new LocalCluster();
+           LocalTopology topo = localCluster.submitTopology("test", config, topology);) {
+        Thread.sleep(5000000);
+      }
     }
-	}
+  }
   
   protected void runScenario(String[] args) throws Exception{
     readEHConfig(args);

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java b/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java
index 3324aac..82dbd5b 100644
--- a/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java
+++ b/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java
@@ -24,6 +24,7 @@ import javax.jms.TextMessage;
 
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.jms.JmsMessageProducer;
 import org.apache.storm.jms.JmsProvider;
@@ -120,11 +121,10 @@ public class ExampleJmsTopology {
 
             conf.setDebug(true);
 
-            LocalCluster cluster = new LocalCluster();
-            cluster.submitTopology("storm-jms-example", conf, builder.createTopology());
-            Utils.sleep(60000);
-            cluster.killTopology("storm-jms-example");
-            cluster.shutdown();
+            try (LocalCluster cluster = new LocalCluster();
+                 LocalTopology topo = cluster.submitTopology("storm-jms-example", conf, builder.createTopology());) {
+                Utils.sleep(60000);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
index 5a78137..10ad2c7 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
@@ -58,10 +58,9 @@ public class KafkaSpoutTopologyMainNamedTopics {
         } else {
             submitTopologyRemoteCluster(args[0], getTopolgyKafkaSpout(), getConfig());
         }
-
     }
 
-    protected void submitTopologyLocalCluster(StormTopology topology, Config config) throws InterruptedException {
+    protected void submitTopologyLocalCluster(StormTopology topology, Config config) throws Exception {
         LocalCluster cluster = new LocalCluster();
         cluster.submitTopology("test", config, topology);
         stopWaitingForInput();

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
index 0873601..049fce7 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.storm.kafka;
 import org.junit.Test;
 
 import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
 import org.junit.After;
 import org.junit.Before;
 import static org.junit.Assert.assertEquals;
@@ -34,15 +35,19 @@ public class ExponentialBackoffMsgRetryManagerTest {
     private static final Long TEST_OFFSET2 = 102L;
     private static final Long TEST_OFFSET3 = 105L;
     private static final Long TEST_NEW_OFFSET = 103L;
+    private SimulatedTime st;
 
     @Before
     public void setup() throws Exception {
-        Time.startSimulating();
+        st = new SimulatedTime();
     }
 
     @After
     public void cleanup() throws Exception {
-        Time.stopSimulating();
+        if (st != null) {
+            st.close();
+            st = null;
+        }
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/external/storm-mqtt/core/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/core/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java b/external/storm-mqtt/core/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java
index 57725b4..0dd4d73 100644
--- a/external/storm-mqtt/core/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java
+++ b/external/storm-mqtt/core/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java
@@ -19,6 +19,7 @@ package org.apache.storm.mqtt;
 
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.testing.IntegrationTest;
 import org.apache.storm.topology.TopologyBuilder;
@@ -100,31 +101,31 @@ public class StormMqttIntegrationTest implements Serializable{
         Topic[] topics = {new Topic("/integration-result", QoS.AT_LEAST_ONCE)};
         byte[] qoses = connection.subscribe(topics);
 
-        LocalCluster cluster = new LocalCluster();
-        cluster.submitTopology("test", new Config(), buildMqttTopology());
+        try (LocalCluster cluster = new LocalCluster();
+             LocalTopology topo = cluster.submitTopology("test", new Config(), buildMqttTopology());) {
 
-        LOG.info("topology started");
-        while(!spoutActivated) {
-            Thread.sleep(500);
-        }
+            LOG.info("topology started");
+            while(!spoutActivated) {
+                Thread.sleep(500);
+            }
 
-        // publish a retained message to the broker
-        MqttOptions options = new MqttOptions();
-        options.setCleanConnection(false);
-        MqttPublisher publisher = new MqttPublisher(options, true);
-        publisher.connectMqtt("MqttPublisher");
-        publisher.publish(new MqttMessage(TEST_TOPIC, "test".getBytes()));
+            // publish a retained message to the broker
+            MqttOptions options = new MqttOptions();
+            options.setCleanConnection(false);
+            MqttPublisher publisher = new MqttPublisher(options, true);
+            publisher.connectMqtt("MqttPublisher");
+            publisher.publish(new MqttMessage(TEST_TOPIC, "test".getBytes()));
 
-        LOG.info("published message");
+            LOG.info("published message");
 
-        Message message = connection.receive();
-        LOG.info("Message recieved on topic: {}", message.getTopic());
-        LOG.info("Payload: {}", new String(message.getPayload()));
-        message.ack();
+            Message message = connection.receive();
+            LOG.info("Message recieved on topic: {}", message.getTopic());
+            LOG.info("Payload: {}", new String(message.getPayload()));
+            message.ack();
 
-        Assert.assertArrayEquals(message.getPayload(), RESULT_PAYLOAD.getBytes());
-        Assert.assertEquals(message.getTopic(), RESULT_TOPIC);
-        cluster.shutdown();
+            Assert.assertArrayEquals(message.getPayload(), RESULT_PAYLOAD.getBytes());
+            Assert.assertEquals(message.getTopic(), RESULT_TOPIC);
+        }
     }
 
     public StormTopology buildMqttTopology(){

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/integration-test/src/main/java/org/apache/storm/ExclamationTopology.java
----------------------------------------------------------------------
diff --git a/integration-test/src/main/java/org/apache/storm/ExclamationTopology.java b/integration-test/src/main/java/org/apache/storm/ExclamationTopology.java
index d464608..992bb5d 100644
--- a/integration-test/src/main/java/org/apache/storm/ExclamationTopology.java
+++ b/integration-test/src/main/java/org/apache/storm/ExclamationTopology.java
@@ -18,6 +18,7 @@
 package org.apache.storm;
 
 import org.apache.storm.generated.StormTopology;
+import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.testing.TestWordSpout;
@@ -75,11 +76,10 @@ public class ExclamationTopology {
     }
     else {
 
-      LocalCluster cluster = new LocalCluster();
-      cluster.submitTopology("test", conf, topology);
-      Utils.sleep(10000);
-      cluster.killTopology("test");
-      cluster.shutdown();
+      try (LocalCluster cluster = new LocalCluster();
+           LocalTopology topo = cluster.submitTopology("test", conf, topology);) {
+        Utils.sleep(10000);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-clojure/src/clj/org/apache/storm/clojure.clj
----------------------------------------------------------------------
diff --git a/storm-clojure/src/clj/org/apache/storm/clojure.clj b/storm-clojure/src/clj/org/apache/storm/clojure.clj
index 607fc24..1fe0c26 100644
--- a/storm-clojure/src/clj/org/apache/storm/clojure.clj
+++ b/storm-clojure/src/clj/org/apache/storm/clojure.clj
@@ -195,6 +195,24 @@
   (let [values (tuple-values values collector stream)]
     (.emitDirect ^SpoutOutputCollector (:output-collector collector) task stream values id)))
 
+(defmacro defalias
+  "Defines an alias for a var: a new var with the same root binding (if
+  any) and similar metadata. The metadata of the alias is its initial
+  metadata (as provided by def) merged into the metadata of the original."
+  ([name orig]
+   `(do
+      (alter-meta!
+        (if (.hasRoot (var ~orig))
+          (def ~name (.getRawRoot (var ~orig)))
+          (def ~name))
+        ;; When copying metadata, disregard {:macro false}.
+        ;; Workaround for http://www.assembla.com/spaces/clojure/tickets/273
+        #(conj (dissoc % :macro)
+               (apply dissoc (meta (var ~orig)) (remove #{:macro} (keys %)))))
+      (var ~name)))
+  ([name orig doc]
+   (list `defalias (with-meta name (assoc (meta name) :doc doc)) orig)))
+
 (defalias topology thrift/mk-topology)
 (defalias bolt-spec thrift/mk-bolt-spec)
 (defalias spout-spec thrift/mk-spout-spec)

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-clojure/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-clojure/src/clj/org/apache/storm/testing.clj b/storm-clojure/src/clj/org/apache/storm/testing.clj
new file mode 100644
index 0000000..e080dd2
--- /dev/null
+++ b/storm-clojure/src/clj/org/apache/storm/testing.clj
@@ -0,0 +1,270 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns org.apache.storm.testing
+  (:import [org.apache.storm LocalCluster$Builder])
+  (:import [java.util.function UnaryOperator])
+  (:import [org.apache.storm.utils Time Time$SimulatedTime Utils RegisteredGlobalState])
+  (:import [org.apache.storm.testing InProcessZookeeper MkTupleParam TestJob MkClusterParam 
+            TrackedTopology CompleteTopologyParam MockedSources])
+  (:import [org.apache.storm Thrift Testing Testing$Condition])
+  (:import [org.apache.storm.testing MockLeaderElector])
+  (:import [org.json.simple JSONValue])
+  (:use [org.apache.storm util config log])
+  (:use [org.apache.storm thrift]))
+
+(defnk add-supervisor
+  [cluster-map :ports 2 :conf {} :id nil]
+  (let [local-cluster (:local-cluster cluster-map)]
+    (.addSupervisor local-cluster ports conf id)))
+
+(defnk mock-leader-elector [:is-leader true :leader-name "test-host" :leader-port 9999]
+  (MockLeaderElector. is-leader leader-name leader-port))
+
+(defn local-cluster-state [local-cluster]
+    {:nimbus (.getNimbus local-cluster)
+     :daemon-conf (.getDaemonConf local-cluster)
+     :storm-cluster-state (.getClusterState local-cluster)
+     :local-cluster local-cluster})
+
+(defnk mk-mocked-nimbus 
+  [:daemon-conf {} :inimbus nil :blob-store nil :cluster-state nil 
+   :leader-elector nil :group-mapper nil :nimbus-daemon false :nimbus-wrapper nil]
+  (let [builder (doto (LocalCluster$Builder.)
+                  (.withDaemonConf daemon-conf)
+                  (.withINimbus inimbus)
+                  (.withBlobStore blob-store)
+                  (.withClusterState cluster-state)
+                  (.withLeaderElector leader-elector)
+                  (.withGroupMapper group-mapper)
+                  (.withNimbusDaemon nimbus-daemon)
+                  (.withNimbusWrapper (when nimbus-wrapper (reify UnaryOperator (apply [this nimbus] (nimbus-wrapper nimbus))))))
+        local-cluster (.build builder)]
+    (local-cluster-state local-cluster)))
+
+(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :inimbus nil :group-mapper nil :supervisor-slot-port-min 1024 :nimbus-daemon false]
+  (let [builder (doto (LocalCluster$Builder.)
+                  (.withSupervisors supervisors)
+                  (.withPortsPerSupervisor ports-per-supervisor)
+                  (.withDaemonConf daemon-conf)
+                  (.withINimbus inimbus)
+                  (.withGroupMapper group-mapper)
+                  (.withSupervisorSlotPortMin supervisor-slot-port-min)
+                  (.withNimbusDaemon nimbus-daemon))
+        local-cluster (.build builder)]
+    (local-cluster-state local-cluster)))
+
+(defn get-supervisor [cluster-map supervisor-id]
+ (let [local-cluster (:local-cluster cluster-map)]
+  (.getSupervisor local-cluster supervisor-id))) 
+
+(defn kill-supervisor [cluster-map supervisor-id]
+ (let [local-cluster (:local-cluster cluster-map)]
+  (.killSupervisor local-cluster supervisor-id))) 
+
+(defn kill-local-storm-cluster [cluster-map]
+ (let [local-cluster (:local-cluster cluster-map)]
+  (.close local-cluster))) 
+
+(defmacro while-timeout [timeout-ms condition & body]
+  `(Testing/whileTimeout ~timeout-ms
+     (reify Testing$Condition (exec [this] ~condition))
+     (fn [] ~@body)))
+
+(defn wait-for-condition
+  ([apredicate]
+    (wait-for-condition Testing/TEST_TIMEOUT_MS apredicate))
+  ([timeout-ms apredicate]
+    (while-timeout timeout-ms (not (apredicate))
+      (Time/sleep 100))))
+
+(defn wait-until-cluster-waiting
+  "Wait until the cluster is idle. Should be used with time simulation."
+  ([cluster-map]
+    (let [local-cluster (:local-cluster cluster-map)]
+      (.waitForIdle local-cluster)))
+  ([cluster-map timeout-ms]
+    (let [local-cluster (:local-cluster cluster-map)]
+      (.waitForIdle local-cluster timeout-ms))))
+
+(defn advance-cluster-time
+  ([cluster-map secs increment-secs]
+    (let [local-cluster (:local-cluster cluster-map)]
+      (.advanceClusterTime local-cluster secs increment-secs))) 
+  ([cluster-map secs]
+    (let [local-cluster (:local-cluster cluster-map)]
+      (.advanceClusterTime local-cluster secs))))
+
+(defmacro with-mocked-nimbus
+  [[nimbus-sym & args] & body]
+  `(let [~nimbus-sym (mk-mocked-nimbus ~@args)]
+     (try
+       ~@body
+       (catch Throwable t#
+         (log-error t# "Error in cluster")
+         (throw t#))
+       (finally
+         (let [keep-waiting?# (atom true)
+               f# (future (while @keep-waiting?# (simulate-wait ~nimbus-sym)))]
+           (kill-local-storm-cluster ~nimbus-sym)
+           (reset! keep-waiting?# false)
+            @f#)))))
+
+(defmacro with-local-cluster
+  [[cluster-sym & args] & body]
+  `(let [~cluster-sym (mk-local-storm-cluster ~@args)]
+     (try
+       ~@body
+       (catch Throwable t#
+         (log-error t# "Error in cluster")
+         (throw t#))
+       (finally
+         (let [keep-waiting?# (atom true)
+               f# (future (while @keep-waiting?# (simulate-wait ~cluster-sym)))]
+           (kill-local-storm-cluster ~cluster-sym)
+           (reset! keep-waiting?# false)
+            @f#)))))
+
+(defmacro with-simulated-time-local-cluster
+  [& args]
+  `(with-open [_# (Time$SimulatedTime.)]
+     (with-local-cluster ~@args)))
+
+(defmacro with-inprocess-zookeeper
+  [port-sym & body]
+  `(with-open [zks# (InProcessZookeeper. )]
+     (let [~port-sym (.getPort zks#)]
+       ~@body)))
+
+(defn submit-local-topology
+  [nimbus storm-name conf topology]
+  (when-not (Utils/isValidConf conf)
+    (throw (IllegalArgumentException. "Topology conf is not json-serializable")))
+  (.submitTopology nimbus storm-name nil (JSONValue/toJSONString conf) topology))
+
+(defn submit-local-topology-with-opts
+  [nimbus storm-name conf topology submit-opts]
+  (when-not (Utils/isValidConf conf)
+    (throw (IllegalArgumentException. "Topology conf is not json-serializable")))
+  (.submitTopologyWithOpts nimbus storm-name nil (JSONValue/toJSONString conf) topology submit-opts))
+
+(defn simulate-wait
+  [cluster-map]
+  (Testing/simulateWait (:local-cluster cluster-map)))
+
+(defn spout-objects [spec-map]
+  (for [[_ spout-spec] spec-map]
+    (-> spout-spec
+        .get_spout_object
+        (Thrift/deserializeComponentObject))))
+
+(defn capture-topology
+  [topology]
+  (let [cap-topo (Testing/captureTopology topology)]
+    {:topology (.topology cap-topo)
+     :capturer (.capturer cap-topo)}))
+
+(defnk complete-topology
+  [cluster-map topology
+   :mock-sources {}
+   :storm-conf {}
+   :cleanup-state true
+   :topology-name nil
+   :timeout-ms Testing/TEST_TIMEOUT_MS]
+  (Testing/completeTopology (:local-cluster cluster-map) topology, 
+      (doto (CompleteTopologyParam.)
+            (.setStormConf storm-conf)
+            (.setTopologyName topology-name)
+            (.setTimeoutMs timeout-ms)
+            (.setMockedSources (MockedSources. mock-sources))
+            (.setCleanupState cleanup-state))))
+
+(defn read-tuples
+  ([results component-id stream-id]
+   (Testing/readTuples results component-id stream-id))
+  ([results component-id]
+   (Testing/readTuples results component-id )))
+
+(defn ms=
+  [a b]
+  (Testing/multiseteq a b))
+
+(def TRACKER-BOLT-ID "+++tracker-bolt")
+
+;; TODO: should override system-topology! and wrap everything there
+(defn mk-tracked-topology
+  ([tracked-cluster topology]
+   (let [tt (TrackedTopology. topology (:local-cluster tracked-cluster))]
+     {:topology (.getTopology tt)
+      :tracked-topo tt})))
+
+(defn increment-global!
+  [id key amt]
+  (-> (RegisteredGlobalState/getState id)
+      (get key)
+      (.addAndGet amt)))
+
+(defn global-amt
+  [id key]
+  (-> (RegisteredGlobalState/getState id)
+      (get key)
+      .get))
+
+(defnk mkClusterParam 
+  [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :nimbus-daemon false]
+  ;;TODO do we need to support inimbus?, group-mapper, or supervisor-slot-port-min
+  (doto (MkClusterParam. )
+    (.setDaemonConf daemon-conf)
+    (.setNimbusDaemon nimbus-daemon)
+    (.setPortsPerSupervisor (int ports-per-supervisor))
+    (.setSupervisors (int supervisors))))
+
+(defmacro with-tracked-cluster
+  [[cluster-sym & cluster-args] & body]
+  `(Testing/withTrackedCluster
+       (mkClusterParam ~@cluster-args)
+       (reify TestJob
+         (run [this# lc#]
+           (let [~cluster-sym (local-cluster-state lc#)]
+             ~@body)))))
+
+(defn tracked-wait
+  "Waits until topology is idle and 'amt' more tuples have been emitted by spouts."
+  ([tracked-map]
+     (Testing/trackedWait (:tracked-topo tracked-map)))
+  ([tracked-map amt]
+     (Testing/trackedWait (:tracked-topo tracked-map) (int amt)))
+  ([tracked-map amt timeout-ms]
+     (Testing/trackedWait (:tracked-topo tracked-map) (int amt) (int timeout-ms))))
+
+(defnk test-tuple
+  [values
+   :stream Utils/DEFAULT_STREAM_ID
+   :component "component"
+   :fields nil]
+  (Testing/testTuple
+    values
+    (doto (MkTupleParam. )
+      (.setStream stream)
+      (.setComponent component)
+      (.setFieldsList fields))))
+
+(defmacro with-timeout
+  [millis unit & body]
+  `(let [f# (future ~@body)]
+     (try
+       (.get f# ~millis ~unit)
+       (finally (future-cancel f#)))))

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/clj/org/apache/storm/LocalCluster.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/LocalCluster.clj b/storm-core/src/clj/org/apache/storm/LocalCluster.clj
deleted file mode 100644
index bce2a2e..0000000
--- a/storm-core/src/clj/org/apache/storm/LocalCluster.clj
+++ /dev/null
@@ -1,106 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns org.apache.storm.LocalCluster
-  (:use [org.apache.storm testing config])
-  (:import [org.apache.storm.utils Utils])
-  (:import [java.util Map])
-  (:gen-class
-    :init init
-    :implements [org.apache.storm.ILocalCluster]
-    :constructors {[] []
-                   [java.util.Map] []
-                   [String Long] []}
-    :state state))
-
-(defn -init
-  ([]
-   (let [ret (mk-local-storm-cluster
-               :daemon-conf
-               {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true})]
-     [[] ret]))
-  ([^String zk-host ^Long zk-port]
-   (let [ret (mk-local-storm-cluster :daemon-conf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true
-                                                     STORM-ZOOKEEPER-SERVERS (list zk-host)
-                                                     STORM-ZOOKEEPER-PORT zk-port})]
-     [[] ret]))
-  ([^Map stateMap]
-     [[] stateMap]))
-
-(defn submit-hook [hook name conf topology]
-  (let [topologyInfo (Utils/getTopologyInfo name nil conf)]
-    (.notify hook topologyInfo conf topology)))
-
-(defn -submitTopology
-  [this name conf topology]
-  (submit-local-topology
-    (:nimbus (. this state)) name conf topology)
-  (let [hook (Utils/getConfiguredClass conf STORM-TOPOLOGY-SUBMISSION-NOTIFIER-PLUGIN)]
-    (when hook (submit-hook hook name conf topology))))
-
-
-(defn -submitTopologyWithOpts
-  [this name conf topology submit-opts]
-  (submit-local-topology-with-opts
-    (:nimbus (. this state)) name conf topology submit-opts))
-
-(defn -uploadNewCredentials
-  [this name creds]
-  (.uploadNewCredentials (:nimbus (. this state)) name creds))
-
-(defn -shutdown
-  [this]
-  (kill-local-storm-cluster (. this state)))
-
-(defn -killTopology
-  [this name]
-  (.killTopology (:nimbus (. this state)) name))
-
-(defn -getTopologyConf
-  [this id]
-  (.getTopologyConf (:nimbus (. this state)) id))
-
-(defn -getTopology
-  [this id]
-  (.getTopology (:nimbus (. this state)) id))
-
-(defn -getClusterInfo
-  [this]
-  (.getClusterInfo (:nimbus (. this state))))
-
-(defn -getTopologyInfo
-  [this id]
-  (.getTopologyInfo (:nimbus (. this state)) id))
-
-(defn -killTopologyWithOpts
-  [this name opts]
-  (.killTopologyWithOpts (:nimbus (. this state)) name opts))
-
-(defn -activate
-  [this name]
-  (.activate (:nimbus (. this state)) name))
-
-(defn -deactivate
-  [this name]
-  (.deactivate (:nimbus (. this state)) name))
-
-(defn -rebalance
-  [this name opts]
-  (.rebalance (:nimbus (. this state)) name opts))
-
-(defn -getState
-  [this]
-  (.state this))

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/clj/org/apache/storm/config.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/config.clj b/storm-core/src/clj/org/apache/storm/config.clj
index d0c4d87..bfe47ed 100644
--- a/storm-core/src/clj/org/apache/storm/config.clj
+++ b/storm-core/src/clj/org/apache/storm/config.clj
@@ -15,14 +15,7 @@
 ;; limitations under the License.
 
 (ns org.apache.storm.config
-  (:import [java.io FileReader File IOException]
-           [org.apache.storm.generated StormTopology])
-  (:import [org.apache.storm Config])
-  (:import [org.apache.storm.utils Utils LocalState ConfigUtils MutableInt])
-  (:import [org.apache.storm.validation ConfigValidation])
-  (:import [org.apache.commons.io FileUtils])
-  (:require [clojure [string :as str]])
-  (:use [org.apache.storm log util]))
+  (:import [org.apache.storm Config]))
 
 (defn- clojure-config-name [name]
   (.replace (.toUpperCase name) "_" "-"))
@@ -33,12 +26,3 @@
         new-name (clojure-config-name name)]
     (eval
       `(def ~(symbol new-name) (. Config ~(symbol name))))))
-
-(def ALL-CONFIGS
-  (dofor [f (seq (.getFields Config))]
-         (.get f nil)))
-
-;; TODO this function and its callings will be replace when nimbus and supervisor move to Java
-(defn cluster-mode
-  [conf & args]
-  (keyword (conf STORM-CLUSTER-MODE)))

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/clj/org/apache/storm/converter.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/converter.clj b/storm-core/src/clj/org/apache/storm/converter.clj
deleted file mode 100644
index 6f041bb..0000000
--- a/storm-core/src/clj/org/apache/storm/converter.clj
+++ /dev/null
@@ -1,240 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.converter
-  (:import [org.apache.storm.generated SupervisorInfo NodeInfo Assignment WorkerResources
-            StormBase TopologyStatus ClusterWorkerHeartbeat ExecutorInfo ErrorInfo Credentials RebalanceOptions KillOptions
-            TopologyActionOptions DebugOptions ProfileRequest]
-           [org.apache.storm.daemon.nimbus TopologyActions]
-           [org.apache.storm.utils Utils]
-           [org.apache.storm.stats StatsUtil])
-  (:import [org.apache.storm.cluster ExecutorBeat])
-  (:use [org.apache.storm util log])
-  (:require [org.apache.storm.daemon [common :as common]]))
-
-(defn thriftify-supervisor-info [supervisor-info]
-  (doto (SupervisorInfo.)
-    (.set_time_secs (long (:time-secs supervisor-info)))
-    (.set_hostname (:hostname supervisor-info))
-    (.set_assignment_id (:assignment-id supervisor-info))
-    (.set_used_ports (map long (:used-ports supervisor-info)))
-    (.set_meta (map long (:meta supervisor-info)))
-    (.set_scheduler_meta (:scheduler-meta supervisor-info))
-    (.set_uptime_secs (long (:uptime-secs supervisor-info)))
-    (.set_version (:version supervisor-info))
-    (.set_resources_map (:resources-map supervisor-info))
-    ))
-
-(defn clojurify-supervisor-info [^SupervisorInfo supervisor-info]
-  (if supervisor-info
-    (org.apache.storm.daemon.common.SupervisorInfo.
-      (.get_time_secs supervisor-info)
-      (.get_hostname supervisor-info)
-      (.get_assignment_id supervisor-info)
-      (if (.get_used_ports supervisor-info) (into [] (.get_used_ports supervisor-info)))
-      (if (.get_meta supervisor-info) (into [] (.get_meta supervisor-info)))
-      (if (.get_scheduler_meta supervisor-info) (into {} (.get_scheduler_meta supervisor-info)))
-      (.get_uptime_secs supervisor-info)
-      (.get_version supervisor-info)
-      (if-let [res-map (.get_resources_map supervisor-info)] (into {} res-map)))))
-
-(defn thriftify-assignment [assignment]
-  (let [thrift-assignment (doto (Assignment.)
-                            (.set_master_code_dir (:master-code-dir assignment))
-                            (.set_node_host (:node->host assignment))
-                            (.set_executor_node_port (into {}
-                                                           (map (fn [[k v]]
-                                                                  [(map long k)
-                                                                   (NodeInfo. (first v) (set (map long (rest v))))])
-                                                                (:executor->node+port assignment))))
-                            (.set_executor_start_time_secs
-                              (into {}
-                                    (map (fn [[k v]]
-                                           [(map long k) (long v)])
-                                         (:executor->start-time-secs assignment)))))]
-    (if (:worker->resources assignment)
-      (.set_worker_resources thrift-assignment (into {} (map
-                                                          (fn [[node+port resources]]
-                                                            [(NodeInfo. (first node+port) (set (map long (rest node+port))))
-                                                             (doto (WorkerResources.)
-                                                               (.set_mem_on_heap (first resources))
-                                                               (.set_mem_off_heap (second resources))
-                                                               (.set_cpu (last resources)))])
-                                                          (:worker->resources assignment)))))
-    thrift-assignment))
-
-(defn clojurify-task->node_port [task->node_port]
-  (into {}
-    (map-val
-      (fn [nodeInfo]
-        (concat [(.get_node nodeInfo)] (.get_port nodeInfo))) ;nodeInfo should be converted to [node,port1,port2..]
-      task->node_port)))
-
-;TODO: when translating this function, you should replace the map-key with a proper for loop HERE
-(defn clojurify-executor->node_port [executor->node_port]
-  (into {}
-    (map-val
-      (fn [nodeInfo]
-        (concat [(.get_node nodeInfo)] (.get_port nodeInfo))) ;nodeInfo should be converted to [node,port1,port2..]
-      (map-key
-        (fn [list-of-executors]
-          (into [] list-of-executors)) ; list of executors must be coverted to clojure vector to ensure it is sortable.
-        executor->node_port))))
-
-(defn thriftify-executor->node_port [executor->node_port]
-  (into {}
-    (map (fn [[k v]]
-            [(map long k)
-             (NodeInfo. (first v) (set (map long (rest v))))])
-          executor->node_port))
-)
-
-(defn clojurify-worker->resources [worker->resources]
-  "convert worker info to be [node, port]
-   convert resources to be [mem_on_heap mem_off_heap cpu]"
-  (into {} (map
-             (fn [[nodeInfo resources]]
-               [(concat [(.get_node nodeInfo)] (.get_port nodeInfo))
-                [(.get_mem_on_heap resources) (.get_mem_off_heap resources) (.get_cpu resources)]])
-             worker->resources)))
-
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn clojurify-assignment [^Assignment assignment]
-  (if assignment
-    (org.apache.storm.daemon.common.Assignment.
-      (.get_master_code_dir assignment)
-      (into {} (.get_node_host assignment))
-      (clojurify-executor->node_port (into {} (.get_executor_node_port assignment)))
-      (map-key (fn [executor] (into [] executor))
-        (into {} (.get_executor_start_time_secs assignment)))
-      (clojurify-worker->resources (into {} (.get_worker_resources assignment))))))
-
-(defn convert-to-symbol-from-status [status]
-  (if status {:type status} nil))
-
-(defn- convert-to-status-from-symbol [status]
-  (if status
-    (:type status)))
-
-(defn assoc-non-nil
-  [m k v]
-  (if v (assoc m k v) m))
-
-(defn clojurify-rebalance-options [^RebalanceOptions rebalance-options]
-  (-> {:action TopologyActions/REBALANCE}
-    (assoc-non-nil :delay-secs (if (.is_set_wait_secs rebalance-options) (.get_wait_secs rebalance-options)))
-    (assoc-non-nil :num-workers (if (.is_set_num_workers rebalance-options) (.get_num_workers rebalance-options)))
-    (assoc-non-nil :component->executors (if (.is_set_num_executors rebalance-options) (into {} (.get_num_executors rebalance-options))))))
-
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn thriftify-rebalance-options [rebalance-options]
-  (if rebalance-options
-    (let [thrift-rebalance-options (RebalanceOptions.)]
-      (if (:delay-secs rebalance-options)
-        (.set_wait_secs thrift-rebalance-options (int (:delay-secs rebalance-options))))
-      (if (:num-workers rebalance-options)
-        (.set_num_workers thrift-rebalance-options (int (:num-workers rebalance-options))))
-      (if (:component->executors rebalance-options)
-        (.set_num_executors thrift-rebalance-options (map-val int (:component->executors rebalance-options))))
-      thrift-rebalance-options)))
-
-(defn clojurify-kill-options [^KillOptions kill-options]
-  (-> {:action TopologyActions/KILL}
-    (assoc-non-nil :delay-secs (if (.is_set_wait_secs kill-options) (.get_wait_secs kill-options)))))
-
-(defn thriftify-kill-options [kill-options]
-  (if kill-options
-    (let [thrift-kill-options (KillOptions.)]
-      (if (:delay-secs kill-options)
-        (.set_wait_secs thrift-kill-options (int (:delay-secs kill-options))))
-      thrift-kill-options)))
-
-(defn thriftify-topology-action-options [storm-base]
-  (if (:topology-action-options storm-base)
-    (let [ topology-action-options (:topology-action-options storm-base)
-           action (:action topology-action-options)
-           thrift-topology-action-options (TopologyActionOptions.)]
-      (if (= action TopologyActions/KILL)
-        (.set_kill_options thrift-topology-action-options (thriftify-kill-options topology-action-options)))
-      (if (= action TopologyActions/REBALANCE)
-        (.set_rebalance_options thrift-topology-action-options (thriftify-rebalance-options topology-action-options)))
-      thrift-topology-action-options)))
-
-(defn clojurify-topology-action-options [^TopologyActionOptions topology-action-options]
-  (if topology-action-options
-    (or (and (.is_set_kill_options topology-action-options)
-             (clojurify-kill-options
-               (.get_kill_options topology-action-options)))
-        (and (.is_set_rebalance_options topology-action-options)
-             (clojurify-rebalance-options
-               (.get_rebalance_options topology-action-options))))))
-
-(defn clojurify-debugoptions [^DebugOptions options]
-  (if options
-    {
-      :enable (.is_enable options)
-      :samplingpct (.get_samplingpct options)
-      }
-    ))
-
-(defn thriftify-debugoptions [options]
-  (doto (DebugOptions.)
-    (.set_enable (get options :enable false))
-    (.set_samplingpct (get options :samplingpct 10))))
-
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn thriftify-storm-base [storm-base]
-  (doto (StormBase.)
-    (.set_name (:storm-name storm-base))
-    (.set_launch_time_secs (if (:launch-time-secs storm-base) (int (:launch-time-secs storm-base)) 0))
-    (.set_status (convert-to-status-from-symbol (:status storm-base)))
-    (.set_num_workers (if (:num-workers storm-base) (int (:num-workers storm-base)) 0))
-    (.set_component_executors (map-val int (:component->executors storm-base)))
-    (.set_owner (:owner storm-base))
-    (.set_topology_action_options (thriftify-topology-action-options storm-base))
-    (.set_prev_status (convert-to-status-from-symbol (:prev-status storm-base)))
-    (.set_component_debug (map-val thriftify-debugoptions (:component->debug storm-base)))))
-
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn clojurify-storm-base [^StormBase storm-base]
-  (if storm-base
-    (org.apache.storm.daemon.common.StormBase.
-      (.get_name storm-base)
-      (.get_launch_time_secs storm-base)
-      (convert-to-symbol-from-status (.get_status storm-base))
-      (.get_num_workers storm-base)
-      (into {} (.get_component_executors storm-base))
-      (.get_owner storm-base)
-      (clojurify-topology-action-options (.get_topology_action_options storm-base))
-      (convert-to-symbol-from-status (.get_prev_status storm-base))
-      (map-val clojurify-debugoptions (.get_component_debug storm-base)))))
-
-(defn clojurify-profile-request
-  [^ProfileRequest request]
-  (when request
-    {:host (.get_node (.get_nodeInfo request))
-     :port (first (.get_port (.get_nodeInfo request)))
-     :action     (.get_action request)
-     :timestamp  (.get_time_stamp request)}))
-
-(defn thriftify-credentials [credentials]
-    (doto (Credentials.)
-      (.set_creds (if credentials credentials {}))))
-
-(defn clojurify-crdentials [^Credentials credentials]
-  (if credentials
-    (into {} (.get_creds credentials))
-    nil
-    ))

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj
deleted file mode 100644
index 01a49b3..0000000
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ /dev/null
@@ -1,55 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.daemon.common
-  (:use [org.apache.storm log config util])
-  (:require [clojure.set :as set])
-  (:import (org.apache.storm.task WorkerTopologyContext)
-           (org.apache.storm.utils Utils ConfigUtils)
-           (java.io InterruptedIOException)))
-
-;; the task id is the virtual port
-;; node->host is here so that tasks know who to talk to just from assignment
-;; this avoid situation where node goes down and task doesn't know what to do information-wise
-(defrecord Assignment [master-code-dir node->host executor->node+port executor->start-time-secs worker->resources])
-
-
-;; component->executors is a map from spout/bolt id to number of executors for that component
-(defrecord StormBase [storm-name launch-time-secs status num-workers component->executors owner topology-action-options prev-status component->debug])
-
-(defrecord SupervisorInfo [time-secs hostname assignment-id used-ports meta scheduler-meta uptime-secs version resources-map])
-
-(defrecord ExecutorStats [^long processed
-                          ^long acked
-                          ^long emitted
-                          ^long transferred
-                          ^long failed])
-
-(defn new-executor-stats []
-  (ExecutorStats. 0 0 0 0 0))
-
-(defmacro defserverfn [name & body]
-  `(let [exec-fn# (fn ~@body)]
-    (defn ~name [& args#]
-      (try-cause
-        (apply exec-fn# args#)
-      (catch InterruptedIOException e#
-        (throw e#))
-      (catch InterruptedException e#
-        (throw e#))
-      (catch Throwable t#
-        (log-error t# "Error on initialization of server " ~(str name))
-        (Utils/exitProcess 13 "Error on initialization")
-        )))))
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
deleted file mode 100644
index a02a8e2..0000000
--- a/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
+++ /dev/null
@@ -1,28 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.daemon.local-supervisor
-  (:import [org.apache.storm.daemon.supervisor Supervisor]
-           [org.apache.storm.utils ConfigUtils])
-  (:use [org.apache.storm.daemon common])
-  (:gen-class))
-
-(defserverfn mk-local-supervisor [conf shared-context isupervisor]
-  (if (not (ConfigUtils/isLocalMode conf))
-    (throw
-      (IllegalArgumentException. "Cannot start server in distrubuted mode!")))
-  (let [supervisor-server (Supervisor. conf shared-context isupervisor)]
-    (.launch supervisor-server)
-    supervisor-server))

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/clj/org/apache/storm/internal/thrift.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/internal/thrift.clj b/storm-core/src/clj/org/apache/storm/internal/thrift.clj
index 07e0bdf..87a0441 100644
--- a/storm-core/src/clj/org/apache/storm/internal/thrift.clj
+++ b/storm-core/src/clj/org/apache/storm/internal/thrift.clj
@@ -50,24 +50,6 @@
   [^Grouping grouping]
   (grouping-constants (.getSetField grouping)))
 
-(defn nimbus-client-and-conn
-  ([host port]
-    (nimbus-client-and-conn host port nil))
-  ([host port as-user]
-  (log-message "Connecting to Nimbus at " host ":" port " as user: " as-user)
-  (let [conf (clojurify-structure (ConfigUtils/readStormConfig))
-        nimbusClient (NimbusClient. conf host port nil as-user)
-        client (.getClient nimbusClient)
-        transport (.transport nimbusClient)]
-        [client transport] )))
-
-(defmacro with-nimbus-connection
-  [[client-sym host port] & body]
-  `(let [[^Nimbus$Client ~client-sym ^TTransport conn#] (nimbus-client-and-conn ~host ~port)]
-    (try
-      ~@body
-    (finally (.close conn#)))))
-
 (defmacro with-configured-nimbus-connection
   [client-sym & body]
   `(let [conf# (clojurify-structure (ConfigUtils/readStormConfig))

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/clj/org/apache/storm/local_state_converter.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/local_state_converter.clj b/storm-core/src/clj/org/apache/storm/local_state_converter.clj
deleted file mode 100644
index e8eeaca..0000000
--- a/storm-core/src/clj/org/apache/storm/local_state_converter.clj
+++ /dev/null
@@ -1,24 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.local-state-converter
-  (:import [org.apache.storm.generated ExecutorInfo]))
-
-(defn ->ExecutorInfo
-  [[low high]] (ExecutorInfo. low high))
-
-(defn ->ExecutorInfo-list
-  [executors]
-  (map ->ExecutorInfo executors))

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/clj/org/apache/storm/log.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/log.clj b/storm-core/src/clj/org/apache/storm/log.clj
index 96570e3..7a006ef 100644
--- a/storm-core/src/clj/org/apache/storm/log.clj
+++ b/storm-core/src/clj/org/apache/storm/log.clj
@@ -15,9 +15,7 @@
 ;; limitations under the License.
 
 (ns org.apache.storm.log
-  (:require [clojure.tools.logging :as log])
-  (:use [clojure pprint])
-  (:import [java.io StringWriter]))
+  (:require [clojure.tools.logging :as log]))
 
 (defmacro log-message
   [& args]
@@ -31,26 +29,6 @@
   [& args]
   `(log/debug (str ~@args)))
 
-(defmacro log-warn-error
-  [e & args]
-  `(log/warn (str ~@args) ~e))
-
 (defmacro log-warn
   [& args]
   `(log/warn (str ~@args)))
-
-(defn log-capture!
-  [& args]
-  (apply log/log-capture! args))
-
-(defn log-stream
-  [& args]
-  (apply log/log-stream args))
-
-(defmacro log-pprint
-  [& args]
-  `(let [^StringWriter writer# (StringWriter.)]
-     (doall
-       (for [object# [~@args]]
-         (pprint object# writer#)))
-     (log-message "\n" writer#)))