You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/05/12 17:35:44 UTC

[2/8] git commit: moved submitTopology with progress bar (StormSubmitterWithProgressBar) to StormSubmitter

Moved the progress-bar variant of submitTopology out of StormSubmitterWithProgressBar and into StormSubmitter as submitTopologyWithProgressBar; StormSubmitterWithProgressBar is removed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/2d2e842e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/2d2e842e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/2d2e842e

Branch: refs/heads/master
Commit: 2d2e842eeda51bcbb59a4dc1c1d46c746cbeb0a9
Parents: 262f2dc
Author: darthbear <fr...@gmail.com>
Authored: Mon May 5 21:06:39 2014 -0400
Committer: darthbear <fr...@gmail.com>
Committed: Mon May 5 21:06:39 2014 -0400

----------------------------------------------------------------------
 .../jvm/storm/starter/BasicDRPCTopology.java    |  3 +-
 .../jvm/storm/starter/ExclamationTopology.java  |  3 +-
 .../src/jvm/storm/starter/ReachTopology.java    |  3 +-
 .../jvm/storm/starter/WordCountTopology.java    |  3 +-
 .../storm/starter/trident/TridentWordCount.java |  3 +-
 .../src/jvm/backtype/storm/StormSubmitter.java  | 62 ++++++++++++++++
 .../storm/StormSubmitterWithProgressBar.java    | 76 --------------------
 7 files changed, 67 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2d2e842e/examples/storm-starter/src/jvm/storm/starter/BasicDRPCTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/BasicDRPCTopology.java b/examples/storm-starter/src/jvm/storm/starter/BasicDRPCTopology.java
index af7d238..958860b 100644
--- a/examples/storm-starter/src/jvm/storm/starter/BasicDRPCTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/BasicDRPCTopology.java
@@ -21,7 +21,6 @@ import backtype.storm.Config;
 import backtype.storm.LocalCluster;
 import backtype.storm.LocalDRPC;
 import backtype.storm.StormSubmitter;
-import backtype.storm.StormSubmitterWithProgressBar;
 import backtype.storm.drpc.LinearDRPCTopologyBuilder;
 import backtype.storm.topology.BasicOutputCollector;
 import backtype.storm.topology.OutputFieldsDeclarer;
@@ -73,7 +72,7 @@ public class BasicDRPCTopology {
     }
     else {
       conf.setNumWorkers(3);
-        StormSubmitterWithProgressBar.submitTopology(args[0], conf, builder.createRemoteTopology());
+        StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createRemoteTopology());
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2d2e842e/examples/storm-starter/src/jvm/storm/starter/ExclamationTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/ExclamationTopology.java b/examples/storm-starter/src/jvm/storm/starter/ExclamationTopology.java
index e1c7622..d7b1b3e 100644
--- a/examples/storm-starter/src/jvm/storm/starter/ExclamationTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/ExclamationTopology.java
@@ -20,7 +20,6 @@ package storm.starter;
 import backtype.storm.Config;
 import backtype.storm.LocalCluster;
 import backtype.storm.StormSubmitter;
