You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/08/23 08:03:03 UTC

[7/9] storm git commit: STORM-2016 Topology submission improvement: support adding local jars and maven artifacts on submission

STORM-2016 Topology submission improvement: support adding local jars and maven artifacts on submission

*NOTE* This contains a bugfix at forward reference in storm.thrift (from STORM-1994)

* bin/storm now supports "--jars" and "--artifacts" options
** it's only effective with "storm jar" and "storm sql"
* introduce new module: storm-submit-tools to help resolving dependencies with handling transitive dependencies
* StormSubmitter will upload dependencies to BlobStore when submitting topology
** StormSubmitter will remove jars blobs when topology submission fails
*** remove jar blobs when only one of AlreadyAliveException, InvalidTopologyException, AuthorizationException occurs
* Supervisor will download dependencies from BlobStore when such topology is assigned
* Supervisor will launch workers with adding downloaded dependencies to worker classpath
* Nimbus will remove jars from BlobStore when topology is killed
** don't remove artifacts from blobstore since it's shared across topologies
* add options to document (Command-line-client.md) and also storm.py
** it will be printed out when 'help jar' and 'help sql' is called


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0b9a4d03
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0b9a4d03
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0b9a4d03

Branch: refs/heads/1.x-branch
Commit: 0b9a4d031024907f628e82ae365e795a3714e539
Parents: 7ef2fdd
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Aug 4 09:52:45 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Aug 23 17:01:57 2016 +0900

----------------------------------------------------------------------
 bin/storm.py                                    |  114 +-
 docs/Command-line-client.md                     |   28 +-
 external/storm-submit-tools/pom.xml             |  201 +++
 .../submit/command/DependencyResolverMain.java  |  105 ++
 .../storm/submit/dependency/AetherUtils.java    |   81 ++
 .../apache/storm/submit/dependency/Booter.java  |   63 +
 .../submit/dependency/DependencyResolver.java   |   70 +
 .../dependency/RepositorySystemFactory.java     |   66 +
 .../submit/dependency/AetherUtilsTest.java      |  102 ++
 .../dependency/DependencyResolverTest.java      |   78 ++
 pom.xml                                         |    5 +
 storm-core/src/clj/org/apache/storm/config.clj  |    7 +-
 .../src/clj/org/apache/storm/daemon/common.clj  |    6 +-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  |   14 +-
 .../clj/org/apache/storm/daemon/supervisor.clj  |   65 +-
 .../jvm/org/apache/storm/StormSubmitter.java    |  102 +-
 .../apache/storm/blobstore/BlobStoreUtils.java  |   19 +
 .../dependency/DependencyPropertiesParser.java  |   61 +
 .../storm/dependency/DependencyUploader.java    |  166 +++
 .../dependency/FileNotAvailableException.java   |   33 +
 .../org/apache/storm/generated/Assignment.java  |  244 ++--
 .../org/apache/storm/generated/BoltStats.java   |  406 +++---
 .../apache/storm/generated/ClusterSummary.java  |  108 +-
 .../storm/generated/ClusterWorkerHeartbeat.java |   52 +-
 .../storm/generated/ComponentPageInfo.java      |  220 +--
 .../org/apache/storm/generated/Credentials.java |   44 +-
 .../apache/storm/generated/ExecutorStats.java   |  168 +--
 .../jvm/org/apache/storm/generated/HBNodes.java |   32 +-
 .../org/apache/storm/generated/HBRecords.java   |   36 +-
 .../storm/generated/LSApprovedWorkers.java      |   44 +-
 .../generated/LSSupervisorAssignments.java      |   48 +-
 .../apache/storm/generated/LSTopoHistory.java   |   64 +-
 .../storm/generated/LSTopoHistoryList.java      |   36 +-
 .../storm/generated/LSWorkerHeartbeat.java      |   36 +-
 .../apache/storm/generated/ListBlobsResult.java |   32 +-
 .../apache/storm/generated/LocalAssignment.java |   36 +-
 .../apache/storm/generated/LocalStateData.java  |   48 +-
 .../org/apache/storm/generated/LogConfig.java   |   48 +-
 .../jvm/org/apache/storm/generated/Nimbus.java  |   36 +-
 .../org/apache/storm/generated/NodeInfo.java    |   32 +-
 .../storm/generated/RebalanceOptions.java       |   44 +-
 .../storm/generated/SettableBlobMeta.java       |   36 +-
 .../org/apache/storm/generated/SpoutStats.java  |  252 ++--
 .../org/apache/storm/generated/StormBase.java   |   92 +-
 .../apache/storm/generated/StormTopology.java   |  428 +++++-
 .../apache/storm/generated/SupervisorInfo.java  |  152 +-
 .../storm/generated/SupervisorPageInfo.java     |   76 +-
 .../storm/generated/SupervisorSummary.java      |   44 +-
 .../storm/generated/TopologyHistoryInfo.java    |   32 +-
 .../apache/storm/generated/TopologyInfo.java    |  160 +--
 .../storm/generated/TopologyPageInfo.java       |  132 +-
 .../apache/storm/generated/TopologyStats.java   |  220 +--
 .../apache/storm/utils/ThriftTopologyUtils.java |    4 +
 storm-core/src/py/storm/Nimbus.py               |   14 +-
 storm-core/src/py/storm/ttypes.py               | 1302 +++++++++---------
 storm-core/src/storm.thrift                     |   12 +-
 .../test/clj/org/apache/storm/nimbus_test.clj   |    7 +-
 .../clj/org/apache/storm/supervisor_test.clj    |   16 +-
 .../DependencyPropertiesParserTest.java         |   72 +
 .../dependency/DependencyUploaderTest.java      |  297 ++++
 .../storm/utils/ThriftTopologyUtilsTest.java    |   12 +
 storm-dist/binary/src/main/assembly/binary.xml  |    7 +
 62 files changed, 4308 insertions(+), 2259 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0b9a4d03/bin/storm.py
----------------------------------------------------------------------
diff --git a/bin/storm.py b/bin/storm.py
index 354daca..61aae98 100755
--- a/bin/storm.py
+++ b/bin/storm.py
@@ -24,6 +24,7 @@ import shlex
 import tempfile
 import uuid
 import subprocess as sub
+import json
 
 import sys
 
@@ -96,6 +97,8 @@ if JAVA_HOME and not os.path.exists(JAVA_CMD):
     sys.exit(1)
 STORM_EXT_CLASSPATH = os.getenv('STORM_EXT_CLASSPATH', None)
 STORM_EXT_CLASSPATH_DAEMON = os.getenv('STORM_EXT_CLASSPATH_DAEMON', None)
+DEP_JARS_OPTS = []
+DEP_ARTIFACTS_OPTS = []
 
 def get_config_opts():
     global CONFIG_OPTS
@@ -154,6 +157,42 @@ def confvalue(name, extrapaths, daemon=True):
             return " ".join(tokens[1:])
     return ""
 
+def resolve_dependencies(artifacts):
+    if len(artifacts) == 0:
+        return {}
+
+    print("Resolving dependencies on demand: artifacts (%s)" % artifacts)
+    sys.stdout.flush()
+
+    # TODO: should we move some external modules to outer place?
+
+    # storm-submit module doesn't rely on storm-core and relevant libs
+    extrajars = get_jars_full(STORM_DIR + "/external/storm-submit-tools")
+    classpath = normclasspath(os.pathsep.join(extrajars))
+
+    command = [
+        JAVA_CMD, "-client", "-cp", classpath, "org.apache.storm.submit.command.DependencyResolverMain",
+        ",".join(artifacts)
+    ]
+
+    p = sub.Popen(command, stdout=sub.PIPE)
+    output, errors = p.communicate()
+    if p.returncode != 0:
+        raise RuntimeError("dependency handler returns non-zero code: code<%s> syserr<%s>" % (p.returncode, errors))
+
+    # python 3
+    if not isinstance(output, str):
+        output = output.decode('utf-8')
+
+    # For debug purpose, uncomment when you need to debug DependencyResolver
+    #print("Resolved dependencies: %s" % output)
+
+    try:
+        out_dict = json.loads(output)
+        return out_dict
+    except:
+        raise RuntimeError("dependency handler returns non-json response: sysout<%s>", output)
+
 def print_localconfvalue(name):
     """Syntax: [storm localconfvalue conf-name]
 
@@ -232,44 +271,91 @@ def jar(jarfile, klass, *args):
     The process is configured so that StormSubmitter
     (http://storm.apache.org/releases/current/javadocs/org/apache/storm/StormSubmitter.html)
     will upload the jar at topology-jar-path when the topology is submitted.
+
+    When you want to ship other jars which is not included to application jar, you can pass them to --jars option with comma-separated string.
+    For example, --jars "your-local-jar.jar,your-local-jar2.jar" will load your-local-jar.jar and your-local-jar2.jar.
+    And when you want to ship maven artifacts and its transitive dependencies, you can pass them to --artifacts with comma-separated string.
+    You can also exclude some dependencies like what you're doing in maven pom.
+    Please add exclusion artifacts with '^' separated string after the artifact.
+    For example, --artifacts "redis.clients:jedis:2.9.0,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12" will load jedis and kafka artifact and all of transitive dependencies but exclude slf4j-log4j12 from kafka.
+
+    Complete example of both options is here: `./bin/storm jar example/storm-starter/storm-starter-topologies-*.jar org.apache.storm.starter.RollingTopWords blobstore-remote2 remote --jars "./external/storm-redis/storm-redis-1.1.0.jar,./external/storm-kafka/storm-kafka-1.1.0.jar" --artifacts "redis.clients:jedis:2.9.0,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12"`
+
+    When you pass jars and/or artifacts options, StormSubmitter will upload them when the topology is submitted, and they will be included to classpath of both the process which runs the class, and also workers for that topology.
     """
