Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/12 08:29:11 UTC

[03/10] storm git commit: STORM-2447: add in storm local to avoid having server on worker classpath
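
This commit replaces direct use of org.apache.storm.LocalCluster in the examples with plain client-side APIs, so the examples no longer need storm-server on the worker classpath; local runs are now handled by launching with "storm local" instead of "storm jar". Below is a minimal sketch (hypothetical class name LocalModeSketch, with the topology elided) of the pattern the commit applies throughout: submit through StormSubmitter and talk to Nimbus only through the thrift Nimbus.Iface, using NimbusClient.isLocalOverride() in the few spots that must know whether they are running locally.

    import org.apache.storm.Config;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.generated.Nimbus;
    import org.apache.storm.generated.TopologySummary;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.utils.NimbusClient;

    public class LocalModeSketch {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            // ... setSpout/setBolt calls would go here ...

            Config conf = new Config();
            if (NimbusClient.isLocalOverride()) {
                // Launched with "storm local": tune down without touching
                // any server-side classes.
                conf.setMaxTaskParallelism(3);
            }

            // StormSubmitter works in both modes; "storm local" redirects
            // it to an in-process cluster.
            StormSubmitter.submitTopology("sketch", conf, builder.createTopology());

            // The thrift interface also works in both modes.
            Nimbus.Iface client = NimbusClient.getConfiguredClient(conf).getClient();
            for (TopologySummary ts : client.getClusterInfo().get_topologies()) {
                System.out.println(ts.get_name() + " -> " + ts.get_id());
            }
        }
    }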

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/ThroughputVsLatency.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/ThroughputVsLatency.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/ThroughputVsLatency.java
index d8137b0..f541cb9 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/ThroughputVsLatency.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/ThroughputVsLatency.java
@@ -17,13 +17,28 @@
  */
 package org.apache.storm.starter;
 
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.HdrHistogram.Histogram;
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
 import org.apache.storm.StormSubmitter;
-import org.apache.storm.misc.metric.HttpForwardingMetricsServer;
-import org.apache.storm.metric.api.IMetricsConsumer.TaskInfo;
+import org.apache.storm.generated.ClusterSummary;
+import org.apache.storm.generated.ExecutorSummary;
+import org.apache.storm.generated.KillOptions;
+import org.apache.storm.generated.Nimbus;
+import org.apache.storm.generated.SpoutStats;
+import org.apache.storm.generated.TopologyInfo;
+import org.apache.storm.generated.TopologySummary;
 import org.apache.storm.metric.api.IMetricsConsumer.DataPoint;
-import org.apache.storm.generated.*;
+import org.apache.storm.metric.api.IMetricsConsumer.TaskInfo;
+import org.apache.storm.metrics.hdrhistogram.HistogramMetric;
+import org.apache.storm.misc.metric.HttpForwardingMetricsServer;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.BasicOutputCollector;
@@ -34,395 +49,328 @@ import org.apache.storm.topology.base.BaseRichSpout;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.NimbusClient;
 
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.storm.metrics.hdrhistogram.HistogramMetric;
-import org.HdrHistogram.Histogram;
-
 /**
  * WordCount but the spout goes at a predefined rate and we collect
  * proper latency statistics.
  */
 public class ThroughputVsLatency {
-  private static class SentWithTime {
-    public final String sentence;
-    public final long time;
+    private static class SentWithTime {
+        public final String sentence;
+        public final long time;
 
-    SentWithTime(String sentence, long time) {
-        this.sentence = sentence;
-        this.time = time;
-    }
-  }
-
-  public static class C {
-    LocalCluster _local = null;
-    Nimbus.Client _client = null;
-
-    public C(Map conf) throws Exception {
-      Map clusterConf = Utils.readStormConfig();
-      if (conf != null) {
-        clusterConf.putAll(conf);
-      }
-      Boolean isLocal = (Boolean)clusterConf.get("run.local");
-      if (isLocal != null && isLocal) {
-        _local = new LocalCluster();
-      } else {
-        _client = NimbusClient.getConfiguredClient(clusterConf).getClient();
-      }
+        SentWithTime(String sentence, long time) {
+            this.sentence = sentence;
+            this.time = time;
+        }
     }
 
-    public ClusterSummary getClusterInfo() throws Exception {
-      if (_local != null) {
-        return _local.getClusterInfo();
-      } else {
-        return _client.getClusterInfo();
-      }
-    }
+    public static class FastRandomSentenceSpout extends BaseRichSpout {
+        static final String[] SENTENCES = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
+                "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
+
+        SpoutOutputCollector _collector;
+        long _periodNano;
+        long _emitAmount;
+        Random _rand;
+        long _nextEmitTime;
+        long _emitsLeft;
+        HistogramMetric _histo;
+
+        public FastRandomSentenceSpout(long ratePerSecond) {
+            if (ratePerSecond > 0) {
+                _periodNano = Math.max(1, 1000000000/ratePerSecond);
+                _emitAmount = Math.max(1, (long)((ratePerSecond / 1000000000.0) * _periodNano));
+            } else {
+                _periodNano = Long.MAX_VALUE - 1;
+                _emitAmount = 1;
+            }
+        }
 
-    public TopologyInfo getTopologyInfo(String id) throws Exception {
-      if (_local != null) {
-        return _local.getTopologyInfo(id);
-      } else {
-        return _client.getTopologyInfo(id);
-      }
-    }
+        @Override
+        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+            _collector = collector;
+            _rand = ThreadLocalRandom.current();
+            _nextEmitTime = System.nanoTime();
+            _emitsLeft = _emitAmount;
+            _histo = new HistogramMetric(3600000000000L, 3);
+            context.registerMetric("comp-lat-histo", _histo, 10); //Update every 10 seconds, so we are not too far behind
+        }
 
-    public void killTopologyWithOpts(String name, KillOptions opts) throws Exception {
-      if (_local != null) {
-        _local.killTopologyWithOpts(name, opts);
-      } else {
-        _client.killTopologyWithOpts(name, opts);
-      }
-    }
+        @Override
+        public void nextTuple() {
+            if (_emitsLeft <= 0 && _nextEmitTime <= System.nanoTime()) {
+                _emitsLeft = _emitAmount;
+                _nextEmitTime = _nextEmitTime + _periodNano;
+            }
 
-    public void submitTopology(String name, Map stormConf, StormTopology topology) throws Exception {
-      if (_local != null) {
-        _local.submitTopology(name, stormConf, topology);
-      } else {
-        StormSubmitter.submitTopology(name, stormConf, topology);
-      }
-    }
+            if (_emitsLeft > 0) {
+                String sentence = SENTENCES[_rand.nextInt(SENTENCES.length)];
+                _collector.emit(new Values(sentence), new SentWithTime(sentence, _nextEmitTime - _periodNano));
+                _emitsLeft--;
+            }
+        }
 
-    public boolean isLocal() {
-      return _local != null;
-    }
-  }
-
-  public static class FastRandomSentenceSpout extends BaseRichSpout {
-    static final String[] SENTENCES = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
-          "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
-
-    SpoutOutputCollector _collector;
-    long _periodNano;
-    long _emitAmount;
-    Random _rand;
-    long _nextEmitTime;
-    long _emitsLeft;
-    HistogramMetric _histo;
-
-    public FastRandomSentenceSpout(long ratePerSecond) {
-        if (ratePerSecond > 0) {
-            _periodNano = Math.max(1, 1000000000/ratePerSecond);
-            _emitAmount = Math.max(1, (long)((ratePerSecond / 1000000000.0) * _periodNano));
-        } else {
-            _periodNano = Long.MAX_VALUE - 1;
-            _emitAmount = 1;
+        @Override
+        public void ack(Object id) {
+            long end = System.nanoTime();
+            SentWithTime st = (SentWithTime)id;
+            _histo.recordValue(end-st.time);
         }
-    }
 
-    @Override
-    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-      _collector = collector;
-      _rand = ThreadLocalRandom.current();
-      _nextEmitTime = System.nanoTime();
-      _emitsLeft = _emitAmount;
-      _histo = new HistogramMetric(3600000000000L, 3);
-      context.registerMetric("comp-lat-histo", _histo, 10); //Update every 10 seconds, so we are not too far behind
-    }
+        @Override
+        public void fail(Object id) {
+            SentWithTime st = (SentWithTime)id;
+            _collector.emit(new Values(st.sentence), id);
+        }
 
-    @Override
-    public void nextTuple() {
-      if (_emitsLeft <= 0 && _nextEmitTime <= System.nanoTime()) {
-          _emitsLeft = _emitAmount;
-          _nextEmitTime = _nextEmitTime + _periodNano;
-      }
-
-      if (_emitsLeft > 0) {
-          String sentence = SENTENCES[_rand.nextInt(SENTENCES.length)];
-          _collector.emit(new Values(sentence), new SentWithTime(sentence, _nextEmitTime - _periodNano));
-          _emitsLeft--;
-      }
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("sentence"));
+        }
     }
 
-    @Override
-    public void ack(Object id) {
-      long end = System.nanoTime();
-      SentWithTime st = (SentWithTime)id;
-      _histo.recordValue(end-st.time);
-    }
+    public static class SplitSentence extends BaseBasicBolt {
+        @Override
+        public void execute(Tuple tuple, BasicOutputCollector collector) {
+            String sentence = tuple.getString(0);
+            for (String word: sentence.split("\\s+")) {
+                collector.emit(new Values(word, 1));
+            }
+        }
 
-    @Override
-    public void fail(Object id) {
-      SentWithTime st = (SentWithTime)id;
-      _collector.emit(new Values(st.sentence), id);
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word", "count"));
+        }
     }
 
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("sentence"));
-    }
-  }
-
-  public static class SplitSentence extends BaseBasicBolt {
-    @Override
-    public void execute(Tuple tuple, BasicOutputCollector collector) {
-      String sentence = tuple.getString(0);
-      for (String word: sentence.split("\\s+")) {
-          collector.emit(new Values(word, 1));
-      }
-    }
+    public static class WordCount extends BaseBasicBolt {
+        Map<String, Integer> counts = new HashMap<String, Integer>();
 
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("word", "count"));
-    }
-  }
-
-  public static class WordCount extends BaseBasicBolt {
-    Map<String, Integer> counts = new HashMap<String, Integer>();
-
-    @Override
-    public void execute(Tuple tuple, BasicOutputCollector collector) {
-      String word = tuple.getString(0);
-      Integer count = counts.get(word);
-      if (count == null)
-        count = 0;
-      count++;
-      counts.put(word, count);
-      collector.emit(new Values(word, count));
-    }
+        @Override
+        public void execute(Tuple tuple, BasicOutputCollector collector) {
+            String word = tuple.getString(0);
+            Integer count = counts.get(word);
+            if (count == null)
+                count = 0;
+            count++;
+            counts.put(word, count);
+            collector.emit(new Values(word, count));
+        }
 
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("word", "count"));
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word", "count"));
+        }
     }
-  }
 
-  private static class MemMeasure {
-    private long _mem = 0;
-    private long _time = 0;
+    private static class MemMeasure {
+        private long _mem = 0;
+        private long _time = 0;
 
-    public synchronized void update(long mem) {
-        _mem = mem;
-        _time = System.currentTimeMillis();
-    }
+        public synchronized void update(long mem) {
+            _mem = mem;
+            _time = System.currentTimeMillis();
+        }
 
-    public synchronized long get() {
-        return isExpired() ? 0l : _mem;
-    }
+        public synchronized long get() {
+            return isExpired() ? 0l : _mem;
+        }
 
-    public synchronized boolean isExpired() {
-        return (System.currentTimeMillis() - _time) >= 20000;
-    }
-  }
-
-  private static final Histogram _histo = new Histogram(3600000000000L, 3);
-  private static final AtomicLong _systemCPU = new AtomicLong(0);
-  private static final AtomicLong _userCPU = new AtomicLong(0);
-  private static final AtomicLong _gcCount = new AtomicLong(0);
-  private static final AtomicLong _gcMs = new AtomicLong(0);
-  private static final ConcurrentHashMap<String, MemMeasure> _memoryBytes = new ConcurrentHashMap<String, MemMeasure>();
-
-  private static long readMemory() {
-    long total = 0;
-    for (MemMeasure mem: _memoryBytes.values()) {
-      total += mem.get();
-    }
-    return total;
-  }
-
-  private static long _prev_acked = 0;
-  private static long _prev_uptime = 0;
-
-  public static void printMetrics(C client, String name) throws Exception {
-    ClusterSummary summary = client.getClusterInfo();
-    String id = null;
-    for (TopologySummary ts: summary.get_topologies()) {
-      if (name.equals(ts.get_name())) {
-        id = ts.get_id();
-      }
+        public synchronized boolean isExpired() {
+            return (System.currentTimeMillis() - _time) >= 20000;
+        }
     }
-    if (id == null) {
-      throw new Exception("Could not find a topology named "+name);
+
+    private static final Histogram _histo = new Histogram(3600000000000L, 3);
+    private static final AtomicLong _systemCPU = new AtomicLong(0);
+    private static final AtomicLong _userCPU = new AtomicLong(0);
+    private static final AtomicLong _gcCount = new AtomicLong(0);
+    private static final AtomicLong _gcMs = new AtomicLong(0);
+    private static final ConcurrentHashMap<String, MemMeasure> _memoryBytes = new ConcurrentHashMap<String, MemMeasure>();
+
+    private static long readMemory() {
+        long total = 0;
+        for (MemMeasure mem: _memoryBytes.values()) {
+            total += mem.get();
+        }
+        return total;
     }
-    TopologyInfo info = client.getTopologyInfo(id);
-    int uptime = info.get_uptime_secs();
-    long acked = 0;
-    long failed = 0;
-    for (ExecutorSummary exec: info.get_executors()) {
-      if ("spout".equals(exec.get_component_id()) && exec.get_stats() != null && exec.get_stats().get_specific() != null) {
-        SpoutStats stats = exec.get_stats().get_specific().get_spout();
-        Map<String, Long> failedMap = stats.get_failed().get(":all-time");
-        Map<String, Long> ackedMap = stats.get_acked().get(":all-time");
-        if (ackedMap != null) {
-          for (String key: ackedMap.keySet()) {
-            if (failedMap != null) {
-              Long tmp = failedMap.get(key);
-              if (tmp != null) {
-                  failed += tmp;
-              }
+
+    private static long _prev_acked = 0;
+    private static long _prev_uptime = 0;
+
+    public static void printMetrics(Nimbus.Iface client, String name) throws Exception {
+        ClusterSummary summary = client.getClusterInfo();
+        String id = null;
+        for (TopologySummary ts: summary.get_topologies()) {
+            if (name.equals(ts.get_name())) {
+                id = ts.get_id();
             }
-            long ackVal = ackedMap.get(key);
-            acked += ackVal;
-          }
         }
-      }
-    }
-    long ackedThisTime = acked - _prev_acked;
-    long thisTime = uptime - _prev_uptime;
-    long nnpct, nnnpct, min, max;
-    double mean, stddev;
-    synchronized(_histo) {
-      nnpct = _histo.getValueAtPercentile(99.0);
-      nnnpct = _histo.getValueAtPercentile(99.9);
-      min = _histo.getMinValue();
-      max = _histo.getMaxValue();
-      mean = _histo.getMean();
-      stddev = _histo.getStdDeviation();
-      _histo.reset();
-    }
-    long user = _userCPU.getAndSet(0);
-    long sys = _systemCPU.getAndSet(0);
-    long gc = _gcMs.getAndSet(0);
-    double memMB = readMemory() / (1024.0 * 1024.0);
-    System.out.printf("uptime: %,4d acked: %,9d acked/sec: %,10.2f failed: %,8d " +
-                      "99%%: %,15d 99.9%%: %,15d min: %,15d max: %,15d mean: %,15.2f " +
-                      "stddev: %,15.2f user: %,10d sys: %,10d gc: %,10d mem: %,10.2f\n",
-                       uptime, ackedThisTime, (((double)ackedThisTime)/thisTime), failed, nnpct, nnnpct,
-                       min, max, mean, stddev, user, sys, gc, memMB);
-    _prev_uptime = uptime;
-    _prev_acked = acked;
-  }
-
-  public static void kill(C client, String name) throws Exception {
-    KillOptions opts = new KillOptions();
-    opts.set_wait_secs(0);
-    client.killTopologyWithOpts(name, opts);
-  }
-
-  public static void main(String[] args) throws Exception {
-    long ratePerSecond = 500;
-    if (args != null && args.length > 0) {
-        ratePerSecond = Long.valueOf(args[0]);
+        if (id == null) {
+            throw new Exception("Could not find a topology named "+name);
+        }
+        TopologyInfo info = client.getTopologyInfo(id);
+        int uptime = info.get_uptime_secs();
+        long acked = 0;
+        long failed = 0;
+        for (ExecutorSummary exec: info.get_executors()) {
+            if ("spout".equals(exec.get_component_id()) && exec.get_stats() != null && exec.get_stats().get_specific() != null) {
+                SpoutStats stats = exec.get_stats().get_specific().get_spout();
+                Map<String, Long> failedMap = stats.get_failed().get(":all-time");
+                Map<String, Long> ackedMap = stats.get_acked().get(":all-time");
+                if (ackedMap != null) {
+                    for (String key: ackedMap.keySet()) {
+                        if (failedMap != null) {
+                            Long tmp = failedMap.get(key);
+                            if (tmp != null) {
+                                failed += tmp;
+                            }
+                        }
+                        long ackVal = ackedMap.get(key);
+                        acked += ackVal;
+                    }
+                }
+            }
+        }
+        long ackedThisTime = acked - _prev_acked;
+        long thisTime = uptime - _prev_uptime;
+        long nnpct, nnnpct, min, max;
+        double mean, stddev;
+        synchronized(_histo) {
+            nnpct = _histo.getValueAtPercentile(99.0);
+            nnnpct = _histo.getValueAtPercentile(99.9);
+            min = _histo.getMinValue();
+            max = _histo.getMaxValue();
+            mean = _histo.getMean();
+            stddev = _histo.getStdDeviation();
+            _histo.reset();
+        }
+        long user = _userCPU.getAndSet(0);
+        long sys = _systemCPU.getAndSet(0);
+        long gc = _gcMs.getAndSet(0);
+        double memMB = readMemory() / (1024.0 * 1024.0);
+        System.out.printf("uptime: %,4d acked: %,9d acked/sec: %,10.2f failed: %,8d " +
+                "99%%: %,15d 99.9%%: %,15d min: %,15d max: %,15d mean: %,15.2f " +
+                "stddev: %,15.2f user: %,10d sys: %,10d gc: %,10d mem: %,10.2f\n",
+                uptime, ackedThisTime, (((double)ackedThisTime)/thisTime), failed, nnpct, nnnpct,
+                min, max, mean, stddev, user, sys, gc, memMB);
+        _prev_uptime = uptime;
+        _prev_acked = acked;
     }
 
-    int parallelism = 4;
-    if (args != null && args.length > 1) {
-        parallelism = Integer.valueOf(args[1]);
+    public static void kill(Nimbus.Iface client, String name) throws Exception {
+        KillOptions opts = new KillOptions();
+        opts.set_wait_secs(0);
+        client.killTopologyWithOpts(name, opts);
     }
 
-    int numMins = 5;
-    if (args != null && args.length > 2) {
-        numMins = Integer.valueOf(args[2]);
-    }
+    public static void main(String[] args) throws Exception {
+        long ratePerSecond = 500;
+        if (args != null && args.length > 0) {
+            ratePerSecond = Long.valueOf(args[0]);
+        }
 
-    String name = "wc-test";
-    if (args != null && args.length > 3) {
-        name = args[3];
-    }
+        int parallelism = 4;
+        if (args != null && args.length > 1) {
+            parallelism = Integer.valueOf(args[1]);
+        }
 
-    Config conf = new Config();
-    HttpForwardingMetricsServer metricServer = new HttpForwardingMetricsServer(conf) {
-        @Override
-        public void handle(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
-            String worker = taskInfo.srcWorkerHost + ":" + taskInfo.srcWorkerPort;
-            for (DataPoint dp: dataPoints) {
-                if ("comp-lat-histo".equals(dp.name) && dp.value instanceof Histogram) {
-                    synchronized(_histo) {
-                        _histo.add((Histogram)dp.value);
+        int numMins = 5;
+        if (args != null && args.length > 2) {
+            numMins = Integer.valueOf(args[2]);
+        }
+
+        String name = "wc-test";
+        if (args != null && args.length > 3) {
+            name = args[3];
+        }
+
+        Config conf = new Config();
+        HttpForwardingMetricsServer metricServer = new HttpForwardingMetricsServer(conf) {
+            @Override
+            public void handle(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
+                String worker = taskInfo.srcWorkerHost + ":" + taskInfo.srcWorkerPort;
+                for (DataPoint dp: dataPoints) {
+                    if ("comp-lat-histo".equals(dp.name) && dp.value instanceof Histogram) {
+                        synchronized(_histo) {
+                            _histo.add((Histogram)dp.value);
+                        }
+                    } else if ("CPU".equals(dp.name) && dp.value instanceof Map) {
+                        Map<Object, Object> m = (Map<Object, Object>)dp.value;
+                        Object sys = m.get("sys-ms");
+                        if (sys instanceof Number) {
+                            _systemCPU.getAndAdd(((Number)sys).longValue());
+                        }
+                        Object user = m.get("user-ms");
+                        if (user instanceof Number) {
+                            _userCPU.getAndAdd(((Number)user).longValue());
+                        }
+                    } else if (dp.name.startsWith("GC/") && dp.value instanceof Map) {
+                        Map<Object, Object> m = (Map<Object, Object>)dp.value;
+                        Object count = m.get("count");
+                        if (count instanceof Number) {
+                            _gcCount.getAndAdd(((Number)count).longValue());
+                        }
+                        Object time = m.get("timeMs");
+                        if (time instanceof Number) {
+                            _gcMs.getAndAdd(((Number)time).longValue());
+                        }
+                    } else if (dp.name.startsWith("memory/") && dp.value instanceof Map) {
+                        Map<Object, Object> m = (Map<Object, Object>)dp.value;
+                        Object val = m.get("usedBytes");
+                        if (val instanceof Number) {
+                            MemMeasure mm = _memoryBytes.get(worker);
+                            if (mm == null) {
+                                mm = new MemMeasure();
+                                MemMeasure tmp = _memoryBytes.putIfAbsent(worker, mm);
+                                mm = tmp == null ? mm : tmp; 
+                            }
+                            mm.update(((Number)val).longValue());
+                        }
                     }
-                } else if ("CPU".equals(dp.name) && dp.value instanceof Map) {
-                   Map<Object, Object> m = (Map<Object, Object>)dp.value;
-                   Object sys = m.get("sys-ms");
-                   if (sys instanceof Number) {
-                       _systemCPU.getAndAdd(((Number)sys).longValue());
-                   }
-                   Object user = m.get("user-ms");
-                   if (user instanceof Number) {
-                       _userCPU.getAndAdd(((Number)user).longValue());
-                   }
-                } else if (dp.name.startsWith("GC/") && dp.value instanceof Map) {
-                   Map<Object, Object> m = (Map<Object, Object>)dp.value;
-                   Object count = m.get("count");
-                   if (count instanceof Number) {
-                       _gcCount.getAndAdd(((Number)count).longValue());
-                   }
-                   Object time = m.get("timeMs");
-                   if (time instanceof Number) {
-                       _gcMs.getAndAdd(((Number)time).longValue());
-                   }
-                } else if (dp.name.startsWith("memory/") && dp.value instanceof Map) {
-                   Map<Object, Object> m = (Map<Object, Object>)dp.value;
-                   Object val = m.get("usedBytes");
-                   if (val instanceof Number) {
-                       MemMeasure mm = _memoryBytes.get(worker);
-                       if (mm == null) {
-                         mm = new MemMeasure();
-                         MemMeasure tmp = _memoryBytes.putIfAbsent(worker, mm);
-                         mm = tmp == null ? mm : tmp; 
-                       }
-                       mm.update(((Number)val).longValue());
-                   }
                 }
             }
+        };
+
+        metricServer.serve();
+        String url = metricServer.getUrl();
+
+        NimbusClient client = NimbusClient.getConfiguredClient(conf);
+        conf.setNumWorkers(parallelism);
+        conf.registerMetricsConsumer(org.apache.storm.metric.LoggingMetricsConsumer.class);
+        conf.registerMetricsConsumer(org.apache.storm.misc.metric.HttpForwardingMetricsConsumer.class, url, 1);
+        Map<String, String> workerMetrics = new HashMap<String, String>();
+        if (!NimbusClient.isLocalOverride()) {
+            //sigar uses JNI and does not work in local mode
+            workerMetrics.put("CPU", "org.apache.storm.metrics.sigar.CPUMetric");
         }
-    };
-
-    metricServer.serve();
-    String url = metricServer.getUrl();
-
-    C cluster = new C(conf);
-    conf.setNumWorkers(parallelism);
-    conf.registerMetricsConsumer(org.apache.storm.metric.LoggingMetricsConsumer.class);
-    conf.registerMetricsConsumer(org.apache.storm.misc.metric.HttpForwardingMetricsConsumer.class, url, 1);
-    Map<String, String> workerMetrics = new HashMap<String, String>();
-    if (!cluster.isLocal()) {
-      //sigar uses JNI and does not work in local mode
-      workerMetrics.put("CPU", "org.apache.storm.metrics.sigar.CPUMetric");
-    }
-    conf.put(Config.TOPOLOGY_WORKER_METRICS, workerMetrics);
-    conf.put(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS, 10);
-    conf.put(Config.TOPOLOGY_WORKER_GC_CHILDOPTS,
-      "-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:NewSize=128m -XX:CMSInitiatingOccupancyFraction=70 -XX:-CMSConcurrentMTEnabled");
-    conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx2g");
+        conf.put(Config.TOPOLOGY_WORKER_METRICS, workerMetrics);
+        conf.put(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS, 10);
+        conf.put(Config.TOPOLOGY_WORKER_GC_CHILDOPTS,
+                "-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:NewSize=128m -XX:CMSInitiatingOccupancyFraction=70 -XX:-CMSConcurrentMTEnabled");
+        conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx2g");
 
-    TopologyBuilder builder = new TopologyBuilder();
+        TopologyBuilder builder = new TopologyBuilder();
 
-    int numEach = 4 * parallelism;
-    builder.setSpout("spout", new FastRandomSentenceSpout(ratePerSecond/numEach), numEach);
+        int numEach = 4 * parallelism;
+        builder.setSpout("spout", new FastRandomSentenceSpout(ratePerSecond/numEach), numEach);
 
-    builder.setBolt("split", new SplitSentence(), numEach).shuffleGrouping("spout");
-    builder.setBolt("count", new WordCount(), numEach).fieldsGrouping("split", new Fields("word"));
+        builder.setBolt("split", new SplitSentence(), numEach).shuffleGrouping("spout");
+        builder.setBolt("count", new WordCount(), numEach).fieldsGrouping("split", new Fields("word"));
 
-    try {
-        cluster.submitTopology(name, conf, builder.createTopology());
+        try {
+            StormSubmitter.submitTopology(name, conf, builder.createTopology());
 
-        for (int i = 0; i < numMins * 2; i++) {
-            Thread.sleep(30 * 1000);
-            printMetrics(cluster, name);
+            for (int i = 0; i < numMins * 2; i++) {
+                Thread.sleep(30 * 1000);
+                printMetrics(client.getClient(), name);
+            }
+        } finally {
+            kill(client.getClient(), name);
         }
-    } finally {
-        kill(cluster, name);
     }
-    System.exit(0);
-  }
 }
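
The rewritten ThroughputVsLatency drops the old C wrapper class entirely: printMetrics and kill now take a Nimbus.Iface, which the local-override client and a real cluster client both implement. The latency math rests on org.HdrHistogram.Histogram; here is a minimal standalone sketch of the calls used above (recordValue, getValueAtPercentile, getMean, reset), with a hypothetical main for illustration:

    import org.HdrHistogram.Histogram;

    public class HistogramSketch {
        public static void main(String[] args) {
            // Same bounds as the topology: values up to one hour in
            // nanoseconds, kept to 3 significant decimal digits.
            Histogram histo = new Histogram(3600000000000L, 3);
            for (long ms = 1; ms <= 1000; ms++) {
                histo.recordValue(ms * 1000000L); // record 1ms..1000ms as nanos
            }
            System.out.printf("99%%: %,d ns  mean: %,.2f ns%n",
                    histo.getValueAtPercentile(99.0), histo.getMean());
            histo.reset(); // the topology resets after each 30s report window
        }
    }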

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalGlobalCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalGlobalCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalGlobalCount.java
index 5a310f6..c1e9555 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalGlobalCount.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalGlobalCount.java
@@ -17,9 +17,14 @@
  */
 package org.apache.storm.starter;
 
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
+import org.apache.storm.StormSubmitter;
 import org.apache.storm.coordination.BatchOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.testing.MemoryTransactionalSpout;
@@ -32,12 +37,7 @@ import org.apache.storm.transactional.TransactionalTopologyBuilder;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
-
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import org.apache.storm.utils.NimbusClient;
 
 /**
  * This is a basic example of a transactional topology. It keeps a count of the number of tuples seen so far in a
@@ -46,128 +46,129 @@ import java.util.Map;
  * @see <a href="http://storm.apache.org/documentation/Transactional-topologies.html">Transactional topologies</a>
  */
 public class TransactionalGlobalCount {
-  public static final int PARTITION_TAKE_PER_BATCH = 3;
-  public static final Map<Integer, List<List<Object>>> DATA = new HashMap<Integer, List<List<Object>>>() {{
-    put(0, new ArrayList<List<Object>>() {{
-      add(new Values("cat"));
-      add(new Values("dog"));
-      add(new Values("chicken"));
-      add(new Values("cat"));
-      add(new Values("dog"));
-      add(new Values("apple"));
-    }});
-    put(1, new ArrayList<List<Object>>() {{
-      add(new Values("cat"));
-      add(new Values("dog"));
-      add(new Values("apple"));
-      add(new Values("banana"));
-    }});
-    put(2, new ArrayList<List<Object>>() {{
-      add(new Values("cat"));
-      add(new Values("cat"));
-      add(new Values("cat"));
-      add(new Values("cat"));
-      add(new Values("cat"));
-      add(new Values("dog"));
-      add(new Values("dog"));
-      add(new Values("dog"));
-      add(new Values("dog"));
-    }});
-  }};
-
-  public static class Value {
-    int count = 0;
-    BigInteger txid;
-  }
-
-  public static Map<String, Value> DATABASE = new HashMap<String, Value>();
-  public static final String GLOBAL_COUNT_KEY = "GLOBAL-COUNT";
-
-  public static class BatchCount extends BaseBatchBolt {
-    Object _id;
-    BatchOutputCollector _collector;
-
-    int _count = 0;
-
-    @Override
-    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
-      _collector = collector;
-      _id = id;
+    public static final int PARTITION_TAKE_PER_BATCH = 3;
+    public static final Map<Integer, List<List<Object>>> DATA = new HashMap<Integer, List<List<Object>>>() {{
+        put(0, new ArrayList<List<Object>>() {{
+            add(new Values("cat"));
+            add(new Values("dog"));
+            add(new Values("chicken"));
+            add(new Values("cat"));
+            add(new Values("dog"));
+            add(new Values("apple"));
+        }});
+        put(1, new ArrayList<List<Object>>() {{
+            add(new Values("cat"));
+            add(new Values("dog"));
+            add(new Values("apple"));
+            add(new Values("banana"));
+        }});
+        put(2, new ArrayList<List<Object>>() {{
+            add(new Values("cat"));
+            add(new Values("cat"));
+            add(new Values("cat"));
+            add(new Values("cat"));
+            add(new Values("cat"));
+            add(new Values("dog"));
+            add(new Values("dog"));
+            add(new Values("dog"));
+            add(new Values("dog"));
+        }});
+    }};
+
+    public static class Value {
+        int count = 0;
+        BigInteger txid;
     }
 
-    @Override
-    public void execute(Tuple tuple) {
-      _count++;
-    }
+    public static Map<String, Value> DATABASE = new HashMap<String, Value>();
+    public static final String GLOBAL_COUNT_KEY = "GLOBAL-COUNT";
 
-    @Override
-    public void finishBatch() {
-      _collector.emit(new Values(_id, _count));
-    }
+    public static class BatchCount extends BaseBatchBolt {
+        Object _id;
+        BatchOutputCollector _collector;
 
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("id", "count"));
-    }
-  }
+        int _count = 0;
 
-  public static class UpdateGlobalCount extends BaseTransactionalBolt implements ICommitter {
-    TransactionAttempt _attempt;
-    BatchOutputCollector _collector;
+        @Override
+        public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
+            _collector = collector;
+            _id = id;
+        }
 
-    int _sum = 0;
+        @Override
+        public void execute(Tuple tuple) {
+            _count++;
+        }
 
-    @Override
-    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {
-      _collector = collector;
-      _attempt = attempt;
-    }
+        @Override
+        public void finishBatch() {
+            _collector.emit(new Values(_id, _count));
+        }
 
-    @Override
-    public void execute(Tuple tuple) {
-      _sum += tuple.getInteger(1);
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("id", "count"));
+        }
     }
 
-    @Override
-    public void finishBatch() {
-      Value val = DATABASE.get(GLOBAL_COUNT_KEY);
-      Value newval;
-      if (val == null || !val.txid.equals(_attempt.getTransactionId())) {
-        newval = new Value();
-        newval.txid = _attempt.getTransactionId();
-        if (val == null) {
-          newval.count = _sum;
+    public static class UpdateGlobalCount extends BaseTransactionalBolt implements ICommitter {
+        TransactionAttempt _attempt;
+        BatchOutputCollector _collector;
+
+        int _sum = 0;
+
+        @Override
+        public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {
+            _collector = collector;
+            _attempt = attempt;
         }
-        else {
-          newval.count = _sum + val.count;
+
+        @Override
+        public void execute(Tuple tuple) {
+            _sum += tuple.getInteger(1);
         }
-        DATABASE.put(GLOBAL_COUNT_KEY, newval);
-      }
-      else {
-        newval = val;
-      }
-      _collector.emit(new Values(_attempt, newval.count));
-    }
 
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("id", "sum"));
+        @Override
+        public void finishBatch() {
+            Value val = DATABASE.get(GLOBAL_COUNT_KEY);
+            Value newval;
+            if (val == null || !val.txid.equals(_attempt.getTransactionId())) {
+                newval = new Value();
+                newval.txid = _attempt.getTransactionId();
+                if (val == null) {
+                    newval.count = _sum;
+                }
+                else {
+                    newval.count = _sum + val.count;
+                }
+                DATABASE.put(GLOBAL_COUNT_KEY, newval);
+            }
+            else {
+                newval = val;
+            }
+            _collector.emit(new Values(_attempt, newval.count));
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("id", "sum"));
+        }
     }
-  }
-
-  public static void main(String[] args) throws Exception {
-    MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
-    TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("global-count", "spout", spout, 3);
-    builder.setBolt("partial-count", new BatchCount(), 5).noneGrouping("spout");
-    builder.setBolt("sum", new UpdateGlobalCount()).globalGrouping("partial-count");
-
-    Config config = new Config();
-    config.setDebug(true);
-    config.setMaxSpoutPending(3);
- 
-    try (LocalCluster cluster = new LocalCluster();
-         LocalTopology topo = cluster.submitTopology("global-count-topology", config, builder.buildTopology());) {
-        Thread.sleep(3000);
+
+    public static void main(String[] args) throws Exception {
+        if (!NimbusClient.isLocalOverride()) {
+            throw new IllegalStateException("This example only works in local mode.  "
+                    + "Run with storm local not storm jar");
+        }
+        MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
+        TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("global-count", "spout", spout, 3);
+        builder.setBolt("partial-count", new BatchCount(), 5).noneGrouping("spout");
+        builder.setBolt("sum", new UpdateGlobalCount()).globalGrouping("partial-count");
+
+        Config config = new Config();
+        config.setDebug(true);
+        config.setMaxSpoutPending(3);
+
+        StormSubmitter.submitTopology("global-count-topology", config, builder.buildTopology());
     }
-  }
 }
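
The key property UpdateGlobalCount preserves through this rewrite is idempotence on replay: the stored Value remembers the txid that produced it, so re-executing a batch with the same transaction id does not double-count. A minimal sketch of that pattern outside Storm (class and method names hypothetical):

    import java.math.BigInteger;
    import java.util.HashMap;
    import java.util.Map;

    public class IdempotentCounterSketch {
        static class Value { int count; BigInteger txid; }
        static final Map<String, Value> DB = new HashMap<String, Value>();

        static void update(String key, int delta, BigInteger txid) {
            Value val = DB.get(key);
            if (val == null || !val.txid.equals(txid)) {
                Value newval = new Value();
                newval.txid = txid;
                newval.count = (val == null ? 0 : val.count) + delta;
                DB.put(key, newval); // first time this txid touches the key
            }
            // else: replay of the same transaction, keep the stored value
        }

        public static void main(String[] args) {
            BigInteger tx = BigInteger.ONE;
            update("GLOBAL-COUNT", 5, tx);
            update("GLOBAL-COUNT", 5, tx); // replayed batch: no double count
            System.out.println(DB.get("GLOBAL-COUNT").count); // prints 5
        }
    }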

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalWords.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalWords.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalWords.java
index 4965565..e922bab 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalWords.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalWords.java
@@ -17,9 +17,14 @@
  */
 package org.apache.storm.starter;
 
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
+import org.apache.storm.StormSubmitter;
 import org.apache.storm.coordination.BatchOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.testing.MemoryTransactionalSpout;
@@ -33,12 +38,7 @@ import org.apache.storm.transactional.TransactionalTopologyBuilder;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
-
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import org.apache.storm.utils.NimbusClient;
 
 /**
  * This class defines a more involved transactional topology than TransactionalGlobalCount. This topology processes a
@@ -51,193 +51,194 @@ import java.util.Map;
  * between buckets as their counts accumulate.
  */
 public class TransactionalWords {
-  public static class CountValue {
-    Integer prev_count = null;
-    int count = 0;
-    BigInteger txid = null;
-  }
-
-  public static class BucketValue {
-    int count = 0;
-    BigInteger txid;
-  }
-
-  public static final int BUCKET_SIZE = 10;
-
-  public static Map<String, CountValue> COUNT_DATABASE = new HashMap<String, CountValue>();
-  public static Map<Integer, BucketValue> BUCKET_DATABASE = new HashMap<Integer, BucketValue>();
-
-
-  public static final int PARTITION_TAKE_PER_BATCH = 3;
-
-  public static final Map<Integer, List<List<Object>>> DATA = new HashMap<Integer, List<List<Object>>>() {{
-    put(0, new ArrayList<List<Object>>() {{
-      add(new Values("cat"));
-      add(new Values("dog"));
-      add(new Values("chicken"));
-      add(new Values("cat"));
-      add(new Values("dog"));
-      add(new Values("apple"));
-    }});
-    put(1, new ArrayList<List<Object>>() {{
-      add(new Values("cat"));
-      add(new Values("dog"));
-      add(new Values("apple"));
-      add(new Values("banana"));
-    }});
-    put(2, new ArrayList<List<Object>>() {{
-      add(new Values("cat"));
-      add(new Values("cat"));
-      add(new Values("cat"));
-      add(new Values("cat"));
-      add(new Values("cat"));
-      add(new Values("dog"));
-      add(new Values("dog"));
-      add(new Values("dog"));
-      add(new Values("dog"));
-    }});
-  }};
-
-  public static class KeyedCountUpdater extends BaseTransactionalBolt implements ICommitter {
-    Map<String, Integer> _counts = new HashMap<String, Integer>();
-    BatchOutputCollector _collector;
-    TransactionAttempt _id;
-
-    int _count = 0;
-
-    @Override
-    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt id) {
-      _collector = collector;
-      _id = id;
+    public static class CountValue {
+        Integer prev_count = null;
+        int count = 0;
+        BigInteger txid = null;
     }
 
-    @Override
-    public void execute(Tuple tuple) {
-      String key = tuple.getString(1);
-      Integer curr = _counts.get(key);
-      if (curr == null)
-        curr = 0;
-      _counts.put(key, curr + 1);
+    public static class BucketValue {
+        int count = 0;
+        BigInteger txid;
     }
 
-    @Override
-    public void finishBatch() {
-      for (String key : _counts.keySet()) {
-        CountValue val = COUNT_DATABASE.get(key);
-        CountValue newVal;
-        if (val == null || !val.txid.equals(_id)) {
-          newVal = new CountValue();
-          newVal.txid = _id.getTransactionId();
-          if (val != null) {
-            newVal.prev_count = val.count;
-            newVal.count = val.count;
-          }
-          newVal.count = newVal.count + _counts.get(key);
-          COUNT_DATABASE.put(key, newVal);
+    public static final int BUCKET_SIZE = 10;
+
+    public static Map<String, CountValue> COUNT_DATABASE = new HashMap<String, CountValue>();
+    public static Map<Integer, BucketValue> BUCKET_DATABASE = new HashMap<Integer, BucketValue>();
+
+
+    public static final int PARTITION_TAKE_PER_BATCH = 3;
+
+    public static final Map<Integer, List<List<Object>>> DATA = new HashMap<Integer, List<List<Object>>>() {{
+        put(0, new ArrayList<List<Object>>() {{
+            add(new Values("cat"));
+            add(new Values("dog"));
+            add(new Values("chicken"));
+            add(new Values("cat"));
+            add(new Values("dog"));
+            add(new Values("apple"));
+        }});
+        put(1, new ArrayList<List<Object>>() {{
+            add(new Values("cat"));
+            add(new Values("dog"));
+            add(new Values("apple"));
+            add(new Values("banana"));
+        }});
+        put(2, new ArrayList<List<Object>>() {{
+            add(new Values("cat"));
+            add(new Values("cat"));
+            add(new Values("cat"));
+            add(new Values("cat"));
+            add(new Values("cat"));
+            add(new Values("dog"));
+            add(new Values("dog"));
+            add(new Values("dog"));
+            add(new Values("dog"));
+        }});
+    }};
+
+    public static class KeyedCountUpdater extends BaseTransactionalBolt implements ICommitter {
+        Map<String, Integer> _counts = new HashMap<String, Integer>();
+        BatchOutputCollector _collector;
+        TransactionAttempt _id;
+
+        int _count = 0;
+
+        @Override
+        public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt id) {
+            _collector = collector;
+            _id = id;
         }
-        else {
-          newVal = val;
+
+        @Override
+        public void execute(Tuple tuple) {
+            String key = tuple.getString(1);
+            Integer curr = _counts.get(key);
+            if (curr == null)
+                curr = 0;
+            _counts.put(key, curr + 1);
         }
-        _collector.emit(new Values(_id, key, newVal.count, newVal.prev_count));
-      }
-    }
 
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("id", "key", "count", "prev-count"));
-    }
-  }
-
-  public static class Bucketize extends BaseBasicBolt {
-    @Override
-    public void execute(Tuple tuple, BasicOutputCollector collector) {
-      TransactionAttempt attempt = (TransactionAttempt) tuple.getValue(0);
-      int curr = tuple.getInteger(2);
-      Integer prev = tuple.getInteger(3);
-
-      int currBucket = curr / BUCKET_SIZE;
-      Integer prevBucket = null;
-      if (prev != null) {
-        prevBucket = prev / BUCKET_SIZE;
-      }
-
-      if (prevBucket == null) {
-        collector.emit(new Values(attempt, currBucket, 1));
-      }
-      else if (currBucket != prevBucket) {
-        collector.emit(new Values(attempt, currBucket, 1));
-        collector.emit(new Values(attempt, prevBucket, -1));
-      }
+        @Override
+        public void finishBatch() {
+            for (String key : _counts.keySet()) {
+                CountValue val = COUNT_DATABASE.get(key);
+                CountValue newVal;
+                if (val == null || !val.txid.equals(_id)) {
+                    newVal = new CountValue();
+                    newVal.txid = _id.getTransactionId();
+                    if (val != null) {
+                        newVal.prev_count = val.count;
+                        newVal.count = val.count;
+                    }
+                    newVal.count = newVal.count + _counts.get(key);
+                    COUNT_DATABASE.put(key, newVal);
+                }
+                else {
+                    newVal = val;
+                }
+                _collector.emit(new Values(_id, key, newVal.count, newVal.prev_count));
+            }
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("id", "key", "count", "prev-count"));
+        }
     }
 
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("attempt", "bucket", "delta"));
+    public static class Bucketize extends BaseBasicBolt {
+        @Override
+        public void execute(Tuple tuple, BasicOutputCollector collector) {
+            TransactionAttempt attempt = (TransactionAttempt) tuple.getValue(0);
+            int curr = tuple.getInteger(2);
+            Integer prev = tuple.getInteger(3);
+
+            int currBucket = curr / BUCKET_SIZE;
+            Integer prevBucket = null;
+            if (prev != null) {
+                prevBucket = prev / BUCKET_SIZE;
+            }
+
+            if (prevBucket == null) {
+                collector.emit(new Values(attempt, currBucket, 1));
+            }
+            else if (currBucket != prevBucket) {
+                collector.emit(new Values(attempt, currBucket, 1));
+                collector.emit(new Values(attempt, prevBucket, -1));
+            }
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("attempt", "bucket", "delta"));
+        }
     }
-  }
 
-  public static class BucketCountUpdater extends BaseTransactionalBolt {
-    Map<Integer, Integer> _accum = new HashMap<Integer, Integer>();
-    BatchOutputCollector _collector;
-    TransactionAttempt _attempt;
+    public static class BucketCountUpdater extends BaseTransactionalBolt {
+        Map<Integer, Integer> _accum = new HashMap<Integer, Integer>();
+        BatchOutputCollector _collector;
+        TransactionAttempt _attempt;
 
-    int _count = 0;
+        int _count = 0;
 
-    @Override
-    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {
-      _collector = collector;
-      _attempt = attempt;
-    }
+        @Override
+        public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {
+            _collector = collector;
+            _attempt = attempt;
+        }
 
-    @Override
-    public void execute(Tuple tuple) {
-      Integer bucket = tuple.getInteger(1);
-      Integer delta = tuple.getInteger(2);
-      Integer curr = _accum.get(bucket);
-      if (curr == null)
-        curr = 0;
-      _accum.put(bucket, curr + delta);
-    }
+        @Override
+        public void execute(Tuple tuple) {
+            Integer bucket = tuple.getInteger(1);
+            Integer delta = tuple.getInteger(2);
+            Integer curr = _accum.get(bucket);
+            if (curr == null)
+                curr = 0;
+            _accum.put(bucket, curr + delta);
+        }
 
-    @Override
-    public void finishBatch() {
-      for (Integer bucket : _accum.keySet()) {
-        BucketValue currVal = BUCKET_DATABASE.get(bucket);
-        BucketValue newVal;
-        if (currVal == null || !currVal.txid.equals(_attempt.getTransactionId())) {
-          newVal = new BucketValue();
-          newVal.txid = _attempt.getTransactionId();
-          newVal.count = _accum.get(bucket);
-          if (currVal != null)
-            newVal.count += currVal.count;
-          BUCKET_DATABASE.put(bucket, newVal);
+        @Override
+        public void finishBatch() {
+            for (Integer bucket : _accum.keySet()) {
+                BucketValue currVal = BUCKET_DATABASE.get(bucket);
+                BucketValue newVal;
+                if (currVal == null || !currVal.txid.equals(_attempt.getTransactionId())) {
+                    newVal = new BucketValue();
+                    newVal.txid = _attempt.getTransactionId();
+                    newVal.count = _accum.get(bucket);
+                    if (currVal != null)
+                        newVal.count += currVal.count;
+                    BUCKET_DATABASE.put(bucket, newVal);
+                }
+                else {
+                    newVal = currVal;
+                }
+                _collector.emit(new Values(_attempt, bucket, newVal.count));
+            }
         }
-        else {
-          newVal = currVal;
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("id", "bucket", "count"));
         }
-        _collector.emit(new Values(_attempt, bucket, newVal.count));
-      }
     }
 
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("id", "bucket", "count"));
-    }
-  }
-
-  public static void main(String[] args) throws Exception {
-    MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
-    TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("top-n-words", "spout", spout, 2);
-    builder.setBolt("count", new KeyedCountUpdater(), 5).fieldsGrouping("spout", new Fields("word"));
-    builder.setBolt("bucketize", new Bucketize()).noneGrouping("count");
-    builder.setBolt("buckets", new BucketCountUpdater(), 5).fieldsGrouping("bucketize", new Fields("bucket"));
-    Config config = new Config();
-    config.setDebug(true);
-    config.setMaxSpoutPending(3);
- 
-    try (LocalCluster cluster = new LocalCluster();
-         LocalTopology topo = cluster.submitTopology("top-n-topology", config, builder.buildTopology());) {
-      Thread.sleep(3000);
+    public static void main(String[] args) throws Exception {
+        if (!NimbusClient.isLocalOverride()) {
+            throw new IllegalStateException("This example only works in local mode.  "
+                    + "Run with storm local not storm jar");
+        }
+        MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
+        TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("top-n-words", "spout", spout, 2);
+        builder.setBolt("count", new KeyedCountUpdater(), 5).fieldsGrouping("spout", new Fields("word"));
+        builder.setBolt("bucketize", new Bucketize()).noneGrouping("count");
+        builder.setBolt("buckets", new BucketCountUpdater(), 5).fieldsGrouping("bucketize", new Fields("bucket"));
+        Config config = new Config();
+        config.setDebug(true);
+        config.setMaxSpoutPending(3);
+
+        StormSubmitter.submitTopology("top-n-topology", config, builder.buildTopology());
     }
-  }
 }
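
Bucketize's delta logic is worth spelling out: when a word's running count crosses a BUCKET_SIZE boundary, the bolt emits +1 for the bucket the count moved into and -1 for the bucket it left, so BucketCountUpdater can maintain bucket membership incrementally. A minimal sketch of the arithmetic (hypothetical class name):

    public class BucketDeltaSketch {
        static final int BUCKET_SIZE = 10;

        public static void main(String[] args) {
            int prev = 9, curr = 10;             // count moved from 9 to 10
            int prevBucket = prev / BUCKET_SIZE; // 0
            int currBucket = curr / BUCKET_SIZE; // 1
            if (currBucket != prevBucket) {
                System.out.println("bucket " + currBucket + ": +1");
                System.out.println("bucket " + prevBucket + ": -1");
            }
            // counts that stay inside one bucket emit nothing
        }
    }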

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
index 576fcd9..59e3e4b 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
@@ -78,7 +78,7 @@ public class WordCountTopology extends ConfigurableTopology {
     ConfigurableTopology.start(new WordCountTopology(), args);
   }
 
-  protected int run(String[] args) {
+  protected int run(String[] args) throws Exception {
 
     TopologyBuilder builder = new TopologyBuilder();
 
@@ -91,17 +91,11 @@ public class WordCountTopology extends ConfigurableTopology {
 
     String topologyName = "word-count";
 
-    if (isLocal) {
-      conf.setMaxTaskParallelism(3);
-      ttl = 10;
-    } else {
-      conf.setNumWorkers(3);
-    }
+    conf.setNumWorkers(3);
 
     if (args != null && args.length > 0) {
       topologyName = args[0];
     }
-
     return submit(topologyName, conf, builder);
   }
 }
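
With LocalCluster gone, ConfigurableTopology subclasses no longer branch on isLocal; "storm local" supplies the local behavior at launch time. A minimal sketch of the resulting shape, assuming the inherited conf field and submit() helper that ConfigurableTopology already provides to WordCountTopology above (class name hypothetical):

    import org.apache.storm.topology.ConfigurableTopology;
    import org.apache.storm.topology.TopologyBuilder;

    public class SketchTopology extends ConfigurableTopology {
        public static void main(String[] args) {
            ConfigurableTopology.start(new SketchTopology(), args);
        }

        @Override
        protected int run(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            // ... setSpout/setBolt calls would go here ...
            conf.setNumWorkers(3); // distributed settings only; local mode
                                   // is handled by "storm local" itself
            return submit("sketch", conf, builder);
        }
    }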

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopologyNode.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopologyNode.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopologyNode.java
index 45014fc..39f5932 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopologyNode.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopologyNode.java
@@ -17,41 +17,43 @@
  */
 package org.apache.storm.starter;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.spout.ShellSpout;
 import org.apache.storm.task.ShellBolt;
-import org.apache.storm.topology.*;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.topology.base.BaseBasicBolt;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
 
-import java.util.HashMap;
-import java.util.Map;
-
 /**
  * This topology demonstrates Storm's stream groupings and multilang capabilities.
  */
 public class WordCountTopologyNode {
-  public static class SplitSentence extends ShellBolt implements IRichBolt {
+    public static class SplitSentence extends ShellBolt implements IRichBolt {
 
-    public SplitSentence() {
-      super("node", "splitsentence.js");
-    }
+        public SplitSentence() {
+            super("node", "splitsentence.js");
+        }
 
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("word"));
-    }
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word"));
+        }
 
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-      return null;
+        @Override
+        public Map<String, Object> getComponentConfiguration() {
+            return null;
+        }
     }
-  }
 
     public static class RandomSentence extends ShellSpout implements IRichSpout {
 
@@ -70,51 +72,42 @@ public class WordCountTopologyNode {
         }
     }
 
-  public static class WordCount extends BaseBasicBolt {
-    Map<String, Integer> counts = new HashMap<String, Integer>();
-
-    @Override
-    public void execute(Tuple tuple, BasicOutputCollector collector) {
-      String word = tuple.getString(0);
-      Integer count = counts.get(word);
-      if (count == null)
-        count = 0;
-      count++;
-      counts.put(word, count);
-      collector.emit(new Values(word, count));
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("word", "count"));
-    }
-  }
-
-  public static void main(String[] args) throws Exception {
+    public static class WordCount extends BaseBasicBolt {
+        Map<String, Integer> counts = new HashMap<String, Integer>();
 
-    TopologyBuilder builder = new TopologyBuilder();
-
-    builder.setSpout("spout", new RandomSentence(), 5);
+        @Override
+        public void execute(Tuple tuple, BasicOutputCollector collector) {
+            String word = tuple.getString(0);
+            Integer count = counts.get(word);
+            if (count == null)
+                count = 0;
+            count++;
+            counts.put(word, count);
+            collector.emit(new Values(word, count));
+        }
 
-    builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
-    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word", "count"));
+        }
+    }
 
-    Config conf = new Config();
-    conf.setDebug(true);
+    public static void main(String[] args) throws Exception {
 
+        TopologyBuilder builder = new TopologyBuilder();
 
-    if (args != null && args.length > 0) {
-      conf.setNumWorkers(3);
+        builder.setSpout("spout", new RandomSentence(), 5);
 
-      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
-    }
-    else {
-      conf.setMaxTaskParallelism(3);
+        builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
+        builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
 
-      try (LocalCluster cluster = new LocalCluster();
-           LocalTopology topo = cluster.submitTopology("word-count", conf, builder.createTopology());) {
-          Thread.sleep(10000);
-      }
+        Config conf = new Config();
+        conf.setDebug(true);
+        String topoName = "word-count";
+        if (args != null && args.length > 0) {
+            topoName = args[0];
+        }
+        conf.setNumWorkers(3);
+        StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology());
     }
-  }
 }
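
The default-name-or-args[0] idiom introduced in this main() repeats verbatim in
the streams and trident examples below. A tiny helper capturing it, purely
illustrative and not part of this change:

    // Hypothetical helper: use args[0] as the topology name when given,
    // otherwise fall back to the example's default.
    static String topologyName(String[] args, String defaultName) {
        return (args != null && args.length > 0) ? args[0] : defaultName;
    }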

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/AggregateExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/AggregateExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/AggregateExample.java
index cbc5d45..dae774b 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/AggregateExample.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/AggregateExample.java
@@ -18,7 +18,6 @@
 package org.apache.storm.starter.streams;
 
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.starter.spout.RandomIntegerSpout;
 import org.apache.storm.streams.Pair;
@@ -27,7 +26,6 @@ import org.apache.storm.streams.operations.CombinerAggregator;
 import org.apache.storm.streams.operations.mappers.ValueMapper;
 import org.apache.storm.streams.windowing.TumblingWindows;
 import org.apache.storm.topology.base.BaseWindowedBolt;
-import org.apache.storm.utils.Utils;
 
 /**
  * An example that illustrates the global aggregate
@@ -48,15 +46,12 @@ public class AggregateExample {
                 .print();
 
         Config config = new Config();
+        String topoName = "AGG_EXAMPLE";
         if (args.length > 0) {
-            config.setNumWorkers(1);
-            StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
-        } else {
-            try (LocalCluster cluster = new LocalCluster();
-                 LocalCluster.LocalTopology topo = cluster.submitTopology("test", config, builder.build())) {
-                Utils.sleep(60_000);
-            }
+            topoName = args[0];
         }
+        config.setNumWorkers(1);
+        StormSubmitter.submitTopologyWithProgressBar(topoName, config, builder.build());
     }
 
     private static class Avg implements CombinerAggregator<Integer, Pair<Integer, Integer>, Double> {

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/BranchExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/BranchExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/BranchExample.java
index 027b432..f8d865f 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/BranchExample.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/BranchExample.java
@@ -18,14 +18,12 @@
 package org.apache.storm.starter.streams;
 
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.starter.spout.RandomIntegerSpout;
 import org.apache.storm.streams.Stream;
 import org.apache.storm.streams.StreamBuilder;
 import org.apache.storm.streams.operations.Predicate;
 import org.apache.storm.streams.operations.mappers.ValueMapper;
-import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,15 +55,13 @@ public class BranchExample {
         evenAndOdd[1].forEach(x -> LOG.info("ODD > " + x));
 
         Config config = new Config();
+        String topoName = "branchExample";
         if (args.length > 0) {
-            config.setNumWorkers(1);
-            StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
-        } else {
-            try (LocalCluster cluster = new LocalCluster();
-                 LocalCluster.LocalTopology topo = cluster.submitTopology("test", config, builder.build())) {
-                Utils.sleep(60_000);
-            }
+            topoName = args[0];
         }
+
+        config.setNumWorkers(1);
+        StormSubmitter.submitTopologyWithProgressBar(topoName, config, builder.build());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/GroupByKeyAndWindowExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/GroupByKeyAndWindowExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/GroupByKeyAndWindowExample.java
index dd7e97f..d989433 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/GroupByKeyAndWindowExample.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/GroupByKeyAndWindowExample.java
@@ -17,8 +17,11 @@
  */
 package org.apache.storm.starter.streams;
 
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.streams.PairStream;
@@ -35,10 +38,6 @@ import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
 import org.apache.storm.utils.Utils;
 
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
 /**
  * An example that shows the usage of {@link PairStream#groupByKeyAndWindow(Window)}
  * and {@link PairStream#reduceByKeyAndWindow(Reducer, Window)}
@@ -72,15 +71,12 @@ public class GroupByKeyAndWindowExample {
                 .print();
 
         Config config = new Config();
+        String topoName = GroupByKeyAndWindowExample.class.getName();
         if (args.length > 0) {
-            config.setNumWorkers(1);
-            StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
-        } else {
-            try (LocalCluster cluster = new LocalCluster();
-                 LocalCluster.LocalTopology topo = cluster.submitTopology("test", config, builder.build())) {
-                Utils.sleep(60_000);
-            }
+            topoName = args[0];
         }
+        config.setNumWorkers(1);
+        StormSubmitter.submitTopologyWithProgressBar(topoName, config, builder.build());
     }
 
     private static class StockQuotes extends BaseRichSpout {

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/JoinExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/JoinExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/JoinExample.java
index 4aa6253..ae8be3d 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/JoinExample.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/JoinExample.java
@@ -17,8 +17,9 @@
  */
 package org.apache.storm.starter.streams;
 
+import java.util.Map;
+
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.streams.PairStream;
@@ -29,14 +30,11 @@ import org.apache.storm.streams.windowing.TumblingWindows;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.topology.base.BaseWindowedBolt.Duration;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
 import org.apache.storm.utils.Utils;
 
-import java.util.Map;
-
-import static org.apache.storm.topology.base.BaseWindowedBolt.Duration;
-
 /**
  * An example that demonstrates the usage of {@link PairStream#join(PairStream)} to join
  * multiple streams.
@@ -67,16 +65,12 @@ public class JoinExample {
                 .print();
 
         Config config = new Config();
+        String topoName = JoinExample.class.getName();
         if (args.length > 0) {
-            config.setNumWorkers(1);
-            StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
-        } else {
-            try (LocalCluster cluster = new LocalCluster();
-                 LocalCluster.LocalTopology topo = cluster.submitTopology("test", config, builder.build())) {
-                Utils.sleep(60_000);
-            }
+            topoName = args[0];
         }
-
+        config.setNumWorkers(1);
+        StormSubmitter.submitTopologyWithProgressBar(topoName, config, builder.build());
     }
 
     private static class NumberSpout extends BaseRichSpout {

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StateQueryExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StateQueryExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StateQueryExample.java
index ab6cac3..d7eb16b 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StateQueryExample.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StateQueryExample.java
@@ -17,8 +17,9 @@
  */
 package org.apache.storm.starter.streams;
 
+import java.util.Map;
+
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.streams.Pair;
@@ -34,8 +35,6 @@ import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
 import org.apache.storm.utils.Utils;
 
-import java.util.Map;
-
 /**
  * An example that uses {@link Stream#stateQuery(StreamState)} to query the state
  * <p>
@@ -79,16 +78,12 @@ public class StateQueryExample {
         Config config = new Config();
         // use redis based state store for persistence
         config.put(Config.TOPOLOGY_STATE_PROVIDER, "org.apache.storm.redis.state.RedisKeyValueStateProvider");
-
+        String topoName = "test";
         if (args.length > 0) {
-            config.setNumWorkers(1);
-            StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
-        } else {
-            try (LocalCluster cluster = new LocalCluster();
-                 LocalCluster.LocalTopology topo = cluster.submitTopology("test", config, builder.build())) {
-                Utils.sleep(60_000);
-            }
+            topoName = args[0];
         }
+        config.setNumWorkers(1);
+        StormSubmitter.submitTopologyWithProgressBar(topoName, config, builder.build());
     }
 
     private static class QuerySpout extends BaseRichSpout {

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StatefulWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StatefulWordCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StatefulWordCount.java
index ddd318a..aba19f3 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StatefulWordCount.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StatefulWordCount.java
@@ -18,7 +18,6 @@
 package org.apache.storm.starter.streams;
 
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.streams.Pair;
 import org.apache.storm.streams.PairStream;
@@ -28,7 +27,6 @@ import org.apache.storm.streams.operations.mappers.ValueMapper;
 import org.apache.storm.streams.windowing.TumblingWindows;
 import org.apache.storm.testing.TestWordSpout;
 import org.apache.storm.topology.base.BaseWindowedBolt;
-import org.apache.storm.utils.Utils;
 
 /**
  * A stateful word count that uses {@link PairStream#updateStateByKey(StateUpdater)} to
@@ -74,15 +72,11 @@ public class StatefulWordCount {
         Config config = new Config();
         // use redis based state store for persistence
         config.put(Config.TOPOLOGY_STATE_PROVIDER, "org.apache.storm.redis.state.RedisKeyValueStateProvider");
-
+        String topoName = "test";
         if (args.length > 0) {
-            config.setNumWorkers(1);
-            StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
-        } else {
-            try (LocalCluster cluster = new LocalCluster();
-                 LocalCluster.LocalTopology topo = cluster.submitTopology("test", config, builder.build())) {
-                Utils.sleep(60_000);
-            }
+            topoName = args[0];
         }
+        config.setNumWorkers(1);
+        StormSubmitter.submitTopologyWithProgressBar(topoName, config, builder.build());
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/TypedTupleExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/TypedTupleExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/TypedTupleExample.java
index 11e89bf..a74151b 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/TypedTupleExample.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/TypedTupleExample.java
@@ -18,7 +18,6 @@
 package org.apache.storm.starter.streams;
 
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.starter.spout.RandomIntegerSpout;
 import org.apache.storm.streams.Pair;
@@ -28,15 +27,12 @@ import org.apache.storm.streams.StreamBuilder;
 import org.apache.storm.streams.operations.mappers.TupleValueMappers;
 import org.apache.storm.streams.tuple.Tuple3;
 import org.apache.storm.streams.windowing.TumblingWindows;
-import org.apache.storm.utils.Utils;
-
-import static org.apache.storm.topology.base.BaseWindowedBolt.Count;
+import org.apache.storm.topology.base.BaseWindowedBolt.Count;
 
 /**
  * An example that illustrates the usage of typed tuples (TupleN<..>) and {@link TupleValueMappers}.
  */
 public class TypedTupleExample {
-    @SuppressWarnings("unchecked")
     public static void main(String[] args) throws Exception {
         StreamBuilder builder = new StreamBuilder();
         /**
@@ -49,15 +45,12 @@ public class TypedTupleExample {
 
         pairs.window(TumblingWindows.of(Count.of(10))).groupByKey().print();
 
-        Config config = new Config();
+        String topoName = "test";
         if (args.length > 0) {
-            config.setNumWorkers(1);
-            StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
-        } else {
-            try (LocalCluster cluster = new LocalCluster();
-                 LocalCluster.LocalTopology topo = cluster.submitTopology("test", config, builder.build())) {
-                Utils.sleep(60_000);
-            }
+            topoName = args[0];
         }
+        Config config = new Config();
+        config.setNumWorkers(1);
+        StormSubmitter.submitTopologyWithProgressBar(topoName, config, builder.build());
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WindowedWordCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WindowedWordCount.java
index 0f30b7c..50c8aad 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WindowedWordCount.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WindowedWordCount.java
@@ -17,19 +17,16 @@
  */
 package org.apache.storm.starter.streams;
 
+import java.util.Arrays;
+
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.starter.spout.RandomSentenceSpout;
 import org.apache.storm.streams.Pair;
 import org.apache.storm.streams.StreamBuilder;
 import org.apache.storm.streams.operations.mappers.ValueMapper;
 import org.apache.storm.streams.windowing.TumblingWindows;
-import org.apache.storm.utils.Utils;
-
-import java.util.Arrays;
-
-import static org.apache.storm.topology.base.BaseWindowedBolt.Duration;
+import org.apache.storm.topology.base.BaseWindowedBolt.Duration;
 
 /**
  * A windowed word count example
@@ -66,14 +63,11 @@ public class WindowedWordCount {
                 .print();
 
         Config config = new Config();
+        String topoName = "test";
         if (args.length > 0) {
-            config.setNumWorkers(1);
-            StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
-        } else {
-            try (LocalCluster cluster = new LocalCluster();
-                 LocalCluster.LocalTopology topo = cluster.submitTopology("test", config, builder.build())) {
-                Utils.sleep(60_000);
-            }
+            topoName = args[0];
         }
+        config.setNumWorkers(1);
+        StormSubmitter.submitTopologyWithProgressBar(topoName, config, builder.build());
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WordCountToBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WordCountToBolt.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WordCountToBolt.java
index 1c0aae1..360f0ad 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WordCountToBolt.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WordCountToBolt.java
@@ -18,7 +18,6 @@
 package org.apache.storm.starter.streams;
 
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.redis.bolt.RedisStoreBolt;
 import org.apache.storm.redis.common.config.JedisPoolConfig;
@@ -30,7 +29,6 @@ import org.apache.storm.streams.operations.mappers.ValueMapper;
 import org.apache.storm.testing.TestWordSpout;
 import org.apache.storm.topology.IRichBolt;
 import org.apache.storm.tuple.ITuple;
-import org.apache.storm.utils.Utils;
 
 /**
  * An example that computes word counts and finally emits the results to an
@@ -67,15 +65,12 @@ public class WordCountToBolt {
                 .to(redisStoreBolt);
 
         Config config = new Config();
+        String topoName = "test";
         if (args.length > 0) {
-            config.setNumWorkers(1);
-            StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
-        } else {
-            try (LocalCluster cluster = new LocalCluster();
-                 LocalCluster.LocalTopology topo = cluster.submitTopology("test", config, builder.build())) {
-                Utils.sleep(60_000);
-            }
+            topoName = args[0];
         }
+        config.setNumWorkers(1);
+        StormSubmitter.submitTopologyWithProgressBar(topoName, config, builder.build());
     }
 
     // Maps a storm tuple to redis key and value

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
index 70a23b8..fa9274d 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
@@ -18,9 +18,9 @@
  */
 package org.apache.storm.starter.trident;
 
+import java.util.HashMap;
+
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.hbase.trident.windowing.HBaseWindowsStoreFactory;
@@ -35,12 +35,9 @@ import org.apache.storm.trident.windowing.WindowsStoreFactory;
 import org.apache.storm.trident.windowing.config.TumblingCountWindow;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-
 /**
  * Sample application of trident windowing which uses {@link HBaseWindowsStoreFactory}'s store for storing tuples in window.
  *
@@ -76,17 +73,12 @@ public class TridentHBaseWindowingStoreTopology {
 
         // window-state table should already be created with cf:tuples column
         HBaseWindowsStoreFactory windowStoreFactory = new HBaseWindowsStoreFactory(new HashMap<String, Object>(), "window-state", "cf".getBytes("UTF-8"), "tuples".getBytes("UTF-8"));
-
-        if (args.length == 0) {
-            try (LocalCluster cluster = new LocalCluster();
-                 LocalTopology topo = cluster.submitTopology("wordCounterWithWindowing", conf, buildTopology(windowStoreFactory));) {
-                Utils.sleep(120 * 1000);
-            }
-            System.exit(0);
-        } else {
-            conf.setNumWorkers(3);
-            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(windowStoreFactory));
+        String topoName = "wordCounterWithWindowing";
+        if (args.length > 0) {
+            topoName = args[0];
         }
+        conf.setNumWorkers(3);
+        StormSubmitter.submitTopologyWithProgressBar(topoName, conf, buildTopology(windowStoreFactory));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
index 5ddace8..c5f73ff 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
@@ -17,10 +17,10 @@
  */
 package org.apache.storm.starter.trident;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
-import org.apache.storm.LocalDRPC;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.trident.TridentState;
@@ -39,9 +39,7 @@ import org.apache.storm.trident.testing.MemoryMapState;
 import org.apache.storm.trident.tuple.TridentTuple;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
-
-import java.util.ArrayList;
-import java.util.List;
+import org.apache.storm.utils.DRPCClient;
 
 /**
  * A simple example that demonstrates the usage of {@link org.apache.storm.trident.Stream#map(MapFunction)} and
@@ -74,7 +72,7 @@ public class TridentMapExample {
         }
     };
 
-    public static StormTopology buildTopology(LocalDRPC drpc) {
+    public static StormTopology buildTopology() {
         FixedBatchSpout spout = new FixedBatchSpout(
                 new Fields("word"), 3, new Values("the cow jumped over the moon"),
                 new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
@@ -96,7 +94,7 @@ public class TridentMapExample {
                 .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
                 .parallelismHint(16);
 
-        topology.newDRPCStream("words", drpc)
+        topology.newDRPCStream("words")
                 .flatMap(split, new Fields("word"))
                 .groupBy(new Fields("word"))
                 .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
@@ -108,18 +106,17 @@ public class TridentMapExample {
     public static void main(String[] args) throws Exception {
         Config conf = new Config();
         conf.setMaxSpoutPending(20);
-        if (args.length == 0) {
-            try (LocalDRPC drpc = new LocalDRPC();
-                 LocalCluster cluster = new LocalCluster();
-                 LocalTopology topo = cluster.submitTopology("wordCounter", conf, buildTopology(drpc));) {
-                for (int i = 0; i < 100; i++) {
-                    System.out.println("DRPC RESULT: " + drpc.execute("words", "CAT THE DOG JUMPED"));
-                    Thread.sleep(1000);
-                }
+        String topoName = "wordCounter";
+        if (args.length > 0) {
+            topoName = args[0];
+        }
+        conf.setNumWorkers(3);
+        StormSubmitter.submitTopologyWithProgressBar(topoName, conf, buildTopology());
+        try (DRPCClient drpc = DRPCClient.getConfiguredClient(conf)) {
+            for (int i = 0; i < 10; i++) {
+                System.out.println("DRPC RESULT: " + drpc.execute("words", "CAT THE DOG JUMPED"));
+                Thread.sleep(1000);
             }
-        } else {
-            conf.setNumWorkers(3);
-            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(null));
         }
     }
 }
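
TridentMapExample is the one spot where dropping LocalDRPC changes more than
packaging: the query loop now goes through a DRPC client resolved from the storm
config instead of an in-process stub. A minimal client-side sketch mirroring the
loop above; the function name must match the one passed to newDRPCStream, and the
class name here is illustrative:

    import org.apache.storm.Config;
    import org.apache.storm.utils.DRPCClient;

    public class DrpcQuery {
        public static void main(String[] args) throws Exception {
            // getConfiguredClient resolves the DRPC servers from the config,
            // so the same code is expected to work under storm local too.
            try (DRPCClient drpc = DRPCClient.getConfiguredClient(new Config())) {
                System.out.println("DRPC RESULT: " + drpc.execute("words", "CAT THE DOG JUMPED"));
            }
        }
    }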

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java
index e5a775b..1cc33a9 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java
@@ -17,9 +17,12 @@
  */
 package org.apache.storm.starter.trident;
 
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.starter.spout.RandomNumberGeneratorSpout;
@@ -30,12 +33,6 @@ import org.apache.storm.trident.testing.FixedBatchSpout;
 import org.apache.storm.trident.tuple.TridentTuple;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
-
-import java.io.Serializable;
-import java.util.Comparator;
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
 
 /**
  * This class demonstrates different usages of
@@ -113,16 +110,8 @@ public class TridentMinMaxOfDevicesTopology {
         StormTopology topology = buildDevicesTopology();
         Config conf = new Config();
         conf.setMaxSpoutPending(20);
-        if (args.length == 0) {
-            try (LocalCluster cluster = new LocalCluster();
-                 LocalTopology topo = cluster.submitTopology("devices-topology", conf, topology);) {
-                Utils.sleep(60 * 1000);
-            }
-            System.exit(0);
-        } else {
-            conf.setNumWorkers(3);
-            StormSubmitter.submitTopologyWithProgressBar("devices-topology", conf, topology);
-        }
+        conf.setNumWorkers(3);
+        StormSubmitter.submitTopologyWithProgressBar("devices-topology", conf, topology);
     }
 
     static class SpeedComparator implements Comparator<TridentTuple>, Serializable {