Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:05:26 UTC

[48/51] [partial] storm git commit: Update JStorm to latest release 2.1.0

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/Constants.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/Constants.java b/jstorm-core/src/main/java/backtype/storm/Constants.java
index 2797b69..da2d5b7 100755
--- a/jstorm-core/src/main/java/backtype/storm/Constants.java
+++ b/jstorm-core/src/main/java/backtype/storm/Constants.java
@@ -20,9 +20,8 @@ package backtype.storm;
 import backtype.storm.coordination.CoordinatedBolt;
 import clojure.lang.RT;
 
-
 public class Constants {
-    public static final String COORDINATED_STREAM_ID = CoordinatedBolt.class.getName() + "/coord-stream"; 
+    public static final String COORDINATED_STREAM_ID = CoordinatedBolt.class.getName() + "/coord-stream";
 
     public static final long SYSTEM_TASK_ID = -1;
     public static final Object SYSTEM_EXECUTOR_ID = RT.readString("[-1 -1]");
@@ -32,6 +31,6 @@ public class Constants {
     public static final String METRICS_STREAM_ID = "__metrics";
     public static final String METRICS_TICK_STREAM_ID = "__metrics_tick";
     public static final String CREDENTIALS_CHANGED_STREAM_ID = "__credentials";
-    
+
     public static final String JSTORM_CONF_DIR = "JSTORM_CONF_DIR";
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/GenericOptionsParser.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/GenericOptionsParser.java b/jstorm-core/src/main/java/backtype/storm/GenericOptionsParser.java
index 9319ce1..88adfb4 100755
--- a/jstorm-core/src/main/java/backtype/storm/GenericOptionsParser.java
+++ b/jstorm-core/src/main/java/backtype/storm/GenericOptionsParser.java
@@ -108,94 +108,92 @@ import org.yaml.snakeyaml.Yaml;
 
 public class GenericOptionsParser {
     static final Logger LOG = LoggerFactory.getLogger(GenericOptionsParser.class);
-    
+
     static final Charset UTF8 = Charset.forName("UTF-8");
-    
+
     public static final String TOPOLOGY_LIB_PATH = "topology.lib.path";
-    
+
     public static final String TOPOLOGY_LIB_NAME = "topology.lib.name";
-    
+
     Config conf;
-    
+
     CommandLine commandLine;
-    
+
     // Order in this map is important for these purposes:
     // - configuration priority
     static final LinkedHashMap<String, OptionProcessor> optionProcessors = new LinkedHashMap<String, OptionProcessor>();
-    
+
     public GenericOptionsParser(Config conf, String[] args) throws ParseException {
         this(conf, new Options(), args);
     }
-    
+
     public GenericOptionsParser(Config conf, Options options, String[] args) throws ParseException {
         this.conf = conf;
         parseGeneralOptions(options, conf, args);
     }
-    
+
     public String[] getRemainingArgs() {
         return commandLine.getArgs();
     }
-    
+
     public Config getConfiguration() {
         return conf;
     }
-    
+
     static Options buildGeneralOptions(Options opts) {
         Options r = new Options();
-        
+
         for (Object o : opts.getOptions())
             r.addOption((Option) o);
-        
-        Option libjars = OptionBuilder.withArgName("paths").hasArg().withDescription("comma separated jars to be used by the submitted topology").create("libjars");
+
+        Option libjars =
+                OptionBuilder.withArgName("paths").hasArg().withDescription("comma separated jars to be used by the submitted topology").create("libjars");
         r.addOption(libjars);
         optionProcessors.put("libjars", new LibjarsProcessor());
-        
+
         Option conf = OptionBuilder.withArgName("configuration file").hasArg().withDescription("an application configuration file").create("conf");
         r.addOption(conf);
         optionProcessors.put("conf", new ConfFileProcessor());
-        
+
         // Must come after `conf': this option is of higher priority
         Option extraConfig = OptionBuilder.withArgName("D").hasArg().withDescription("extra configurations (preserving types)").create("D");
         r.addOption(extraConfig);
         optionProcessors.put("D", new ExtraConfigProcessor());
-        
+
         return r;
     }
-    
+
     void parseGeneralOptions(Options opts, Config conf, String[] args) throws ParseException {
         opts = buildGeneralOptions(opts);
         CommandLineParser parser = new GnuParser();
         commandLine = parser.parse(opts, args, true);
         processGeneralOptions(conf, commandLine);
     }
-    
+
     void processGeneralOptions(Config conf, CommandLine commandLine) throws ParseException {
         for (Map.Entry<String, OptionProcessor> e : optionProcessors.entrySet())
             if (commandLine.hasOption(e.getKey()))
                 e.getValue().process(conf, commandLine);
     }
-    
+
     static List<File> validateFiles(String pathList) throws IOException {
         List<File> l = new ArrayList<File>();
-        
+
         for (String s : pathList.split(",")) {
             File file = new File(s);
             if (!file.exists())
                 throw new FileNotFoundException("File `" + file.getAbsolutePath() + "' does not exist");
-            
+
             l.add(file);
         }
-        
+
         return l;
     }
-    
+
     public static void printGenericCommandUsage(PrintStream out) {
         String[] strs =
-                new String[] {
-                        "Generic options supported are",
-                        "  -conf <conf.xml>                            load configurations from",
-                        "                                              <conf.xml>",
-                        "  -conf <conf.yaml>                           load configurations from",
+                new String[] { "Generic options supported are", "  -conf <conf.xml>                            load configurations from",
+                        "                                              <conf.xml>", "  -conf <conf.yaml>                           load configurations from",
                         "                                              <conf.yaml>",
                         "  -D <key>=<value>                            set <key> in configuration",
                         "                                              to <value> (preserve value's type)",
@@ -205,11 +203,11 @@ public class GenericOptionsParser {
         for (String s : strs)
             out.println(s);
     }
-    
+
     static interface OptionProcessor {
         public void process(Config conf, CommandLine commandLine) throws ParseException;
     }
-    
+
     static class LibjarsProcessor implements OptionProcessor {
         @Override
         public void process(Config conf, CommandLine commandLine) throws ParseException {
@@ -223,31 +221,31 @@ public class GenericOptionsParser {
                 }
                 conf.put(TOPOLOGY_LIB_PATH, jars);
                 conf.put(TOPOLOGY_LIB_NAME, names);
-                
+
             } catch (IOException e) {
                 throw new ParseException(e.getMessage());
             }
         }
     }
-    
+
     static class ExtraConfigProcessor implements OptionProcessor {
         static final Yaml yaml = new Yaml();
-        
+
         @Override
         public void process(Config conf, CommandLine commandLine) throws ParseException {
             for (String s : commandLine.getOptionValues("D")) {
                 String[] keyval = s.split("=", 2);
                 if (keyval.length != 2)
                     throw new ParseException("Invalid option value `" + s + "'");
-                
+
                 conf.putAll((Map) yaml.load(keyval[0] + ": " + keyval[1]));
             }
         }
     }
-    
+
     static class ConfFileProcessor implements OptionProcessor {
         static final Yaml yaml = new Yaml();
-        
+
         static Map loadYamlConf(String f) throws IOException {
             InputStreamReader reader = null;
             try {
@@ -259,13 +257,13 @@ public class GenericOptionsParser {
                     reader.close();
             }
         }
-        
+
         static Map loadConf(String f) throws IOException {
             if (f.endsWith(".yaml"))
                 return loadYamlConf(f);
             throw new IOException("Unknown configuration file type: " + f + " does not end with either .yaml");
         }
-        
+
         @Override
         public void process(Config conf, CommandLine commandLine) throws ParseException {
             try {
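
For reference, a minimal sketch of how this parser is normally driven from a main method (illustrative only, not part of this commit; the class name ParserDemo is hypothetical, and ParseException is the org.apache.commons.cli exception the constructor already declares):

    import org.apache.commons.cli.ParseException;

    import backtype.storm.Config;
    import backtype.storm.GenericOptionsParser;

    public class ParserDemo {
        public static void main(String[] args) throws ParseException {
            Config conf = new Config();
            // Consumes the generic options documented above (-conf <file>, -D key=value, -libjars a.jar,b.jar)
            // and leaves everything else to be fetched via getRemainingArgs().
            GenericOptionsParser parser = new GenericOptionsParser(conf, args);
            String[] topologyArgs = parser.getRemainingArgs();
            System.out.println("Parsed " + conf.size() + " config entries, "
                    + topologyArgs.length + " remaining args");
        }
    }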

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/ICredentialsListener.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/ICredentialsListener.java b/jstorm-core/src/main/java/backtype/storm/ICredentialsListener.java
index 1a7bc1b..f8f9e9b 100755
--- a/jstorm-core/src/main/java/backtype/storm/ICredentialsListener.java
+++ b/jstorm-core/src/main/java/backtype/storm/ICredentialsListener.java
@@ -26,7 +26,8 @@ import java.util.Map;
 public interface ICredentialsListener {
     /**
      * Called when the credentials of a topology have changed.
+     * 
      * @param credentials the new credentials, could be null.
      */
-    public void setCredentials(Map<String,String> credentials);
+    public void setCredentials(Map<String, String> credentials);
 }
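
As a rough illustration of the interface above (not part of this commit; the class name is hypothetical), a component that wants to be told about credential pushes implements the single callback:

    import java.util.Map;

    import backtype.storm.ICredentialsListener;

    public class CredentialAwareComponent implements ICredentialsListener {
        private volatile Map<String, String> credentials;

        @Override
        public void setCredentials(Map<String, String> credentials) {
            // Invoked when the topology's credentials are uploaded or renewed; the map may be null.
            this.credentials = credentials;
        }
    }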

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/ILocalCluster.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/ILocalCluster.java b/jstorm-core/src/main/java/backtype/storm/ILocalCluster.java
index 7d5aa35..7d31f07 100755
--- a/jstorm-core/src/main/java/backtype/storm/ILocalCluster.java
+++ b/jstorm-core/src/main/java/backtype/storm/ILocalCluster.java
@@ -30,20 +30,33 @@ import backtype.storm.generated.Credentials;
 
 import java.util.Map;
 
-
 public interface ILocalCluster {
     void submitTopology(String topologyName, Map conf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException;
-    void submitTopologyWithOpts(String topologyName, Map conf, StormTopology topology, SubmitOptions submitOpts) throws AlreadyAliveException, InvalidTopologyException;
+
+    void submitTopologyWithOpts(String topologyName, Map conf, StormTopology topology, SubmitOptions submitOpts) throws AlreadyAliveException,
+            InvalidTopologyException;
+
     void uploadNewCredentials(String topologyName, Credentials creds);
+
     void killTopology(String topologyName) throws NotAliveException;
+
     void killTopologyWithOpts(String name, KillOptions options) throws NotAliveException;
+
     void activate(String topologyName) throws NotAliveException;
+
     void deactivate(String topologyName) throws NotAliveException;
+
     void rebalance(String name, RebalanceOptions options) throws NotAliveException;
+
     void shutdown();
+
     String getTopologyConf(String id);
+
     StormTopology getTopology(String id);
+
     ClusterSummary getClusterInfo();
+
     TopologyInfo getTopologyInfo(String id);
+
     Map getState();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/ILocalDRPC.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/ILocalDRPC.java b/jstorm-core/src/main/java/backtype/storm/ILocalDRPC.java
index e478dca..4482ecd 100755
--- a/jstorm-core/src/main/java/backtype/storm/ILocalDRPC.java
+++ b/jstorm-core/src/main/java/backtype/storm/ILocalDRPC.java
@@ -21,7 +21,6 @@ import backtype.storm.daemon.Shutdownable;
 import backtype.storm.generated.DistributedRPC;
 import backtype.storm.generated.DistributedRPCInvocations;
 
-
 public interface ILocalDRPC extends DistributedRPC.Iface, DistributedRPCInvocations.Iface, Shutdownable {
-    public String getServiceId();    
+    public String getServiceId();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/LocalCluster.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/LocalCluster.java b/jstorm-core/src/main/java/backtype/storm/LocalCluster.java
index b55bac4..c25c260 100755
--- a/jstorm-core/src/main/java/backtype/storm/LocalCluster.java
+++ b/jstorm-core/src/main/java/backtype/storm/LocalCluster.java
@@ -17,30 +17,21 @@
  */
 package backtype.storm;
 
-import java.util.Map;
-
+import backtype.storm.generated.*;
+import backtype.storm.utils.Utils;
+import com.alibaba.jstorm.utils.JStormUtils;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import backtype.storm.generated.ClusterSummary;
-import backtype.storm.generated.Credentials;
-import backtype.storm.generated.KillOptions;
-import backtype.storm.generated.NotAliveException;
-import backtype.storm.generated.RebalanceOptions;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.generated.SubmitOptions;
-import backtype.storm.generated.TopologyInfo;
-import backtype.storm.utils.Utils;
-
-import com.alibaba.jstorm.utils.JStormUtils;
+import java.util.Map;
 
 public class LocalCluster implements ILocalCluster {
-    
+
     public static Logger LOG = LoggerFactory.getLogger(LocalCluster.class);
-    
+
     private LocalClusterMap state;
-    
+
     protected void setLogger() {
         // the code is for log4j
         // boolean needReset = true;
@@ -56,61 +47,62 @@ public class LocalCluster implements ILocalCluster {
         // BasicConfigurator.configure();
         // rootLogger.setLevel(Level.INFO);
         // }
-        
+
     }
-    
+
     // this is easy to debug
     protected static LocalCluster instance = null;
-    
+
     public static LocalCluster getInstance() {
         return instance;
     }
-    
+
     public LocalCluster() {
         synchronized (LocalCluster.class) {
             if (instance != null) {
                 throw new RuntimeException("LocalCluster should be single");
             }
             setLogger();
-            
+
             // fix in zk occur Address family not supported by protocol family:
             // connect
             System.setProperty("java.net.preferIPv4Stack", "true");
-            
+
             this.state = LocalUtils.prepareLocalCluster();
             if (this.state == null)
                 throw new RuntimeException("prepareLocalCluster error");
-            
+
             instance = this;
         }
     }
-    
+
     @Override
     public void submitTopology(String topologyName, Map conf, StormTopology topology) {
         submitTopologyWithOpts(topologyName, conf, topology, null);
     }
-    
+
     @Override
     public void submitTopologyWithOpts(String topologyName, Map conf, StormTopology topology, SubmitOptions submitOpts) {
         // TODO Auto-generated method stub
         if (!Utils.isValidConf(conf))
             throw new RuntimeException("Topology conf is not json-serializable");
         JStormUtils.setLocalMode(true);
-        
+        conf.put(Config.STORM_CLUSTER_MODE, "local");
+
         try {
             if (submitOpts == null) {
                 state.getNimbus().submitTopology(topologyName, null, Utils.to_json(conf), topology);
             } else {
                 state.getNimbus().submitTopologyWithOpts(topologyName, null, Utils.to_json(conf), topology, submitOpts);
             }
-            
+
         } catch (Exception e) {
             // TODO Auto-generated catch block
             LOG.error("Failed to submit topology " + topologyName, e);
             throw new RuntimeException(e);
         }
     }
-    
+
     @Override
     public void killTopology(String topologyName) {
         // TODO Auto-generated method stub
@@ -124,7 +116,7 @@ public class LocalCluster implements ILocalCluster {
             LOG.error("fail to kill Topology " + topologyName, e);
         }
     }
-    
+
     @Override
     public void killTopologyWithOpts(String name, KillOptions options) throws NotAliveException {
         // TODO Auto-generated method stub
@@ -136,7 +128,7 @@ public class LocalCluster implements ILocalCluster {
             throw new RuntimeException(e);
         }
     }
-    
+
     @Override
     public void activate(String topologyName) {
         // TODO Auto-generated method stub
@@ -148,7 +140,7 @@ public class LocalCluster implements ILocalCluster {
             throw new RuntimeException(e);
         }
     }
-    
+
     @Override
     public void deactivate(String topologyName) {
         // TODO Auto-generated method stub
@@ -160,7 +152,7 @@ public class LocalCluster implements ILocalCluster {
             throw new RuntimeException(e);
         }
     }
-    
+
     @Override
     public void rebalance(String name, RebalanceOptions options) {
         // TODO Auto-generated method stub
@@ -172,7 +164,7 @@ public class LocalCluster implements ILocalCluster {
             throw new RuntimeException(e);
         }
     }
-    
+
     @Override
     public void shutdown() {
         // TODO Auto-generated method stub
@@ -180,8 +172,9 @@ public class LocalCluster implements ILocalCluster {
         // it take 10 seconds to remove topology's node
         JStormUtils.sleepMs(10 * 1000);
         this.state.clean();
+        instance = null;
     }
-    
+
     @Override
     public String getTopologyConf(String id) {
         // TODO Auto-generated method stub
@@ -193,7 +186,7 @@ public class LocalCluster implements ILocalCluster {
         }
         return null;
     }
-    
+
     @Override
     public StormTopology getTopology(String id) {
         // TODO Auto-generated method stub
@@ -208,7 +201,7 @@ public class LocalCluster implements ILocalCluster {
         }
         return null;
     }
-    
+
     @Override
     public ClusterSummary getClusterInfo() {
         // TODO Auto-generated method stub
@@ -220,7 +213,7 @@ public class LocalCluster implements ILocalCluster {
         }
         return null;
     }
-    
+
     @Override
     public TopologyInfo getTopologyInfo(String id) {
         // TODO Auto-generated method stub
@@ -235,7 +228,7 @@ public class LocalCluster implements ILocalCluster {
         }
         return null;
     }
-    
+
     /***
      * You should use getLocalClusterMap() to instead.This function will always return null
      * */
@@ -245,11 +238,11 @@ public class LocalCluster implements ILocalCluster {
         // TODO Auto-generated method stub
         return null;
     }
-    
+
     public LocalClusterMap getLocalClusterMap() {
         return state;
     }
-    
+
     public static void main(String[] args) throws Exception {
         LocalCluster localCluster = null;
         try {
@@ -269,7 +262,7 @@ public class LocalCluster implements ILocalCluster {
         } catch (Exception e) {
             // TODO Auto-generated catch block
             LOG.error("fail to uploadNewCredentials of topologyId: " + topologyName, e);
-        } 
+        }
     }
-    
+
 }
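
For context, a minimal sketch of driving LocalCluster as reformatted above (illustrative only and not part of this commit; the class name and topology wiring are placeholders — a real topology needs at least one spout set on the builder):

    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.generated.StormTopology;
    import backtype.storm.topology.TopologyBuilder;

    public class LocalClusterDemo {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            // builder.setSpout(...)/setBolt(...) omitted for brevity.
            StormTopology topology = builder.createTopology();

            Config conf = new Config();
            conf.setNumWorkers(1);

            LocalCluster cluster = new LocalCluster();      // only one instance per JVM, per the constructor check
            cluster.submitTopology("demo-topology", conf, topology);
            Thread.sleep(30 * 1000);                        // let the topology run for a while
            cluster.killTopology("demo-topology");
            cluster.shutdown();                             // now also resets the static instance
        }
    }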

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/LocalClusterMap.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/LocalClusterMap.java b/jstorm-core/src/main/java/backtype/storm/LocalClusterMap.java
index bd99c76..39f23be 100755
--- a/jstorm-core/src/main/java/backtype/storm/LocalClusterMap.java
+++ b/jstorm-core/src/main/java/backtype/storm/LocalClusterMap.java
@@ -31,83 +31,83 @@ import com.alibaba.jstorm.utils.PathUtils;
 import com.alibaba.jstorm.zk.Factory;
 
 public class LocalClusterMap {
-    
+
     public static Logger LOG = LoggerFactory.getLogger(LocalClusterMap.class);
-    
+
     private NimbusServer nimbusServer;
-    
+
     private ServiceHandler nimbus;
-    
+
     private Factory zookeeper;
-    
+
     private Map conf;
-    
+
     private List<String> tmpDir;
-    
+
     private SupervisorManger supervisor;
-    
+
     public ServiceHandler getNimbus() {
         return nimbus;
     }
-    
+
     public void setNimbus(ServiceHandler nimbus) {
         this.nimbus = nimbus;
     }
-    
+
     public Factory getZookeeper() {
         return zookeeper;
     }
-    
+
     public void setZookeeper(Factory zookeeper) {
         this.zookeeper = zookeeper;
     }
-    
+
     public Map getConf() {
         return conf;
     }
-    
+
     public void setConf(Map conf) {
         this.conf = conf;
     }
-    
+
     public NimbusServer getNimbusServer() {
         return nimbusServer;
     }
-    
+
     public void setNimbusServer(NimbusServer nimbusServer) {
         this.nimbusServer = nimbusServer;
     }
-    
+
     public SupervisorManger getSupervisor() {
         return supervisor;
     }
-    
+
     public void setSupervisor(SupervisorManger supervisor) {
         this.supervisor = supervisor;
     }
-    
+
     public List<String> getTmpDir() {
         return tmpDir;
     }
-    
+
     public void setTmpDir(List<String> tmpDir) {
         this.tmpDir = tmpDir;
     }
-    
+
     public void clean() {
-        
+
         if (supervisor != null) {
             supervisor.ShutdownAllWorkers();
             supervisor.shutdown();
         }
-        
+
         if (nimbusServer != null) {
             nimbusServer.cleanup();
         }
-        
+
         if (zookeeper != null)
             zookeeper.shutdown();
-        
+
         // it will hava a problem:
         // java.io.IOException: Unable to delete file:
         // {TmpPath}\{UUID}\version-2\log.1
@@ -122,5 +122,5 @@ public class LocalClusterMap {
             }
         }
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/LocalDRPC.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/LocalDRPC.java b/jstorm-core/src/main/java/backtype/storm/LocalDRPC.java
index 4113bf4..a838026 100755
--- a/jstorm-core/src/main/java/backtype/storm/LocalDRPC.java
+++ b/jstorm-core/src/main/java/backtype/storm/LocalDRPC.java
@@ -28,16 +28,16 @@ import com.alibaba.jstorm.drpc.Drpc;
 
 public class LocalDRPC implements ILocalDRPC {
     private static final Logger LOG = LoggerFactory.getLogger(LocalDRPC.class);
-    
+
     private Drpc handler = new Drpc();
     private Thread thread;
-    
+
     private final String serviceId;
-    
+
     public LocalDRPC() {
-        
+
         thread = new Thread(new Runnable() {
-            
+
             @Override
             public void run() {
                 LOG.info("Begin to init local Drpc");
@@ -51,10 +51,10 @@ public class LocalDRPC implements ILocalDRPC {
             }
         });
         thread.start();
-        
+
         serviceId = ServiceRegistry.registerService(handler);
     }
-    
+
     @Override
     public String execute(String functionName, String funcArgs) {
         // TODO Auto-generated method stub
@@ -65,36 +65,36 @@ public class LocalDRPC implements ILocalDRPC {
             throw new RuntimeException(e);
         }
     }
-    
+
     @Override
     public void result(String id, String result) throws TException {
         // TODO Auto-generated method stub
         handler.result(id, result);
     }
-    
+
     @Override
     public DRPCRequest fetchRequest(String functionName) throws TException {
         // TODO Auto-generated method stub
         return handler.fetchRequest(functionName);
     }
-    
+
     @Override
     public void failRequest(String id) throws TException {
         // TODO Auto-generated method stub
         handler.failRequest(id);
     }
-    
+
     @Override
     public void shutdown() {
         // TODO Auto-generated method stub
         ServiceRegistry.unregisterService(this.serviceId);
         this.handler.shutdown();
     }
-    
+
     @Override
     public String getServiceId() {
         // TODO Auto-generated method stub
         return serviceId;
     }
-    
+
 }
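
A small illustrative sketch of the lifecycle above (not part of this commit; the class name is hypothetical): constructing LocalDRPC starts the in-process handler thread and registers it, and shutdown() reverses both steps.

    import backtype.storm.LocalDRPC;

    public class LocalDrpcDemo {
        public static void main(String[] args) {
            LocalDRPC drpc = new LocalDRPC();            // starts the embedded Drpc handler thread
            String serviceId = drpc.getServiceId();      // id under which it is registered in ServiceRegistry
            System.out.println("Local DRPC registered as " + serviceId);
            // drpc.execute("my-function", "args") would block until a DRPC topology answers the request.
            drpc.shutdown();                             // unregisters the service and stops the handler
        }
    }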

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/LocalUtils.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/LocalUtils.java b/jstorm-core/src/main/java/backtype/storm/LocalUtils.java
index e32c07e..6e5023b 100755
--- a/jstorm-core/src/main/java/backtype/storm/LocalUtils.java
+++ b/jstorm-core/src/main/java/backtype/storm/LocalUtils.java
@@ -39,32 +39,32 @@ import com.alibaba.jstorm.zk.Factory;
 import com.alibaba.jstorm.zk.Zookeeper;
 
 public class LocalUtils {
-    
+
     public static Logger LOG = LoggerFactory.getLogger(LocalUtils.class);
-    
+
     public static LocalClusterMap prepareLocalCluster() {
         LocalClusterMap state = new LocalClusterMap();
         try {
             List<String> tmpDirs = new ArrayList();
-            
+
             String zkDir = getTmpDir();
             tmpDirs.add(zkDir);
             Factory zookeeper = startLocalZookeeper(zkDir);
             Map conf = getLocalConf(zookeeper.getZooKeeperServer().getClientPort());
-            
+
             String nimbusDir = getTmpDir();
             tmpDirs.add(nimbusDir);
             Map nimbusConf = deepCopyMap(conf);
             nimbusConf.put(Config.STORM_LOCAL_DIR, nimbusDir);
             NimbusServer instance = new NimbusServer();
-            
+
             Map supervisorConf = deepCopyMap(conf);
             String supervisorDir = getTmpDir();
             tmpDirs.add(supervisorDir);
             supervisorConf.put(Config.STORM_LOCAL_DIR, supervisorDir);
             Supervisor supervisor = new Supervisor();
             IContext context = getLocalContext(supervisorConf);
-            
+
             state.setNimbusServer(instance);
             state.setNimbus(instance.launcherLocalServer(nimbusConf, new DefaultInimbus()));
             state.setZookeeper(zookeeper);
@@ -75,11 +75,11 @@ public class LocalUtils {
         } catch (Exception e) {
             LOG.error("prepare cluster error!", e);
             state.clean();
-            
+
         }
         return null;
     }
-    
+
     private static Factory startLocalZookeeper(String tmpDir) {
         for (int i = 2000; i < 65535; i++) {
             try {
@@ -90,11 +90,11 @@ public class LocalUtils {
         }
         throw new RuntimeException("No port is available to launch an inprocess zookeeper.");
     }
-    
+
     private static String getTmpDir() {
         return System.getProperty("java.io.tmpdir") + File.separator + UUID.randomUUID();
     }
-    
+
     private static Map getLocalConf(int port) {
         List<String> zkServers = new ArrayList<String>(1);
         zkServers.add("localhost");
@@ -110,7 +110,7 @@ public class LocalUtils {
         ConfigExtension.setTaskCleanupTimeoutSec(conf, 0);
         return conf;
     }
-    
+
     private static IContext getLocalContext(Map conf) {
         if (!(Boolean) conf.get(Config.STORM_LOCAL_MODE_ZMQ)) {
             IContext result = new NettyContext();
@@ -120,7 +120,7 @@ public class LocalUtils {
         }
         return null;
     }
-    
+
     private static Map deepCopyMap(Map map) {
         return new HashMap(map);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/StormSubmitter.java b/jstorm-core/src/main/java/backtype/storm/StormSubmitter.java
index 400875e..1666b29 100644
--- a/jstorm-core/src/main/java/backtype/storm/StormSubmitter.java
+++ b/jstorm-core/src/main/java/backtype/storm/StormSubmitter.java
@@ -17,6 +17,14 @@
  */
 package backtype.storm;
 
+import backtype.storm.generated.*;
+import backtype.storm.utils.BufferFileInputStream;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -24,26 +32,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.lang.StringUtils;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.generated.AlreadyAliveException;
-import backtype.storm.generated.InvalidTopologyException;
-import backtype.storm.generated.Nimbus;
-import backtype.storm.generated.NotAliveException;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.generated.SubmitOptions;
-import backtype.storm.generated.TopologyAssignException;
-import backtype.storm.utils.BufferFileInputStream;
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-
 /**
- * Use this class to submit topologies to run on the Storm cluster. You should
- * run your program with the "storm jar" command from the command-line, and then
- * use this class to submit your topologies.
+ * Use this class to submit topologies to run on the Storm cluster. You should run your program with the "storm jar" command from the command-line, and then use
+ * this class to submit your topologies.
  */
 public class StormSubmitter {
     public static Logger LOG = LoggerFactory.getLogger(StormSubmitter.class);
@@ -55,25 +46,20 @@ public class StormSubmitter {
     }
 
     /**
-     * Submits a topology to run on the cluster. A topology runs forever or
-     * until explicitly killed.
+     * Submits a topology to run on the cluster. A topology runs forever or until explicitly killed.
      * 
      * 
      * @param name the name of the storm.
      * @param stormConf the topology-specific configuration. See {@link Config}.
      * @param topology the processing to execute.
-     * @throws AlreadyAliveException if a topology with this name is already
-     *             running
+     * @throws AlreadyAliveException if a topology with this name is already running
      * @throws InvalidTopologyException if an invalid topology was submitted
      */
-    public static void submitTopology(String name, Map stormConf,
-            StormTopology topology) throws AlreadyAliveException,
-            InvalidTopologyException {
+    public static void submitTopology(String name, Map stormConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException {
         submitTopology(name, stormConf, topology, null);
     }
 
-    public static void submitTopology(String name, Map stormConf,
-            StormTopology topology, SubmitOptions opts, List<File> jarFiles)
+    public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts, List<File> jarFiles)
             throws AlreadyAliveException, InvalidTopologyException {
         if (jarFiles == null) {
             jarFiles = new ArrayList<File>();
@@ -83,8 +69,7 @@ public class StormSubmitter {
 
         for (File f : jarFiles) {
             if (!f.exists()) {
-                LOG.info(f.getName() + " is not existed: "
-                        + f.getAbsolutePath());
+                LOG.info(f.getName() + " is not existed: " + f.getAbsolutePath());
                 continue;
             }
             jars.put(f.getName(), f.getAbsolutePath());
@@ -96,32 +81,25 @@ public class StormSubmitter {
         submitTopology(name, stormConf, topology, opts);
     }
 
-    public static void submitTopology(String name, Map stormConf,
-            StormTopology topology, SubmitOptions opts,
-            ProgressListener listener) throws AlreadyAliveException,
-            InvalidTopologyException {
+    public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts, ProgressListener listener)
+            throws AlreadyAliveException, InvalidTopologyException {
         submitTopology(name, stormConf, topology, opts);
     }
 
     /**
-     * Submits a topology to run on the cluster. A topology runs forever or
-     * until explicitly killed.
+     * Submits a topology to run on the cluster. A topology runs forever or until explicitly killed.
      * 
      * 
      * @param name the name of the storm.
      * @param stormConf the topology-specific configuration. See {@link Config}.
      * @param topology the processing to execute.
-     * @param options to manipulate the starting of the topology
-     * @throws AlreadyAliveException if a topology with this name is already
-     *             running
+     * @throws AlreadyAliveException if a topology with this name is already running
      * @throws InvalidTopologyException if an invalid topology was submitted
      */
-    public static void submitTopology(String name, Map stormConf,
-            StormTopology topology, SubmitOptions opts)
-            throws AlreadyAliveException, InvalidTopologyException {
+    public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException,
+            InvalidTopologyException {
         if (!Utils.isValidConf(stormConf)) {
-            throw new IllegalArgumentException(
-                    "Storm conf is not valid. Must be json-serializable");
+            throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
         }
         stormConf = new HashMap(stormConf);
         stormConf.putAll(Utils.readCommandLineOpts());
@@ -137,20 +115,16 @@ public class StormSubmitter {
                 NimbusClient client = NimbusClient.getConfiguredClient(conf);
                 try {
                     if (topologyNameExists(client, conf, name)) {
-                        throw new RuntimeException("Topology with name `" + name
-                                + "` already exists on cluster");
+                        throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
                     }
 
                     submitJar(client, conf);
-                    LOG.info("Submitting topology " + name
-                            + " in distributed mode with conf " + serConf);
+                    LOG.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
                     if (opts != null) {
-                        client.getClient().submitTopologyWithOpts(name, path,
-                                serConf, topology, opts);
+                        client.getClient().submitTopologyWithOpts(name, path, serConf, topology, opts);
                     } else {
                         // this is for backwards compatibility
-                        client.getClient().submitTopology(name, path, serConf,
-                                topology);
+                        client.getClient().submitTopology(name, path, serConf, topology);
                     }
                 } finally {
                     client.close();
@@ -173,43 +147,36 @@ public class StormSubmitter {
     }
 
     /**
-     * Submits a topology to run on the cluster with a progress bar. A topology
-     * runs forever or until explicitly killed.
+     * Submits a topology to run on the cluster with a progress bar. A topology runs forever or until explicitly killed.
      * 
      * 
      * @param name the name of the storm.
      * @param stormConf the topology-specific configuration. See {@link Config}.
      * @param topology the processing to execute.
-     * @throws AlreadyAliveException if a topology with this name is already
-     *             running
+     * @throws AlreadyAliveException if a topology with this name is already running
      * @throws InvalidTopologyException if an invalid topology was submitted
      * @throws TopologyAssignException
      */
 
-    public static void submitTopologyWithProgressBar(String name,
-            Map stormConf, StormTopology topology)
-            throws AlreadyAliveException, InvalidTopologyException {
+    public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException {
         submitTopologyWithProgressBar(name, stormConf, topology, null);
     }
 
     /**
-     * Submits a topology to run on the cluster with a progress bar. A topology
-     * runs forever or until explicitly killed.
+     * Submits a topology to run on the cluster with a progress bar. A topology runs forever or until explicitly killed.
      * 
      * 
      * @param name the name of the storm.
      * @param stormConf the topology-specific configuration. See {@link Config}.
      * @param topology the processing to execute.
      * @param opts to manipulate the starting of the topology
-     * @throws AlreadyAliveException if a topology with this name is already
-     *             running
+     * @throws AlreadyAliveException if a topology with this name is already running
      * @throws InvalidTopologyException if an invalid topology was submitted
      * @throws TopologyAssignException
      */
 
-    public static void submitTopologyWithProgressBar(String name,
-            Map stormConf, StormTopology topology, SubmitOptions opts)
-            throws AlreadyAliveException, InvalidTopologyException {
+    public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException,
+            InvalidTopologyException {
 
         /**
          * remove progress bar in jstorm
@@ -218,21 +185,11 @@ public class StormSubmitter {
     }
 
     public static boolean topologyNameExists(NimbusClient client, Map conf, String name) {
-    	if (StringUtils.isBlank(name)) {
-    		throw new RuntimeException("TopologyName is empty");
-    	}
-    	
         try {
-            String topologyId = client.getClient().getTopologyId(name);
-            if (StringUtils.isBlank(topologyId) == false) {
-                return true;
-            }
-            return false;
-
-        } catch (NotAliveException e) {
-            return false;
+            client.getClient().getTopologyInfoByName(name);
+            return true;
         } catch (Exception e) {
-            throw new RuntimeException(e);
+            return false;
         }
     }
 
@@ -246,15 +203,9 @@ public class StormSubmitter {
                 String localJar = System.getProperty("storm.jar");
                 path = client.getClient().beginFileUpload();
                 String[] pathCache = path.split("/");
-                String uploadLocation =
-                        path + "/stormjar-" + pathCache[pathCache.length - 1]
-                                + ".jar";
-                List<String> lib =
-                        (List<String>) conf
-                                .get(GenericOptionsParser.TOPOLOGY_LIB_NAME);
-                Map<String, String> libPath =
-                        (Map<String, String>) conf
-                                .get(GenericOptionsParser.TOPOLOGY_LIB_PATH);
+                String uploadLocation = path + "/stormjar-" + pathCache[pathCache.length - 1] + ".jar";
+                List<String> lib = (List<String>) conf.get(GenericOptionsParser.TOPOLOGY_LIB_NAME);
+                Map<String, String> libPath = (Map<String, String>) conf.get(GenericOptionsParser.TOPOLOGY_LIB_PATH);
                 if (lib != null && lib.size() != 0) {
                     for (String libName : lib) {
                         String jarPath = path + "/lib/" + libName;
@@ -265,14 +216,12 @@ public class StormSubmitter {
                 } else {
                     if (localJar == null) {
                         // no lib, no client jar
-                        throw new RuntimeException(
-                                "No client app jar, please upload it");
+                        throw new RuntimeException("No client app jar, please upload it");
                     }
                 }
 
                 if (localJar != null) {
-                    submittedJar =
-                            submitJar(conf, localJar, uploadLocation, client);
+                    submittedJar = submitJar(conf, localJar, uploadLocation, client);
                 } else {
                     // no client jar, but with lib jar
                     client.getClient().finishFileUpload(uploadLocation);
@@ -285,36 +234,29 @@ public class StormSubmitter {
         }
     }
 
-    public static String submitJar(Map conf, String localJar,
-            String uploadLocation, NimbusClient client) {
+    public static String submitJar(Map conf, String localJar, String uploadLocation, NimbusClient client) {
         if (localJar == null) {
-            throw new RuntimeException(
-                    "Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");
+            throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");
         }
 
         try {
 
-            LOG.info("Uploading topology jar " + localJar
-                    + " to assigned location: " + uploadLocation);
+            LOG.info("Uploading topology jar " + localJar + " to assigned location: " + uploadLocation);
             int bufferSize = 512 * 1024;
-            Object maxBufSizeObject =
-                    conf.get(Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE);
+            Object maxBufSizeObject = conf.get(Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE);
             if (maxBufSizeObject != null) {
                 bufferSize = Utils.getInt(maxBufSizeObject) / 2;
             }
 
-            BufferFileInputStream is =
-                    new BufferFileInputStream(localJar, bufferSize);
+            BufferFileInputStream is = new BufferFileInputStream(localJar, bufferSize);
             while (true) {
                 byte[] toSubmit = is.read();
                 if (toSubmit.length == 0)
                     break;
-                client.getClient().uploadChunk(uploadLocation,
-                        ByteBuffer.wrap(toSubmit));
+                client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit));
             }
             client.getClient().finishFileUpload(uploadLocation);
-            LOG.info("Successfully uploaded topology jar to assigned location: "
-                    + uploadLocation);
+            LOG.info("Successfully uploaded topology jar to assigned location: " + uploadLocation);
             return uploadLocation;
         } catch (Exception e) {
             throw new RuntimeException(e);
@@ -350,8 +292,7 @@ public class StormSubmitter {
          * @param bytesUploaded - number of bytes transferred so far
          * @param totalBytes - total number of bytes of the file
          */
-        public void onProgress(String srcFile, String targetFile,
-                long bytesUploaded, long totalBytes);
+        public void onProgress(String srcFile, String targetFile, long bytesUploaded, long totalBytes);
 
         /**
          * called when the file is uploaded
@@ -360,7 +301,6 @@ public class StormSubmitter {
          * @param targetFile - destination file
          * @param totalBytes - total number of bytes of the file
          */
-        public void onCompleted(String srcFile, String targetFile,
-                long totalBytes);
+        public void onCompleted(String srcFile, String targetFile, long totalBytes);
     }
 }
\ No newline at end of file
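
For reference, a minimal sketch of the usual submission path through this class (illustrative only, not part of this commit; the class name and topology wiring are placeholders, and the program is expected to be launched with the `storm jar` client so the storm.jar system property names the jar to upload):

    import backtype.storm.Config;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.StormTopology;
    import backtype.storm.topology.TopologyBuilder;

    public class SubmitDemo {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            // builder.setSpout(...)/setBolt(...) omitted for brevity.
            StormTopology topology = builder.createTopology();

            Config conf = new Config();
            conf.setNumWorkers(2);

            // Uploads the jar named by the storm.jar property, then submits the topology to Nimbus.
            StormSubmitter.submitTopology("demo-topology", conf, topology);
        }
    }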

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/Tool.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/Tool.java b/jstorm-core/src/main/java/backtype/storm/Tool.java
index 6722b24..0dc5e32 100755
--- a/jstorm-core/src/main/java/backtype/storm/Tool.java
+++ b/jstorm-core/src/main/java/backtype/storm/Tool.java
@@ -58,13 +58,13 @@ package backtype.storm;
 
 public abstract class Tool {
     Config config;
-    
+
     public abstract int run(String[] args) throws Exception;
-    
+
     public Config getConf() {
         return config;
     }
-    
+
     public void setConf(Config config) {
         this.config = config;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/ToolRunner.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/ToolRunner.java b/jstorm-core/src/main/java/backtype/storm/ToolRunner.java
index 33f5034..d70da41 100755
--- a/jstorm-core/src/main/java/backtype/storm/ToolRunner.java
+++ b/jstorm-core/src/main/java/backtype/storm/ToolRunner.java
@@ -32,6 +32,8 @@ import backtype.storm.utils.Utils;
  * href="{@docRoot} to parse the <a href="{@docRoot} to parse the <a href="{@docRoot} to parse the <a
  * href="{@docRoot}
  * to parse the <a href="{@docRoot} to parse the <a href="{@docRoot} to parse the <a href="{@docRoot} to parse the <a href="{@docRoot}
+ * to parse the <a href="{@docRoot} to parse the <a href="{@docRoot} to parse the <a href="{@docRoot} to parse the <a href="{@docRoot} to parse the <a
+ * href="{@docRoot} to parse the <a href="{@docRoot} to parse the <a href="{@docRoot} to parse the <a href="{@docRoot}
  * /backtype/storm/GenericOptionsParser.html#GenericOptions"> generic storm command line arguments</a> and modifies the <code>Config</code> of the
  * <code>Tool</code>. The application-specific options are passed along without being modified.
  * 
@@ -41,21 +43,22 @@ import backtype.storm.utils.Utils;
 
 public class ToolRunner {
     static final Logger LOG = LoggerFactory.getLogger(ToolRunner.class);
-    
+
     public static void run(Tool tool, String[] args) {
         run(tool.getConf(), tool, args);
     }
-    
+
     public static void run(Config conf, Tool tool, String[] args) {
         try {
             if (conf == null) {
                 conf = new Config();
                 conf.putAll(Utils.readStormConfig());
             }
-            
+
             GenericOptionsParser parser = new GenericOptionsParser(conf, args);
+            LOG.info(conf.toString());
             tool.setConf(conf);
-            
+
             System.exit(tool.run(parser.getRemainingArgs()));
         } catch (ParseException e) {
             LOG.error("Error parsing generic options: {}", e.getMessage());

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/clojure/ClojureBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/clojure/ClojureBolt.java b/jstorm-core/src/main/java/backtype/storm/clojure/ClojureBolt.java
index 5de9bde..d3d1d37 100755
--- a/jstorm-core/src/main/java/backtype/storm/clojure/ClojureBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/clojure/ClojureBolt.java
@@ -36,15 +36,14 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-
 public class ClojureBolt implements IRichBolt, FinishedCallback {
     Map<String, StreamInfo> _fields;
     List<String> _fnSpec;
     List<String> _confSpec;
     List<Object> _params;
-    
+
     IBolt _bolt;
-    
+
     public ClojureBolt(List fnSpec, List confSpec, List<Object> params, Map<String, StreamInfo> fields) {
         _fnSpec = fnSpec;
         _confSpec = confSpec;
@@ -57,21 +56,23 @@ public class ClojureBolt implements IRichBolt, FinishedCallback {
         IFn hof = Utils.loadClojureFn(_fnSpec.get(0), _fnSpec.get(1));
         try {
             IFn preparer = (IFn) hof.applyTo(RT.seq(_params));
-            final Map<Keyword,Object> collectorMap = new PersistentArrayMap( new Object[] {
-                Keyword.intern(Symbol.create("output-collector")), collector,
-                Keyword.intern(Symbol.create("context")), context});
-            List<Object> args = new ArrayList<Object>() {{
-                add(stormConf);
-                add(context);
-                add(collectorMap);
-            }};
-            
+            final Map<Keyword, Object> collectorMap =
+                    new PersistentArrayMap(new Object[] { Keyword.intern(Symbol.create("output-collector")), collector,
+                            Keyword.intern(Symbol.create("context")), context });
+            List<Object> args = new ArrayList<Object>() {
+                {
+                    add(stormConf);
+                    add(context);
+                    add(collectorMap);
+                }
+            };
+
             _bolt = (IBolt) preparer.applyTo(RT.seq(args));
-            //this is kind of unnecessary for clojure
+            // this is kind of unnecessary for clojure
             try {
                 _bolt.prepare(stormConf, context, collector);
-            } catch(AbstractMethodError ame) {
-                
+            } catch (AbstractMethodError ame) {
+
             }
         } catch (Exception e) {
             throw new RuntimeException(e);
@@ -85,16 +86,16 @@ public class ClojureBolt implements IRichBolt, FinishedCallback {
 
     @Override
     public void cleanup() {
-            try {
-                _bolt.cleanup();
-            } catch(AbstractMethodError ame) {
-                
-            }
+        try {
+            _bolt.cleanup();
+        } catch (AbstractMethodError ame) {
+
+        }
     }
 
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        for(String stream: _fields.keySet()) {
+        for (String stream : _fields.keySet()) {
             StreamInfo info = _fields.get(stream);
             declarer.declareStream(stream, info.is_direct(), new Fields(info.get_output_fields()));
         }
@@ -102,7 +103,7 @@ public class ClojureBolt implements IRichBolt, FinishedCallback {
 
     @Override
     public void finishedId(Object id) {
-        if(_bolt instanceof FinishedCallback) {
+        if (_bolt instanceof FinishedCallback) {
             ((FinishedCallback) _bolt).finishedId(id);
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/clojure/ClojureSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/clojure/ClojureSpout.java b/jstorm-core/src/main/java/backtype/storm/clojure/ClojureSpout.java
index f6422e3..fc231ce 100755
--- a/jstorm-core/src/main/java/backtype/storm/clojure/ClojureSpout.java
+++ b/jstorm-core/src/main/java/backtype/storm/clojure/ClojureSpout.java
@@ -39,37 +39,38 @@ public class ClojureSpout implements IRichSpout {
     List<String> _fnSpec;
     List<String> _confSpec;
     List<Object> _params;
-    
+
     ISpout _spout;
-    
+
     public ClojureSpout(List fnSpec, List confSpec, List<Object> params, Map<String, StreamInfo> fields) {
         _fnSpec = fnSpec;
         _confSpec = confSpec;
         _params = params;
         _fields = fields;
     }
-    
 
     @Override
     public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
         IFn hof = Utils.loadClojureFn(_fnSpec.get(0), _fnSpec.get(1));
         try {
             IFn preparer = (IFn) hof.applyTo(RT.seq(_params));
-            final Map<Keyword,Object> collectorMap = new PersistentArrayMap( new Object[] {
-                Keyword.intern(Symbol.create("output-collector")), collector,
-                Keyword.intern(Symbol.create("context")), context});
-            List<Object> args = new ArrayList<Object>() {{
-                add(conf);
-                add(context);
-                add(collectorMap);
-            }};
-            
+            final Map<Keyword, Object> collectorMap =
+                    new PersistentArrayMap(new Object[] { Keyword.intern(Symbol.create("output-collector")), collector,
+                            Keyword.intern(Symbol.create("context")), context });
+            List<Object> args = new ArrayList<Object>() {
+                {
+                    add(conf);
+                    add(context);
+                    add(collectorMap);
+                }
+            };
+
             _spout = (ISpout) preparer.applyTo(RT.seq(args));
-            //this is kind of unnecessary for clojure
+            // this is kind of unnecessary for clojure
             try {
                 _spout.open(conf, context, collector);
-            } catch(AbstractMethodError ame) {
-                
+            } catch (AbstractMethodError ame) {
+
             }
         } catch (Exception e) {
             throw new RuntimeException(e);
@@ -80,8 +81,8 @@ public class ClojureSpout implements IRichSpout {
     public void close() {
         try {
             _spout.close();
-        } catch(AbstractMethodError ame) {
-                
+        } catch (AbstractMethodError ame) {
+
         }
     }
 
@@ -89,8 +90,8 @@ public class ClojureSpout implements IRichSpout {
     public void nextTuple() {
         try {
             _spout.nextTuple();
-        } catch(AbstractMethodError ame) {
-                
+        } catch (AbstractMethodError ame) {
+
         }
 
     }
@@ -99,8 +100,8 @@ public class ClojureSpout implements IRichSpout {
     public void ack(Object msgId) {
         try {
             _spout.ack(msgId);
-        } catch(AbstractMethodError ame) {
-                
+        } catch (AbstractMethodError ame) {
+
         }
 
     }
@@ -109,20 +110,20 @@ public class ClojureSpout implements IRichSpout {
     public void fail(Object msgId) {
         try {
             _spout.fail(msgId);
-        } catch(AbstractMethodError ame) {
-                
+        } catch (AbstractMethodError ame) {
+
         }
 
     }
 
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        for(String stream: _fields.keySet()) {
+        for (String stream : _fields.keySet()) {
             StreamInfo info = _fields.get(stream);
             declarer.declareStream(stream, info.is_direct(), new Fields(info.get_output_fields()));
         }
     }
-    
+
     @Override
     public Map<String, Object> getComponentConfiguration() {
         IFn hof = Utils.loadClojureFn(_confSpec.get(0), _confSpec.get(1));
@@ -137,8 +138,8 @@ public class ClojureSpout implements IRichSpout {
     public void activate() {
         try {
             _spout.activate();
-        } catch(AbstractMethodError ame) {
-                
+        } catch (AbstractMethodError ame) {
+
         }
     }
 
@@ -146,8 +147,8 @@ public class ClojureSpout implements IRichSpout {
     public void deactivate() {
         try {
             _spout.deactivate();
-        } catch(AbstractMethodError ame) {
-                
+        } catch (AbstractMethodError ame) {
+
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/clojure/RichShellBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/clojure/RichShellBolt.java b/jstorm-core/src/main/java/backtype/storm/clojure/RichShellBolt.java
index a155008..53136c7 100755
--- a/jstorm-core/src/main/java/backtype/storm/clojure/RichShellBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/clojure/RichShellBolt.java
@@ -26,20 +26,20 @@ import java.util.Map;
 
 public class RichShellBolt extends ShellBolt implements IRichBolt {
     private Map<String, StreamInfo> _outputs;
-    
+
     public RichShellBolt(String[] command, Map<String, StreamInfo> outputs) {
         super(command);
         _outputs = outputs;
     }
-    
+
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        for(String stream: _outputs.keySet()) {
+        for (String stream : _outputs.keySet()) {
             StreamInfo def = _outputs.get(stream);
-            if(def.is_direct()) {
+            if (def.is_direct()) {
                 declarer.declareStream(stream, true, new Fields(def.get_output_fields()));
             } else {
-                declarer.declareStream(stream, new Fields(def.get_output_fields()));                
+                declarer.declareStream(stream, new Fields(def.get_output_fields()));
             }
         }
     }
@@ -47,5 +47,5 @@ public class RichShellBolt extends ShellBolt implements IRichBolt {
     @Override
     public Map<String, Object> getComponentConfiguration() {
         return null;
-    }    
+    }
 }
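
The shell bolt and shell spout wrappers above only translate a declared map of StreamInfo into output streams. As a rough illustration of how such a component might be wired into a topology, here is a minimal sketch; the script name, component ids and field names are hypothetical, while RichShellBolt and the Thrift-generated StreamInfo are the classes shown in this diff.

    import backtype.storm.clojure.RichShellBolt;
    import backtype.storm.generated.StreamInfo;
    import backtype.storm.topology.TopologyBuilder;

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    public class ShellBoltWiringSketch {
        public static TopologyBuilder build() {
            // One non-direct output stream named "default" carrying a single field "word".
            Map<String, StreamInfo> outputs = new HashMap<String, StreamInfo>();
            outputs.put("default", new StreamInfo(Arrays.asList("word"), false));

            TopologyBuilder builder = new TopologyBuilder();
            // "splitter.py" stands in for a multilang script shipped with the topology resources.
            builder.setBolt("split", new RichShellBolt(new String[]{"python", "splitter.py"}, outputs), 2);
            return builder;
        }
    }

RichShellSpout below follows the same pattern for spouts, declaring a direct stream whenever StreamInfo.is_direct() is set.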

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/clojure/RichShellSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/clojure/RichShellSpout.java b/jstorm-core/src/main/java/backtype/storm/clojure/RichShellSpout.java
index b49fbef..2f7a134 100755
--- a/jstorm-core/src/main/java/backtype/storm/clojure/RichShellSpout.java
+++ b/jstorm-core/src/main/java/backtype/storm/clojure/RichShellSpout.java
@@ -34,9 +34,9 @@ public class RichShellSpout extends ShellSpout implements IRichSpout {
 
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        for(String stream: _outputs.keySet()) {
+        for (String stream : _outputs.keySet()) {
             StreamInfo def = _outputs.get(stream);
-            if(def.is_direct()) {
+            if (def.is_direct()) {
                 declarer.declareStream(stream, true, new Fields(def.get_output_fields()));
             } else {
                 declarer.declareStream(stream, new Fields(def.get_output_fields()));

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/command/activate.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/command/activate.java b/jstorm-core/src/main/java/backtype/storm/command/activate.java
index ed12e09..11a0db5 100755
--- a/jstorm-core/src/main/java/backtype/storm/command/activate.java
+++ b/jstorm-core/src/main/java/backtype/storm/command/activate.java
@@ -30,7 +30,7 @@ import backtype.storm.utils.Utils;
  * 
  */
 public class activate {
-    
+
     /**
      * @param args
      */
@@ -39,17 +39,17 @@ public class activate {
         if (args == null || args.length == 0) {
             throw new InvalidParameterException("Should input topology name");
         }
-        
+
         String topologyName = args[0];
-        
+
         NimbusClient client = null;
         try {
-            
+
             Map conf = Utils.readStormConfig();
             client = NimbusClient.getConfiguredClient(conf);
-            
+
             client.getClient().activate(topologyName);
-            
+
             System.out.println("Successfully submit command activate " + topologyName);
         } catch (Exception e) {
             System.out.println(e.getMessage());
@@ -61,5 +61,5 @@ public class activate {
             }
         }
     }
-    
+
 }
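
The command classes in this package share one client lifecycle: read the storm config, obtain a NimbusClient, issue a single Thrift call, and close the client in a finally block. A condensed sketch of that pattern, assuming only the calls visible in the diff above (the class name is illustrative):

    import backtype.storm.utils.NimbusClient;
    import backtype.storm.utils.Utils;

    import java.util.Map;

    public class ActivateSketch {
        public static void activate(String topologyName) throws Exception {
            Map conf = Utils.readStormConfig();              // storm.yaml plus defaults
            NimbusClient client = NimbusClient.getConfiguredClient(conf);
            try {
                client.getClient().activate(topologyName);   // ask Nimbus to resume the topology
            } finally {
                client.close();                              // always release the Thrift connection
            }
        }
    }

deactivate, restart and the other commands below differ only in the call made on client.getClient().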

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/command/config_value.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/command/config_value.java b/jstorm-core/src/main/java/backtype/storm/command/config_value.java
index 868ffdc..dd8812a 100755
--- a/jstorm-core/src/main/java/backtype/storm/command/config_value.java
+++ b/jstorm-core/src/main/java/backtype/storm/command/config_value.java
@@ -30,7 +30,7 @@ import backtype.storm.utils.Utils;
  * 
  */
 public class config_value {
-    
+
     /**
      * @param args
      */
@@ -39,12 +39,12 @@ public class config_value {
         if (args == null || args.length == 0) {
             throw new InvalidParameterException("Should input key name");
         }
-        
+
         String key = args[0];
-        
+
         Map conf = Utils.readStormConfig();
-        
+
         System.out.print("VALUE: " + String.valueOf(conf.get(key)));
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/command/deactivate.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/command/deactivate.java b/jstorm-core/src/main/java/backtype/storm/command/deactivate.java
index 22ac20d..59e97d6 100755
--- a/jstorm-core/src/main/java/backtype/storm/command/deactivate.java
+++ b/jstorm-core/src/main/java/backtype/storm/command/deactivate.java
@@ -30,7 +30,7 @@ import backtype.storm.utils.Utils;
  * 
  */
 public class deactivate {
-    
+
     /**
      * @param args
      */
@@ -39,17 +39,17 @@ public class deactivate {
         if (args == null || args.length == 0) {
             throw new InvalidParameterException("Should input topology name");
         }
-        
+
         String topologyName = args[0];
-        
+
         NimbusClient client = null;
         try {
-            
+
             Map conf = Utils.readStormConfig();
             client = NimbusClient.getConfiguredClient(conf);
-            
+
             client.getClient().deactivate(topologyName);
-            
+
             System.out.println("Successfully submit command deactivate " + topologyName);
         } catch (Exception e) {
             System.out.println(e.getMessage());
@@ -61,5 +61,5 @@ public class deactivate {
             }
         }
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/command/kill_topology.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/command/kill_topology.java b/jstorm-core/src/main/java/backtype/storm/command/kill_topology.java
index 4ab3893..bda20e1 100755
--- a/jstorm-core/src/main/java/backtype/storm/command/kill_topology.java
+++ b/jstorm-core/src/main/java/backtype/storm/command/kill_topology.java
@@ -17,13 +17,13 @@
  */
 package backtype.storm.command;
 
-import java.security.InvalidParameterException;
-import java.util.Map;
-
 import backtype.storm.generated.KillOptions;
 import backtype.storm.utils.NimbusClient;
 import backtype.storm.utils.Utils;
 
+import java.security.InvalidParameterException;
+import java.util.Map;
+
 /**
  * Kill topology
  * 
@@ -31,7 +31,7 @@ import backtype.storm.utils.Utils;
  * 
  */
 public class kill_topology {
-    
+
     /**
      * @param args
      */
@@ -40,28 +40,28 @@ public class kill_topology {
         if (args == null || args.length == 0) {
             throw new InvalidParameterException("Should input topology name");
         }
-        
+
         String topologyName = args[0];
-        
+
         NimbusClient client = null;
         try {
-            
+
             Map conf = Utils.readStormConfig();
             client = NimbusClient.getConfiguredClient(conf);
-            
+
             if (args.length == 1) {
-                
+
                 client.getClient().killTopology(topologyName);
             } else {
                 int delaySeconds = Integer.parseInt(args[1]);
-                
+
                 KillOptions options = new KillOptions();
                 options.set_wait_secs(delaySeconds);
-                
+
                 client.getClient().killTopologyWithOpts(topologyName, options);
-                
+
             }
-            
+
             System.out.println("Successfully submit command kill " + topologyName);
         } catch (Exception e) {
             System.out.println(e.getMessage());
@@ -73,5 +73,5 @@ public class kill_topology {
             }
         }
     }
-    
+
 }
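
The optional second argument to kill_topology becomes KillOptions.wait_secs, a grace period before the topology is actually removed. A hedged sketch of the same call made programmatically; the topology name and delay are placeholders:

    import backtype.storm.generated.KillOptions;
    import backtype.storm.utils.NimbusClient;
    import backtype.storm.utils.Utils;

    public class KillWithDelaySketch {
        public static void main(String[] args) throws Exception {
            NimbusClient client = NimbusClient.getConfiguredClient(Utils.readStormConfig());
            try {
                KillOptions options = new KillOptions();
                options.set_wait_secs(30);    // let in-flight tuples drain for 30 seconds first
                client.getClient().killTopologyWithOpts("word-count", options);
            } finally {
                client.close();
            }
        }
    }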

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/command/list.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/command/list.java b/jstorm-core/src/main/java/backtype/storm/command/list.java
index 3b4efdb..0c6930d 100755
--- a/jstorm-core/src/main/java/backtype/storm/command/list.java
+++ b/jstorm-core/src/main/java/backtype/storm/command/list.java
@@ -33,29 +33,29 @@ import backtype.storm.utils.Utils;
  * 
  */
 public class list {
-    
+
     /**
      * @param args
      */
     public static void main(String[] args) {
-        
+
         NimbusClient client = null;
         try {
-            
+
             Map conf = Utils.readStormConfig();
             client = NimbusClient.getConfiguredClient(conf);
-            
+
             if (args.length > 0 && StringUtils.isBlank(args[0]) == false) {
                 String topologyName = args[0];
                 TopologyInfo info = client.getClient().getTopologyInfoByName(topologyName);
-                
+
                 System.out.println("Successfully get topology info \n" + Utils.toPrettyJsonString(info));
             } else {
                 ClusterSummary clusterSummary = client.getClient().getClusterInfo();
-                
+
                 System.out.println("Successfully get cluster info \n" + Utils.toPrettyJsonString(clusterSummary));
             }
-            
+
         } catch (Exception e) {
             System.out.println(e.getMessage());
             e.printStackTrace();
@@ -66,5 +66,5 @@ public class list {
             }
         }
     }
-    
+
 }
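
list is the read-only counterpart of the commands above: given a topology name it fetches TopologyInfo, otherwise the whole ClusterSummary. A small sketch of using those two calls directly, with the same client lifecycle and error handling trimmed:

    import backtype.storm.generated.ClusterSummary;
    import backtype.storm.generated.TopologyInfo;
    import backtype.storm.utils.NimbusClient;
    import backtype.storm.utils.Utils;

    public class ClusterInfoSketch {
        public static void print(String topologyName) throws Exception {
            NimbusClient client = NimbusClient.getConfiguredClient(Utils.readStormConfig());
            try {
                if (topologyName != null) {
                    TopologyInfo info = client.getClient().getTopologyInfoByName(topologyName);
                    System.out.println(Utils.toPrettyJsonString(info));
                } else {
                    ClusterSummary summary = client.getClient().getClusterInfo();
                    System.out.println(Utils.toPrettyJsonString(summary));
                }
            } finally {
                client.close();
            }
        }
    }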

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/command/metrics_monitor.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/command/metrics_monitor.java b/jstorm-core/src/main/java/backtype/storm/command/metrics_monitor.java
index 6607445..a673fcf 100755
--- a/jstorm-core/src/main/java/backtype/storm/command/metrics_monitor.java
+++ b/jstorm-core/src/main/java/backtype/storm/command/metrics_monitor.java
@@ -17,13 +17,13 @@
  */
 package backtype.storm.command;
 
-import java.util.Map;
-import java.security.InvalidParameterException;
-
 import backtype.storm.generated.MonitorOptions;
 import backtype.storm.utils.NimbusClient;
 import backtype.storm.utils.Utils;
 
+import java.security.InvalidParameterException;
+import java.util.Map;
+
 /**
  * Monitor topology
  * 
@@ -31,7 +31,7 @@ import backtype.storm.utils.Utils;
  * 
  */
 public class metrics_monitor {
-    
+
     /**
      * @param args
      */
@@ -40,22 +40,22 @@ public class metrics_monitor {
         if (args == null || args.length <= 1) {
             throw new InvalidParameterException("Should input topology name and enable flag");
         }
-        
+
         String topologyName = args[0];
-        
+
         NimbusClient client = null;
         try {
-            
+
             Map conf = Utils.readStormConfig();
             client = NimbusClient.getConfiguredClient(conf);
-            
+
             boolean isEnable = Boolean.valueOf(args[1]).booleanValue();
-            
+
             MonitorOptions options = new MonitorOptions();
             options.set_isEnable(isEnable);
-            
+
             client.getClient().metricMonitor(topologyName, options);
-            
+
             String str = (isEnable) ? "enable" : "disable";
             System.out.println("Successfully submit command to " + str + " the monitor of " + topologyName);
         } catch (Exception e) {
@@ -68,5 +68,5 @@ public class metrics_monitor {
             }
         }
     }
-    
+
 }
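
metrics_monitor uses the same scaffold with a MonitorOptions payload: the second command-line argument is parsed as a boolean and forwarded as isEnable. A compact sketch of toggling metric collection for a running topology:

    import backtype.storm.generated.MonitorOptions;
    import backtype.storm.utils.NimbusClient;
    import backtype.storm.utils.Utils;

    public class MetricsToggleSketch {
        public static void setMonitor(String topologyName, boolean enable) throws Exception {
            NimbusClient client = NimbusClient.getConfiguredClient(Utils.readStormConfig());
            try {
                MonitorOptions options = new MonitorOptions();
                options.set_isEnable(enable);   // true enables, false disables the topology's metric monitor
                client.getClient().metricMonitor(topologyName, options);
            } finally {
                client.close();
            }
        }
    }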

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/command/rebalance.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/command/rebalance.java b/jstorm-core/src/main/java/backtype/storm/command/rebalance.java
index f0cf69f..6d08934 100755
--- a/jstorm-core/src/main/java/backtype/storm/command/rebalance.java
+++ b/jstorm-core/src/main/java/backtype/storm/command/rebalance.java
@@ -17,13 +17,12 @@
  */
 package backtype.storm.command;
 
-import java.security.InvalidParameterException;
-import java.util.Map;
-
 import backtype.storm.generated.RebalanceOptions;
 import backtype.storm.utils.NimbusClient;
 import backtype.storm.utils.Utils;
 
+import java.util.Map;
+
 /**
 * Rebalance topology
  * 
@@ -32,7 +31,7 @@ import backtype.storm.utils.Utils;
  */
 public class rebalance {
     static final String REASSIGN_FLAG = "-r";
-    
+
     /**
      * @param args
      */
@@ -42,15 +41,15 @@ public class rebalance {
             printErrorInfo();
             return;
         }
-        
+
         int argsIndex = 0;
         String topologyName = null;
-        
+
         try {
             RebalanceOptions options = new RebalanceOptions();
             options.set_reassign(false);
             options.set_conf(null);
-            
+
             if (args[argsIndex].equalsIgnoreCase(REASSIGN_FLAG)) {
                 options.set_reassign(true);
                 argsIndex++;
@@ -64,7 +63,7 @@ public class rebalance {
             } else {
                 topologyName = args[argsIndex];
             }
-            
+
             argsIndex++;
             if (args.length > argsIndex) {
                 for (int i = argsIndex; i < args.length; i++) {
@@ -85,32 +84,34 @@ public class rebalance {
                     }
                 }
             }
-            
+
             submitRebalance(topologyName, options);
-            
-            System.out.println("Successfully submit command rebalance " + topologyName + ", delaySecs=" + options.get_wait_secs() + ", reassignFlag=" + options.is_reassign() + ", newConfiguration=" + options.get_conf());
+
+            System.out.println("Successfully submit command rebalance " + topologyName + ", delaySecs=" +
+                    options.get_wait_secs() + ", reassignFlag="
+                    + options.is_reassign() + ", newConfiguration=" + options.get_conf());
         } catch (Exception e) {
             System.out.println(e.getMessage());
             e.printStackTrace();
             throw new RuntimeException(e);
         }
     }
-    
+
     private static void printErrorInfo() {
         System.out.println("Error: Invalid parameters!");
         System.out.println("USAGE: jstorm rebalance [-r] TopologyName [DelayTime] [NewConfig]");
     }
-    
+
     public static void submitRebalance(String topologyName, RebalanceOptions options) throws Exception {
         submitRebalance(topologyName, options, null);
     }
-    
+
     public static void submitRebalance(String topologyName, RebalanceOptions options, Map conf) throws Exception {
         Map stormConf = Utils.readStormConfig();
         if (conf != null) {
             stormConf.putAll(conf);
         }
-        
+
         NimbusClient client = null;
         try {
             client = NimbusClient.getConfiguredClient(stormConf);
@@ -123,5 +124,5 @@ public class rebalance {
             }
         }
     }
-    
+
 }
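
Unlike the other commands, rebalance exposes a reusable helper, submitRebalance(topologyName, options[, conf]), which reads the storm config and manages the NimbusClient internally. A sketch of requesting a reassigning rebalance with a 30-second delay; the topology name and delay are placeholders:

    import backtype.storm.command.rebalance;
    import backtype.storm.generated.RebalanceOptions;

    public class RebalanceSketch {
        public static void main(String[] args) throws Exception {
            RebalanceOptions options = new RebalanceOptions();
            options.set_reassign(true);    // same effect as passing the -r flag on the command line
            options.set_wait_secs(30);     // delay before the new assignment takes effect
            rebalance.submitRebalance("word-count", options);
        }
    }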

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/command/restart.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/command/restart.java b/jstorm-core/src/main/java/backtype/storm/command/restart.java
index ecec9a3..5a216ea 100755
--- a/jstorm-core/src/main/java/backtype/storm/command/restart.java
+++ b/jstorm-core/src/main/java/backtype/storm/command/restart.java
@@ -45,26 +45,26 @@ public class restart {
         if (args == null || args.length == 0) {
             throw new InvalidParameterException("Should input topology name");
         }
-        
+
         String topologyName = args[0];
-        
+
         NimbusClient client = null;
         try {
             Map conf = Utils.readStormConfig();
             client = NimbusClient.getConfiguredClient(conf);
-            
+
             System.out.println("It will take 15 ~ 100 seconds to restart, please wait patiently\n");
-            
+
             if (args.length == 1) {
                 client.getClient().restart(topologyName, null);
             } else {
                 Map loadConf = Utils.loadConf(args[1]);
                 String jsonConf = Utils.to_json(loadConf);
                 System.out.println("New configuration:\n" + jsonConf);
-                
+
                 client.getClient().restart(topologyName, jsonConf);
             }
-            
+
             System.out.println("Successfully submit command restart " + topologyName);
         } catch (Exception e) {
             System.out.println(e.getMessage());
@@ -76,5 +76,5 @@ public class restart {
             }
         }
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/command/update_config.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/command/update_config.java b/jstorm-core/src/main/java/backtype/storm/command/update_config.java
deleted file mode 100644
index be78f19..0000000
--- a/jstorm-core/src/main/java/backtype/storm/command/update_config.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.command;
-
-import java.security.InvalidParameterException;
-import java.util.Map;
-
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-
-/**
- * Update user configuration
- * 
- * @author basti
- * 
- */
-public class update_config {
-    /**
-     * @param args
-     */
-    public static void main(String[] args) {
-        // TODO Auto-generated method stub
-        if (args == null || args.length < 2) {
-            throw new InvalidParameterException(
-                    "[USAGE] update_config topologyName config");
-        }
-
-        String topologyName = args[0];
-
-        NimbusClient client = null;
-        try {
-            Map conf = Utils.readStormConfig();
-            client = NimbusClient.getConfiguredClient(conf);
-
-            Map loadConf = Utils.loadConf(args[1]);
-            String jsonConf = Utils.to_json(loadConf);
-            System.out.println("New configuration:\n" + jsonConf);
-
-            client.getClient().updateConf(topologyName, jsonConf);
-
-            System.out.println("Successfully submit command update_conf "
-                    + topologyName);
-        } catch (Exception e) {
-            System.out.println(e.getMessage());
-            e.printStackTrace();
-            throw new RuntimeException(e);
-        } finally {
-            if (client != null) {
-                client.close();
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/command/update_topology.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/command/update_topology.java b/jstorm-core/src/main/java/backtype/storm/command/update_topology.java
new file mode 100644
index 0000000..85172a7
--- /dev/null
+++ b/jstorm-core/src/main/java/backtype/storm/command/update_topology.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.command;
+
+import backtype.storm.GenericOptionsParser;
+import backtype.storm.StormSubmitter;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+import org.apache.commons.cli.*;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by xiaojian.fxj on 2015/10/13.
+ */
+public class update_topology {
+    public static final String UPDATE_CONF = "-conf";
+
+    public static final String UPDATE_JAR = "-jar";
+
+    public static void usage() {
+        System.out.println("update topology config, please do as following:");
+        System.out.println("update_topology topologyName -conf configFile");
+
+        System.out.println("update topology jar, please do as following:");
+        System.out.println("update_topology topologyName -jar jarFile");
+
+        System.out.println("update topology jar and conf, please do as following:");
+        System.out.println("update_topology topologyName -jar jarFile -conf configFile");
+    }
+
+    private static Options buildGeneralOptions(Options opts) {
+        Options r = new Options();
+
+        for (Object o : opts.getOptions())
+            r.addOption((Option) o);
+
+        Option jar = OptionBuilder.withArgName("path").hasArg()
+                .withDescription("comma  jar of the submitted topology")
+                .create("jar");
+        r.addOption(jar);
+
+        Option conf = OptionBuilder.withArgName("configuration file").hasArg()
+                .withDescription("an application configuration file")
+                .create("conf");
+        r.addOption(conf);
+        return r;
+    }
+
+    private static void updateTopology(String topologyName, String pathJar,
+            String pathConf) {
+        NimbusClient client = null;
+        Map loadMap = null;
+        if (pathConf != null) {
+            loadMap = Utils.loadConf(pathConf);
+        } else {
+            loadMap = new HashMap();
+        }
+
+        Map conf = Utils.readStormConfig();
+
+        conf.putAll(loadMap);
+        client = NimbusClient.getConfiguredClient(conf);
+        try {
+            // update jar
+            String uploadLocation = null;
+            if (pathJar != null) {
+                System.out.println("Jar update to master yet. Submitting jar of " + pathJar);
+                String path = client.getClient().beginFileUpload();
+                String[] pathCache = path.split("/");
+                uploadLocation = path + "/stormjar-" + pathCache[pathCache.length - 1] + ".jar";
+                List<String> lib = (List<String>) conf .get(GenericOptionsParser.TOPOLOGY_LIB_NAME);
+                Map<String, String> libPath = (Map<String, String>) conf .get(GenericOptionsParser.TOPOLOGY_LIB_PATH);
+                if (lib != null && lib.size() != 0) {
+                    for (String libName : lib) {
+                        String jarPath = path + "/lib/" + libName;
+                        client.getClient().beginLibUpload(jarPath);
+                        StormSubmitter.submitJar(conf, libPath.get(libName), jarPath, client);
+                    }
+
+                } else {
+                    if (pathJar == null) {
+                        // no lib, no client jar
+                        throw new RuntimeException("No client app jar, please upload it");
+                    }
+                }
+
+                if (pathJar != null) {
+                    StormSubmitter.submitJar(conf, pathJar, uploadLocation, client);
+                } else {
+                    // no client jar, but with lib jar
+                    client.getClient().finishFileUpload(uploadLocation);
+                }
+            }
+
+            // update topology
+            String jsonConf = Utils.to_json(loadMap);
+            System.out.println("New configuration:\n" + jsonConf);
+
+            client.getClient().updateTopology(topologyName, uploadLocation,
+                    jsonConf);
+
+            System.out.println("Successfully submit command update " + topologyName);
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new RuntimeException(e);
+        } finally {
+            if (client != null) {
+                client.close();
+            }
+        }
+
+    }
+
+    /**
+     * @param args
+     */
+    public static void main(String[] args) {
+        if (args == null || args.length < 3) {
+            System.out.println("Invalid parameter");
+            usage();
+            return;
+        }
+        String topologyName = args[0];
+        try {
+            String[] str2 = Arrays.copyOfRange(args, 1, args.length);
+            CommandLineParser parser = new GnuParser();
+            Options r = buildGeneralOptions(new Options());
+            CommandLine commandLine = parser.parse(r, str2, true);
+
+            String pathConf = null;
+            String pathJar = null;
+            if (commandLine.hasOption("conf")) {
+                pathConf = (commandLine.getOptionValues("conf"))[0];
+            }
+            if (commandLine.hasOption("jar")) {
+                pathJar = (commandLine.getOptionValues("jar"))[0];
+            }
+            if (pathConf != null || pathJar != null)
+                updateTopology(topologyName, pathJar, pathConf);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}
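
As the usage() method above prints, the new command is invoked as "update_topology topologyName -conf configFile" and/or "-jar jarFile". For the conf-only path no jar is uploaded, so updateTopology receives a null upload location. A hedged sketch of that path, using only the calls that appear in the new file:

    import backtype.storm.utils.NimbusClient;
    import backtype.storm.utils.Utils;

    import java.util.Map;

    public class ConfOnlyUpdateSketch {
        public static void updateConf(String topologyName, String confFile) throws Exception {
            Map loadMap = Utils.loadConf(confFile);   // file holding the overriding configuration keys
            Map conf = Utils.readStormConfig();
            conf.putAll(loadMap);

            NimbusClient client = NimbusClient.getConfiguredClient(conf);
            try {
                String jsonConf = Utils.to_json(loadMap);
                // No new jar, so the upload location stays null, mirroring the conf-only branch above.
                client.getClient().updateTopology(topologyName, null, jsonConf);
            } finally {
                client.close();
            }
        }
    }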

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/coordination/BatchBoltExecutor.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/coordination/BatchBoltExecutor.java b/jstorm-core/src/main/java/backtype/storm/coordination/BatchBoltExecutor.java
index 8653010..d9163e5 100755
--- a/jstorm-core/src/main/java/backtype/storm/coordination/BatchBoltExecutor.java
+++ b/jstorm-core/src/main/java/backtype/storm/coordination/BatchBoltExecutor.java
@@ -32,18 +32,18 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class BatchBoltExecutor implements IRichBolt, FinishedCallback, TimeoutCallback {
-    public static Logger LOG = LoggerFactory.getLogger(BatchBoltExecutor.class);    
+    public static Logger LOG = LoggerFactory.getLogger(BatchBoltExecutor.class);
 
     byte[] _boltSer;
     Map<Object, IBatchBolt> _openTransactions;
     Map _conf;
     TopologyContext _context;
     BatchOutputCollectorImpl _collector;
-    
+
     public BatchBoltExecutor(IBatchBolt bolt) {
         _boltSer = Utils.javaSerialize(bolt);
     }
-    
+
     @Override
     public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
         _conf = conf;
@@ -57,11 +57,11 @@ public class BatchBoltExecutor implements IRichBolt, FinishedCallback, TimeoutCa
         Object id = input.getValue(0);
         IBatchBolt bolt = getBatchBolt(id);
         try {
-             bolt.execute(input);
+            bolt.execute(input);
             _collector.ack(input);
-        } catch(FailedException e) {
+        } catch (FailedException e) {
             LOG.error("Failed to process tuple in batch", e);
-            _collector.fail(input);                
+            _collector.fail(input);
         }
     }
 
@@ -78,30 +78,29 @@ public class BatchBoltExecutor implements IRichBolt, FinishedCallback, TimeoutCa
 
     @Override
     public void timeoutId(Object attempt) {
-        _openTransactions.remove(attempt);        
-    }    
-    
+        _openTransactions.remove(attempt);
+    }
 
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
         newTransactionalBolt().declareOutputFields(declarer);
     }
-    
+
     @Override
     public Map<String, Object> getComponentConfiguration() {
         return newTransactionalBolt().getComponentConfiguration();
     }
-    
+
     private IBatchBolt getBatchBolt(Object id) {
         IBatchBolt bolt = _openTransactions.get(id);
-        if(bolt==null) {
+        if (bolt == null) {
             bolt = newTransactionalBolt();
             bolt.prepare(_conf, _context, _collector, id);
-            _openTransactions.put(id, bolt);            
+            _openTransactions.put(id, bolt);
         }
         return bolt;
     }
-    
+
     private IBatchBolt newTransactionalBolt() {
         return Utils.javaDeserialize(_boltSer, IBatchBolt.class);
     }
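
BatchBoltExecutor deserializes a fresh IBatchBolt per batch id (taken from field 0 of each tuple), acks or fails the input depending on whether execute throws FailedException, and discards the bolt in timeoutId when a batch expires. A minimal sketch of a bolt it could wrap; the field names are hypothetical:

    import backtype.storm.coordination.BatchOutputCollector;
    import backtype.storm.coordination.IBatchBolt;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    import java.util.Map;

    public class CountingBatchBolt implements IBatchBolt<Object> {
        private BatchOutputCollector _collector;
        private Object _id;
        private int _count = 0;

        @Override
        public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
            _collector = collector;
            _id = id;                       // the batch id this instance is bound to
        }

        @Override
        public void execute(Tuple tuple) {
            _count++;                       // one call per tuple belonging to this batch
        }

        @Override
        public void finishBatch() {
            // Emit a single summary tuple once the whole batch has been processed.
            _collector.emit(new Values(_id, _count));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "count"));
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }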

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollector.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollector.java b/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollector.java
index f5f3457..0b99339 100755
--- a/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollector.java
+++ b/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollector.java
@@ -30,17 +30,16 @@ public abstract class BatchOutputCollector {
     }
 
     public abstract List<Integer> emit(String streamId, List<Object> tuple);
-    
+
     /**
-     * Emits a tuple to the specified task on the default output stream. This output
-     * stream must have been declared as a direct stream, and the specified task must
-     * use a direct grouping on this stream to receive the message.
+     * Emits a tuple to the specified task on the default output stream. This output stream must have been declared as a direct stream, and the specified task
+     * must use a direct grouping on this stream to receive the message.
      */
     public void emitDirect(int taskId, List<Object> tuple) {
         emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple);
     }
-    
-    public abstract void emitDirect(int taskId, String streamId, List<Object> tuple); 
-    
+
+    public abstract void emitDirect(int taskId, String streamId, List<Object> tuple);
+
     public abstract void reportError(Throwable error);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollectorImpl.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollectorImpl.java b/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollectorImpl.java
index cae7560..44a1f01 100755
--- a/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollectorImpl.java
+++ b/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollectorImpl.java
@@ -23,11 +23,11 @@ import java.util.List;
 
 public class BatchOutputCollectorImpl extends BatchOutputCollector {
     OutputCollector _collector;
-    
+
     public BatchOutputCollectorImpl(OutputCollector collector) {
         _collector = collector;
     }
-    
+
     @Override
     public List<Integer> emit(String streamId, List<Object> tuple) {
         return _collector.emit(streamId, tuple);
@@ -42,11 +42,11 @@ public class BatchOutputCollectorImpl extends BatchOutputCollector {
     public void reportError(Throwable error) {
         _collector.reportError(error);
     }
-    
+
     public void ack(Tuple tup) {
         _collector.ack(tup);
     }
-    
+
     public void fail(Tuple tup) {
         _collector.fail(tup);
     }