+    global DEP_JARS_OPTS, DEP_ARTIFACTS_OPTS
+
+    local_jars = DEP_JARS_OPTS
+    artifact_to_file_jars = resolve_dependencies(DEP_ARTIFACTS_OPTS)
+
     transform_class = confvalue("client.jartransformer.class", [CLUSTER_CONF_DIR])
     if (transform_class != None and transform_class != "nil"):
         tmpjar = os.path.join(tempfile.gettempdir(), uuid.uuid1().hex+".jar")
         exec_storm_class("org.apache.storm.daemon.ClientJarTransformerRunner", args=[transform_class, jarfile, tmpjar], fork=True, daemon=False)
+        extra_jars = [tmpjar, USER_CONF_DIR, STORM_BIN_DIR]
+        extra_jars.extend(local_jars)
+        extra_jars.extend(artifact_to_file_jars.values())
         topology_runner_exit_code = exec_storm_class(
                 klass,
                 jvmtype="-client",
-                extrajars=[tmpjar, USER_CONF_DIR, STORM_BIN_DIR],
+                extrajars=extra_jars,
                 args=args,
                 daemon=False,
                 fork=True,
-                jvmopts=JAR_JVM_OPTS + ["-Dstorm.jar=" + tmpjar])
+                jvmopts=JAR_JVM_OPTS + ["-Dstorm.jar=" + tmpjar] +
+                        ["-Dstorm.dependency.jars=" + ",".join(local_jars)] +
+                        ["-Dstorm.dependency.artifacts=" + json.dumps(artifact_to_file_jars)])
         os.remove(tmpjar)
         sys.exit(topology_runner_exit_code)
     else:
+        extra_jars=[jarfile, USER_CONF_DIR, STORM_BIN_DIR]
+        extra_jars.extend(local_jars)
+        extra_jars.extend(artifact_to_file_jars.values())
         exec_storm_class(
             klass,
             jvmtype="-client",
-            extrajars=[jarfile, USER_CONF_DIR, STORM_BIN_DIR],
+            extrajars=extra_jars,
             args=args,
             daemon=False,
-            jvmopts=JAR_JVM_OPTS + ["-Dstorm.jar=" + jarfile])
+            jvmopts=JAR_JVM_OPTS + ["-Dstorm.jar=" + jarfile] +
+                    ["-Dstorm.dependency.jars=" + ",".join(local_jars)] +
+                    ["-Dstorm.dependency.artifacts=" + json.dumps(artifact_to_file_jars)])
 
 def sql(sql_file, topology_name):
     """Syntax: [storm sql sql-file topology-name]
 
     Compiles the SQL statements into a Trident topology and submits it to Storm.
+
+    --jars and --artifacts options available for jar are also applied to sql command.
+    Please refer "help jar" to see how to use --jars and --artifacts options.
+    You normally want to pass these options since you need to set data source to your sql which is an external storage in many cases.
     """
+    global DEP_JARS_OPTS, DEP_ARTIFACTS_OPTS
+
+    local_jars = DEP_JARS_OPTS
+    artifact_to_file_jars = resolve_dependencies(DEP_ARTIFACTS_OPTS)
+
+    sql_core_jars = get_jars_full(STORM_DIR + "/external/sql/storm-sql-core")
+    sql_runtime_jars = get_jars_full(STORM_DIR + "/external/sql/storm-sql-runtime")
+
+    # include storm-sql-runtime jar(s) to local jar list
+    local_jars.extend(sql_runtime_jars)
+
     extrajars=[USER_CONF_DIR, STORM_BIN_DIR]
