You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/12 08:29:13 UTC

[05/10] storm git commit: STORM-2447: add in storm local to avoid having server on worker classpath

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-jms-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-jms-examples/pom.xml b/examples/storm-jms-examples/pom.xml
index 1e10fde..591cf7d 100644
--- a/examples/storm-jms-examples/pom.xml
+++ b/examples/storm-jms-examples/pom.xml
@@ -60,7 +60,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.storm</groupId>
-            <artifactId>storm-server</artifactId>
+            <artifactId>storm-client</artifactId>
             <version>${project.version}</version>
             <!-- keep storm out of the jar-with-dependencies -->
             <scope>${provided.scope}</scope>
@@ -112,39 +112,6 @@
                 </executions>
 
             </plugin>
-
-            <plugin>
-                <groupId>org.codehaus.mojo</groupId>
-                <artifactId>exec-maven-plugin</artifactId>
-                <version>1.2.1</version>
-                <executions>
-                    <execution>
-                        <goals>
-                            <goal>exec</goal>
-                        </goals>
-                    </execution>
-                </executions>
-                <configuration>
-                    <executable>java</executable>
-                    <includeProjectDependencies>true</includeProjectDependencies>
-                    <includePluginDependencies>true</includePluginDependencies>
-                    <mainClass>org.apache.storm.jms.example.ExampleJmsTopology</mainClass>
-                    <systemProperties>
-                        <systemProperty>
-                            <key>log4j.configuration</key>
-                            <value>file:./src/main/resources/log4j.properties</value>
-                        </systemProperty>
-                    </systemProperties>
-                </configuration>
-                <dependencies>
-                    <dependency>
-                        <groupId>org.apache.storm</groupId>
-                        <artifactId>storm-server</artifactId>
-                        <version>${project.version}</version>
-                        <type>jar</type>
-                    </dependency>
-                </dependencies>
-            </plugin>
         </plugins>
     </build>
-</project>
\ No newline at end of file
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java b/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java
index 82dbd5b..accb052 100644
--- a/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java
+++ b/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java
@@ -23,8 +23,6 @@ import javax.jms.Session;
 import javax.jms.TextMessage;
 
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.jms.JmsMessageProducer;
 import org.apache.storm.jms.JmsProvider;
@@ -34,7 +32,6 @@ import org.apache.storm.jms.spout.JmsSpout;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.ITuple;
-import org.apache.storm.utils.Utils;
 
 public class ExampleJmsTopology {
     public static final String JMS_QUEUE_SPOUT = "JMS_QUEUE_SPOUT";
@@ -111,21 +108,13 @@ public class ExampleJmsTopology {
                 JMS_TOPIC_SPOUT);
 
         Config conf = new Config();
-
+        String topoName = "storm-jms-example";
         if (args.length > 0) {
-            conf.setNumWorkers(3);
-
-            StormSubmitter.submitTopology(args[0], conf,
-                    builder.createTopology());
-        } else {
-
-            conf.setDebug(true);
-
-            try (LocalCluster cluster = new LocalCluster();
-                 LocalTopology topo = cluster.submitTopology("storm-jms-example", conf, builder.createTopology());) {
-                Utils.sleep(60000);
-            }
+            topoName = args[0];
         }
-    }
+        conf.setNumWorkers(3);
 
+        StormSubmitter.submitTopology(topoName, conf,
+                builder.createTopology());
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-kafka-client-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-client-examples/pom.xml b/examples/storm-kafka-client-examples/pom.xml
index e319346..d0877c8 100644
--- a/examples/storm-kafka-client-examples/pom.xml
+++ b/examples/storm-kafka-client-examples/pom.xml
@@ -34,7 +34,7 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.storm</groupId>
-            <artifactId>storm-server</artifactId>
+            <artifactId>storm-client</artifactId>
             <version>${project.version}</version>
             <scope>${provided.scope}</scope>
         </dependency>
@@ -134,4 +134,4 @@
         </plugins>
     </build>
 
-</project>
\ No newline at end of file
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
index edd1f09..9ab00f5 100644
--- a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
+++ b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
@@ -18,6 +18,13 @@
 
 package org.apache.storm.kafka.trident;
 
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.storm.Config;
 import org.apache.storm.StormSubmitter;
