You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/12 08:29:12 UTC
[04/10] storm git commit: STORM-2447: add in storm local to avoid having server on worker classpath
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
----------------------------------------------------------------------
diff --git a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedis.java b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
index f5278fd..c162ca6 100644
--- a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
+++ b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
@@ -18,22 +18,20 @@
package org.apache.storm.redis.trident;
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.redis.common.mapper.RedisLookupMapper;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import org.apache.storm.redis.trident.state.RedisState;
import org.apache.storm.redis.trident.state.RedisStateQuerier;
import org.apache.storm.redis.trident.state.RedisStateUpdater;
-import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
public class WordCountTridentRedis {
public static StormTopology buildTopology(String redisHost, Integer redisPort){
@@ -71,28 +69,17 @@ public class WordCountTridentRedis {
}
public static void main(String[] args) throws Exception {
- if (args.length != 3) {
- System.out.println("Usage: WordCountTrident 0(storm-local)|1(storm-cluster) redis-host redis-port");
+ if (args.length != 2) {
+ System.out.println("Usage: WordCountTrident redis-host redis-port");
System.exit(1);
}
- Integer flag = Integer.valueOf(args[0]);
- String redisHost = args[1];
- Integer redisPort = Integer.valueOf(args[2]);
+ String redisHost = args[0];
+ Integer redisPort = Integer.valueOf(args[1]);
Config conf = new Config();
conf.setMaxSpoutPending(5);
- if (flag == 0) {
- try (LocalCluster cluster = new LocalCluster();
- LocalTopology topo = cluster.submitTopology("test_wordCounter_for_redis", conf, buildTopology(redisHost, redisPort));) {
- Thread.sleep(60 * 1000);
- }
- System.exit(0);
- } else if(flag == 1) {
- conf.setNumWorkers(3);
- StormSubmitter.submitTopology("test_wordCounter_for_redis", conf, buildTopology(redisHost, redisPort));
- } else {
- System.out.println("Usage: WordCountTrident 0(storm-local)|1(storm-cluster) redis-host redis-port");
- }
+ conf.setNumWorkers(3);
+ StormSubmitter.submitTopology("test_wordCounter_for_redis", conf, buildTopology(redisHost, redisPort));
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
----------------------------------------------------------------------
diff --git a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
index b6e067d..687ac54 100644
--- a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
+++ b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
@@ -17,27 +17,25 @@
*/
package org.apache.storm.redis.trident;
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
+import org.apache.storm.redis.common.config.JedisClusterConfig;
import org.apache.storm.redis.common.mapper.RedisLookupMapper;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import org.apache.storm.redis.trident.state.RedisClusterState;
import org.apache.storm.redis.trident.state.RedisClusterStateQuerier;
import org.apache.storm.redis.trident.state.RedisClusterStateUpdater;
-import org.apache.storm.redis.common.config.JedisClusterConfig;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.testing.FixedBatchSpout;
-
-import java.net.InetSocketAddress;
-import java.util.HashSet;
-import java.util.Set;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
public class WordCountTridentRedisCluster {
public static StormTopology buildTopology(String redisHostPort){
@@ -79,28 +77,17 @@ public class WordCountTridentRedisCluster {
}
public static void main(String[] args) throws Exception {
- if (args.length != 2) {
- System.out.println("Usage: WordCountTrident 0(storm-local)|1(storm-cluster) 127.0.0.1:6379,127.0.0.1:6380");
+ if (args.length != 1) {
+ System.out.println("Usage: WordCountTrident 127.0.0.1:6379,127.0.0.1:6380");
System.exit(1);
}
- Integer flag = Integer.valueOf(args[0]);
- String redisHostPort = args[1];
+ String redisHostPort = args[0];
Config conf = new Config();
conf.setMaxSpoutPending(5);
- if (flag == 0) {
- try (LocalCluster cluster = new LocalCluster();
- LocalTopology topo = cluster.submitTopology("test_wordCounter_for_redis", conf, buildTopology(redisHostPort));) {
- Thread.sleep(60 * 1000);
- }
- System.exit(0);
- } else if(flag == 1) {
- conf.setNumWorkers(3);
- StormSubmitter.submitTopology("test_wordCounter_for_redis", conf, buildTopology(redisHostPort));
- } else {
- System.out.println("Usage: WordCountTrident 0(storm-local)|1(storm-cluster) redis-host redis-port");
- }
+ conf.setNumWorkers(3);
+ StormSubmitter.submitTopology("test_wordCounter_for_redis", conf, buildTopology(redisHostPort));
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
----------------------------------------------------------------------
diff --git a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
index 0a025bd..8cf0f3c 100644
--- a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
+++ b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
@@ -17,17 +17,16 @@
*/
package org.apache.storm.redis.trident;
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
+import org.apache.storm.redis.common.config.JedisClusterConfig;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
-import org.apache.storm.redis.common.mapper.TupleMapper;
import org.apache.storm.redis.trident.state.RedisClusterMapState;
-import org.apache.storm.redis.common.config.JedisClusterConfig;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
@@ -35,10 +34,8 @@ import org.apache.storm.trident.operation.builtin.MapGet;
import org.apache.storm.trident.operation.builtin.Sum;
import org.apache.storm.trident.state.StateFactory;
import org.apache.storm.trident.testing.FixedBatchSpout;
-
-import java.net.InetSocketAddress;
-import java.util.HashSet;
-import java.util.Set;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
public class WordCountTridentRedisClusterMap {
public static StormTopology buildTopology(String redisHostPort){
@@ -74,28 +71,17 @@ public class WordCountTridentRedisClusterMap {
}
public static void main(String[] args) throws Exception {
- if (args.length != 2) {
- System.out.println("Usage: WordCountTrident 0(storm-local)|1(storm-cluster) 127.0.0.1:6379,127.0.0.1:6380");
+ if (args.length != 1) {
+ System.out.println("Usage: WordCountTrident 127.0.0.1:6379,127.0.0.1:6380");
System.exit(1);
}
- Integer flag = Integer.valueOf(args[0]);
- String redisHostPort = args[1];
+ String redisHostPort = args[0];
Config conf = new Config();
conf.setMaxSpoutPending(5);
- if (flag == 0) {
- try (LocalCluster cluster = new LocalCluster();
- LocalTopology topo = cluster.submitTopology("test_wordCounter_for_redis", conf, buildTopology(redisHostPort));) {
- Thread.sleep(60 * 1000);
- }
- System.exit(0);
- } else if(flag == 1) {
- conf.setNumWorkers(3);
- StormSubmitter.submitTopology("test_wordCounter_for_redis", conf, buildTopology(redisHostPort));
- } else {
- System.out.println("Usage: WordCountTrident 0(storm-local)|1(storm-cluster) redis-host redis-port");
- }
+ conf.setNumWorkers(3);
+ StormSubmitter.submitTopology("test_wordCounter_for_redis", conf, buildTopology(redisHostPort));
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
----------------------------------------------------------------------
diff --git a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
index b0cddcd..dac0a4d 100644
--- a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
+++ b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
@@ -18,15 +18,11 @@
package org.apache.storm.redis.trident;
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.trident.state.RedisMapState;
-import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
@@ -34,6 +30,8 @@ import org.apache.storm.trident.operation.builtin.MapGet;
import org.apache.storm.trident.operation.builtin.Sum;
import org.apache.storm.trident.state.StateFactory;
import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
public class WordCountTridentRedisMap {
public static StormTopology buildTopology(String redisHost, Integer redisPort){
@@ -66,29 +64,18 @@ public class WordCountTridentRedisMap {
}
public static void main(String[] args) throws Exception {
- if (args.length != 3) {
- System.out.println("Usage: WordCountTrident 0(storm-local)|1(storm-cluster) redis-host redis-port");
+ if (args.length != 2) {
+ System.out.println("Usage: WordCountTrident redis-host redis-port");
System.exit(1);
}
- Integer flag = Integer.valueOf(args[0]);
- String redisHost = args[1];
- Integer redisPort = Integer.valueOf(args[2]);
+ String redisHost = args[0];
+ Integer redisPort = Integer.valueOf(args[1]);
Config conf = new Config();
conf.setMaxSpoutPending(5);
- if (flag == 0) {
- try (LocalCluster cluster = new LocalCluster();
- LocalTopology topo = cluster.submitTopology("test_wordCounter_for_redis", conf, buildTopology(redisHost, redisPort));) {
- Thread.sleep(60 * 1000);
- }
- System.exit(0);
- } else if(flag == 1) {
- conf.setNumWorkers(3);
- StormSubmitter.submitTopology("test_wordCounter_for_redis", conf, buildTopology(redisHost, redisPort));
- } else {
- System.out.println("Usage: WordCountTrident 0(storm-local)|1(storm-cluster) redis-host redis-port");
- }
+ conf.setNumWorkers(3);
+ StormSubmitter.submitTopology("test_wordCounter_for_redis", conf, buildTopology(redisHost, redisPort));
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-solr-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-solr-examples/pom.xml b/examples/storm-solr-examples/pom.xml
index d6d86fc..4ec049b 100644
--- a/examples/storm-solr-examples/pom.xml
+++ b/examples/storm-solr-examples/pom.xml
@@ -30,7 +30,7 @@
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
- <artifactId>storm-server</artifactId>
+ <artifactId>storm-client</artifactId>
<version>${project.version}</version>
<scope>${provided.scope}</scope>
</dependency>
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-solr-examples/src/main/java/org/apache/storm/solr/topology/SolrTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-solr-examples/src/main/java/org/apache/storm/solr/topology/SolrTopology.java b/examples/storm-solr-examples/src/main/java/org/apache/storm/solr/topology/SolrTopology.java
index 3b5b1b6..6e483d7 100644
--- a/examples/storm-solr-examples/src/main/java/org/apache/storm/solr/topology/SolrTopology.java
+++ b/examples/storm-solr-examples/src/main/java/org/apache/storm/solr/topology/SolrTopology.java
@@ -18,18 +18,16 @@
package org.apache.storm.solr.topology;
+import java.io.IOException;
+
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.solr.config.SolrCommitStrategy;
import org.apache.storm.solr.config.SolrConfig;
-import java.io.IOException;
-
public abstract class SolrTopology {
protected static String COLLECTION = "gettingstarted";
@@ -37,11 +35,11 @@ public abstract class SolrTopology {
final StormTopology topology = getTopology();
final Config config = getConfig();
- if (args.length == 0) {
- submitTopologyLocalCluster(topology, config);
- } else {
- submitTopologyRemoteCluster(args[0], topology, config);
+ String topoName = "test";
+ if (args.length > 0) {
+ topoName = args[0];
}
+ submitTopologyRemoteCluster(topoName, topology, config);
}
protected abstract StormTopology getTopology() throws IOException;
@@ -50,15 +48,6 @@ public abstract class SolrTopology {
StormSubmitter.submitTopology(arg, config, topology);
}
- protected void submitTopologyLocalCluster(StormTopology topology, Config config) throws Exception {
- try (LocalCluster cluster = new LocalCluster();
- LocalTopology topo = cluster.submitTopology("test", config, topology);) {
- Thread.sleep(10000);
- System.out.println("Killing topology per client's request");
- }
- System.exit(0);
- }
-
protected Config getConfig() {
Config config = new Config();
config.setDebug(true);
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml
index 7d102e7..b66e4fe 100644
--- a/examples/storm-starter/pom.xml
+++ b/examples/storm-starter/pom.xml
@@ -71,32 +71,27 @@
<groupId>org.apache.storm</groupId>
<artifactId>storm-clojure</artifactId>
<version>${project.version}</version>
- <!--
- Use "provided" scope to keep storm out of the jar-with-dependencies
- For IntelliJ dev, intellij will load properly.
- -->
- <scope>${provided.scope}</scope>
</dependency>
- <!--
- normally including 'storm-server' is OK for LocalCluster,
- but 'storm-starter' also uses clojure implementation
- so 'storm-starter' needs to include 'storm-core'
- -->
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-clojure-test</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
+ <artifactId>storm-client</artifactId>
<version>${project.version}</version>
- <!--
- Use "provided" scope to keep storm out of the jar-with-dependencies
- For IntelliJ dev, intellij will load properly.
- -->
+ <!--
+ Use "provided" scope to keep storm out of the jar-with-dependencies
+ For IntelliJ dev, intellij will load properly.
+ -->
<scope>${provided.scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-client-misc</artifactId>
<version>${project.version}</version>
- <scope>${provided.scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/clj/org/apache/storm/starter/clj/exclamation.clj
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/clj/org/apache/storm/starter/clj/exclamation.clj b/examples/storm-starter/src/clj/org/apache/storm/starter/clj/exclamation.clj
index 6bcd35c..415f43f 100644
--- a/examples/storm-starter/src/clj/org/apache/storm/starter/clj/exclamation.clj
+++ b/examples/storm-starter/src/clj/org/apache/storm/starter/clj/exclamation.clj
@@ -14,7 +14,7 @@
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns org.apache.storm.starter.clj.exclamation
- (:import [org.apache.storm StormSubmitter LocalCluster]
+ (:import [org.apache.storm StormSubmitter]
[org.apache.storm.utils Utils]
[org.apache.storm.testing TestWordSpout])
(:use [org.apache.storm clojure config])
@@ -31,11 +31,6 @@
{"exclaim1" (bolt-spec {"word" :shuffle} exclamation-bolt :p 3)
"exclaim2" (bolt-spec {"exclaim1" :shuffle} exclamation-bolt :p 2)}))
-(defn run-local! []
- (with-open [cluster (LocalCluster.)
- topo (.submitTopology cluster "exclamation" {TOPOLOGY-DEBUG true} (mk-topology))]
- (Utils/sleep 10000)))
-
(defn submit-topology! [name]
(StormSubmitter/submitTopologyWithProgressBar
name
@@ -45,6 +40,6 @@
(defn -main
([]
- (run-local!))
+ (submit-topology! "test"))
([name]
(submit-topology! name)))
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/clj/org/apache/storm/starter/clj/rolling_top_words.clj
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/clj/org/apache/storm/starter/clj/rolling_top_words.clj b/examples/storm-starter/src/clj/org/apache/storm/starter/clj/rolling_top_words.clj
index d5eada5..9a9677c 100644
--- a/examples/storm-starter/src/clj/org/apache/storm/starter/clj/rolling_top_words.clj
+++ b/examples/storm-starter/src/clj/org/apache/storm/starter/clj/rolling_top_words.clj
@@ -17,7 +17,7 @@
(:require [org.apache.storm [clojure :refer :all] [config :refer :all]]
[org.apache.storm.starter.clj.bolts :refer
[rolling-count-bolt intermediate-rankings-bolt total-rankings-bolt]])
- (:import [org.apache.storm StormSubmitter LocalCluster]
+ (:import [org.apache.storm StormSubmitter]
[org.apache.storm.utils Utils]
[org.apache.storm.testing TestWordSpout])
(:gen-class))
@@ -38,11 +38,6 @@
total-ranker-id (bolt-spec {ranker-id :global}
(total-rankings-bolt 5 2))})))
-(defn run-local! []
- (with-open [cluster (LocalCluster.)
- topo (.submitTopology cluster "slidingWindowCounts" {TOPOLOGY-DEBUG true} (mk-topology))]
- (Utils/sleep 60000)))
-
(defn submit-topology! [name]
(StormSubmitter/submitTopology
name
@@ -52,6 +47,6 @@
(defn -main
([]
- (run-local!))
+ (submit-topology! "test"))
([name]
(submit-topology! name)))
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/clj/org/apache/storm/starter/clj/word_count.clj
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/clj/org/apache/storm/starter/clj/word_count.clj b/examples/storm-starter/src/clj/org/apache/storm/starter/clj/word_count.clj
index cefa1e5..e3a52f5 100644
--- a/examples/storm-starter/src/clj/org/apache/storm/starter/clj/word_count.clj
+++ b/examples/storm-starter/src/clj/org/apache/storm/starter/clj/word_count.clj
@@ -14,7 +14,7 @@
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns org.apache.storm.starter.clj.word-count
- (:import [org.apache.storm StormSubmitter LocalCluster]
+ (:import [org.apache.storm StormSubmitter]
[org.apache.storm.utils Utils])
(:use [org.apache.storm clojure config])
(:gen-class))
@@ -74,11 +74,6 @@
word-count
:p 6)}))
-(defn run-local! []
- (with-open [cluster (LocalCluster.)
- topo (.submitTopology cluster "word-count" {TOPOLOGY-DEBUG true} (mk-topology))]
- (Thread/sleep 10000)))
-
(defn submit-topology! [name]
(StormSubmitter/submitTopology
name
@@ -88,7 +83,7 @@
(defn -main
([]
- (run-local!))
+ (submit-topology! "test"))
([name]
(submit-topology! name)))
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/BasicDRPCTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/BasicDRPCTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/BasicDRPCTopology.java
index 0f5f115..28d01f3 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/BasicDRPCTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/BasicDRPCTopology.java
@@ -18,8 +18,6 @@
package org.apache.storm.starter;
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalDRPC;
import org.apache.storm.StormSubmitter;
import org.apache.storm.drpc.LinearDRPCTopologyBuilder;
import org.apache.storm.topology.BasicOutputCollector;
@@ -28,6 +26,7 @@ import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.DRPCClient;
/**
* This topology is a basic example of doing distributed RPC on top of Storm. It implements a function that appends a
@@ -36,42 +35,40 @@ import org.apache.storm.tuple.Values;
* @see <a href="http://storm.apache.org/documentation/Distributed-RPC.html">Distributed RPC</a>
*/
public class BasicDRPCTopology {
- public static class ExclaimBolt extends BaseBasicBolt {
- @Override
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- String input = tuple.getString(1);
- collector.emit(new Values(tuple.getValue(0), input + "!"));
- }
+ public static class ExclaimBolt extends BaseBasicBolt {
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector collector) {
+ String input = tuple.getString(1);
+ collector.emit(new Values(tuple.getValue(0), input + "!"));
+ }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("id", "result"));
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("id", "result"));
+ }
}
- }
-
- public static void main(String[] args) throws Exception {
- LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
- builder.addBolt(new ExclaimBolt(), 3);
+ public static void main(String[] args) throws Exception {
+ LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
+ builder.addBolt(new ExclaimBolt(), 3);
- Config conf = new Config();
+ Config conf = new Config();
+ String topoName = "DRPCExample";
- if (args == null || args.length == 0) {
- try (LocalDRPC drpc = new LocalDRPC();
- LocalCluster cluster = new LocalCluster()) {
-
- cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));
-
- for (String word : new String[]{ "hello", "goodbye" }) {
- System.out.println("Result for \"" + word + "\": " + drpc.execute("exclamation", word));
+ if (args != null && args.length > 0) {
+ topoName = args[0];
}
- Thread.sleep(10000);
- }
- }
- else {
- conf.setNumWorkers(3);
- StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createRemoteTopology());
+ conf.setNumWorkers(3);
+ StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createRemoteTopology());
+
+ if (args.length > 1) {
+ try (DRPCClient drpc = DRPCClient.getConfiguredClient(conf)) {
+ for (int i = 1; i < args.length; i++) {
+ String word = args[i];
+ System.out.println("Result for \"" + word + "\": " + drpc.execute("exclamation", word));
+ }
+ }
+ }
}
- }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java
index 0b04709..ed93686 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java
@@ -71,11 +71,7 @@ public class ExclamationTopology extends ConfigurableTopology {
String topologyName = "test";
- if (isLocal) {
- ttl = 10;
- } else {
- conf.setNumWorkers(3);
- }
+ conf.setNumWorkers(3);
if (args != null && args.length > 0) {
topologyName = args[0];
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/FastWordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/FastWordCountTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/FastWordCountTopology.java
index 5acf908..1c48480 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/FastWordCountTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/FastWordCountTopology.java
@@ -117,7 +117,7 @@ public class FastWordCountTopology {
}
}
- public static void printMetrics(Nimbus.Client client, String name) throws Exception {
+ public static void printMetrics(Nimbus.Iface client, String name) throws Exception {
ClusterSummary summary = client.getClusterInfo();
String id = null;
for (TopologySummary ts: summary.get_topologies()) {
@@ -157,7 +157,7 @@ public class FastWordCountTopology {
System.out.println("uptime: "+uptime+" acked: "+acked+" avgLatency: "+avgLatency+" acked/sec: "+(((double)acked)/uptime+" failed: "+failed));
}
- public static void kill(Nimbus.Client client, String name) throws Exception {
+ public static void kill(Nimbus.Iface client, String name) throws Exception {
KillOptions opts = new KillOptions();
opts.set_wait_secs(0);
client.killTopologyWithOpts(name, opts);
@@ -185,7 +185,7 @@ public class FastWordCountTopology {
Map clusterConf = Utils.readStormConfig();
clusterConf.putAll(Utils.readCommandLineOpts());
- Nimbus.Client client = NimbusClient.getConfiguredClient(clusterConf).getClient();
+ Nimbus.Iface client = NimbusClient.getConfiguredClient(clusterConf).getClient();
//Sleep for 5 mins
for (int i = 0; i < 10; i++) {
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/InOrderDeliveryTest.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/InOrderDeliveryTest.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/InOrderDeliveryTest.java
index 78907ea..63be726 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/InOrderDeliveryTest.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/InOrderDeliveryTest.java
@@ -95,7 +95,7 @@ public class InOrderDeliveryTest {
}
}
- public static void printMetrics(Nimbus.Client client, String name) throws Exception {
+ public static void printMetrics(Nimbus.Iface client, String name) throws Exception {
ClusterSummary summary = client.getClusterInfo();
String id = null;
for (TopologySummary ts: summary.get_topologies()) {
@@ -135,7 +135,7 @@ public class InOrderDeliveryTest {
System.out.println("uptime: "+uptime+" acked: "+acked+" avgLatency: "+avgLatency+" acked/sec: "+(((double)acked)/uptime+" failed: "+failed));
}
- public static void kill(Nimbus.Client client, String name) throws Exception {
+ public static void kill(Nimbus.Iface client, String name) throws Exception {
KillOptions opts = new KillOptions();
opts.set_wait_secs(0);
client.killTopologyWithOpts(name, opts);
@@ -161,7 +161,7 @@ public class InOrderDeliveryTest {
Map clusterConf = Utils.readStormConfig();
clusterConf.putAll(Utils.readCommandLineOpts());
- Nimbus.Client client = NimbusClient.getConfiguredClient(clusterConf).getClient();
+ Nimbus.Iface client = NimbusClient.getConfiguredClient(clusterConf).getClient();
//Sleep for 50 mins
for (int i = 0; i < 50; i++) {
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/JoinBoltExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/JoinBoltExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/JoinBoltExample.java
index c23fad5..b71b64a 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/JoinBoltExample.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/JoinBoltExample.java
@@ -17,7 +17,10 @@
*/
package org.apache.storm.starter;
+import java.util.concurrent.TimeUnit;
+
import org.apache.storm.Config;
+import org.apache.storm.StormSubmitter;
import org.apache.storm.bolt.JoinBolt;
import org.apache.storm.starter.bolt.PrinterBolt;
import org.apache.storm.testing.FeederSpout;
@@ -25,14 +28,14 @@ import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
-import org.apache.storm.LocalCluster;
-
-import java.util.concurrent.TimeUnit;
+import org.apache.storm.utils.NimbusClient;
public class JoinBoltExample {
public static void main(String[] args) throws Exception {
-
+ if (!NimbusClient.isLocalOverride()) {
+ throw new IllegalStateException("This example only works in local mode. "
+ + "Run with storm local not storm jar");
+ }
FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender"));
FeederSpout ageSpout = new FeederSpout(new Fields("id", "age"));
@@ -53,16 +56,11 @@ public class JoinBoltExample {
builder.setBolt("printer", new PrinterBolt() ).shuffleGrouping("joiner");
Config conf = new Config();
-
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("join-example", conf, builder.createTopology());
+ StormSubmitter.submitTopologyWithProgressBar("join-example", conf, builder.createTopology());
generateGenderData(genderSpout);
generateAgeData(ageSpout);
-
- Utils.sleep(30000);
- cluster.shutdown();
}
private static void generateAgeData(FeederSpout ageSpout) {
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/ManualDRPC.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/ManualDRPC.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/ManualDRPC.java
index 5375ea6..110d0be 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/ManualDRPC.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/ManualDRPC.java
@@ -18,9 +18,7 @@
package org.apache.storm.starter;
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
-import org.apache.storm.LocalDRPC;
+import org.apache.storm.StormSubmitter;
import org.apache.storm.drpc.DRPCSpout;
import org.apache.storm.drpc.ReturnResults;
import org.apache.storm.topology.BasicOutputCollector;
@@ -30,6 +28,7 @@ import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.DRPCClient;
public class ManualDRPC {
public static class ExclamationBolt extends BaseBasicBolt {
@@ -50,19 +49,17 @@ public class ManualDRPC {
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
- try(LocalDRPC drpc = new LocalDRPC();
- LocalCluster cluster = new LocalCluster();) {
- DRPCSpout spout = new DRPCSpout("exclamation", drpc);
- builder.setSpout("drpc", spout);
- builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("drpc");
- builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");
+ DRPCSpout spout = new DRPCSpout("exclamation");
+ builder.setSpout("drpc", spout);
+ builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("drpc");
+ builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");
- Config conf = new Config();
- try (LocalTopology topo = cluster.submitTopology("exclaim", conf, builder.createTopology())) {
- System.out.println(drpc.execute("exclamation", "aaa"));
- System.out.println(drpc.execute("exclamation", "bbb"));
- }
+ Config conf = new Config();
+ StormSubmitter.submitTopology("exclaim", conf, builder.createTopology());
+ try (DRPCClient drpc = DRPCClient.getConfiguredClient(conf)) {
+ System.out.println(drpc.execute("exclamation", "aaa"));
+ System.out.println(drpc.execute("exclamation", "bbb"));
}
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/MultipleLoggerTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/MultipleLoggerTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/MultipleLoggerTopology.java
index aca5c7b..26c4ee8 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/MultipleLoggerTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/MultipleLoggerTopology.java
@@ -17,9 +17,9 @@
*/
package org.apache.storm.starter;
+import java.util.Map;
+
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.StormSubmitter;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
@@ -30,13 +30,9 @@ import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Map;
-
/**
* This is a basic example of a Storm topology.
*/
@@ -90,15 +86,11 @@ public class MultipleLoggerTopology {
Config conf = new Config();
conf.setDebug(true);
-
+ String topoName = MultipleLoggerTopology.class.getName();
if (args != null && args.length > 0) {
- conf.setNumWorkers(2);
- StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
- } else {
- try (LocalCluster cluster = new LocalCluster();
- LocalTopology topo = cluster.submitTopology("test", conf, builder.createTopology());) {
- Utils.sleep(10000);
- }
+ topoName = args[0];
}
+ conf.setNumWorkers(2);
+ StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology());
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/ReachTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/ReachTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/ReachTopology.java
index 6b4aaa6..e90fcb0 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/ReachTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/ReachTopology.java
@@ -17,9 +17,14 @@
*/
package org.apache.storm.starter;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalDRPC;
import org.apache.storm.StormSubmitter;
import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.drpc.LinearDRPCTopologyBuilder;
@@ -31,8 +36,7 @@ import org.apache.storm.topology.base.BaseBatchBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
-
-import java.util.*;
+import org.apache.storm.utils.DRPCClient;
/**
* This is a good example of doing complex Distributed RPC on top of Storm. This program creates a topology that can
@@ -51,144 +55,139 @@ import java.util.*;
* @see <a href="http://storm.apache.org/documentation/Distributed-RPC.html">Distributed RPC</a>
*/
public class ReachTopology {
- public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, List<String>>() {{
- put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan"));
- put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", "sally", "nathan"));
- put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john"));
- }};
-
- public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, List<String>>() {{
- put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", "jai"));
- put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", "vivian"));
- put("tim", Arrays.asList("alex"));
- put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", "vivian", "emily", "jordan"));
- put("adam", Arrays.asList("david", "carissa"));
- put("mike", Arrays.asList("john", "bob"));
- put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob"));
- }};
-
- public static class GetTweeters extends BaseBasicBolt {
- @Override
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- Object id = tuple.getValue(0);
- String url = tuple.getString(1);
- List<String> tweeters = TWEETERS_DB.get(url);
- if (tweeters != null) {
- for (String tweeter : tweeters) {
- collector.emit(new Values(id, tweeter));
+ public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, List<String>>() {{
+ put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan"));
+ put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", "sally", "nathan"));
+ put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john"));
+ }};
+
+ public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, List<String>>() {{
+ put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", "jai"));
+ put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", "vivian"));
+ put("tim", Arrays.asList("alex"));
+ put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", "vivian", "emily", "jordan"));
+ put("adam", Arrays.asList("david", "carissa"));
+ put("mike", Arrays.asList("john", "bob"));
+ put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob"));
+ }};
+
+ public static class GetTweeters extends BaseBasicBolt {
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector collector) {
+ Object id = tuple.getValue(0);
+ String url = tuple.getString(1);
+ List<String> tweeters = TWEETERS_DB.get(url);
+ if (tweeters != null) {
+ for (String tweeter : tweeters) {
+ collector.emit(new Values(id, tweeter));
+ }
+ }
}
- }
- }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("id", "tweeter"));
- }
- }
-
- public static class GetFollowers extends BaseBasicBolt {
- @Override
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- Object id = tuple.getValue(0);
- String tweeter = tuple.getString(1);
- List<String> followers = FOLLOWERS_DB.get(tweeter);
- if (followers != null) {
- for (String follower : followers) {
- collector.emit(new Values(id, follower));
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("id", "tweeter"));
}
- }
}
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("id", "follower"));
- }
- }
-
- public static class PartialUniquer extends BaseBatchBolt {
- BatchOutputCollector _collector;
- Object _id;
- Set<String> _followers = new HashSet<String>();
+ public static class GetFollowers extends BaseBasicBolt {
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector collector) {
+ Object id = tuple.getValue(0);
+ String tweeter = tuple.getString(1);
+ List<String> followers = FOLLOWERS_DB.get(tweeter);
+ if (followers != null) {
+ for (String follower : followers) {
+ collector.emit(new Values(id, follower));
+ }
+ }
+ }
- @Override
- public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
- _collector = collector;
- _id = id;
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("id", "follower"));
+ }
}
- @Override
- public void execute(Tuple tuple) {
- _followers.add(tuple.getString(1));
- }
+ public static class PartialUniquer extends BaseBatchBolt {
+ BatchOutputCollector _collector;
+ Object _id;
+ Set<String> _followers = new HashSet<String>();
- @Override
- public void finishBatch() {
- _collector.emit(new Values(_id, _followers.size()));
- }
+ @Override
+ public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
+ _collector = collector;
+ _id = id;
+ }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("id", "partial-count"));
- }
- }
+ @Override
+ public void execute(Tuple tuple) {
+ _followers.add(tuple.getString(1));
+ }
- public static class CountAggregator extends BaseBatchBolt {
- BatchOutputCollector _collector;
- Object _id;
- int _count = 0;
+ @Override
+ public void finishBatch() {
+ _collector.emit(new Values(_id, _followers.size()));
+ }
- @Override
- public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
- _collector = collector;
- _id = id;
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("id", "partial-count"));
+ }
}
- @Override
- public void execute(Tuple tuple) {
- _count += tuple.getInteger(1);
- }
+ public static class CountAggregator extends BaseBatchBolt {
+ BatchOutputCollector _collector;
+ Object _id;
+ int _count = 0;
- @Override
- public void finishBatch() {
- _collector.emit(new Values(_id, _count));
- }
+ @Override
+ public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
+ _collector = collector;
+ _id = id;
+ }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("id", "reach"));
- }
- }
+ @Override
+ public void execute(Tuple tuple) {
+ _count += tuple.getInteger(1);
+ }
- public static LinearDRPCTopologyBuilder construct() {
- LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");
- builder.addBolt(new GetTweeters(), 4);
- builder.addBolt(new GetFollowers(), 12).shuffleGrouping();
- builder.addBolt(new PartialUniquer(), 6).fieldsGrouping(new Fields("id", "follower"));
- builder.addBolt(new CountAggregator(), 3).fieldsGrouping(new Fields("id"));
- return builder;
- }
+ @Override
+ public void finishBatch() {
+ _collector.emit(new Values(_id, _count));
+ }
- public static void main(String[] args) throws Exception {
- LinearDRPCTopologyBuilder builder = construct();
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("id", "reach"));
+ }
+ }
+ public static LinearDRPCTopologyBuilder construct() {
+ LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");
+ builder.addBolt(new GetTweeters(), 4);
+ builder.addBolt(new GetFollowers(), 12).shuffleGrouping();
+ builder.addBolt(new PartialUniquer(), 6).fieldsGrouping(new Fields("id", "follower"));
+ builder.addBolt(new CountAggregator(), 3).fieldsGrouping(new Fields("id"));
+ return builder;
+ }
- Config conf = new Config();
+ public static void main(String[] args) throws Exception {
+ LinearDRPCTopologyBuilder builder = construct();
- if (args == null || args.length == 0) {
- conf.setMaxTaskParallelism(3);
- try (LocalDRPC drpc = new LocalDRPC();
- LocalCluster cluster = new LocalCluster();) {
- cluster.submitTopology("reach-drpc", conf, builder.createLocalTopology(drpc));
+ Config conf = new Config();
+ conf.setNumWorkers(6);
+ String topoName = "reach-drpc";
+ if (args.length > 0) {
+ topoName = args[0];
+ }
+ StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createRemoteTopology());
- String[] urlsToTry = new String[]{ "foo.com/blog/1", "engineering.twitter.com/blog/5", "notaurl.com" };
- for (String url : urlsToTry) {
- System.out.println("Reach of " + url + ": " + drpc.execute("reach", url));
+ try (DRPCClient drpc = DRPCClient.getConfiguredClient(conf)) {
+ String[] urlsToTry = new String[]{ "foo.com/blog/1", "engineering.twitter.com/blog/5", "notaurl.com" };
+ for (String url : urlsToTry) {
+ System.out.println("Reach of " + url + ": " + drpc.execute("reach", url));
+ }
}
- }
- }
- else {
- conf.setNumWorkers(6);
- StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createRemoteTopology());
}
- }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java
index 788485d..57fcd5d 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java
@@ -17,9 +17,9 @@
*/
package org.apache.storm.starter;
+import java.util.Map;
+
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.StormSubmitter;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
@@ -32,9 +32,6 @@ import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
-
-import java.util.Map;
public class ResourceAwareExampleTopology {
public static class ExclamationBolt extends BaseRichBolt {
@@ -90,17 +87,12 @@ public class ResourceAwareExampleTopology {
// Set strategy to schedule topology. If not specified, default to org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy
conf.setTopologyStrategy(org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class);
+ String topoName = "test";
if (args != null && args.length > 0) {
- conf.setNumWorkers(3);
-
- StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
+ topoName = args[0];
}
- else {
+ conf.setNumWorkers(3);
- try (LocalCluster cluster = new LocalCluster();
- LocalTopology topo = cluster.submitTopology("test", conf, builder.createTopology());) {
- Utils.sleep(10000);
- }
- }
+ StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology());
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/RollingTopWords.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/RollingTopWords.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/RollingTopWords.java
index 7ebbaf8..78b2baf 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/RollingTopWords.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/RollingTopWords.java
@@ -57,15 +57,6 @@ public class RollingTopWords extends ConfigurableTopology {
*
* ```
*
- * # Runs in local mode (LocalCluster), with topology name "slidingWindowCounts"
- * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.RollingTopWords -local
- *
- * # Runs in local mode (LocalCluster), with topology name "foobar"
- * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.RollingTopWords foobar -local
- *
- * # Runs in local mode (LocalCluster) for 30 seconds, with topology name "foobar"
- * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.RollingTopWords foobar -local -ttl 30
- *
* # Runs in remote/cluster mode, with topology name "production-topology"
* $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.RollingTopWords production-topology ```
*
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/SingleJoinExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/SingleJoinExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/SingleJoinExample.java
index 4eb9ed9..2e1bb94 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/SingleJoinExample.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/SingleJoinExample.java
@@ -18,52 +18,50 @@
package org.apache.storm.starter;
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.starter.bolt.SingleJoinBolt;
import org.apache.storm.testing.FeederSpout;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
-import org.apache.storm.starter.bolt.SingleJoinBolt;
+import org.apache.storm.utils.NimbusClient;
/** Example of using a simple custom join bolt
* NOTE: Prefer to use the built-in JoinBolt wherever applicable
*/
public class SingleJoinExample {
- public static void main(String[] args) throws Exception {
- FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender"));
- FeederSpout ageSpout = new FeederSpout(new Fields("id", "age"));
+ public static void main(String[] args) throws Exception {
+ if (!NimbusClient.isLocalOverride()) {
+ throw new IllegalStateException("This example only works in local mode. "
+ + "Run with storm local not storm jar");
+ }
+ FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender"));
+ FeederSpout ageSpout = new FeederSpout(new Fields("id", "age"));
- TopologyBuilder builder = new TopologyBuilder();
- builder.setSpout("gender", genderSpout);
- builder.setSpout("age", ageSpout);
- builder.setBolt("join", new SingleJoinBolt(new Fields("gender", "age"))).fieldsGrouping("gender", new Fields("id"))
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout("gender", genderSpout);
+ builder.setSpout("age", ageSpout);
+ builder.setBolt("join", new SingleJoinBolt(new Fields("gender", "age"))).fieldsGrouping("gender", new Fields("id"))
.fieldsGrouping("age", new Fields("id"));
- Config conf = new Config();
- conf.setDebug(true);
-
- try (LocalCluster cluster = new LocalCluster();
- LocalTopology topo = cluster.submitTopology("join-example", conf, builder.createTopology());) {
+ Config conf = new Config();
+ conf.setDebug(true);
+ StormSubmitter.submitTopology("join-example", conf, builder.createTopology());
- for (int i = 0; i < 10; i++) {
- String gender;
- if (i % 2 == 0) {
- gender = "male";
+ for (int i = 0; i < 10; i++) {
+ String gender;
+ if (i % 2 == 0) {
+ gender = "male";
+ }
+ else {
+ gender = "female";
+ }
+ genderSpout.feed(new Values(i, gender));
}
- else {
- gender = "female";
- }
- genderSpout.feed(new Values(i, gender));
- }
-
- for (int i = 9; i >= 0; i--) {
- ageSpout.feed(new Values(i, i + 20));
- }
- Utils.sleep(2000);
+ for (int i = 9; i >= 0; i--) {
+ ageSpout.feed(new Values(i, i + 20));
+ }
}
- }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java
index 83ad4fc..4e18217 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java
@@ -60,15 +60,6 @@ public class SkewedRollingTopWords extends ConfigurableTopology {
*
* ```
*
- * # Runs in local mode (LocalCluster), with topology name "slidingWindowCounts"
- * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.SkewedRollingTopWords -local
- *
- * # Runs in local mode (LocalCluster), with topology name "foobar"
- * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.SkewedRollingTopWords foobar -local
- *
- * # Runs in local mode (LocalCluster) for 30 seconds, with topology name "foobar"
- * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.SkewedRollingTopWords foobar -local -ttl 30
- *
* # Runs in remote/cluster mode, with topology name "production-topology"
* $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.SkewedRollingTopWords production-topology ```
*
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingTupleTsTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingTupleTsTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingTupleTsTopology.java
index 3993b95..6204f8c 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingTupleTsTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingTupleTsTopology.java
@@ -17,20 +17,16 @@
*/
package org.apache.storm.starter;
+import java.util.concurrent.TimeUnit;
+
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.StormSubmitter;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.topology.base.BaseWindowedBolt;
-import org.apache.storm.utils.Utils;
import org.apache.storm.starter.bolt.PrinterBolt;
import org.apache.storm.starter.bolt.SlidingWindowSumBolt;
import org.apache.storm.starter.spout.RandomIntegerSpout;
-
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.storm.topology.base.BaseWindowedBolt.Duration;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.topology.base.BaseWindowedBolt.Duration;
/**
* Windowing based on tuple timestamp (e.g. the time when tuple is generated
@@ -48,15 +44,13 @@ public class SlidingTupleTsTopology {
builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("slidingsum");
Config conf = new Config();
conf.setDebug(true);
-
+ String topoName = "test";
+
if (args != null && args.length > 0) {
- conf.setNumWorkers(1);
- StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
- } else {
- try (LocalCluster cluster = new LocalCluster();
- LocalTopology topo = cluster.submitTopology("test", conf, builder.createTopology());) {
- Utils.sleep(40000);
- }
+ topoName = args[0];
}
+
+ conf.setNumWorkers(1);
+ StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology());
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingWindowTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingWindowTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingWindowTopology.java
index 3c68f16..f00dac9 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingWindowTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingWindowTopology.java
@@ -17,30 +17,26 @@
*/
package org.apache.storm.starter;
+import java.util.List;
+import java.util.Map;
+
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.StormSubmitter;
+import org.apache.storm.starter.bolt.PrinterBolt;
+import org.apache.storm.starter.bolt.SlidingWindowSumBolt;
+import org.apache.storm.starter.spout.RandomIntegerSpout;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.topology.base.BaseWindowedBolt.Count;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
import org.apache.storm.windowing.TupleWindow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.storm.starter.bolt.PrinterBolt;
-import org.apache.storm.starter.bolt.SlidingWindowSumBolt;
-import org.apache.storm.starter.spout.RandomIntegerSpout;
-
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.storm.topology.base.BaseWindowedBolt.Count;
/**
* A sample topology that demonstrates the usage of {@link org.apache.storm.topology.IWindowedBolt}
@@ -95,14 +91,11 @@ public class SlidingWindowTopology {
builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("tumblingavg");
Config conf = new Config();
conf.setDebug(true);
+ String topoName = "test";
if (args != null && args.length > 0) {
- conf.setNumWorkers(1);
- StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
- } else {
- try (LocalCluster cluster = new LocalCluster();
- LocalTopology topo = cluster.submitTopology("test", conf, builder.createTopology());) {
- Utils.sleep(40000);
- }
+ topoName = args[0];
}
+ conf.setNumWorkers(1);
+ StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology());
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulTopology.java
index cd8fa2c..36495fd 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulTopology.java
@@ -17,9 +17,9 @@
*/
package org.apache.storm.starter;
+import java.util.Map;
+
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.StormSubmitter;
import org.apache.storm.starter.spout.RandomIntegerSpout;
import org.apache.storm.state.KeyValueState;
@@ -33,12 +33,9 @@ import org.apache.storm.topology.base.BaseStatefulBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Map;
-
/**
* An example topology that demonstrates the use of {@link org.apache.storm.topology.IStatefulBolt}
* to manage state. To run the example,
@@ -129,15 +126,11 @@ public class StatefulTopology {
builder.setBolt("total", new StatefulSumBolt("total"), 1).shuffleGrouping("printer");
Config conf = new Config();
conf.setDebug(false);
-
+ String topoName = "test";
if (args != null && args.length > 0) {
- conf.setNumWorkers(1);
- StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
- } else {
- try (LocalCluster cluster = new LocalCluster();
- LocalTopology topology = cluster.submitTopology("test", conf, builder.createTopology());) {
- Utils.sleep(40000);
- }
+ topoName = args[0];
}
+ conf.setNumWorkers(1);
+ StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology());
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulWindowingTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulWindowingTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulWindowingTopology.java
index 0e9b6ea..8cce057 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulWindowingTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulWindowingTopology.java
@@ -17,9 +17,9 @@
*/
package org.apache.storm.starter;
+import java.util.Map;
+
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.StormSubmitter;
import org.apache.storm.starter.bolt.PrinterBolt;
import org.apache.storm.starter.spout.RandomIntegerSpout;
@@ -30,18 +30,14 @@ import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseStatefulWindowedBolt;
+import org.apache.storm.topology.base.BaseWindowedBolt.Count;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
import org.apache.storm.windowing.TupleWindow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Map;
-
-import static org.apache.storm.topology.base.BaseWindowedBolt.Count;
-
/**
* A simple example that demonstrates the usage of {@link org.apache.storm.topology.IStatefulWindowedBolt} to
* save the state of the windowing operation to avoid re-computation in case of failures.
@@ -97,15 +93,13 @@ public class StatefulWindowingTopology {
Config conf = new Config();
conf.setDebug(false);
//conf.put(Config.TOPOLOGY_STATE_PROVIDER, "org.apache.storm.redis.state.RedisKeyValueStateProvider");
+
+ String topoName = "test";
if (args != null && args.length > 0) {
- conf.setNumWorkers(1);
- StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
- } else {
- try (LocalCluster cluster = new LocalCluster();
- LocalTopology topo = cluster.submitTopology("test", conf, builder.createTopology());) {
- Utils.sleep(40000);
- }
+ topoName = args[0];
}
+ conf.setNumWorkers(1);
+ StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology());
}
}