You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/03/20 22:22:46 UTC
[23/50] [abbrv] git commit: Reformat Java code to use 2 instead of 4
spaces (to match Clojure style)
Reformat Java code to use 2 instead of 4 spaces (to match Clojure style)
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/a51c8247
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/a51c8247
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/a51c8247
Branch: refs/heads/master
Commit: a51c8247e3f358788efc8fb3ed7104b7b7cb21c4
Parents: 3b2db7c
Author: Michael G. Noll <mn...@verisign.com>
Authored: Thu Aug 29 12:39:14 2013 +0200
Committer: Michael G. Noll <mn...@verisign.com>
Committed: Thu Aug 29 12:39:14 2013 +0200
----------------------------------------------------------------------
src/jvm/storm/starter/BasicDRPCTopology.java | 84 ++--
src/jvm/storm/starter/ExclamationTopology.java | 82 ++--
src/jvm/storm/starter/ManualDRPC.java | 64 ++-
src/jvm/storm/starter/ReachTopology.java | 297 ++++++------
src/jvm/storm/starter/RollingTopWords.java | 92 ++--
src/jvm/storm/starter/SingleJoinExample.java | 66 +--
.../storm/starter/TransactionalGlobalCount.java | 237 +++++-----
src/jvm/storm/starter/TransactionalWords.java | 369 +++++++--------
src/jvm/storm/starter/WordCountTopology.java | 132 +++---
.../storm/starter/bolt/AbstractRankerBolt.java | 126 +++---
.../starter/bolt/IntermediateRankingsBolt.java | 46 +-
src/jvm/storm/starter/bolt/PrinterBolt.java | 16 +-
.../storm/starter/bolt/RollingCountBolt.java | 183 ++++----
src/jvm/storm/starter/bolt/SingleJoinBolt.java | 153 ++++---
.../storm/starter/bolt/TotalRankingsBolt.java | 46 +-
.../starter/spout/RandomSentenceSpout.java | 69 ++-
.../tools/NthLastModifiedTimeTracker.java | 64 ++-
src/jvm/storm/starter/tools/Rankable.java | 4 +-
.../starter/tools/RankableObjectWithFields.java | 189 ++++----
src/jvm/storm/starter/tools/Rankings.java | 142 +++---
.../starter/tools/SlidingWindowCounter.java | 111 +++--
.../storm/starter/tools/SlotBasedCounter.java | 138 +++---
src/jvm/storm/starter/trident/TridentReach.java | 199 ++++----
.../storm/starter/trident/TridentWordCount.java | 98 ++--
src/jvm/storm/starter/util/StormRunner.java | 22 +-
src/jvm/storm/starter/util/TupleHelpers.java | 12 +-
.../bolt/IntermediateRankingsBoltTest.java | 238 +++++-----
.../starter/bolt/RollingCountBoltTest.java | 171 ++++---
.../starter/bolt/TotalRankingsBoltTest.java | 241 +++++-----
.../storm/starter/tools/MockTupleHelpers.java | 26 +-
.../tools/NthLastModifiedTimeTrackerTest.java | 194 ++++----
.../tools/RankableObjectWithFieldsTest.java | 419 +++++++++--------
test/jvm/storm/starter/tools/RankingsTest.java | 453 +++++++++----------
.../starter/tools/SlidingWindowCounterTest.java | 160 ++++---
.../starter/tools/SlotBasedCounterTest.java | 304 ++++++-------
35 files changed, 2574 insertions(+), 2673 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/BasicDRPCTopology.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/BasicDRPCTopology.java b/src/jvm/storm/starter/BasicDRPCTopology.java
index 1ea8e77..7e7ef93 100644
--- a/src/jvm/storm/starter/BasicDRPCTopology.java
+++ b/src/jvm/storm/starter/BasicDRPCTopology.java
@@ -5,59 +5,57 @@ import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
import backtype.storm.drpc.LinearDRPCTopologyBuilder;
-import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
-import java.util.Map;
/**
- * This topology is a basic example of doing distributed RPC on top of Storm. It implements a function
- * that appends a "!" to any string you send the DRPC function.
- *
- * See https://github.com/nathanmarz/storm/wiki/Distributed-RPC for more information on
- * doing distributed RPC on top of Storm.
+ * This topology is a basic example of doing distributed RPC on top of Storm. It implements a function that appends a
+ * "!" to any string you send the DRPC function.
+ * <p/>
+ * See https://github.com/nathanmarz/storm/wiki/Distributed-RPC for more information on doing distributed RPC on top of
+ * Storm.
*/
public class BasicDRPCTopology {
- public static class ExclaimBolt extends BaseBasicBolt {
- @Override
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- String input = tuple.getString(1);
- collector.emit(new Values(tuple.getValue(0), input + "!"));
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("id", "result"));
- }
-
+ public static class ExclaimBolt extends BaseBasicBolt {
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector collector) {
+ String input = tuple.getString(1);
+ collector.emit(new Values(tuple.getValue(0), input + "!"));
}
-
- public static void main(String[] args) throws Exception {
- LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
- builder.addBolt(new ExclaimBolt(), 3);
-
- Config conf = new Config();
-
- if(args==null || args.length==0) {
- LocalDRPC drpc = new LocalDRPC();
- LocalCluster cluster = new LocalCluster();
-
- cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));
-
- for(String word: new String[] {"hello", "goodbye"}) {
- System.out.println("Result for \"" + word + "\": "
- + drpc.execute("exclamation", word));
- }
-
- cluster.shutdown();
- drpc.shutdown();
- } else {
- conf.setNumWorkers(3);
- StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology());
- }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("id", "result"));
+ }
+
+ }
+
+ public static void main(String[] args) throws Exception {
+ LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
+ builder.addBolt(new ExclaimBolt(), 3);
+
+ Config conf = new Config();
+
+ if (args == null || args.length == 0) {
+ LocalDRPC drpc = new LocalDRPC();
+ LocalCluster cluster = new LocalCluster();
+
+ cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));
+
+ for (String word : new String[]{ "hello", "goodbye" }) {
+ System.out.println("Result for \"" + word + "\": " + drpc.execute("exclamation", word));
+ }
+
+ cluster.shutdown();
+ drpc.shutdown();
+ }
+ else {
+ conf.setNumWorkers(3);
+ StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology());
}
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/ExclamationTopology.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/ExclamationTopology.java b/src/jvm/storm/starter/ExclamationTopology.java
index b98c8cd..fed8b1e 100644
--- a/src/jvm/storm/starter/ExclamationTopology.java
+++ b/src/jvm/storm/starter/ExclamationTopology.java
@@ -13,58 +13,58 @@ import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
+
import java.util.Map;
/**
* This is a basic example of a Storm topology.
*/
public class ExclamationTopology {
-
- public static class ExclamationBolt extends BaseRichBolt {
- OutputCollector _collector;
- @Override
- public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
- _collector = collector;
- }
+ public static class ExclamationBolt extends BaseRichBolt {
+ OutputCollector _collector;
+
+ @Override
+ public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
+ _collector = collector;
+ }
+
+ @Override
+ public void execute(Tuple tuple) {
+ _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
+ _collector.ack(tuple);
+ }
- @Override
- public void execute(Tuple tuple) {
- _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
- _collector.ack(tuple);
- }
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("word"));
+ }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("word"));
- }
+ }
+ public static void main(String[] args) throws Exception {
+ TopologyBuilder builder = new TopologyBuilder();
+
+ builder.setSpout("word", new TestWordSpout(), 10);
+ builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
+ builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
+
+ Config conf = new Config();
+ conf.setDebug(true);
+
+ if (args != null && args.length > 0) {
+ conf.setNumWorkers(3);
+
+ StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
}
-
- public static void main(String[] args) throws Exception {
- TopologyBuilder builder = new TopologyBuilder();
-
- builder.setSpout("word", new TestWordSpout(), 10);
- builder.setBolt("exclaim1", new ExclamationBolt(), 3)
- .shuffleGrouping("word");
- builder.setBolt("exclaim2", new ExclamationBolt(), 2)
- .shuffleGrouping("exclaim1");
-
- Config conf = new Config();
- conf.setDebug(true);
-
- if(args!=null && args.length > 0) {
- conf.setNumWorkers(3);
-
- StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
- } else {
-
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("test", conf, builder.createTopology());
- Utils.sleep(10000);
- cluster.killTopology("test");
- cluster.shutdown();
- }
+ else {
+
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("test", conf, builder.createTopology());
+ Utils.sleep(10000);
+ cluster.killTopology("test");
+ cluster.shutdown();
}
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/ManualDRPC.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/ManualDRPC.java b/src/jvm/storm/starter/ManualDRPC.java
index fd45c9c..ade4ab1 100644
--- a/src/jvm/storm/starter/ManualDRPC.java
+++ b/src/jvm/storm/starter/ManualDRPC.java
@@ -15,39 +15,37 @@ import backtype.storm.tuple.Values;
public class ManualDRPC {
- public static class ExclamationBolt extends BaseBasicBolt {
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("result", "return-info"));
- }
-
- @Override
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- String arg = tuple.getString(0);
- Object retInfo = tuple.getValue(1);
- collector.emit(new Values(arg + "!!!", retInfo));
- }
-
+ public static class ExclamationBolt extends BaseBasicBolt {
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("result", "return-info"));
}
-
- public static void main(String[] args) {
- TopologyBuilder builder = new TopologyBuilder();
- LocalDRPC drpc = new LocalDRPC();
-
- DRPCSpout spout = new DRPCSpout("exclamation", drpc);
- builder.setSpout("drpc", spout);
- builder.setBolt("exclaim", new ExclamationBolt(), 3)
- .shuffleGrouping("drpc");
- builder.setBolt("return", new ReturnResults(), 3)
- .shuffleGrouping("exclaim");
-
- LocalCluster cluster = new LocalCluster();
- Config conf = new Config();
- cluster.submitTopology("exclaim", conf, builder.createTopology());
-
- System.out.println(drpc.execute("exclamation", "aaa"));
- System.out.println(drpc.execute("exclamation", "bbb"));
-
+
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector collector) {
+ String arg = tuple.getString(0);
+ Object retInfo = tuple.getValue(1);
+ collector.emit(new Values(arg + "!!!", retInfo));
}
+
+ }
+
+ public static void main(String[] args) {
+ TopologyBuilder builder = new TopologyBuilder();
+ LocalDRPC drpc = new LocalDRPC();
+
+ DRPCSpout spout = new DRPCSpout("exclamation", drpc);
+ builder.setSpout("drpc", spout);
+ builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("drpc");
+ builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");
+
+ LocalCluster cluster = new LocalCluster();
+ Config conf = new Config();
+ cluster.submitTopology("exclaim", conf, builder.createTopology());
+
+ System.out.println(drpc.execute("exclamation", "aaa"));
+ System.out.println(drpc.execute("exclamation", "bbb"));
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/ReachTopology.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/ReachTopology.java b/src/jvm/storm/starter/ReachTopology.java
index 8a7fdbc..a63db3e 100644
--- a/src/jvm/storm/starter/ReachTopology.java
+++ b/src/jvm/storm/starter/ReachTopology.java
@@ -5,186 +5,175 @@ import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
import backtype.storm.coordination.BatchOutputCollector;
-import backtype.storm.coordination.CoordinatedBolt.FinishedCallback;
import backtype.storm.drpc.LinearDRPCTopologyBuilder;
-import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseBatchBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+
+import java.util.*;
/**
- * This is a good example of doing complex Distributed RPC on top of Storm. This
- * program creates a topology that can compute the reach for any URL on Twitter
- * in realtime by parallelizing the whole computation.
- *
- * Reach is the number of unique people exposed to a URL on Twitter. To compute reach,
- * you have to get all the people who tweeted the URL, get all the followers of all those people,
- * unique that set of followers, and then count the unique set. It's an intense computation
- * that can involve thousands of database calls and tens of millions of follower records.
- *
- * This Storm topology does every piece of that computation in parallel, turning what would be a
- * computation that takes minutes on a single machine into one that takes just a couple seconds.
- *
- * For the purposes of demonstration, this topology replaces the use of actual DBs with
- * in-memory hashmaps.
- *
+ * This is a good example of doing complex Distributed RPC on top of Storm. This program creates a topology that can
+ * compute the reach for any URL on Twitter in realtime by parallelizing the whole computation.
+ * <p/>
+ * Reach is the number of unique people exposed to a URL on Twitter. To compute reach, you have to get all the people
+ * who tweeted the URL, get all the followers of all those people, unique that set of followers, and then count the
+ * unique set. It's an intense computation that can involve thousands of database calls and tens of millions of follower
+ * records.
+ * <p/>
+ * This Storm topology does every piece of that computation in parallel, turning what would be a computation that takes
+ * minutes on a single machine into one that takes just a couple seconds.
+ * <p/>
+ * For the purposes of demonstration, this topology replaces the use of actual DBs with in-memory hashmaps.
+ * <p/>
* See https://github.com/nathanmarz/storm/wiki/Distributed-RPC for more information on Distributed RPC.
*/
public class ReachTopology {
- public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, List<String>>() {{
- put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan"));
- put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", "sally", "nathan"));
- put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john"));
- }};
-
- public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, List<String>>() {{
- put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", "jai"));
- put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", "vivian"));
- put("tim", Arrays.asList("alex"));
- put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", "vivian", "emily", "jordan"));
- put("adam", Arrays.asList("david", "carissa"));
- put("mike", Arrays.asList("john", "bob"));
- put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob"));
- }};
-
- public static class GetTweeters extends BaseBasicBolt {
- @Override
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- Object id = tuple.getValue(0);
- String url = tuple.getString(1);
- List<String> tweeters = TWEETERS_DB.get(url);
- if(tweeters!=null) {
- for(String tweeter: tweeters) {
- collector.emit(new Values(id, tweeter));
- }
- }
+ public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, List<String>>() {{
+ put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan"));
+ put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", "sally", "nathan"));
+ put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john"));
+ }};
+
+ public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, List<String>>() {{
+ put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", "jai"));
+ put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", "vivian"));
+ put("tim", Arrays.asList("alex"));
+ put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", "vivian", "emily", "jordan"));
+ put("adam", Arrays.asList("david", "carissa"));
+ put("mike", Arrays.asList("john", "bob"));
+ put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob"));
+ }};
+
+ public static class GetTweeters extends BaseBasicBolt {
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector collector) {
+ Object id = tuple.getValue(0);
+ String url = tuple.getString(1);
+ List<String> tweeters = TWEETERS_DB.get(url);
+ if (tweeters != null) {
+ for (String tweeter : tweeters) {
+ collector.emit(new Values(id, tweeter));
}
+ }
+ }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("id", "tweeter"));
- }
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("id", "tweeter"));
}
-
- public static class GetFollowers extends BaseBasicBolt {
- @Override
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- Object id = tuple.getValue(0);
- String tweeter = tuple.getString(1);
- List<String> followers = FOLLOWERS_DB.get(tweeter);
- if(followers!=null) {
- for(String follower: followers) {
- collector.emit(new Values(id, follower));
- }
- }
- }
+ }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("id", "follower"));
+ public static class GetFollowers extends BaseBasicBolt {
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector collector) {
+ Object id = tuple.getValue(0);
+ String tweeter = tuple.getString(1);
+ List<String> followers = FOLLOWERS_DB.get(tweeter);
+ if (followers != null) {
+ for (String follower : followers) {
+ collector.emit(new Values(id, follower));
}
+ }
}
-
- public static class PartialUniquer extends BaseBatchBolt {
- BatchOutputCollector _collector;
- Object _id;
- Set<String> _followers = new HashSet<String>();
-
- @Override
- public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
- _collector = collector;
- _id = id;
- }
- @Override
- public void execute(Tuple tuple) {
- _followers.add(tuple.getString(1));
- }
-
- @Override
- public void finishBatch() {
- _collector.emit(new Values(_id, _followers.size()));
- }
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("id", "follower"));
+ }
+ }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("id", "partial-count"));
- }
+ public static class PartialUniquer extends BaseBatchBolt {
+ BatchOutputCollector _collector;
+ Object _id;
+ Set<String> _followers = new HashSet<String>();
+
+ @Override
+ public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
+ _collector = collector;
+ _id = id;
}
-
- public static class CountAggregator extends BaseBatchBolt {
- BatchOutputCollector _collector;
- Object _id;
- int _count = 0;
-
- @Override
- public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
- _collector = collector;
- _id = id;
- }
- @Override
- public void execute(Tuple tuple) {
- _count+=tuple.getInteger(1);
- }
-
- @Override
- public void finishBatch() {
- _collector.emit(new Values(_id, _count));
- }
+ @Override
+ public void execute(Tuple tuple) {
+ _followers.add(tuple.getString(1));
+ }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("id", "reach"));
- }
+ @Override
+ public void finishBatch() {
+ _collector.emit(new Values(_id, _followers.size()));
}
-
- public static LinearDRPCTopologyBuilder construct() {
- LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");
- builder.addBolt(new GetTweeters(), 4);
- builder.addBolt(new GetFollowers(), 12)
- .shuffleGrouping();
- builder.addBolt(new PartialUniquer(), 6)
- .fieldsGrouping(new Fields("id", "follower"));
- builder.addBolt(new CountAggregator(), 3)
- .fieldsGrouping(new Fields("id"));
- return builder;
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("id", "partial-count"));
}
-
- public static void main(String[] args) throws Exception {
- LinearDRPCTopologyBuilder builder = construct();
-
-
- Config conf = new Config();
-
- if(args==null || args.length==0) {
- conf.setMaxTaskParallelism(3);
- LocalDRPC drpc = new LocalDRPC();
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("reach-drpc", conf, builder.createLocalTopology(drpc));
-
- String[] urlsToTry = new String[] { "foo.com/blog/1", "engineering.twitter.com/blog/5", "notaurl.com"};
- for(String url: urlsToTry) {
- System.out.println("Reach of " + url + ": " + drpc.execute("reach", url));
- }
-
- cluster.shutdown();
- drpc.shutdown();
- } else {
- conf.setNumWorkers(6);
- StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology());
- }
+ }
+
+ public static class CountAggregator extends BaseBatchBolt {
+ BatchOutputCollector _collector;
+ Object _id;
+ int _count = 0;
+
+ @Override
+ public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
+ _collector = collector;
+ _id = id;
+ }
+
+ @Override
+ public void execute(Tuple tuple) {
+ _count += tuple.getInteger(1);
+ }
+
+ @Override
+ public void finishBatch() {
+ _collector.emit(new Values(_id, _count));
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("id", "reach"));
+ }
+ }
+
+ public static LinearDRPCTopologyBuilder construct() {
+ LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");
+ builder.addBolt(new GetTweeters(), 4);
+ builder.addBolt(new GetFollowers(), 12).shuffleGrouping();
+ builder.addBolt(new PartialUniquer(), 6).fieldsGrouping(new Fields("id", "follower"));
+ builder.addBolt(new CountAggregator(), 3).fieldsGrouping(new Fields("id"));
+ return builder;
+ }
+
+ public static void main(String[] args) throws Exception {
+ LinearDRPCTopologyBuilder builder = construct();
+
+
+ Config conf = new Config();
+
+ if (args == null || args.length == 0) {
+ conf.setMaxTaskParallelism(3);
+ LocalDRPC drpc = new LocalDRPC();
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("reach-drpc", conf, builder.createLocalTopology(drpc));
+
+ String[] urlsToTry = new String[]{ "foo.com/blog/1", "engineering.twitter.com/blog/5", "notaurl.com" };
+ for (String url : urlsToTry) {
+ System.out.println("Reach of " + url + ": " + drpc.execute("reach", url));
+ }
+
+ cluster.shutdown();
+ drpc.shutdown();
+ }
+ else {
+ conf.setNumWorkers(6);
+ StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology());
}
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/RollingTopWords.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/RollingTopWords.java b/src/jvm/storm/starter/RollingTopWords.java
index ef016a4..9d0b3c8 100644
--- a/src/jvm/storm/starter/RollingTopWords.java
+++ b/src/jvm/storm/starter/RollingTopWords.java
@@ -1,13 +1,13 @@
package storm.starter;
-import storm.starter.bolt.IntermediateRankingsBolt;
-import storm.starter.bolt.RollingCountBolt;
-import storm.starter.bolt.TotalRankingsBolt;
-import storm.starter.util.StormRunner;
import backtype.storm.Config;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
+import storm.starter.bolt.IntermediateRankingsBolt;
+import storm.starter.bolt.RollingCountBolt;
+import storm.starter.bolt.TotalRankingsBolt;
+import storm.starter.util.StormRunner;
/**
* This topology does a continuous computation of the top N words that the topology has seen in terms of cardinality.
@@ -16,46 +16,46 @@ import backtype.storm.tuple.Fields;
*/
public class RollingTopWords {
- private static final int DEFAULT_RUNTIME_IN_SECONDS = 60;
- private static final int TOP_N = 5;
-
- private final TopologyBuilder builder;
- private final String topologyName;
- private final Config topologyConfig;
- private final int runtimeInSeconds;
-
- public RollingTopWords() throws InterruptedException {
- builder = new TopologyBuilder();
- topologyName = "slidingWindowCounts";
- topologyConfig = createTopologyConfiguration();
- runtimeInSeconds = DEFAULT_RUNTIME_IN_SECONDS;
-
- wireTopology();
- }
-
- private static Config createTopologyConfiguration() {
- Config conf = new Config();
- conf.setDebug(true);
- return conf;
- }
-
- private void wireTopology() throws InterruptedException {
- String spoutId = "wordGenerator";
- String counterId = "counter";
- String intermediateRankerId = "intermediateRanker";
- String totalRankerId = "finalRanker";
- builder.setSpout(spoutId, new TestWordSpout(), 5);
- builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).fieldsGrouping(spoutId, new Fields("word"));
- builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(counterId,
- new Fields("obj"));
- builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);
- }
-
- public void run() throws InterruptedException {
- StormRunner.runTopologyLocally(builder.createTopology(), topologyName, topologyConfig, runtimeInSeconds);
- }
-
- public static void main(String[] args) throws Exception {
- new RollingTopWords().run();
- }
+ private static final int DEFAULT_RUNTIME_IN_SECONDS = 60;
+ private static final int TOP_N = 5;
+
+ private final TopologyBuilder builder;
+ private final String topologyName;
+ private final Config topologyConfig;
+ private final int runtimeInSeconds;
+
+ public RollingTopWords() throws InterruptedException {
+ builder = new TopologyBuilder();
+ topologyName = "slidingWindowCounts";
+ topologyConfig = createTopologyConfiguration();
+ runtimeInSeconds = DEFAULT_RUNTIME_IN_SECONDS;
+
+ wireTopology();
+ }
+
+ private static Config createTopologyConfiguration() {
+ Config conf = new Config();
+ conf.setDebug(true);
+ return conf;
+ }
+
+ private void wireTopology() throws InterruptedException {
+ String spoutId = "wordGenerator";
+ String counterId = "counter";
+ String intermediateRankerId = "intermediateRanker";
+ String totalRankerId = "finalRanker";
+ builder.setSpout(spoutId, new TestWordSpout(), 5);
+ builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).fieldsGrouping(spoutId, new Fields("word"));
+ builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(counterId, new Fields(
+ "obj"));
+ builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);
+ }
+
+ public void run() throws InterruptedException {
+ StormRunner.runTopologyLocally(builder.createTopology(), topologyName, topologyConfig, runtimeInSeconds);
+ }
+
+ public static void main(String[] args) throws Exception {
+ new RollingTopWords().run();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/SingleJoinExample.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/SingleJoinExample.java b/src/jvm/storm/starter/SingleJoinExample.java
index 62d332e..d323809 100644
--- a/src/jvm/storm/starter/SingleJoinExample.java
+++ b/src/jvm/storm/starter/SingleJoinExample.java
@@ -10,38 +10,38 @@ import backtype.storm.utils.Utils;
import storm.starter.bolt.SingleJoinBolt;
public class SingleJoinExample {
- public static void main(String[] args) {
- FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender"));
- FeederSpout ageSpout = new FeederSpout(new Fields("id", "age"));
-
- TopologyBuilder builder = new TopologyBuilder();
- builder.setSpout("gender", genderSpout);
- builder.setSpout("age", ageSpout);
- builder.setBolt("join", new SingleJoinBolt(new Fields("gender", "age")))
- .fieldsGrouping("gender", new Fields("id"))
- .fieldsGrouping("age", new Fields("id"));
-
- Config conf = new Config();
- conf.setDebug(true);
-
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("join-example", conf, builder.createTopology());
-
- for(int i=0; i<10; i++) {
- String gender;
- if(i % 2 == 0) {
- gender = "male";
- } else {
- gender = "female";
- }
- genderSpout.feed(new Values(i, gender));
- }
-
- for(int i=9; i>=0; i--) {
- ageSpout.feed(new Values(i, i+20));
- }
-
- Utils.sleep(2000);
- cluster.shutdown();
+ public static void main(String[] args) {
+ FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender"));
+ FeederSpout ageSpout = new FeederSpout(new Fields("id", "age"));
+
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout("gender", genderSpout);
+ builder.setSpout("age", ageSpout);
+ builder.setBolt("join", new SingleJoinBolt(new Fields("gender", "age"))).fieldsGrouping("gender", new Fields("id"))
+ .fieldsGrouping("age", new Fields("id"));
+
+ Config conf = new Config();
+ conf.setDebug(true);
+
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("join-example", conf, builder.createTopology());
+
+ for (int i = 0; i < 10; i++) {
+ String gender;
+ if (i % 2 == 0) {
+ gender = "male";
+ }
+ else {
+ gender = "female";
+ }
+ genderSpout.feed(new Values(i, gender));
}
+
+ for (int i = 9; i >= 0; i--) {
+ ageSpout.feed(new Values(i, i + 20));
+ }
+
+ Utils.sleep(2000);
+ cluster.shutdown();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/TransactionalGlobalCount.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/TransactionalGlobalCount.java b/src/jvm/storm/starter/TransactionalGlobalCount.java
index b4579a4..91b16aa 100644
--- a/src/jvm/storm/starter/TransactionalGlobalCount.java
+++ b/src/jvm/storm/starter/TransactionalGlobalCount.java
@@ -14,6 +14,7 @@ import backtype.storm.transactional.TransactionalTopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
+
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
@@ -21,135 +22,135 @@ import java.util.List;
import java.util.Map;
/**
- * This is a basic example of a transactional topology. It keeps a count of the number of tuples seen so far
- * in a database. The source of data and the databases are mocked out as in memory maps for demonstration
- * purposes. This class is defined in depth on the wiki at https://github.com/nathanmarz/storm/wiki/Transactional-topologies
+ * This is a basic example of a transactional topology. It keeps a count of the number of tuples seen so far in a
+ * database. The source of data and the databases are mocked out as in memory maps for demonstration purposes. This
+ * class is defined in depth on the wiki at https://github.com/nathanmarz/storm/wiki/Transactional-topologies
*/
public class TransactionalGlobalCount {
- public static final int PARTITION_TAKE_PER_BATCH = 3;
- public static final Map<Integer, List<List<Object>>> DATA = new HashMap<Integer, List<List<Object>>>() {{
- put(0, new ArrayList<List<Object>>() {{
- add(new Values("cat"));
- add(new Values("dog"));
- add(new Values("chicken"));
- add(new Values("cat"));
- add(new Values("dog"));
- add(new Values("apple"));
- }});
- put(1, new ArrayList<List<Object>>() {{
- add(new Values("cat"));
- add(new Values("dog"));
- add(new Values("apple"));
- add(new Values("banana"));
- }});
- put(2, new ArrayList<List<Object>>() {{
- add(new Values("cat"));
- add(new Values("cat"));
- add(new Values("cat"));
- add(new Values("cat"));
- add(new Values("cat"));
- add(new Values("dog"));
- add(new Values("dog"));
- add(new Values("dog"));
- add(new Values("dog"));
- }});
- }};
-
- public static class Value {
- int count = 0;
- BigInteger txid;
+ public static final int PARTITION_TAKE_PER_BATCH = 3;
+ public static final Map<Integer, List<List<Object>>> DATA = new HashMap<Integer, List<List<Object>>>() {{
+ put(0, new ArrayList<List<Object>>() {{
+ add(new Values("cat"));
+ add(new Values("dog"));
+ add(new Values("chicken"));
+ add(new Values("cat"));
+ add(new Values("dog"));
+ add(new Values("apple"));
+ }});
+ put(1, new ArrayList<List<Object>>() {{
+ add(new Values("cat"));
+ add(new Values("dog"));
+ add(new Values("apple"));
+ add(new Values("banana"));
+ }});
+ put(2, new ArrayList<List<Object>>() {{
+ add(new Values("cat"));
+ add(new Values("cat"));
+ add(new Values("cat"));
+ add(new Values("cat"));
+ add(new Values("cat"));
+ add(new Values("dog"));
+ add(new Values("dog"));
+ add(new Values("dog"));
+ add(new Values("dog"));
+ }});
+ }};
+
+ public static class Value {
+ int count = 0;
+ BigInteger txid;
+ }
+
+ public static Map<String, Value> DATABASE = new HashMap<String, Value>();
+ public static final String GLOBAL_COUNT_KEY = "GLOBAL-COUNT";
+
+ public static class BatchCount extends BaseBatchBolt {
+ Object _id;
+ BatchOutputCollector _collector;
+
+ int _count = 0;
+
+ @Override
+ public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
+ _collector = collector;
+ _id = id;
}
- public static Map<String, Value> DATABASE = new HashMap<String, Value>();
- public static final String GLOBAL_COUNT_KEY = "GLOBAL-COUNT";
-
- public static class BatchCount extends BaseBatchBolt {
- Object _id;
- BatchOutputCollector _collector;
-
- int _count = 0;
-
- @Override
- public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
- _collector = collector;
- _id = id;
- }
+ @Override
+ public void execute(Tuple tuple) {
+ _count++;
+ }
- @Override
- public void execute(Tuple tuple) {
- _count++;
- }
+ @Override
+ public void finishBatch() {
+ _collector.emit(new Values(_id, _count));
+ }
- @Override
- public void finishBatch() {
- _collector.emit(new Values(_id, _count));
- }
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("id", "count"));
+ }
+ }
+
+ public static class UpdateGlobalCount extends BaseTransactionalBolt implements ICommitter {
+ TransactionAttempt _attempt;
+ BatchOutputCollector _collector;
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("id", "count"));
- }
+ int _sum = 0;
+
+ @Override
+ public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {
+ _collector = collector;
+ _attempt = attempt;
}
-
- public static class UpdateGlobalCount extends BaseTransactionalBolt implements ICommitter {
- TransactionAttempt _attempt;
- BatchOutputCollector _collector;
-
- int _sum = 0;
-
- @Override
- public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {
- _collector = collector;
- _attempt = attempt;
- }
- @Override
- public void execute(Tuple tuple) {
- _sum+=tuple.getInteger(1);
- }
+ @Override
+ public void execute(Tuple tuple) {
+ _sum += tuple.getInteger(1);
+ }
- @Override
- public void finishBatch() {
- Value val = DATABASE.get(GLOBAL_COUNT_KEY);
- Value newval;
- if(val == null || !val.txid.equals(_attempt.getTransactionId())) {
- newval = new Value();
- newval.txid = _attempt.getTransactionId();
- if(val==null) {
- newval.count = _sum;
- } else {
- newval.count = _sum + val.count;
- }
- DATABASE.put(GLOBAL_COUNT_KEY, newval);
- } else {
- newval = val;
- }
- _collector.emit(new Values(_attempt, newval.count));
+ @Override
+ public void finishBatch() {
+ Value val = DATABASE.get(GLOBAL_COUNT_KEY);
+ Value newval;
+ if (val == null || !val.txid.equals(_attempt.getTransactionId())) {
+ newval = new Value();
+ newval.txid = _attempt.getTransactionId();
+ if (val == null) {
+ newval.count = _sum;
}
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("id", "sum"));
- }
+ else {
+ newval.count = _sum + val.count;
+ }
+ DATABASE.put(GLOBAL_COUNT_KEY, newval);
+ }
+ else {
+ newval = val;
+ }
+ _collector.emit(new Values(_attempt, newval.count));
}
-
- public static void main(String[] args) throws Exception {
- MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
- TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("global-count", "spout", spout, 3);
- builder.setBolt("partial-count", new BatchCount(), 5)
- .noneGrouping("spout");
- builder.setBolt("sum", new UpdateGlobalCount())
- .globalGrouping("partial-count");
-
- LocalCluster cluster = new LocalCluster();
-
- Config config = new Config();
- config.setDebug(true);
- config.setMaxSpoutPending(3);
-
- cluster.submitTopology("global-count-topology", config, builder.buildTopology());
-
- Thread.sleep(3000);
- cluster.shutdown();
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("id", "sum"));
}
+ }
+
+ public static void main(String[] args) throws Exception {
+ MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
+ TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("global-count", "spout", spout, 3);
+ builder.setBolt("partial-count", new BatchCount(), 5).noneGrouping("spout");
+ builder.setBolt("sum", new UpdateGlobalCount()).globalGrouping("partial-count");
+
+ LocalCluster cluster = new LocalCluster();
+
+ Config config = new Config();
+ config.setDebug(true);
+ config.setMaxSpoutPending(3);
+
+ cluster.submitTopology("global-count-topology", config, builder.buildTopology());
+
+ Thread.sleep(3000);
+ cluster.shutdown();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/TransactionalWords.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/TransactionalWords.java b/src/jvm/storm/starter/TransactionalWords.java
index 0252b66..4ee7b12 100644
--- a/src/jvm/storm/starter/TransactionalWords.java
+++ b/src/jvm/storm/starter/TransactionalWords.java
@@ -15,6 +15,7 @@ import backtype.storm.transactional.TransactionalTopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
+
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
@@ -22,205 +23,207 @@ import java.util.List;
import java.util.Map;
/**
- * This class defines a more involved transactional topology then TransactionalGlobalCount. This topology
- * processes a stream of words and produces two outputs:
- *
- * 1. A count for each word (stored in a database)
- * 2. The number of words for every bucket of 10 counts. So it stores in the database how many words have appeared
- * 0-9 times, how many have appeared 10-19 times, and so on.
- *
+ * This class defines a more involved transactional topology than TransactionalGlobalCount. This topology processes a
+ * stream of words and produces two outputs:
+ * <p/>
+ * 1. A count for each word (stored in a database) 2. The number of words for every bucket of 10 counts. So it stores in
+ * the database how many words have appeared 0-9 times, how many have appeared 10-19 times, and so on.
+ * <p/>
* A batch of words can cause the bucket counts to decrement for some buckets and increment for others as words move
* between buckets as their counts accumulate.
*/
public class TransactionalWords {
- public static class CountValue {
- Integer prev_count = null;
- int count = 0;
- BigInteger txid = null;
+ public static class CountValue {
+ Integer prev_count = null;
+ int count = 0;
+ BigInteger txid = null;
+ }
+
+ public static class BucketValue {
+ int count = 0;
+ BigInteger txid;
+ }
+
+ public static final int BUCKET_SIZE = 10;
+
+ public static Map<String, CountValue> COUNT_DATABASE = new HashMap<String, CountValue>();
+ public static Map<Integer, BucketValue> BUCKET_DATABASE = new HashMap<Integer, BucketValue>();
+
+
+ public static final int PARTITION_TAKE_PER_BATCH = 3;
+
+ public static final Map<Integer, List<List<Object>>> DATA = new HashMap<Integer, List<List<Object>>>() {{
+ put(0, new ArrayList<List<Object>>() {{
+ add(new Values("cat"));
+ add(new Values("dog"));
+ add(new Values("chicken"));
+ add(new Values("cat"));
+ add(new Values("dog"));
+ add(new Values("apple"));
+ }});
+ put(1, new ArrayList<List<Object>>() {{
+ add(new Values("cat"));
+ add(new Values("dog"));
+ add(new Values("apple"));
+ add(new Values("banana"));
+ }});
+ put(2, new ArrayList<List<Object>>() {{
+ add(new Values("cat"));
+ add(new Values("cat"));
+ add(new Values("cat"));
+ add(new Values("cat"));
+ add(new Values("cat"));
+ add(new Values("dog"));
+ add(new Values("dog"));
+ add(new Values("dog"));
+ add(new Values("dog"));
+ }});
+ }};
+
+ public static class KeyedCountUpdater extends BaseTransactionalBolt implements ICommitter {
+ Map<String, Integer> _counts = new HashMap<String, Integer>();
+ BatchOutputCollector _collector;
+ TransactionAttempt _id;
+
+ int _count = 0;
+
+ @Override
+ public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt id) {
+ _collector = collector;
+ _id = id;
}
- public static class BucketValue {
- int count = 0;
- BigInteger txid;
+ @Override
+ public void execute(Tuple tuple) {
+ String key = tuple.getString(1);
+ Integer curr = _counts.get(key);
+ if (curr == null)
+ curr = 0;
+ _counts.put(key, curr + 1);
}
-
- public static final int BUCKET_SIZE = 10;
-
- public static Map<String, CountValue> COUNT_DATABASE = new HashMap<String, CountValue>();
- public static Map<Integer, BucketValue> BUCKET_DATABASE = new HashMap<Integer, BucketValue>();
-
-
- public static final int PARTITION_TAKE_PER_BATCH = 3;
-
- public static final Map<Integer, List<List<Object>>> DATA = new HashMap<Integer, List<List<Object>>>() {{
- put(0, new ArrayList<List<Object>>() {{
- add(new Values("cat"));
- add(new Values("dog"));
- add(new Values("chicken"));
- add(new Values("cat"));
- add(new Values("dog"));
- add(new Values("apple"));
- }});
- put(1, new ArrayList<List<Object>>() {{
- add(new Values("cat"));
- add(new Values("dog"));
- add(new Values("apple"));
- add(new Values("banana"));
- }});
- put(2, new ArrayList<List<Object>>() {{
- add(new Values("cat"));
- add(new Values("cat"));
- add(new Values("cat"));
- add(new Values("cat"));
- add(new Values("cat"));
- add(new Values("dog"));
- add(new Values("dog"));
- add(new Values("dog"));
- add(new Values("dog"));
- }});
- }};
-
- public static class KeyedCountUpdater extends BaseTransactionalBolt implements ICommitter {
- Map<String, Integer> _counts = new HashMap<String, Integer>();
- BatchOutputCollector _collector;
- TransactionAttempt _id;
-
- int _count = 0;
-
- @Override
- public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt id) {
- _collector = collector;
- _id = id;
- }
- @Override
- public void execute(Tuple tuple) {
- String key = tuple.getString(1);
- Integer curr = _counts.get(key);
- if(curr==null) curr = 0;
- _counts.put(key, curr + 1);
+ @Override
+ public void finishBatch() {
+ for (String key : _counts.keySet()) {
+ CountValue val = COUNT_DATABASE.get(key);
+ CountValue newVal;
+ if (val == null || !val.txid.equals(_id)) {
+ newVal = new CountValue();
+ newVal.txid = _id.getTransactionId();
+ if (val != null) {
+ newVal.prev_count = val.count;
+ newVal.count = val.count;
+ }
+ newVal.count = newVal.count + _counts.get(key);
+ COUNT_DATABASE.put(key, newVal);
}
-
- @Override
- public void finishBatch() {
- for(String key: _counts.keySet()) {
- CountValue val = COUNT_DATABASE.get(key);
- CountValue newVal;
- if(val==null || !val.txid.equals(_id)) {
- newVal = new CountValue();
- newVal.txid = _id.getTransactionId();
- if(val!=null) {
- newVal.prev_count = val.count;
- newVal.count = val.count;
- }
- newVal.count = newVal.count + _counts.get(key);
- COUNT_DATABASE.put(key, newVal);
- } else {
- newVal = val;
- }
- _collector.emit(new Values(_id, key, newVal.count, newVal.prev_count));
- }
+ else {
+ newVal = val;
}
+ _collector.emit(new Values(_id, key, newVal.count, newVal.prev_count));
+ }
+ }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("id", "key", "count", "prev-count"));
- }
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("id", "key", "count", "prev-count"));
}
-
- public static class Bucketize extends BaseBasicBolt {
- @Override
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- TransactionAttempt attempt = (TransactionAttempt) tuple.getValue(0);
- int curr = tuple.getInteger(2);
- Integer prev = tuple.getInteger(3);
-
- int currBucket = curr / BUCKET_SIZE;
- Integer prevBucket = null;
- if(prev!=null) {
- prevBucket = prev / BUCKET_SIZE;
- }
-
- if(prevBucket==null) {
- collector.emit(new Values(attempt, currBucket, 1));
- } else if(currBucket != prevBucket) {
- collector.emit(new Values(attempt, currBucket, 1));
- collector.emit(new Values(attempt, prevBucket, -1));
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("attempt", "bucket", "delta"));
- }
+ }
+
+ public static class Bucketize extends BaseBasicBolt {
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector collector) {
+ TransactionAttempt attempt = (TransactionAttempt) tuple.getValue(0);
+ int curr = tuple.getInteger(2);
+ Integer prev = tuple.getInteger(3);
+
+ int currBucket = curr / BUCKET_SIZE;
+ Integer prevBucket = null;
+ if (prev != null) {
+ prevBucket = prev / BUCKET_SIZE;
+ }
+
+ if (prevBucket == null) {
+ collector.emit(new Values(attempt, currBucket, 1));
+ }
+ else if (currBucket != prevBucket) {
+ collector.emit(new Values(attempt, currBucket, 1));
+ collector.emit(new Values(attempt, prevBucket, -1));
+ }
}
-
- public static class BucketCountUpdater extends BaseTransactionalBolt {
- Map<Integer, Integer> _accum = new HashMap<Integer, Integer>();
- BatchOutputCollector _collector;
- TransactionAttempt _attempt;
-
- int _count = 0;
-
- @Override
- public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {
- _collector = collector;
- _attempt = attempt;
- }
- @Override
- public void execute(Tuple tuple) {
- Integer bucket = tuple.getInteger(1);
- Integer delta = tuple.getInteger(2);
- Integer curr = _accum.get(bucket);
- if(curr==null) curr = 0;
- _accum.put(bucket, curr + delta);
- }
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("attempt", "bucket", "delta"));
+ }
+ }
- @Override
- public void finishBatch() {
- for(Integer bucket: _accum.keySet()) {
- BucketValue currVal = BUCKET_DATABASE.get(bucket);
- BucketValue newVal;
- if(currVal==null || !currVal.txid.equals(_attempt.getTransactionId())) {
- newVal = new BucketValue();
- newVal.txid = _attempt.getTransactionId();
- newVal.count = _accum.get(bucket);
- if(currVal!=null) newVal.count += currVal.count;
- BUCKET_DATABASE.put(bucket, newVal);
- } else {
- newVal = currVal;
- }
- _collector.emit(new Values(_attempt, bucket, newVal.count));
- }
- }
+ public static class BucketCountUpdater extends BaseTransactionalBolt {
+ Map<Integer, Integer> _accum = new HashMap<Integer, Integer>();
+ BatchOutputCollector _collector;
+ TransactionAttempt _attempt;
+
+ int _count = 0;
+
+ @Override
+ public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {
+ _collector = collector;
+ _attempt = attempt;
+ }
+
+ @Override
+ public void execute(Tuple tuple) {
+ Integer bucket = tuple.getInteger(1);
+ Integer delta = tuple.getInteger(2);
+ Integer curr = _accum.get(bucket);
+ if (curr == null)
+ curr = 0;
+ _accum.put(bucket, curr + delta);
+ }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("id", "bucket", "count"));
- }
+ @Override
+ public void finishBatch() {
+ for (Integer bucket : _accum.keySet()) {
+ BucketValue currVal = BUCKET_DATABASE.get(bucket);
+ BucketValue newVal;
+ if (currVal == null || !currVal.txid.equals(_attempt.getTransactionId())) {
+ newVal = new BucketValue();
+ newVal.txid = _attempt.getTransactionId();
+ newVal.count = _accum.get(bucket);
+ if (currVal != null)
+ newVal.count += currVal.count;
+ BUCKET_DATABASE.put(bucket, newVal);
+ }
+ else {
+ newVal = currVal;
+ }
+ _collector.emit(new Values(_attempt, bucket, newVal.count));
+ }
}
-
- public static void main(String[] args) throws Exception {
- MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
- TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("top-n-words", "spout", spout, 2);
- builder.setBolt("count", new KeyedCountUpdater(), 5)
- .fieldsGrouping("spout", new Fields("word"));
- builder.setBolt("bucketize", new Bucketize())
- .noneGrouping("count");
- builder.setBolt("buckets", new BucketCountUpdater(), 5)
- .fieldsGrouping("bucketize", new Fields("bucket"));
-
-
- LocalCluster cluster = new LocalCluster();
-
- Config config = new Config();
- config.setDebug(true);
- config.setMaxSpoutPending(3);
-
- cluster.submitTopology("top-n-topology", config, builder.buildTopology());
-
- Thread.sleep(3000);
- cluster.shutdown();
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("id", "bucket", "count"));
}
+ }
+
+ public static void main(String[] args) throws Exception {
+ MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
+ TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("top-n-words", "spout", spout, 2);
+ builder.setBolt("count", new KeyedCountUpdater(), 5).fieldsGrouping("spout", new Fields("word"));
+ builder.setBolt("bucketize", new Bucketize()).noneGrouping("count");
+ builder.setBolt("buckets", new BucketCountUpdater(), 5).fieldsGrouping("bucketize", new Fields("bucket"));
+
+
+ LocalCluster cluster = new LocalCluster();
+
+ Config config = new Config();
+ config.setDebug(true);
+ config.setMaxSpoutPending(3);
+
+ cluster.submitTopology("top-n-topology", config, builder.buildTopology());
+
+ Thread.sleep(3000);
+ cluster.shutdown();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/WordCountTopology.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/WordCountTopology.java b/src/jvm/storm/starter/WordCountTopology.java
index bed968d..f26278c 100644
--- a/src/jvm/storm/starter/WordCountTopology.java
+++ b/src/jvm/storm/starter/WordCountTopology.java
@@ -1,11 +1,9 @@
package storm.starter;
-import storm.starter.spout.RandomSentenceSpout;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.ShellBolt;
-import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
@@ -14,6 +12,8 @@ import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
+import storm.starter.spout.RandomSentenceSpout;
+
import java.util.HashMap;
import java.util.Map;
@@ -21,70 +21,70 @@ import java.util.Map;
* This topology demonstrates Storm's stream groupings and multilang capabilities.
*/
public class WordCountTopology {
- public static class SplitSentence extends ShellBolt implements IRichBolt {
-
- public SplitSentence() {
- super("python", "splitsentence.py");
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("word"));
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
- }
-
- public static class WordCount extends BaseBasicBolt {
- Map<String, Integer> counts = new HashMap<String, Integer>();
-
- @Override
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- String word = tuple.getString(0);
- Integer count = counts.get(word);
- if(count==null) count = 0;
- count++;
- counts.put(word, count);
- collector.emit(new Values(word, count));
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("word", "count"));
- }
+ public static class SplitSentence extends ShellBolt implements IRichBolt {
+
+ public SplitSentence() {
+ super("python", "splitsentence.py");
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("word"));
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
+ }
+
+ public static class WordCount extends BaseBasicBolt {
+ Map<String, Integer> counts = new HashMap<String, Integer>();
+
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector collector) {
+ String word = tuple.getString(0);
+ Integer count = counts.get(word);
+ if (count == null)
+ count = 0;
+ count++;
+ counts.put(word, count);
+ collector.emit(new Values(word, count));
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("word", "count"));
}
-
- public static void main(String[] args) throws Exception {
-
- TopologyBuilder builder = new TopologyBuilder();
-
- builder.setSpout("spout", new RandomSentenceSpout(), 5);
-
- builder.setBolt("split", new SplitSentence(), 8)
- .shuffleGrouping("spout");
- builder.setBolt("count", new WordCount(), 12)
- .fieldsGrouping("split", new Fields("word"));
-
- Config conf = new Config();
- conf.setDebug(true);
-
-
- if(args!=null && args.length > 0) {
- conf.setNumWorkers(3);
-
- StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
- } else {
- conf.setMaxTaskParallelism(3);
-
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("word-count", conf, builder.createTopology());
-
- Thread.sleep(10000);
-
- cluster.shutdown();
- }
+ }
+
+ public static void main(String[] args) throws Exception {
+
+ TopologyBuilder builder = new TopologyBuilder();
+
+ builder.setSpout("spout", new RandomSentenceSpout(), 5);
+
+ builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
+ builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
+
+ Config conf = new Config();
+ conf.setDebug(true);
+
+
+ if (args != null && args.length > 0) {
+ conf.setNumWorkers(3);
+
+ StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
+ }
+ else {
+ conf.setMaxTaskParallelism(3);
+
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("word-count", conf, builder.createTopology());
+
+ Thread.sleep(10000);
+
+ cluster.shutdown();
}
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/bolt/AbstractRankerBolt.java b/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
index 27d65a0..6e2551c 100644
--- a/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
+++ b/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
@@ -1,12 +1,5 @@
package storm.starter.bolt;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-import storm.starter.tools.Rankings;
-import storm.starter.util.TupleHelpers;
import backtype.storm.Config;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
@@ -14,82 +7,87 @@ import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
+import org.apache.log4j.Logger;
+import storm.starter.tools.Rankings;
+import storm.starter.util.TupleHelpers;
+
+import java.util.HashMap;
+import java.util.Map;
/**
* This abstract bolt provides the basic behavior of bolts that rank objects according to their count.
- *
+ * <p/>
* It uses a template method design pattern for {@link AbstractRankerBolt#execute(Tuple, BasicOutputCollector)} to allow
* actual bolt implementations to specify how incoming tuples are processed, i.e. how the objects embedded within those
* tuples are retrieved and counted.
- *
*/
public abstract class AbstractRankerBolt extends BaseBasicBolt {
- private static final long serialVersionUID = 4931640198501530202L;
- private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = 2;
- private static final int DEFAULT_COUNT = 10;
+ private static final long serialVersionUID = 4931640198501530202L;
+ private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = 2;
+ private static final int DEFAULT_COUNT = 10;
- private final int emitFrequencyInSeconds;
- private final int count;
- private final Rankings rankings;
+ private final int emitFrequencyInSeconds;
+ private final int count;
+ private final Rankings rankings;
- public AbstractRankerBolt() {
- this(DEFAULT_COUNT, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
- }
+ public AbstractRankerBolt() {
+ this(DEFAULT_COUNT, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
+ }
- public AbstractRankerBolt(int topN) {
- this(topN, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
- }
+ public AbstractRankerBolt(int topN) {
+ this(topN, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
+ }
- public AbstractRankerBolt(int topN, int emitFrequencyInSeconds) {
- if (topN < 1) {
- throw new IllegalArgumentException("topN must be >= 1 (you requested " + topN + ")");
- }
- if (emitFrequencyInSeconds < 1) {
- throw new IllegalArgumentException("The emit frequency must be >= 1 seconds (you requested "
- + emitFrequencyInSeconds + " seconds)");
- }
- count = topN;
- this.emitFrequencyInSeconds = emitFrequencyInSeconds;
- rankings = new Rankings(count);
+ public AbstractRankerBolt(int topN, int emitFrequencyInSeconds) {
+ if (topN < 1) {
+ throw new IllegalArgumentException("topN must be >= 1 (you requested " + topN + ")");
}
-
- protected Rankings getRankings() {
- return rankings;
+ if (emitFrequencyInSeconds < 1) {
+ throw new IllegalArgumentException(
+ "The emit frequency must be >= 1 seconds (you requested " + emitFrequencyInSeconds + " seconds)");
}
-
- /**
- * This method functions as a template method (design pattern).
- */
- @Override
- public final void execute(Tuple tuple, BasicOutputCollector collector) {
- if (TupleHelpers.isTickTuple(tuple)) {
- getLogger().debug("Received tick tuple, triggering emit of current rankings");
- emitRankings(collector);
- }
- else {
- updateRankingsWithTuple(tuple);
- }
+ count = topN;
+ this.emitFrequencyInSeconds = emitFrequencyInSeconds;
+ rankings = new Rankings(count);
+ }
+
+ protected Rankings getRankings() {
+ return rankings;
+ }
+
+ /**
+ * This method functions as a template method (design pattern).
+ */
+ @Override
+ public final void execute(Tuple tuple, BasicOutputCollector collector) {
+ if (TupleHelpers.isTickTuple(tuple)) {
+ getLogger().debug("Received tick tuple, triggering emit of current rankings");
+ emitRankings(collector);
+ }
+ else {
+ updateRankingsWithTuple(tuple);
}
+ }
- abstract void updateRankingsWithTuple(Tuple tuple);
+ abstract void updateRankingsWithTuple(Tuple tuple);
- private void emitRankings(BasicOutputCollector collector) {
- collector.emit(new Values(rankings));
- getLogger().debug("Rankings: " + rankings);
- }
+ private void emitRankings(BasicOutputCollector collector) {
+ collector.emit(new Values(rankings));
+ getLogger().debug("Rankings: " + rankings);
+ }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("rankings"));
- }
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("rankings"));
+ }
- @Override
- public Map<String, Object> getComponentConfiguration() {
- Map<String, Object> conf = new HashMap<String, Object>();
- conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
- return conf;
- }
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ Map<String, Object> conf = new HashMap<String, Object>();
+ conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
+ return conf;
+ }
- abstract Logger getLogger();
+ abstract Logger getLogger();
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/bolt/IntermediateRankingsBolt.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/bolt/IntermediateRankingsBolt.java b/src/jvm/storm/starter/bolt/IntermediateRankingsBolt.java
index 161e67d..6a57d47 100644
--- a/src/jvm/storm/starter/bolt/IntermediateRankingsBolt.java
+++ b/src/jvm/storm/starter/bolt/IntermediateRankingsBolt.java
@@ -1,43 +1,41 @@
package storm.starter.bolt;
+import backtype.storm.tuple.Tuple;
import org.apache.log4j.Logger;
-
import storm.starter.tools.Rankable;
import storm.starter.tools.RankableObjectWithFields;
-import backtype.storm.tuple.Tuple;
/**
* This bolt ranks incoming objects by their count.
- *
+ * <p/>
* It assumes the input tuples to adhere to the following format: (object, object_count, additionalField1,
* additionalField2, ..., additionalFieldN).
- *
*/
public final class IntermediateRankingsBolt extends AbstractRankerBolt {
- private static final long serialVersionUID = -1369800530256637409L;
- private static final Logger LOG = Logger.getLogger(IntermediateRankingsBolt.class);
+ private static final long serialVersionUID = -1369800530256637409L;
+ private static final Logger LOG = Logger.getLogger(IntermediateRankingsBolt.class);
- public IntermediateRankingsBolt() {
- super();
- }
+ public IntermediateRankingsBolt() {
+ super();
+ }
- public IntermediateRankingsBolt(int topN) {
- super(topN);
- }
+ public IntermediateRankingsBolt(int topN) {
+ super(topN);
+ }
- public IntermediateRankingsBolt(int topN, int emitFrequencyInSeconds) {
- super(topN, emitFrequencyInSeconds);
- }
+ public IntermediateRankingsBolt(int topN, int emitFrequencyInSeconds) {
+ super(topN, emitFrequencyInSeconds);
+ }
- @Override
- void updateRankingsWithTuple(Tuple tuple) {
- Rankable rankable = RankableObjectWithFields.from(tuple);
- super.getRankings().updateWith(rankable);
- }
+ @Override
+ void updateRankingsWithTuple(Tuple tuple) {
+ Rankable rankable = RankableObjectWithFields.from(tuple);
+ super.getRankings().updateWith(rankable);
+ }
- @Override
- Logger getLogger() {
- return LOG;
- }
+ @Override
+ Logger getLogger() {
+ return LOG;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/bolt/PrinterBolt.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/bolt/PrinterBolt.java b/src/jvm/storm/starter/bolt/PrinterBolt.java
index ac380ed..8b2e62b 100644
--- a/src/jvm/storm/starter/bolt/PrinterBolt.java
+++ b/src/jvm/storm/starter/bolt/PrinterBolt.java
@@ -8,13 +8,13 @@ import backtype.storm.tuple.Tuple;
public class PrinterBolt extends BaseBasicBolt {
- @Override
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- System.out.println(tuple);
- }
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector collector) {
+ System.out.println(tuple);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer ofd) {
+ }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer ofd) {
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/bolt/RollingCountBolt.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/bolt/RollingCountBolt.java b/src/jvm/storm/starter/bolt/RollingCountBolt.java
index 0066b61..d547749 100644
--- a/src/jvm/storm/starter/bolt/RollingCountBolt.java
+++ b/src/jvm/storm/starter/bolt/RollingCountBolt.java
@@ -1,14 +1,5 @@
package storm.starter.bolt;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.log4j.Logger;
-
-import storm.starter.tools.NthLastModifiedTimeTracker;
-import storm.starter.tools.SlidingWindowCounter;
-import storm.starter.util.TupleHelpers;
import backtype.storm.Config;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
@@ -17,110 +8,118 @@ import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
+import org.apache.log4j.Logger;
+import storm.starter.tools.NthLastModifiedTimeTracker;
+import storm.starter.tools.SlidingWindowCounter;
+import storm.starter.util.TupleHelpers;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
/**
* This bolt performs rolling counts of incoming objects, i.e. sliding window based counting.
- *
+ * <p/>
* The bolt is configured by two parameters, the length of the sliding window in seconds (which influences the output
* data of the bolt, i.e. how it will count objects) and the emit frequency in seconds (which influences how often the
* bolt will output the latest window counts). For instance, if the window length is set to an equivalent of five
* minutes and the emit frequency to one minute, then the bolt will output the latest five-minute sliding window every
* minute.
- *
+ * <p/>
* The bolt emits a rolling count tuple per object, consisting of the object itself, its latest rolling count, and the
* actual duration of the sliding window. The latter is included in case the expected sliding window length (as
* configured by the user) is different from the actual length, e.g. due to high system load. Note that the actual
* window length is tracked and calculated for the window, and not individually for each object within a window.
- *
+ * <p/>
* Note: During the startup phase you will usually observe that the bolt warns you about the actual sliding window
* length being smaller than the expected length. This behavior is expected and is caused by the way the sliding window
* counts are initially "loaded up". You can safely ignore this warning during startup (e.g. you will see this warning
* during the first ~ five minutes of startup time if the window length is set to five minutes).
- *
*/
public class RollingCountBolt extends BaseRichBolt {
- private static final long serialVersionUID = 5537727428628598519L;
- private static final Logger LOG = Logger.getLogger(RollingCountBolt.class);
- private static final int NUM_WINDOW_CHUNKS = 5;
- private static final int DEFAULT_SLIDING_WINDOW_IN_SECONDS = NUM_WINDOW_CHUNKS * 60;
- private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = DEFAULT_SLIDING_WINDOW_IN_SECONDS / NUM_WINDOW_CHUNKS;
- private static final String WINDOW_LENGTH_WARNING_TEMPLATE = "Actual window length is %d seconds when it should be %d seconds"
- + " (you can safely ignore this warning during the startup phase)";
-
- private final SlidingWindowCounter<Object> counter;
- private final int windowLengthInSeconds;
- private final int emitFrequencyInSeconds;
- private OutputCollector collector;
- private NthLastModifiedTimeTracker lastModifiedTracker;
-
- public RollingCountBolt() {
- this(DEFAULT_SLIDING_WINDOW_IN_SECONDS, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
+ private static final long serialVersionUID = 5537727428628598519L;
+ private static final Logger LOG = Logger.getLogger(RollingCountBolt.class);
+ private static final int NUM_WINDOW_CHUNKS = 5;
+ private static final int DEFAULT_SLIDING_WINDOW_IN_SECONDS = NUM_WINDOW_CHUNKS * 60;
+ private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = DEFAULT_SLIDING_WINDOW_IN_SECONDS / NUM_WINDOW_CHUNKS;
+ private static final String WINDOW_LENGTH_WARNING_TEMPLATE =
+ "Actual window length is %d seconds when it should be %d seconds"
+ + " (you can safely ignore this warning during the startup phase)";
+
+ private final SlidingWindowCounter<Object> counter;
+ private final int windowLengthInSeconds;
+ private final int emitFrequencyInSeconds;
+ private OutputCollector collector;
+ private NthLastModifiedTimeTracker lastModifiedTracker;
+
+ public RollingCountBolt() {
+ this(DEFAULT_SLIDING_WINDOW_IN_SECONDS, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
+ }
+
+ public RollingCountBolt(int windowLengthInSeconds, int emitFrequencyInSeconds) {
+ this.windowLengthInSeconds = windowLengthInSeconds;
+ this.emitFrequencyInSeconds = emitFrequencyInSeconds;
+ counter = new SlidingWindowCounter<Object>(deriveNumWindowChunksFrom(this.windowLengthInSeconds,
+ this.emitFrequencyInSeconds));
+ }
+
+ private int deriveNumWindowChunksFrom(int windowLengthInSeconds, int windowUpdateFrequencyInSeconds) {
+ return windowLengthInSeconds / windowUpdateFrequencyInSeconds;
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ lastModifiedTracker = new NthLastModifiedTimeTracker(deriveNumWindowChunksFrom(this.windowLengthInSeconds,
+ this.emitFrequencyInSeconds));
+ }
+
+ @Override
+ public void execute(Tuple tuple) {
+ if (TupleHelpers.isTickTuple(tuple)) {
+ LOG.debug("Received tick tuple, triggering emit of current window counts");
+ emitCurrentWindowCounts();
}
-
- public RollingCountBolt(int windowLengthInSeconds, int emitFrequencyInSeconds) {
- this.windowLengthInSeconds = windowLengthInSeconds;
- this.emitFrequencyInSeconds = emitFrequencyInSeconds;
- counter = new SlidingWindowCounter<Object>(deriveNumWindowChunksFrom(this.windowLengthInSeconds,
- this.emitFrequencyInSeconds));
+ else {
+ countObjAndAck(tuple);
}
-
- private int deriveNumWindowChunksFrom(int windowLengthInSeconds, int windowUpdateFrequencyInSeconds) {
- return windowLengthInSeconds / windowUpdateFrequencyInSeconds;
+ }
+
+ private void emitCurrentWindowCounts() {
+ Map<Object, Long> counts = counter.getCountsThenAdvanceWindow();
+ int actualWindowLengthInSeconds = lastModifiedTracker.secondsSinceOldestModification();
+ lastModifiedTracker.markAsModified();
+ if (actualWindowLengthInSeconds != windowLengthInSeconds) {
+ LOG.warn(String.format(WINDOW_LENGTH_WARNING_TEMPLATE, actualWindowLengthInSeconds, windowLengthInSeconds));
}
-
- @SuppressWarnings("rawtypes")
- @Override
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- this.collector = collector;
- lastModifiedTracker = new NthLastModifiedTimeTracker(deriveNumWindowChunksFrom(this.windowLengthInSeconds,
- this.emitFrequencyInSeconds));
- }
-
- @Override
- public void execute(Tuple tuple) {
- if (TupleHelpers.isTickTuple(tuple)) {
- LOG.debug("Received tick tuple, triggering emit of current window counts");
- emitCurrentWindowCounts();
- }
- else {
- countObjAndAck(tuple);
- }
- }
-
- private void emitCurrentWindowCounts() {
- Map<Object, Long> counts = counter.getCountsThenAdvanceWindow();
- int actualWindowLengthInSeconds = lastModifiedTracker.secondsSinceOldestModification();
- lastModifiedTracker.markAsModified();
- if (actualWindowLengthInSeconds != windowLengthInSeconds) {
- LOG.warn(String.format(WINDOW_LENGTH_WARNING_TEMPLATE, actualWindowLengthInSeconds, windowLengthInSeconds));
- }
- emit(counts, actualWindowLengthInSeconds);
- }
-
- private void emit(Map<Object, Long> counts, int actualWindowLengthInSeconds) {
- for (Entry<Object, Long> entry : counts.entrySet()) {
- Object obj = entry.getKey();
- Long count = entry.getValue();
- collector.emit(new Values(obj, count, actualWindowLengthInSeconds));
- }
- }
-
- private void countObjAndAck(Tuple tuple) {
- Object obj = tuple.getValue(0);
- counter.incrementCount(obj);
- collector.ack(tuple);
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("obj", "count", "actualWindowLengthInSeconds"));
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- Map<String, Object> conf = new HashMap<String, Object>();
- conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
- return conf;
+ emit(counts, actualWindowLengthInSeconds);
+ }
+
+ private void emit(Map<Object, Long> counts, int actualWindowLengthInSeconds) {
+ for (Entry<Object, Long> entry : counts.entrySet()) {
+ Object obj = entry.getKey();
+ Long count = entry.getValue();
+ collector.emit(new Values(obj, count, actualWindowLengthInSeconds));
}
+ }
+
+ private void countObjAndAck(Tuple tuple) {
+ Object obj = tuple.getValue(0);
+ counter.incrementCount(obj);
+ collector.ack(tuple);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("obj", "count", "actualWindowLengthInSeconds"));
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ Map<String, Object> conf = new HashMap<String, Object>();
+ conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
+ return conf;
+ }
}