You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/12 08:29:10 UTC

[02/10] storm git commit: STORM-2447: add in storm local to avoid having server on worker classpath

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java
index 947b64b..0d77914 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java
@@ -17,9 +17,12 @@
  */
 package org.apache.storm.starter.trident;
 
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.trident.Stream;
@@ -29,12 +32,6 @@ import org.apache.storm.trident.testing.FixedBatchSpout;
 import org.apache.storm.trident.tuple.TridentTuple;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
-
-import java.io.Serializable;
-import java.util.Comparator;
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
 
 /**
  * This class demonstrates different usages of
@@ -94,16 +91,8 @@ public class TridentMinMaxOfVehiclesTopology {
         StormTopology topology = buildVehiclesTopology();
         Config conf = new Config();
         conf.setMaxSpoutPending(20);
-        if (args.length == 0) {
-            try (LocalCluster cluster = new LocalCluster();
-                 LocalTopology topo = cluster.submitTopology("vehicles-topology", conf, topology);) {
-                Utils.sleep(60 * 1000);
-            }
-            System.exit(0);
-        } else {
-            conf.setNumWorkers(3);
-            StormSubmitter.submitTopologyWithProgressBar("vehicles-topology", conf, topology);
-        }
+        conf.setNumWorkers(3);
+        StormSubmitter.submitTopologyWithProgressBar("vehicles-topology", conf, topology);
     }
 
     static class SpeedComparator implements Comparator<TridentTuple>, Serializable {

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentReach.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentReach.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentReach.java
index 6533a4e..7294091 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentReach.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentReach.java
@@ -17,14 +17,16 @@
  */
 package org.apache.storm.starter.trident;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
-import org.apache.storm.LocalDRPC;
+import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.task.IMetricsContext;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
 import org.apache.storm.trident.TridentState;
 import org.apache.storm.trident.TridentTopology;
 import org.apache.storm.trident.operation.BaseFunction;
@@ -37,117 +39,115 @@ import org.apache.storm.trident.state.State;
 import org.apache.storm.trident.state.StateFactory;
 import org.apache.storm.trident.state.map.ReadOnlyMapState;
 import org.apache.storm.trident.tuple.TridentTuple;
-
-import java.util.*;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.DRPCClient;
 
 public class TridentReach {
-  public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, List<String>>() {{
-    put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan"));
-    put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", "sally", "nathan"));
-    put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john"));
-  }};
-
-  public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, List<String>>() {{
-    put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", "jai"));
-    put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", "vivian"));
-    put("tim", Arrays.asList("alex"));
-    put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", "vivian", "emily", "jordan"));
-    put("adam", Arrays.asList("david", "carissa"));
-    put("mike", Arrays.asList("john", "bob"));
-    put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob"));
-  }};
-
-  public static class StaticSingleKeyMapState extends ReadOnlyState implements ReadOnlyMapState<Object> {
-    public static class Factory implements StateFactory {
-      Map _map;
-
-      public Factory(Map map) {
-        _map = map;
-      }
-
-      @Override
-      public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
-        return new StaticSingleKeyMapState(_map);
-      }
+    public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, List<String>>() {{
+        put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan"));
+        put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", "sally", "nathan"));
+        put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john"));
+    }};
+
+    public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, List<String>>() {{
+        put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", "jai"));
+        put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", "vivian"));
+        put("tim", Arrays.asList("alex"));
+        put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", "vivian", "emily", "jordan"));
+        put("adam", Arrays.asList("david", "carissa"));
+        put("mike", Arrays.asList("john", "bob"));
+        put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob"));
+    }};
+
+    public static class StaticSingleKeyMapState extends ReadOnlyState implements ReadOnlyMapState<Object> {
+        public static class Factory implements StateFactory {
+            Map _map;
+
+            public Factory(Map map) {
+                _map = map;
+            }
+
+            @Override
+            public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+                return new StaticSingleKeyMapState(_map);
+            }
 
-    }
-
-    Map _map;
-
-    public StaticSingleKeyMapState(Map map) {
-      _map = map;
-    }
+        }
 
+        Map _map;
 
-    @Override
-    public List<Object> multiGet(List<List<Object>> keys) {
-      List<Object> ret = new ArrayList();
-      for (List<Object> key : keys) {
-        Object singleKey = key.get(0);
-        ret.add(_map.get(singleKey));
-      }
-      return ret;
-    }
+        public StaticSingleKeyMapState(Map map) {
+            _map = map;
+        }
 
-  }
 
-  public static class One implements CombinerAggregator<Integer> {
-    @Override
-    public Integer init(TridentTuple tuple) {
-      return 1;
-    }
+        @Override
+        public List<Object> multiGet(List<List<Object>> keys) {
+            List<Object> ret = new ArrayList();
+            for (List<Object> key : keys) {
+                Object singleKey = key.get(0);
+                ret.add(_map.get(singleKey));
+            }
+            return ret;
+        }
 
-    @Override
-    public Integer combine(Integer val1, Integer val2) {
-      return 1;
     }
 
-    @Override
-    public Integer zero() {
-      return 1;
-    }
-  }
+    public static class One implements CombinerAggregator<Integer> {
+        @Override
+        public Integer init(TridentTuple tuple) {
+            return 1;
+        }
 
-  public static class ExpandList extends BaseFunction {
+        @Override
+        public Integer combine(Integer val1, Integer val2) {
+            return 1;
+        }
 
-    @Override
-    public void execute(TridentTuple tuple, TridentCollector collector) {
-      List l = (List) tuple.getValue(0);
-      if (l != null) {
-        for (Object o : l) {
-          collector.emit(new Values(o));
+        @Override
+        public Integer zero() {
+            return 1;
         }
-      }
     }
 
-  }
+    public static class ExpandList extends BaseFunction {
 
-  public static StormTopology buildTopology(LocalDRPC drpc) {
-    TridentTopology topology = new TridentTopology();
-    TridentState urlToTweeters = topology.newStaticState(new StaticSingleKeyMapState.Factory(TWEETERS_DB));
-    TridentState tweetersToFollowers = topology.newStaticState(new StaticSingleKeyMapState.Factory(FOLLOWERS_DB));
+        @Override
+        public void execute(TridentTuple tuple, TridentCollector collector) {
+            List l = (List) tuple.getValue(0);
+            if (l != null) {
+                for (Object o : l) {
+                    collector.emit(new Values(o));
+                }
+            }
+        }
 
+    }
 
-    topology.newDRPCStream("reach", drpc).stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields(
-        "tweeters")).each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter")).shuffle().stateQuery(
-        tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers")).each(new Fields("followers"),
-        new ExpandList(), new Fields("follower")).groupBy(new Fields("follower")).aggregate(new One(), new Fields(
-        "one")).aggregate(new Fields("one"), new Sum(), new Fields("reach"));
-    return topology.build();
-  }
+    public static StormTopology buildTopology() {
+        TridentTopology topology = new TridentTopology();
+        TridentState urlToTweeters = topology.newStaticState(new StaticSingleKeyMapState.Factory(TWEETERS_DB));
+        TridentState tweetersToFollowers = topology.newStaticState(new StaticSingleKeyMapState.Factory(FOLLOWERS_DB));
 
-  public static void main(String[] args) throws Exception {
 
-    Config conf = new Config();
-    try (LocalDRPC drpc = new LocalDRPC();
-         LocalCluster cluster = new LocalCluster();
-         LocalTopology topo = cluster.submitTopology("reach", conf, buildTopology(drpc));) {
+        topology.newDRPCStream("reach").stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields(
+                "tweeters")).each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter")).shuffle().stateQuery(
+                        tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers")).each(new Fields("followers"),
+                                new ExpandList(), new Fields("follower")).groupBy(new Fields("follower")).aggregate(new One(), new Fields(
+                                        "one")).aggregate(new Fields("one"), new Sum(), new Fields("reach"));
+        return topology.build();
+    }
 
-        Thread.sleep(2000);
+    public static void main(String[] args) throws Exception {
+        Config conf = new Config();
+        StormSubmitter.submitTopology("reach", conf, buildTopology());
+        try (DRPCClient drpc = DRPCClient.getConfiguredClient(conf)) {
+            Thread.sleep(2000);
 
-        System.out.println("REACH: " + drpc.execute("reach", "aaa"));
-        System.out.println("REACH: " + drpc.execute("reach", "foo.com/blog/1"));
-        System.out.println("REACH: " + drpc.execute("reach", "engineering.twitter.com/blog/5"));
+            System.out.println("REACH: " + drpc.execute("reach", "aaa"));
+            System.out.println("REACH: " + drpc.execute("reach", "foo.com/blog/1"));
+            System.out.println("REACH: " + drpc.execute("reach", "engineering.twitter.com/blog/5"));
+        }
     }
-  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
index c43b4b0..2da29bf 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
@@ -19,11 +19,8 @@
 package org.apache.storm.starter.trident;
 
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
-import org.apache.storm.topology.base.BaseWindowedBolt;
 import org.apache.storm.trident.Stream;
 import org.apache.storm.trident.TridentTopology;
 import org.apache.storm.trident.operation.Consumer;
