You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/05/12 17:35:43 UTC
[1/8] git commit: Added storm submitter progress bar to see the
progress of the jar file upload
Repository: incubator-storm
Updated Branches:
refs/heads/master b612a8675 -> 9902c9d4e
Added storm submitter progress bar to see the progress of the jar file upload
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/262f2dc3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/262f2dc3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/262f2dc3
Branch: refs/heads/master
Commit: 262f2dc3cb4212e5e3428f1b2f569114eb3146b3
Parents: 22215b5
Author: darthbear <fr...@gmail.com>
Authored: Sat May 3 13:06:32 2014 -0400
Committer: darthbear <fr...@gmail.com>
Committed: Sat May 3 13:06:32 2014 -0400
----------------------------------------------------------------------
.../jvm/storm/starter/BasicDRPCTopology.java | 3 +-
.../jvm/storm/starter/ExclamationTopology.java | 3 +-
.../src/jvm/storm/starter/ReachTopology.java | 3 +-
.../jvm/storm/starter/WordCountTopology.java | 3 +-
.../storm/starter/trident/TridentWordCount.java | 3 +-
.../src/jvm/backtype/storm/StormSubmitter.java | 92 +++++++++++++++-----
.../storm/StormSubmitterWithProgressBar.java | 76 ++++++++++++++++
7 files changed, 156 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/262f2dc3/examples/storm-starter/src/jvm/storm/starter/BasicDRPCTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/BasicDRPCTopology.java b/examples/storm-starter/src/jvm/storm/starter/BasicDRPCTopology.java
index b0493a7..af7d238 100644
--- a/examples/storm-starter/src/jvm/storm/starter/BasicDRPCTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/BasicDRPCTopology.java
@@ -21,6 +21,7 @@ import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
+import backtype.storm.StormSubmitterWithProgressBar;
import backtype.storm.drpc.LinearDRPCTopologyBuilder;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
@@ -72,7 +73,7 @@ public class BasicDRPCTopology {
}
else {
conf.setNumWorkers(3);
- StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology());
+ StormSubmitterWithProgressBar.submitTopology(args[0], conf, builder.createRemoteTopology());
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/262f2dc3/examples/storm-starter/src/jvm/storm/starter/ExclamationTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/ExclamationTopology.java b/examples/storm-starter/src/jvm/storm/starter/ExclamationTopology.java
index a0fb757..e1c7622 100644
--- a/examples/storm-starter/src/jvm/storm/starter/ExclamationTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/ExclamationTopology.java
@@ -20,6 +20,7 @@ package storm.starter;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
+import backtype.storm.StormSubmitterWithProgressBar;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.testing.TestWordSpout;
@@ -73,7 +74,7 @@ public class ExclamationTopology {
if (args != null && args.length > 0) {
conf.setNumWorkers(3);
- StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
+ StormSubmitterWithProgressBar.submitTopology(args[0], conf, builder.createTopology());
}
else {
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/262f2dc3/examples/storm-starter/src/jvm/storm/starter/ReachTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/ReachTopology.java b/examples/storm-starter/src/jvm/storm/starter/ReachTopology.java
index de99a91..95d2111 100644
--- a/examples/storm-starter/src/jvm/storm/starter/ReachTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/ReachTopology.java
@@ -21,6 +21,7 @@ import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
+import backtype.storm.StormSubmitterWithProgressBar;
import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.drpc.LinearDRPCTopologyBuilder;
import backtype.storm.task.TopologyContext;
@@ -190,7 +191,7 @@ public class ReachTopology {
}
else {
conf.setNumWorkers(6);
- StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology());
+ StormSubmitterWithProgressBar.submitTopology(args[0], conf, builder.createRemoteTopology());
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/262f2dc3/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java b/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java
index b8dece0..553d535 100644
--- a/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java
@@ -20,6 +20,7 @@ package storm.starter;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
+import backtype.storm.StormSubmitterWithProgressBar;
import backtype.storm.task.ShellBolt;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IRichBolt;
@@ -91,7 +92,7 @@ public class WordCountTopology {
if (args != null && args.length > 0) {
conf.setNumWorkers(3);
- StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
+ StormSubmitterWithProgressBar.submitTopology(args[0], conf, builder.createTopology());
}
else {
conf.setMaxTaskParallelism(3);
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/262f2dc3/examples/storm-starter/src/jvm/storm/starter/trident/TridentWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/trident/TridentWordCount.java b/examples/storm-starter/src/jvm/storm/starter/trident/TridentWordCount.java
index 439e00f..ce8a969 100644
--- a/examples/storm-starter/src/jvm/storm/starter/trident/TridentWordCount.java
+++ b/examples/storm-starter/src/jvm/storm/starter/trident/TridentWordCount.java
@@ -21,6 +21,7 @@ import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
+import backtype.storm.StormSubmitterWithProgressBar;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
@@ -79,7 +80,7 @@ public class TridentWordCount {
}
else {
conf.setNumWorkers(3);
- StormSubmitter.submitTopology(args[0], conf, buildTopology(null));
+ StormSubmitterWithProgressBar.submitTopology(args[0], conf, buildTopology(null));
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/262f2dc3/storm-core/src/jvm/backtype/storm/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/StormSubmitter.java b/storm-core/src/jvm/backtype/storm/StormSubmitter.java
index 2fd0ce8..90bdca5 100644
--- a/storm-core/src/jvm/backtype/storm/StormSubmitter.java
+++ b/storm-core/src/jvm/backtype/storm/StormSubmitter.java
@@ -17,6 +17,7 @@
*/
package backtype.storm;
+import java.io.File;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
@@ -65,22 +66,23 @@ public class StormSubmitter {
* @throws InvalidTopologyException if an invalid topology was submitted
*/
public static void submitTopology(String name, Map stormConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException {
- submitTopology(name, stormConf, topology, null);
+ submitTopology(name, stormConf, topology, null, null);
}
-
+
/**
- * Submits a topology to run on the cluster. A topology runs forever or until
+ * Submits a topology to run on the cluster. A topology runs forever or until
* explicitly killed.
*
*
* @param name the name of the storm.
- * @param stormConf the topology-specific configuration. See {@link Config}.
+ * @param stormConf the topology-specific configuration. See {@link Config}.
* @param topology the processing to execute.
- * @param options to manipulate the starting of the topology
+ * @param opts to manipulate the starting of the topology
+ * @param progressListener to track the progress of the jar upload process
* @throws AlreadyAliveException if a topology with this name is already running
* @throws InvalidTopologyException if an invalid topology was submitted
*/
- public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException, InvalidTopologyException {
+ public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts, ProgressListener progressListener) throws AlreadyAliveException, InvalidTopologyException {
if(!Utils.isValidConf(stormConf)) {
throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
}
@@ -98,14 +100,14 @@ public class StormSubmitter {
if(topologyNameExists(conf, name)) {
throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
}
- submitJar(conf);
+ submitJar(conf, null, progressListener);
try {
LOG.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
if(opts!=null) {
- client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);
+ client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);
} else {
// this is for backwards compatibility
- client.getClient().submitTopology(name, submittedJar, serConf, topology);
+ client.getClient().submitTopology(name, submittedJar, serConf, topology);
}
} catch(InvalidTopologyException e) {
LOG.warn("Topology submission exception: "+e.get_msg());
@@ -122,7 +124,7 @@ public class StormSubmitter {
throw new RuntimeException(e);
}
}
-
+
private static boolean topologyNameExists(Map conf, String name) {
NimbusClient client = NimbusClient.getConfiguredClient(conf);
try {
@@ -143,32 +145,54 @@ public class StormSubmitter {
private static String submittedJar = null;
- private static void submitJar(Map conf) {
- if(submittedJar==null) {
- LOG.info("Jar not uploaded to master yet. Submitting jar...");
- String localJar = System.getProperty("storm.jar");
- submittedJar = submitJar(conf, localJar);
- } else {
- LOG.info("Jar already uploaded to master. Not submitting jar.");
- }
- }
-
public static String submitJar(Map conf, String localJar) {
- if(localJar==null) {
- throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");
+ return submitJar(conf, localJar, null);
+ }
+
+ public static String submitJar(Map conf, String localJar, ProgressListener listener) {
+
+ if (submittedJar==null && localJar==null) {
+ LOG.info("Jar not uploaded to master yet. Submitting jar...");
+ localJar = System.getProperty("storm.jar");
+
+ if (localJar==null) {
+ throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");
+ }
+ } else if (localJar!=null) {
+ LOG.info("Jar already uploaded to master. Not submitting jar.");
+ return submittedJar;
}
+
NimbusClient client = NimbusClient.getConfiguredClient(conf);
try {
String uploadLocation = client.getClient().beginFileUpload();
LOG.info("Uploading topology jar " + localJar + " to assigned location: " + uploadLocation);
BufferFileInputStream is = new BufferFileInputStream(localJar, THRIFT_CHUNK_SIZE_BYTES);
+
+ long totalSize = new File(localJar).length();
+ if (listener != null) {
+ listener.onStart(localJar, uploadLocation, totalSize);
+ }
+
+ long bytesUploaded = 0;
while(true) {
byte[] toSubmit = is.read();
+ bytesUploaded += toSubmit.length;
+ if (listener != null) {
+ listener.onProgress(bytesUploaded);
+ }
+
if(toSubmit.length==0) break;
client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit));
}
client.getClient().finishFileUpload(uploadLocation);
+
+ if (listener != null) {
+ listener.onCompleted();
+ }
+
LOG.info("Successfully uploaded topology jar to assigned location: " + uploadLocation);
+ submittedJar = uploadLocation;
return uploadLocation;
} catch(Exception e) {
throw new RuntimeException(e);
@@ -176,4 +200,28 @@ public class StormSubmitter {
client.close();
}
}
+
+ /**
+ * Interface use to track progress of file upload
+ */
+ public interface ProgressListener {
+ /**
+ * called before file is uploaded
+ * @param srcFile - jar file to be uploaded
+ * @param targetFile - destination file
+ * @param totalBytes - total number of bytes of the file
+ */
+ public void onStart(String srcFile, String targetFile, long totalBytes);
+
+ /**
+ * called whenever a chunk of bytes is uploaded
+ * @param bytesUploaded - number of bytes transferred so far
+ */
+ public void onProgress(long bytesUploaded);
+
+ /**
+ * called when the file is uploaded
+ */
+ public void onCompleted();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/262f2dc3/storm-core/src/jvm/backtype/storm/StormSubmitterWithProgressBar.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/StormSubmitterWithProgressBar.java b/storm-core/src/jvm/backtype/storm/StormSubmitterWithProgressBar.java
new file mode 100644
index 0000000..9d58971
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/StormSubmitterWithProgressBar.java
@@ -0,0 +1,76 @@
+package backtype.storm;
+
+import backtype.storm.generated.AlreadyAliveException;
+import backtype.storm.generated.InvalidTopologyException;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.generated.SubmitOptions;
+import org.apache.commons.lang.StringUtils;
+
+import java.util.Map;
+
+/**
+ * Submit topology with a progress bar
+ */
+
+public class StormSubmitterWithProgressBar {
+ /**
+ * Submits a topology to run on the cluster with a progress bar. A topology runs forever or until
+ * explicitly killed.
+ *
+ *
+ * @param name the name of the storm.
+ * @param stormConf the topology-specific configuration. See {@link Config}.
+ * @param topology the processing to execute.
+ * @throws AlreadyAliveException if a topology with this name is already running
+ * @throws InvalidTopologyException if an invalid topology was submitted
+ */
+
+ public static void submitTopology(String name, Map stormConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException {
+ submitTopology(name, stormConf, topology, null);
+ }
+
+ /**
+ * Submits a topology to run on the cluster with a progress bar. A topology runs forever or until
+ * explicitly killed.
+ *
+ *
+ * @param name the name of the storm.
+ * @param stormConf the topology-specific configuration. See {@link Config}.
+ * @param topology the processing to execute.
+ * @param opts to manipulate the starting of the topology
+ * @throws AlreadyAliveException if a topology with this name is already running
+ * @throws InvalidTopologyException if an invalid topology was submitted
+ */
+
+ public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException, InvalidTopologyException {
+ // show a progress bar so we know we're not stuck (especially on slow connections)
+ StormSubmitter.submitTopology(name, stormConf, topology, opts, new StormSubmitter.ProgressListener() {
+ private long totalBytes;
+ private String srcFile;
+ private String targetFile;
+
+ @Override
+ public void onStart(String srcFile, String targetFile, long totalBytes) {
+ this.srcFile = srcFile;
+ this.targetFile = targetFile;
+ this.totalBytes = totalBytes;
+ System.out.printf("Start uploading file '%s' to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes);
+ }
+
+ @Override
+ public void onProgress(long bytesUploaded) {
+ int length = 50;
+ int p = (int)((length * bytesUploaded) / totalBytes);
+ String progress = StringUtils.repeat("=", p);
+ String todo = StringUtils.repeat(" ", length - p);
+
+ System.out.printf("\r[%s%s] %d / %d", progress, todo, bytesUploaded, totalBytes);
+ }
+
+ @Override
+ public void onCompleted() {
+ System.out.printf("\nFile '%s' uploaded to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes);
+ }
+ });
+ }
+}
[4/8] git commit: rewrote submitJar methods to be similar to what we
had before
Posted by bo...@apache.org.
rewrote submitJar methods to be similar to what we had before
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/1bc4d41e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/1bc4d41e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/1bc4d41e
Branch: refs/heads/master
Commit: 1bc4d41e8cf348e9cc1121c145d015f3f7351885
Parents: 4e03132
Author: darthbear <fr...@gmail.com>
Authored: Thu May 8 11:11:53 2014 -0400
Committer: darthbear <fr...@gmail.com>
Committed: Thu May 8 11:11:53 2014 -0400
----------------------------------------------------------------------
.../src/jvm/backtype/storm/StormSubmitter.java | 28 ++++++++++----------
1 file changed, 14 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/1bc4d41e/storm-core/src/jvm/backtype/storm/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/StormSubmitter.java b/storm-core/src/jvm/backtype/storm/StormSubmitter.java
index 5c644fa..d292944 100644
--- a/storm-core/src/jvm/backtype/storm/StormSubmitter.java
+++ b/storm-core/src/jvm/backtype/storm/StormSubmitter.java
@@ -101,7 +101,7 @@ public class StormSubmitter {
if(topologyNameExists(conf, name)) {
throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
}
- submitJar(conf, null, progressListener);
+ submitJar(conf, progressListener);
try {
LOG.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
if(opts!=null) {
@@ -199,23 +199,24 @@ public class StormSubmitter {
}
private static String submittedJar = null;
-
+
+ private static void submitJar(Map conf, ProgressListener listener) {
+ if(submittedJar==null) {
+ LOG.info("Jar not uploaded to master yet. Submitting jar...");
+ String localJar = System.getProperty("storm.jar");
+ submittedJar = submitJar(conf, localJar, listener);
+ } else {
+ LOG.info("Jar already uploaded to master. Not submitting jar.");
+ }
+ }
+
public static String submitJar(Map conf, String localJar) {
return submitJar(conf, localJar, null);
}
public static String submitJar(Map conf, String localJar, ProgressListener listener) {
-
- if (submittedJar==null && localJar==null) {
- LOG.info("Jar not uploaded to master yet. Submitting jar...");
- localJar = System.getProperty("storm.jar");
-
- if (localJar==null) {
- throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");
- }
- } else if (localJar!=null) {
- LOG.info("Jar already uploaded to master. Not submitting jar.");
- return submittedJar;
+ if (localJar == null) {
+ throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");
}
NimbusClient client = NimbusClient.getConfiguredClient(conf);
@@ -247,7 +248,6 @@ public class StormSubmitter {
}
LOG.info("Successfully uploaded topology jar to assigned location: " + uploadLocation);
- submittedJar = uploadLocation;
return uploadLocation;
} catch(Exception e) {
throw new RuntimeException(e);
[2/8] git commit: moved submitTopology with progress bar
(StormSubmitterWithProgressBar) to StormSubmitter
Posted by bo...@apache.org.
moved submitTopology with progress bar (StormSubmitterWithProgressBar) to StormSubmitter
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/2d2e842e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/2d2e842e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/2d2e842e
Branch: refs/heads/master
Commit: 2d2e842eeda51bcbb59a4dc1c1d46c746cbeb0a9
Parents: 262f2dc
Author: darthbear <fr...@gmail.com>
Authored: Mon May 5 21:06:39 2014 -0400
Committer: darthbear <fr...@gmail.com>
Committed: Mon May 5 21:06:39 2014 -0400
----------------------------------------------------------------------
.../jvm/storm/starter/BasicDRPCTopology.java | 3 +-
.../jvm/storm/starter/ExclamationTopology.java | 3 +-
.../src/jvm/storm/starter/ReachTopology.java | 3 +-
.../jvm/storm/starter/WordCountTopology.java | 3 +-
.../storm/starter/trident/TridentWordCount.java | 3 +-
.../src/jvm/backtype/storm/StormSubmitter.java | 62 ++++++++++++++++
.../storm/StormSubmitterWithProgressBar.java | 76 --------------------
7 files changed, 67 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2d2e842e/examples/storm-starter/src/jvm/storm/starter/BasicDRPCTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/BasicDRPCTopology.java b/examples/storm-starter/src/jvm/storm/starter/BasicDRPCTopology.java
index af7d238..958860b 100644
--- a/examples/storm-starter/src/jvm/storm/starter/BasicDRPCTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/BasicDRPCTopology.java
@@ -21,7 +21,6 @@ import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
-import backtype.storm.StormSubmitterWithProgressBar;
import backtype.storm.drpc.LinearDRPCTopologyBuilder;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
@@ -73,7 +72,7 @@ public class BasicDRPCTopology {
}
else {
conf.setNumWorkers(3);
- StormSubmitterWithProgressBar.submitTopology(args[0], conf, builder.createRemoteTopology());
+ StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createRemoteTopology());
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2d2e842e/examples/storm-starter/src/jvm/storm/starter/ExclamationTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/ExclamationTopology.java b/examples/storm-starter/src/jvm/storm/starter/ExclamationTopology.java
index e1c7622..d7b1b3e 100644
--- a/examples/storm-starter/src/jvm/storm/starter/ExclamationTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/ExclamationTopology.java
@@ -20,7 +20,6 @@ package storm.starter;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
-import backtype.storm.StormSubmitterWithProgressBar;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.testing.TestWordSpout;
@@ -74,7 +73,7 @@ public class ExclamationTopology {
if (args != null && args.length > 0) {
conf.setNumWorkers(3);
- StormSubmitterWithProgressBar.submitTopology(args[0], conf, builder.createTopology());
+ StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
}
else {
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2d2e842e/examples/storm-starter/src/jvm/storm/starter/ReachTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/ReachTopology.java b/examples/storm-starter/src/jvm/storm/starter/ReachTopology.java
index 95d2111..2c5c8ba 100644
--- a/examples/storm-starter/src/jvm/storm/starter/ReachTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/ReachTopology.java
@@ -21,7 +21,6 @@ import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
-import backtype.storm.StormSubmitterWithProgressBar;
import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.drpc.LinearDRPCTopologyBuilder;
import backtype.storm.task.TopologyContext;
@@ -191,7 +190,7 @@ public class ReachTopology {
}
else {
conf.setNumWorkers(6);
- StormSubmitterWithProgressBar.submitTopology(args[0], conf, builder.createRemoteTopology());
+ StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createRemoteTopology());
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2d2e842e/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java b/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java
index 553d535..39184da 100644
--- a/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java
@@ -20,7 +20,6 @@ package storm.starter;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
-import backtype.storm.StormSubmitterWithProgressBar;
import backtype.storm.task.ShellBolt;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IRichBolt;
@@ -92,7 +91,7 @@ public class WordCountTopology {
if (args != null && args.length > 0) {
conf.setNumWorkers(3);
- StormSubmitterWithProgressBar.submitTopology(args[0], conf, builder.createTopology());
+ StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
}
else {
conf.setMaxTaskParallelism(3);
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2d2e842e/examples/storm-starter/src/jvm/storm/starter/trident/TridentWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/trident/TridentWordCount.java b/examples/storm-starter/src/jvm/storm/starter/trident/TridentWordCount.java
index ce8a969..e4a2d2e 100644
--- a/examples/storm-starter/src/jvm/storm/starter/trident/TridentWordCount.java
+++ b/examples/storm-starter/src/jvm/storm/starter/trident/TridentWordCount.java
@@ -21,7 +21,6 @@ import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
-import backtype.storm.StormSubmitterWithProgressBar;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
@@ -80,7 +79,7 @@ public class TridentWordCount {
}
else {
conf.setNumWorkers(3);
- StormSubmitterWithProgressBar.submitTopology(args[0], conf, buildTopology(null));
+ StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(null));
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2d2e842e/storm-core/src/jvm/backtype/storm/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/StormSubmitter.java b/storm-core/src/jvm/backtype/storm/StormSubmitter.java
index 90bdca5..ff299ff 100644
--- a/storm-core/src/jvm/backtype/storm/StormSubmitter.java
+++ b/storm-core/src/jvm/backtype/storm/StormSubmitter.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
+import org.apache.commons.lang.StringUtils;
import org.apache.thrift.TException;
import org.json.simple.JSONValue;
import org.slf4j.Logger;
@@ -125,6 +126,67 @@ public class StormSubmitter {
}
}
+ /**
+ * Submits a topology to run on the cluster with a progress bar. A topology runs forever or until
+ * explicitly killed.
+ *
+ *
+ * @param name the name of the storm.
+ * @param stormConf the topology-specific configuration. See {@link Config}.
+ * @param topology the processing to execute.
+ * @throws AlreadyAliveException if a topology with this name is already running
+ * @throws InvalidTopologyException if an invalid topology was submitted
+ */
+
+ public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException {
+ submitTopologyWithProgressBar(name, stormConf, topology, null);
+ }
+
+ /**
+ * Submits a topology to run on the cluster with a progress bar. A topology runs forever or until
+ * explicitly killed.
+ *
+ *
+ * @param name the name of the storm.
+ * @param stormConf the topology-specific configuration. See {@link Config}.
+ * @param topology the processing to execute.
+ * @param opts to manipulate the starting of the topology
+ * @throws AlreadyAliveException if a topology with this name is already running
+ * @throws InvalidTopologyException if an invalid topology was submitted
+ */
+
+ public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException, InvalidTopologyException {
+ // show a progress bar so we know we're not stuck (especially on slow connections)
+ submitTopology(name, stormConf, topology, opts, new StormSubmitter.ProgressListener() {
+ private long totalBytes;
+ private String srcFile;
+ private String targetFile;
+
+ @Override
+ public void onStart(String srcFile, String targetFile, long totalBytes) {
+ this.srcFile = srcFile;
+ this.targetFile = targetFile;
+ this.totalBytes = totalBytes;
+ System.out.printf("Start uploading file '%s' to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes);
+ }
+
+ @Override
+ public void onProgress(long bytesUploaded) {
+ int length = 50;
+ int p = (int)((length * bytesUploaded) / totalBytes);
+ String progress = StringUtils.repeat("=", p);
+ String todo = StringUtils.repeat(" ", length - p);
+
+ System.out.printf("\r[%s%s] %d / %d", progress, todo, bytesUploaded, totalBytes);
+ }
+
+ @Override
+ public void onCompleted() {
+ System.out.printf("\nFile '%s' uploaded to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes);
+ }
+ });
+ }
+
private static boolean topologyNameExists(Map conf, String name) {
NimbusClient client = NimbusClient.getConfiguredClient(conf);
try {
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2d2e842e/storm-core/src/jvm/backtype/storm/StormSubmitterWithProgressBar.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/StormSubmitterWithProgressBar.java b/storm-core/src/jvm/backtype/storm/StormSubmitterWithProgressBar.java
deleted file mode 100644
index 9d58971..0000000
--- a/storm-core/src/jvm/backtype/storm/StormSubmitterWithProgressBar.java
+++ /dev/null
@@ -1,76 +0,0 @@
-package backtype.storm;
-
-import backtype.storm.generated.AlreadyAliveException;
-import backtype.storm.generated.InvalidTopologyException;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.generated.SubmitOptions;
-import org.apache.commons.lang.StringUtils;
-
-import java.util.Map;
-
-/**
- * Submit topology with a progress bar
- */
-
-public class StormSubmitterWithProgressBar {
- /**
- * Submits a topology to run on the cluster with a progress bar. A topology runs forever or until
- * explicitly killed.
- *
- *
- * @param name the name of the storm.
- * @param stormConf the topology-specific configuration. See {@link Config}.
- * @param topology the processing to execute.
- * @throws AlreadyAliveException if a topology with this name is already running
- * @throws InvalidTopologyException if an invalid topology was submitted
- */
-
- public static void submitTopology(String name, Map stormConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException {
- submitTopology(name, stormConf, topology, null);
- }
-
- /**
- * Submits a topology to run on the cluster with a progress bar. A topology runs forever or until
- * explicitly killed.
- *
- *
- * @param name the name of the storm.
- * @param stormConf the topology-specific configuration. See {@link Config}.
- * @param topology the processing to execute.
- * @param opts to manipulate the starting of the topology
- * @throws AlreadyAliveException if a topology with this name is already running
- * @throws InvalidTopologyException if an invalid topology was submitted
- */
-
- public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException, InvalidTopologyException {
- // show a progress bar so we know we're not stuck (especially on slow connections)
- StormSubmitter.submitTopology(name, stormConf, topology, opts, new StormSubmitter.ProgressListener() {
- private long totalBytes;
- private String srcFile;
- private String targetFile;
-
- @Override
- public void onStart(String srcFile, String targetFile, long totalBytes) {
- this.srcFile = srcFile;
- this.targetFile = targetFile;
- this.totalBytes = totalBytes;
- System.out.printf("Start uploading file '%s' to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes);
- }
-
- @Override
- public void onProgress(long bytesUploaded) {
- int length = 50;
- int p = (int)((length * bytesUploaded) / totalBytes);
- String progress = StringUtils.repeat("=", p);
- String todo = StringUtils.repeat(" ", length - p);
-
- System.out.printf("\r[%s%s] %d / %d", progress, todo, bytesUploaded, totalBytes);
- }
-
- @Override
- public void onCompleted() {
- System.out.printf("\nFile '%s' uploaded to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes);
- }
- });
- }
-}
[6/8] git commit: fixed some javadoc
Posted by bo...@apache.org.
fixed some javadoc
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/d9f33533
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/d9f33533
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/d9f33533
Branch: refs/heads/master
Commit: d9f33533f0d55b0e98d8d0290676baced0b70a71
Parents: 877e95f
Author: darthbear <fr...@gmail.com>
Authored: Thu May 8 11:34:51 2014 -0400
Committer: darthbear <fr...@gmail.com>
Committed: Thu May 8 11:34:51 2014 -0400
----------------------------------------------------------------------
storm-core/src/jvm/backtype/storm/StormSubmitter.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/d9f33533/storm-core/src/jvm/backtype/storm/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/StormSubmitter.java b/storm-core/src/jvm/backtype/storm/StormSubmitter.java
index 78c4bd9..5dfb34b 100644
--- a/storm-core/src/jvm/backtype/storm/StormSubmitter.java
+++ b/storm-core/src/jvm/backtype/storm/StormSubmitter.java
@@ -214,7 +214,7 @@ public class StormSubmitter {
* Submit jar file
* @param conf the topology-specific configuration. See {@link Config}.
* @param localJar file path of the jar file to submit
- * @return
+ * @return the remote location of the submitted jar
*/
public static String submitJar(Map conf, String localJar) {
return submitJar(conf, localJar, null);
@@ -225,7 +225,7 @@ public class StormSubmitter {
* @param conf the topology-specific configuration. See {@link Config}.
* @param localJar file path of the jar file to submit
* @param listener progress listener to track the jar file upload
- * @return
+ * @return the remote location of the submitted jar
*/
public static String submitJar(Map conf, String localJar, ProgressListener listener) {
if (localJar == null) {
[5/8] git commit: added javadoc and fixed some bad indentation
Posted by bo...@apache.org.
added javadoc and fixed some bad indentation
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/877e95fc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/877e95fc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/877e95fc
Branch: refs/heads/master
Commit: 877e95fce0ea1859edf2f95c9ae1b4ddf0abb237
Parents: 1bc4d41
Author: darthbear <fr...@gmail.com>
Authored: Thu May 8 11:23:08 2014 -0400
Committer: darthbear <fr...@gmail.com>
Committed: Thu May 8 11:23:08 2014 -0400
----------------------------------------------------------------------
.../src/jvm/storm/starter/BasicDRPCTopology.java | 2 +-
storm-core/src/jvm/backtype/storm/StormSubmitter.java | 13 +++++++++++++
2 files changed, 14 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/877e95fc/examples/storm-starter/src/jvm/storm/starter/BasicDRPCTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/BasicDRPCTopology.java b/examples/storm-starter/src/jvm/storm/starter/BasicDRPCTopology.java
index 958860b..1b29738 100644
--- a/examples/storm-starter/src/jvm/storm/starter/BasicDRPCTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/BasicDRPCTopology.java
@@ -72,7 +72,7 @@ public class BasicDRPCTopology {
}
else {
conf.setNumWorkers(3);
- StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createRemoteTopology());
+ StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createRemoteTopology());
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/877e95fc/storm-core/src/jvm/backtype/storm/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/StormSubmitter.java b/storm-core/src/jvm/backtype/storm/StormSubmitter.java
index d292944..78c4bd9 100644
--- a/storm-core/src/jvm/backtype/storm/StormSubmitter.java
+++ b/storm-core/src/jvm/backtype/storm/StormSubmitter.java
@@ -210,10 +210,23 @@ public class StormSubmitter {
}
}
+ /**
+ * Submit jar file
+ * @param conf the topology-specific configuration. See {@link Config}.
+ * @param localJar file path of the jar file to submit
+ * @return
+ */
public static String submitJar(Map conf, String localJar) {
return submitJar(conf, localJar, null);
}
+ /**
+ * Submit jar file
+ * @param conf the topology-specific configuration. See {@link Config}.
+ * @param localJar file path of the jar file to submit
+ * @param listener progress listener to track the jar file upload
+ * @return
+ */
public static String submitJar(Map conf, String localJar, ProgressListener listener) {
if (localJar == null) {
throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");
[8/8] git commit: Updated Changelog for STORM-315
Posted by bo...@apache.org.
Updated Changelog for STORM-315
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/9902c9d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/9902c9d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/9902c9d4
Branch: refs/heads/master
Commit: 9902c9d4e9141203b72e689c3bd73da86be7d1d1
Parents: 35b288e
Author: Robert (Bobby) Evans <bo...@apache.org>
Authored: Mon May 12 15:34:57 2014 +0000
Committer: Robert (Bobby) Evans <bo...@apache.org>
Committed: Mon May 12 15:34:57 2014 +0000
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/9902c9d4/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b308a26..fa27264 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 0.9.2-incubating (unreleased)
+ * STORM-315: Added progress bar when submitting topology
* STORM-214: Windows: storm.cmd does not properly handle multiple -c arguments
* STORM-306: Add security documentation
* STORM-302: Fix Indentation for pom.xml in storm-dist
[3/8] git commit: added src,
target file and totalBytes to progress bar listener
Posted by bo...@apache.org.
added src, target file and totalBytes to progress bar listener
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/4e031323
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/4e031323
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/4e031323
Branch: refs/heads/master
Commit: 4e0313236ddf4d5d09a1b01f811f923923d52639
Parents: 2d2e842
Author: darthbear <fr...@gmail.com>
Authored: Tue May 6 11:03:03 2014 -0400
Committer: darthbear <fr...@gmail.com>
Committed: Tue May 6 11:03:03 2014 -0400
----------------------------------------------------------------------
.../src/jvm/backtype/storm/StormSubmitter.java | 25 ++++++++++----------
1 file changed, 12 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/4e031323/storm-core/src/jvm/backtype/storm/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/StormSubmitter.java b/storm-core/src/jvm/backtype/storm/StormSubmitter.java
index ff299ff..5c644fa 100644
--- a/storm-core/src/jvm/backtype/storm/StormSubmitter.java
+++ b/storm-core/src/jvm/backtype/storm/StormSubmitter.java
@@ -158,20 +158,13 @@ public class StormSubmitter {
public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException, InvalidTopologyException {
// show a progress bar so we know we're not stuck (especially on slow connections)
submitTopology(name, stormConf, topology, opts, new StormSubmitter.ProgressListener() {
- private long totalBytes;
- private String srcFile;
- private String targetFile;
-
@Override
public void onStart(String srcFile, String targetFile, long totalBytes) {
- this.srcFile = srcFile;
- this.targetFile = targetFile;
- this.totalBytes = totalBytes;
System.out.printf("Start uploading file '%s' to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes);
}
@Override
- public void onProgress(long bytesUploaded) {
+ public void onProgress(String srcFile, String targetFile, long bytesUploaded, long totalBytes) {
int length = 50;
int p = (int)((length * bytesUploaded) / totalBytes);
String progress = StringUtils.repeat("=", p);
@@ -181,7 +174,7 @@ public class StormSubmitter {
}
@Override
- public void onCompleted() {
+ public void onCompleted(String srcFile, String targetFile, long totalBytes) {
System.out.printf("\nFile '%s' uploaded to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes);
}
});
@@ -241,7 +234,7 @@ public class StormSubmitter {
byte[] toSubmit = is.read();
bytesUploaded += toSubmit.length;
if (listener != null) {
- listener.onProgress(bytesUploaded);
+ listener.onProgress(localJar, uploadLocation, bytesUploaded, totalSize);
}
if(toSubmit.length==0) break;
@@ -250,7 +243,7 @@ public class StormSubmitter {
client.getClient().finishFileUpload(uploadLocation);
if (listener != null) {
- listener.onCompleted();
+ listener.onCompleted(localJar, uploadLocation, totalSize);
}
LOG.info("Successfully uploaded topology jar to assigned location: " + uploadLocation);
@@ -277,13 +270,19 @@ public class StormSubmitter {
/**
* called whenever a chunk of bytes is uploaded
+ * @param srcFile - jar file to be uploaded
+ * @param targetFile - destination file
* @param bytesUploaded - number of bytes transferred so far
+ * @param totalBytes - total number of bytes of the file
*/
- public void onProgress(long bytesUploaded);
+ public void onProgress(String srcFile, String targetFile, long bytesUploaded, long totalBytes);
/**
* called when the file is uploaded
+ * @param srcFile - jar file to be uploaded
+ * @param targetFile - destination file
+ * @param totalBytes - total number of bytes of the file
*/
- public void onCompleted();
+ public void onCompleted(String srcFile, String targetFile, long totalBytes);
}
}
[7/8] git commit: Merge branch 'master' of
https://github.com/lulo/incubator-storm into STORM-315
Posted by bo...@apache.org.
Merge branch 'master' of https://github.com/lulo/incubator-storm into STORM-315
STORM-315: Added progress bar when submitting topology
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/35b288ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/35b288ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/35b288ec
Branch: refs/heads/master
Commit: 35b288ec71d26cfd1ef06d86724f83a1ff5146bd
Parents: b612a86 d9f3353
Author: Robert (Bobby) Evans <bo...@apache.org>
Authored: Mon May 12 15:21:10 2014 +0000
Committer: Robert (Bobby) Evans <bo...@apache.org>
Committed: Mon May 12 15:21:10 2014 +0000
----------------------------------------------------------------------
.../jvm/storm/starter/BasicDRPCTopology.java | 2 +-
.../jvm/storm/starter/ExclamationTopology.java | 2 +-
.../src/jvm/storm/starter/ReachTopology.java | 2 +-
.../jvm/storm/starter/WordCountTopology.java | 2 +-
.../storm/starter/trident/TridentWordCount.java | 2 +-
.../src/jvm/backtype/storm/StormSubmitter.java | 152 +++++++++++++++++--
6 files changed, 142 insertions(+), 20 deletions(-)
----------------------------------------------------------------------