You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/12 08:29:12 UTC

[04/10] storm git commit: STORM-2447: add in storm local to avoid having server on worker classpath

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
----------------------------------------------------------------------
diff --git a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedis.java b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
index f5278fd..c162ca6 100644
--- a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
+++ b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
@@ -18,22 +18,20 @@
 package org.apache.storm.redis.trident;
 
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
 import org.apache.storm.redis.common.mapper.RedisLookupMapper;
 import org.apache.storm.redis.common.mapper.RedisStoreMapper;
 import org.apache.storm.redis.trident.state.RedisState;
 import org.apache.storm.redis.trident.state.RedisStateQuerier;
 import org.apache.storm.redis.trident.state.RedisStateUpdater;
-import org.apache.storm.redis.common.config.JedisPoolConfig;
 import org.apache.storm.trident.Stream;
 import org.apache.storm.trident.TridentState;
 import org.apache.storm.trident.TridentTopology;
 import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
 
 public class WordCountTridentRedis {
     public static StormTopology buildTopology(String redisHost, Integer redisPort){
@@ -71,28 +69,17 @@ public class WordCountTridentRedis {
     }
 
     public static void main(String[] args) throws Exception {
-        if (args.length != 3) {
-            System.out.println("Usage: WordCountTrident 0(storm-local)|1(storm-cluster) redis-host redis-port");
+        if (args.length != 2) {
+            System.out.println("Usage: WordCountTrident redis-host redis-port");
             System.exit(1);
         }
 
-        Integer flag = Integer.valueOf(args[0]);
-        String redisHost = args[1];
-        Integer redisPort = Integer.valueOf(args[2]);
+        String redisHost = args[0];
+        Integer redisPort = Integer.valueOf(args[1]);
 
         Config conf = new Config();
         conf.setMaxSpoutPending(5);
-        if (flag == 0) {
-            try (LocalCluster cluster = new LocalCluster();
-                 LocalTopology topo = cluster.submitTopology("test_wordCounter_for_redis", conf, buildTopology(redisHost, redisPort));) {
-                Thread.sleep(60 * 1000);
-            }
-            System.exit(0);
-        } else if(flag == 1) {
-            conf.setNumWorkers(3);
-            StormSubmitter.submitTopology("test_wordCounter_for_redis", conf, buildTopology(redisHost, redisPort));
-        } else {
-            System.out.println("Usage: WordCountTrident 0(storm-local)|1(storm-cluster) redis-host redis-port");
-        }
+        conf.setNumWorkers(3);
+        StormSubmitter.submitTopology("test_wordCounter_for_redis", conf, buildTopology(redisHost, redisPort));
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
----------------------------------------------------------------------
diff --git a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
index b6e067d..687ac54 100644
--- a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
+++ b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
@@ -17,27 +17,25 @@
  */
 package org.apache.storm.redis.trident;
 
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
+import org.apache.storm.redis.common.config.JedisClusterConfig;
 import org.apache.storm.redis.common.mapper.RedisLookupMapper;
 import org.apache.storm.redis.common.mapper.RedisStoreMapper;
 import org.apache.storm.redis.trident.state.RedisClusterState;
 import org.apache.storm.redis.trident.state.RedisClusterStateQuerier;
 import org.apache.storm.redis.trident.state.RedisClusterStateUpdater;
-import org.apache.storm.redis.common.config.JedisClusterConfig;
 import org.apache.storm.trident.Stream;
 import org.apache.storm.trident.TridentState;
 import org.apache.storm.trident.TridentTopology;
 import org.apache.storm.trident.testing.FixedBatchSpout;
-
-import java.net.InetSocketAddress;
-import java.util.HashSet;
-import java.util.Set;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
 
 public class WordCountTridentRedisCluster {
     public static StormTopology buildTopology(String redisHostPort){
@@ -79,28 +77,17 @@ public class WordCountTridentRedisCluster {
     }
 
     public static void main(String[] args) throws Exception {
-        if (args.length != 2) {
-            System.out.println("Usage: WordCountTrident 0(storm-local)|1(storm-cluster) 127.0.0.1:6379,127.0.0.1:6380");
+        if (args.length != 1) {
+            System.out.println("Usage: WordCountTrident 127.0.0.1:6379,127.0.0.1:6380");
             System.exit(1);
         }
 
-        Integer flag = Integer.valueOf(args[0]);
-        String redisHostPort = args[1];
+        String redisHostPort = args[0];
 
         Config conf = new Config();
         conf.setMaxSpoutPending(5);
-        if (flag == 0) {
-            try (LocalCluster cluster = new LocalCluster();
-                 LocalTopology topo = cluster.submitTopology("test_wordCounter_for_redis", conf, buildTopology(redisHostPort));) {
-                Thread.sleep(60 * 1000);
-            }
-            System.exit(0);
-        } else if(flag == 1) {
-            conf.setNumWorkers(3);
-            StormSubmitter.submitTopology("test_wordCounter_for_redis", conf, buildTopology(redisHostPort));
-        } else {
-            System.out.println("Usage: WordCountTrident 0(storm-local)|1(storm-cluster) redis-host redis-port");
-        }
+        conf.setNumWorkers(3);
+        StormSubmitter.submitTopology("test_wordCounter_for_redis", conf, buildTopology(redisHostPort));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
----------------------------------------------------------------------
diff --git a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
index 0a025bd..8cf0f3c 100644
--- a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
+++ b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
@@ -17,17 +17,16 @@
  */
 package org.apache.storm.redis.trident;
 
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
+import org.apache.storm.redis.common.config.JedisClusterConfig;
 import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
-import org.apache.storm.redis.common.mapper.TupleMapper;
 import org.apache.storm.redis.trident.state.RedisClusterMapState;
-import org.apache.storm.redis.common.config.JedisClusterConfig;
 import org.apache.storm.trident.Stream;
 import org.apache.storm.trident.TridentState;
 import org.apache.storm.trident.TridentTopology;
@@ -35,10 +34,8 @@ import org.apache.storm.trident.operation.builtin.MapGet;
 import org.apache.storm.trident.operation.builtin.Sum;
 import org.apache.storm.trident.state.StateFactory;
 import org.apache.storm.trident.testing.FixedBatchSpout;
-
-import java.net.InetSocketAddress;
-import java.util.HashSet;
-import java.util.Set;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
 
 public class WordCountTridentRedisClusterMap {
     public static StormTopology buildTopology(String redisHostPort){
@@ -74,28 +71,17 @@ public class WordCountTridentRedisClusterMap {
     }
 
     public static void main(String[] args) throws Exception {
-        if (args.length != 2) {
-            System.out.println("Usage: WordCountTrident 0(storm-local)|1(storm-cluster) 127.0.0.1:6379,127.0.0.1:6380");
+        if (args.length != 1) {
+            System.out.println("Usage: WordCountTrident 127.0.0.1:6379,127.0.0.1:6380");
             System.exit(1);
         }
 
-        Integer flag = Integer.valueOf(args[0]);
-        String redisHostPort = args[1];
+        String redisHostPort = args[0];
 
         Config conf = new Config();
         conf.setMaxSpoutPending(5);
-        if (flag == 0) {
-            try (LocalCluster cluster = new LocalCluster();
-                 LocalTopology topo = cluster.submitTopology("test_wordCounter_for_redis", conf, buildTopology(redisHostPort));) {
-                Thread.sleep(60 * 1000);
-            }
-            System.exit(0);
-        } else if(flag == 1) {
-            conf.setNumWorkers(3);
-            StormSubmitter.submitTopology("test_wordCounter_for_redis", conf, buildTopology(redisHostPort));
-        } else {
-            System.out.println("Usage: WordCountTrident 0(storm-local)|1(storm-cluster) redis-host redis-port");
-        }
+        conf.setNumWorkers(3);
+        StormSubmitter.submitTopology("test_wordCounter_for_redis", conf, buildTopology(redisHostPort));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
----------------------------------------------------------------------
diff --git a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
index b0cddcd..dac0a4d 100644
--- a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
+++ b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
@@ -18,15 +18,11 @@
 package org.apache.storm.redis.trident;
 
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
 import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 import org.apache.storm.redis.trident.state.RedisMapState;
-import org.apache.storm.redis.common.config.JedisPoolConfig;
 import org.apache.storm.trident.Stream;
 import org.apache.storm.trident.TridentState;
 import org.apache.storm.trident.TridentTopology;
@@ -34,6 +30,8 @@ import org.apache.storm.trident.operation.builtin.MapGet;
 import org.apache.storm.trident.operation.builtin.Sum;
 import org.apache.storm.trident.state.StateFactory;
 import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
 
 public class WordCountTridentRedisMap {
     public static StormTopology buildTopology(String redisHost, Integer redisPort){
@@ -66,29 +64,18 @@ public class WordCountTridentRedisMap {
     }
 
     public static void main(String[] args) throws Exception {
-        if (args.length != 3) {
-            System.out.println("Usage: WordCountTrident 0(storm-local)|1(storm-cluster) redis-host redis-port");
+        if (args.length != 2) {
+            System.out.println("Usage: WordCountTrident redis-host redis-port");
             System.exit(1);
         }
 
-        Integer flag = Integer.valueOf(args[0]);
-        String redisHost = args[1];
-        Integer redisPort = Integer.valueOf(args[2]);
+        String redisHost = args[0];
+        Integer redisPort = Integer.valueOf(args[1]);
 
         Config conf = new Config();
         conf.setMaxSpoutPending(5);
-        if (flag == 0) {
-            try (LocalCluster cluster = new LocalCluster();
-                LocalTopology topo = cluster.submitTopology("test_wordCounter_for_redis", conf, buildTopology(redisHost, redisPort));) {
-                Thread.sleep(60 * 1000);
-            }
-            System.exit(0);
-        } else if(flag == 1) {
-            conf.setNumWorkers(3);
-            StormSubmitter.submitTopology("test_wordCounter_for_redis", conf, buildTopology(redisHost, redisPort));
-        } else {
-            System.out.println("Usage: WordCountTrident 0(storm-local)|1(storm-cluster) redis-host redis-port");
-        }
+        conf.setNumWorkers(3);
+        StormSubmitter.submitTopology("test_wordCounter_for_redis", conf, buildTopology(redisHost, redisPort));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-solr-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-solr-examples/pom.xml b/examples/storm-solr-examples/pom.xml
index d6d86fc..4ec049b 100644
--- a/examples/storm-solr-examples/pom.xml
+++ b/examples/storm-solr-examples/pom.xml
@@ -30,7 +30,7 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.storm</groupId>
-            <artifactId>storm-server</artifactId>
+            <artifactId>storm-client</artifactId>
             <version>${project.version}</version>
             <scope>${provided.scope}</scope>
         </dependency>

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-solr-examples/src/main/java/org/apache/storm/solr/topology/SolrTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-solr-examples/src/main/java/org/apache/storm/solr/topology/SolrTopology.java b/examples/storm-solr-examples/src/main/java/org/apache/storm/solr/topology/SolrTopology.java
index 3b5b1b6..6e483d7 100644
--- a/examples/storm-solr-examples/src/main/java/org/apache/storm/solr/topology/SolrTopology.java
+++ b/examples/storm-solr-examples/src/main/java/org/apache/storm/solr/topology/SolrTopology.java
@@ -18,18 +18,16 @@
 
 package org.apache.storm.solr.topology;
 
+import java.io.IOException;
+
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.solr.config.SolrCommitStrategy;
 import org.apache.storm.solr.config.SolrConfig;
 
-import java.io.IOException;
-
 public abstract class SolrTopology {
     protected static String COLLECTION = "gettingstarted";
 
@@ -37,11 +35,11 @@ public abstract class SolrTopology {
         final StormTopology topology = getTopology();
         final Config config = getConfig();
 
-        if (args.length == 0) {
-            submitTopologyLocalCluster(topology, config);
-        } else {
-            submitTopologyRemoteCluster(args[0], topology, config);
+        String topoName = "test";
+        if (args.length > 0) {
+            topoName = args[0];
         }
+        submitTopologyRemoteCluster(topoName, topology, config);
     }
 
     protected abstract StormTopology getTopology() throws IOException;
@@ -50,15 +48,6 @@ public abstract class SolrTopology {
         StormSubmitter.submitTopology(arg, config, topology);
     }
 
-    protected void submitTopologyLocalCluster(StormTopology topology, Config config) throws Exception {
-        try (LocalCluster cluster = new LocalCluster();
-             LocalTopology topo = cluster.submitTopology("test", config, topology);) {
-            Thread.sleep(10000);
-            System.out.println("Killing topology per client's request");
-        }
-        System.exit(0);
-    }
-
     protected Config getConfig() {
         Config config = new Config();
         config.setDebug(true);

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml
index 7d102e7..b66e4fe 100644
--- a/examples/storm-starter/pom.xml
+++ b/examples/storm-starter/pom.xml
@@ -71,32 +71,27 @@
         <groupId>org.apache.storm</groupId>
         <artifactId>storm-clojure</artifactId>
         <version>${project.version}</version>
-        <!--
-          Use "provided" scope to keep storm out of the jar-with-dependencies
-          For IntelliJ dev, intellij will load properly.
-        -->
-        <scope>${provided.scope}</scope>
     </dependency>
-      <!--
-      normally including 'storm-server' is OK for LocalCluster,
-      but 'storm-starter' also uses clojure implementation
-      so 'storm-starter' needs to include 'storm-core'
-      -->
+    <dependency>
+        <groupId>org.apache.storm</groupId>
+        <artifactId>storm-clojure-test</artifactId>
+        <version>${project.version}</version>
+        <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.storm</groupId>
-      <artifactId>storm-core</artifactId>
+      <artifactId>storm-client</artifactId>
       <version>${project.version}</version>
-      <!--
-        Use "provided" scope to keep storm out of the jar-with-dependencies
-        For IntelliJ dev, intellij will load properly.
-      -->
+        <!--
+          Use "provided" scope to keep storm out of the jar-with-dependencies
+          For IntelliJ dev, intellij will load properly.
+        -->
         <scope>${provided.scope}</scope>
     </dependency>
       <dependency>
           <groupId>org.apache.storm</groupId>
           <artifactId>storm-client-misc</artifactId>
           <version>${project.version}</version>
-          <scope>${provided.scope}</scope>
       </dependency>
       <dependency>
           <groupId>org.apache.storm</groupId>

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/clj/org/apache/storm/starter/clj/exclamation.clj
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/clj/org/apache/storm/starter/clj/exclamation.clj b/examples/storm-starter/src/clj/org/apache/storm/starter/clj/exclamation.clj
index 6bcd35c..415f43f 100644
--- a/examples/storm-starter/src/clj/org/apache/storm/starter/clj/exclamation.clj
+++ b/examples/storm-starter/src/clj/org/apache/storm/starter/clj/exclamation.clj
@@ -14,7 +14,7 @@
 ;; See the License for the specific language governing permissions and
 ;; limitations under the License.
 (ns org.apache.storm.starter.clj.exclamation
-  (:import [org.apache.storm StormSubmitter LocalCluster]
+  (:import [org.apache.storm StormSubmitter]
     [org.apache.storm.utils Utils]
            [org.apache.storm.testing TestWordSpout])
   (:use [org.apache.storm clojure config])
@@ -31,11 +31,6 @@
    {"exclaim1" (bolt-spec {"word" :shuffle} exclamation-bolt :p 3)
     "exclaim2" (bolt-spec {"exclaim1" :shuffle} exclamation-bolt :p 2)}))
 
-(defn run-local! []
-      (with-open [cluster (LocalCluster.)
-              topo (.submitTopology cluster "exclamation" {TOPOLOGY-DEBUG true} (mk-topology))]
-                 (Utils/sleep 10000)))
-
 (defn submit-topology! [name]
   (StormSubmitter/submitTopologyWithProgressBar
    name
@@ -45,6 +40,6 @@
 
 (defn -main
   ([]
-   (run-local!))
+   (submit-topology! "test"))
   ([name]
    (submit-topology! name)))

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/clj/org/apache/storm/starter/clj/rolling_top_words.clj
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/clj/org/apache/storm/starter/clj/rolling_top_words.clj b/examples/storm-starter/src/clj/org/apache/storm/starter/clj/rolling_top_words.clj
index d5eada5..9a9677c 100644
--- a/examples/storm-starter/src/clj/org/apache/storm/starter/clj/rolling_top_words.clj
+++ b/examples/storm-starter/src/clj/org/apache/storm/starter/clj/rolling_top_words.clj
@@ -17,7 +17,7 @@
   (:require [org.apache.storm [clojure :refer :all] [config :refer :all]]
             [org.apache.storm.starter.clj.bolts :refer
              [rolling-count-bolt intermediate-rankings-bolt total-rankings-bolt]])
-  (:import [org.apache.storm StormSubmitter LocalCluster]
+  (:import [org.apache.storm StormSubmitter]
     [org.apache.storm.utils Utils]
     [org.apache.storm.testing TestWordSpout])
   (:gen-class))
@@ -38,11 +38,6 @@
       total-ranker-id (bolt-spec {ranker-id :global}
                                  (total-rankings-bolt 5 2))})))
 
-(defn run-local! []
-      (with-open [cluster (LocalCluster.)
-              topo (.submitTopology cluster "slidingWindowCounts" {TOPOLOGY-DEBUG true} (mk-topology))]
-                 (Utils/sleep 60000)))
-
 (defn submit-topology! [name]
   (StormSubmitter/submitTopology
    name
@@ -52,6 +47,6 @@
 
 (defn -main
   ([]
-   (run-local!))
+   (submit-topology! "test"))
   ([name]
    (submit-topology! name)))

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/clj/org/apache/storm/starter/clj/word_count.clj
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/clj/org/apache/storm/starter/clj/word_count.clj b/examples/storm-starter/src/clj/org/apache/storm/starter/clj/word_count.clj
index cefa1e5..e3a52f5 100644
--- a/examples/storm-starter/src/clj/org/apache/storm/starter/clj/word_count.clj
+++ b/examples/storm-starter/src/clj/org/apache/storm/starter/clj/word_count.clj
@@ -14,7 +14,7 @@
 ;; See the License for the specific language governing permissions and
 ;; limitations under the License.
 (ns org.apache.storm.starter.clj.word-count
-  (:import [org.apache.storm StormSubmitter LocalCluster]
+  (:import [org.apache.storm StormSubmitter]
            [org.apache.storm.utils Utils])
   (:use [org.apache.storm clojure config])
   (:gen-class))
@@ -74,11 +74,6 @@
                    word-count
                    :p 6)}))
 
-(defn run-local! []
-  (with-open [cluster (LocalCluster.)
-              topo (.submitTopology cluster "word-count" {TOPOLOGY-DEBUG true} (mk-topology))]
-    (Thread/sleep 10000)))
-
 (defn submit-topology! [name]
   (StormSubmitter/submitTopology
    name
@@ -88,7 +83,7 @@
 
 (defn -main
   ([]
-   (run-local!))
+   (submit-topology! "test"))
   ([name]
    (submit-topology! name)))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/BasicDRPCTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/BasicDRPCTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/BasicDRPCTopology.java
index 0f5f115..28d01f3 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/BasicDRPCTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/BasicDRPCTopology.java
@@ -18,8 +18,6 @@
 package org.apache.storm.starter;
 
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalDRPC;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.drpc.LinearDRPCTopologyBuilder;
 import org.apache.storm.topology.BasicOutputCollector;
@@ -28,6 +26,7 @@ import org.apache.storm.topology.base.BaseBasicBolt;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.DRPCClient;
 
 /**
  * This topology is a basic example of doing distributed RPC on top of Storm. It implements a function that appends a
@@ -36,42 +35,40 @@ import org.apache.storm.tuple.Values;
  * @see <a href="http://storm.apache.org/documentation/Distributed-RPC.html">Distributed RPC</a>
  */
 public class BasicDRPCTopology {
-  public static class ExclaimBolt extends BaseBasicBolt {
-    @Override
-    public void execute(Tuple tuple, BasicOutputCollector collector) {
-      String input = tuple.getString(1);
-      collector.emit(new Values(tuple.getValue(0), input + "!"));
-    }
+    public static class ExclaimBolt extends BaseBasicBolt {
+        @Override
+        public void execute(Tuple tuple, BasicOutputCollector collector) {
+            String input = tuple.getString(1);
+            collector.emit(new Values(tuple.getValue(0), input + "!"));
+        }
 
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("id", "result"));
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("id", "result"));
+        }
     }
 
-  }
-
-  public static void main(String[] args) throws Exception {
-    LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
-    builder.addBolt(new ExclaimBolt(), 3);
+    public static void main(String[] args) throws Exception {
+        LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
+        builder.addBolt(new ExclaimBolt(), 3);
 
-    Config conf = new Config();
+        Config conf = new Config();
+        String topoName = "DRPCExample";
 
-    if (args == null || args.length == 0) {
-      try (LocalDRPC drpc = new LocalDRPC();
-           LocalCluster cluster = new LocalCluster()) {
-
-        cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));
-
-        for (String word : new String[]{ "hello", "goodbye" }) {
-          System.out.println("Result for \"" + word + "\": " + drpc.execute("exclamation", word));
+        if (args != null && args.length > 0) {
+            topoName = args[0]; 
         }
 
-        Thread.sleep(10000);
-      }
-    }
-    else {
-      conf.setNumWorkers(3);
-      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createRemoteTopology());
+        conf.setNumWorkers(3);
+        StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createRemoteTopology());
+        
+        if (args.length > 1) {
+            try (DRPCClient drpc = DRPCClient.getConfiguredClient(conf)) {
+                for (int i = 1; i < args.length; i++) {
+                    String word = args[i];
+                    System.out.println("Result for \"" + word + "\": " + drpc.execute("exclamation", word));
+                }
+            }
+        }
     }
-  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java
index 0b04709..ed93686 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java
@@ -71,11 +71,7 @@ public class ExclamationTopology extends ConfigurableTopology {
 
     String topologyName = "test";
 
-    if (isLocal) {
-      ttl = 10;
-    } else {
-      conf.setNumWorkers(3);
-    }
+    conf.setNumWorkers(3);
 
     if (args != null && args.length > 0) {
       topologyName = args[0];

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/FastWordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/FastWordCountTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/FastWordCountTopology.java
index 5acf908..1c48480 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/FastWordCountTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/FastWordCountTopology.java
@@ -117,7 +117,7 @@ public class FastWordCountTopology {
     }
   }
 
-  public static void printMetrics(Nimbus.Client client, String name) throws Exception {
+  public static void printMetrics(Nimbus.Iface client, String name) throws Exception {
     ClusterSummary summary = client.getClusterInfo();
     String id = null;
     for (TopologySummary ts: summary.get_topologies()) {
@@ -157,7 +157,7 @@ public class FastWordCountTopology {
     System.out.println("uptime: "+uptime+" acked: "+acked+" avgLatency: "+avgLatency+" acked/sec: "+(((double)acked)/uptime+" failed: "+failed));
   } 
 
-  public static void kill(Nimbus.Client client, String name) throws Exception {
+  public static void kill(Nimbus.Iface client, String name) throws Exception {
     KillOptions opts = new KillOptions();
     opts.set_wait_secs(0);
     client.killTopologyWithOpts(name, opts);
@@ -185,7 +185,7 @@ public class FastWordCountTopology {
 
     Map clusterConf = Utils.readStormConfig();
     clusterConf.putAll(Utils.readCommandLineOpts());
-    Nimbus.Client client = NimbusClient.getConfiguredClient(clusterConf).getClient();
+    Nimbus.Iface client = NimbusClient.getConfiguredClient(clusterConf).getClient();
 
     //Sleep for 5 mins
     for (int i = 0; i < 10; i++) {

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/InOrderDeliveryTest.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/InOrderDeliveryTest.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/InOrderDeliveryTest.java
index 78907ea..63be726 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/InOrderDeliveryTest.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/InOrderDeliveryTest.java
@@ -95,7 +95,7 @@ public class InOrderDeliveryTest {
     }
   }
 
-  public static void printMetrics(Nimbus.Client client, String name) throws Exception {
+  public static void printMetrics(Nimbus.Iface client, String name) throws Exception {
     ClusterSummary summary = client.getClusterInfo();
     String id = null;
     for (TopologySummary ts: summary.get_topologies()) {
@@ -135,7 +135,7 @@ public class InOrderDeliveryTest {
     System.out.println("uptime: "+uptime+" acked: "+acked+" avgLatency: "+avgLatency+" acked/sec: "+(((double)acked)/uptime+" failed: "+failed));
   } 
 
-  public static void kill(Nimbus.Client client, String name) throws Exception {
+  public static void kill(Nimbus.Iface client, String name) throws Exception {
     KillOptions opts = new KillOptions();
     opts.set_wait_secs(0);
     client.killTopologyWithOpts(name, opts);
@@ -161,7 +161,7 @@ public class InOrderDeliveryTest {
 
     Map clusterConf = Utils.readStormConfig();
     clusterConf.putAll(Utils.readCommandLineOpts());
-    Nimbus.Client client = NimbusClient.getConfiguredClient(clusterConf).getClient();
+    Nimbus.Iface client = NimbusClient.getConfiguredClient(clusterConf).getClient();
 
     //Sleep for 50 mins
     for (int i = 0; i < 50; i++) {

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/JoinBoltExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/JoinBoltExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/JoinBoltExample.java
index c23fad5..b71b64a 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/JoinBoltExample.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/JoinBoltExample.java
@@ -17,7 +17,10 @@
  */
 package org.apache.storm.starter;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.storm.Config;
+import org.apache.storm.StormSubmitter;
 import org.apache.storm.bolt.JoinBolt;
 import org.apache.storm.starter.bolt.PrinterBolt;
 import org.apache.storm.testing.FeederSpout;
@@ -25,14 +28,14 @@ import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.topology.base.BaseWindowedBolt;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
-import org.apache.storm.LocalCluster;
-
-import java.util.concurrent.TimeUnit;
+import org.apache.storm.utils.NimbusClient;
 
 public class JoinBoltExample {
     public static void main(String[] args) throws Exception {
-
+        if (!NimbusClient.isLocalOverride()) {
+            throw new IllegalStateException("This example only works in local mode.  " 
+                    + "Run with storm local not storm jar");
+        }
         FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender"));
         FeederSpout ageSpout = new FeederSpout(new Fields("id", "age"));
 
@@ -53,16 +56,11 @@ public class JoinBoltExample {
         builder.setBolt("printer", new PrinterBolt() ).shuffleGrouping("joiner");
 
         Config conf = new Config();
-
-        LocalCluster cluster = new LocalCluster();
-        cluster.submitTopology("join-example", conf, builder.createTopology());
+        StormSubmitter.submitTopologyWithProgressBar("join-example", conf, builder.createTopology());
 
         generateGenderData(genderSpout);
 
         generateAgeData(ageSpout);
-
-        Utils.sleep(30000);
-        cluster.shutdown();
     }
 
     private static void generateAgeData(FeederSpout ageSpout) {

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/ManualDRPC.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/ManualDRPC.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/ManualDRPC.java
index 5375ea6..110d0be 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/ManualDRPC.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/ManualDRPC.java
@@ -18,9 +18,7 @@
 package org.apache.storm.starter;
 
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
-import org.apache.storm.LocalDRPC;
+import org.apache.storm.StormSubmitter;
 import org.apache.storm.drpc.DRPCSpout;
 import org.apache.storm.drpc.ReturnResults;
 import org.apache.storm.topology.BasicOutputCollector;
@@ -30,6 +28,7 @@ import org.apache.storm.topology.base.BaseBasicBolt;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.DRPCClient;
 
 public class ManualDRPC {
     public static class ExclamationBolt extends BaseBasicBolt {
@@ -50,19 +49,17 @@ public class ManualDRPC {
 
     public static void main(String[] args) throws Exception {
         TopologyBuilder builder = new TopologyBuilder();
-        try(LocalDRPC drpc = new LocalDRPC();
-            LocalCluster cluster = new LocalCluster();) {
 
-            DRPCSpout spout = new DRPCSpout("exclamation", drpc);
-            builder.setSpout("drpc", spout);
-            builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("drpc");
-            builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");
+        DRPCSpout spout = new DRPCSpout("exclamation");
+        builder.setSpout("drpc", spout);
+        builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("drpc");
+        builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");
 
-            Config conf = new Config();
-            try (LocalTopology topo = cluster.submitTopology("exclaim", conf, builder.createTopology())) {
-                System.out.println(drpc.execute("exclamation", "aaa"));
-                System.out.println(drpc.execute("exclamation", "bbb"));
-            }
+        Config conf = new Config();
+        StormSubmitter.submitTopology("exclaim", conf, builder.createTopology());
+        try (DRPCClient drpc = DRPCClient.getConfiguredClient(conf)) {
+            System.out.println(drpc.execute("exclamation", "aaa"));
+            System.out.println(drpc.execute("exclamation", "bbb"));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/MultipleLoggerTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/MultipleLoggerTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/MultipleLoggerTopology.java
index aca5c7b..26c4ee8 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/MultipleLoggerTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/MultipleLoggerTopology.java
@@ -17,9 +17,9 @@
  */
 package org.apache.storm.starter;
 
+import java.util.Map;
+
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
@@ -30,13 +30,9 @@ import org.apache.storm.topology.base.BaseRichBolt;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-
 /**
  * This is a basic example of a Storm topology.
  */
@@ -90,15 +86,11 @@ public class MultipleLoggerTopology {
 
     Config conf = new Config();
     conf.setDebug(true);
-
+    String topoName = MultipleLoggerTopology.class.getName();
     if (args != null && args.length > 0) {
-      conf.setNumWorkers(2);
-      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
-    } else {
-      try (LocalCluster cluster = new LocalCluster();
-           LocalTopology topo = cluster.submitTopology("test", conf, builder.createTopology());) {
-          Utils.sleep(10000);
-      }
+      topoName = args[0];
     }
+    conf.setNumWorkers(2);
+    StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology());
   }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/ReachTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/ReachTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/ReachTopology.java
index 6b4aaa6..e90fcb0 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/ReachTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/ReachTopology.java
@@ -17,9 +17,14 @@
  */
 package org.apache.storm.starter;
 
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalDRPC;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.coordination.BatchOutputCollector;
 import org.apache.storm.drpc.LinearDRPCTopologyBuilder;
@@ -31,8 +36,7 @@ import org.apache.storm.topology.base.BaseBatchBolt;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
-
-import java.util.*;
+import org.apache.storm.utils.DRPCClient;
 
 /**
  * This is a good example of doing complex Distributed RPC on top of Storm. This program creates a topology that can
@@ -51,144 +55,139 @@ import java.util.*;
  * @see <a href="http://storm.apache.org/documentation/Distributed-RPC.html">Distributed RPC</a>
  */
 public class ReachTopology {
-  public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, List<String>>() {{
-    put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan"));
-    put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", "sally", "nathan"));
-    put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john"));
-  }};
-
-  public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, List<String>>() {{
-    put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", "jai"));
-    put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", "vivian"));
-    put("tim", Arrays.asList("alex"));
-    put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", "vivian", "emily", "jordan"));
-    put("adam", Arrays.asList("david", "carissa"));
-    put("mike", Arrays.asList("john", "bob"));
-    put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob"));
-  }};
-
-  public static class GetTweeters extends BaseBasicBolt {
-    @Override
-    public void execute(Tuple tuple, BasicOutputCollector collector) {
-      Object id = tuple.getValue(0);
-      String url = tuple.getString(1);
-      List<String> tweeters = TWEETERS_DB.get(url);
-      if (tweeters != null) {
-        for (String tweeter : tweeters) {
-          collector.emit(new Values(id, tweeter));
+    public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, List<String>>() {{
+        put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan"));
+        put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", "sally", "nathan"));
+        put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john"));
+    }};
+
+    public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, List<String>>() {{
+        put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", "jai"));
+        put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", "vivian"));
+        put("tim", Arrays.asList("alex"));
+        put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", "vivian", "emily", "jordan"));
+        put("adam", Arrays.asList("david", "carissa"));
+        put("mike", Arrays.asList("john", "bob"));
+        put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob"));
+    }};
+
+    public static class GetTweeters extends BaseBasicBolt {
+        @Override
+        public void execute(Tuple tuple, BasicOutputCollector collector) {
+            Object id = tuple.getValue(0);
+            String url = tuple.getString(1);
+            List<String> tweeters = TWEETERS_DB.get(url);
+            if (tweeters != null) {
+                for (String tweeter : tweeters) {
+                    collector.emit(new Values(id, tweeter));
+                }
+            }
         }
-      }
-    }
 
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("id", "tweeter"));
-    }
-  }
-
-  public static class GetFollowers extends BaseBasicBolt {
-    @Override
-    public void execute(Tuple tuple, BasicOutputCollector collector) {
-      Object id = tuple.getValue(0);
-      String tweeter = tuple.getString(1);
-      List<String> followers = FOLLOWERS_DB.get(tweeter);
-      if (followers != null) {
-        for (String follower : followers) {
-          collector.emit(new Values(id, follower));
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("id", "tweeter"));
         }
-      }
     }
 
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("id", "follower"));
-    }
-  }
-
-  public static class PartialUniquer extends BaseBatchBolt {
-    BatchOutputCollector _collector;
-    Object _id;
-    Set<String> _followers = new HashSet<String>();
+    public static class GetFollowers extends BaseBasicBolt {
+        @Override
+        public void execute(Tuple tuple, BasicOutputCollector collector) {
+            Object id = tuple.getValue(0);
+            String tweeter = tuple.getString(1);
+            List<String> followers = FOLLOWERS_DB.get(tweeter);
+            if (followers != null) {
+                for (String follower : followers) {
+                    collector.emit(new Values(id, follower));
+                }
+            }
+        }
 
-    @Override
-    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
-      _collector = collector;
-      _id = id;
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("id", "follower"));
+        }
     }
 
-    @Override
-    public void execute(Tuple tuple) {
-      _followers.add(tuple.getString(1));
-    }
+    public static class PartialUniquer extends BaseBatchBolt {
+        BatchOutputCollector _collector;
+        Object _id;
+        Set<String> _followers = new HashSet<String>();
 
-    @Override
-    public void finishBatch() {
-      _collector.emit(new Values(_id, _followers.size()));
-    }
+        @Override
+        public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
+            _collector = collector;
+            _id = id;
+        }
 
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("id", "partial-count"));
-    }
-  }
+        @Override
+        public void execute(Tuple tuple) {
+            _followers.add(tuple.getString(1));
+        }
 
-  public static class CountAggregator extends BaseBatchBolt {
-    BatchOutputCollector _collector;
-    Object _id;
-    int _count = 0;
+        @Override
+        public void finishBatch() {
+            _collector.emit(new Values(_id, _followers.size()));
+        }
 
-    @Override
-    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
-      _collector = collector;
-      _id = id;
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("id", "partial-count"));
+        }
     }
 
-    @Override
-    public void execute(Tuple tuple) {
-      _count += tuple.getInteger(1);
-    }
+    public static class CountAggregator extends BaseBatchBolt {
+        BatchOutputCollector _collector;
+        Object _id;
+        int _count = 0;
 
-    @Override
-    public void finishBatch() {
-      _collector.emit(new Values(_id, _count));
-    }
+        @Override
+        public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
+            _collector = collector;
+            _id = id;
+        }
 
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("id", "reach"));
-    }
-  }
+        @Override
+        public void execute(Tuple tuple) {
+            _count += tuple.getInteger(1);
+        }
 
-  public static LinearDRPCTopologyBuilder construct() {
-    LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");
-    builder.addBolt(new GetTweeters(), 4);
-    builder.addBolt(new GetFollowers(), 12).shuffleGrouping();
-    builder.addBolt(new PartialUniquer(), 6).fieldsGrouping(new Fields("id", "follower"));
-    builder.addBolt(new CountAggregator(), 3).fieldsGrouping(new Fields("id"));
-    return builder;
-  }
+        @Override
+        public void finishBatch() {
+            _collector.emit(new Values(_id, _count));
+        }
 
-  public static void main(String[] args) throws Exception {
-    LinearDRPCTopologyBuilder builder = construct();
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("id", "reach"));
+        }
+    }
 
+    public static LinearDRPCTopologyBuilder construct() {
+        LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");
+        builder.addBolt(new GetTweeters(), 4);
+        builder.addBolt(new GetFollowers(), 12).shuffleGrouping();
+        builder.addBolt(new PartialUniquer(), 6).fieldsGrouping(new Fields("id", "follower"));
+        builder.addBolt(new CountAggregator(), 3).fieldsGrouping(new Fields("id"));
+        return builder;
+    }
 
-    Config conf = new Config();
+    public static void main(String[] args) throws Exception {
+        LinearDRPCTopologyBuilder builder = construct();
 
-    if (args == null || args.length == 0) {
-      conf.setMaxTaskParallelism(3);
-      try (LocalDRPC drpc = new LocalDRPC();
-          LocalCluster cluster = new LocalCluster();) {
-        cluster.submitTopology("reach-drpc", conf, builder.createLocalTopology(drpc));
+        Config conf = new Config();
+        conf.setNumWorkers(6);
+        String topoName = "reach-drpc";
+        if (args.length > 0) {
+            topoName = args[0];
+        }
+        StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createRemoteTopology());
 
-        String[] urlsToTry = new String[]{ "foo.com/blog/1", "engineering.twitter.com/blog/5", "notaurl.com" };
-        for (String url : urlsToTry) {
-          System.out.println("Reach of " + url + ": " + drpc.execute("reach", url));
+        try (DRPCClient drpc = DRPCClient.getConfiguredClient(conf)) {
+            String[] urlsToTry = new String[]{ "foo.com/blog/1", "engineering.twitter.com/blog/5", "notaurl.com" };
+            for (String url : urlsToTry) {
+                System.out.println("Reach of " + url + ": " + drpc.execute("reach", url));
+            }
         }
-      }
-    }
-    else {
-      conf.setNumWorkers(6);
-      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createRemoteTopology());
     }
-  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java
index 788485d..57fcd5d 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java
@@ -17,9 +17,9 @@
  */
 package org.apache.storm.starter;
 
+import java.util.Map;
+
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
@@ -32,9 +32,6 @@ import org.apache.storm.topology.base.BaseRichBolt;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
-
-import java.util.Map;
 
 public class ResourceAwareExampleTopology {
   public static class ExclamationBolt extends BaseRichBolt {
@@ -90,17 +87,12 @@ public class ResourceAwareExampleTopology {
     // Set strategy to schedule topology. If not specified, default to org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy
     conf.setTopologyStrategy(org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class);
 
+    String topoName = "test";
     if (args != null && args.length > 0) {
-      conf.setNumWorkers(3);
-
-      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
+        topoName = args[0];
     }
-    else {
+    conf.setNumWorkers(3);
 
-      try (LocalCluster cluster = new LocalCluster();
-           LocalTopology topo = cluster.submitTopology("test", conf, builder.createTopology());) {
-        Utils.sleep(10000);
-      }
-    }
+    StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology());
   }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/RollingTopWords.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/RollingTopWords.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/RollingTopWords.java
index 7ebbaf8..78b2baf 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/RollingTopWords.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/RollingTopWords.java
@@ -57,15 +57,6 @@ public class RollingTopWords extends ConfigurableTopology {
    *
    * ```
    *
-   * # Runs in local mode (LocalCluster), with topology name "slidingWindowCounts"
-   * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.RollingTopWords -local
-   * 
-   * # Runs in local mode (LocalCluster), with topology name "foobar"
-   * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.RollingTopWords foobar -local
-   * 
-   * # Runs in local mode (LocalCluster) for 30 seconds, with topology name "foobar" 
-   * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.RollingTopWords foobar -local -ttl 30
-   *
    * # Runs in remote/cluster mode, with topology name "production-topology"
    * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.RollingTopWords production-topology ```
    *

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/SingleJoinExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/SingleJoinExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/SingleJoinExample.java
index 4eb9ed9..2e1bb94 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/SingleJoinExample.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/SingleJoinExample.java
@@ -18,52 +18,50 @@
 package org.apache.storm.starter;
 
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.starter.bolt.SingleJoinBolt;
 import org.apache.storm.testing.FeederSpout;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
-import org.apache.storm.starter.bolt.SingleJoinBolt;
+import org.apache.storm.utils.NimbusClient;
 
 /** Example of using a simple custom join bolt
  *  NOTE: Prefer to use the built-in JoinBolt wherever applicable
  */
 
 public class SingleJoinExample {
-  public static void main(String[] args) throws Exception {
-    FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender"));
-    FeederSpout ageSpout = new FeederSpout(new Fields("id", "age"));
+    public static void main(String[] args) throws Exception {
+        if (!NimbusClient.isLocalOverride()) {
+            throw new IllegalStateException("This example only works in local mode.  "
+                    + "Run with storm local not storm jar");
+        }
+        FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender"));
+        FeederSpout ageSpout = new FeederSpout(new Fields("id", "age"));
 
-    TopologyBuilder builder = new TopologyBuilder();
-    builder.setSpout("gender", genderSpout);
-    builder.setSpout("age", ageSpout);
-    builder.setBolt("join", new SingleJoinBolt(new Fields("gender", "age"))).fieldsGrouping("gender", new Fields("id"))
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout("gender", genderSpout);
+        builder.setSpout("age", ageSpout);
+        builder.setBolt("join", new SingleJoinBolt(new Fields("gender", "age"))).fieldsGrouping("gender", new Fields("id"))
         .fieldsGrouping("age", new Fields("id"));
 
-    Config conf = new Config();
-    conf.setDebug(true);
-
-    try (LocalCluster cluster = new LocalCluster();
-         LocalTopology topo = cluster.submitTopology("join-example", conf, builder.createTopology());) {
+        Config conf = new Config();
+        conf.setDebug(true);
+        StormSubmitter.submitTopology("join-example", conf, builder.createTopology());
 
-      for (int i = 0; i < 10; i++) {
-        String gender;
-        if (i % 2 == 0) {
-          gender = "male";
+        for (int i = 0; i < 10; i++) {
+            String gender;
+            if (i % 2 == 0) {
+                gender = "male";
+            }
+            else {
+                gender = "female";
+            }
+            genderSpout.feed(new Values(i, gender));
         }
-        else {
-          gender = "female";
-        }
-        genderSpout.feed(new Values(i, gender));
-      }
-
-      for (int i = 9; i >= 0; i--) {
-        ageSpout.feed(new Values(i, i + 20));
-      }
 
-      Utils.sleep(2000);
+        for (int i = 9; i >= 0; i--) {
+            ageSpout.feed(new Values(i, i + 20));
+        }
     }
-  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java
index 83ad4fc..4e18217 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java
@@ -60,15 +60,6 @@ public class SkewedRollingTopWords extends ConfigurableTopology {
    *
    * ```
    *
-   * # Runs in local mode (LocalCluster), with topology name "slidingWindowCounts"
-   * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.SkewedRollingTopWords -local
-   *
-   * # Runs in local mode (LocalCluster), with topology name "foobar"
-   * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.SkewedRollingTopWords foobar -local
-   * 
-   * # Runs in local mode (LocalCluster) for 30 seconds, with topology name "foobar" 
-   * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.SkewedRollingTopWords foobar -local -ttl 30
-   *
    * # Runs in remote/cluster mode, with topology name "production-topology"
    * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.SkewedRollingTopWords production-topology ```
    *

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingTupleTsTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingTupleTsTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingTupleTsTopology.java
index 3993b95..6204f8c 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingTupleTsTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingTupleTsTopology.java
@@ -17,20 +17,16 @@
  */
 package org.apache.storm.starter;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.topology.base.BaseWindowedBolt;
-import org.apache.storm.utils.Utils;
 import org.apache.storm.starter.bolt.PrinterBolt;
 import org.apache.storm.starter.bolt.SlidingWindowSumBolt;
 import org.apache.storm.starter.spout.RandomIntegerSpout;
-
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.storm.topology.base.BaseWindowedBolt.Duration;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.topology.base.BaseWindowedBolt.Duration;
 
 /**
  * Windowing based on tuple timestamp (e.g. the time when tuple is generated
@@ -48,15 +44,13 @@ public class SlidingTupleTsTopology {
         builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("slidingsum");
         Config conf = new Config();
         conf.setDebug(true);
-
+        String topoName = "test";
+        
         if (args != null && args.length > 0) {
-            conf.setNumWorkers(1);
-            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
-        } else {
-            try (LocalCluster cluster = new LocalCluster();
-                 LocalTopology topo = cluster.submitTopology("test", conf, builder.createTopology());) {
-                Utils.sleep(40000);
-            }
+            topoName = args[0];
         }
+        
+        conf.setNumWorkers(1);
+        StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology());
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingWindowTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingWindowTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingWindowTopology.java
index 3c68f16..f00dac9 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingWindowTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingWindowTopology.java
@@ -17,30 +17,26 @@
  */
 package org.apache.storm.starter;
 
+import java.util.List;
+import java.util.Map;
+
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
+import org.apache.storm.starter.bolt.PrinterBolt;
+import org.apache.storm.starter.bolt.SlidingWindowSumBolt;
+import org.apache.storm.starter.spout.RandomIntegerSpout;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.topology.base.BaseWindowedBolt.Count;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
 import org.apache.storm.windowing.TupleWindow;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.storm.starter.bolt.PrinterBolt;
-import org.apache.storm.starter.bolt.SlidingWindowSumBolt;
-import org.apache.storm.starter.spout.RandomIntegerSpout;
-
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.storm.topology.base.BaseWindowedBolt.Count;
 
 /**
  * A sample topology that demonstrates the usage of {@link org.apache.storm.topology.IWindowedBolt}
@@ -95,14 +91,11 @@ public class SlidingWindowTopology {
         builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("tumblingavg");
         Config conf = new Config();
         conf.setDebug(true);
+        String topoName = "test";
         if (args != null && args.length > 0) {
-            conf.setNumWorkers(1);
-            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
-        } else {
-            try (LocalCluster cluster = new LocalCluster();
-                 LocalTopology topo = cluster.submitTopology("test", conf, builder.createTopology());) {
-                Utils.sleep(40000);
-            }
+            topoName = args[0];
         }
+        conf.setNumWorkers(1);
+        StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology());
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulTopology.java
index cd8fa2c..36495fd 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulTopology.java
@@ -17,9 +17,9 @@
  */
 package org.apache.storm.starter;
 
+import java.util.Map;
+
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.starter.spout.RandomIntegerSpout;
 import org.apache.storm.state.KeyValueState;
@@ -33,12 +33,9 @@ import org.apache.storm.topology.base.BaseStatefulBolt;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-
 /**
  * An example topology that demonstrates the use of {@link org.apache.storm.topology.IStatefulBolt}
  * to manage state. To run the example,
@@ -129,15 +126,11 @@ public class StatefulTopology {
         builder.setBolt("total", new StatefulSumBolt("total"), 1).shuffleGrouping("printer");
         Config conf = new Config();
         conf.setDebug(false);
-
+        String topoName = "test";
         if (args != null && args.length > 0) {
-            conf.setNumWorkers(1);
-            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
-        } else {
-            try (LocalCluster cluster = new LocalCluster();
-                 LocalTopology topology = cluster.submitTopology("test", conf, builder.createTopology());) {
-                Utils.sleep(40000);
-            }
+            topoName = args[0];
         }
+        conf.setNumWorkers(1);
+        StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology());
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulWindowingTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulWindowingTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulWindowingTopology.java
index 0e9b6ea..8cce057 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulWindowingTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulWindowingTopology.java
@@ -17,9 +17,9 @@
  */
 package org.apache.storm.starter;
 
+import java.util.Map;
+
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.starter.bolt.PrinterBolt;
 import org.apache.storm.starter.spout.RandomIntegerSpout;
@@ -30,18 +30,14 @@ import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.topology.base.BaseStatefulWindowedBolt;
+import org.apache.storm.topology.base.BaseWindowedBolt.Count;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
 import org.apache.storm.windowing.TupleWindow;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-
-import static org.apache.storm.topology.base.BaseWindowedBolt.Count;
-
 /**
  * A simple example that demonstrates the usage of {@link org.apache.storm.topology.IStatefulWindowedBolt} to
  * save the state of the windowing operation to avoid re-computation in case of failures.
@@ -97,15 +93,13 @@ public class StatefulWindowingTopology {
         Config conf = new Config();
         conf.setDebug(false);
         //conf.put(Config.TOPOLOGY_STATE_PROVIDER, "org.apache.storm.redis.state.RedisKeyValueStateProvider");
+        
+        String topoName = "test";
         if (args != null && args.length > 0) {
-            conf.setNumWorkers(1);
-            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
-        } else {
-            try (LocalCluster cluster = new LocalCluster();
-                 LocalTopology topo = cluster.submitTopology("test", conf, builder.createTopology());) {
-                Utils.sleep(40000);
-            }
+            topoName = args[0];
         }
+        conf.setNumWorkers(1);
+        StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology());
     }
 
 }