@@ -33,13 +40,6 @@ import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
 
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
-
 public class TridentKafkaClientWordCountNamedTopics {
     private static final String TOPIC_1 = "test-trident";
     private static final String TOPIC_2 = "test-trident-1";
@@ -93,49 +93,19 @@ public class TridentKafkaClientWordCountNamedTopics {
 
             System.out.printf("Running with broker_url: [%s], topics: [%s, %s]\n", brokerUrl, topic1, topic2);
 
-            Config tpConf = LocalSubmitter.defaultConfig(true);
-
-            if (args.length == 4) { //Submit Remote
-                // Producers
-                StormSubmitter.submitTopology(topic1 + "-producer", tpConf, KafkaProducerTopology.newTopology(brokerUrl, topic1));
-                StormSubmitter.submitTopology(topic2 + "-producer", tpConf, KafkaProducerTopology.newTopology(brokerUrl, topic2));
-                // Consumer
-                StormSubmitter.submitTopology("topics-consumer", tpConf, TridentKafkaConsumerTopology.newTopology(newKafkaTridentSpoutOpaque()));
-
-                // Print results to console, which also causes the print filter in the consumer topology to print the results in the worker log
-                Thread.sleep(2000);
-                DrpcResultsPrinter.remoteClient().printResults(60, 1, TimeUnit.SECONDS);
-
-            } else { //Submit Local
-
-                final LocalSubmitter localSubmitter = LocalSubmitter.newInstance();
-                final String topic1Tp = "topic1-producer";
-                final String topic2Tp = "topic2-producer";
-                final String consTpName = "topics-consumer";
-
-                try {
-                    // Producers
-                    localSubmitter.submit(topic1Tp, tpConf, KafkaProducerTopology.newTopology(brokerUrl, topic1));
-                    localSubmitter.submit(topic2Tp, tpConf, KafkaProducerTopology.newTopology(brokerUrl, topic2));
-                    // Consumer
-                    try {
-                        localSubmitter.submit(consTpName, tpConf, TridentKafkaConsumerTopology.newTopology(
-                                localSubmitter.getDrpc(), newKafkaTridentSpoutOpaque()));
-                        // print
-                        localSubmitter.printResults(15, 1, TimeUnit.SECONDS);
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                    }
-
-                } finally {
-                    // kill
-                    localSubmitter.kill(topic1Tp);
-                    localSubmitter.kill(topic2Tp);
-                    localSubmitter.kill(consTpName);
-                    // shutdown
-                    localSubmitter.shutdown();
-                }
-            }
+            Config tpConf = new Config();
+            tpConf.setDebug(true);
+            tpConf.setMaxSpoutPending(5);
+
+            // Producers
+            StormSubmitter.submitTopology(topic1 + "-producer", tpConf, KafkaProducerTopology.newTopology(brokerUrl, topic1));
+            StormSubmitter.submitTopology(topic2 + "-producer", tpConf, KafkaProducerTopology.newTopology(brokerUrl, topic2));
+            // Consumer
+            StormSubmitter.submitTopology("topics-consumer", tpConf, TridentKafkaConsumerTopology.newTopology(newKafkaTridentSpoutOpaque()));
+
+            // Print results to console, which also causes the print filter in the consumer topology to print the results in the worker log
+            Thread.sleep(2000);
+            DrpcResultsPrinter.remoteClient().printResults(60, 1, TimeUnit.SECONDS);
         }
         System.exit(0);     // Kill all the non daemon threads
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-kafka-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-examples/pom.xml b/examples/storm-kafka-examples/pom.xml
index 8e89aec..f2d927d 100644
--- a/examples/storm-kafka-examples/pom.xml
+++ b/examples/storm-kafka-examples/pom.xml
@@ -35,7 +35,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.storm</groupId>
-            <artifactId>storm-server</artifactId>
+            <artifactId>storm-client</artifactId>
             <version>${project.version}</version>
             <scope>${provided.scope}</scope>
         </dependency>

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/DrpcResultsPrinter.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/DrpcResultsPrinter.java b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/DrpcResultsPrinter.java
index b08fc96..3476d9f 100644
--- a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/DrpcResultsPrinter.java
+++ b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/DrpcResultsPrinter.java
@@ -18,17 +18,16 @@
 
 package org.apache.storm.kafka.trident;
 
-import org.apache.storm.LocalDRPC;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.storm.generated.DistributedRPC;
-import org.apache.thrift.transport.TTransportException;
-import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.DRPCClient;
+import org.apache.storm.utils.Utils;
+import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
 public class DrpcResultsPrinter {
     private static final Logger LOG = LoggerFactory.getLogger(DrpcResultsPrinter.class);
 
@@ -39,13 +38,6 @@ public class DrpcResultsPrinter {
     }
 
     /**
-     * @return local DRPC client running on the same JVML
-     */
-    public static DrpcResultsPrinter localClient() {
-        return new DrpcResultsPrinter(new LocalDRPC());
-    }
-
-    /**
      * @return remote DRPC client running on local host, on port 3772, with defaults.yaml config
      */
     public static DrpcResultsPrinter remoteClient() {

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/LocalSubmitter.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/LocalSubmitter.java b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/LocalSubmitter.java
deleted file mode 100644
index 9666695..0000000
--- a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/LocalSubmitter.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *  
- *   http://www.apache.org/licenses/LICENSE-2.0
- *  
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.trident;
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalDRPC;
-import org.apache.storm.generated.StormTopology;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.TimeUnit;
-
-public class LocalSubmitter {
-    protected static final Logger LOG = LoggerFactory.getLogger(LocalSubmitter.class);
-
-    private LocalDRPC drpc;
-    private LocalCluster cluster;
-
-    public LocalSubmitter(LocalDRPC drpc, LocalCluster cluster) {
-        this.drpc = drpc;
-        this.cluster = cluster;
-    }
-
-    public static LocalSubmitter newInstance() {
-        try {
-            return new LocalSubmitter(new LocalDRPC(), new LocalCluster());
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public static Config defaultConfig() {
-        return defaultConfig(false);
-    }
-
-    public static Config defaultConfig(boolean debug) {
-        final Config conf = new Config();
-        conf.setMaxSpoutPending(20);
-        conf.setDebug(debug);
-        return conf;
-    }
-
-    public LocalSubmitter(StormTopology topology, LocalDRPC drpc, LocalCluster cluster, String name) {
-        this(drpc, cluster);
-    }
-
-    public void submit(String name, Config config, StormTopology topology) {
-        try {
-            cluster.submitTopology(name, config, topology);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * Prints the DRPC results for the amount of time specified
-     */
-    public void printResults(int num, int time, TimeUnit unit) {
-        for (int i = 0; i < num; i++) {
-            try {
-                LOG.info("--- DRPC RESULT: " + drpc.execute("words", "the and apple snow jumped"));
-                System.out.println();
-                Thread.sleep(unit.toMillis(time));
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }
-    }
-
-    public void kill (String name) {
-        try {
-            cluster.killTopology(name);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public void shutdown() {
-        cluster.shutdown();
-    }
-
-    public LocalDRPC getDrpc() {
-        return drpc;
-    }
-
-    public LocalCluster getCluster() {
-        return cluster;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaConsumerTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaConsumerTopology.java b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaConsumerTopology.java
index a39eba1..e571210 100644
--- a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaConsumerTopology.java
+++ b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaConsumerTopology.java
@@ -18,7 +18,6 @@
 
 package org.apache.storm.kafka.trident;
 
-import org.apache.storm.LocalDRPC;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.starter.trident.DebugMemoryMapState;
 import org.apache.storm.trident.Stream;
@@ -30,7 +29,6 @@ import org.apache.storm.trident.operation.builtin.Debug;
 import org.apache.storm.trident.operation.builtin.FilterNull;
 import org.apache.storm.trident.operation.builtin.MapGet;
 import org.apache.storm.trident.spout.ITridentDataSource;
-import org.apache.storm.trident.testing.MemoryMapState;
 import org.apache.storm.trident.testing.Split;
 import org.apache.storm.trident.tuple.TridentTuple;
 import org.apache.storm.tuple.Fields;
@@ -44,22 +42,13 @@ public class TridentKafkaConsumerTopology {
      * See {@link TridentKafkaConsumerTopology#newTopology(LocalDRPC, ITridentDataSource)}
      */
     public static StormTopology newTopology(ITridentDataSource tridentSpout) {
-        return newTopology(null, tridentSpout);
-    }
-
-    /**
-     * @param drpc The DRPC stream to be used in querying the word counts. Can be null in distributed mode
-     * @return a trident topology that consumes sentences from the kafka topic specified using a
-     * {@link TransactionalTridentKafkaSpout} computes the word count and stores it in a {@link MemoryMapState}.
-     */
-    public static StormTopology newTopology(LocalDRPC drpc, ITridentDataSource tridentSpout) {
         final TridentTopology tridentTopology = new TridentTopology();
-        addDRPCStream(tridentTopology, addTridentState(tridentTopology, tridentSpout), drpc);
+        addDRPCStream(tridentTopology, addTridentState(tridentTopology, tridentSpout));
         return tridentTopology.build();
     }
 
-    private static Stream addDRPCStream(TridentTopology tridentTopology, final TridentState state, LocalDRPC drpc) {
-        return tridentTopology.newDRPCStream("words", drpc)
+    private static Stream addDRPCStream(TridentTopology tridentTopology, final TridentState state) {
+        return tridentTopology.newDRPCStream("words")
                 .each(new Fields("args"), new Split(), new Fields("word"))
                 .groupBy(new Fields("word"))
                 .stateQuery(state, new Fields("word"), new MapGet(), new Fields("count"))

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaTopology.java b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaTopology.java
index 11ea899..ad785b8 100644
--- a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaTopology.java
+++ b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaTopology.java
@@ -18,9 +18,10 @@
 
 package org.apache.storm.kafka.trident;
 
+import java.util.Properties;
+
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
+import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper;
 import org.apache.storm.kafka.trident.selector.DefaultTopicSelector;
@@ -30,8 +31,6 @@ import org.apache.storm.trident.testing.FixedBatchSpout;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
 
-import java.util.Properties;
-
 public class TridentKafkaTopology {
 
     private static StormTopology buildTopology(String brokerConnectionString) {
@@ -79,10 +78,6 @@ public class TridentKafkaTopology {
         if(args.length < 1) {
             System.out.println("Please provide kafka broker url ,e.g. localhost:9092");
         }
-
-        try (LocalCluster cluster = new LocalCluster();
-             LocalTopology topo = cluster.submitTopology("wordCounter", new Config(), buildTopology(args[0]));) {
-            Thread.sleep(60 * 1000);
-        }
+        StormSubmitter.submitTopology("wordCounter", new Config(), buildTopology(args[0]));
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaWordCount.java b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaWordCount.java
index 84dc380..6aa54ea 100644
--- a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaWordCount.java
+++ b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaWordCount.java
@@ -77,40 +77,24 @@ public class TridentKafkaWordCount implements Serializable {
     public static void main(String[] args) throws Exception {
         final String[] zkBrokerUrl = parseUrl(args);
         final String topicName = "test";
-        Config tpConf = LocalSubmitter.defaultConfig();
+        Config tpConf = new Config();
+        tpConf.setMaxSpoutPending(20);
+        String prodTpName = "kafkaBolt";
+        String consTpName = "wordCounter";
 
-        if (args.length == 3)  { //Submit Remote
-            // Producer
-            StormSubmitter.submitTopology(args[2] + "-producer", tpConf, KafkaProducerTopology.newTopology(zkBrokerUrl[1], topicName));
-            // Consumer
-            StormSubmitter.submitTopology(args[2] + "-consumer", tpConf, TridentKafkaConsumerTopology.newTopology(
-                    new TransactionalTridentKafkaSpout(newTridentKafkaConfig(zkBrokerUrl[0]))));
-
-            // Print results to console, which also causes the print filter in the consumer topology to print the results in the worker log
-            Thread.sleep(2000);
-            DrpcResultsPrinter.remoteClient().printResults(60, 1, TimeUnit.SECONDS);
-        } else { //Submit Local
-            final LocalSubmitter localSubmitter = LocalSubmitter.newInstance();
-            final String prodTpName = "kafkaBolt";
-            final String consTpName = "wordCounter";
-
-            try {
-                // Producer
-                localSubmitter.submit(prodTpName, tpConf, KafkaProducerTopology.newTopology(zkBrokerUrl[1], topicName));
-                // Consumer
-                localSubmitter.submit(consTpName, tpConf, TridentKafkaConsumerTopology.newTopology(localSubmitter.getDrpc(),
-                        new TransactionalTridentKafkaSpout(newTridentKafkaConfig(zkBrokerUrl[0]))));
-
-                // print
-                new DrpcResultsPrinter(localSubmitter.getDrpc()).printResults(60, 1, TimeUnit.SECONDS);
-            } finally {
-                // kill
-                localSubmitter.kill(prodTpName);
-                localSubmitter.kill(consTpName);
-                // shutdown
-                localSubmitter.shutdown();
-            }
+        if (args.length == 3)  {
+            prodTpName = args[2] + "-producer";
+            consTpName = args[2] + "-consumer";
         }
+        // Producer
+        StormSubmitter.submitTopology(prodTpName, tpConf, KafkaProducerTopology.newTopology(zkBrokerUrl[1], topicName));
+        // Consumer
+        StormSubmitter.submitTopology(consTpName, tpConf, TridentKafkaConsumerTopology.newTopology(
+                new TransactionalTridentKafkaSpout(newTridentKafkaConfig(zkBrokerUrl[0]))));
+
+        // Print results to console, which also causes the print filter in the consumer topology to print the results in the worker log
+        Thread.sleep(2000);
+        DrpcResultsPrinter.remoteClient().printResults(60, 1, TimeUnit.SECONDS);
     }
 
     private static String[] parseUrl(String[] args) {

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-mongodb-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-mongodb-examples/pom.xml b/examples/storm-mongodb-examples/pom.xml
index e4a2a92..16200fc 100644
--- a/examples/storm-mongodb-examples/pom.xml
+++ b/examples/storm-mongodb-examples/pom.xml
@@ -30,7 +30,7 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.storm</groupId>
-            <artifactId>storm-server</artifactId>
+            <artifactId>storm-client</artifactId>
             <version>${project.version}</version>
             <scope>${provided.scope}</scope>
         </dependency>

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/InsertWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/InsertWordCount.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/InsertWordCount.java
index 6f71b1c..3b27fd1 100644
--- a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/InsertWordCount.java
+++ b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/InsertWordCount.java
@@ -18,17 +18,12 @@
 package org.apache.storm.mongodb.topology;
 
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Fields;
 import org.apache.storm.mongodb.bolt.MongoInsertBolt;
 import org.apache.storm.mongodb.common.mapper.MongoMapper;
 import org.apache.storm.mongodb.common.mapper.SimpleMongoMapper;
-
-import java.util.HashMap;
-import java.util.Map;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
 
 public class InsertWordCount {
     private static final String WORD_SPOUT = "WORD_SPOUT";
@@ -65,17 +60,13 @@ public class InsertWordCount {
         builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT);
         builder.setBolt(INSERT_BOLT, insertBolt, 1).fieldsGrouping(COUNT_BOLT, new Fields("word"));
 
-
-        if (args.length == 2) {
-            try (LocalCluster cluster = new LocalCluster();
-                 LocalTopology topo = cluster.submitTopology("test", config, builder.createTopology());) {
-                Thread.sleep(30000);
-            }
-            System.exit(0);
-        } else if (args.length == 3) {
-            StormSubmitter.submitTopology(args[2], config, builder.createTopology());
-        } else{
+        String topoName = "test";
+        if (args.length == 3) {
+            topoName = args[2];
+        } else if (args.length > 3) {
             System.out.println("Usage: InsertWordCount <mongodb url> <mongodb collection> [topology name]");
+            return;
         }
+        StormSubmitter.submitTopology(topoName, config, builder.createTopology());
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/LookupWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/LookupWordCount.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/LookupWordCount.java
index 5140685..c9d43bd 100644
--- a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/LookupWordCount.java
+++ b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/LookupWordCount.java
@@ -18,7 +18,6 @@
 package org.apache.storm.mongodb.topology;
 
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.mongodb.bolt.MongoLookupBolt;
 import org.apache.storm.mongodb.common.QueryFilterCreator;
@@ -64,17 +63,14 @@ public class LookupWordCount {
         builder.setBolt(LOOKUP_BOLT, lookupBolt, 1).shuffleGrouping(WORD_SPOUT);
         builder.setBolt(TOTAL_COUNT_BOLT, totalBolt, 1).fieldsGrouping(LOOKUP_BOLT, new Fields("word"));
 
-        if (args.length == 2) {
-            LocalCluster cluster = new LocalCluster();
-            cluster.submitTopology("test", config, builder.createTopology());
-            Thread.sleep(30000);
-            cluster.killTopology("test");
-            cluster.shutdown();
-            System.exit(0);
-        } else if (args.length == 3) {
-            StormSubmitter.submitTopology(args[2], config, builder.createTopology());
-        } else{
+        String topoName = "test";
+        if (args.length == 3) {
+            topoName = args[2];
+        } else if (args.length > 3) {
             System.out.println("Usage: LookupWordCount <mongodb url> <mongodb collection> [topology name]");
+            return;
         }
+        
+        StormSubmitter.submitTopology(topoName, config, builder.createTopology());
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/UpdateWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/UpdateWordCount.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/UpdateWordCount.java
index b4af4ca..8e4f757 100644
--- a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/UpdateWordCount.java
+++ b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/UpdateWordCount.java
@@ -18,16 +18,14 @@
 package org.apache.storm.mongodb.topology;
 
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
-import org.apache.storm.mongodb.common.mapper.MongoUpdateMapper;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Fields;
 import org.apache.storm.mongodb.bolt.MongoUpdateBolt;
 import org.apache.storm.mongodb.common.QueryFilterCreator;
 import org.apache.storm.mongodb.common.SimpleQueryFilterCreator;
+import org.apache.storm.mongodb.common.mapper.MongoUpdateMapper;
 import org.apache.storm.mongodb.common.mapper.SimpleMongoUpdateMapper;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
 
 public class UpdateWordCount {
     private static final String WORD_SPOUT = "WORD_SPOUT";
@@ -73,17 +71,13 @@ public class UpdateWordCount {
         builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT);
         builder.setBolt(UPDATE_BOLT, updateBolt, 1).fieldsGrouping(COUNT_BOLT, new Fields("word"));
 
-
-        if (args.length == 2) {
-            try (LocalCluster cluster = new LocalCluster();
-                 LocalTopology topo = cluster.submitTopology("test", config, builder.createTopology());) {
-                Thread.sleep(30000);
-            }
-            System.exit(0);
-        } else if (args.length == 3) {
-            StormSubmitter.submitTopology(args[2], config, builder.createTopology());
-        } else{
+        String topoName = "test";
+        if (args.length == 3) {
+            topoName = args[2];
+        } else if (args.length > 3) {
             System.out.println("Usage: UpdateWordCount <mongodb url> <mongodb collection> [topology name]");
+            return;
         }
+        StormSubmitter.submitTopology(topoName, config, builder.createTopology());
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTrident.java
----------------------------------------------------------------------
diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTrident.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTrident.java
index 14dccbd..ef5a39a 100644
--- a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTrident.java
+++ b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTrident.java
@@ -18,8 +18,6 @@
 package org.apache.storm.mongodb.trident;
 
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.mongodb.common.mapper.MongoMapper;
@@ -74,19 +72,15 @@ public class WordCountTrident {
     public static void main(String[] args) throws Exception {
         Config conf = new Config();
         conf.setMaxSpoutPending(5);
-        if (args.length == 2) {
-            try (LocalCluster cluster = new LocalCluster();
-                 LocalTopology topo = cluster.submitTopology("wordCounter", conf, buildTopology(args[0], args[1]));) {
-                Thread.sleep(60 * 1000);
-            }
-            System.exit(0);
-        }
-        else if(args.length == 3) {
-            conf.setNumWorkers(3);
-            StormSubmitter.submitTopology(args[2], conf, buildTopology(args[0], args[1]));
-        } else{
+        String topoName = "wordCounter";
+        if (args.length == 3) {
+            topoName = args[2];
+        } else if (args.length > 3 || args.length < 2) {
             System.out.println("Usage: WordCountTrident <mongodb url> <mongodb collection> [topology name]");
+            return;
         }
+        conf.setNumWorkers(3);
+        StormSubmitter.submitTopology(topoName, conf, buildTopology(args[0], args[1]));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTridentMap.java
----------------------------------------------------------------------
diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTridentMap.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTridentMap.java
index 83c0caf..0ff2c51 100644
--- a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTridentMap.java
+++ b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTridentMap.java
@@ -18,14 +18,13 @@
 package org.apache.storm.mongodb.trident;
 
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.mongodb.common.QueryFilterCreator;
 import org.apache.storm.mongodb.common.SimpleQueryFilterCreator;
 import org.apache.storm.mongodb.common.mapper.MongoMapper;
 import org.apache.storm.mongodb.common.mapper.SimpleMongoMapper;
-import org.apache.storm.mongodb.trident.state.*;
+import org.apache.storm.mongodb.trident.state.MongoMapState;
 import org.apache.storm.trident.Stream;
 import org.apache.storm.trident.TridentState;
 import org.apache.storm.trident.TridentTopology;
@@ -76,20 +75,15 @@ public class WordCountTridentMap {
     public static void main(String[] args) throws Exception {
         Config conf = new Config();
         conf.setMaxSpoutPending(5);
-        if (args.length == 2) {
-            LocalCluster cluster = new LocalCluster();
-            cluster.submitTopology("wordCounter", conf, buildTopology(args[0], args[1]));
-            Thread.sleep(60 * 1000);
-            cluster.killTopology("wordCounter");
-            cluster.shutdown();
-            System.exit(0);
-        }
-        else if(args.length == 3) {
-            conf.setNumWorkers(3);
-            StormSubmitter.submitTopology(args[2], conf, buildTopology(args[0], args[1]));
-        } else{
+        String topoName = "wordCounter";
+        if (args.length == 3) {
+            topoName = args[2];
+        } else if (args.length > 3 || args.length < 2) {
             System.out.println("Usage: WordCountTrident <mongodb url> <mongodb collection> [topology name]");
+            return;
         }
+        conf.setNumWorkers(3);
+        StormSubmitter.submitTopology(topoName, conf, buildTopology(args[0], args[1]));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-mqtt-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-mqtt-examples/pom.xml b/examples/storm-mqtt-examples/pom.xml
index 41ff820..a178c5b 100644
--- a/examples/storm-mqtt-examples/pom.xml
+++ b/examples/storm-mqtt-examples/pom.xml
@@ -34,7 +34,7 @@
   <dependencies>
    <dependency>
       <groupId>org.apache.storm</groupId>
-      <artifactId>storm-server</artifactId>
+      <artifactId>storm-client</artifactId>
       <version>${project.version}</version>
       <scope>${provided.scope}</scope>
     </dependency>

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-opentsdb-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-opentsdb-examples/pom.xml b/examples/storm-opentsdb-examples/pom.xml
index 9f550e6..4ec799b 100644
--- a/examples/storm-opentsdb-examples/pom.xml
+++ b/examples/storm-opentsdb-examples/pom.xml
@@ -30,7 +30,7 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.storm</groupId>
-            <artifactId>storm-server</artifactId>
+            <artifactId>storm-client</artifactId>
             <version>${project.version}</version>
             <scope>${provided.scope}</scope>
         </dependency>

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/SampleOpenTsdbBoltTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/SampleOpenTsdbBoltTopology.java b/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/SampleOpenTsdbBoltTopology.java
index 7dc65fd..009366f 100644
--- a/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/SampleOpenTsdbBoltTopology.java
+++ b/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/SampleOpenTsdbBoltTopology.java
@@ -18,17 +18,15 @@
  */
 package org.apache.storm.opentsdb;
 
+import java.util.Collections;
+
+import org.apache.storm.Config;
+import org.apache.storm.StormSubmitter;
 import org.apache.storm.opentsdb.bolt.OpenTsdbBolt;
 import org.apache.storm.opentsdb.bolt.TupleOpenTsdbDatapointMapper;
 import org.apache.storm.opentsdb.client.OpenTsdbClient;
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
-import org.apache.storm.StormSubmitter;
 import org.apache.storm.topology.TopologyBuilder;
 
-import java.util.Collections;
-
 /**
  * Sample application to use OpenTSDB bolt.
  */
@@ -51,19 +49,12 @@ public class SampleOpenTsdbBoltTopology {
 
         Config conf = new Config();
         conf.setDebug(true);
-
+        String topoName = "word-count";
         if (args.length > 1) {
-            conf.setNumWorkers(3);
-
-            StormSubmitter.submitTopologyWithProgressBar(args[1], conf, topologyBuilder.createTopology());
-        } else {
-            conf.setMaxTaskParallelism(3);
-
-            try (LocalCluster cluster = new LocalCluster();
-                 LocalTopology topo = cluster.submitTopology("word-count", conf, topologyBuilder.createTopology());) {
-                Thread.sleep(30000);
-            }
-            System.exit(0);
+            topoName = args[1];
         }
+        conf.setNumWorkers(3);
+
+        StormSubmitter.submitTopologyWithProgressBar(topoName, conf, topologyBuilder.createTopology());
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/SampleOpenTsdbTridentTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/SampleOpenTsdbTridentTopology.java b/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/SampleOpenTsdbTridentTopology.java
index 3220068..8ac950b 100644
--- a/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/SampleOpenTsdbTridentTopology.java
+++ b/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/SampleOpenTsdbTridentTopology.java
@@ -18,14 +18,14 @@
  */
 package org.apache.storm.opentsdb;
 
+import java.util.Collections;
+
+import org.apache.storm.Config;
+import org.apache.storm.StormSubmitter;
 import org.apache.storm.opentsdb.bolt.TupleOpenTsdbDatapointMapper;
 import org.apache.storm.opentsdb.client.OpenTsdbClient;
 import org.apache.storm.opentsdb.trident.OpenTsdbStateFactory;
 import org.apache.storm.opentsdb.trident.OpenTsdbStateUpdater;
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
-import org.apache.storm.StormSubmitter;
 import org.apache.storm.trident.Stream;
 import org.apache.storm.trident.TridentTopology;
 import org.apache.storm.trident.operation.Consumer;
@@ -33,8 +33,6 @@ import org.apache.storm.trident.tuple.TridentTuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collections;
-
 /**
  * Sample trident topology to store time series metrics in to OpenTsdb.
  */
@@ -67,20 +65,12 @@ public class SampleOpenTsdbTridentTopology {
 
         Config conf = new Config();
         conf.setDebug(true);
-
+        String topoName = "word-count";
         if (args.length > 1) {
-            conf.setNumWorkers(3);
-
-            StormSubmitter.submitTopologyWithProgressBar(args[1], conf, tridentTopology.build());
-        } else {
-            conf.setMaxTaskParallelism(3);
-
-            try (LocalCluster cluster = new LocalCluster();
-                 LocalTopology topo = cluster.submitTopology("word-count", conf, tridentTopology.build())) {
-                Thread.sleep(30000);
-            }
-            System.exit(0);
+            topoName = args[1];
         }
+        conf.setNumWorkers(3);
 
+        StormSubmitter.submitTopologyWithProgressBar(topoName, conf, tridentTopology.build());
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-perf/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-perf/pom.xml b/examples/storm-perf/pom.xml
index 260eb06..1625090 100644
--- a/examples/storm-perf/pom.xml
+++ b/examples/storm-perf/pom.xml
@@ -83,7 +83,7 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.storm</groupId>
-            <artifactId>storm-server</artifactId>
+            <artifactId>storm-client</artifactId>
             <version>${project.version}</version>
             <!--
               Use "provided" scope to keep storm out of the jar-with-dependencies

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java
index 11c63d3..69df3fb 100644
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java
@@ -19,7 +19,6 @@
 package org.apache.storm.perf;
 
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.perf.bolt.DevNullBolt;
 import org.apache.storm.perf.bolt.IdBolt;
@@ -49,7 +48,7 @@ public class ConstSpoutIdBoltNullBoltTopo {
     public static final String BOLT2_COUNT = "bolt2.count";
     public static final String SPOUT_COUNT = "spout.count";
 
-    public static StormTopology getTopology(Map conf) {
+    public static StormTopology getTopology(Map<String, Object> conf) {
 
         // 1 -  Setup Spout   --------
         ConstSpout spout = new ConstSpout("some data").withOutputFields("str");
@@ -75,27 +74,19 @@ public class ConstSpoutIdBoltNullBoltTopo {
 
 
     public static void main(String[] args) throws Exception {
-
-        if (args.length <= 0) {
-            // submit to local cluster
-            Config conf = new Config();
-            LocalCluster cluster = Helper.runOnLocalCluster(TOPOLOGY_NAME, getTopology(conf));
-
-            Helper.setupShutdownHook(cluster, TOPOLOGY_NAME);
-            while (true) {//  run indefinitely till Ctrl-C
-                Thread.sleep(20_000_000);
-            }
-        } else {
-            // submit to real cluster
-            if (args.length >2) {
-                System.err.println("args: runDurationSec  [optionalConfFile]");
-                return;
-            }
-            Integer durationSec = Integer.parseInt(args[0]);
-            Map topoConf =  (args.length==2) ? Utils.findAndReadConfigFile(args[1])  : new Config();
-
-            //  Submit topology to storm cluster
-            Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
+        int runTime = -1;
+        Config topoConf = new Config();
+        if (args.length > 0) {
+            runTime = Integer.parseInt(args[0]);
+        }
+        if (args.length > 1) {
+            topoConf.putAll(Utils.findAndReadConfigFile(args[1]));
+        }
+        if (args.length > 2) {
+            System.err.println("args: [runDurationSec]  [optionalConfFile]");
+            return;
         }
+        //  Submit topology to storm cluster
+        Helper.runOnClusterAndPrintMetrics(runTime, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java
index 92c2787..298c73e 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java
@@ -18,8 +18,9 @@
 
 package org.apache.storm.perf;
 
+import java.util.Map;
+
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.perf.bolt.DevNullBolt;
 import org.apache.storm.perf.spout.ConstSpout;
@@ -28,8 +29,6 @@ import org.apache.storm.topology.BoltDeclarer;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.utils.Utils;
 
-import java.util.Map;
-
 /***
  * This topo helps measure the messaging speed between a spout and a bolt.
  *  Spout generates a stream of a fixed string.
@@ -51,7 +50,7 @@ public class ConstSpoutNullBoltTopo {
     public static final String SHUFFLE_GROUPING = "shuffle";
     public static final String DEFAULT_GROUPING = LOCAL_GROPING;
 
-    public static StormTopology getTopology(Map conf) {
+    public static StormTopology getTopology(Map<String, Object> conf) {
 
         // 1 -  Setup Spout   --------
         ConstSpout spout = new ConstSpout("some data").withOutputFields("str");
@@ -78,29 +77,20 @@ public class ConstSpoutNullBoltTopo {
      * ConstSpout -> DevNullBolt with configurable grouping (default localOrShuffle)
      */
     public static void main(String[] args) throws Exception {
-
-        if(args.length <= 0) {
-            // For IDE based profiling ... submit topology to local cluster
-            Config conf = new Config();
-            final LocalCluster cluster = Helper.runOnLocalCluster(TOPOLOGY_NAME, getTopology(conf));
-
-            Helper.setupShutdownHook(cluster, TOPOLOGY_NAME);
-            while (true) {//  run indefinitely till Ctrl-C
-                Thread.sleep(20_000_000);
-            }
-
-        } else {
-            // For measuring perf against a Storm cluster
-            if (args.length > 2) {
-                System.err.println("args: runDurationSec  [optionalConfFile]");
-                return;
-            }
-            Integer durationSec = Integer.parseInt(args[0]);
-            Map topoConf =  (args.length==2) ? Utils.findAndReadConfigFile(args[1])  : new Config();
-
-            //  Submit topology to storm cluster
-            Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
+        int runTime = -1;
+        Config topoConf = new Config();
+        if (args.length > 0) {
+            runTime = Integer.parseInt(args[0]);
+        }
+        if (args.length > 1) {
+            topoConf.putAll(Utils.findAndReadConfigFile(args[1]));
+        }
+        if (args.length > 2) {
+            System.err.println("args: [runDurationSec]  [optionalConfFile]");
+            return;
         }
+        //  Submit topology to storm cluster
+        Helper.runOnClusterAndPrintMetrics(runTime, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java
index 721ae3d..94bd17f 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java
@@ -19,11 +19,11 @@
 package org.apache.storm.perf;
 
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.perf.spout.ConstSpout;
 import org.apache.storm.perf.utils.Helper;
 import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.utils.Utils;
 
 
 /***
@@ -52,23 +52,19 @@ public class ConstSpoutOnlyTopo {
      * ConstSpout only topology  (No bolts)
      */
     public static void main(String[] args) throws Exception {
-        if(args.length <= 0) {
-            // For IDE based profiling ... submit topology to local cluster
-            LocalCluster cluster = Helper.runOnLocalCluster(TOPOLOGY_NAME, getTopology());
-
-            Helper.setupShutdownHook(cluster, TOPOLOGY_NAME);
-            while (true) {//  run indefinitely till Ctrl-C
-                Thread.sleep(20_000_000);
-            }
-        } else {
-            //  Submit topology to storm cluster
-            if (args.length != 1) {
-                System.err.println("args: runDurationSec");
-                return;
-            }
-            Integer durationSec = Integer.parseInt(args[0]);
-
-            Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, new Config(), getTopology());
+        int runTime = -1;
+        Config topoConf = new Config();
+        if (args.length > 0) {
+            runTime = Integer.parseInt(args[0]);
+        }
+        if (args.length > 1) {
+            topoConf.putAll(Utils.findAndReadConfigFile(args[1]));
+        }
+        if (args.length > 2) {
+            System.err.println("args: [runDurationSec]  [optionalConfFile]");
+            return;
         }
+        //  Submit topology to storm cluster
+        Helper.runOnClusterAndPrintMetrics(runTime, TOPOLOGY_NAME, topoConf, getTopology());
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java
index d518c86..e64dd36 100644
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java
@@ -18,8 +18,9 @@
 package org.apache.storm.perf;
 
 
+import java.util.Map;
+
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.perf.bolt.CountBolt;
 import org.apache.storm.perf.bolt.SplitSentenceBolt;
@@ -29,9 +30,6 @@ import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.utils.Utils;
 
-
-import java.util.Map;
-
 /***
  * This topo helps measure speed of word count.
  *  Spout loads a file into memory on initialization, then emits the lines in an endless loop.
@@ -54,7 +52,7 @@ public class FileReadWordCountTopo {
     public static final int DEFAULT_COUNT_BOLT_NUM = 2;
 
 
-    public static StormTopology getTopology(Map config) {
+    public static StormTopology getTopology(Map<String, Object> config) {
 
         final int spoutNum = Helper.getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM);
         final int spBoltNum = Helper.getInt(config, SPLIT_NUM, DEFAULT_SPLIT_BOLT_NUM);
@@ -70,27 +68,19 @@ public class FileReadWordCountTopo {
     }
 
     public static void main(String[] args) throws Exception {
-        if(args.length <= 0) {
-            // For IDE based profiling ... submit topology to local cluster
-            Config conf = new Config();
-            conf.put(INPUT_FILE, "resources/randomwords.txt");
-            LocalCluster cluster = Helper.runOnLocalCluster(TOPOLOGY_NAME, getTopology(conf));
-
-            Helper.setupShutdownHook(cluster, TOPOLOGY_NAME);
-            while (true) {//  run indefinitely till Ctrl-C
-                Thread.sleep(20_000_000);
-            }
-        } else {
-            //  Submit to Storm cluster
-            if (args.length !=2) {
-                System.err.println("args: runDurationSec  confFile");
-                return;
-            }
-            Integer durationSec = Integer.parseInt(args[0]);
-            Map topoConf = Utils.findAndReadConfigFile(args[1]);
-
-            Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
-
+        int runTime = -1;
+        Config topoConf = new Config();
+        if (args.length > 0) {
+            runTime = Integer.parseInt(args[0]);
+        }
+        if (args.length > 1) {
+            topoConf.putAll(Utils.findAndReadConfigFile(args[1]));
+        }
+        if (args.length > 2) {
+            System.err.println("args: [runDurationSec]  [optionalConfFile]");
+            return;
         }
+        //  Submit topology to storm cluster
+        Helper.runOnClusterAndPrintMetrics(runTime, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java
index 5b97540..ed81a4a 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java
@@ -19,7 +19,8 @@
 
 package org.apache.storm.perf;
 
-import org.apache.storm.LocalCluster;
+import java.util.Map;
+
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.hdfs.bolt.HdfsBolt;
 import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
@@ -35,8 +36,6 @@ import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.utils.Utils;
 
-import java.util.Map;
-
 /***
  * This topo helps measure speed of writing to Hdfs
  *  Spout generates fixed length random strings.
@@ -64,7 +63,7 @@ public class StrGenSpoutHdfsBoltTopo {
     public static final String BOLT_ID = "hdfsBolt";
 
 
-    public static StormTopology getTopology(Map topoConf) {
+    public static StormTopology getTopology(Map<String, Object> topoConf) {
         final int hdfsBatch = Helper.getInt(topoConf, HDFS_BATCH, DEFAULT_HDFS_BATCH);
 
         // 1 -  Setup StringGen Spout   --------
@@ -104,30 +103,30 @@ public class StrGenSpoutHdfsBoltTopo {
 
     /** Spout generates random strings and HDFS bolt writes them to a text file */
     public static void main(String[] args) throws Exception {
-        if(args.length <= 0) {
-            // submit to local cluster
-            Map topoConf = Utils.findAndReadConfigFile("conf/HdfsSpoutTopo.yaml");
-            LocalCluster cluster = Helper.runOnLocalCluster(TOPOLOGY_NAME, getTopology(topoConf));
-
-            Helper.setupShutdownHook(cluster, TOPOLOGY_NAME);
-            while (true) {//  run indefinitely till Ctrl-C
-                Thread.sleep(20_000_000);
-            }
-        } else {
-            //  Submit to Storm cluster
-            if (args.length !=2) {
-                System.err.println("args: runDurationSec confFile");
-                return;
-            }
-            Integer durationSec = Integer.parseInt(args[0]);
-            Map topoConf = Utils.findAndReadConfigFile(args[1]);
-
-            Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
+        String confFile = "conf/HdfsSpoutTopo.yaml";
+        int runTime = -1; //Run until Ctrl-C
+        if (args.length > 0) {
+            runTime = Integer.parseInt(args[0]);
+        }
+
+        if (args.length > 1) {
+            confFile = args[1];
+        }
+
+        //  Submit to Storm cluster
+        if (args.length > 2) {
+            System.err.println("args: [runDurationSec] [confFile]");
+            return;
         }
+
+        Map<String, Object> topoConf = Utils.findAndReadConfigFile(confFile);
+
+        Helper.runOnClusterAndPrintMetrics(runTime, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
     }
 
 
     public static class LineWriter implements RecordFormat {
+        private static final long serialVersionUID = 7524288317405514146L;
         private String lineDelimiter = System.lineSeparator();
         private String fieldName;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java
index e01de0e..a008888 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java
@@ -18,7 +18,6 @@
 
 package org.apache.storm.perf.utils;
 
-import org.apache.storm.LocalCluster;
 import org.apache.storm.generated.Nimbus;
 import org.apache.storm.utils.Utils;
 import org.apache.log4j.Logger;
@@ -27,10 +26,7 @@ import java.io.PrintWriter;
 import java.util.*;
 
 
-public class BasicMetricsCollector  {
-
-    private LocalCluster localCluster = null;
-    private Nimbus.Client client = null;
+public class BasicMetricsCollector implements AutoCloseable {
     private PrintWriter dataWriter;
     private long startTime=0;
 
@@ -80,20 +76,8 @@ public class BasicMetricsCollector  {
     private double maxLatency = 0;
 
     boolean first = true;
-
-    public BasicMetricsCollector(Nimbus.Client client, String topoName, Map stormConfig) {
-        this(topoName, stormConfig);
-        this.client = client;
-        this.localCluster = null;
-    }
-
-    public BasicMetricsCollector(LocalCluster localCluster, String topoName, Map stormConfig) {
-        this(topoName, stormConfig);
-        this.client = null;
-        this.localCluster = localCluster;
-    }
-
-    private BasicMetricsCollector(String topoName, Map stormConfig) {
+    
+    public BasicMetricsCollector(String topoName, Map<String, Object> stormConfig) {
         Set<MetricsItem> items = getMetricsToCollect();
         this.config = new MetricsCollectorConfig(topoName, stormConfig);
         collectTopologyStats = collectTopologyStats(items);
@@ -104,14 +88,13 @@ public class BasicMetricsCollector  {
         dataWriter = new PrintWriter(System.err);
     }
 
-
     private Set<MetricsItem>  getMetricsToCollect() {
         Set<MetricsItem> result = new HashSet<>();
         result.add(MetricsItem.ALL);
         return result;
     }
 
-    public void collect(Nimbus.Client client) {
+    public void collect(Nimbus.Iface client) {
         try {
             if (!first) {
                 this.lastSample = this.curSample;
@@ -130,25 +113,7 @@ public class BasicMetricsCollector  {
         }
     }
 
-    public void collect(LocalCluster localCluster) {
-        try {
-            if (!first) {
-                this.lastSample = this.curSample;
-                this.curSample = MetricsSample.factory(localCluster, config.name);
-                updateStats(dataWriter);
-                writeLine(dataWriter);
-            } else {
-                LOG.info("Getting baseline metrics sample.");
-                writeHeader(dataWriter);
-                this.curSample = MetricsSample.factory(localCluster, config.name);
-                first = false;
-                startTime = System.currentTimeMillis();
-            }
-        } catch (Exception e) {
-            LOG.error("storm metrics failed! ", e);
-        }
-    }
-
+    @Override
     public void close() {
         dataWriter.close();
     }
@@ -287,13 +252,13 @@ public class BasicMetricsCollector  {
         private static final Logger LOG = Logger.getLogger(MetricsCollectorConfig.class);
 
         // storm configuration
-        public final Map stormConfig;
+        public final Map<String, Object> stormConfig;
         // storm topology name
         public final String name;
         // benchmark label
         public final String label;
 
-        public MetricsCollectorConfig(String topoName, Map stormConfig) {
+        public MetricsCollectorConfig(String topoName, Map<String, Object> stormConfig) {
             this.stormConfig = stormConfig;
             String labelStr = (String) stormConfig.get("benchmark.label");
             this.name = topoName;

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java
index 465ff33..d26078a 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java
@@ -18,39 +18,23 @@
 
 package org.apache.storm.perf.utils;
 
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
+import java.util.Map;
+
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.KillOptions;
 import org.apache.storm.generated.Nimbus;
 import org.apache.storm.generated.StormTopology;
-import org.apache.storm.utils.Utils;
-import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.NimbusClient;
-
-import java.util.Map;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Utils;
 
 
 public class Helper {
 
-  public static void kill(Nimbus.Client client, String topoName) throws Exception {
-    KillOptions opts = new KillOptions();
-    opts.set_wait_secs(0);
-    client.killTopologyWithOpts(topoName, opts);
-  }
-
-  public static void killAndShutdownCluster(LocalCluster cluster, String topoName) throws Exception {
-    KillOptions opts = new KillOptions();
-    opts.set_wait_secs(0);
-    cluster.killTopologyWithOpts(topoName, opts);
-    cluster.shutdown();
-  }
-
-
-    public static LocalCluster runOnLocalCluster(String topoName, StormTopology topology) throws Exception {
-        LocalCluster cluster = new LocalCluster();
-        cluster.submitTopology(topoName, new Config(), topology);
-        return cluster;
+    public static void kill(Nimbus.Iface client, String topoName) throws Exception {
+        KillOptions opts = new KillOptions();
+        opts.set_wait_secs(0);
+        client.killTopologyWithOpts(topoName, opts);
     }
 
     public static int getInt(Map map, Object key, int def) {
@@ -61,72 +45,52 @@ public class Helper {
         return (String) map.get(key);
     }
 
-    public static void collectMetricsAndKill(String topologyName, Integer pollInterval, Integer duration) throws Exception {
-        Map clusterConf = Utils.readStormConfig();
-        Nimbus.Client client = NimbusClient.getConfiguredClient(clusterConf).getClient();
-        BasicMetricsCollector metricsCollector = new BasicMetricsCollector(client, topologyName, clusterConf);
-
-        int times = duration / pollInterval;
-        metricsCollector.collect(client);
-        for (int i = 0; i < times; i++) {
-            Thread.sleep(pollInterval * 1000);
-            metricsCollector.collect(client);
+    public static void collectMetricsAndKill(String topologyName, Integer pollInterval, int duration) throws Exception {
+        Map<String, Object> clusterConf = Utils.readStormConfig();
+        Nimbus.Iface client = NimbusClient.getConfiguredClient(clusterConf).getClient();
+        try (BasicMetricsCollector metricsCollector = new BasicMetricsCollector(topologyName, clusterConf)) {
+
+            if (duration > 0) {
+                int times = duration / pollInterval;
+                metricsCollector.collect(client);
+                for (int i = 0; i < times; i++) {
+                    Thread.sleep(pollInterval * 1000);
+                    metricsCollector.collect(client);
+                }
+            } else {
+                while (true) { //until Ctrl-C
+                    metricsCollector.collect(client);
+                    Thread.sleep(pollInterval * 1000);
+                }
+            }
+        } finally {
+            kill(client, topologyName);
         }
-        metricsCollector.close();
-        kill(client, topologyName);
     }
 
-    public static void collectLocalMetricsAndKill(LocalCluster localCluster, String topologyName, Integer pollInterval, Integer duration, Map clusterConf) throws Exception {
-        BasicMetricsCollector metricsCollector = new BasicMetricsCollector(localCluster, topologyName, clusterConf);
-
-        int times = duration / pollInterval;
-        metricsCollector.collect(localCluster);
-        for (int i = 0; i < times; i++) {
-            Thread.sleep(pollInterval * 1000);
-            metricsCollector.collect(localCluster);
-        }
-        metricsCollector.close();
-        killAndShutdownCluster(localCluster, topologyName);
+    /** Kill topo on Ctrl-C */
+    public static void setupShutdownHook(final String topoName) {
+        Map<String, Object> clusterConf = Utils.readStormConfig();
+        final Nimbus.Iface client = NimbusClient.getConfiguredClient(clusterConf).getClient();
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            public void run() {
+                try {
+                    Helper.kill(client, topoName);
+                    System.out.println("Killed Topology");
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
     }
 
-    /** Kill topo and Shutdown local cluster on Ctrl-C */
-  public static void setupShutdownHook(final LocalCluster cluster, final String topoName) {
-    Runtime.getRuntime().addShutdownHook(new Thread() {
-      public void run() {
-          try {
-              cluster.killTopology(topoName);
-              System.out.println("Killed Topology");
-          } catch (Exception e) {
-              System.err.println("Encountered error in killing topology: " + e);
-          }
-        cluster.shutdown();
-      }
-    });
-  }
-
-  /** Kill topo on Ctrl-C */
-  public static void setupShutdownHook(final String topoName) {
-    Map clusterConf = Utils.readStormConfig();
-    final Nimbus.Client client = NimbusClient.getConfiguredClient(clusterConf).getClient();
-    Runtime.getRuntime().addShutdownHook(new Thread() {
-      public void run() {
-        try {
-          Helper.kill(client, topoName);
-          System.out.println("Killed Topology");
-        } catch (Exception e) {
-          e.printStackTrace();
-        }
-      }
-    });
-  }
-
-    public static void runOnClusterAndPrintMetrics(Integer durationSec, String topoName, Map topoConf, StormTopology topology) throws Exception {
-      // submit topology
-      StormSubmitter.submitTopologyWithProgressBar(topoName, topoConf, topology);
-      setupShutdownHook(topoName); // handle Ctrl-C
+    public static void runOnClusterAndPrintMetrics(int durationSec, String topoName, Map<String, Object> topoConf, StormTopology topology) throws Exception {
+        // submit topology
+        StormSubmitter.submitTopologyWithProgressBar(topoName, topoConf, topology);
+        setupShutdownHook(topoName); // handle Ctrl-C
 
-      // poll metrics every minute, then kill topology after specified duration
-      Integer pollIntervalSec = 60;
-      collectMetricsAndKill(topoName, pollIntervalSec, durationSec);
+        // poll metrics every minute, then kill topology after specified duration
+        int pollIntervalSec = 60;
+        collectMetricsAndKill(topoName, pollIntervalSec, durationSec);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java
index a934120..fbcea13 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java
@@ -18,7 +18,9 @@
 
 package org.apache.storm.perf.utils;
 
-import org.apache.storm.LocalCluster;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.storm.generated.ClusterSummary;
 import org.apache.storm.generated.ExecutorSpecificStats;
 import org.apache.storm.generated.ExecutorStats;
@@ -29,9 +31,6 @@ import org.apache.storm.generated.TopologyInfo;
 import org.apache.storm.generated.TopologySummary;
 import org.apache.storm.utils.Utils;
 
-import java.util.List;
-import java.util.Map;
-
 public class MetricsSample {
 
     private long sampleTime = -1;
@@ -54,7 +53,7 @@ public class MetricsSample {
     private int totalSlots = 0;
     private int usedSlots = 0;
 
-    public static MetricsSample factory(Nimbus.Client client, String topologyName) throws Exception {
+    public static MetricsSample factory(Nimbus.Iface client, String topologyName) throws Exception {
         // "************ Sampling Metrics *****************
 
         ClusterSummary clusterSummary = client.getClusterInfo();
@@ -72,12 +71,6 @@ public class MetricsSample {
         return sample;
     }
 
-    public static MetricsSample factory(LocalCluster localCluster, String topologyName) throws Exception {
-        TopologyInfo topologyInfo = localCluster.getTopologyInfo(topologyName);;
-        return getMetricsSample(topologyInfo);
-    }
-
-
     private static MetricsSample getMetricsSample(TopologyInfo topInfo) {
         List<ExecutorSummary> executorSummaries = topInfo.get_executors();
 

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-pmml-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-pmml-examples/pom.xml b/examples/storm-pmml-examples/pom.xml
index d202076..a1699d2 100644
--- a/examples/storm-pmml-examples/pom.xml
+++ b/examples/storm-pmml-examples/pom.xml
@@ -34,7 +34,7 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.storm</groupId>
-            <artifactId>storm-server</artifactId>
+            <artifactId>storm-client</artifactId>
             <version>${project.version}</version>
             <scope>${provided.scope}</scope>
         </dependency>
@@ -76,4 +76,4 @@
             </plugin>
         </plugins>
     </build>
-</project>
\ No newline at end of file
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-pmml-examples/src/main/java/org/apache/storm/pmml/JpmmlRunnerTestTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-pmml-examples/src/main/java/org/apache/storm/pmml/JpmmlRunnerTestTopology.java b/examples/storm-pmml-examples/src/main/java/org/apache/storm/pmml/JpmmlRunnerTestTopology.java
index 1e5521e..c330f98 100644
--- a/examples/storm-pmml-examples/src/main/java/org/apache/storm/pmml/JpmmlRunnerTestTopology.java
+++ b/examples/storm-pmml-examples/src/main/java/org/apache/storm/pmml/JpmmlRunnerTestTopology.java
@@ -18,11 +18,15 @@
 
 package org.apache.storm.pmml;
 
-import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
 
 import org.apache.commons.compress.utils.IOUtils;
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.pmml.model.ModelOutputs;
@@ -36,14 +40,7 @@ import org.apache.storm.topology.base.BaseBasicBolt;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.utils.Utils;
 
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.Arrays;
-import java.util.List;
+import com.google.common.collect.Lists;
 
 /**
  * Topology that loads a PMML Model and raw input data from a CSV file. The {@link RawInputFromCSVSpout}
@@ -66,8 +63,7 @@ public class JpmmlRunnerTestTopology {
     private File rawInputs;           // Raw input data to be scored (predicted)
     private File pmml;                // PMML Model read from file - null if using Blobstore
     private String blobKey;           // PMML Model downloaded from Blobstore - null if using File
-    private boolean isLocal;
-    private String tplgyName;
+    private String tplgyName = "test";
 
     public static void main(String[] args) throws Exception {
         try {
@@ -89,7 +85,6 @@ public class JpmmlRunnerTestTopology {
             printUsage();
         } else {
             try {
-                isLocal = true;
                 for (int i = 0; i < args.length; ) {
                     switch (args[i]) {
                         case "-f":
@@ -106,7 +101,6 @@ public class JpmmlRunnerTestTopology {
                             break;
                         default:
                             tplgyName = args[i];
-                            isLocal = false;
                             i++;
                             break;
                     }
@@ -155,11 +149,7 @@ public class JpmmlRunnerTestTopology {
     private void run() throws Exception {
         System.out.println(String.format("Running topology using PMML model loaded from [%s] and raw input data loaded from [%s]",
                 blobKey != null ? "Blobstore with blob key [" + blobKey + "]" : pmml.getAbsolutePath(), rawInputs.getAbsolutePath()));
-        if (isLocal) {
-            submitTopologyLocalCluster(newTopology(), newConfig());
-        } else {
-            submitTopologyRemoteCluster(newTopology(), newConfig());
-        }
+        submitTopologyRemoteCluster(newTopology(), newConfig());
     }
 
     private StormTopology newTopology() throws Exception {
@@ -171,26 +161,10 @@ public class JpmmlRunnerTestTopology {
         return builder.createTopology();
     }
 
-    private void submitTopologyLocalCluster(StormTopology topology, Config config) throws Exception {
-        LocalCluster cluster = new LocalCluster();
-        cluster.submitTopology(tplgyName, config, topology);
-        stopWaitingForInput();
-    }
-
     private void submitTopologyRemoteCluster(StormTopology topology, Config config) throws Exception {
         StormSubmitter.submitTopology(tplgyName, config, topology);
     }
 
-    private void stopWaitingForInput() {
-        try {
-            System.out.println("PRESS ENTER TO STOP");
-            new BufferedReader(new InputStreamReader(System.in)).readLine();
-            System.exit(0);
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-    }
-
     private Config newConfig() {
         Config config = new Config();
         config.setDebug(true);

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-redis-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-redis-examples/pom.xml b/examples/storm-redis-examples/pom.xml
index 9317daf..d03d8b7 100644
--- a/examples/storm-redis-examples/pom.xml
+++ b/examples/storm-redis-examples/pom.xml
@@ -30,7 +30,7 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.storm</groupId>
-            <artifactId>storm-server</artifactId>
+            <artifactId>storm-client</artifactId>
             <version>${project.version}</version>
             <scope>${provided.scope}</scope>
         </dependency>

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/LookupWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/LookupWordCount.java b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/LookupWordCount.java
index 17a088b..43798d0 100644
--- a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/LookupWordCount.java
+++ b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/LookupWordCount.java
@@ -17,10 +17,16 @@
  */
 package org.apache.storm.redis.topology;
 
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
+import org.apache.storm.redis.bolt.RedisLookupBolt;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisLookupMapper;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -30,18 +36,10 @@ import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.ITuple;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
-import com.google.common.collect.Lists;
-import org.apache.storm.redis.bolt.RedisLookupBolt;
-import org.apache.storm.redis.common.config.JedisPoolConfig;
-import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
-import org.apache.storm.redis.common.mapper.RedisLookupMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
+import com.google.common.collect.Lists;
 
 public class LookupWordCount {
     private static final String WORD_SPOUT = "WORD_SPOUT";
@@ -109,17 +107,14 @@ public class LookupWordCount {
         builder.setBolt(LOOKUP_BOLT, lookupBolt, 1).shuffleGrouping(WORD_SPOUT);
         builder.setBolt(PRINT_BOLT, printBolt, 1).shuffleGrouping(LOOKUP_BOLT);
 
-        if (args.length == 2) {
-            try (LocalCluster cluster = new LocalCluster();
-                 LocalTopology topo = cluster.submitTopology("test", config, builder.createTopology());) {
-                Thread.sleep(30000);
-            }
-            System.exit(0);
-        } else if (args.length == 3) {
-            StormSubmitter.submitTopology(args[2], config, builder.createTopology());
-        } else{
+        String topoName = "test";
+        if (args.length == 3) {
+            topoName = args[2];
+        } else if (args.length > 3) {
             System.out.println("Usage: LookupWordCount <redis host> <redis port> (topology name)");
+            return;
         }
+        StormSubmitter.submitTopology(topoName, config, builder.createTopology());
     }
 
     private static RedisLookupMapper setupLookupMapper() {

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/PersistentWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/PersistentWordCount.java b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/PersistentWordCount.java
index 36cec89..4c7b0dc 100644
--- a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/PersistentWordCount.java
+++ b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/PersistentWordCount.java
@@ -18,25 +18,14 @@
 package org.apache.storm.redis.topology;
 
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.ITuple;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.redis.bolt.AbstractRedisBolt;
 import org.apache.storm.redis.bolt.RedisStoreBolt;
-import org.apache.storm.redis.common.config.JedisClusterConfig;
 import org.apache.storm.redis.common.config.JedisPoolConfig;
 import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 import org.apache.storm.redis.common.mapper.RedisStoreMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import redis.clients.jedis.JedisCommands;
-import redis.clients.jedis.exceptions.JedisConnectionException;
-import redis.clients.jedis.exceptions.JedisException;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.ITuple;
 
 public class PersistentWordCount {
     private static final String WORD_SPOUT = "WORD_SPOUT";
@@ -72,17 +61,14 @@ public class PersistentWordCount {
         builder.setBolt(COUNT_BOLT, bolt, 1).fieldsGrouping(WORD_SPOUT, new Fields("word"));
         builder.setBolt(STORE_BOLT, storeBolt, 1).shuffleGrouping(COUNT_BOLT);
 
-        if (args.length == 2) {
-            try (LocalCluster cluster = new LocalCluster();
-                 LocalTopology topo = cluster.submitTopology("test", config, builder.createTopology());) {
-                Thread.sleep(30000);
-            }
-            System.exit(0);
-        } else if (args.length == 3) {
-            StormSubmitter.submitTopology(args[2], config, builder.createTopology());
-        } else {
+        String topoName = "test";
+        if (args.length == 3) {
+            topoName = args[2];
+        } else if (args.length > 3) {
             System.out.println("Usage: PersistentWordCount <redis host> <redis port> (topology name)");
+            return;
         }
+        StormSubmitter.submitTopology(topoName, config, builder.createTopology());
     }
 
     private static RedisStoreMapper setupStoreMapper() {

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/WhitelistWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/WhitelistWordCount.java b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/WhitelistWordCount.java
index 1cccd9b..4ac410a 100644
--- a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/WhitelistWordCount.java
+++ b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/WhitelistWordCount.java
@@ -17,9 +17,10 @@
  */
 package org.apache.storm.redis.topology;
 
+import java.util.Map;
+import java.util.Random;
+
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.redis.bolt.RedisFilterBolt;
 import org.apache.storm.redis.common.config.JedisPoolConfig;
@@ -36,9 +37,6 @@ import org.apache.storm.tuple.Tuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-import java.util.Random;
-
 public class WhitelistWordCount {
     private static final String WORD_SPOUT = "WORD_SPOUT";
     private static final String WHITELIST_BOLT = "WHITELIST_BOLT";
@@ -106,17 +104,14 @@ public class WhitelistWordCount {
         builder.setBolt(COUNT_BOLT, wordCounterBolt, 1).fieldsGrouping(WHITELIST_BOLT, new Fields("word"));
         builder.setBolt(PRINT_BOLT, printBolt, 1).shuffleGrouping(COUNT_BOLT);
 
-        if (args.length == 2) {
-            try (LocalCluster cluster = new LocalCluster();
-                 LocalTopology topo = cluster.submitTopology("test", config, builder.createTopology());) {
-                Thread.sleep(30000);
-            }
-            System.exit(0);
-        } else if (args.length == 3) {
-            StormSubmitter.submitTopology(args[2], config, builder.createTopology());
-        } else{
-            System.out.println("Usage: WhitelistWordCount <redis host> <redis port> (topology name)");
+        String topoName = "test";
+        if (args.length == 3) {
+            topoName = args[2];
+        } else if (args.length > 3) {
+            System.out.println("Usage: WhitelistWordCount <redis host> <redis port> [topology name]");
+            return;
         }
+        StormSubmitter.submitTopology(topoName, config, builder.createTopology());
     }
 
     private static RedisFilterMapper setupWhitelistMapper() {