-import backtype.storm.StormSubmitterWithProgressBar;
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.testing.TestWordSpout;
@@ -74,7 +73,7 @@ public class ExclamationTopology {
     if (args != null && args.length > 0) {
       conf.setNumWorkers(3);
 
-      StormSubmitterWithProgressBar.submitTopology(args[0], conf, builder.createTopology());
+      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
     }
     else {
 

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2d2e842e/examples/storm-starter/src/jvm/storm/starter/ReachTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/ReachTopology.java b/examples/storm-starter/src/jvm/storm/starter/ReachTopology.java
index 95d2111..2c5c8ba 100644
--- a/examples/storm-starter/src/jvm/storm/starter/ReachTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/ReachTopology.java
@@ -21,7 +21,6 @@ import backtype.storm.Config;
 import backtype.storm.LocalCluster;
 import backtype.storm.LocalDRPC;
 import backtype.storm.StormSubmitter;
-import backtype.storm.StormSubmitterWithProgressBar;
 import backtype.storm.coordination.BatchOutputCollector;
 import backtype.storm.drpc.LinearDRPCTopologyBuilder;
 import backtype.storm.task.TopologyContext;
@@ -191,7 +190,7 @@ public class ReachTopology {
     }
     else {
       conf.setNumWorkers(6);
-      StormSubmitterWithProgressBar.submitTopology(args[0], conf, builder.createRemoteTopology());
+      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createRemoteTopology());
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2d2e842e/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java b/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java
index 553d535..39184da 100644
--- a/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java
@@ -20,7 +20,6 @@ package storm.starter;
 import backtype.storm.Config;
 import backtype.storm.LocalCluster;
 import backtype.storm.StormSubmitter;
-import backtype.storm.StormSubmitterWithProgressBar;
 import backtype.storm.task.ShellBolt;
 import backtype.storm.topology.BasicOutputCollector;
 import backtype.storm.topology.IRichBolt;
@@ -92,7 +91,7 @@ public class WordCountTopology {
     if (args != null && args.length > 0) {
       conf.setNumWorkers(3);
 
-      StormSubmitterWithProgressBar.submitTopology(args[0], conf, builder.createTopology());
+      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
     }
     else {
       conf.setMaxTaskParallelism(3);

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2d2e842e/examples/storm-starter/src/jvm/storm/starter/trident/TridentWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/trident/TridentWordCount.java b/examples/storm-starter/src/jvm/storm/starter/trident/TridentWordCount.java
index ce8a969..e4a2d2e 100644
--- a/examples/storm-starter/src/jvm/storm/starter/trident/TridentWordCount.java
+++ b/examples/storm-starter/src/jvm/storm/starter/trident/TridentWordCount.java
@@ -21,7 +21,6 @@ import backtype.storm.Config;
 import backtype.storm.LocalCluster;
 import backtype.storm.LocalDRPC;
 import backtype.storm.StormSubmitter;
-import backtype.storm.StormSubmitterWithProgressBar;
 import backtype.storm.generated.StormTopology;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Values;
@@ -80,7 +79,7 @@ public class TridentWordCount {
     }
     else {
       conf.setNumWorkers(3);
-      StormSubmitterWithProgressBar.submitTopology(args[0], conf, buildTopology(null));
+      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(null));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2d2e842e/storm-core/src/jvm/backtype/storm/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/StormSubmitter.java b/storm-core/src/jvm/backtype/storm/StormSubmitter.java
index 90bdca5..ff299ff 100644
--- a/storm-core/src/jvm/backtype/storm/StormSubmitter.java
+++ b/storm-core/src/jvm/backtype/storm/StormSubmitter.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.thrift.TException;
 import org.json.simple.JSONValue;
 import org.slf4j.Logger;
@@ -125,6 +126,67 @@ public class StormSubmitter {
         }
     }
 
+    /**
+     * Submits a topology to run on the cluster with a progress bar. A topology runs forever or until
+     * explicitly killed.
+     *
+     * Equivalent to calling the four-argument overload with {@code null} submit options.
+     * @param name the name of the topology.
+     * @param stormConf the topology-specific configuration. See {@link Config}.
+     * @param topology the processing to execute.
+     * @throws AlreadyAliveException if a topology with this name is already running
+     * @throws InvalidTopologyException if an invalid topology was submitted
+     */
+
+    public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException {
+        submitTopologyWithProgressBar(name, stormConf, topology, null);
+    }
+
+    /**
+     * Submits a topology to run on the cluster, printing a file-upload progress bar to standard output.
+     * A topology runs forever or until explicitly killed.
+     *
+     * Delegates to {@code submitTopology} with a listener that redraws a 50-character bar on each update.
+     * @param name the name of the topology.
+     * @param stormConf the topology-specific configuration. See {@link Config}.
+     * @param topology the processing to execute.
+     * @param opts to manipulate the starting of the topology; may be {@code null}
+     * @throws AlreadyAliveException if a topology with this name is already running
+     * @throws InvalidTopologyException if an invalid topology was submitted
+     */
+
+    public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException, InvalidTopologyException {
+        // show a progress bar so we know we're not stuck (especially on slow connections)
+        submitTopology(name, stormConf, topology, opts, new StormSubmitter.ProgressListener() {
+            private long totalBytes;
+            private String srcFile;
+            private String targetFile;
+
+            @Override
+            public void onStart(String srcFile, String targetFile, long totalBytes) {
+                this.srcFile = srcFile;
+                this.targetFile = targetFile;
+                this.totalBytes = totalBytes;
+                System.out.printf("Start uploading file '%s' to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes);
+            }
+
+            @Override
+            public void onProgress(long bytesUploaded) {
+                int length = 50;
+                // Guard an empty upload (long division by zero throws) and clamp so the bar stays 'length' wide.
+                int p = totalBytes <= 0 ? length : (int) Math.max(0L, Math.min(length, (length * bytesUploaded) / totalBytes));
+                String progress = StringUtils.repeat("=", p);
+                String todo = StringUtils.repeat(" ", length - p);
+                System.out.printf("\r[%s%s] %d / %d", progress, todo, bytesUploaded, totalBytes);
+            }
+
+            @Override
+            public void onCompleted() {
+                System.out.printf("\nFile '%s' uploaded to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes);
+            }
+        });
+    }
+
     private static boolean topologyNameExists(Map conf, String name) {
         NimbusClient client = NimbusClient.getConfiguredClient(conf);
         try {

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2d2e842e/storm-core/src/jvm/backtype/storm/StormSubmitterWithProgressBar.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/StormSubmitterWithProgressBar.java b/storm-core/src/jvm/backtype/storm/StormSubmitterWithProgressBar.java
deleted file mode 100644
index 9d58971..0000000
--- a/storm-core/src/jvm/backtype/storm/StormSubmitterWithProgressBar.java
+++ /dev/null
@@ -1,76 +0,0 @@
-package backtype.storm;
-
-import backtype.storm.generated.AlreadyAliveException;
-import backtype.storm.generated.InvalidTopologyException;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.generated.SubmitOptions;
-import org.apache.commons.lang.StringUtils;
-
-import java.util.Map;
-
-/**
- * Submit topology with a progress bar
- */
-
-public class StormSubmitterWithProgressBar {
-    /**
-     * Submits a topology to run on the cluster with a progress bar. A topology runs forever or until
-     * explicitly killed.
-     *
-     *
-     * @param name the name of the storm.
-     * @param stormConf the topology-specific configuration. See {@link Config}.
-     * @param topology the processing to execute.
-     * @throws AlreadyAliveException if a topology with this name is already running
-     * @throws InvalidTopologyException if an invalid topology was submitted
-     */
-
-    public static void submitTopology(String name, Map stormConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException {
-        submitTopology(name, stormConf, topology, null);
-    }
-
-    /**
-     * Submits a topology to run on the cluster with a progress bar. A topology runs forever or until
-     * explicitly killed.
-     *
-     *
-     * @param name the name of the storm.
-     * @param stormConf the topology-specific configuration. See {@link Config}.
-     * @param topology the processing to execute.
-     * @param opts to manipulate the starting of the topology
-     * @throws AlreadyAliveException if a topology with this name is already running
-     * @throws InvalidTopologyException if an invalid topology was submitted
-     */
-
-    public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException, InvalidTopologyException {
-        // show a progress bar so we know we're not stuck (especially on slow connections)
-        StormSubmitter.submitTopology(name, stormConf, topology, opts, new StormSubmitter.ProgressListener() {
-            private long totalBytes;
-            private String srcFile;
-            private String targetFile;
-
-            @Override
-            public void onStart(String srcFile, String targetFile, long totalBytes) {
-                this.srcFile = srcFile;
-                this.targetFile = targetFile;
-                this.totalBytes = totalBytes;
-                System.out.printf("Start uploading file '%s' to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes);
-            }
-
-            @Override
-            public void onProgress(long bytesUploaded) {
-                int length = 50;
-                int p = (int)((length * bytesUploaded) / totalBytes);
-                String progress = StringUtils.repeat("=", p);
-                String todo = StringUtils.repeat(" ", length - p);
-
-                System.out.printf("\r[%s%s] %d / %d", progress, todo, bytesUploaded, totalBytes);
-            }
-
-            @Override
-            public void onCompleted() {
-                System.out.printf("\nFile '%s' uploaded to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes);
-            }
-        });
-    }
-}