@@ -33,17 +30,13 @@ import org.apache.storm.trident.testing.Split;
 import org.apache.storm.trident.tuple.TridentTuple;
 import org.apache.storm.trident.windowing.InMemoryWindowsStoreFactory;
 import org.apache.storm.trident.windowing.WindowsStoreFactory;
-import org.apache.storm.trident.windowing.config.*;
+import org.apache.storm.trident.windowing.config.SlidingCountWindow;
+import org.apache.storm.trident.windowing.config.WindowConfig;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
 /**
  * Sample application of trident windowing which uses inmemory store for storing tuples in window.
  */
@@ -74,25 +67,12 @@ public class TridentWindowingInmemoryStoreTopology {
     public static void main(String[] args) throws Exception {
         Config conf = new Config();
         WindowsStoreFactory mapState = new InMemoryWindowsStoreFactory();
-
-        if (args.length == 0) {
-            List<? extends WindowConfig> list = Arrays.asList(
-                    SlidingCountWindow.of(1000, 100)
-                    ,TumblingCountWindow.of(1000)
-                    ,SlidingDurationWindow.of(new BaseWindowedBolt.Duration(6, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(3, TimeUnit.SECONDS))
-                    ,TumblingDurationWindow.of(new BaseWindowedBolt.Duration(3, TimeUnit.SECONDS))
-            );
-
-            for (WindowConfig windowConfig : list) {
-                try (LocalCluster cluster = new LocalCluster();
-                     LocalTopology topo = cluster.submitTopology("wordCounter", conf, buildTopology(mapState, windowConfig));) {
-                    Utils.sleep(60 * 1000);
-                }
-            }
-            System.exit(0);
-        } else {
-            conf.setNumWorkers(3);
-            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(mapState, SlidingCountWindow.of(1000, 100)));
+        String topoName = "wordCounter";
+        if (args.length > 0) {
+            topoName = args[0];
         }
+        
+        conf.setNumWorkers(3);
+        StormSubmitter.submitTopologyWithProgressBar(topoName, conf, buildTopology(mapState, SlidingCountWindow.of(1000, 100)));
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java
index 553c26f..0f86e1f 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java
@@ -18,9 +18,6 @@
 package org.apache.storm.starter.trident;
 
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
-import org.apache.storm.LocalDRPC;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.trident.TridentState;
@@ -35,54 +32,53 @@ import org.apache.storm.trident.testing.MemoryMapState;
 import org.apache.storm.trident.tuple.TridentTuple;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.DRPCClient;
 
 
 public class TridentWordCount {
-  public static class Split extends BaseFunction {
-    @Override
-    public void execute(TridentTuple tuple, TridentCollector collector) {
-      String sentence = tuple.getString(0);
-      for (String word : sentence.split(" ")) {
-        collector.emit(new Values(word));
-      }
+    public static class Split extends BaseFunction {
+        @Override
+        public void execute(TridentTuple tuple, TridentCollector collector) {
+            String sentence = tuple.getString(0);
+            for (String word : sentence.split(" ")) {
+                collector.emit(new Values(word));
+            }
+        }
     }
-  }
 
-  public static StormTopology buildTopology(LocalDRPC drpc) {
-    FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
-        new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
-        new Values("how many apples can you eat"), new Values("to be or not to be the person"));
-    spout.setCycle(true);
+    public static StormTopology buildTopology() {
+        FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
+                new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
+                new Values("how many apples can you eat"), new Values("to be or not to be the person"));
+        spout.setCycle(true);
 
-    TridentTopology topology = new TridentTopology();
-    TridentState wordCounts = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),
-        new Split(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(),
-        new Count(), new Fields("count")).parallelismHint(16);
+        TridentTopology topology = new TridentTopology();
+        TridentState wordCounts = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),
+                new Split(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(),
+                        new Count(), new Fields("count")).parallelismHint(16);
 
-    topology.newDRPCStream("words", drpc).each(new Fields("args"), new Split(), new Fields("word"))
-            .groupBy(new Fields("word"))
-            .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
-            .each(new Fields("count"), new FilterNull())
-            .project(new Fields("word", "count"));
-    return topology.build();
-  }
+        topology.newDRPCStream("words").each(new Fields("args"), new Split(), new Fields("word"))
+        .groupBy(new Fields("word"))
+        .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
+        .each(new Fields("count"), new FilterNull())
+        .project(new Fields("word", "count"));
+        return topology.build();
+    }
 
-  public static void main(String[] args) throws Exception {
-    Config conf = new Config();
-    conf.setMaxSpoutPending(20);
-    if (args.length == 0) {
-      try (LocalDRPC drpc = new LocalDRPC();
-           LocalCluster cluster = new LocalCluster();
-           LocalTopology topo = cluster.submitTopology("wordCounter", conf, buildTopology(drpc));) {
-        for (int i = 0; i < 100; i++) {
-          System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped"));
-          Thread.sleep(1000);
+    public static void main(String[] args) throws Exception {
+        Config conf = new Config();
+        conf.setMaxSpoutPending(20);
+        String topoName = "wordCounter";
+        if (args.length > 0) {
+            topoName = args[0];
+        }
+        conf.setNumWorkers(3);
+        StormSubmitter.submitTopologyWithProgressBar(topoName, conf, buildTopology());
+        try (DRPCClient drpc = DRPCClient.getConfiguredClient(conf)) {
+            for (int i = 0; i < 10; i++) {
+                System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped"));
+                Thread.sleep(1000);
+            }
         }
-      }
-    }
-    else {
-      conf.setNumWorkers(3);
-      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(null));
     }
-  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/external/storm-eventhubs/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/pom.xml b/external/storm-eventhubs/pom.xml
index 7fe6071..deaefb8 100755
--- a/external/storm-eventhubs/pom.xml
+++ b/external/storm-eventhubs/pom.xml
@@ -65,13 +65,6 @@
             <scope>${provided.scope}</scope>
         </dependency>
         <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-server</artifactId>
-            <version>${project.version}</version>
-            <type>jar</type>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
             <groupId>org.apache.curator</groupId>
             <artifactId>curator-framework</artifactId>
             <version>${curator.version}</version>

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventCount.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventCount.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventCount.java
index ae15634..4709b5e 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventCount.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventCount.java
@@ -17,20 +17,17 @@
  *******************************************************************************/
 package org.apache.storm.eventhubs.samples;
 
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.topology.TopologyBuilder;
+import java.io.FileReader;
+import java.util.Properties;
 
+import org.apache.storm.Config;
+import org.apache.storm.StormSubmitter;
 import org.apache.storm.eventhubs.samples.bolt.GlobalCountBolt;
 import org.apache.storm.eventhubs.samples.bolt.PartialCountBolt;
 import org.apache.storm.eventhubs.spout.EventHubSpout;
 import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
-
-import java.io.FileReader;
-import java.util.Properties;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.TopologyBuilder;
 
 /**
  * The basic scenario topology that uses EventHubSpout with PartialCountBolt
@@ -124,23 +121,17 @@ public class EventCount {
   }
 	
   protected void submitTopology(String[] args, StormTopology topology) throws Exception {
-	  Config config = new Config();
-    config.setDebug(false);
-    //Enable metrics
-    config.registerMetricsConsumer(org.apache.storm.metric.LoggingMetricsConsumer.class, 1);
+      Config config = new Config();
+      config.setDebug(false);
+      //Enable metrics
+      config.registerMetricsConsumer(org.apache.storm.metric.LoggingMetricsConsumer.class, 1);
 
-    
-	  if (args != null && args.length > 0) {
-      config.setNumWorkers(numWorkers);
-      StormSubmitter.submitTopology(args[0], config, topology);
-    } else {
-      config.setMaxTaskParallelism(2);
-
-      try (LocalCluster localCluster = new LocalCluster();
-           LocalTopology topo = localCluster.submitTopology("test", config, topology);) {
-        Thread.sleep(5000000);
+      String topoName = "test";
+      if (args.length > 0) {
+          topoName = args[0];
       }
-    }
+      config.setNumWorkers(numWorkers);
+      StormSubmitter.submitTopology(topoName, config, topology);
   }
   
   protected void runScenario(String[] args) throws Exception{

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java b/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java
index 222bf2d..1d38ada 100644
--- a/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java
@@ -17,22 +17,33 @@
  */
 package org.apache.storm.flux;
 
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
+import org.apache.storm.flux.model.BoltDef;
+import org.apache.storm.flux.model.ExecutionContext;
+import org.apache.storm.flux.model.SpoutDef;
+import org.apache.storm.flux.model.StreamDef;
+import org.apache.storm.flux.model.TopologyDef;
+import org.apache.storm.flux.parser.FluxParser;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.generated.SubmitOptions;
 import org.apache.storm.generated.TopologyInitialStatus;
-import org.apache.storm.utils.Utils;
-import org.apache.commons.cli.*;
-import org.apache.storm.flux.model.*;
-import org.apache.storm.flux.parser.FluxParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.*;
-
 /**
  * Flux entry point.
  *
@@ -40,10 +51,6 @@ import java.io.*;
 public class Flux {
     private static final Logger LOG = LoggerFactory.getLogger(Flux.class);
 
-    private static final Long DEFAULT_LOCAL_SLEEP_TIME = 60000l;
-
-    private static final Long DEFAULT_ZK_PORT = 2181l;
-
     private static final String OPTION_LOCAL = "local";
     private static final String OPTION_REMOTE = "remote";
     private static final String OPTION_RESOURCE = "resource";
@@ -169,45 +176,8 @@ public class Flux {
                 }
                 StormSubmitter.submitTopology(topologyName, conf, topology, submitOptions, null);
             } else {
-                LOG.info("Running in local mode...");
-
-                String sleepStr = cmd.getOptionValue(OPTION_SLEEP);
-                Long sleepTime = DEFAULT_LOCAL_SLEEP_TIME;
-                if (sleepStr != null) {
-                    sleepTime = Long.parseLong(sleepStr);
-                }
-                LOG.debug("Sleep time: {}", sleepTime);
-                LocalCluster cluster = null;
-
-                // in-process or external zookeeper
-                if(cmd.hasOption(OPTION_ZOOKEEPER)){
-                    String zkStr = cmd.getOptionValue(OPTION_ZOOKEEPER);
-                    LOG.info("Using ZooKeeper at '{}' instead of in-process one.", zkStr);
-                    long zkPort = DEFAULT_ZK_PORT;
-                    String zkHost = null;
-                    if(zkStr.contains(":")){
-                        String[] hostPort = zkStr.split(":");
-                        zkHost = hostPort[0];
-                        zkPort = hostPort.length > 1 ? Long.parseLong(hostPort[1]) : DEFAULT_ZK_PORT;
-
-                    } else {
-                        zkHost = zkStr;
-                    }
-                    // the following constructor is only available in 0.9.3 and later
-                    try {
-                        cluster = new LocalCluster(zkHost, zkPort);
-                    } catch (NoSuchMethodError e){
-                        LOG.error("The --zookeeper option can only be used with Apache Storm 0.9.3 and later.");
-                        System.exit(1);
-                    }
-                } else {
-                    cluster = new LocalCluster();
-                }
-                try (LocalTopology topo = cluster.submitTopology(topologyName, conf, topology)) {
-                    Utils.sleep(sleepTime);
-                } finally {
-                    cluster.shutdown();
-                }
+                LOG.error("To run in local mode run with 'storm local' instead of 'storm jar'");
+                return;
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/flux/pom.xml
----------------------------------------------------------------------
diff --git a/flux/pom.xml b/flux/pom.xml
index d1f330f..4e04fa8 100644
--- a/flux/pom.xml
+++ b/flux/pom.xml
@@ -44,16 +44,9 @@
     </modules>
 
     <dependencies>
-        <!--
-        Since Flux uses LocalCluster to provide the feature: running topology locally...
-        User should notice that configured topology will be run with 'storm-client' dependencies
-        when adding Flux into topology dependency.
-        If user want to run topology with 'storm-server' dependencies, user can just include
-        'storm-server' as 'compile' scope, and exclude 'storm-client' from 'storm-server'.
-        -->
         <dependency>
             <groupId>org.apache.storm</groupId>
-            <artifactId>storm-server</artifactId>
+            <artifactId>storm-client</artifactId>
             <version>${project.version}</version>
             <scope>${provided.scope}</scope>
         </dependency>

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1281171..91017f9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -320,11 +320,13 @@
         <module>storm-buildtools/maven-shade-clojure-transformer</module>
         <module>storm-buildtools/storm-maven-plugins</module>
         <module>storm-client</module>
+        <module>storm-client-misc</module>
         <module>storm-server</module>
         <module>storm-core</module>
         <module>storm-webapp</module>
         <module>storm-rename-hack</module>
         <module>storm-clojure</module>
+        <module>storm-clojure-test</module>
         <module>storm-submit-tools</module>
         <module>flux</module>
         <module>sql</module>
@@ -368,7 +370,6 @@
         <module>examples/storm-pmml-examples</module>
         <module>examples/storm-jms-examples</module>
         <module>examples/storm-perf</module>
-        <module>storm-client-misc</module>
     </modules>
 
     <dependencies>

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java b/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java
index 5abb04a..f359722 100644
--- a/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java
+++ b/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java
@@ -30,6 +30,7 @@ import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.DRPCClient;
 import org.apache.storm.utils.ExtendedThreadPoolExecutor;
 import org.apache.storm.utils.ServiceRegistry;
 import java.util.ArrayList;
@@ -59,8 +60,8 @@ public class DRPCSpout extends BaseRichSpout {
     List<DRPCInvocationsClient> _clients = new ArrayList<>();
     transient LinkedList<Future<Void>> _futures = null;
     transient ExecutorService _backround = null;
-    String _function;
-    String _local_drpc_id = null;
+    final String _function;
+    final String _local_drpc_id;
     
     private static class DRPCMessageId {
         String id;
@@ -75,6 +76,11 @@ public class DRPCSpout extends BaseRichSpout {
     
     public DRPCSpout(String function) {
         _function = function;
+        if (DRPCClient.isLocalOverride()) {
+            _local_drpc_id = DRPCClient.getOverrideServiceId();
+        } else {
+            _local_drpc_id = null; 
+        }
     }
 
     public DRPCSpout(String function, ILocalDRPC drpc) {

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/storm-client/src/jvm/org/apache/storm/security/auth/ThriftClient.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftClient.java b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftClient.java
index 9a207b4..d84c5fa 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftClient.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftClient.java
@@ -56,7 +56,7 @@ public class ThriftClient implements AutoCloseable {
             port = type.getPort(storm_conf);
         }
 
-        if (port<=0) {
+        if (port<=0 && !type.isFake()) {
             throw new IllegalArgumentException("invalid port: "+port);
         }          
 
@@ -66,7 +66,9 @@ public class ThriftClient implements AutoCloseable {
         _conf = storm_conf;
         _type = type;
         _asUser = asUser;
-        reconnect();
+        if (!type.isFake()) {
+            reconnect();
+        }
     }
 
     public synchronized TTransport transport() {

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/storm-client/src/jvm/org/apache/storm/security/auth/ThriftConnectionType.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftConnectionType.java b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftConnectionType.java
index 0108d73..19d8dd6 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftConnectionType.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftConnectionType.java
@@ -31,7 +31,8 @@ public enum ThriftConnectionType {
     DRPC(Config.DRPC_THRIFT_TRANSPORT_PLUGIN, Config.DRPC_PORT, Config.DRPC_QUEUE_SIZE,
          Config.DRPC_WORKER_THREADS, Config.DRPC_MAX_BUFFER_SIZE, null),
     DRPC_INVOCATIONS(Config.DRPC_INVOCATIONS_THRIFT_TRANSPORT_PLUGIN, Config.DRPC_INVOCATIONS_PORT, null,
-         Config.DRPC_INVOCATIONS_THREADS, Config.DRPC_MAX_BUFFER_SIZE, null);
+         Config.DRPC_INVOCATIONS_THREADS, Config.DRPC_MAX_BUFFER_SIZE, null),
+    LOCAL_FAKE;
 
     private final String _transConf;
     private final String _portConf;
@@ -39,17 +40,32 @@ public enum ThriftConnectionType {
     private final String _threadsConf;
     private final String _buffConf;
     private final String _socketTimeoutConf;
+    private final boolean _isFake;
 
+    ThriftConnectionType() {
+        this(null, null, null, null, null, null, true);
+    }
+    
+    ThriftConnectionType(String transConf, String portConf, String qConf,
+            String threadsConf, String buffConf, String socketTimeoutConf) {
+        this(transConf, portConf, qConf, threadsConf, buffConf, socketTimeoutConf, false);
+    }
+    
     ThriftConnectionType(String transConf, String portConf, String qConf,
-                         String threadsConf, String buffConf, String socketTimeoutConf) {
+                         String threadsConf, String buffConf, String socketTimeoutConf, boolean isFake) {
         _transConf = transConf;
         _portConf = portConf;
         _qConf = qConf;
         _threadsConf = threadsConf;
         _buffConf = buffConf;
         _socketTimeoutConf = socketTimeoutConf;
+        _isFake = isFake;
     }
 
+    public boolean isFake() {
+        return _isFake;
+    }
+    
     public String getTransportPlugin(Map conf) {
         String ret = (String)conf.get(_transConf);
         if (ret == null) {
@@ -59,6 +75,9 @@ public enum ThriftConnectionType {
     }
 
     public int getPort(Map conf) {
+        if (_isFake) {
+            return -1;
+        }
         return ObjectReader.getInt(conf.get(_portConf));
     }
 
@@ -70,10 +89,16 @@ public enum ThriftConnectionType {
     }
 
     public int getNumThreads(Map conf) {
+        if (_isFake) {
+            return 1;
+        }
         return ObjectReader.getInt(conf.get(_threadsConf));
     }
 
     public int getMaxBufferSize(Map conf) {
+        if (_isFake) {
+            return 1;
+        }
         return ObjectReader.getInt(conf.get(_buffConf));
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/storm-client/src/jvm/org/apache/storm/topology/ConfigurableTopology.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/topology/ConfigurableTopology.java b/storm-client/src/jvm/org/apache/storm/topology/ConfigurableTopology.java
new file mode 100644
index 0000000..3641e07
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/topology/ConfigurableTopology.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.topology;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.utils.Utils;
+import org.yaml.snakeyaml.Yaml;
+
+/**
+ * Extensions of this class takes a
+ * reference to one or more configuration files. The main() method should call
+ * ConfigurableTopology.start() and it must instantiate a TopologyBuilder in the
+ * run() method.
+ * 
+ * <pre>
+ * {
+ *    public class MyTopology extends ConfigurableTopology {
+ *
+  *   public static void main(String[] args) throws Exception {
+  *       ConfigurableTopology.start(new MyTopology(), args);
+  *   }
+  *
+  *   &#64;Override
+  *   protected int run(String[] args) {
+  *       TopologyBuilder builder = new TopologyBuilder();
+  *
+  *       // build topology as usual
+  *    
+  *       return submit("crawl", conf, builder);
+  *   }
+ * }
+ * </pre>
+ **/
+public abstract class ConfigurableTopology {
+
+    protected Config conf = new Config();
+
+    public static void start(ConfigurableTopology topology, String args[]) {
+        String[] remainingArgs = topology.parse(args);
+        try {
+            topology.run(remainingArgs);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    protected Config getConf() {
+        return conf;
+    }
+
+    protected abstract int run(String args[]) throws Exception;
+
+    /** Submits the topology with the name taken from the configuration **/
+    protected int submit(Config conf, TopologyBuilder builder) {
+        String name = (String) Utils.get(conf, Config.TOPOLOGY_NAME, null);
+        if (StringUtils.isBlank(name))
+            throw new RuntimeException(
+                    "No value found for " + Config.TOPOLOGY_NAME);
+        return submit(name, conf, builder);
+    }
+
+    /** Submits the topology under a specific name **/
+    protected int submit(String name, Config conf, TopologyBuilder builder) {
+        try {
+            StormSubmitter.submitTopology(name, conf,
+                    builder.createTopology());
+        } catch (Exception e) {
+            e.printStackTrace();
+            return -1;
+        }
+        return 0;
+    }
+
+    private String[] parse(String args[]) {
+
+        List<String> newArgs = new ArrayList<>();
+        Collections.addAll(newArgs, args);
+
+        Iterator<String> iter = newArgs.iterator();
+        while (iter.hasNext()) {
+            String param = iter.next();
+            if (param.equals("-conf")) {
+                if (!iter.hasNext()) {
+                    throw new RuntimeException("Conf file not specified");
+                }
+                iter.remove();
+                String resource = iter.next();
+                try {
+                    loadConf(resource, conf);
+                } catch (FileNotFoundException e) {
+                    throw new RuntimeException("File not found : " + resource);
+                }
+                iter.remove();
+            }
+        }
+
+        return newArgs.toArray(new String[newArgs.size()]);
+    }
+
+    public static Config loadConf(String resource, Config conf)
+            throws FileNotFoundException {
+        Yaml yaml = new Yaml();
+        Map<String, Object> ret = (Map<String, Object>) yaml.load(new InputStreamReader(
+                new FileInputStream(resource), Charset.defaultCharset()));
+        if (ret == null) {
+            ret = new HashMap<>();
+        }
+        // If the config consists of a single key 'config', its values are used
+        // instead. This means that the same config files can be used with Flux
+        // and the ConfigurableTopology.
+        else {
+            if (ret.size() == 1) {
+                Object confNode = ret.get("config");
+                if (confNode != null && confNode instanceof Map) {
+                    ret = (Map<String, Object>) ret;
+                }
+            }
+        }
+        conf.putAll(ret);
+        return conf;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/storm-client/src/jvm/org/apache/storm/utils/DRPCClient.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/DRPCClient.java b/storm-client/src/jvm/org/apache/storm/utils/DRPCClient.java
index 7f83789..1338231 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/DRPCClient.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/DRPCClient.java
@@ -17,34 +17,90 @@
  */
 package org.apache.storm.utils;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.Config;
+import org.apache.storm.ILocalDRPC;
+import org.apache.storm.generated.AuthorizationException;
 import org.apache.storm.generated.DRPCExecutionException;
 import org.apache.storm.generated.DistributedRPC;
-import org.apache.storm.generated.AuthorizationException;
-import org.apache.thrift.TException;
-import org.apache.thrift.transport.TTransport;
 import org.apache.storm.security.auth.ThriftClient;
 import org.apache.storm.security.auth.ThriftConnectionType;
+import org.apache.thrift.TException;
 import org.apache.thrift.transport.TTransportException;
 
-import java.util.Map;
-
 public class DRPCClient extends ThriftClient implements DistributedRPC.Iface {
-    private TTransport conn;
-    private DistributedRPC.Client client;
+    private static volatile ILocalDRPC _localOverrideClient = null;
+    
+    public static class LocalOverride implements AutoCloseable {
+        public LocalOverride(ILocalDRPC client) {
+            _localOverrideClient = client;
+        }
+        
+        @Override
+        public void close() throws Exception {
+            _localOverrideClient = null;
+        }
+    }
+    
+    /**
+     * @return true of new clients will be overridden to connect to a local cluster
+     * and not the configured remote cluster.
+     */
+    public static boolean isLocalOverride() {
+        return _localOverrideClient != null;
+    }
+    
+    /**
+     * @return the service ID of the local override DRPC instance
+     */
+    public static String getOverrideServiceId() {
+        return _localOverrideClient.getServiceId();
+    }
+
+    public static DRPCClient getConfiguredClient(Map conf) throws TTransportException {
+        DistributedRPC.Iface override = _localOverrideClient;
+        if (override != null) {
+            return new DRPCClient(override);
+        }
+
+        List<String> servers = (List<String>) conf.get(Config.DRPC_SERVERS);
+        Collections.shuffle(servers);
+        String host = servers.get(0);
+        int port = Integer.parseInt(conf.get(Config.DRPC_PORT).toString());
+        return new DRPCClient(conf, host, port);
+    }
+    
+    private DistributedRPC.Iface client;
     private String host;
     private int port;
-    private Integer timeout;
 
+    private DRPCClient(DistributedRPC.Iface override) {
+        super(new HashMap<>(), ThriftConnectionType.LOCAL_FAKE,
+                "localhost", 1234, null, null);
+        this.host = "localhost";
+        this.port = 1234;
+        this.client = override;
+    }
+    
     public DRPCClient(Map conf, String host, int port) throws TTransportException {
         this(conf, host, port, null);
         _retryForever = true;
     }
 
     public DRPCClient(Map conf, String host, int port, Integer timeout) throws TTransportException {
-        super(conf, ThriftConnectionType.DRPC, host, port, timeout, null);
+        super(conf, _localOverrideClient != null ? ThriftConnectionType.LOCAL_FAKE : ThriftConnectionType.DRPC,
+                host, port, timeout, null);
         this.host = host;
         this.port = port;
-        this.client = new DistributedRPC.Client(_protocol);
+        if (_localOverrideClient != null) {
+            this.client = _localOverrideClient;
+        } else {
+            this.client = new DistributedRPC.Client(_protocol);
+        }
         _retryForever = true;
     }
         
@@ -60,7 +116,7 @@ public class DRPCClient extends ThriftClient implements DistributedRPC.Iface {
         return client.execute(func, args);
     }
 
-    public DistributedRPC.Client getClient() {
+    public DistributedRPC.Iface getClient() {
         return client;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/storm-client/src/jvm/org/apache/storm/utils/NimbusClient.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/NimbusClient.java b/storm-client/src/jvm/org/apache/storm/utils/NimbusClient.java
index 6699ba8..cb2e1af 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/NimbusClient.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/NimbusClient.java
@@ -29,15 +29,41 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.security.Principal;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 public class NimbusClient extends ThriftClient {
-    private Nimbus.Client _client;
+    private static volatile Nimbus.Iface _localOverrideClient = null;
+
+    public static final class LocalOverride implements AutoCloseable {
+        public LocalOverride(Nimbus.Iface client) {
+            _localOverrideClient = client;
+        }
+        
+        @Override
+        public void close() throws Exception {
+            _localOverrideClient = null;
+        }
+    }
+    
+    /**
+     * @return true of new clients will be overridden to connect to a local cluster
+     * and not the configured remote cluster.
+     */
+    public static boolean isLocalOverride() {
+        return _localOverrideClient != null;
+    }
+    
+    private Nimbus.Iface _client;
+    /**
+     * Indicates if this is a special client that is overwritten for local mode.
+     */
+    public final boolean _isLocal;
     private static final Logger LOG = LoggerFactory.getLogger(NimbusClient.class);
 
     public interface WithNimbus {
-        public void run(Nimbus.Client client) throws Exception;
+        public void run(Nimbus.Iface client) throws Exception;
     }
 
     public static void withConfiguredClient(WithNimbus cb) throws Exception {
@@ -58,6 +84,10 @@ public class NimbusClient extends ThriftClient {
     }
 
     public static NimbusClient getConfiguredClientAs(Map conf, String asUser) {
+        Nimbus.Iface override = _localOverrideClient;
+        if (override != null) {
+            return new NimbusClient(override);
+        }
         if (conf.containsKey(Config.STORM_DO_AS_USER)) {
             if (asUser != null && !asUser.isEmpty()) {
                 LOG.warn("You have specified a doAsUser as param {} and a doAsParam as config, config will take precedence."
@@ -121,19 +151,28 @@ public class NimbusClient extends ThriftClient {
     public NimbusClient(Map conf, String host, int port, Integer timeout) throws TTransportException {
         super(conf, ThriftConnectionType.NIMBUS, host, port, timeout, null);
         _client = new Nimbus.Client(_protocol);
+        _isLocal = false;
     }
 
     public NimbusClient(Map conf, String host, Integer port, Integer timeout, String asUser) throws TTransportException {
         super(conf, ThriftConnectionType.NIMBUS, host, port, timeout, asUser);
         _client = new Nimbus.Client(_protocol);
+        _isLocal = false;
     }
 
     public NimbusClient(Map conf, String host) throws TTransportException {
         super(conf, ThriftConnectionType.NIMBUS, host, null, null, null);
         _client = new Nimbus.Client(_protocol);
+        _isLocal = false;
+    }
+    
+    private NimbusClient(Nimbus.Iface client) {
+        super(new HashMap<>(), ThriftConnectionType.LOCAL_FAKE, "localhost", null, null, null);
+        _client = client;
+        _isLocal = true;
     }
 
-    public Nimbus.Client getClient() {
+    public Nimbus.Iface getClient() {
         return _client;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/storm-client/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
index 57101dc..771dc70 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
@@ -982,7 +982,7 @@ public class Utils {
         }
     }
 
-    public static String getTopologyId(String name, Nimbus.Client client) {
+    public static String getTopologyId(String name, Nimbus.Iface client) {
         try {
             ClusterSummary summary = client.getClusterInfo();
             for(TopologySummary s : summary.get_topologies()) {

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/storm-clojure-test/pom.xml
----------------------------------------------------------------------
diff --git a/storm-clojure-test/pom.xml b/storm-clojure-test/pom.xml
new file mode 100644
index 0000000..743b72b
--- /dev/null
+++ b/storm-clojure-test/pom.xml
@@ -0,0 +1,66 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>2.0.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>storm-clojure-test</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-clojure</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build> 
+        <plugins>
+            <plugin>
+                <groupId>com.theoryinpractise</groupId>
+                <artifactId>clojure-maven-plugin</artifactId>
+                <extensions>true</extensions>
+                <configuration>
+                    <sourceDirectories>
+                        <sourceDirectory>src/clj</sourceDirectory>
+                    </sourceDirectories>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>compile</id>
+                        <phase>compile</phase>
+                        <goals>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/storm-clojure-test/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-clojure-test/src/clj/org/apache/storm/testing.clj b/storm-clojure-test/src/clj/org/apache/storm/testing.clj
new file mode 100644
index 0000000..1e374ed
--- /dev/null
+++ b/storm-clojure-test/src/clj/org/apache/storm/testing.clj
@@ -0,0 +1,270 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns org.apache.storm.testing
+  (:import [org.apache.storm LocalCluster$Builder])
+  (:import [java.util.function UnaryOperator])
+  (:import [org.apache.storm.utils Time Time$SimulatedTime RegisteredGlobalState Utils])
+  (:import [org.apache.storm.testing InProcessZookeeper MkTupleParam TestJob MkClusterParam 
+            TrackedTopology CompleteTopologyParam MockedSources])
+  (:import [org.apache.storm Thrift Testing Testing$Condition])
+  (:import [org.apache.storm.testing MockLeaderElector])
+  (:import [org.json.simple JSONValue])
+  (:use [org.apache.storm util config log])
+  (:use [org.apache.storm thrift]))
+
+(defnk add-supervisor
+  [cluster-map :ports 2 :conf {} :id nil]
+  (let [local-cluster (:local-cluster cluster-map)]
+    (.addSupervisor local-cluster ports conf id)))
+
+(defnk mock-leader-elector [:is-leader true :leader-name "test-host" :leader-port 9999]
+  (MockLeaderElector. is-leader leader-name leader-port))
+
+(defn local-cluster-state [local-cluster]
+    {:nimbus (.getNimbus local-cluster)
+     :daemon-conf (.getDaemonConf local-cluster)
+     :storm-cluster-state (.getClusterState local-cluster)
+     :local-cluster local-cluster})
+
+(defnk mk-mocked-nimbus 
+  [:daemon-conf {} :inimbus nil :blob-store nil :cluster-state nil 
+   :leader-elector nil :group-mapper nil :nimbus-daemon false :nimbus-wrapper nil]
+  (let [builder (doto (LocalCluster$Builder.)
+                  (.withDaemonConf daemon-conf)
+                  (.withINimbus inimbus)
+                  (.withBlobStore blob-store)
+                  (.withClusterState cluster-state)
+                  (.withLeaderElector leader-elector)
+                  (.withGroupMapper group-mapper)
+                  (.withNimbusDaemon nimbus-daemon)
+                  (.withNimbusWrapper (when nimbus-wrapper (reify UnaryOperator (apply [this nimbus] (nimbus-wrapper nimbus))))))
+        local-cluster (.build builder)]
+    (local-cluster-state local-cluster)))
+
+(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :inimbus nil :group-mapper nil :supervisor-slot-port-min 1024 :nimbus-daemon false]
+  (let [builder (doto (LocalCluster$Builder.)
+                  (.withSupervisors supervisors)
+                  (.withPortsPerSupervisor ports-per-supervisor)
+                  (.withDaemonConf daemon-conf)
+                  (.withINimbus inimbus)
+                  (.withGroupMapper group-mapper)
+                  (.withSupervisorSlotPortMin supervisor-slot-port-min)
+                  (.withNimbusDaemon nimbus-daemon))
+        local-cluster (.build builder)]
+    (local-cluster-state local-cluster)))
+
+(defn get-supervisor [cluster-map supervisor-id]
+ (let [local-cluster (:local-cluster cluster-map)]
+  (.getSupervisor local-cluster supervisor-id))) 
+
+(defn kill-supervisor [cluster-map supervisor-id]
+ (let [local-cluster (:local-cluster cluster-map)]
+  (.killSupervisor local-cluster supervisor-id))) 
+
+(defn kill-local-storm-cluster [cluster-map]
+ (let [local-cluster (:local-cluster cluster-map)]
+  (.close local-cluster))) 
+
+(defmacro while-timeout [timeout-ms condition & body]
+  `(Testing/whileTimeout ~timeout-ms
+     (reify Testing$Condition (exec [this] ~condition))
+     (fn [] ~@body)))
+
+(defn wait-for-condition
+  ([apredicate]
+    (wait-for-condition Testing/TEST_TIMEOUT_MS apredicate))
+  ([timeout-ms apredicate]
+    (while-timeout timeout-ms (not (apredicate))
+      (Time/sleep 100))))
+
+(defn wait-until-cluster-waiting
+  "Wait until the cluster is idle. Should be used with time simulation."
+  ([cluster-map]
+    (let [local-cluster (:local-cluster cluster-map)]
+      (.waitForIdle local-cluster)))
+  ([cluster-map timeout-ms]
+    (let [local-cluster (:local-cluster cluster-map)]
+      (.waitForIdle local-cluster timeout-ms))))
+
+(defn advance-cluster-time
+  ([cluster-map secs increment-secs]
+    (let [local-cluster (:local-cluster cluster-map)]
+      (.advanceClusterTime local-cluster secs increment-secs))) 
+  ([cluster-map secs]
+    (let [local-cluster (:local-cluster cluster-map)]
+      (.advanceClusterTime local-cluster secs))))
+
+(defmacro with-mocked-nimbus
+  [[nimbus-sym & args] & body]
+  `(let [~nimbus-sym (mk-mocked-nimbus ~@args)]
+     (try
+       ~@body
+       (catch Throwable t#
+         (log-error t# "Error in cluster")
+         (throw t#))
+       (finally
+         (let [keep-waiting?# (atom true)
+               f# (future (while @keep-waiting?# (simulate-wait ~nimbus-sym)))]
+           (kill-local-storm-cluster ~nimbus-sym)
+           (reset! keep-waiting?# false)
+            @f#)))))
+
+(defmacro with-local-cluster
+  [[cluster-sym & args] & body]
+  `(let [~cluster-sym (mk-local-storm-cluster ~@args)]
+     (try
+       ~@body
+       (catch Throwable t#
+         (log-error t# "Error in cluster")
+         (throw t#))
+       (finally
+         (let [keep-waiting?# (atom true)
+               f# (future (while @keep-waiting?# (simulate-wait ~cluster-sym)))]
+           (kill-local-storm-cluster ~cluster-sym)
+           (reset! keep-waiting?# false)
+            @f#)))))
+
+(defmacro with-simulated-time-local-cluster
+  [& args]
+  `(with-open [_# (Time$SimulatedTime.)]
+     (with-local-cluster ~@args)))
+
+(defmacro with-inprocess-zookeeper
+  [port-sym & body]
+  `(with-open [zks# (InProcessZookeeper. )]
+     (let [~port-sym (.getPort zks#)]
+       ~@body)))
+
+(defn submit-local-topology
+  [nimbus storm-name conf topology]
+  (when-not (Utils/isValidConf conf)
+    (throw (IllegalArgumentException. "Topology conf is not json-serializable")))
+  (.submitTopology nimbus storm-name nil (JSONValue/toJSONString conf) topology))
+
+(defn submit-local-topology-with-opts
+  [nimbus storm-name conf topology submit-opts]
+  (when-not (Utils/isValidConf conf)
+    (throw (IllegalArgumentException. "Topology conf is not json-serializable")))
+  (.submitTopologyWithOpts nimbus storm-name nil (JSONValue/toJSONString conf) topology submit-opts))
+
+(defn simulate-wait
+  [cluster-map]
+  (Testing/simulateWait (:local-cluster cluster-map)))
+
+(defn spout-objects [spec-map]
+  (for [[_ spout-spec] spec-map]
+    (-> spout-spec
+        .get_spout_object
+        (Thrift/deserializeComponentObject))))
+
+(defn capture-topology
+  [topology]
+  (let [cap-topo (Testing/captureTopology topology)]
+    {:topology (.topology cap-topo)
+     :capturer (.capturer cap-topo)}))
+
+(defnk complete-topology
+  [cluster-map topology
+   :mock-sources {}
+   :storm-conf {}
+   :cleanup-state true
+   :topology-name nil
+   :timeout-ms Testing/TEST_TIMEOUT_MS]
+  (Testing/completeTopology (:local-cluster cluster-map) topology, 
+      (doto (CompleteTopologyParam.)
+            (.setStormConf storm-conf)
+            (.setTopologyName topology-name)
+            (.setTimeoutMs timeout-ms)
+            (.setMockedSources (MockedSources. mock-sources))
+            (.setCleanupState cleanup-state))))
+
+(defn read-tuples
+  ([results component-id stream-id]
+   (Testing/readTuples results component-id stream-id))
+  ([results component-id]
+   (Testing/readTuples results component-id )))
+
+(defn ms=
+  [a b]
+  (Testing/multiseteq a b))
+
+(def TRACKER-BOLT-ID "+++tracker-bolt")
+
+;; TODO: should override system-topology! and wrap everything there
+(defn mk-tracked-topology
+  ([tracked-cluster topology]
+   (let [tt (TrackedTopology. topology (:local-cluster tracked-cluster))]
+     {:topology (.getTopology tt)
+      :tracked-topo tt})))
+
+(defn increment-global!
+  [id key amt]
+  (-> (RegisteredGlobalState/getState id)
+      (get key)
+      (.addAndGet amt)))
+
+(defn global-amt
+  [id key]
+  (-> (RegisteredGlobalState/getState id)
+      (get key)
+      .get))
+
+(defnk mkClusterParam 
+  [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :nimbus-daemon false]
+  ;;TODO do we need to support inimbus?, group-mapper, or supervisor-slot-port-min
+  (doto (MkClusterParam. )
+    (.setDaemonConf daemon-conf)
+    (.setNimbusDaemon nimbus-daemon)
+    (.setPortsPerSupervisor (int ports-per-supervisor))
+    (.setSupervisors (int supervisors))))
+
+(defmacro with-tracked-cluster
+  [[cluster-sym & cluster-args] & body]
+  `(Testing/withTrackedCluster
+       (mkClusterParam ~@cluster-args)
+       (reify TestJob
+         (run [this# lc#]
+           (let [~cluster-sym (local-cluster-state lc#)]
+             ~@body)))))
+
+(defn tracked-wait
+  "Waits until topology is idle and 'amt' more tuples have been emitted by spouts."
+  ([tracked-map]
+     (Testing/trackedWait (:tracked-topo tracked-map)))
+  ([tracked-map amt]
+     (Testing/trackedWait (:tracked-topo tracked-map) (int amt)))
+  ([tracked-map amt timeout-ms]
+     (Testing/trackedWait (:tracked-topo tracked-map) (int amt) (int timeout-ms))))
+
+(defnk test-tuple
+  [values
+   :stream Utils/DEFAULT_STREAM_ID
+   :component "component"
+   :fields nil]
+  (Testing/testTuple
+    values
+    (doto (MkTupleParam. )
+      (.setStream stream)
+      (.setComponent component)
+      (.setFieldsList fields))))
+
+(defmacro with-timeout
+  [millis unit & body]
+  `(let [f# (future ~@body)]
+     (try
+       (.get f# ~millis ~unit)
+       (finally (future-cancel f#)))))

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/storm-clojure/pom.xml
----------------------------------------------------------------------
diff --git a/storm-clojure/pom.xml b/storm-clojure/pom.xml
index 1d50df0..cef8fd0 100644
--- a/storm-clojure/pom.xml
+++ b/storm-clojure/pom.xml
@@ -35,17 +35,8 @@
             <scope>provided</scope>
         </dependency>
         <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-core</artifactId>
-            <version>${project.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-core</artifactId>
-            <version>${project.version}</version>
-            <type>test-jar</type>
-            <scope>test</scope>
+            <groupId>org.clojure</groupId>
+            <artifactId>tools.logging</artifactId>
         </dependency>
         <dependency>
             <groupId>com.googlecode.json-simple</groupId>
@@ -54,7 +45,7 @@
         </dependency>
     </dependencies>
 
-    <build>
+    <build> 
         <plugins>
             <plugin>
                 <groupId>com.theoryinpractise</groupId>

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/storm-clojure/src/clj/org/apache/storm/config.clj
----------------------------------------------------------------------
diff --git a/storm-clojure/src/clj/org/apache/storm/config.clj b/storm-clojure/src/clj/org/apache/storm/config.clj
new file mode 100644
index 0000000..bfe47ed
--- /dev/null
+++ b/storm-clojure/src/clj/org/apache/storm/config.clj
@@ -0,0 +1,28 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns org.apache.storm.config
+  (:import [org.apache.storm Config]))
+
+(defn- clojure-config-name [name]
+  (.replace (.toUpperCase name) "_" "-"))
+
+; define clojure constants for every configuration parameter
+(doseq [f (seq (.getFields Config))]
+  (let [name (.getName f)
+        new-name (clojure-config-name name)]
+    (eval
+      `(def ~(symbol new-name) (. Config ~(symbol name))))))

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/storm-clojure/src/clj/org/apache/storm/log.clj
----------------------------------------------------------------------
diff --git a/storm-clojure/src/clj/org/apache/storm/log.clj b/storm-clojure/src/clj/org/apache/storm/log.clj
new file mode 100644
index 0000000..7a006ef
--- /dev/null
+++ b/storm-clojure/src/clj/org/apache/storm/log.clj
@@ -0,0 +1,34 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns org.apache.storm.log
+  (:require [clojure.tools.logging :as log]))
+
+(defmacro log-message
+  [& args]
+  `(log/info (str ~@args)))
+
+(defmacro log-error
+  [e & args]
+  `(log/log :error ~e (str ~@args)))
+
+(defmacro log-debug
+  [& args]
+  `(log/debug (str ~@args)))
+
+(defmacro log-warn
+  [& args]
+  `(log/warn (str ~@args)))

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/storm-clojure/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-clojure/src/clj/org/apache/storm/testing.clj b/storm-clojure/src/clj/org/apache/storm/testing.clj
deleted file mode 100644
index 1e374ed..0000000
--- a/storm-clojure/src/clj/org/apache/storm/testing.clj
+++ /dev/null
@@ -1,270 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns org.apache.storm.testing
-  (:import [org.apache.storm LocalCluster$Builder])
-  (:import [java.util.function UnaryOperator])
-  (:import [org.apache.storm.utils Time Time$SimulatedTime RegisteredGlobalState Utils])
-  (:import [org.apache.storm.testing InProcessZookeeper MkTupleParam TestJob MkClusterParam 
-            TrackedTopology CompleteTopologyParam MockedSources])
-  (:import [org.apache.storm Thrift Testing Testing$Condition])
-  (:import [org.apache.storm.testing MockLeaderElector])
-  (:import [org.json.simple JSONValue])
-  (:use [org.apache.storm util config log])
-  (:use [org.apache.storm thrift]))
-
-(defnk add-supervisor
-  [cluster-map :ports 2 :conf {} :id nil]
-  (let [local-cluster (:local-cluster cluster-map)]
-    (.addSupervisor local-cluster ports conf id)))
-
-(defnk mock-leader-elector [:is-leader true :leader-name "test-host" :leader-port 9999]
-  (MockLeaderElector. is-leader leader-name leader-port))
-
-(defn local-cluster-state [local-cluster]
-    {:nimbus (.getNimbus local-cluster)
-     :daemon-conf (.getDaemonConf local-cluster)
-     :storm-cluster-state (.getClusterState local-cluster)
-     :local-cluster local-cluster})
-
-(defnk mk-mocked-nimbus 
-  [:daemon-conf {} :inimbus nil :blob-store nil :cluster-state nil 
-   :leader-elector nil :group-mapper nil :nimbus-daemon false :nimbus-wrapper nil]
-  (let [builder (doto (LocalCluster$Builder.)
-                  (.withDaemonConf daemon-conf)
-                  (.withINimbus inimbus)
-                  (.withBlobStore blob-store)
-                  (.withClusterState cluster-state)
-                  (.withLeaderElector leader-elector)
-                  (.withGroupMapper group-mapper)
-                  (.withNimbusDaemon nimbus-daemon)
-                  (.withNimbusWrapper (when nimbus-wrapper (reify UnaryOperator (apply [this nimbus] (nimbus-wrapper nimbus))))))
-        local-cluster (.build builder)]
-    (local-cluster-state local-cluster)))
-
-(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :inimbus nil :group-mapper nil :supervisor-slot-port-min 1024 :nimbus-daemon false]
-  (let [builder (doto (LocalCluster$Builder.)
-                  (.withSupervisors supervisors)
-                  (.withPortsPerSupervisor ports-per-supervisor)
-                  (.withDaemonConf daemon-conf)
-                  (.withINimbus inimbus)
-                  (.withGroupMapper group-mapper)
-                  (.withSupervisorSlotPortMin supervisor-slot-port-min)
-                  (.withNimbusDaemon nimbus-daemon))
-        local-cluster (.build builder)]
-    (local-cluster-state local-cluster)))
-
-(defn get-supervisor [cluster-map supervisor-id]
- (let [local-cluster (:local-cluster cluster-map)]
-  (.getSupervisor local-cluster supervisor-id))) 
-
-(defn kill-supervisor [cluster-map supervisor-id]
- (let [local-cluster (:local-cluster cluster-map)]
-  (.killSupervisor local-cluster supervisor-id))) 
-
-(defn kill-local-storm-cluster [cluster-map]
- (let [local-cluster (:local-cluster cluster-map)]
-  (.close local-cluster))) 
-
-(defmacro while-timeout [timeout-ms condition & body]
-  `(Testing/whileTimeout ~timeout-ms
-     (reify Testing$Condition (exec [this] ~condition))
-     (fn [] ~@body)))
-
-(defn wait-for-condition
-  ([apredicate]
-    (wait-for-condition Testing/TEST_TIMEOUT_MS apredicate))
-  ([timeout-ms apredicate]
-    (while-timeout timeout-ms (not (apredicate))
-      (Time/sleep 100))))
-
-(defn wait-until-cluster-waiting
-  "Wait until the cluster is idle. Should be used with time simulation."
-  ([cluster-map]
-    (let [local-cluster (:local-cluster cluster-map)]
-      (.waitForIdle local-cluster)))
-  ([cluster-map timeout-ms]
-    (let [local-cluster (:local-cluster cluster-map)]
-      (.waitForIdle local-cluster timeout-ms))))
-
-(defn advance-cluster-time
-  ([cluster-map secs increment-secs]
-    (let [local-cluster (:local-cluster cluster-map)]
-      (.advanceClusterTime local-cluster secs increment-secs))) 
-  ([cluster-map secs]
-    (let [local-cluster (:local-cluster cluster-map)]
-      (.advanceClusterTime local-cluster secs))))
-
-(defmacro with-mocked-nimbus
-  [[nimbus-sym & args] & body]
-  `(let [~nimbus-sym (mk-mocked-nimbus ~@args)]
-     (try
-       ~@body
-       (catch Throwable t#
-         (log-error t# "Error in cluster")
-         (throw t#))
-       (finally
-         (let [keep-waiting?# (atom true)
-               f# (future (while @keep-waiting?# (simulate-wait ~nimbus-sym)))]
-           (kill-local-storm-cluster ~nimbus-sym)
-           (reset! keep-waiting?# false)
-            @f#)))))
-
-(defmacro with-local-cluster
-  [[cluster-sym & args] & body]
-  `(let [~cluster-sym (mk-local-storm-cluster ~@args)]
-     (try
-       ~@body
-       (catch Throwable t#
-         (log-error t# "Error in cluster")
-         (throw t#))
-       (finally
-         (let [keep-waiting?# (atom true)
-               f# (future (while @keep-waiting?# (simulate-wait ~cluster-sym)))]
-           (kill-local-storm-cluster ~cluster-sym)
-           (reset! keep-waiting?# false)
-            @f#)))))
-
-(defmacro with-simulated-time-local-cluster
-  [& args]
-  `(with-open [_# (Time$SimulatedTime.)]
-     (with-local-cluster ~@args)))
-
-(defmacro with-inprocess-zookeeper
-  [port-sym & body]
-  `(with-open [zks# (InProcessZookeeper. )]
-     (let [~port-sym (.getPort zks#)]
-       ~@body)))
-
-(defn submit-local-topology
-  [nimbus storm-name conf topology]
-  (when-not (Utils/isValidConf conf)
-    (throw (IllegalArgumentException. "Topology conf is not json-serializable")))
-  (.submitTopology nimbus storm-name nil (JSONValue/toJSONString conf) topology))
-
-(defn submit-local-topology-with-opts
-  [nimbus storm-name conf topology submit-opts]
-  (when-not (Utils/isValidConf conf)
-    (throw (IllegalArgumentException. "Topology conf is not json-serializable")))
-  (.submitTopologyWithOpts nimbus storm-name nil (JSONValue/toJSONString conf) topology submit-opts))
-
-(defn simulate-wait
-  [cluster-map]
-  (Testing/simulateWait (:local-cluster cluster-map)))
-
-(defn spout-objects [spec-map]
-  (for [[_ spout-spec] spec-map]
-    (-> spout-spec
-        .get_spout_object
-        (Thrift/deserializeComponentObject))))
-
-(defn capture-topology
-  [topology]
-  (let [cap-topo (Testing/captureTopology topology)]
-    {:topology (.topology cap-topo)
-     :capturer (.capturer cap-topo)}))
-
-(defnk complete-topology
-  [cluster-map topology
-   :mock-sources {}
-   :storm-conf {}
-   :cleanup-state true
-   :topology-name nil
-   :timeout-ms Testing/TEST_TIMEOUT_MS]
-  (Testing/completeTopology (:local-cluster cluster-map) topology, 
-      (doto (CompleteTopologyParam.)
-            (.setStormConf storm-conf)
-            (.setTopologyName topology-name)
-            (.setTimeoutMs timeout-ms)
-            (.setMockedSources (MockedSources. mock-sources))
-            (.setCleanupState cleanup-state))))
-
-(defn read-tuples
-  ([results component-id stream-id]
-   (Testing/readTuples results component-id stream-id))
-  ([results component-id]
-   (Testing/readTuples results component-id )))
-
-(defn ms=
-  [a b]
-  (Testing/multiseteq a b))
-
-(def TRACKER-BOLT-ID "+++tracker-bolt")
-
-;; TODO: should override system-topology! and wrap everything there
-(defn mk-tracked-topology
-  ([tracked-cluster topology]
-   (let [tt (TrackedTopology. topology (:local-cluster tracked-cluster))]
-     {:topology (.getTopology tt)
-      :tracked-topo tt})))
-
-(defn increment-global!
-  [id key amt]
-  (-> (RegisteredGlobalState/getState id)
-      (get key)
-      (.addAndGet amt)))
-
-(defn global-amt
-  [id key]
-  (-> (RegisteredGlobalState/getState id)
-      (get key)
-      .get))
-
-(defnk mkClusterParam 
-  [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :nimbus-daemon false]
-  ;;TODO do we need to support inimbus?, group-mapper, or supervisor-slot-port-min
-  (doto (MkClusterParam. )
-    (.setDaemonConf daemon-conf)
-    (.setNimbusDaemon nimbus-daemon)
-    (.setPortsPerSupervisor (int ports-per-supervisor))
-    (.setSupervisors (int supervisors))))
-
-(defmacro with-tracked-cluster
-  [[cluster-sym & cluster-args] & body]
-  `(Testing/withTrackedCluster
-       (mkClusterParam ~@cluster-args)
-       (reify TestJob
-         (run [this# lc#]
-           (let [~cluster-sym (local-cluster-state lc#)]
-             ~@body)))))
-
-(defn tracked-wait
-  "Waits until topology is idle and 'amt' more tuples have been emitted by spouts."
-  ([tracked-map]
-     (Testing/trackedWait (:tracked-topo tracked-map)))
-  ([tracked-map amt]
-     (Testing/trackedWait (:tracked-topo tracked-map) (int amt)))
-  ([tracked-map amt timeout-ms]
-     (Testing/trackedWait (:tracked-topo tracked-map) (int amt) (int timeout-ms))))
-
-(defnk test-tuple
-  [values
-   :stream Utils/DEFAULT_STREAM_ID
-   :component "component"
-   :fields nil]
-  (Testing/testTuple
-    values
-    (doto (MkTupleParam. )
-      (.setStream stream)
-      (.setComponent component)
-      (.setFieldsList fields))))
-
-(defmacro with-timeout
-  [millis unit & body]
-  `(let [f# (future ~@body)]
-     (try
-       (.get f# ~millis ~unit)
-       (finally (future-cancel f#)))))

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/storm-clojure/src/clj/org/apache/storm/util.clj
----------------------------------------------------------------------
diff --git a/storm-clojure/src/clj/org/apache/storm/util.clj b/storm-clojure/src/clj/org/apache/storm/util.clj
new file mode 100644
index 0000000..9ad1f10
--- /dev/null
+++ b/storm-clojure/src/clj/org/apache/storm/util.clj
@@ -0,0 +1,134 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns org.apache.storm.util
+  (:import [java.util Map List HashMap])
+  (:import [org.apache.storm.generated ErrorInfo])
+  (:import [org.apache.storm.utils Utils])
+  (:import [java.util List Set])
+  (:use [clojure walk])
+  (:use [org.apache.storm log]))
+
+;; name-with-attributes by Konrad Hinsen:
+(defn name-with-attributes
+  "To be used in macro definitions.
+  Handles optional docstrings and attribute maps for a name to be defined
+  in a list of macro arguments. If the first macro argument is a string,
+  it is added as a docstring to name and removed from the macro argument
+  list. If afterwards the first macro argument is a map, its entries are
+  added to the name's metadata map and the map is removed from the
+  macro argument list. The return value is a vector containing the name
+  with its extended metadata map and the list of unprocessed macro
+  arguments."
+  [name macro-args]
+  (let [[docstring macro-args] (if (string? (first macro-args))
+                                 [(first macro-args) (next macro-args)]
+                                 [nil macro-args])
+        [attr macro-args] (if (map? (first macro-args))
+                            [(first macro-args) (next macro-args)]
+                            [{} macro-args])
+        attr (if docstring
+               (assoc attr :doc docstring)
+               attr)
+        attr (if (meta name)
+               (conj (meta name) attr)
+               attr)]
+    [(with-meta name attr) macro-args]))
+
+(defmacro defnk
+  "Define a function accepting keyword arguments. Symbols up to the first
+  keyword in the parameter list are taken as positional arguments.  Then
+  an alternating sequence of keywords and defaults values is expected. The
+  values of the keyword arguments are available in the function body by
+  virtue of the symbol corresponding to the keyword (cf. :keys destructuring).
+  defnk accepts an optional docstring as well as an optional metadata map."
+  [fn-name & fn-tail]
+  (let [[fn-name [args & body]] (name-with-attributes fn-name fn-tail)
+        [pos kw-vals] (split-with symbol? args)
+        syms (map #(-> % name symbol) (take-nth 2 kw-vals))
+        values (take-nth 2 (rest kw-vals))
+        sym-vals (apply hash-map (interleave syms values))
+        de-map {:keys (vec syms) :or sym-vals}]
+    `(defn ~fn-name
+       [~@pos & options#]
+       (let [~de-map (apply hash-map options#)]
+         ~@body))))
+
+(defmacro thrown-cause?
+  [klass & body]
+  `(try
+     ~@body
+     false
+     (catch Throwable t#
+       (let [tc# (Utils/exceptionCauseIsInstanceOf ~klass t#)]
+         (if (not tc#) (log-error t# "Exception did not match " ~klass))
+         tc#))))
+
+(defn clojurify-structure
+  [s]
+  (if s
+    (prewalk (fn [x]
+             (cond (instance? Map x) (into {} x)
+                   (instance? List x) (vec x)
+                   (instance? Set x) (into #{} x)
+                   ;; (Boolean. false) does not evaluate to false in an if.
+                   ;; This fixes that.
+                   (instance? Boolean x) (boolean x)
+                   true x))
+           s)))
+; move this func form convert.clj due to cyclic load dependency
+(defn clojurify-error [^ErrorInfo error]
+  (if error
+    {
+      :error (.get_error error)
+      :time-secs (.get_error_time_secs error)
+      :host (.get_host error)
+      :port (.get_port error)
+      }
+    ))
+
+;TODO: We're keeping this function around until all the code using it is properly tranlated to java
+;TODO: by properly having the for loop IN THE JAVA FUNCTION that originally used this function.
+(defn map-val
+  [afn amap]
+  (into {}
+        (for [[k v] amap]
+          [k (afn v)])))
+
+;TODO: We're keeping this function around until all the code using it is properly tranlated to java
+;TODO: by properly having the for loop IN THE JAVA FUNCTION that originally used this function.
+(defn filter-key
+  [afn amap]
+  (into {} (filter (fn [[k v]] (afn k)) amap)))
+
+;TODO: Once all the other clojure functions (100+ locations) are translated to java, this function becomes moot.
+(def not-nil? (complement nil?))
+
+(defmacro dofor [& body]
+  `(doall (for ~@body)))
+
+(defmacro -<>
+  ([x] x)
+  ([x form] (if (seq? form)
+              (with-meta
+                (let [[begin [_ & end]] (split-with #(not= % '<>) form)]
+                  (concat begin [x] end))
+                (meta form))
+              (list form x)))
+  ([x form & more] `(-<> (-<> ~x ~form) ~@more)))
+
+(defn hashmap-to-persistent [^HashMap m]
+  (zipmap (.keySet m) (.values m)))

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index b5f7086..0014aa5 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -377,7 +377,7 @@
     </dependencies>
     <build>
         <sourceDirectory>src/jvm</sourceDirectory>
-         <testSourceDirectory>test/jvm</testSourceDirectory>
+        <testSourceDirectory>test/jvm</testSourceDirectory>
         <resources>
             <resource>
                 <directory>../</directory>

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/storm-core/src/jvm/org/apache/storm/command/Activate.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/Activate.java b/storm-core/src/jvm/org/apache/storm/command/Activate.java
index 6a64bf6..45d4a99 100644
--- a/storm-core/src/jvm/org/apache/storm/command/Activate.java
+++ b/storm-core/src/jvm/org/apache/storm/command/Activate.java
@@ -31,7 +31,7 @@ public class Activate {
 
         NimbusClient.withConfiguredClient(new NimbusClient.WithNimbus() {
           @Override
-          public void run(Nimbus.Client nimbus) throws Exception {
+          public void run(Nimbus.Iface nimbus) throws Exception {
             nimbus.activate(name);
             LOG.info("Activated topology: {}", name);
           }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/storm-core/src/jvm/org/apache/storm/command/Deactivate.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/Deactivate.java b/storm-core/src/jvm/org/apache/storm/command/Deactivate.java
index 6b9dd11..4eafb83 100644
--- a/storm-core/src/jvm/org/apache/storm/command/Deactivate.java
+++ b/storm-core/src/jvm/org/apache/storm/command/Deactivate.java
@@ -31,7 +31,7 @@ public class Deactivate {
 
         NimbusClient.withConfiguredClient(new NimbusClient.WithNimbus() {
           @Override
-          public void run(Nimbus.Client nimbus) throws Exception {
+          public void run(Nimbus.Iface nimbus) throws Exception {
             nimbus.deactivate(name);
             LOG.info("Deactivated topology: {}", name);
           }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/storm-core/src/jvm/org/apache/storm/command/GetErrors.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/GetErrors.java b/storm-core/src/jvm/org/apache/storm/command/GetErrors.java
index ee15446..05c0ee4 100644
--- a/storm-core/src/jvm/org/apache/storm/command/GetErrors.java
+++ b/storm-core/src/jvm/org/apache/storm/command/GetErrors.java
@@ -40,7 +40,7 @@ public class GetErrors {
 
         NimbusClient.withConfiguredClient(new NimbusClient.WithNimbus() {
             @Override
-            public void run(Nimbus.Client client) throws Exception {
+            public void run(Nimbus.Iface client) throws Exception {
                 GetInfoOptions opts = new GetInfoOptions();
                 opts.set_num_err_choice(NumErrorsChoice.ONE);
                 String topologyId = Utils.getTopologyId(name, client);