-    extrajars.extend(get_jars_full(STORM_DIR + "/external/sql/storm-sql-core"))
-    extrajars.extend(get_jars_full(STORM_DIR + "/external/sql/storm-sql-runtime"))
+    extrajars.extend(local_jars)
+    extrajars.extend(artifact_to_file_jars.values())
+
+    # include this for running StormSqlRunner, but not for generated topology
+    extrajars.extend(sql_core_jars)
+
     exec_storm_class(
         "org.apache.storm.sql.StormSqlRunner",
         jvmtype="-client",
         extrajars=extrajars,
         args=[sql_file, topology_name],
-        daemon=False)
+        daemon=False,
+        jvmopts=["-Dstorm.dependency.jars=" + ",".join(local_jars)] +
+                ["-Dstorm.dependency.artifacts=" + json.dumps(artifact_to_file_jars)])
 
 def kill(*args):
     """Syntax: [storm kill topology-name [-w wait-time-secs]]
@@ -746,6 +832,8 @@ def parse_config_opts(args):
   curr.reverse()
   config_list = []
   args_list = []
+  jars_list = []
+  artifacts_list = []
 
   while len(curr) > 0:
     token = curr.pop()
@@ -754,18 +842,24 @@ def parse_config_opts(args):
     elif token == "--config":
       global CONFFILE
       CONFFILE = curr.pop()
+    elif token == "--jars":
+      jars_list.extend(curr.pop().split(','))
+    elif token == "--artifacts":
+      artifacts_list.extend(curr.pop().split(','))
     else:
       args_list.append(token)
 
-  return config_list, args_list
+  return config_list, jars_list, artifacts_list, args_list
 
 def main():
     if len(sys.argv) <= 1:
         print_usage()
         sys.exit(-1)
-    global CONFIG_OPTS
-    config_list, args = parse_config_opts(sys.argv[1:])
+    global CONFIG_OPTS, DEP_JARS_OPTS, DEP_ARTIFACTS_OPTS
+    config_list, jars_list, artifacts_list, args = parse_config_opts(sys.argv[1:])
     parse_config(config_list)
+    DEP_JARS_OPTS = jars_list
+    DEP_ARTIFACTS_OPTS = artifacts_list
     COMMAND = args[0]
     ARGS = args[1:]
     (COMMANDS.get(COMMAND, unknown_command))(*ARGS)

http://git-wip-us.apache.org/repos/asf/storm/blob/0b9a4d03/docs/Command-line-client.md
----------------------------------------------------------------------
diff --git a/docs/Command-line-client.md b/docs/Command-line-client.md
index ca45163..c48300c 100644
--- a/docs/Command-line-client.md
+++ b/docs/Command-line-client.md
@@ -8,6 +8,7 @@ This page describes all the commands that are possible with the "storm" command
 These commands are:
 
 1. jar
+1. sql
 1. kill
 1. activate
 1. deactivate
@@ -32,7 +33,6 @@ These commands are:
 1. pacemaker
 1. set_log_level
 1. shell
-1. sql
 1. upload-credentials
 1. version
 1. help
@@ -43,6 +43,22 @@ Syntax: `storm jar topology-jar-path class ...`
 
 Runs the main method of `class` with the specified arguments. The storm jars and configs in `~/.storm` are put on the classpath. The process is configured so that [StormSubmitter](javadocs/org/apache/storm/StormSubmitter.html) will upload the jar at `topology-jar-path` when the topology is submitted.
 
+When you want to ship other jars which is not included to application jar, you can pass them to `--jars` option with comma-separated string.
+For example, --jars "your-local-jar.jar,your-local-jar2.jar" will load your-local-jar.jar and your-local-jar2.jar.
+And when you want to ship maven artifacts and its transitive dependencies, you can pass them to `--artifacts` with comma-separated string. You can also exclude some dependencies like what you're doing in maven pom. Please add exclusion artifacts with '^' separated string after the artifact. For example, `--artifacts "redis.clients:jedis:2.9.0,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12"` will load jedis and kafka artifact and all of transitive dependencies but exclude slf4j-log4j12 from kafka.
+
+Complete example of both options is here: `./bin/storm jar example/storm-starter/storm-starter-topologies-*.jar org.apache.storm.starter.RollingTopWords blobstore-remote2 remote --jars "./external/storm-redis/storm-redis-1.1.0.jar,./external/storm-kafka/storm-kafka-1.1.0.jar" --artifacts "redis.clients:jedis:2.9.0,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12"`
+
+When you pass jars and/or artifacts options, StormSubmitter will upload them when the topology is submitted, and they will be included to classpath of both the process which runs the class, and also workers for that topology.
+
+### sql
+
+Syntax: `storm sql sql-file topology-name`
+
+Compiles the SQL statements into a Trident topology and submits it to Storm.
+
+`--jars` and `--artifacts` options are also applied to `sql` command. You normally want to pass these options since you need to set data source to your sql which is an external storage in many cases.
+
 ### kill
 
 Syntax: `storm kill topology-name [-w wait-time-secs]`
@@ -65,7 +81,7 @@ Deactivates the specified topology's spouts.
 
 Syntax: `storm rebalance topology-name [-w wait-time-secs] [-n new-num-workers] [-e component=parallelism]*`
 
-Sometimes you may wish to spread out where the workers for a topology are running. For example, let's say you have a 10 node cluster running 4 workers per node, and then let's say you add another 10 nodes to the cluster. You may wish to have Storm spread out the workers for the running topology so that each node runs 2 workers. One way to do this is to kill the topology and resubmit it, but Storm provides a "rebalance" command that provides an easier way to do this. 
+Sometimes you may wish to spread out where the workers for a topology are running. For example, let's say you have a 10 node cluster running 4 workers per node, and then let's say you add another 10 nodes to the cluster. You may wish to have Storm spread out the workers for the running topology so that each node runs 2 workers. One way to do this is to kill the topology and resubmit it, but Storm provides a "rebalance" command that provides an easier way to do this.
 
 Rebalance will first deactivate the topology for the duration of the message timeout (overridable with the -w flag) and then redistribute the workers evenly around the cluster. The topology will then return to its previous state of activation (so a deactivated topology will still be deactivated and an activated topology will go back to being activated).
 
@@ -216,7 +232,7 @@ See Setting up a Storm cluster for more information.(http://storm.apache.org/doc
 Syntax: `storm set_log_level -l [logger name]=[log level][:optional timeout] -r [logger name] topology-name`
 
 Dynamically change topology log levels
-    
+
 where log level is one of: ALL, TRACE, DEBUG, INFO, WARN, ERROR, FATAL, OFF
 and timeout is integer seconds.
 
@@ -245,12 +261,6 @@ Makes constructing jar and uploading to nimbus for using non JVM languages
 
 eg: `storm shell resources/ python topology.py arg1 arg2`
 
-### sql
-
-Syntax: `storm sql sql-file topology-name`
-
-Compiles the SQL statements into a Trident topology and submits it to Storm.
-
 ### upload-credentials
 
 Syntax: `storm upload_credentials topology-name [credkey credvalue]*`

http://git-wip-us.apache.org/repos/asf/storm/blob/0b9a4d03/external/storm-submit-tools/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-submit-tools/pom.xml b/external/storm-submit-tools/pom.xml
new file mode 100644
index 0000000..d2237b0
--- /dev/null
+++ b/external/storm-submit-tools/pom.xml
@@ -0,0 +1,201 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>1.1.1-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>storm-submit-tools</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>commons-lang</groupId>
+            <artifactId>commons-lang</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.googlecode.json-simple</groupId>
+            <artifactId>json-simple</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+
+        <!-- Aether :: maven dependency resolution -->
+        <dependency>
+            <groupId>org.apache.maven</groupId>
+            <artifactId>maven-plugin-api</artifactId>
+            <version>3.0</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.codehaus.plexus</groupId>
+                    <artifactId>plexus-utils</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.sonatype.sisu</groupId>
+                    <artifactId>sisu-inject-plexus</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.maven</groupId>
+                    <artifactId>maven-model</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.sonatype.aether</groupId>
+            <artifactId>aether-api</artifactId>
+            <version>1.12</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.sonatype.aether</groupId>
+            <artifactId>aether-util</artifactId>
+            <version>1.12</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.sonatype.aether</groupId>
+            <artifactId>aether-impl</artifactId>
+            <version>1.12</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.maven</groupId>
+            <artifactId>maven-aether-provider</artifactId>
+            <version>3.0.3</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.sonatype.aether</groupId>
+                    <artifactId>aether-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.sonatype.aether</groupId>
+                    <artifactId>aether-spi</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.sonatype.aether</groupId>
+                    <artifactId>aether-util</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.sonatype.aether</groupId>
+                    <artifactId>aether-impl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.codehaus.plexus</groupId>
+                    <artifactId>plexus-utils</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.sonatype.aether</groupId>
+            <artifactId>aether-connector-file</artifactId>
+            <version>1.12</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.sonatype.aether</groupId>
+            <artifactId>aether-connector-wagon</artifactId>
+            <version>1.12</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.maven.wagon</groupId>
+                    <artifactId>wagon-provider-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.maven.wagon</groupId>
+            <artifactId>wagon-provider-api</artifactId>
+            <version>1.0</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.codehaus.plexus</groupId>
+                    <artifactId>plexus-utils</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.maven.wagon</groupId>
+            <artifactId>wagon-http-lightweight</artifactId>
+            <version>1.0</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.maven.wagon</groupId>
+                    <artifactId>wagon-http-shared</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.maven.wagon</groupId>
+            <artifactId>wagon-http</artifactId>
+            <version>1.0</version>
+            <exclusions>
+            </exclusions>
+        </dependency>
+
+        <!-- storm-core is needed only for test (surefire) -->
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <configuration>
+                    <keepDependenciesWithProvidedScope>false</keepDependenciesWithProvidedScope>
+                    <createDependencyReducedPom>true</createDependencyReducedPom>
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <transformers>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/0b9a4d03/external/storm-submit-tools/src/main/java/org/apache/storm/submit/command/DependencyResolverMain.java
----------------------------------------------------------------------
diff --git a/external/storm-submit-tools/src/main/java/org/apache/storm/submit/command/DependencyResolverMain.java b/external/storm-submit-tools/src/main/java/org/apache/storm/submit/command/DependencyResolverMain.java
new file mode 100644
index 0000000..3527b5b
--- /dev/null
+++ b/external/storm-submit-tools/src/main/java/org/apache/storm/submit/command/DependencyResolverMain.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.submit.command;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import org.apache.storm.submit.dependency.AetherUtils;
+import org.apache.storm.submit.dependency.DependencyResolver;
+import org.json.simple.JSONValue;
+import org.sonatype.aether.artifact.Artifact;
+import org.sonatype.aether.graph.Dependency;
+import org.sonatype.aether.resolution.ArtifactResult;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DependencyResolverMain {
+
+    public static void main(String[] args) {
+        if (args.length < 1) {
+            throw new IllegalArgumentException("artifacts must be presented.");
+        }
+
+        String artifactsArg = args[0];
+
+        // DO NOT CHANGE THIS TO SYSOUT
+        System.err.println("DependencyResolver input - artifacts: " + artifactsArg);
+
+        List<Dependency> dependencies = parseArtifactArgs(artifactsArg);
+        try {
+            DependencyResolver resolver = new DependencyResolver("local-repo");
+
+            List<ArtifactResult> artifactResults = resolver.resolve(dependencies);
+
+            Iterable<ArtifactResult> missingArtifacts = filterMissingArtifacts(artifactResults);
+            if (missingArtifacts.iterator().hasNext()) {
+                printMissingArtifactsToSysErr(missingArtifacts);
+                throw new RuntimeException("Some artifacts are not resolved");
+            }
+
+            System.out.println(JSONValue.toJSONString(transformArtifactResultToArtifactToPaths(artifactResults)));
+            System.out.flush();
+        } catch (Throwable e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static Iterable<ArtifactResult> filterMissingArtifacts(List<ArtifactResult> artifactResults) {
+        return Iterables.filter(artifactResults, new Predicate<ArtifactResult>() {
+            @Override
+            public boolean apply(ArtifactResult artifactResult) {
+                return artifactResult.isMissing();
+            }
+        });
+    }
+
+    private static void printMissingArtifactsToSysErr(Iterable<ArtifactResult> missingArtifacts) {
+        for (ArtifactResult artifactResult : missingArtifacts) {
+            System.err.println("ArtifactResult : " + artifactResult + " / Errors : " + artifactResult.getExceptions());
+        }
+    }
+
+    private static List<Dependency> parseArtifactArgs(String artifactArgs) {
+        List<String> artifacts = Arrays.asList(artifactArgs.split(","));
+        List<Dependency> dependencies = new ArrayList<>(artifacts.size());
+        for (String artifactOpt : artifacts) {
+            if (artifactOpt.trim().isEmpty()) {
+                continue;
+            }
+
+            dependencies.add(AetherUtils.parseDependency(artifactOpt));
+        }
+
+        return dependencies;
+    }
+
+    private static Map<String, String> transformArtifactResultToArtifactToPaths(List<ArtifactResult> artifactResults) {
+        Map<String, String> artifactToPath = new LinkedHashMap<>();
+        for (ArtifactResult artifactResult : artifactResults) {
+            Artifact artifact = artifactResult.getArtifact();
+            artifactToPath.put(AetherUtils.artifactToString(artifact), artifact.getFile().getAbsolutePath());
+        }
+        return artifactToPath;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0b9a4d03/external/storm-submit-tools/src/main/java/org/apache/storm/submit/dependency/AetherUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-submit-tools/src/main/java/org/apache/storm/submit/dependency/AetherUtils.java b/external/storm-submit-tools/src/main/java/org/apache/storm/submit/dependency/AetherUtils.java
new file mode 100644
index 0000000..07a98e6
--- /dev/null
+++ b/external/storm-submit-tools/src/main/java/org/apache/storm/submit/dependency/AetherUtils.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.submit.dependency;
+
+import org.sonatype.aether.artifact.Artifact;
+import org.sonatype.aether.graph.Dependency;
+import org.sonatype.aether.graph.Exclusion;
+import org.sonatype.aether.util.artifact.DefaultArtifact;
+import org.sonatype.aether.util.artifact.JavaScopes;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+public class AetherUtils {
+    private AetherUtils() {
+    }
+
+    public static Dependency parseDependency(String dependency) {
+        List<String> dependencyAndExclusions = Arrays.asList(dependency.split("\\^"));
+        Collection<Exclusion> exclusions = new ArrayList<>();
+        for (int idx = 1 ; idx < dependencyAndExclusions.size() ; idx++) {
+            exclusions.add(AetherUtils.createExclusion(dependencyAndExclusions.get(idx)));
+        }
+
+        Artifact artifact = new DefaultArtifact(dependencyAndExclusions.get(0));
+        return new Dependency(artifact, JavaScopes.COMPILE, false, exclusions);
+    }
+
+    public static Exclusion createExclusion(String exclusionString) {
+        String[] parts = exclusionString.split(":");
+
+        // length of parts should be greater than 0
+        String groupId = parts[0];
+
+        String artifactId = "*";
+        String classifier = "*";
+        String extension = "*";
+
+        int len = parts.length;
+        if (len > 1) {
+            artifactId = parts[1];
+        }
+        if (len > 2) {
+            classifier = parts[2];
+        }
+        if (len > 3) {
+            extension = parts[3];
+        }
+
+        return new Exclusion(groupId, artifactId, classifier, extension);
+    }
+
+    public static String artifactToString(Artifact artifact) {
+        StringBuilder buffer = new StringBuilder(128);
+        buffer.append(artifact.getGroupId());
+        buffer.append(':').append(artifact.getArtifactId());
+        buffer.append(':').append(artifact.getExtension());
+        if (artifact.getClassifier().length() > 0) {
+            buffer.append(':').append(artifact.getClassifier());
+        }
+        buffer.append(':').append(artifact.getVersion());
+        return buffer.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0b9a4d03/external/storm-submit-tools/src/main/java/org/apache/storm/submit/dependency/Booter.java
----------------------------------------------------------------------
diff --git a/external/storm-submit-tools/src/main/java/org/apache/storm/submit/dependency/Booter.java b/external/storm-submit-tools/src/main/java/org/apache/storm/submit/dependency/Booter.java
new file mode 100644
index 0000000..94ab121
--- /dev/null
+++ b/external/storm-submit-tools/src/main/java/org/apache/storm/submit/dependency/Booter.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.submit.dependency;
+
+import org.apache.maven.repository.internal.MavenRepositorySystemSession;
+import org.sonatype.aether.RepositorySystem;
+import org.sonatype.aether.RepositorySystemSession;
+import org.sonatype.aether.repository.LocalRepository;
+import org.sonatype.aether.repository.RemoteRepository;
+
+import java.io.File;
+
+/**
+ * Manage mvn repository.
+ */
+public class Booter {
+  public static RepositorySystem newRepositorySystem() {
+    return RepositorySystemFactory.newRepositorySystem();
+  }
+
+  public static RepositorySystemSession newRepositorySystemSession(
+      RepositorySystem system, String localRepoPath) {
+    MavenRepositorySystemSession session = new MavenRepositorySystemSession();
+
+    // find homedir
+    String home = System.getProperty("storm.home");
+    if (home == null) {
+      home = ".";
+    }
+
+    String path = home + "/" + localRepoPath;
+
+    LocalRepository localRepo =
+        new LocalRepository(new File(path).getAbsolutePath());
+    session.setLocalRepositoryManager(system.newLocalRepositoryManager(localRepo));
+
+    return session;
+  }
+
+  public static RemoteRepository newCentralRepository() {
+    return new RemoteRepository("central", "default", "http://repo1.maven.org/maven2/");
+  }
+  
+  public static RemoteRepository newLocalRepository() {
+    return new RemoteRepository("local",
+        "default", "file://" + System.getProperty("user.home") + "/.m2/repository");
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0b9a4d03/external/storm-submit-tools/src/main/java/org/apache/storm/submit/dependency/DependencyResolver.java
----------------------------------------------------------------------
diff --git a/external/storm-submit-tools/src/main/java/org/apache/storm/submit/dependency/DependencyResolver.java b/external/storm-submit-tools/src/main/java/org/apache/storm/submit/dependency/DependencyResolver.java
new file mode 100644
index 0000000..a6c10ca
--- /dev/null
+++ b/external/storm-submit-tools/src/main/java/org/apache/storm/submit/dependency/DependencyResolver.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.submit.dependency;
+
+import org.sonatype.aether.RepositorySystem;
+import org.sonatype.aether.RepositorySystemSession;
+import org.sonatype.aether.collection.CollectRequest;
+import org.sonatype.aether.graph.Dependency;
+import org.sonatype.aether.graph.DependencyFilter;
+import org.sonatype.aether.repository.RemoteRepository;
+import org.sonatype.aether.resolution.ArtifactResolutionException;
+import org.sonatype.aether.resolution.ArtifactResult;
+import org.sonatype.aether.resolution.DependencyRequest;
+import org.sonatype.aether.resolution.DependencyResolutionException;
+import org.sonatype.aether.util.artifact.JavaScopes;
+import org.sonatype.aether.util.filter.DependencyFilterUtils;
+
+import java.net.MalformedURLException;
+import java.util.Collections;
+import java.util.List;
+
+public class DependencyResolver {
+  private RepositorySystem system = Booter.newRepositorySystem();
+  private RepositorySystemSession session;
+  private RemoteRepository mavenCentral = Booter.newCentralRepository();
+  private RemoteRepository mavenLocal = Booter.newLocalRepository();
+
+  public DependencyResolver(String localRepoPath) {
+    session = Booter.newRepositorySystemSession(system, localRepoPath);
+  }
+
+  public List<ArtifactResult> resolve(List<Dependency> dependencies) throws MalformedURLException,
+      DependencyResolutionException, ArtifactResolutionException {
+
+    DependencyFilter classpathFilter = DependencyFilterUtils
+            .classpathFilter(JavaScopes.COMPILE, JavaScopes.RUNTIME);
+
+    if (dependencies.size() == 0) {
+      return Collections.EMPTY_LIST;
+    }
+
+    CollectRequest collectRequest = new CollectRequest();
+    collectRequest.setRoot(dependencies.get(0));
+    for (int idx = 1; idx < dependencies.size(); idx++) {
+      collectRequest.addDependency(dependencies.get(idx));
+    }
+
+    collectRequest.addRepository(mavenCentral);
+    collectRequest.addRepository(mavenLocal);
+
+    DependencyRequest dependencyRequest = new DependencyRequest(collectRequest, classpathFilter);
+    return system.resolveDependencies(session, dependencyRequest).getArtifactResults();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0b9a4d03/external/storm-submit-tools/src/main/java/org/apache/storm/submit/dependency/RepositorySystemFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-submit-tools/src/main/java/org/apache/storm/submit/dependency/RepositorySystemFactory.java b/external/storm-submit-tools/src/main/java/org/apache/storm/submit/dependency/RepositorySystemFactory.java
new file mode 100644
index 0000000..69edfb5
--- /dev/null
+++ b/external/storm-submit-tools/src/main/java/org/apache/storm/submit/dependency/RepositorySystemFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.submit.dependency;
+
+import org.apache.maven.repository.internal.DefaultServiceLocator;
+import org.apache.maven.wagon.Wagon;
+import org.apache.maven.wagon.providers.http.HttpWagon;
+import org.apache.maven.wagon.providers.http.LightweightHttpWagon;
+import org.sonatype.aether.RepositorySystem;
+import org.sonatype.aether.connector.file.FileRepositoryConnectorFactory;
+import org.sonatype.aether.connector.wagon.WagonProvider;
+import org.sonatype.aether.connector.wagon.WagonRepositoryConnectorFactory;
+import org.sonatype.aether.spi.connector.RepositoryConnectorFactory;
+
+/**
+ * Get maven repository instance.
+ */
+public class RepositorySystemFactory {
+  public static RepositorySystem newRepositorySystem() {
+    DefaultServiceLocator locator = new DefaultServiceLocator();
+    locator.addService(RepositoryConnectorFactory.class, FileRepositoryConnectorFactory.class);
+    locator.addService(RepositoryConnectorFactory.class, WagonRepositoryConnectorFactory.class);
+    locator.setServices(WagonProvider.class, new ManualWagonProvider());
+
+    return locator.getService(RepositorySystem.class);
+  }
+
+  /**
+   * ManualWagonProvider
+   */
+  public static class ManualWagonProvider implements WagonProvider {
+
+    @Override
+    public Wagon lookup(String roleHint) throws Exception {
+      if ("http".equals(roleHint)) {
+        return new LightweightHttpWagon();
+      }
+
+      if ("https".equals(roleHint)) {
+        return new HttpWagon();
+      }
+
+      return null;
+    }
+
+    @Override
+    public void release(Wagon arg0) {
+
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0b9a4d03/external/storm-submit-tools/src/test/java/org/apache/storm/submit/dependency/AetherUtilsTest.java
----------------------------------------------------------------------
diff --git a/external/storm-submit-tools/src/test/java/org/apache/storm/submit/dependency/AetherUtilsTest.java b/external/storm-submit-tools/src/test/java/org/apache/storm/submit/dependency/AetherUtilsTest.java
new file mode 100644
index 0000000..9951621
--- /dev/null
+++ b/external/storm-submit-tools/src/test/java/org/apache/storm/submit/dependency/AetherUtilsTest.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.submit.dependency;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+import org.sonatype.aether.artifact.Artifact;
+import org.sonatype.aether.graph.Dependency;
+import org.sonatype.aether.graph.Exclusion;
+import org.sonatype.aether.util.artifact.DefaultArtifact;
+import org.sonatype.aether.util.artifact.JavaScopes;
+
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+public class AetherUtilsTest {
+    @Test
+    public void parseDependency() throws Exception {
+        String testDependency = "testgroup:testartifact:1.0.0^testgroup:testexcartifact^testgroup:*";
+
+        Dependency dependency = AetherUtils.parseDependency(testDependency);
+
+        assertEquals("testgroup", dependency.getArtifact().getGroupId());
+        assertEquals("testartifact", dependency.getArtifact().getArtifactId());
+        assertEquals("1.0.0", dependency.getArtifact().getVersion());
+        assertEquals(JavaScopes.COMPILE, dependency.getScope());
+
+        assertEquals(2, dependency.getExclusions().size());
+
+        List<Exclusion> exclusions = Lists.newArrayList(dependency.getExclusions());
+
+        Exclusion exclusion = exclusions.get(0);
+        assertEquals("testgroup", exclusion.getGroupId());
+        assertEquals("testexcartifact", exclusion.getArtifactId());
+        assertEquals(JavaScopes.COMPILE, dependency.getScope());
+
+        exclusion = exclusions.get(1);
+        assertEquals("testgroup", exclusion.getGroupId());
+        assertEquals("*", exclusion.getArtifactId());
+        assertEquals(JavaScopes.COMPILE, dependency.getScope());
+    }
+
+    @Test
+    public void createExclusion() throws Exception {
+        String testExclusion = "group";
+        Exclusion exclusion = AetherUtils.createExclusion(testExclusion);
+
+        assertEquals("group", exclusion.getGroupId());
+        assertEquals("*", exclusion.getArtifactId());
+        assertEquals("*", exclusion.getClassifier());
+        assertEquals("*", exclusion.getExtension());
+
+        testExclusion = "group:artifact";
+        exclusion = AetherUtils.createExclusion(testExclusion);
+
+        assertEquals("group", exclusion.getGroupId());
+        assertEquals("artifact", exclusion.getArtifactId());
+        assertEquals("*", exclusion.getClassifier());
+        assertEquals("*", exclusion.getExtension());
+
+        testExclusion = "group:artifact:site";
+        exclusion = AetherUtils.createExclusion(testExclusion);
+
+        assertEquals("group", exclusion.getGroupId());
+        assertEquals("artifact", exclusion.getArtifactId());
+        assertEquals("site", exclusion.getClassifier());
+        assertEquals("*", exclusion.getExtension());
+
+        testExclusion = "group:artifact:site:jar";
+        exclusion = AetherUtils.createExclusion(testExclusion);
+
+        assertEquals("group", exclusion.getGroupId());
+        assertEquals("artifact", exclusion.getArtifactId());
+        assertEquals("site", exclusion.getClassifier());
+        assertEquals("jar", exclusion.getExtension());
+    }
+
+    @Test
+    public void artifactToString() throws Exception {
+        Artifact testArtifact = new DefaultArtifact("org.apache.storm:storm-core:1.0.0");
+
+        String ret = AetherUtils.artifactToString(testArtifact);
+        assertEquals("org.apache.storm:storm-core:jar:1.0.0", ret);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/0b9a4d03/external/storm-submit-tools/src/test/java/org/apache/storm/submit/dependency/DependencyResolverTest.java
----------------------------------------------------------------------
diff --git a/external/storm-submit-tools/src/test/java/org/apache/storm/submit/dependency/DependencyResolverTest.java b/external/storm-submit-tools/src/test/java/org/apache/storm/submit/dependency/DependencyResolverTest.java
new file mode 100644
index 0000000..2a97964
--- /dev/null
+++ b/external/storm-submit-tools/src/test/java/org/apache/storm/submit/dependency/DependencyResolverTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.submit.dependency;
+
+import com.google.common.collect.Lists;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.sonatype.aether.graph.Dependency;
+import org.sonatype.aether.resolution.ArtifactResult;
+import org.sonatype.aether.util.artifact.DefaultArtifact;
+import org.sonatype.aether.util.artifact.JavaScopes;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+public class DependencyResolverTest {
+    private static Path tempDirForTest;
+
+    private DependencyResolver sut;
+
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        String tempDir = System.getProperty("java.io.tmpdir", ".");
+        tempDirForTest = Files.createTempDirectory(new File(tempDir).toPath(), "dr-test");
+    }
+
+    @Before
+    public void setUp() {
+        sut = new DependencyResolver(tempDirForTest.toAbsolutePath().toString());
+    }
+
+    @Test
+    public void resolveValid() throws Exception {
+        // please pick small artifact which has small transitive dependency
+        // and let's mark as Ignore if we want to run test even without internet or maven central is often not stable
+        Dependency dependency = new Dependency(new DefaultArtifact("org.apache.storm:flux-core:1.0.0"), JavaScopes.COMPILE);
+        List<ArtifactResult> results = sut.resolve(Lists.newArrayList(dependency));
+
+        assertTrue(results.size() > 0);
+        // it should be org.apache.storm:flux-core:jar:1.0.0 and commons-cli:commons-cli:jar:1.2
+        assertContains(results, "org.apache.storm", "flux-core", "1.0.0");
+        assertContains(results, "commons-cli", "commons-cli", "1.2");
+    }
+
+    private void assertContains(List<ArtifactResult> results, String groupId, String artifactId, String version) {
+        for (ArtifactResult result : results) {
+            if (result.getArtifact().getGroupId().equals(groupId) &&
+                    result.getArtifact().getArtifactId().equals(artifactId) &&
+                    result.getArtifact().getVersion().equals(version) &&
+                    result.isResolved()) {
+                return;
+            }
+        }
+
+        throw new AssertionError("Result doesn't contain expected artifact > " + groupId + ":" + artifactId + ":" + version);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/0b9a4d03/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index da1f017..a14a7ea 100644
--- a/pom.xml
+++ b/pom.xml
@@ -262,6 +262,10 @@
         <java.unit.test.include>**/Test*.java, **/*Test.java, **/*TestCase.java</java.unit.test.include>    <!--maven surefire plugin default test list-->
         <!-- by default the clojure test set are all clojure tests that are not integration tests. This property is overridden in the profiles -->
         <clojure.test.set>!integration.*</clojure.test.set>
+
+        <aetherVersion>1.0.0.v20140518</aetherVersion>
+        <mavenVersion>3.1.0</mavenVersion>
+        <wagonVersion>1.0</wagonVersion>
     </properties>
 
     <modules>
@@ -293,6 +297,7 @@
         <module>external/storm-kafka-monitor</module>
         <module>external/storm-kinesis</module>
         <module>external/storm-druid</module>
+        <module>external/storm-submit-tools</module>
     </modules>
 
     <dependencies>

http://git-wip-us.apache.org/repos/asf/storm/blob/0b9a4d03/storm-core/src/clj/org/apache/storm/config.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/config.clj b/storm-core/src/clj/org/apache/storm/config.clj
index 0717a22..1f19bd3 100644
--- a/storm-core/src/clj/org/apache/storm/config.clj
+++ b/storm-core/src/clj/org/apache/storm/config.clj
@@ -141,6 +141,10 @@
   [conf stormconf-path]
   (merge conf (clojurify-structure (Utils/fromCompressedJsonConf (FileUtils/readFileToByteArray (File. stormconf-path))))))
 
+(defn read-supervisor-storm-code-given-path
+  [stormcode-path]
+  (Utils/deserialize (FileUtils/readFileToByteArray (File. stormcode-path)) StormTopology))
+
 (defn master-storm-metafile-path [stormroot ]
   (str stormroot file-path-separator "storm-code-distributor.meta"))
 
@@ -224,8 +228,7 @@
   [conf storm-id]
   (let [stormroot (supervisor-stormdist-root conf storm-id)
         topology-path (supervisor-stormcode-path stormroot)]
-    (Utils/deserialize (FileUtils/readFileToByteArray (File. topology-path)) StormTopology)
-    ))
+    (read-supervisor-storm-code-given-path topology-path)))
 
 (defn worker-user-root [conf]
   (str (absolute-storm-local-dir conf) "/workers-users"))

http://git-wip-us.apache.org/repos/asf/storm/blob/0b9a4d03/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj
index cb83716..003cb55 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -128,7 +128,8 @@
               (str "Duplicate component ids: " offending))))
     (doseq [f thrift/STORM-TOPOLOGY-FIELDS
             :let [obj-map (.getFieldValue topology f)]]
-      (if-not (ThriftTopologyUtils/isWorkerHook f)
+      (if-not (or (ThriftTopologyUtils/isWorkerHook f)
+                   (ThriftTopologyUtils/isDependencies f))
         (do
           (doseq [id (keys obj-map)]
             (if (Utils/isSystemId id)
@@ -143,7 +144,8 @@
 (defn all-components [^StormTopology topology]
   (apply merge {}
     (for [f thrift/STORM-TOPOLOGY-FIELDS]
-      (if-not (ThriftTopologyUtils/isWorkerHook f)
+      (if-not (or (ThriftTopologyUtils/isWorkerHook f)
+                   (ThriftTopologyUtils/isDependencies f))
         (.getFieldValue topology f)))))
 
 (defn component-conf [component]

http://git-wip-us.apache.org/repos/asf/storm/blob/0b9a4d03/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 0533131..719e4bf 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -29,7 +29,7 @@
   (:import [java.nio ByteBuffer]
            [java.util Collections List HashMap ArrayList Iterator])
   (:import [org.apache.storm.blobstore AtomicOutputStream BlobStoreAclHandler
-            InputStreamWithMeta KeyFilter KeySequenceNumber BlobSynchronizer])
+                                       InputStreamWithMeta KeyFilter KeySequenceNumber BlobSynchronizer BlobStoreUtils])
   (:import [java.io File FileOutputStream FileInputStream])
   (:import [java.net InetAddress ServerSocket BindException])
   (:import [java.nio.channels Channels WritableByteChannel])
@@ -1217,6 +1217,17 @@
     (catch Exception e
       (log-message "Exception" e))))
 
+(defn blob-rm-dependency-jars-in-topology [id blob-store storm-cluster-state]
+  (try
+    (let [storm-topology (read-storm-topology-as-nimbus id blob-store)
+          dependency-jars (.get_dependency_jars ^StormTopology storm-topology)]
+      (log-message "Removing dependency jars from blobs - " dependency-jars)
+      (when-not (empty? dependency-jars)
+        (doseq [key dependency-jars]
+          (blob-rm-key blob-store key storm-cluster-state))))
+    (catch Exception e
+      (log-message "Exception" e))))
+
 (defn blob-rm-topology-keys [id blob-store storm-cluster-state]
   (blob-rm-key blob-store (master-stormjar-key id) storm-cluster-state)
   (blob-rm-key blob-store (master-stormconf-key id) storm-cluster-state)
@@ -1239,6 +1250,7 @@
             (.teardown-heartbeats! storm-cluster-state id)
             (.teardown-topology-errors! storm-cluster-state id)
             (.remove-backpressure! storm-cluster-state id)
+            (blob-rm-dependency-jars-in-topology id blob-store storm-cluster-state)
             (force-delete-topo-dist-dir conf id)
             (blob-rm-topology-keys id blob-store storm-cluster-state)
             (swap! (:heartbeats-cache nimbus) dissoc id)))))

http://git-wip-us.apache.org/repos/asf/storm/blob/0b9a4d03/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index 64e4650..2888258 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -28,7 +28,7 @@
   (:import [org.apache.storm.utils NimbusLeaderNotFoundException VersionInfo])
   (:import [java.nio.file Files StandardCopyOption])
   (:import [org.apache.storm Config])
-  (:import [org.apache.storm.generated WorkerResources ProfileAction])
+  (:import [org.apache.storm.generated WorkerResources ProfileAction StormTopology])
   (:import [org.apache.storm.localizer LocalResource])
   (:use [org.apache.storm.daemon common])
   (:require [org.apache.storm.command [healthcheck :as healthcheck]])
@@ -875,14 +875,13 @@
     (worker-launcher-and-wait conf (storm-conf TOPOLOGY-SUBMITTER-USER) ["blob" path] :log-prefix (str "setup blob permissions for " path))))
 
 (defn download-blobs-for-topology!
-  "Download all blobs listed in the topology configuration for a given topology."
-  [conf stormconf-path localizer tmproot]
-  (let [storm-conf (read-supervisor-storm-conf-given-path conf stormconf-path)
-        blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
+  "Download all blobs passed by parameters for a given topology."
+  [conf storm-conf localizer tmproot blobs fn-local-resources fn-symlink-name]
+  (let [
         user (storm-conf TOPOLOGY-SUBMITTER-USER)
         topo-name (storm-conf TOPOLOGY-NAME)
         user-dir (.getLocalUserFileCacheDir localizer user)
-        localresources (blobstore-map-to-localresources blobstore-map)]
+        localresources (fn-local-resources blobs)]
     (when localresources
       (when-not (.exists user-dir)
         (FileUtils/forceMkdir user-dir))
@@ -893,14 +892,34 @@
             (let [rsrc-file-path (File. (.getFilePath local-rsrc))
                   key-name (.getName rsrc-file-path)
                   blob-symlink-target-name (.getName (File. (.getCurrentSymlinkPath local-rsrc)))
-                  symlink-name (get-blob-localname (get blobstore-map key-name) key-name)]
+                  symlink-name (fn-symlink-name blobs key-name)]
               (create-symlink! tmproot (.getParent rsrc-file-path) symlink-name
-                blob-symlink-target-name))))
+                               blob-symlink-target-name))))
         (catch AuthorizationException authExp
           (log-error authExp))
         (catch KeyNotFoundException knf
           (log-error knf))))))
 
+(defn download-blobs-in-blobstore-map-for-topology!
+  "Download all blobs listed in the topology configuration for a given topology."
+  [conf stormconf-path localizer tmproot]
+  (let [storm-conf (read-supervisor-storm-conf-given-path conf stormconf-path)
+        blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)]
+    (download-blobs-for-topology! conf storm-conf localizer tmproot blobstore-map
+                                  (fn [blobs] (blobstore-map-to-localresources blobs))
+                                  (fn [blobs key-name] (get-blob-localname (get blobs key-name) key-name)))))
+
+(defn download-dependencies-for-topology!
+  "Download all dependencies blobs listed in the topology configuration for a given topology."
+  [conf stormconf-path stormcode-path localizer tmproot]
+  (let [storm-conf (read-supervisor-storm-conf-given-path conf stormconf-path)
+        storm-code (read-supervisor-storm-code-given-path stormcode-path)
+        dependencies (concat (.get_dependency_jars ^StormTopology storm-code)
+                             (.get_dependency_artifacts ^StormTopology storm-code))]
+    (download-blobs-for-topology! conf storm-conf localizer tmproot dependencies
+                                  (fn [blobs] (map #(LocalResource. % false) blobs))
+                                  (fn [_ key-name] key-name))))
+
 (defn get-blob-file-names
   [blobstore-map]
   (if blobstore-map
@@ -909,13 +928,26 @@
 
 (defn download-blobs-for-topology-succeed?
   "Assert if all blobs are downloaded for the given topology"
+  [target-dir file-names]
+  (if-not (empty? file-names)
+    (every? #(Utils/checkFileExists target-dir %) file-names)
+    true))
+
+(defn download-blobs-in-blobstore-map-for-topology-succeed?
+  "Assert if all blobs in blobstore map are downloaded for the given topology"
   [stormconf-path target-dir]
   (let [storm-conf (clojurify-structure (Utils/fromCompressedJsonConf (FileUtils/readFileToByteArray (File. stormconf-path))))
         blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
         file-names (get-blob-file-names blobstore-map)]
-    (if-not (empty? file-names)
-      (every? #(Utils/checkFileExists target-dir %) file-names)
-      true)))
+    (download-blobs-for-topology-succeed? target-dir file-names)))
+
+(defn download-dependencies-for-topology-succeed?
+  "Assert if all dependencies blobs are downloaded for the given topology"
+  [stormcode-path target-dir]
+  (let [storm-code (read-supervisor-storm-code-given-path stormcode-path)
+        file-names (concat (.get_dependency_jars ^StormTopology storm-code)
+                           (.get_dependency_artifacts ^StormTopology storm-code))]
+    (download-blobs-for-topology-succeed? target-dir file-names)))
 
 ;; distributed implementation
 (defmethod download-storm-code
@@ -937,9 +969,11 @@
       (supervisor-stormconf-path tmproot) blobstore)
     (.shutdown blobstore)
     (extract-dir-from-jar (supervisor-stormjar-path tmproot) RESOURCES-SUBDIR tmproot)
-    (download-blobs-for-topology! conf (supervisor-stormconf-path tmproot) localizer
+    (download-blobs-in-blobstore-map-for-topology! conf (supervisor-stormconf-path tmproot) localizer
       tmproot)
-    (if (download-blobs-for-topology-succeed? (supervisor-stormconf-path tmproot) tmproot)
+    (download-dependencies-for-topology! conf (supervisor-stormconf-path tmproot) (supervisor-stormcode-path tmproot) localizer tmproot)
+    (if (and (download-blobs-in-blobstore-map-for-topology-succeed? (supervisor-stormconf-path tmproot) tmproot)
+             (download-dependencies-for-topology-succeed? (supervisor-stormcode-path tmproot) tmproot))
       (do
         (log-message "Successfully downloaded blob resources for storm-id " storm-id)
         (if on-windows?
@@ -1051,12 +1085,17 @@
           stormroot (supervisor-stormdist-root conf storm-id)
           jlp (jlp stormroot conf)
           stormjar (supervisor-stormjar-path stormroot)
+          storm-topology (read-supervisor-topology conf storm-id)
+          dependencies (concat (.get_dependency_jars ^StormTopology storm-topology)
+                               (.get_dependency_artifacts ^StormTopology storm-topology))
+          dependency-locations (map #(File. stormroot %) dependencies)
           storm-conf (read-supervisor-storm-conf conf storm-id)
           topo-classpath (if-let [cp (storm-conf TOPOLOGY-CLASSPATH)]
                            [cp]
                            [])
           classpath (-> (worker-classpath)
                         (add-to-classpath [stormjar])
+                        (add-to-classpath dependency-locations)
                         (add-to-classpath topo-classpath))
           top-gc-opts (storm-conf TOPOLOGY-WORKER-GC-CHILDOPTS)
           mem-onheap (if (and mem-onheap (> mem-onheap 0)) ;; not nil and not zero

http://git-wip-us.apache.org/repos/asf/storm/blob/0b9a4d03/storm-core/src/jvm/org/apache/storm/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/StormSubmitter.java b/storm-core/src/jvm/org/apache/storm/StormSubmitter.java
index 2232845..53771fd 100644
--- a/storm-core/src/jvm/org/apache/storm/StormSubmitter.java
+++ b/storm-core/src/jvm/org/apache/storm/StormSubmitter.java
@@ -19,11 +19,15 @@ package org.apache.storm;
 
 import java.io.File;
 import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.storm.dependency.DependencyPropertiesParser;
+import org.apache.storm.dependency.DependencyUploader;
 import org.apache.storm.hooks.SubmitterHookException;
 import org.apache.storm.scheduler.resource.ResourceUtils;
 import org.apache.storm.validation.ConfigValidation;
@@ -233,22 +237,34 @@ public class StormSubmitter {
                 if(topologyNameExists(conf, name, asUser)) {
                     throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
                 }
-                String jar = submitJarAs(conf, System.getProperty("storm.jar"), progressListener, asUser);
-                try (NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser)){
-                    LOG.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
-                    if (opts != null) {
-                        client.getClient().submitTopologyWithOpts(name, jar, serConf, topology, opts);
-                    } else {
-                        // this is for backwards compatibility
-                        client.getClient().submitTopology(name, jar, serConf, topology);
-                    }
-                    LOG.info("Finished submitting topology: " + name);
-                } catch (InvalidTopologyException e) {
-                    LOG.warn("Topology submission exception: " + e.get_msg());
+
+                // Dependency uploading only makes sense for distributed mode
+                List<String> jarsBlobKeys = Collections.emptyList();
+                List<String> artifactsBlobKeys;
+
+                DependencyUploader uploader = new DependencyUploader();
+                try {
+                    uploader.init();
+                    jarsBlobKeys = uploadDependencyJarsToBlobStore(uploader);
+                    artifactsBlobKeys = uploadDependencyArtifactsToBlobStore(uploader);
+                } catch (Throwable e) {
+                    // remove uploaded jars blobs, not artifacts since they're shared across the cluster
+                    uploader.deleteBlobs(jarsBlobKeys);
+                    uploader.shutdown();
                     throw e;
-                } catch (AlreadyAliveException e) {
-                    LOG.warn("Topology already alive exception", e);
+                }
+
+                try {
+                    setDependencyBlobsToTopology(topology, jarsBlobKeys, artifactsBlobKeys);
+                    submitTopologyInDistributeMode(name, topology, opts, progressListener, asUser, conf, serConf);
+                } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
+                    // remove uploaded jars blobs, not artifacts since they're shared across the cluster
+                    // Note that we don't handle TException to delete jars blobs
+                    // because it's safer to leave some blobs instead of topology not running
+                    uploader.deleteBlobs(jarsBlobKeys);
                     throw e;
+                } finally {
+                    uploader.shutdown();
                 }
             }
         } catch(TException e) {
@@ -258,6 +274,64 @@ public class StormSubmitter {
 
     }
 
+    private static List<String> uploadDependencyJarsToBlobStore(DependencyUploader uploader) {
+        LOG.info("Uploading dependencies - jars...");
+
+        DependencyPropertiesParser propertiesParser = new DependencyPropertiesParser();
+
+        String depJarsProp = System.getProperty("storm.dependency.jars", "");
+        List<File> depJars = propertiesParser.parseJarsProperties(depJarsProp);
+
+        try {
+            return uploader.uploadFiles(depJars, true);
+        } catch (Throwable e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static List<String> uploadDependencyArtifactsToBlobStore(DependencyUploader uploader) {
+        LOG.info("Uploading dependencies - artifacts...");
+
+        DependencyPropertiesParser propertiesParser = new DependencyPropertiesParser();
+
+        String depArtifactsProp = System.getProperty("storm.dependency.artifacts", "{}");
+        Map<String, File> depArtifacts = propertiesParser.parseArtifactsProperties(depArtifactsProp);
+
+        try {
+            return uploader.uploadArtifacts(depArtifacts);
+        } catch (Throwable e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static void setDependencyBlobsToTopology(StormTopology topology, List<String> jarsBlobKeys, List<String> artifactsBlobKeys) {
+        LOG.info("Dependency Blob keys - jars : {} / artifacts : {}", jarsBlobKeys, artifactsBlobKeys);
+        topology.set_dependency_jars(jarsBlobKeys);
+        topology.set_dependency_artifacts(artifactsBlobKeys);
+    }
+
+    private static void submitTopologyInDistributeMode(String name, StormTopology topology, SubmitOptions opts,
+                                                       ProgressListener progressListener, String asUser, Map conf,
+                                                       String serConf) throws TException {
+        String jar = submitJarAs(conf, System.getProperty("storm.jar"), progressListener, asUser);
+        try (NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser)){
+            LOG.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
+            if (opts != null) {
+                client.getClient().submitTopologyWithOpts(name, jar, serConf, topology, opts);
+            } else {
+                // this is for backwards compatibility
+                client.getClient().submitTopology(name, jar, serConf, topology);
+            }
+            LOG.info("Finished submitting topology: " + name);
+        } catch (InvalidTopologyException e) {
+            LOG.warn("Topology submission exception: " + e.get_msg());
+            throw e;
+        } catch (AlreadyAliveException e) {
+            LOG.warn("Topology already alive exception", e);
+            throw e;
+        }
+    }
+
     /**
      *
      * @param name

http://git-wip-us.apache.org/repos/asf/storm/blob/0b9a4d03/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java b/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java
index 071c4f5..a1d3f5a 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.blobstore;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.storm.Config;
 import org.apache.storm.generated.AuthorizationException;
 import org.apache.storm.generated.KeyAlreadyExistsException;
@@ -40,9 +41,11 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 
 public class BlobStoreUtils {
     private static final String BLOBSTORE_SUBTREE="/blobstore";
+    private static final String BLOB_DEPENDENCIES_PREFIX = "dep-";
     private static final Logger LOG = LoggerFactory.getLogger(BlobStoreUtils.class);
 
     public static CuratorFramework createZKClient(Map conf) {
@@ -254,4 +257,20 @@ public class BlobStoreUtils {
         }
     }
 
+    public static String generateDependencyBlobKey(String key) {
+        return BLOB_DEPENDENCIES_PREFIX + key;
+    }
+
+    public static String applyUUIDToFileName(String fileName) {
+        String fileNameWithExt = com.google.common.io.Files.getNameWithoutExtension(fileName);
+        String ext = com.google.common.io.Files.getFileExtension(fileName);
+        if (StringUtils.isEmpty(ext)) {
+            fileName = fileName + "-" + UUID.randomUUID();
+        } else {
+            fileName = fileNameWithExt + "-" + UUID.randomUUID() + "." + ext;
+        }
+        return fileName;
+    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/0b9a4d03/storm-core/src/jvm/org/apache/storm/dependency/DependencyPropertiesParser.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/dependency/DependencyPropertiesParser.java b/storm-core/src/jvm/org/apache/storm/dependency/DependencyPropertiesParser.java
new file mode 100644
index 0000000..d360ae0
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/dependency/DependencyPropertiesParser.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.dependency;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import org.json.simple.JSONValue;
+import org.json.simple.parser.ParseException;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DependencyPropertiesParser {
+    public List<File> parseJarsProperties(String prop) {
+        if (prop.trim().isEmpty()) {
+            // handle no input
+            return Collections.emptyList();
+        }
+
+        List<String> dependencies = Arrays.asList(prop.split(","));
+        return Lists.transform(dependencies, new Function<String, File>() {
+            @Override
+            public File apply(String filePath) {
+                return new File(filePath);
+            }
+        });
+    }
+
+    public Map<String, File> parseArtifactsProperties(String prop) {
+        try {
+            Map<String, String> parsed = (Map<String, String>) JSONValue.parseWithException(prop);
+            Map<String, File> packages = new LinkedHashMap<>(parsed.size());
+            for (Map.Entry<String, String> artifactToFilePath : parsed.entrySet()) {
+                packages.put(artifactToFilePath.getKey(), new File(artifactToFilePath.getValue()));
+            }
+
+            return packages;
+        } catch (ParseException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/0b9a4d03/storm-core/src/jvm/org/apache/storm/dependency/DependencyUploader.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/dependency/DependencyUploader.java b/storm-core/src/jvm/org/apache/storm/dependency/DependencyUploader.java
new file mode 100644
index 0000000..4f71f67
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/dependency/DependencyUploader.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.dependency;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.blobstore.AtomicOutputStream;
+import org.apache.storm.blobstore.BlobStoreUtils;
+import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.generated.AccessControl;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.KeyAlreadyExistsException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.SettableBlobMeta;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+public class DependencyUploader {
+    public static final Logger LOG = LoggerFactory.getLogger(DependencyUploader.class);
+
+    private final Map<String, Object> conf;
+    private ClientBlobStore blobStore;
+
+    public DependencyUploader() {
+        conf = Utils.readStormConfig();
+    }
+
+    public void init() {
+        if (blobStore == null) {
+            blobStore = Utils.getClientBlobStore(conf);
+        }
+    }
+
+    public void shutdown() {
+        if (blobStore != null) {
+            blobStore.shutdown();
+        }
+    }
+
+    @VisibleForTesting
+    void setBlobStore(ClientBlobStore blobStore) {
+        this.blobStore = blobStore;
+    }
+
+    @SuppressWarnings("unchecked")
+    public List<String> uploadFiles(List<File> dependencies, boolean cleanupIfFails) throws IOException, AuthorizationException {
+        checkFilesExist(dependencies);
+
+        List<String> keys = new ArrayList<>(dependencies.size());
+        try {
+            for (File dependency : dependencies) {
+                String fileName = dependency.getName();
+                String key = BlobStoreUtils.generateDependencyBlobKey(BlobStoreUtils.applyUUIDToFileName(fileName));
+
+                try {
+                    uploadDependencyToBlobStore(key, dependency);
+                } catch (KeyAlreadyExistsException e) {
+                    // it should never happened since we apply UUID
+                    throw new RuntimeException(e);
+                }
+
+                keys.add(key);
+            }
+        } catch (Throwable e) {
+            if (blobStore != null && cleanupIfFails) {
+                deleteBlobs(keys);
+            }
+            throw new RuntimeException(e);
+        }
+
+        return keys;
+    }
+
+    public List<String> uploadArtifacts(Map<String, File> artifacts) {
+        checkFilesExist(artifacts.values());
+
+        List<String> keys = new ArrayList<>(artifacts.size());
+        try {
+            for (Map.Entry<String, File> artifactToFile : artifacts.entrySet()) {
+                String artifact = artifactToFile.getKey();
+                File dependency = artifactToFile.getValue();
+
+                String key = BlobStoreUtils.generateDependencyBlobKey(convertArtifactToJarFileName(artifact));
+                try {
+                    uploadDependencyToBlobStore(key, dependency);
+                } catch (KeyAlreadyExistsException e) {
+                    // we lose the race, but it doesn't matter
+                }
+
+                keys.add(key);
+            }
+        } catch (Throwable e) {
+            throw new RuntimeException(e);
+        }
+
+        return keys;
+    }
+
+    public void deleteBlobs(List<String> keys) {
+        for (String key : keys) {
+            try {
+                blobStore.deleteBlob(key);
+            } catch (Throwable e) {
+                LOG.warn("blob delete failed - key: {} continue...", key);
+            }
+        }
+    }
+
+    private String convertArtifactToJarFileName(String artifact) {
+        return artifact.replace(":", "-") + ".jar";
+    }
+
+    private boolean uploadDependencyToBlobStore(String key, File dependency)
+            throws KeyAlreadyExistsException, AuthorizationException, IOException {
+
+        boolean uploadNew = false;
+        try {
+            // FIXME: we can filter by listKeys() with local blobstore when STORM-1986 is going to be resolved
+            // as a workaround, we call getBlobMeta() for all keys
+            blobStore.getBlobMeta(key);
+        } catch (KeyNotFoundException e) {
+            // TODO: do we want to add ACL here?
+            AtomicOutputStream blob = blobStore
+                    .createBlob(key, new SettableBlobMeta(new ArrayList<AccessControl>()));
+            Files.copy(dependency.toPath(), blob);
+            blob.close();
+
+            uploadNew = true;
+        }
+
+        return uploadNew;
+    }
+
+    private void checkFilesExist(Collection<File> dependencies) {
+        for (File dependency : dependencies) {
+            if (!dependency.isFile() || !dependency.exists()) {
+                throw new FileNotAvailableException(dependency.getAbsolutePath());
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0b9a4d03/storm-core/src/jvm/org/apache/storm/dependency/FileNotAvailableException.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/dependency/FileNotAvailableException.java b/storm-core/src/jvm/org/apache/storm/dependency/FileNotAvailableException.java
new file mode 100644
index 0000000..874485b
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/dependency/FileNotAvailableException.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.dependency;
+
+public class FileNotAvailableException extends RuntimeException {
+    public FileNotAvailableException(String fileName) {
+        super(createMessage(fileName));
+    }
+
+    public FileNotAvailableException(String fileName, Throwable cause) {
+        super(createMessage(fileName), cause);
+    }
+
+    private static String createMessage(String fileName) {
+        return "This file is not available: " + fileName;
+    }
+}