You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/05/12 17:35:44 UTC
[2/8] git commit: moved submitTopology with progress bar
(StormSubmitterWithProgressBar) to StormSubmitter
moved submitTopology with progress bar (StormSubmitterWithProgressBar) to StormSubmitter
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/2d2e842e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/2d2e842e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/2d2e842e
Branch: refs/heads/master
Commit: 2d2e842eeda51bcbb59a4dc1c1d46c746cbeb0a9
Parents: 262f2dc
Author: darthbear <fr...@gmail.com>
Authored: Mon May 5 21:06:39 2014 -0400
Committer: darthbear <fr...@gmail.com>
Committed: Mon May 5 21:06:39 2014 -0400
----------------------------------------------------------------------
.../jvm/storm/starter/BasicDRPCTopology.java | 3 +-
.../jvm/storm/starter/ExclamationTopology.java | 3 +-
.../src/jvm/storm/starter/ReachTopology.java | 3 +-
.../jvm/storm/starter/WordCountTopology.java | 3 +-
.../storm/starter/trident/TridentWordCount.java | 3 +-
.../src/jvm/backtype/storm/StormSubmitter.java | 62 ++++++++++++++++
.../storm/StormSubmitterWithProgressBar.java | 76 --------------------
7 files changed, 67 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2d2e842e/examples/storm-starter/src/jvm/storm/starter/BasicDRPCTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/BasicDRPCTopology.java b/examples/storm-starter/src/jvm/storm/starter/BasicDRPCTopology.java
index af7d238..958860b 100644
--- a/examples/storm-starter/src/jvm/storm/starter/BasicDRPCTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/BasicDRPCTopology.java
@@ -21,7 +21,6 @@ import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
-import backtype.storm.StormSubmitterWithProgressBar;
import backtype.storm.drpc.LinearDRPCTopologyBuilder;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
@@ -73,7 +72,7 @@ public class BasicDRPCTopology {
}
else {
conf.setNumWorkers(3);
- StormSubmitterWithProgressBar.submitTopology(args[0], conf, builder.createRemoteTopology());
+ StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createRemoteTopology());
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2d2e842e/examples/storm-starter/src/jvm/storm/starter/ExclamationTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/ExclamationTopology.java b/examples/storm-starter/src/jvm/storm/starter/ExclamationTopology.java
index e1c7622..d7b1b3e 100644
--- a/examples/storm-starter/src/jvm/storm/starter/ExclamationTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/ExclamationTopology.java
@@ -20,7 +20,6 @@ package storm.starter;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
-import backtype.storm.StormSubmitterWithProgressBar;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.testing.TestWordSpout;
@@ -74,7 +73,7 @@ public class ExclamationTopology {
if (args != null && args.length > 0) {
conf.setNumWorkers(3);
- StormSubmitterWithProgressBar.submitTopology(args[0], conf, builder.createTopology());
+ StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
}
else {
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2d2e842e/examples/storm-starter/src/jvm/storm/starter/ReachTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/ReachTopology.java b/examples/storm-starter/src/jvm/storm/starter/ReachTopology.java
index 95d2111..2c5c8ba 100644
--- a/examples/storm-starter/src/jvm/storm/starter/ReachTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/ReachTopology.java
@@ -21,7 +21,6 @@ import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
-import backtype.storm.StormSubmitterWithProgressBar;
import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.drpc.LinearDRPCTopologyBuilder;
import backtype.storm.task.TopologyContext;
@@ -191,7 +190,7 @@ public class ReachTopology {
}
else {
conf.setNumWorkers(6);
- StormSubmitterWithProgressBar.submitTopology(args[0], conf, builder.createRemoteTopology());
+ StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createRemoteTopology());
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2d2e842e/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java b/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java
index 553d535..39184da 100644
--- a/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java
@@ -20,7 +20,6 @@ package storm.starter;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
-import backtype.storm.StormSubmitterWithProgressBar;
import backtype.storm.task.ShellBolt;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IRichBolt;
@@ -92,7 +91,7 @@ public class WordCountTopology {
if (args != null && args.length > 0) {
conf.setNumWorkers(3);
- StormSubmitterWithProgressBar.submitTopology(args[0], conf, builder.createTopology());
+ StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
}
else {
conf.setMaxTaskParallelism(3);
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2d2e842e/examples/storm-starter/src/jvm/storm/starter/trident/TridentWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/trident/TridentWordCount.java b/examples/storm-starter/src/jvm/storm/starter/trident/TridentWordCount.java
index ce8a969..e4a2d2e 100644
--- a/examples/storm-starter/src/jvm/storm/starter/trident/TridentWordCount.java
+++ b/examples/storm-starter/src/jvm/storm/starter/trident/TridentWordCount.java
@@ -21,7 +21,6 @@ import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
-import backtype.storm.StormSubmitterWithProgressBar;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
@@ -80,7 +79,7 @@ public class TridentWordCount {
}
else {
conf.setNumWorkers(3);
- StormSubmitterWithProgressBar.submitTopology(args[0], conf, buildTopology(null));
+ StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(null));
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2d2e842e/storm-core/src/jvm/backtype/storm/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/StormSubmitter.java b/storm-core/src/jvm/backtype/storm/StormSubmitter.java
index 90bdca5..ff299ff 100644
--- a/storm-core/src/jvm/backtype/storm/StormSubmitter.java
+++ b/storm-core/src/jvm/backtype/storm/StormSubmitter.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
+import org.apache.commons.lang.StringUtils;
import org.apache.thrift.TException;
import org.json.simple.JSONValue;
import org.slf4j.Logger;
@@ -125,6 +126,67 @@ public class StormSubmitter {
}
}
+ /**
+ * Submits a topology to run on the cluster with a progress bar. A topology runs forever or until
+ * explicitly killed.
+ *
+ *
+ * @param name the name of the storm.
+ * @param stormConf the topology-specific configuration. See {@link Config}.
+ * @param topology the processing to execute.
+ * @throws AlreadyAliveException if a topology with this name is already running
+ * @throws InvalidTopologyException if an invalid topology was submitted
+ */
+
+ public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException {
+ submitTopologyWithProgressBar(name, stormConf, topology, null);
+ }
+
+ /**
+ * Submits a topology to run on the cluster with a progress bar. A topology runs forever or until
+ * explicitly killed.
+ *
+ *
+ * @param name the name of the storm.
+ * @param stormConf the topology-specific configuration. See {@link Config}.
+ * @param topology the processing to execute.
+ * @param opts to manipulate the starting of the topology
+ * @throws AlreadyAliveException if a topology with this name is already running
+ * @throws InvalidTopologyException if an invalid topology was submitted
+ */
+
+ public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException, InvalidTopologyException {
+ // show a progress bar so we know we're not stuck (especially on slow connections)
+ submitTopology(name, stormConf, topology, opts, new StormSubmitter.ProgressListener() {
+ private long totalBytes;
+ private String srcFile;
+ private String targetFile;
+
+ @Override
+ public void onStart(String srcFile, String targetFile, long totalBytes) {
+ this.srcFile = srcFile;
+ this.targetFile = targetFile;
+ this.totalBytes = totalBytes;
+ System.out.printf("Start uploading file '%s' to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes);
+ }
+
+ @Override
+ public void onProgress(long bytesUploaded) {
+ int length = 50;
+ int p = (int)((length * bytesUploaded) / totalBytes);
+ String progress = StringUtils.repeat("=", p);
+ String todo = StringUtils.repeat(" ", length - p);
+
+ System.out.printf("\r[%s%s] %d / %d", progress, todo, bytesUploaded, totalBytes);
+ }
+
+ @Override
+ public void onCompleted() {
+ System.out.printf("\nFile '%s' uploaded to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes);
+ }
+ });
+ }
+
private static boolean topologyNameExists(Map conf, String name) {
NimbusClient client = NimbusClient.getConfiguredClient(conf);
try {
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2d2e842e/storm-core/src/jvm/backtype/storm/StormSubmitterWithProgressBar.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/StormSubmitterWithProgressBar.java b/storm-core/src/jvm/backtype/storm/StormSubmitterWithProgressBar.java
deleted file mode 100644
index 9d58971..0000000
--- a/storm-core/src/jvm/backtype/storm/StormSubmitterWithProgressBar.java
+++ /dev/null
@@ -1,76 +0,0 @@
-package backtype.storm;
-
-import backtype.storm.generated.AlreadyAliveException;
-import backtype.storm.generated.InvalidTopologyException;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.generated.SubmitOptions;
-import org.apache.commons.lang.StringUtils;
-
-import java.util.Map;
-
-/**
- * Submit topology with a progress bar
- */
-
-public class StormSubmitterWithProgressBar {
- /**
- * Submits a topology to run on the cluster with a progress bar. A topology runs forever or until
- * explicitly killed.
- *
- *
- * @param name the name of the storm.
- * @param stormConf the topology-specific configuration. See {@link Config}.
- * @param topology the processing to execute.
- * @throws AlreadyAliveException if a topology with this name is already running
- * @throws InvalidTopologyException if an invalid topology was submitted
- */
-
- public static void submitTopology(String name, Map stormConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException {
- submitTopology(name, stormConf, topology, null);
- }
-
- /**
- * Submits a topology to run on the cluster with a progress bar. A topology runs forever or until
- * explicitly killed.
- *
- *
- * @param name the name of the storm.
- * @param stormConf the topology-specific configuration. See {@link Config}.
- * @param topology the processing to execute.
- * @param opts to manipulate the starting of the topology
- * @throws AlreadyAliveException if a topology with this name is already running
- * @throws InvalidTopologyException if an invalid topology was submitted
- */
-
- public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException, InvalidTopologyException {
- // show a progress bar so we know we're not stuck (especially on slow connections)
- StormSubmitter.submitTopology(name, stormConf, topology, opts, new StormSubmitter.ProgressListener() {
- private long totalBytes;
- private String srcFile;
- private String targetFile;
-
- @Override
- public void onStart(String srcFile, String targetFile, long totalBytes) {
- this.srcFile = srcFile;
- this.targetFile = targetFile;
- this.totalBytes = totalBytes;
- System.out.printf("Start uploading file '%s' to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes);
- }
-
- @Override
- public void onProgress(long bytesUploaded) {
- int length = 50;
- int p = (int)((length * bytesUploaded) / totalBytes);
- String progress = StringUtils.repeat("=", p);
- String todo = StringUtils.repeat(" ", length - p);
-
- System.out.printf("\r[%s%s] %d / %d", progress, todo, bytesUploaded, totalBytes);
- }
-
- @Override
- public void onCompleted() {
- System.out.printf("\nFile '%s' uploaded to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes);
- }
- });
- }
-}