Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/12 08:29:14 UTC

[06/10] storm git commit: STORM-2447: add in storm local to avoid having server on worker classpath

STORM-2447: add in storm local to avoid having server on worker classpath


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b254ede4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b254ede4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b254ede4

Branch: refs/heads/master
Commit: b254ede46a25466749cd48ebd4bcb56dd791ec4a
Parents: 4eb6507
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Apr 6 13:58:41 2017 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Apr 6 13:58:41 2017 -0500

----------------------------------------------------------------------
 bin/storm.py                                    | 143 ++++-
 docs/Clojure-DSL.md                             |   2 +-
 docs/Distributed-RPC.md                         |  27 +-
 docs/Local-mode.md                              |  65 +-
 docs/Tutorial.md                                |  31 +-
 docs/flux.md                                    |  33 +-
 docs/storm-hbase.md                             |  16 +-
 examples/storm-elasticsearch-examples/pom.xml   |   2 +-
 .../elasticsearch/bolt/EsIndexTopology.java     |  26 +-
 .../trident/TridentEsTopology.java              |  23 +-
 examples/storm-hbase-examples/pom.xml           |   2 +-
 .../storm/hbase/topology/LookupWordCount.java   |  27 +-
 .../hbase/topology/PersistentWordCount.java     |  33 +-
 .../storm/hbase/trident/WordCountTrident.java   |  27 +-
 examples/storm-hdfs-examples/pom.xml            |   2 +-
 .../storm/hdfs/bolt/HdfsFileTopology.java       |  52 +-
 .../storm/hdfs/bolt/SequenceFileTopology.java   |  51 +-
 .../storm/hdfs/spout/HdfsSpoutTopology.java     |   6 +-
 .../storm/hdfs/trident/TridentFileTopology.java |  37 +-
 .../hdfs/trident/TridentSequenceTopology.java   |  38 +-
 examples/storm-hive-examples/pom.xml            |   2 +-
 .../storm/hive/bolt/BucketTestHiveTopology.java |  44 +-
 .../apache/storm/hive/bolt/HiveTopology.java    |  30 +-
 .../hive/bolt/HiveTopologyPartitioned.java      |  29 +-
 .../storm/hive/trident/TridentHiveTopology.java |  70 +-
 examples/storm-jdbc-examples/pom.xml            |   2 +-
 .../jdbc/topology/AbstractUserTopology.java     |  31 +-
 examples/storm-jms-examples/pom.xml             |  37 +-
 .../storm/jms/example/ExampleJmsTopology.java   |  23 +-
 examples/storm-kafka-client-examples/pom.xml    |   4 +-
 .../TridentKafkaClientWordCountNamedTopics.java |  70 +-
 examples/storm-kafka-examples/pom.xml           |   2 +-
 .../storm/kafka/trident/DrpcResultsPrinter.java |  18 +-
 .../storm/kafka/trident/LocalSubmitter.java     | 106 ---
 .../trident/TridentKafkaConsumerTopology.java   |  17 +-
 .../kafka/trident/TridentKafkaTopology.java     |  13 +-
 .../kafka/trident/TridentKafkaWordCount.java    |  48 +-
 examples/storm-mongodb-examples/pom.xml         |   2 +-
 .../storm/mongodb/topology/InsertWordCount.java |  25 +-
 .../storm/mongodb/topology/LookupWordCount.java |  18 +-
 .../storm/mongodb/topology/UpdateWordCount.java |  24 +-
 .../storm/mongodb/trident/WordCountTrident.java |  20 +-
 .../mongodb/trident/WordCountTridentMap.java    |  22 +-
 examples/storm-mqtt-examples/pom.xml            |   2 +-
 examples/storm-opentsdb-examples/pom.xml        |   2 +-
 .../opentsdb/SampleOpenTsdbBoltTopology.java    |  27 +-
 .../opentsdb/SampleOpenTsdbTridentTopology.java |  26 +-
 examples/storm-perf/pom.xml                     |   2 +-
 .../perf/ConstSpoutIdBoltNullBoltTopo.java      |  37 +-
 .../storm/perf/ConstSpoutNullBoltTopo.java      |  42 +-
 .../apache/storm/perf/ConstSpoutOnlyTopo.java   |  32 +-
 .../storm/perf/FileReadWordCountTopo.java       |  42 +-
 .../storm/perf/StrGenSpoutHdfsBoltTopo.java     |  45 +-
 .../storm/perf/utils/BasicMetricsCollector.java |  49 +-
 .../org/apache/storm/perf/utils/Helper.java     | 134 ++--
 .../apache/storm/perf/utils/MetricsSample.java  |  15 +-
 examples/storm-pmml-examples/pom.xml            |   4 +-
 .../storm/pmml/JpmmlRunnerTestTopology.java     |  44 +-
 examples/storm-redis-examples/pom.xml           |   2 +-
 .../storm/redis/topology/LookupWordCount.java   |  35 +-
 .../redis/topology/PersistentWordCount.java     |  32 +-
 .../redis/topology/WhitelistWordCount.java      |  25 +-
 .../redis/trident/WordCountTridentRedis.java    |  31 +-
 .../trident/WordCountTridentRedisCluster.java   |  37 +-
 .../WordCountTridentRedisClusterMap.java        |  38 +-
 .../redis/trident/WordCountTridentRedisMap.java |  31 +-
 examples/storm-solr-examples/pom.xml            |   2 +-
 .../storm/solr/topology/SolrTopology.java       |  23 +-
 examples/storm-starter/pom.xml                  |  27 +-
 .../apache/storm/starter/clj/exclamation.clj    |   9 +-
 .../storm/starter/clj/rolling_top_words.clj     |   9 +-
 .../org/apache/storm/starter/clj/word_count.clj |   9 +-
 .../apache/storm/starter/BasicDRPCTopology.java |  61 +-
 .../storm/starter/ExclamationTopology.java      |   6 +-
 .../storm/starter/FastWordCountTopology.java    |   6 +-
 .../storm/starter/InOrderDeliveryTest.java      |   6 +-
 .../apache/storm/starter/JoinBoltExample.java   |  20 +-
 .../org/apache/storm/starter/ManualDRPC.java    |  25 +-
 .../storm/starter/MultipleLoggerTopology.java   |  20 +-
 .../org/apache/storm/starter/ReachTopology.java | 239 ++++---
 .../starter/ResourceAwareExampleTopology.java   |  20 +-
 .../apache/storm/starter/RollingTopWords.java   |   9 -
 .../apache/storm/starter/SingleJoinExample.java |  60 +-
 .../storm/starter/SkewedRollingTopWords.java    |   9 -
 .../storm/starter/SlidingTupleTsTopology.java   |  28 +-
 .../storm/starter/SlidingWindowTopology.java    |  29 +-
 .../apache/storm/starter/StatefulTopology.java  |  19 +-
 .../starter/StatefulWindowingTopology.java      |  22 +-
 .../storm/starter/ThroughputVsLatency.java      | 638 +++++++++----------
 .../storm/starter/TransactionalGlobalCount.java | 235 +++----
 .../storm/starter/TransactionalWords.java       | 355 ++++++-----
 .../apache/storm/starter/WordCountTopology.java |  10 +-
 .../storm/starter/WordCountTopologyNode.java    | 107 ++--
 .../storm/starter/streams/AggregateExample.java |  13 +-
 .../storm/starter/streams/BranchExample.java    |  14 +-
 .../streams/GroupByKeyAndWindowExample.java     |  20 +-
 .../storm/starter/streams/JoinExample.java      |  20 +-
 .../starter/streams/StateQueryExample.java      |  17 +-
 .../starter/streams/StatefulWordCount.java      |  14 +-
 .../starter/streams/TypedTupleExample.java      |  19 +-
 .../starter/streams/WindowedWordCount.java      |  20 +-
 .../storm/starter/streams/WordCountToBolt.java  |  13 +-
 .../TridentHBaseWindowingStoreTopology.java     |  22 +-
 .../starter/trident/TridentMapExample.java      |  35 +-
 .../trident/TridentMinMaxOfDevicesTopology.java |  25 +-
 .../TridentMinMaxOfVehiclesTopology.java        |  25 +-
 .../storm/starter/trident/TridentReach.java     | 190 +++---
 .../TridentWindowingInmemoryStoreTopology.java  |  36 +-
 .../storm/starter/trident/TridentWordCount.java |  82 ++-
 external/storm-eventhubs/pom.xml                |   7 -
 .../storm/eventhubs/samples/EventCount.java     |  39 +-
 .../main/java/org/apache/storm/flux/Flux.java   |  72 +--
 flux/pom.xml                                    |   9 +-
 pom.xml                                         |   3 +-
 .../jvm/org/apache/storm/drpc/DRPCSpout.java    |  10 +-
 .../storm/security/auth/ThriftClient.java       |   6 +-
 .../security/auth/ThriftConnectionType.java     |  29 +-
 .../storm/topology/ConfigurableTopology.java    | 151 +++++
 .../jvm/org/apache/storm/utils/DRPCClient.java  |  78 ++-
 .../org/apache/storm/utils/NimbusClient.java    |  45 +-
 .../src/jvm/org/apache/storm/utils/Utils.java   |   2 +-
 storm-clojure-test/pom.xml                      |  66 ++
 .../src/clj/org/apache/storm/testing.clj        | 270 ++++++++
 storm-clojure/pom.xml                           |  15 +-
 .../src/clj/org/apache/storm/config.clj         |  28 +
 storm-clojure/src/clj/org/apache/storm/log.clj  |  34 +
 .../src/clj/org/apache/storm/testing.clj        | 270 --------
 storm-clojure/src/clj/org/apache/storm/util.clj | 134 ++++
 storm-core/pom.xml                              |   2 +-
 .../jvm/org/apache/storm/command/Activate.java  |   2 +-
 .../org/apache/storm/command/Deactivate.java    |   2 +-
 .../jvm/org/apache/storm/command/GetErrors.java |   2 +-
 .../org/apache/storm/command/KillTopology.java  |   2 +-
 .../apache/storm/command/ListTopologies.java    |   2 +-
 .../jvm/org/apache/storm/command/Monitor.java   |   2 +-
 .../jvm/org/apache/storm/command/Rebalance.java |   2 +-
 .../org/apache/storm/command/SetLogLevel.java   |   2 +-
 .../src/jvm/org/apache/storm/utils/Monitor.java |   6 +-
 .../java/org/apache/storm/LocalCluster.java     | 296 ++++++++-
 .../storm/topology/ConfigurableTopology.java    | 184 ------
 140 files changed, 3089 insertions(+), 3285 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/bin/storm.py
----------------------------------------------------------------------
diff --git a/bin/storm.py b/bin/storm.py
index 41cd8f4..257dcdd 100755
--- a/bin/storm.py
+++ b/bin/storm.py
@@ -128,9 +128,12 @@ def get_jars_full(adir):
             ret.append(os.path.join(adir, f))
     return ret
 
-def get_classpath(extrajars, daemon=True):
+def get_classpath(extrajars, daemon=True, client=False):
     ret = get_jars_full(STORM_DIR)
-    ret.extend(get_jars_full(STORM_LIB_DIR))
+    if client:
+        ret.extend(get_jars_full(STORM_WORKER_LIB_DIR))
+    else:
+        ret.extend(get_jars_full(STORM_LIB_DIR))
     ret.extend(get_jars_full(os.path.join(STORM_DIR, "extlib")))
     if daemon:
         ret.extend(get_jars_full(os.path.join(STORM_DIR, "extlib-daemon")))
@@ -235,7 +238,7 @@ def parse_args(string):
     args = [re.compile(r"'((?:[^'\\]|\\.)*)'").sub('\\1', x) for x in args]
     return [re.compile(r'\\(.)').sub('\\1', x) for x in args]
 
-def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], args=[], fork=False, daemon=True, daemonName=""):
+def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], args=[], fork=False, daemon=True, client=False, daemonName=""):
     global CONFFILE
     storm_log_dir = confvalue("storm.log.dir",[CLUSTER_CONF_DIR])
     if(storm_log_dir == None or storm_log_dir == "null"):
@@ -248,7 +251,7 @@ def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], args=[]
         "-Dstorm.log.dir=" + storm_log_dir,
         "-Djava.library.path=" + confvalue("java.library.path", extrajars, daemon),
         "-Dstorm.conf.file=" + CONFFILE,
-        "-cp", get_classpath(extrajars, daemon),
+        "-cp", get_classpath(extrajars, daemon, client=client),
     ] + jvmopts + [klass] + list(args)
     print("Running: " + " ".join(all_args))
     sys.stdout.flush()
@@ -266,30 +269,7 @@ def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], args=[]
         os.execvp(JAVA_CMD, all_args)
     return exit_code
 
-def jar(jarfile, klass, *args):
-    """Syntax: [storm jar topology-jar-path class ...]
-
-    Runs the main method of class with the specified arguments.
-    The storm jars and configs in ~/.storm are put on the classpath.
-    The process is configured so that StormSubmitter
-    (http://storm.apache.org/releases/current/javadocs/org/apache/storm/StormSubmitter.html)
-    will upload the jar at topology-jar-path when the topology is submitted.
-
-    When you want to ship other jars which is not included to application jar, you can pass them to --jars option with comma-separated string.
-    For example, --jars "your-local-jar.jar,your-local-jar2.jar" will load your-local-jar.jar and your-local-jar2.jar.
-    And when you want to ship maven artifacts and its transitive dependencies, you can pass them to --artifacts with comma-separated string.
-    You can also exclude some dependencies like what you're doing in maven pom.
-    Please add exclusion artifacts with '^' separated string after the artifact.
-    For example, --artifacts "redis.clients:jedis:2.9.0,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12" will load jedis and kafka artifact and all of transitive dependencies but exclude slf4j-log4j12 from kafka.
-
-    When you need to pull the artifacts from other than Maven Central, you can pass remote repositories to --artifactRepositories option with comma-separated string.
-    Repository format is "<name>^<url>". '^' is taken as separator because URL allows various characters.
-    For example, --artifactRepositories "jboss-repository^http://repository.jboss.com/maven2,HDPRepo^http://repo.hortonworks.com/content/groups/public/" will add JBoss and HDP repositories for dependency resolver.
-
-    Complete example of options is here: `./bin/storm jar example/storm-starter/storm-starter-topologies-*.jar org.apache.storm.starter.RollingTopWords blobstore-remote2 remote --jars "./external/storm-redis/storm-redis-1.1.0.jar,./external/storm-kafka/storm-kafka-1.1.0.jar" --artifacts "redis.clients:jedis:2.9.0,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12" --artifactRepositories "jboss-repository^http://repository.jboss.com/maven2,HDPRepo^http://repo.hortonworks.com/content/groups/public/"`
-
-    When you pass jars and/or artifacts options, StormSubmitter will upload them when the topology is submitted, and they will be included to classpath of both the process which runs the class, and also workers for that topology.
-    """
+def run_client_jar(jarfile, klass, args, daemon=False, client=True, extrajvmopts=[]):
     global DEP_JARS_OPTS, DEP_ARTIFACTS_OPTS, DEP_ARTIFACTS_REPOSITORIES_OPTS
 
     local_jars = DEP_JARS_OPTS
@@ -307,9 +287,10 @@ def jar(jarfile, klass, *args):
                 jvmtype="-client",
                 extrajars=extra_jars,
                 args=args,
-                daemon=False,
+                daemon=daemon,
+                client=client,
                 fork=True,
-                jvmopts=JAR_JVM_OPTS + ["-Dstorm.jar=" + tmpjar] +
+                jvmopts=JAR_JVM_OPTS + extrajvmopts + ["-Dstorm.jar=" + tmpjar] +
                         ["-Dstorm.dependency.jars=" + ",".join(local_jars)] +
                         ["-Dstorm.dependency.artifacts=" + json.dumps(artifact_to_file_jars)])
         os.remove(tmpjar)
@@ -324,10 +305,63 @@ def jar(jarfile, klass, *args):
             extrajars=extra_jars,
             args=args,
             daemon=False,
-            jvmopts=JAR_JVM_OPTS + ["-Dstorm.jar=" + jarfile] +
+            jvmopts=JAR_JVM_OPTS + extrajvmopts + ["-Dstorm.jar=" + jarfile] +
                     ["-Dstorm.dependency.jars=" + ",".join(local_jars)] +
                     ["-Dstorm.dependency.artifacts=" + json.dumps(artifact_to_file_jars)])
 
+def local(jarfile, klass, *args):
+    """Syntax: [storm local topology-jar-path class ...]
+
+    Runs the main method of class with the specified arguments, but pointing to a local cluster.
+    The storm jars and configs in ~/.storm are put on the classpath.
+    The process is configured so that StormSubmitter
+    (http://storm.apache.org/releases/current/javadocs/org/apache/storm/StormSubmitter.html)
+    and others will interact with a local cluster instead of the one configured by default.
+
+    Most options should work just like with the storm jar command.
+
+    local also adds the option --local-ttl, which sets the number of seconds the
+    local cluster will run before it shuts down.
+
+    --java-debug lets you turn on Java debugging and set the parameters passed to -agentlib:jdwp on the JDK.
+    For example, --java-debug transport=dt_socket,address=localhost:8000
+    will open up a debugging server on port 8000.
+    """
+    [ttl, debug_args, args] = parse_local_opts(args)
+    extrajvmopts = ["-Dstorm.local.sleeptime=" + ttl]
+    if debug_args != None:
+        extrajvmopts = extrajvmopts + ["-agentlib:jdwp=" + debug_args]
+    run_client_jar(jarfile, "org.apache.storm.LocalCluster", [klass] + list(args), client=False, daemon=False, extrajvmopts=extrajvmopts)
+
+def jar(jarfile, klass, *args):
+    """Syntax: [storm jar topology-jar-path class ...]
+
+    Runs the main method of class with the specified arguments.
+    The storm worker dependencies and configs in ~/.storm are put on the classpath.
+    The process is configured so that StormSubmitter
+    (http://storm.apache.org/releases/current/javadocs/org/apache/storm/StormSubmitter.html)
+    will upload the jar at topology-jar-path when the topology is submitted.
+
+    When you want to ship other jars which are not included in the application jar, you can pass them to the --jars option as a comma-separated string.
+    For example, --jars "your-local-jar.jar,your-local-jar2.jar" will load your-local-jar.jar and your-local-jar2.jar.
+    And when you want to ship Maven artifacts and their transitive dependencies, you can pass them to --artifacts as a comma-separated string.
+    You can also exclude some dependencies, just as you would in a Maven pom.
+    Add exclusion artifacts after the artifact as a '^'-separated string.
+    For example, --artifacts "redis.clients:jedis:2.9.0,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12" will load the jedis and kafka artifacts and all of their transitive dependencies, but exclude slf4j-log4j12 from kafka.
+
+    When you need to pull artifacts from somewhere other than Maven Central, you can pass remote repositories to the --artifactRepositories option as a comma-separated string.
+    Repository format is "<name>^<url>". '^' is used as the separator because URLs can contain various characters.
+    For example, --artifactRepositories "jboss-repository^http://repository.jboss.com/maven2,HDPRepo^http://repo.hortonworks.com/content/groups/public/" will add the JBoss and HDP repositories for the dependency resolver.
+
+    Complete example of options is here: `./bin/storm jar example/storm-starter/storm-starter-topologies-*.jar org.apache.storm.starter.RollingTopWords blobstore-remote2 remote --jars "./external/storm-redis/storm-redis-1.1.0.jar,./external/storm-kafka/storm-kafka-1.1.0.jar" --artifacts "redis.clients:jedis:2.9.0,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12" --artifactRepositories "jboss-repository^http://repository.jboss.com/maven2,HDPRepo^http://repo.hortonworks.com/content/groups/public/"`
+
+    When you pass the jars and/or artifacts options, StormSubmitter will upload them when the topology is submitted, and they will be included in the classpath of both the process which runs the class and the workers for that topology.
+
+    If for some reason you need the full storm classpath, not just the one for the worker, you may include the command line option `--storm-server-classpath`.  Please be careful, because this will add things to the classpath that will not be on the worker classpath and could result in the worker not running.
+    """
+    [server_class_path, args] = parse_jar_opts(args) 
+    run_client_jar(jarfile, klass, list(args), client=not server_class_path, daemon=False)
+
 def sql(sql_file, topology_name):
     """Syntax: [storm sql sql-file topology-name], or [storm sql sql-file --explain] when activating explain mode
 
@@ -802,7 +836,14 @@ def print_classpath():
 
     Prints the classpath used by the storm client when running commands.
     """
-    print(get_classpath([]))
+    print(get_classpath([], client=True))
+
+def print_server_classpath():
+    """Syntax: [storm server_classpath]
+
+    Prints the classpath used by the storm servers when running commands.
+    """
+    print(get_classpath([], daemon=True))
 
 def monitor(*args):
     """Syntax: [storm monitor topology-name [-i interval-secs] [-m component-id] [-s stream-id] [-w [emitted | transferred]]]
@@ -845,9 +886,9 @@ def unknown_command(*args):
     print_usage()
     sys.exit(254)
 
-COMMANDS = {"jar": jar, "kill": kill, "shell": shell, "nimbus": nimbus, "ui": ui, "logviewer": logviewer,
+COMMANDS = {"local": local, "jar": jar, "kill": kill, "shell": shell, "nimbus": nimbus, "ui": ui, "logviewer": logviewer,
             "drpc": drpc, "supervisor": supervisor, "localconfvalue": print_localconfvalue,
-            "remoteconfvalue": print_remoteconfvalue, "repl": repl, "classpath": print_classpath,
+            "remoteconfvalue": print_remoteconfvalue, "repl": repl, "classpath": print_classpath, "server_classpath": print_server_classpath,
             "activate": activate, "deactivate": deactivate, "rebalance": rebalance, "help": print_usage,
             "list": listtopos, "dev-zookeeper": dev_zookeeper, "version": version, "monitor": monitor,
             "upload-credentials": upload_credentials, "pacemaker": pacemaker, "heartbeats": heartbeats, "blobstore": blobstore,
@@ -860,6 +901,40 @@ def parse_config(config_list):
         for config in config_list:
             CONFIG_OPTS.append(config)
 
+def parse_local_opts(args):
+    curr = list(args[:])
+    curr.reverse()
+    ttl = "20"
+    debug_args = None
+    args_list = []
+
+    while len(curr) > 0:
+        token = curr.pop()
+        if token == "--local-ttl":
+            ttl = curr.pop()
+        elif token == "--java-debug":
+            debug_args = curr.pop()
+        else:
+            args_list.append(token)
+
+    return ttl, debug_args, args_list
+
+
+def parse_jar_opts(args):
+    curr = list(args[:])
+    curr.reverse()
+    server_class_path = False
+    args_list = []
+
+    while len(curr) > 0:
+        token = curr.pop()
+        if token == "--storm-server-classpath":
+            server_class_path = True
+        else:
+            args_list.append(token)
+
+    return server_class_path, args_list
+
 def parse_config_opts(args):
     curr = args[:]
     curr.reverse()

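For context, the `storm local` subcommand added above ultimately re-invokes the user's jar with `org.apache.storm.LocalCluster` as the main class and the user's class name as its first argument (see `run_client_jar` above), passing the TTL through `-Dstorm.local.sleeptime`. Below is a rough Java sketch of that flow — it illustrates the mechanism only; it is not the actual `LocalCluster.main`, and the reflection-based dispatch here is an assumption:

```java
import java.util.Arrays;

import org.apache.storm.LocalCluster;

public final class LocalMainSketch {
    public static void main(String[] args) throws Exception {
        // First argument is the user's main class, as passed by `storm local`.
        String userClass = args[0];
        String[] userArgs = Arrays.copyOfRange(args, 1, args.length);
        // bin/storm.py sets -Dstorm.local.sleeptime from --local-ttl (default "20").
        int ttlSecs = Integer.parseInt(System.getProperty("storm.local.sleeptime", "20"));
        // Run the user's main under the local-mode override for ttlSecs seconds.
        LocalCluster.withLocalModeOverride(() -> {
            Class.forName(userClass)
                 .getMethod("main", String[].class)
                 .invoke(null, (Object) userArgs);
            return null;
        }, ttlSecs);
    }
}
```
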
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/docs/Clojure-DSL.md
----------------------------------------------------------------------
diff --git a/docs/Clojure-DSL.md b/docs/Clojure-DSL.md
index 1aa3393..56bb54f 100644
--- a/docs/Clojure-DSL.md
+++ b/docs/Clojure-DSL.md
@@ -252,7 +252,7 @@ The following example illustrates how to use this spout in a `spout-spec`:
 
 ### Running topologies in local mode or on a cluster
 
-That's all there is to the Clojure DSL. To submit topologies in remote mode or local mode, just use the `StormSubmitter` or `LocalCluster` classes just like you would from Java.
+That's all there is to the Clojure DSL. To submit topologies in remote mode or local mode, just use the `StormSubmitter` class just like you would from Java.
 
 To create topology configs, it's easiest to use the `org.apache.storm.config` namespace which defines constants for all of the possible configs. The constants are the same as the static constants in the `Config` class, except with dashes instead of underscores. For example, here's a topology config that sets the number of workers to 15 and configures the topology in debug mode:
 

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/docs/Distributed-RPC.md
----------------------------------------------------------------------
diff --git a/docs/Distributed-RPC.md b/docs/Distributed-RPC.md
index b20419a..3547934 100644
--- a/docs/Distributed-RPC.md
+++ b/docs/Distributed-RPC.md
@@ -16,6 +16,13 @@ DRPCClient client = new DRPCClient("drpc-host", 3772);
 String result = client.execute("reach", "http://twitter.com");
 ```
 
+Or, if you just want to use a preconfigured client, you can do the following (the exact host will be selected randomly from the configured set of hosts):
+
+```java
+DRPCClient client = DRPCClient.getConfiguredClient(conf);
+String result = client.execute("reach", "http://twitter.com");
+```
+
 The distributed RPC workflow looks like this:
 
 ![Tasks in a topology](images/drpc-workflow.png)
@@ -57,23 +64,9 @@ In this example, `ExclaimBolt` simply appends a "!" to the second field of the t
 
 ### Local mode DRPC
 
-DRPC can be run in local mode. Here's how to run the above example in local mode:
-
-```java
-LocalDRPC drpc = new LocalDRPC();
-LocalCluster cluster = new LocalCluster();
-
-cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));
-
-System.out.println("Results for 'hello':" + drpc.execute("exclamation", "hello"));
-
-cluster.shutdown();
-drpc.shutdown();
-```
-
-First you create a `LocalDRPC` object. This object simulates a DRPC server in process, just like how `LocalCluster` simulates a Storm cluster in process. Then you create the `LocalCluster` to run the topology in local mode. `LinearDRPCTopologyBuilder` has separate methods for creating local topologies and remote topologies. In local mode the `LocalDRPC` object does not bind to any ports so the topology needs to know about the object to communicate with it. This is why `createLocalTopology` takes in the `LocalDRPC` object as input.
-
-After launching the topology, you can do DRPC invocations using the `execute` method on `LocalDRPC`.
+In the past, using DRPC in local mode required creating a special LocalDRPC instance.  That can still be done when writing tests for your code, but in the current version of storm, running in local mode also creates a LocalDRPC
+instance, and any DRPCClient created will connect to it instead of the outside world.  This means that any interaction you want to test needs to be part of the script that launches the topology, just like
+with LocalDRPC.
 
 ### Remote mode DRPC
 

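Putting the pieces of the updated doc together, a minimal end-to-end sketch of the pattern it describes may be useful. The names ("drpc-demo", "exclamation") follow the examples above; the bolt wiring is elided, and this is an illustrative sketch, not text from the doc itself:

```java
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.drpc.LinearDRPCTopologyBuilder;
import org.apache.storm.utils.DRPCClient;

public class DrpcDemo {
    public static void main(String[] args) throws Exception {
        LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
        // ... add bolts to the builder, e.g. an ExclaimBolt ...
        Config conf = new Config();
        StormSubmitter.submitTopology("drpc-demo", conf, builder.createRemoteTopology());

        // Under `storm local` this client is wired to the in-process LocalDRPC;
        // against a real cluster it picks a configured DRPC host at random.
        DRPCClient client = DRPCClient.getConfiguredClient(conf);
        System.out.println("Results for 'hello': " + client.execute("exclamation", "hello"));
    }
}
```
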
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/docs/Local-mode.md
----------------------------------------------------------------------
diff --git a/docs/Local-mode.md b/docs/Local-mode.md
index e3d9666..a9e3a28 100644
--- a/docs/Local-mode.md
+++ b/docs/Local-mode.md
@@ -3,27 +3,82 @@ title: Local Mode
 layout: documentation
 documentation: true
 ---
-Local mode simulates a Storm cluster in process and is useful for developing and testing topologies. Running topologies in local mode is similar to running topologies [on a cluster](Running-topologies-on-a-production-cluster.html). 
+Local mode simulates a Storm cluster in process and is useful for developing and testing topologies. Running topologies in local mode is similar to running topologies [on a cluster](Running-topologies-on-a-production-cluster.html).
+
+To run a topology in local mode, you have two options.  The most common is to run your topology with `storm local` instead of `storm jar`.
+
+This will bring up a local simulated cluster and force all interactions with Nimbus to go through the simulated cluster instead of going to a separate process.
+
+If you want to do some automated testing without actually launching a storm cluster, you can use the same classes internally that `storm local` does.
+
+To do this you first need to pull in the dependencies needed to access these classes.  For the Java API you should depend on `storm-server` as a `test` dependency.
 
 To create an in-process cluster, simply use the `LocalCluster` class. For example:
 
 ```java
 import org.apache.storm.LocalCluster;
 
-LocalCluster cluster = new LocalCluster();
+...
+
+try (LocalCluster cluster = new LocalCluster()) {
+    //Interact with the cluster...
+}
 ```
 
 You can then submit topologies using the `submitTopology` method on the `LocalCluster` object. Just like the corresponding method on [StormSubmitter](javadocs/org/apache/storm/StormSubmitter.html), `submitTopology` takes a name, a topology configuration, and the topology object. You can then kill a topology using the `killTopology` method which takes the topology name as an argument.
 
-To shutdown a local cluster, simple call:
+The `LocalCluster` is an `AutoCloseable` and will shut down when `close` is called.
+
+Many of the Nimbus APIs are also available through the `LocalCluster`.
+
+### DRPC
+
+DRPC can be run in local mode as well. Here's how to run the above example in local mode:
 
 ```java
-cluster.shutdown();
+try (LocalDRPC drpc = new LocalDRPC();
+     LocalCluster cluster = new LocalCluster();
+     LocalTopology topo = cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc))) {
+
+    System.out.println("Results for 'hello':" + drpc.execute("exclamation", "hello"));
+}
 ```
 
+First you create a `LocalDRPC` object. This object simulates a DRPC server in process, just like how `LocalCluster` simulates a Storm cluster in process. Then you create the `LocalCluster` to run the topology in local mode. `LinearDRPCTopologyBuilder` has separate methods for creating local topologies and remote topologies. In local mode the `LocalDRPC` object does not bind to any ports so the topology needs to know about the object to communicate with it. This is why `createLocalTopology` takes in the `LocalDRPC` object as input.
+
+After launching the topology, you can do DRPC invocations using the `execute` method on `LocalDRPC`.
+
+Because all of the objects used are instances of `AutoCloseable`, when the try block's scope ends the topology is killed, the cluster is shut down, and the DRPC server also shuts down.
+
+### Clojure API
+
+Storm also offers a Clojure API for testing.
+
+[This blog post](http://www.pixelmachine.org/2011/12/21/Testing-Storm-Topologies-Part-2.html) talks about this, but is a little out of date.  To get this functionality you need to include the `storm-clojure-test` dependency.  This will pull in a lot of storm itself that should not be packaged with your topology, so please make sure it is a test dependency only.
+
+### Debugging your topology with an IDE
+
+One of the great use cases for local mode is being able to walk through the code execution of your bolts and spouts using an IDE.  You can do this on the command line by adding the `--java-debug` option followed by the parameter you would pass to jdwp. This makes it simple to launch the local cluster with `-agentlib:jdwp=` turned on.
+
+When running from within an IDE itself you can modify your code to run within a call to `LocalCluster.withLocalModeOverride`
+
+```java
+public static void main(final String args[]) {
+    LocalCluster.withLocalModeOverride(() -> originalMain(args), 10);
+}
+```
+
+Or you can configure the IDE to run "org.apache.storm.LocalCluster" instead of your main class when launching, and pass the name of your class to it as an argument.  This will also trigger local mode, and is what `storm local` does behind the scenes.
+
 ### Common configurations for local mode
 
 You can see a full list of configurations [here](javadocs/org/apache/storm/Config.html).
 
 1. **Config.TOPOLOGY_MAX_TASK_PARALLELISM**: This config puts a ceiling on the number of threads spawned for a single component. Oftentimes production topologies have a lot of parallelism (hundreds of threads) which places unreasonable load when trying to test the topology in local mode. This config lets you easily control that parallelism.
-2. **Config.TOPOLOGY_DEBUG**: When this is set to true, Storm will log a message every time a tuple is emitted from any spout or bolt. This is extremely useful for debugging.
+2. **Config.TOPOLOGY_DEBUG**: When this is set to true, Storm will log a message every time a tuple is emitted from any spout or bolt. This is extremely useful for debugging.
+
+These, like all other configs, can be set on the command line when launching your topology with the `-c` flag.  The flag is of the form `-c <conf_name>=<JSON_VALUE>`, so to enable debugging when launching your topology in local mode you could run:
+
+```
+storm local topology.jar <MY_MAIN_CLASS> -c topology.debug=true
+``` 

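To round out the `submitTopology`/`killTopology` description above, here is a minimal sketch of driving a `LocalCluster` directly from a test or main method; the topology wiring is elided and the name "demo" is illustrative:

```java
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

public class LocalClusterDemo {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // ... set spouts and bolts here ...

        // LocalCluster is AutoCloseable: the simulated cluster shuts down
        // when the try block exits.
        try (LocalCluster cluster = new LocalCluster()) {
            cluster.submitTopology("demo", new Config(), builder.createTopology());
            Thread.sleep(10_000);          // let the topology run briefly
            cluster.killTopology("demo");  // kill by the name used at submit time
        }
    }
}
```
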
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/docs/Tutorial.md
----------------------------------------------------------------------
diff --git a/docs/Tutorial.md b/docs/Tutorial.md
index 5dad834..f71c209 100644
--- a/docs/Tutorial.md
+++ b/docs/Tutorial.md
@@ -206,36 +206,9 @@ public static class ExclamationBolt extends BaseRichBolt {
 
 Let's see how to run the `ExclamationTopology` in local mode and see that it's working.
 
-Storm has two modes of operation: local mode and distributed mode. In local mode, Storm executes completely in process by simulating worker nodes with threads. Local mode is useful for testing and development of topologies. When you run the topologies in storm-starter, they'll run in local mode and you'll be able to see what messages each component is emitting. You can read more about running topologies in local mode on [Local mode](Local-mode.html).
+Storm has two modes of operation: local mode and distributed mode. In local mode, Storm executes completely in process by simulating worker nodes with threads. Local mode is useful for testing and development of topologies. You can read more about running topologies in local mode on [Local mode](Local-mode.html).
 
-In distributed mode, Storm operates as a cluster of machines. When you submit a topology to the master, you also submit all the code necessary to run the topology. The master will take care of distributing your code and allocating workers to run your topology. If workers go down, the master will reassign them somewhere else. You can read more about running topologies on a cluster on [Running topologies on a production cluster](Running-topologies-on-a-production-cluster.html)]. 
-
-Here's the code that runs `ExclamationTopology` in local mode:
-
-```java
-Config conf = new Config();
-conf.setDebug(true);
-conf.setNumWorkers(2);
-
-LocalCluster cluster = new LocalCluster();
-cluster.submitTopology("test", conf, builder.createTopology());
-Utils.sleep(10000);
-cluster.killTopology("test");
-cluster.shutdown();
-```
-
-First, the code defines an in-process cluster by creating a `LocalCluster` object. Submitting topologies to this virtual cluster is identical to submitting topologies to distributed clusters. It submits a topology to the `LocalCluster` by calling `submitTopology`, which takes as arguments a name for the running topology, a configuration for the topology, and then the topology itself.
-
-The name is used to identify the topology so that you can kill it later on. A topology will run indefinitely until you kill it.
-
-The configuration is used to tune various aspects of the running topology. The two configurations specified here are very common:
-
-1. **TOPOLOGY_WORKERS** (set with `setNumWorkers`) specifies how many _processes_ you want allocated around the cluster to execute the topology. Each component in the topology will execute as many _threads_. The number of threads allocated to a given component is configured through the `setBolt` and `setSpout` methods. Those _threads_ exist within worker _processes_. Each worker _process_ contains within it some number of _threads_ for some number of components. For instance, you may have 300 threads specified across all your components and 50 worker processes specified in your config. Each worker process will execute 6 threads, each of which of could belong to a different component. You tune the performance of Storm topologies by tweaking the parallelism for each component and the number of worker processes those threads should run within.
-2. **TOPOLOGY_DEBUG** (set with `setDebug`), when set to true, tells Storm to log every message every emitted by a component. This is useful in local mode when testing topologies, but you probably want to keep this turned off when running topologies on the cluster.
-
-There's many other configurations you can set for the topology. The various configurations are detailed on [the Javadoc for Config](javadocs/org/apache/storm/Config.html).
-
-To learn about how to set up your development environment so that you can run topologies in local mode (such as in Eclipse), see [Creating a new Storm project](Creating-a-new-Storm-project.html).
+To run a topology in local mode, run the command `storm local` instead of `storm jar`.
 
 ## Stream groupings
 

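Since the tutorial now defers local execution entirely to `storm local`, the simplified `main` follows the same pattern the example diffs below adopt. A sketch (class name illustrative, builder wiring elided):

```java
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

public class ExclamationMain {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // ... set spouts and bolts here, as in the tutorial ...
        Config conf = new Config();
        conf.setDebug(true);
        String topoName = args.length > 0 ? args[0] : "test";
        // `storm jar` submits to the configured cluster; `storm local`
        // redirects this same call to an in-process cluster.
        StormSubmitter.submitTopology(topoName, conf, builder.createTopology());
    }
}
```
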
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/docs/flux.md
----------------------------------------------------------------------
diff --git a/docs/flux.md b/docs/flux.md
index 7ebc590..886f55d 100644
--- a/docs/flux.md
+++ b/docs/flux.md
@@ -20,38 +20,7 @@ order to change configuration.
 
 ## About
 Flux is a framework and set of utilities that make defining and deploying Apache Storm topologies less painful and
-deveoper-intensive.
-
-Have you ever found yourself repeating this pattern?:
-
-```java
-
-public static void main(String[] args) throws Exception {
-    // logic to determine if we're running locally or not...
-    // create necessary config options...
-    boolean runLocal = shouldRunLocal();
-    if(runLocal){
-        LocalCluster cluster = new LocalCluster();
-        cluster.submitTopology(name, conf, topology);
-    } else {
-        StormSubmitter.submitTopology(name, conf, topology);
-    }
-}
-```
-
-Wouldn't something like this be easier:
-
-```bash
-storm jar mytopology.jar org.apache.storm.flux.Flux --local config.yaml
-```
-
-or:
-
-```bash
-storm jar mytopology.jar org.apache.storm.flux.Flux --remote config.yaml
-```
-
-Another pain point often mentioned is the fact that the wiring for a Topology graph is often tied up in Java code,
+developer-intensive. One of the pain points often mentioned is that the wiring for a topology graph is often tied up in Java code,
 and that any changes require recompilation and repackaging of the topology jar file. Flux aims to alleviate that
 pain by allowing you to package all your Storm components in a single jar, and use an external text file to define
 the layout and configuration of your topologies.

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/docs/storm-hbase.md
----------------------------------------------------------------------
diff --git a/docs/storm-hbase.md b/docs/storm-hbase.md
index 7f4fb62..7f71346 100644
--- a/docs/storm-hbase.md
+++ b/docs/storm-hbase.md
@@ -223,18 +223,12 @@ public class PersistentWordCount {
         builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT);
         builder.setBolt(HBASE_BOLT, hbase, 1).fieldsGrouping(COUNT_BOLT, new Fields("word"));
 
-
-        if (args.length == 0) {
-            LocalCluster cluster = new LocalCluster();
-            cluster.submitTopology("test", config, builder.createTopology());
-            Thread.sleep(10000);
-            cluster.killTopology("test");
-            cluster.shutdown();
-            System.exit(0);
-        } else {
-            config.setNumWorkers(3);
-            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
+        String topoName = "test";
+        if (args.length > 0) {
+            topoName = args[0];
         }
+        config.setNumWorkers(3);
+        StormSubmitter.submitTopology(topoName, config, builder.createTopology());
     }
 }
 ```

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-elasticsearch-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-elasticsearch-examples/pom.xml b/examples/storm-elasticsearch-examples/pom.xml
index b351a2b..64c8229 100644
--- a/examples/storm-elasticsearch-examples/pom.xml
+++ b/examples/storm-elasticsearch-examples/pom.xml
@@ -29,7 +29,7 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.storm</groupId>
-            <artifactId>storm-server</artifactId>
+            <artifactId>storm-client</artifactId>
             <version>${project.version}</version>
             <scope>${provided.scope}</scope>
         </dependency>

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
index 0a518cd..3cd2bc8 100644
--- a/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
+++ b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
@@ -17,9 +17,16 @@
  */
 package org.apache.storm.elasticsearch.bolt;
 
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.elasticsearch.common.EsConfig;
+import org.apache.storm.elasticsearch.common.EsConstants;
+import org.apache.storm.elasticsearch.common.EsTestUtil;
+import org.apache.storm.elasticsearch.common.EsTupleMapper;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -27,14 +34,6 @@ import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.topology.base.BaseRichSpout;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.elasticsearch.common.EsConfig;
-import org.apache.storm.elasticsearch.common.EsConstants;
-import org.apache.storm.elasticsearch.common.EsTestUtil;
-import org.apache.storm.elasticsearch.common.EsTupleMapper;
-
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
 
 public class EsIndexTopology {
 
@@ -54,12 +53,7 @@ public class EsIndexTopology {
 
         EsTestUtil.startEsNode();
         EsTestUtil.waitForSeconds(5);
-
-        try (LocalCluster cluster = new LocalCluster();
-            LocalTopology topo = cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());) {
-            EsTestUtil.waitForSeconds(20);
-        }
-        System.exit(0);
+        StormSubmitter.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
     }
 
     public static class UserDataSpout extends BaseRichSpout {

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
index 4293c8c..307a991 100644
--- a/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
+++ b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
@@ -17,24 +17,27 @@
  */
 package org.apache.storm.elasticsearch.trident;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
+import org.apache.storm.StormSubmitter;
 import org.apache.storm.elasticsearch.common.EsConfig;
 import org.apache.storm.elasticsearch.common.EsConstants;
 import org.apache.storm.elasticsearch.common.EsTestUtil;
 import org.apache.storm.elasticsearch.common.EsTupleMapper;
+import org.apache.storm.task.TopologyContext;
 import org.apache.storm.trident.Stream;
 import org.apache.storm.trident.TridentState;
 import org.apache.storm.trident.TridentTopology;
 import org.apache.storm.trident.operation.TridentCollector;
 import org.apache.storm.trident.spout.IBatchSpout;
 import org.apache.storm.trident.state.StateFactory;
-
-import java.util.*;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
 
 public class TridentEsTopology {
 
@@ -56,11 +59,7 @@ public class TridentEsTopology {
         EsTestUtil.startEsNode();
         EsTestUtil.waitForSeconds(5);
 
-        try (LocalCluster cluster = new LocalCluster();
-             LocalTopology topo = cluster.submitTopology(TOPOLOGY_NAME, null, topology.build());) {
-            EsTestUtil.waitForSeconds(20);
-        }
-        System.exit(0);
+        StormSubmitter.submitTopology(TOPOLOGY_NAME, new Config(), topology.build());
     }
 
     public static class FixedBatchSpout implements IBatchSpout {

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-hbase-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-hbase-examples/pom.xml b/examples/storm-hbase-examples/pom.xml
index 3014486..e9d3282 100644
--- a/examples/storm-hbase-examples/pom.xml
+++ b/examples/storm-hbase-examples/pom.xml
@@ -30,7 +30,7 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.storm</groupId>
-            <artifactId>storm-server</artifactId>
+            <artifactId>storm-client</artifactId>
             <version>${project.version}</version>
             <scope>${provided.scope}</scope>
         </dependency>

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/LookupWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/LookupWordCount.java b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/LookupWordCount.java
index afbfafd..a10f34f 100644
--- a/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/LookupWordCount.java
+++ b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/LookupWordCount.java
@@ -17,18 +17,16 @@
  */
 package org.apache.storm.hbase.topology;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Fields;
 import org.apache.storm.hbase.bolt.HBaseLookupBolt;
 import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria;
 import org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper;
-
-import java.util.HashMap;
-import java.util.Map;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
 
 
 public class LookupWordCount {
@@ -63,17 +61,14 @@ public class LookupWordCount {
         builder.setSpout(WORD_SPOUT, spout, 1);
         builder.setBolt(LOOKUP_BOLT, hBaseLookupBolt, 1).shuffleGrouping(WORD_SPOUT);
         builder.setBolt(TOTAL_COUNT_BOLT, totalBolt, 1).fieldsGrouping(LOOKUP_BOLT, new Fields("columnName"));
-
+        String topoName = "test";
         if (args.length == 1) {
-            try (LocalCluster cluster = new LocalCluster();
-                 LocalTopology topo = cluster.submitTopology("test", config, builder.createTopology());) {
-                Thread.sleep(30000);
-            }
-            System.exit(0);
-        } else if (args.length == 2) {
-            StormSubmitter.submitTopology(args[1], config, builder.createTopology());
-        } else{
+            topoName = args[1];
+        } else if (args.length > 1) {
             System.out.println("Usage: LookupWordCount <hbase.rootdir>");
+            return;
         }
+            
+        StormSubmitter.submitTopology(topoName, config, builder.createTopology());
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/PersistentWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/PersistentWordCount.java b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/PersistentWordCount.java
index ed866e9..700a7cc 100644
--- a/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/PersistentWordCount.java
+++ b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/PersistentWordCount.java
@@ -18,19 +18,16 @@
 package org.apache.storm.hbase.topology;
 
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Fields;
 import org.apache.storm.hbase.bolt.HBaseBolt;
 import org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper;
 import org.apache.storm.hbase.security.HBaseSecurityUtil;
-
-
-import java.util.HashMap;
-import java.util.Map;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
 
 
 public class PersistentWordCount {
@@ -68,24 +65,20 @@ public class PersistentWordCount {
         builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT);
         builder.setBolt(HBASE_BOLT, hbase, 1).fieldsGrouping(COUNT_BOLT, new Fields("word"));
 
-
-        if (args.length == 1) {
-            try (LocalCluster cluster = new LocalCluster();
-                 LocalTopology topo = cluster.submitTopology("test", config, builder.createTopology());) {
-                Thread.sleep(30000);
-            }
-            System.exit(0);
-        } else if (args.length == 2) {
-            StormSubmitter.submitTopology(args[1], config, builder.createTopology());
+        String topoName = "test";
+        if (args.length == 2) {
+            topoName = args[0];
         } else if (args.length == 4) {
             System.out.println("hdfs url: " + args[0] + ", keytab file: " + args[2] + 
-                ", principal name: " + args[3] + ", toplogy name: " + args[1]);
+                    ", principal name: " + args[3] + ", toplogy name: " + args[1]);
             hbConf.put(HBaseSecurityUtil.STORM_KEYTAB_FILE_KEY, args[2]);
             hbConf.put(HBaseSecurityUtil.STORM_USER_NAME_KEY, args[3]);
             config.setNumWorkers(3);
-            StormSubmitter.submitTopology(args[1], config, builder.createTopology());
-        } else {
+            topoName = args[1];
+        } else if (args.length != 1) {
             System.out.println("Usage: PersistentWordCount <hbase.rootdir> [topology name] [keytab file] [principal name]");
+            return;
         }
+        StormSubmitter.submitTopology(topoName, config, builder.createTopology());
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/trident/WordCountTrident.java
----------------------------------------------------------------------
diff --git a/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/trident/WordCountTrident.java b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/trident/WordCountTrident.java
index 62a2005..c81512e 100644
--- a/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/trident/WordCountTrident.java
+++ b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/trident/WordCountTrident.java
@@ -17,14 +17,10 @@
  */
 package org.apache.storm.hbase.trident;
 
+import org.apache.hadoop.hbase.client.Durability;
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-import org.apache.hadoop.hbase.client.Durability;
 import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria;
 import org.apache.storm.hbase.bolt.mapper.HBaseValueMapper;
 import org.apache.storm.hbase.topology.WordCountValueMapper;
@@ -39,6 +35,8 @@ import org.apache.storm.trident.TridentState;
 import org.apache.storm.trident.TridentTopology;
 import org.apache.storm.trident.state.StateFactory;
 import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
 
 public class WordCountTrident {
     public static StormTopology buildTopology(String hbaseRoot){
@@ -86,19 +84,16 @@ public class WordCountTrident {
     public static void main(String[] args) throws Exception {
         Config conf = new Config();
         conf.setMaxSpoutPending(5);
-        if (args.length == 1) {
-            try (LocalCluster cluster = new LocalCluster();
-                 LocalTopology topo = cluster.submitTopology("wordCounter", conf, buildTopology(args[0]));) {
-                Thread.sleep(60 * 1000);
-            }
-            System.exit(0);
-        }
-        else if(args.length == 2) {
-            conf.setNumWorkers(3);
-            StormSubmitter.submitTopology(args[1], conf, buildTopology(args[0]));
-        } else{
+        String topoName = "wordCounter";
+        
+        if (args.length == 2) {
+            topoName = args[1];
+        } else if (args.length > 2) {
             System.out.println("Usage: TridentFileTopology <hdfs url> [topology name]");
+            return;
         }
+        conf.setNumWorkers(3);
+        StormSubmitter.submitTopology(topoName, conf, buildTopology(args[0]));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-hdfs-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-hdfs-examples/pom.xml b/examples/storm-hdfs-examples/pom.xml
index 3d63529..34b4ef5 100644
--- a/examples/storm-hdfs-examples/pom.xml
+++ b/examples/storm-hdfs-examples/pom.xml
@@ -30,7 +30,7 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.storm</groupId>
-            <artifactId>storm-server</artifactId>
+            <artifactId>storm-client</artifactId>
             <version>${project.version}</version>
             <scope>${provided.scope}</scope>
         </dependency>

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/HdfsFileTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/HdfsFileTopology.java b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/HdfsFileTopology.java
index a5c2fe3..ea3ab94 100644
--- a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/HdfsFileTopology.java
+++ b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/HdfsFileTopology.java
@@ -17,10 +17,24 @@
  */
 package org.apache.storm.hdfs.bolt;
 
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
+import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
+import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
+import org.apache.storm.hdfs.bolt.format.FileNameFormat;
+import org.apache.storm.hdfs.bolt.format.RecordFormat;
+import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
+import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
+import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
+import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
+import org.apache.storm.hdfs.common.rotation.MoveFileAction;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
@@ -31,26 +45,8 @@ import org.apache.storm.topology.base.BaseRichSpout;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
-import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
-import org.apache.storm.hdfs.bolt.format.FileNameFormat;
-import org.apache.storm.hdfs.bolt.format.RecordFormat;
-import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
-import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
-import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;
-import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
-import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
-import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
-import org.apache.storm.hdfs.common.rotation.MoveFileAction;
 import org.yaml.snakeyaml.Yaml;
 
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-
 public class HdfsFileTopology {
     static final String SENTENCE_SPOUT_ID = "sentence-spout";
     static final String BOLT_ID = "my-bolt";
@@ -97,18 +93,14 @@ public class HdfsFileTopology {
         // SentenceSpout --> MyBolt
         builder.setBolt(BOLT_ID, bolt, 4)
                 .shuffleGrouping(SENTENCE_SPOUT_ID);
-
-        if (args.length == 2) {
-            try (LocalCluster cluster = new LocalCluster();
-                 LocalTopology topo = cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());) {
-                waitForSeconds(120);
-            }
-            System.exit(0);
-        } else if (args.length == 3) {
-            StormSubmitter.submitTopology(args[2], config, builder.createTopology());
-        } else{
+        String topoName = TOPOLOGY_NAME;
+        if (args.length == 3) {
+            topoName = args[2];
+        } else if (args.length != 2) {
             System.out.println("Usage: HdfsFileTopology [hdfs url] [hdfs yaml config file] <topology name>");
+            return;
         }
+        StormSubmitter.submitTopology(topoName, config, builder.createTopology());
     }
 
     public static void waitForSeconds(int seconds) {

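Local-mode runs of these examples are still possible; they just move out of the main methods. A minimal sketch mirroring the code removed above (requires storm-server on the test classpath now that the examples depend only on storm-client):

    try (LocalCluster cluster = new LocalCluster();
         LocalCluster.LocalTopology topo =
                 cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology())) {
        waitForSeconds(120);  // let the local topology run before tearing it down
    }
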
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileTopology.java b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileTopology.java
index abe1ebd..0ef3868 100644
--- a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileTopology.java
+++ b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileTopology.java
@@ -17,10 +17,25 @@
  */
 package org.apache.storm.hdfs.bolt;
 
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.io.SequenceFile;
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
+import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
+import org.apache.storm.hdfs.bolt.format.DefaultSequenceFormat;
+import org.apache.storm.hdfs.bolt.format.FileNameFormat;
+import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
+import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
+import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;
+import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
+import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
+import org.apache.storm.hdfs.common.rotation.MoveFileAction;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
@@ -31,24 +46,8 @@ import org.apache.storm.topology.base.BaseRichSpout;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.hdfs.bolt.format.*;
-import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
-import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
-import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;
-import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
-import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
-import org.apache.storm.hdfs.common.rotation.MoveFileAction;
-
-import org.apache.hadoop.io.SequenceFile;
 import org.yaml.snakeyaml.Yaml;
 
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-
 public class SequenceFileTopology {
     static final String SENTENCE_SPOUT_ID = "sentence-spout";
     static final String BOLT_ID = "my-bolt";
@@ -97,18 +96,14 @@ public class SequenceFileTopology {
         builder.setBolt(BOLT_ID, bolt, 4)
                 .shuffleGrouping(SENTENCE_SPOUT_ID);
 
-
-        if (args.length == 2) {
-            try (LocalCluster cluster = new LocalCluster();
-                 LocalTopology topo = cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());) {
-                waitForSeconds(120);
-            }
-            System.exit(0);
-        } else if(args.length == 3) {
-            StormSubmitter.submitTopology(args[2], config, builder.createTopology());
-        } else{
+        String topoName = TOPOLOGY_NAME;
+        if (args.length == 3) {
+            topoName = args[2];
+        } else if (args.length != 2) {
             System.out.println("Usage: SequenceFileTopology [hdfs url] [hdfs yaml config file] <topology name>");
+            return;
         }
+        StormSubmitter.submitTopology(topoName, config, builder.createTopology());
     }
 
     public static void waitForSeconds(int seconds) {

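For context, the imports reshuffled above feed the bolt wiring in the unchanged part of this file; it looks roughly like this (the path and field names are illustrative, not the example's exact values):

    SyncPolicy syncPolicy = new CountSyncPolicy(1000);
    FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
    SequenceFileBolt bolt = new SequenceFileBolt()
            .withFsUrl(args[0])
            .withFileNameFormat(new DefaultFileNameFormat().withPath("/tmp/storm"))
            .withSequenceFormat(new DefaultSequenceFormat("timestamp", "sentence"))
            .withRotationPolicy(rotationPolicy)
            .withSyncPolicy(syncPolicy)
            .withCompressionType(SequenceFile.CompressionType.RECORD);
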
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/spout/HdfsSpoutTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/spout/HdfsSpoutTopology.java b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/spout/HdfsSpoutTopology.java
index 4eab557..580cb91 100644
--- a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/spout/HdfsSpoutTopology.java
+++ b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/spout/HdfsSpoutTopology.java
@@ -134,7 +134,7 @@ public class HdfsSpoutTopology {
     // 4 - submit topology, wait for a few min and terminate it
     Map clusterConf = Utils.readStormConfig();
     StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology());
-    Nimbus.Client client = NimbusClient.getConfiguredClient(clusterConf).getClient();
+    Nimbus.Iface client = NimbusClient.getConfiguredClient(clusterConf).getClient();
 
     // 5 - Print metrics every 30 sec, kill topology after 20 min
     for (int i = 0; i < 40; i++) {
@@ -144,13 +144,13 @@ public class HdfsSpoutTopology {
     kill(client, topologyName);
   } // main
 
-  private static void kill(Nimbus.Client client, String topologyName) throws Exception {
+  private static void kill(Nimbus.Iface client, String topologyName) throws Exception {
     KillOptions opts = new KillOptions();
     opts.set_wait_secs(0);
     client.killTopologyWithOpts(topologyName, opts);
   }
 
-  static void printMetrics(Nimbus.Client client, String name) throws Exception {
+  static void printMetrics(Nimbus.Iface client, String name) throws Exception {
     ClusterSummary summary = client.getClusterInfo();
     String id = null;
     for (TopologySummary ts: summary.get_topologies()) {

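Switching from the concrete Nimbus.Client to the Nimbus.Iface interface lets the same code talk either to a remote Nimbus over Thrift or to an in-process one under local mode, which is presumably the reason for the change here. Assembled, the kill path above amounts to (topology name illustrative):

    Map clusterConf = Utils.readStormConfig();
    Nimbus.Iface client = NimbusClient.getConfiguredClient(clusterConf).getClient();
    KillOptions opts = new KillOptions();
    opts.set_wait_secs(0);  // tear the workers down with no grace period
    client.killTopologyWithOpts("my-topology", opts);
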
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/TridentFileTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/TridentFileTopology.java b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/TridentFileTopology.java
index 1e830a1..af76c00 100644
--- a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/TridentFileTopology.java
+++ b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/TridentFileTopology.java
@@ -17,27 +17,27 @@
  */
 package org.apache.storm.hdfs.trident;
 
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Map;
+
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.hdfs.common.rotation.MoveFileAction;
-import org.apache.storm.hdfs.trident.format.*;
+import org.apache.storm.hdfs.trident.format.DefaultFileNameFormat;
+import org.apache.storm.hdfs.trident.format.DelimitedRecordFormat;
+import org.apache.storm.hdfs.trident.format.FileNameFormat;
+import org.apache.storm.hdfs.trident.format.RecordFormat;
 import org.apache.storm.hdfs.trident.rotation.FileRotationPolicy;
 import org.apache.storm.hdfs.trident.rotation.FileSizeRotationPolicy;
 import org.apache.storm.trident.Stream;
 import org.apache.storm.trident.TridentState;
 import org.apache.storm.trident.TridentTopology;
 import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
 import org.yaml.snakeyaml.Yaml;
 
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.Map;
-
 public class TridentFileTopology {
 
     public static StormTopology buildTopology(String hdfsUrl){
@@ -85,17 +85,14 @@ public class TridentFileTopology {
         Map<String, Object> yamlConf = (Map<String, Object>) yaml.load(in);
         in.close();
         conf.put("hdfs.config", yamlConf);
-
-        if (args.length == 2) {
-            try (LocalCluster cluster = new LocalCluster();
-                 LocalTopology topo = cluster.submitTopology("wordCounter", conf, buildTopology(args[0]));) {
-                Thread.sleep(120 * 1000);
-            }
-        } else if(args.length == 3) {
-            conf.setNumWorkers(3);
-            StormSubmitter.submitTopology(args[2], conf, buildTopology(args[0]));
-        } else{
+        String topoName = "wordCounter";
+        if (args.length == 3) {
+            topoName = args[2];
+        } else if (args.length != 2) {
             System.out.println("Usage: TridentFileTopology [hdfs url] [hdfs yaml config file] <topology name>");
+            return;
         }
+        conf.setNumWorkers(3);
+        StormSubmitter.submitTopology(topoName, conf, buildTopology(args[0]));
     }
 }

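All of the HDFS examples share this YAML plumbing: the config file named on the command line is parsed with SnakeYAML and stashed in topology config under "hdfs.config", which the HDFS state factory is presumably pointed at via a withConfigKey(...) call inside buildTopology. The pattern, extracted as a sketch:

    Yaml yaml = new Yaml();
    Map<String, Object> yamlConf;
    try (InputStream in = new FileInputStream(args[1])) {
        yamlConf = (Map<String, Object>) yaml.load(in);  // hadoop-style key/value map
    }
    conf.put("hdfs.config", yamlConf);
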
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/TridentSequenceTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/TridentSequenceTopology.java b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/TridentSequenceTopology.java
index 3248363..525770b 100644
--- a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/TridentSequenceTopology.java
+++ b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/trident/TridentSequenceTopology.java
@@ -17,27 +17,27 @@
  */
 package org.apache.storm.hdfs.trident;
 
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Map;
+
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
 import org.apache.storm.hdfs.common.rotation.MoveFileAction;
-import org.apache.storm.hdfs.trident.format.*;
+import org.apache.storm.hdfs.trident.format.DefaultFileNameFormat;
+import org.apache.storm.hdfs.trident.format.DefaultSequenceFormat;
+import org.apache.storm.hdfs.trident.format.FileNameFormat;
 import org.apache.storm.hdfs.trident.rotation.FileRotationPolicy;
 import org.apache.storm.hdfs.trident.rotation.FileSizeRotationPolicy;
 import org.apache.storm.trident.Stream;
 import org.apache.storm.trident.TridentState;
 import org.apache.storm.trident.TridentTopology;
 import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
 import org.yaml.snakeyaml.Yaml;
 
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.Map;
-
 public class TridentSequenceTopology {
 
     public static StormTopology buildTopology(String hdfsUrl){
@@ -82,17 +82,15 @@ public class TridentSequenceTopology {
         Map<String, Object> yamlConf = (Map<String, Object>) yaml.load(in);
         in.close();
         conf.put("hdfs.config", yamlConf);
-
-        if (args.length == 2) {
-            try (LocalCluster cluster = new LocalCluster();
-                 LocalTopology topo = cluster.submitTopology("wordCounter", conf, buildTopology(args[0]));) {
-                Thread.sleep(120 * 1000);
-            }
-        } else if(args.length == 3) {
-            conf.setNumWorkers(3);
-            StormSubmitter.submitTopology(args[2], conf, buildTopology(args[0]));
-        } else{
-            System.out.println("Usage: TridentSequenceTopology [hdfs url] [hdfs yaml config file] <topology name>");
+        String topoName = "wordCounter";
+        if (args.length == 3) {
+            topoName = args[2];
+        } else if (args.length != 2) {
+            System.out.println("Usage: TridentSequenceTopology [hdfs url] [hdfs yaml config file] <topology name>");
+            return;
         }
+
+        conf.setNumWorkers(3);
+        StormSubmitter.submitTopology(topoName, conf, buildTopology(args[0]));
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-hive-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-hive-examples/pom.xml b/examples/storm-hive-examples/pom.xml
index 0c074e8..ec7a4c1 100644
--- a/examples/storm-hive-examples/pom.xml
+++ b/examples/storm-hive-examples/pom.xml
@@ -30,7 +30,7 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.storm</groupId>
-            <artifactId>storm-server</artifactId>
+            <artifactId>storm-client</artifactId>
             <version>${project.version}</version>
             <scope>${provided.scope}</scope>
         </dependency>

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/BucketTestHiveTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/BucketTestHiveTopology.java b/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/BucketTestHiveTopology.java
index e80a118..50ab532 100644
--- a/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/BucketTestHiveTopology.java
+++ b/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/BucketTestHiveTopology.java
@@ -18,10 +18,19 @@
 
 package org.apache.storm.hive.bolt;
 
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
+import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
+import org.apache.storm.hive.common.HiveOptions;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -30,18 +39,6 @@ import org.apache.storm.topology.base.BaseRichSpout;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
 
-import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
-import org.apache.storm.hive.common.HiveOptions;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-
 
 public class BucketTestHiveTopology {
     static final String USER_SPOUT_ID = "user-spout";
@@ -94,22 +91,11 @@ public class BucketTestHiveTopology {
         // SentenceSpout --> MyBolt
         builder.setBolt(BOLT_ID, hiveBolt, 14)
                 .shuffleGrouping(USER_SPOUT_ID);
-        if (args.length == 6) {
-            try (LocalCluster cluster = new LocalCluster();
-                 LocalTopology topo = cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());) {
-                waitForSeconds(20);
-            }
-            System.exit(0);
-        } else {
-            StormSubmitter.submitTopology(args[7], config, builder.createTopology());
-        }
-    }
-
-    public static void waitForSeconds(int seconds) {
-        try {
-            Thread.sleep(seconds * 1000);
-        } catch (InterruptedException e) {
+        String topoName = TOPOLOGY_NAME;
+        if (args.length > 7) {
+            topoName = args[7];
         }
+        StormSubmitter.submitTopology(topoName, config, builder.createTopology());
     }
 
     public static class UserDataSpout extends BaseRichSpout {

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/HiveTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/HiveTopology.java b/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/HiveTopology.java
index e69c68f..39c9d5f 100644
--- a/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/HiveTopology.java
+++ b/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/HiveTopology.java
@@ -18,10 +18,14 @@
 
 package org.apache.storm.hive.bolt;
 
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
+import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
+import org.apache.storm.hive.common.HiveOptions;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -30,13 +34,6 @@ import org.apache.storm.topology.base.BaseRichSpout;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
 
-import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
-import org.apache.storm.hive.common.HiveOptions;
-
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-
 
 public class HiveTopology {
     static final String USER_SPOUT_ID = "user-spout";
@@ -75,17 +72,12 @@ public class HiveTopology {
         // SentenceSpout --> MyBolt
         builder.setBolt(BOLT_ID, hiveBolt, 1)
                 .shuffleGrouping(USER_SPOUT_ID);
-        if (args.length == 3) {
-            try (LocalCluster cluster = new LocalCluster();
-                 LocalTopology topo = cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());) {
-                waitForSeconds(20);
-            }
-            System.exit(0);
-        } else if(args.length >= 4) {
-            StormSubmitter.submitTopology(args[3], config, builder.createTopology());
-        } else {
-            System.out.println("Usage: HiveTopology metastoreURI dbName tableName [topologyNamey] [keytab file] [principal name]");
+
+        String topoName = TOPOLOGY_NAME;
+        if (args.length >= 4) {
+            topoName = args[3];
         }
+        StormSubmitter.submitTopology(topoName, config, builder.createTopology());
     }
 
     public static void waitForSeconds(int seconds) {

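The imports hoisted to the top here carry the heart of the example; the HiveBolt they support is wired up roughly like this in the unchanged part of the file (column names and batching values are illustrative):

    DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
            .withColumnFields(new Fields("id", "name", "phone", "street"));
    HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, tblName, mapper)
            .withTxnsPerBatch(10)    // transactions per Hive txn batch
            .withBatchSize(1000)     // tuples per transaction
            .withIdleTimeout(10);
    HiveBolt hiveBolt = new HiveBolt(hiveOptions);
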
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/HiveTopologyPartitioned.java
----------------------------------------------------------------------
diff --git a/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/HiveTopologyPartitioned.java b/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/HiveTopologyPartitioned.java
index f64e626..dc51708 100644
--- a/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/HiveTopologyPartitioned.java
+++ b/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/HiveTopologyPartitioned.java
@@ -18,10 +18,14 @@
 
 package org.apache.storm.hive.bolt;
 
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
+import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
+import org.apache.storm.hive.common.HiveOptions;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -31,13 +35,6 @@ import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
 import org.apache.storm.utils.Utils;
 
-import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
-import org.apache.storm.hive.common.HiveOptions;
-
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-
 
 public class HiveTopologyPartitioned {
     static final String USER_SPOUT_ID = "hive-user-spout-partitioned";
@@ -77,17 +74,11 @@ public class HiveTopologyPartitioned {
         // SentenceSpout --> MyBolt
         builder.setBolt(BOLT_ID, hiveBolt, 1)
                 .shuffleGrouping(USER_SPOUT_ID);
-        if (args.length == 3) {
-            try (LocalCluster cluster = new LocalCluster();
-                 LocalTopology topo = cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());) {
-                waitForSeconds(20);
-            }
-            System.exit(0);
-        } else if(args.length >= 4) {
-            StormSubmitter.submitTopology(args[3], config, builder.createTopology());
-        } else {
-            System.out.println("Usage: HiveTopologyPartitioned metastoreURI dbName tableName [topologyNamey] [keytab file] [principal name]");
+        String topoName = TOPOLOGY_NAME;
+        if (args.length > 3) {
+            topoName = args[3];
         }
+        StormSubmitter.submitTopology(topoName, config, builder.createTopology());
     }
 
     public static void waitForSeconds(int seconds) {

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/trident/TridentHiveTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/trident/TridentHiveTopology.java b/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/trident/TridentHiveTopology.java
index 4505561..5204f5b 100644
--- a/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/trident/TridentHiveTopology.java
+++ b/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/trident/TridentHiveTopology.java
@@ -19,30 +19,26 @@
 package org.apache.storm.hive.trident;
 
 
-import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
-import org.apache.storm.hive.common.HiveOptions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
+import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
+import org.apache.storm.hive.common.HiveOptions;
 import org.apache.storm.hooks.SubmitterHookException;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
 import org.apache.storm.task.TopologyContext;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.spout.IBatchSpout;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.storm.trident.Stream;
 import org.apache.storm.trident.TridentState;
 import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.spout.IBatchSpout;
 import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -96,31 +92,27 @@ public class TridentHiveTopology {
         String tblName = args[2];
         Config conf = new Config();
         conf.setMaxSpoutPending(5);
-        if(args.length == 3) {
-            try (LocalCluster cluster = new LocalCluster();
-                 LocalTopology topo = cluster.submitTopology("tridentHiveTopology", conf, buildTopology(metaStoreURI, dbName, tblName,null,null));) {
-                LOG.info("waiting for 60 seconds");
-                waitForSeconds(60);
-            }
-            System.exit(0);
-        } else if(args.length == 4) {
-            try {
-                StormSubmitter.submitTopology(args[3], conf, buildTopology(metaStoreURI, dbName, tblName,null,null));
-            } catch(SubmitterHookException e) {
-                LOG.warn("Topology is submitted but invoking ISubmitterHook failed", e);
-            } catch (Exception e) {
-                LOG.warn("Failed to submit topology ", e);
-            }
-        } else if (args.length == 6) {
-            try {
-                StormSubmitter.submitTopology(args[3], conf, buildTopology(metaStoreURI, dbName, tblName,args[4],args[5]));
-            } catch(SubmitterHookException e) {
-                LOG.warn("Topology is submitted but invoking ISubmitterHook failed", e);
-            } catch (Exception e) {
-                LOG.warn("Failed to submit topology ", e);
-            }
-        } else {
-            LOG.info("Usage: TridentHiveTopology metastoreURI dbName tableName [topologyNamey]");
+        String topoName = "tridentHiveTopology";
+        String keytab = null;
+        String principal = null;
+
+        if (args.length > 3) {
+            topoName = args[3];
+        }
+        if (args.length == 6) {
+            keytab = args[4];
+            principal = args[5];
+        } else if (args.length != 3 && args.length != 4) {
+            LOG.info("Usage: TridentHiveTopology metastoreURI dbName tableName [topologyName] [keytab principal]");
+            return;
+        }
+
+        try {
+            StormSubmitter.submitTopology(topoName, conf, buildTopology(metaStoreURI, dbName, tblName, keytab, principal));
+        } catch (SubmitterHookException e) {
+            LOG.warn("Topology is submitted but invoking ISubmitterHook failed", e);
+        } catch (Exception e) {
+            LOG.warn("Failed to submit topology ", e);
         }
     }
 

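The keytab/principal pair parsed above is handed through to buildTopology, where it would be applied via HiveOptions along these lines (a sketch; the two setters are the storm-hive ones for secure metastores):

    HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, tblName, mapper);
    if (keytab != null && principal != null) {
        hiveOptions = hiveOptions
                .withKerberosKeytab(keytab)
                .withKerberosPrincipal(principal);
    }
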
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-jdbc-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-jdbc-examples/pom.xml b/examples/storm-jdbc-examples/pom.xml
index ab35c6e..eb3279c 100644
--- a/examples/storm-jdbc-examples/pom.xml
+++ b/examples/storm-jdbc-examples/pom.xml
@@ -30,7 +30,7 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.storm</groupId>
-            <artifactId>storm-server</artifactId>
+            <artifactId>storm-client</artifactId>
             <version>${project.version}</version>
             <scope>${provided.scope}</scope>
         </dependency>

http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-jdbc-examples/src/main/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-jdbc-examples/src/main/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java b/examples/storm-jdbc-examples/src/main/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
index 19cdf74..79ca987 100644
--- a/examples/storm-jdbc-examples/src/main/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
+++ b/examples/storm-jdbc-examples/src/main/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
@@ -17,27 +17,26 @@
  */
 package org.apache.storm.jdbc.topology;
 
+import java.sql.Types;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.storm.Config;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
-import org.apache.storm.tuple.Fields;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import org.apache.storm.jdbc.common.Column;
 import org.apache.storm.jdbc.common.ConnectionProvider;
 import org.apache.storm.jdbc.common.HikariCPConnectionProvider;
 import org.apache.storm.jdbc.common.JdbcClient;
-import org.apache.storm.jdbc.mapper.JdbcMapper;
 import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
-import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
+import org.apache.storm.jdbc.mapper.JdbcMapper;
 import org.apache.storm.jdbc.mapper.SimpleJdbcLookupMapper;
+import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
 import org.apache.storm.jdbc.spout.UserSpout;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
+import org.apache.storm.tuple.Fields;
 
-import java.sql.Types;
-import java.util.List;
-import java.util.Map;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 public abstract class AbstractUserTopology {
     private static final List<String> setupSqls = Lists.newArrayList(
@@ -99,15 +98,11 @@ public abstract class AbstractUserTopology {
         List<Column> queryParamColumns = Lists.newArrayList(new Column("user_id", Types.INTEGER));
         this.jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);
         this.connectionProvider = new HikariCPConnectionProvider(map);
-        if (args.length == 4) {
-            try (LocalCluster cluster = new LocalCluster();
-                 LocalTopology topo = cluster.submitTopology("test", config, getTopology());) {
-                Thread.sleep(30000);
-            }
-            System.exit(0);
-        } else {
-            StormSubmitter.submitTopology(args[4], config, getTopology());
+        String topoName = "test";
+        if (args.length > 4) {
+            topoName = args[4];
         }
+        StormSubmitter.submitTopology(topoName, config, getTopology());
     }
 
     public abstract StormTopology getTopology();