You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/11 21:57:45 UTC
[49/53] [abbrv] [partial] storm git commit: STORM-1202: Migrate APIs
to org.apache.storm, but try to provide some form of backwards compatibility
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/ReachTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/ReachTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/ReachTopology.java
new file mode 100644
index 0000000..13b2121
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/ReachTopology.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalDRPC;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.coordination.BatchOutputCollector;
+import org.apache.storm.drpc.LinearDRPCTopologyBuilder;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.topology.base.BaseBatchBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.*;
+
+/**
+ * This is a good example of doing complex Distributed RPC on top of Storm. This program creates a topology that can
+ * compute the reach for any URL on Twitter in realtime by parallelizing the whole computation.
+ * <p>
+ * Reach is the number of unique people exposed to a URL on Twitter. To compute reach, you have to get all the people
+ * who tweeted the URL, get all the followers of all those people, unique that set of followers, and then count the
+ * unique set. It's an intense computation that can involve thousands of database calls and tens of millions of follower
+ * records.
+ * <p>
+ * This Storm topology does every piece of that computation in parallel, turning what would be a computation that takes
+ * minutes on a single machine into one that takes just a couple seconds.
+ * <p>
+ * For the purposes of demonstration, this topology replaces the use of actual DBs with in-memory hashmaps.
+ *
+ * @see <a href="http://storm.apache.org/documentation/Distributed-RPC.html">Distributed RPC</a>
+ */
+public class ReachTopology {
+    /** In-memory stand-in for a database mapping a URL to the users who tweeted it. */
+    public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, List<String>>();
+
+    /** In-memory stand-in for a database mapping a user to that user's followers. */
+    public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, List<String>>();
+
+    // Populated with plain static initialization instead of "double brace" initialization, which would create an
+    // anonymous HashMap subclass for each map.
+    static {
+        TWEETERS_DB.put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan"));
+        TWEETERS_DB.put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", "sally", "nathan"));
+        TWEETERS_DB.put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john"));
+
+        FOLLOWERS_DB.put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", "jai"));
+        FOLLOWERS_DB.put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", "vivian"));
+        FOLLOWERS_DB.put("tim", Arrays.asList("alex"));
+        FOLLOWERS_DB.put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", "vivian", "emily", "jordan"));
+        FOLLOWERS_DB.put("adam", Arrays.asList("david", "carissa"));
+        FOLLOWERS_DB.put("mike", Arrays.asList("john", "bob"));
+        FOLLOWERS_DB.put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob"));
+    }
+
+    /**
+     * Emits one {@code (id, tweeter)} tuple for every user who tweeted the requested URL.
+     * <p>
+     * Input tuples carry the request id at index 0 and the URL at index 1. URLs that are not in
+     * {@link #TWEETERS_DB} produce no output.
+     */
+    public static class GetTweeters extends BaseBasicBolt {
+        @Override
+        public void execute(Tuple tuple, BasicOutputCollector collector) {
+            Object id = tuple.getValue(0);
+            String url = tuple.getString(1);
+            List<String> tweeters = TWEETERS_DB.get(url);
+            if (tweeters != null) {
+                for (String tweeter : tweeters) {
+                    collector.emit(new Values(id, tweeter));
+                }
+            }
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("id", "tweeter"));
+        }
+    }
+
+    /**
+     * Emits one {@code (id, follower)} tuple for every follower of the incoming tweeter.
+     * <p>
+     * Input tuples carry the request id at index 0 and the tweeter at index 1. Users that are not in
+     * {@link #FOLLOWERS_DB} produce no output.
+     */
+    public static class GetFollowers extends BaseBasicBolt {
+        @Override
+        public void execute(Tuple tuple, BasicOutputCollector collector) {
+            Object id = tuple.getValue(0);
+            String tweeter = tuple.getString(1);
+            List<String> followers = FOLLOWERS_DB.get(tweeter);
+            if (followers != null) {
+                for (String follower : followers) {
+                    collector.emit(new Values(id, follower));
+                }
+            }
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("id", "follower"));
+        }
+    }
+
+    /**
+     * Batch bolt that collects the distinct followers it receives for one request.
+     * <p>
+     * When the batch finishes, it emits {@code (id, partial-count)}, where the count is the size of that distinct set.
+     */
+    public static class PartialUniquer extends BaseBatchBolt {
+        BatchOutputCollector _collector;
+        Object _id;
+        Set<String> _followers = new HashSet<String>();
+
+        @Override
+        public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
+            _collector = collector;
+            _id = id;
+        }
+
+        @Override
+        public void execute(Tuple tuple) {
+            _followers.add(tuple.getString(1));
+        }
+
+        @Override
+        public void finishBatch() {
+            _collector.emit(new Values(_id, _followers.size()));
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("id", "partial-count"));
+        }
+    }
+
+    /**
+     * Batch bolt that sums the partial distinct counts for one request.
+     * <p>
+     * When the batch finishes, it emits {@code (id, reach)}.
+     */
+    public static class CountAggregator extends BaseBatchBolt {
+        BatchOutputCollector _collector;
+        Object _id;
+        int _count = 0;
+
+        @Override
+        public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
+            _collector = collector;
+            _id = id;
+        }
+
+        @Override
+        public void execute(Tuple tuple) {
+            _count += tuple.getInteger(1);
+        }
+
+        @Override
+        public void finishBatch() {
+            _collector.emit(new Values(_id, _count));
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("id", "reach"));
+        }
+    }
+
+    /**
+     * Builds the "reach" DRPC topology: URL, then tweeters, then followers, then partial distinct counts, then total.
+     *
+     * @return a builder from which either a local or a remote topology can be created
+     */
+    public static LinearDRPCTopologyBuilder construct() {
+        LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");
+        builder.addBolt(new GetTweeters(), 4);
+        builder.addBolt(new GetFollowers(), 12).shuffleGrouping();
+        // Grouping on (id, follower) routes every occurrence of a given follower for a request to the same
+        // PartialUniquer task. The partial distinct sets are therefore disjoint, and their sizes can simply be summed.
+        builder.addBolt(new PartialUniquer(), 6).fieldsGrouping(new Fields("id", "follower"));
+        builder.addBolt(new CountAggregator(), 3).fieldsGrouping(new Fields("id"));
+        return builder;
+    }
+
+    /**
+     * Runs the reach topology.
+     * <p>
+     * With no arguments, starts an in-process cluster and DRPC server, prints the reach of a few sample URLs, and
+     * shuts both down. Otherwise, submits the topology to a cluster under the name given as the first argument.
+     *
+     * @param args optional; {@code args[0]} is the topology name used for cluster submission
+     * @throws Exception if the topology cannot be submitted or a DRPC call fails
+     */
+    public static void main(String[] args) throws Exception {
+        LinearDRPCTopologyBuilder builder = construct();
+
+        Config conf = new Config();
+
+        if (args == null || args.length == 0) {
+            conf.setMaxTaskParallelism(3);
+            LocalDRPC drpc = new LocalDRPC();
+            try {
+                LocalCluster cluster = new LocalCluster();
+                try {
+                    cluster.submitTopology("reach-drpc", conf, builder.createLocalTopology(drpc));
+
+                    String[] urlsToTry = new String[]{ "foo.com/blog/1", "engineering.twitter.com/blog/5", "notaurl.com" };
+                    for (String url : urlsToTry) {
+                        System.out.println("Reach of " + url + ": " + drpc.execute("reach", url));
+                    }
+                } finally {
+                    // Release the local cluster even if submission or a DRPC call throws.
+                    cluster.shutdown();
+                }
+            } finally {
+                // Shut down the DRPC server after the cluster, matching the original shutdown order.
+                drpc.shutdown();
+            }
+        } else {
+            conf.setNumWorkers(6);
+            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createRemoteTopology());
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java
new file mode 100644
index 0000000..d4aa304
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.testing.TestWordSpout;
+import org.apache.storm.topology.BoltDeclarer;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.SpoutDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+
+import java.util.Map;
+
+/**
+ * Example topology that declares per-component CPU and memory requirements.
+ * <p>
+ * It also sets topology-level options (worker heap limit, priority, scheduling strategy) for the resource aware
+ * scheduler.
+ */
+public class ResourceAwareExampleTopology {
+    /**
+     * Appends "!!!" to the first field of each input tuple and anchors the new tuple to its input.
+     * <p>
+     * The input tuple is then acked.
+     */
+    public static class ExclamationBolt extends BaseRichBolt {
+        OutputCollector _collector;
+
+        @Override
+        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
+            _collector = collector;
+        }
+
+        @Override
+        public void execute(Tuple tuple) {
+            _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
+            _collector.ack(tuple);
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word"));
+        }
+    }
+
+    /**
+     * Builds and runs the topology.
+     *
+     * @param args optional. If present, {@code args[0]} is the topology name used for cluster submission.
+     *             Otherwise the topology runs on an in-process cluster for {@code Utils.sleep(10000)} and is
+     *             then killed.
+     * @throws Exception if submission fails
+     */
+    public static void main(String[] args) throws Exception {
+        TopologyBuilder builder = new TopologyBuilder();
+
+        SpoutDeclarer spout = builder.setSpout("word", new TestWordSpout(), 10);
+        // Set the CPU requirement.
+        spout.setCPULoad(20);
+        // Set the on-heap and off-heap memory requirements.
+        spout.setMemoryLoad(64, 16);
+
+        BoltDeclarer bolt1 = builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
+        // Sets the CPU requirement. It is not necessary to set both CPU and memory;
+        // for requirements that are not set, a default value will be used.
+        bolt1.setCPULoad(15);
+
+        BoltDeclarer bolt2 = builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
+        bolt2.setMemoryLoad(100);
+
+        Config conf = new Config();
+        conf.setDebug(true);
+
+        // Limits the maximum amount of memory (in MB) allocated to one worker process.
+        // Can be used to spread executors across multiple workers.
+        conf.setTopologyWorkerMaxHeapSize(1024.0);
+
+        // Topology priority describes the importance of the topology, decreasing from 0
+        // (i.e. 0 is the highest priority, and importance decreases as the priority number increases).
+        // The recommended range is 0-29, but no hard limit is set.
+        conf.setTopologyPriority(29);
+
+        // Set the strategy used to schedule the topology. If not specified, it defaults to
+        // org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.
+        conf.setTopologyStrategy(org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class);
+
+        if (args != null && args.length > 0) {
+            conf.setNumWorkers(3);
+
+            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
+        } else {
+            LocalCluster cluster = new LocalCluster();
+            try {
+                cluster.submitTopology("test", conf, builder.createTopology());
+                Utils.sleep(10000);
+                cluster.killTopology("test");
+            } finally {
+                // Shut the local cluster down even if submitting or killing the topology throws.
+                cluster.shutdown();
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/RollingTopWords.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/RollingTopWords.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/RollingTopWords.java
new file mode 100644
index 0000000..b5ee161
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/RollingTopWords.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter;
+
+import org.apache.storm.Config;
+import org.apache.storm.testing.TestWordSpout;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.log4j.Logger;
+import org.apache.storm.starter.bolt.IntermediateRankingsBolt;
+import org.apache.storm.starter.bolt.RollingCountBolt;
+import org.apache.storm.starter.bolt.TotalRankingsBolt;
+import org.apache.storm.starter.util.StormRunner;
+
+/**
+ * This topology does a continuous computation of the top N words that the topology has seen in terms of cardinality.
+ * The top N computation is done in a completely scalable way, and a similar approach could be used to compute things
+ * like trending topics or trending images on Twitter.
+ */
+public class RollingTopWords {
+
+    private static final Logger LOG = Logger.getLogger(RollingTopWords.class);
+    /** How long {@link #runLocally()} keeps the topology running, in seconds. */
+    private static final int DEFAULT_RUNTIME_IN_SECONDS = 60;
+    /** Number of top-ranked words kept by the ranking bolts. */
+    private static final int TOP_N = 5;
+
+    private final TopologyBuilder builder;
+    private final String topologyName;
+    private final Config topologyConfig;
+    private final int runtimeInSeconds;
+
+    /**
+     * Creates and wires the topology.
+     *
+     * @param topologyName name under which the topology is run or submitted
+     * @throws InterruptedException declared for compatibility with existing callers
+     */
+    public RollingTopWords(String topologyName) throws InterruptedException {
+        builder = new TopologyBuilder();
+        this.topologyName = topologyName;
+        topologyConfig = createTopologyConfiguration();
+        runtimeInSeconds = DEFAULT_RUNTIME_IN_SECONDS;
+
+        wireTopology();
+    }
+
+    /** Returns a config with debug output enabled. */
+    private static Config createTopologyConfiguration() {
+        Config conf = new Config();
+        conf.setDebug(true);
+        return conf;
+    }
+
+    /**
+     * Wires the pipeline: word spout, then rolling counter, then intermediate ranker, then a single final ranker.
+     */
+    private void wireTopology() throws InterruptedException {
+        String spoutId = "wordGenerator";
+        String counterId = "counter";
+        String intermediateRankerId = "intermediateRanker";
+        String totalRankerId = "finalRanker";
+        builder.setSpout(spoutId, new TestWordSpout(), 5);
+        builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).fieldsGrouping(spoutId, new Fields("word"));
+        builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(counterId,
+                new Fields("obj"));
+        // A global grouping funnels all intermediate rankings into one task that produces the overall top N.
+        builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);
+    }
+
+    /** Runs the topology in-process for {@code runtimeInSeconds} seconds via {@code StormRunner}. */
+    public void runLocally() throws InterruptedException {
+        StormRunner.runTopologyLocally(builder.createTopology(), topologyName, topologyConfig, runtimeInSeconds);
+    }
+
+    /** Submits the topology to a cluster via {@code StormRunner}. */
+    public void runRemotely() throws Exception {
+        StormRunner.runTopologyRemotely(builder.createTopology(), topologyName, topologyConfig);
+    }
+
+    /**
+     * Submits (runs) the topology.
+     * <p>
+     * Usage: {@code RollingTopWords [topology-name] [local|remote]}
+     * <p>
+     * By default, the topology is run locally under the name "slidingWindowCounts". A second argument other than
+     * "remote" (compared case-insensitively) also runs it locally.
+     * <p>
+     * Examples:
+     * <pre>
+     * # Runs in local mode (LocalCluster), with topology name "slidingWindowCounts"
+     * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.RollingTopWords
+     *
+     * # Runs in local mode (LocalCluster), with topology name "foobar"
+     * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.RollingTopWords foobar
+     *
+     * # Runs in local mode (LocalCluster), with topology name "foobar"
+     * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.RollingTopWords foobar local
+     *
+     * # Runs in remote/cluster mode, with topology name "production-topology"
+     * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.RollingTopWords production-topology remote
+     * </pre>
+     *
+     * @param args First positional argument (optional) is the topology name. The second positional argument
+     *             (optional) defines whether to run the topology locally ("local") or remotely, i.e. on a real
+     *             cluster ("remote").
+     * @throws Exception if the topology cannot be run locally or submitted to the cluster
+     */
+    public static void main(String[] args) throws Exception {
+        String topologyName = "slidingWindowCounts";
+        if (args.length >= 1) {
+            topologyName = args[0];
+        }
+        boolean runLocally = true;
+        if (args.length >= 2 && args[1].equalsIgnoreCase("remote")) {
+            runLocally = false;
+        }
+
+        LOG.info("Topology name: " + topologyName);
+        RollingTopWords rtw = new RollingTopWords(topologyName);
+        if (runLocally) {
+            LOG.info("Running in local mode");
+            rtw.runLocally();
+        } else {
+            LOG.info("Running in remote (cluster) mode");
+            rtw.runRemotely();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/SingleJoinExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/SingleJoinExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/SingleJoinExample.java
new file mode 100644
index 0000000..b153372
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/SingleJoinExample.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.testing.FeederSpout;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.starter.bolt.SingleJoinBolt;
+
+/**
+ * Demonstrates {@link SingleJoinBolt}.
+ * <p>
+ * Two feeder spouts emit {@code (id, gender)} and {@code (id, age)} tuples. The join bolt combines them on
+ * {@code id} and produces {@code (gender, age)} output.
+ */
+public class SingleJoinExample {
+    /**
+     * Runs the join on an in-process cluster, feeds ten ids to each spout, and shuts the cluster down.
+     *
+     * @param args ignored
+     */
+    public static void main(String[] args) {
+        FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender"));
+        FeederSpout ageSpout = new FeederSpout(new Fields("id", "age"));
+
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout("gender", genderSpout);
+        builder.setSpout("age", ageSpout);
+        // Both inputs are fields-grouped on "id", so matching tuples from the two streams reach the same join task.
+        builder.setBolt("join", new SingleJoinBolt(new Fields("gender", "age")))
+                .fieldsGrouping("gender", new Fields("id"))
+                .fieldsGrouping("age", new Fields("id"));
+
+        Config conf = new Config();
+        conf.setDebug(true);
+
+        LocalCluster cluster = new LocalCluster();
+        try {
+            cluster.submitTopology("join-example", conf, builder.createTopology());
+
+            for (int i = 0; i < 10; i++) {
+                String gender = (i % 2 == 0) ? "male" : "female";
+                genderSpout.feed(new Values(i, gender));
+            }
+
+            // Ages are fed in reverse id order, presumably to show that the join does not require both
+            // streams to arrive in the same order.
+            for (int i = 9; i >= 0; i--) {
+                ageSpout.feed(new Values(i, i + 20));
+            }
+
+            // Give the topology time to process the fed tuples before shutting down.
+            Utils.sleep(2000);
+        } finally {
+            // Shut the local cluster down even if submitting or feeding throws.
+            cluster.shutdown();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java
new file mode 100644
index 0000000..3addc15
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter;
+
+import org.apache.storm.Config;
+import org.apache.storm.testing.TestWordSpout;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.log4j.Logger;
+import org.apache.storm.starter.bolt.IntermediateRankingsBolt;
+import org.apache.storm.starter.bolt.RollingCountBolt;
+import org.apache.storm.starter.bolt.RollingCountAggBolt;
+import org.apache.storm.starter.bolt.TotalRankingsBolt;
+import org.apache.storm.starter.util.StormRunner;
+
+/**
+ * This topology does a continuous computation of the top N words that the topology has seen in terms of cardinality.
+ * The top N computation is done in a completely scalable way, and a similar approach could be used to compute things
+ * like trending topics or trending images on Twitter. It assumes that some words will be much more common than other
+ * words, and uses partialKeyGrouping to better balance the skewed load.
+ */
+public class SkewedRollingTopWords {
+    private static final Logger LOG = Logger.getLogger(SkewedRollingTopWords.class);
+    /** How long {@link #runLocally()} keeps the topology running, in seconds. */
+    private static final int DEFAULT_RUNTIME_IN_SECONDS = 60;
+    /** Number of top-ranked words kept by the ranking bolts. */
+    private static final int TOP_N = 5;
+
+    private final TopologyBuilder builder;
+    private final String topologyName;
+    private final Config topologyConfig;
+    private final int runtimeInSeconds;
+
+    /**
+     * Creates and wires the topology.
+     *
+     * @param topologyName name under which the topology is run or submitted
+     * @throws InterruptedException declared for compatibility with existing callers
+     */
+    public SkewedRollingTopWords(String topologyName) throws InterruptedException {
+        builder = new TopologyBuilder();
+        this.topologyName = topologyName;
+        topologyConfig = createTopologyConfiguration();
+        runtimeInSeconds = DEFAULT_RUNTIME_IN_SECONDS;
+
+        wireTopology();
+    }
+
+    /** Returns a config with debug output enabled. */
+    private static Config createTopologyConfiguration() {
+        Config conf = new Config();
+        conf.setDebug(true);
+        return conf;
+    }
+
+    /**
+     * Wires the pipeline: word spout, then rolling counter, then aggregator, then intermediate ranker, then a
+     * single final ranker.
+     */
+    private void wireTopology() throws InterruptedException {
+        String spoutId = "wordGenerator";
+        String counterId = "counter";
+        String aggId = "aggregator";
+        String intermediateRankerId = "intermediateRanker";
+        String totalRankerId = "finalRanker";
+        builder.setSpout(spoutId, new TestWordSpout(), 5);
+        // partialKeyGrouping may send the same word to more than one counter task to spread hot keys. The aggregator,
+        // fields-grouped on "obj", recombines those partial counts before ranking.
+        builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).partialKeyGrouping(spoutId, new Fields("word"));
+        builder.setBolt(aggId, new RollingCountAggBolt(), 4).fieldsGrouping(counterId, new Fields("obj"));
+        builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(aggId, new Fields("obj"));
+        builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);
+    }
+
+    /** Runs the topology in-process for {@code runtimeInSeconds} seconds via {@code StormRunner}. */
+    public void runLocally() throws InterruptedException {
+        StormRunner.runTopologyLocally(builder.createTopology(), topologyName, topologyConfig, runtimeInSeconds);
+    }
+
+    /** Submits the topology to a cluster via {@code StormRunner}. */
+    public void runRemotely() throws Exception {
+        StormRunner.runTopologyRemotely(builder.createTopology(), topologyName, topologyConfig);
+    }
+
+    /**
+     * Submits (runs) the topology.
+     * <p>
+     * Usage: {@code SkewedRollingTopWords [topology-name] [local|remote]}
+     * <p>
+     * By default, the topology is run locally under the name "slidingWindowCounts". A second argument other than
+     * "remote" (compared case-insensitively) also runs it locally.
+     * <p>
+     * Examples:
+     * <pre>
+     * # Runs in local mode (LocalCluster), with topology name "slidingWindowCounts"
+     * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.SkewedRollingTopWords
+     *
+     * # Runs in local mode (LocalCluster), with topology name "foobar"
+     * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.SkewedRollingTopWords foobar
+     *
+     * # Runs in local mode (LocalCluster), with topology name "foobar"
+     * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.SkewedRollingTopWords foobar local
+     *
+     * # Runs in remote/cluster mode, with topology name "production-topology"
+     * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.SkewedRollingTopWords production-topology remote
+     * </pre>
+     *
+     * @param args First positional argument (optional) is the topology name. The second positional argument
+     *             (optional) defines whether to run the topology locally ("local") or remotely, i.e. on a real
+     *             cluster ("remote").
+     * @throws Exception if the topology cannot be run locally or submitted to the cluster
+     */
+    public static void main(String[] args) throws Exception {
+        String topologyName = "slidingWindowCounts";
+        if (args.length >= 1) {
+            topologyName = args[0];
+        }
+        boolean runLocally = true;
+        if (args.length >= 2 && args[1].equalsIgnoreCase("remote")) {
+            runLocally = false;
+        }
+
+        LOG.info("Topology name: " + topologyName);
+        SkewedRollingTopWords rtw = new SkewedRollingTopWords(topologyName);
+        if (runLocally) {
+            LOG.info("Running in local mode");
+            rtw.runLocally();
+        } else {
+            LOG.info("Running in remote (cluster) mode");
+            rtw.runRemotely();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingTupleTsTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingTupleTsTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingTupleTsTopology.java
new file mode 100644
index 0000000..90744f2
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingTupleTsTopology.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.starter.bolt.PrinterBolt;
+import org.apache.storm.starter.bolt.SlidingWindowSumBolt;
+import org.apache.storm.starter.spout.RandomIntegerSpout;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.storm.topology.base.BaseWindowedBolt.Duration;
+
+/**
+ * Windowing based on tuple timestamp (i.e. the time when the tuple is generated
+ * rather than when it's processed).
+ */
+public class SlidingTupleTsTopology {
+    /**
+     * Builds and runs the topology.
+     *
+     * @param args optional. If present, {@code args[0]} is the topology name used for cluster submission.
+     *             Otherwise the topology runs on an in-process cluster for {@code Utils.sleep(40000)} and is
+     *             then killed.
+     * @throws Exception if submission fails
+     */
+    public static void main(String[] args) throws Exception {
+        TopologyBuilder builder = new TopologyBuilder();
+        // 5 second windows that slide every 3 seconds. Window membership is decided by each tuple's "ts" field
+        // (assumes RandomIntegerSpout emits such a field), with a permitted timestamp lag of 5 seconds.
+        BaseWindowedBolt bolt = new SlidingWindowSumBolt()
+                .withWindow(new Duration(5, TimeUnit.SECONDS), new Duration(3, TimeUnit.SECONDS))
+                .withTimestampField("ts")
+                .withLag(new Duration(5, TimeUnit.SECONDS));
+        builder.setSpout("integer", new RandomIntegerSpout(), 1);
+        builder.setBolt("slidingsum", bolt, 1).shuffleGrouping("integer");
+        builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("slidingsum");
+        Config conf = new Config();
+        conf.setDebug(true);
+
+        if (args != null && args.length > 0) {
+            conf.setNumWorkers(1);
+            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
+        } else {
+            LocalCluster cluster = new LocalCluster();
+            try {
+                cluster.submitTopology("test", conf, builder.createTopology());
+                Utils.sleep(40000);
+                cluster.killTopology("test");
+            } finally {
+                // Shut the local cluster down even if submitting or killing the topology throws.
+                cluster.shutdown();
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingWindowTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingWindowTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingWindowTopology.java
new file mode 100644
index 0000000..cedcec5
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingWindowTopology.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.windowing.TupleWindow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.starter.bolt.PrinterBolt;
+import org.apache.storm.starter.bolt.SlidingWindowSumBolt;
+import org.apache.storm.starter.spout.RandomIntegerSpout;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.storm.topology.base.BaseWindowedBolt.Count;
+
+/**
+ * A sample topology that demonstrates the usage of {@link org.apache.storm.topology.IWindowedBolt}
+ * to calculate sliding window sum.
+ * <p>
+ * Random integers feed a {@code SlidingWindowSumBolt} with a count-based window
+ * ({@code withWindow(new Count(30), new Count(10))}); its output feeds a tumbling-window
+ * average over 3 tuples, whose output is printed.
+ */
+public class SlidingWindowTopology {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SlidingWindowTopology.class);
+
+    /*
+     * Computes tumbling window average
+     */
+    private static class TumblingWindowAvgBolt extends BaseWindowedBolt {
+        private OutputCollector collector;
+
+        @Override
+        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+            this.collector = collector;
+        }
+
+        /**
+         * Emits the integer average of field 0 over all tuples in the window as a single
+         * {@code "avg"} value. An empty window emits nothing.
+         */
+        @Override
+        public void execute(TupleWindow inputWindow) {
+            List<Tuple> tuplesInWindow = inputWindow.get();
+            LOG.debug("Events in current window: {}", tuplesInWindow.size());
+            if (tuplesInWindow.isEmpty()) {
+                return;
+            }
+            /*
+             * Since this is a tumbling window calculation,
+             * we use all the tuples in the window to compute the avg.
+             * The sum is accumulated in a long so it cannot overflow; the average of
+             * int values always fits back into an int.
+             */
+            long sum = 0;
+            for (Tuple tuple : tuplesInWindow) {
+                sum += (int) tuple.getValue(0);
+            }
+            collector.emit(new Values((int) (sum / tuplesInWindow.size())));
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("avg"));
+        }
+    }
+
+
+    /**
+     * Builds and runs the topology.
+     *
+     * @param args if non-empty, {@code args[0]} is the topology name and the topology is
+     *             submitted to a remote cluster; otherwise it runs in an in-process
+     *             {@link LocalCluster} for about 40 seconds
+     * @throws Exception if building or submitting the topology fails
+     */
+    public static void main(String[] args) throws Exception {
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout("integer", new RandomIntegerSpout(), 1);
+        builder.setBolt("slidingsum", new SlidingWindowSumBolt().withWindow(new Count(30), new Count(10)), 1)
+                .shuffleGrouping("integer");
+        builder.setBolt("tumblingavg", new TumblingWindowAvgBolt().withTumblingWindow(new Count(3)), 1)
+                .shuffleGrouping("slidingsum");
+        builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("tumblingavg");
+        Config conf = new Config();
+        conf.setDebug(true);
+        if (args != null && args.length > 0) {
+            conf.setNumWorkers(1);
+            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
+        } else {
+            LocalCluster cluster = new LocalCluster();
+            try {
+                cluster.submitTopology("test", conf, builder.createTopology());
+                Utils.sleep(40000);
+                cluster.killTopology("test");
+            } finally {
+                // Release the in-process cluster even if submission or the wait fails.
+                cluster.shutdown();
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/ThroughputVsLatency.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/ThroughputVsLatency.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/ThroughputVsLatency.java
new file mode 100644
index 0000000..8ee48c9
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/ThroughputVsLatency.java
@@ -0,0 +1,432 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.metric.HttpForwardingMetricsServer;
+import org.apache.storm.metric.HttpForwardingMetricsConsumer;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.metric.api.IMetricsConsumer.TaskInfo;
+import org.apache.storm.metric.api.IMetricsConsumer.DataPoint;
+import org.apache.storm.generated.*;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.NimbusClient;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.StormSubmitter;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.storm.metrics.hdrhistogram.HistogramMetric;
+import org.HdrHistogram.Histogram;
+
+/**
+ * WordCount but the spout goes at a predefined rate and we collect
+ * proper latency statistics.
+ * <p>
+ * Command line: {@code [ratePerSecond [parallelism [numMins [topologyName]]]]}, defaulting to
+ * 500, 4, 5 and {@code "wc-test"}. Worker metrics (spout latency histograms, GC, memory and,
+ * outside local mode, CPU) are forwarded to an in-process {@link HttpForwardingMetricsServer},
+ * and a summary line is printed every 30 seconds.
+ */
+public class ThroughputVsLatency {
+    /** Spout message id: the sentence plus the time its batch was scheduled to be sent. */
+    private static class SentWithTime {
+        public final String sentence;
+        public final long time;
+
+        SentWithTime(String sentence, long time) {
+            this.sentence = sentence;
+            this.time = time;
+        }
+    }
+
+    /**
+     * Facade over either an in-process {@link LocalCluster} (when the {@code run.local}
+     * config is true) or a remote Nimbus client.
+     */
+    public static class C {
+        LocalCluster _local = null;
+        Nimbus.Client _client = null;
+
+        public C(Map conf) {
+            Map clusterConf = Utils.readStormConfig();
+            if (conf != null) {
+                clusterConf.putAll(conf);
+            }
+            Boolean isLocal = (Boolean)clusterConf.get("run.local");
+            if (isLocal != null && isLocal) {
+                _local = new LocalCluster();
+            } else {
+                _client = NimbusClient.getConfiguredClient(clusterConf).getClient();
+            }
+        }
+
+        public ClusterSummary getClusterInfo() throws Exception {
+            if (_local != null) {
+                return _local.getClusterInfo();
+            } else {
+                return _client.getClusterInfo();
+            }
+        }
+
+        public TopologyInfo getTopologyInfo(String id) throws Exception {
+            if (_local != null) {
+                return _local.getTopologyInfo(id);
+            } else {
+                return _client.getTopologyInfo(id);
+            }
+        }
+
+        public void killTopologyWithOpts(String name, KillOptions opts) throws Exception {
+            if (_local != null) {
+                _local.killTopologyWithOpts(name, opts);
+            } else {
+                _client.killTopologyWithOpts(name, opts);
+            }
+        }
+
+        public void submitTopology(String name, Map stormConf, StormTopology topology) throws Exception {
+            if (_local != null) {
+                _local.submitTopology(name, stormConf, topology);
+            } else {
+                StormSubmitter.submitTopology(name, stormConf, topology);
+            }
+        }
+
+        public boolean isLocal() {
+            return _local != null;
+        }
+    }
+
+    /**
+     * Spout that emits random sentences at a fixed target rate. For every acked tuple it
+     * records the time from the tuple's scheduled send time to the ack in the
+     * {@code comp-lat-histo} histogram metric. Failed tuples are re-emitted with their
+     * original message id (and therefore their original send time).
+     */
+    public static class FastRandomSentenceSpout extends BaseRichSpout {
+        static final String[] SENTENCES = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
+                "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
+
+        SpoutOutputCollector _collector;
+        long _periodNano;
+        long _emitAmount;
+        Random _rand;
+        long _nextEmitTime;
+        long _emitsLeft;
+        HistogramMetric _histo;
+
+        /**
+         * @param ratePerSecond target tuples per second for this spout instance; a
+         *                      non-positive rate disables emission entirely
+         */
+        public FastRandomSentenceSpout(long ratePerSecond) {
+            if (ratePerSecond > 0) {
+                _periodNano = Math.max(1, 1000000000/ratePerSecond);
+                _emitAmount = Math.max(1, (long)((ratePerSecond / 1000000000.0) * _periodNano));
+            } else {
+                // Emit nothing. An "infinite" period of ~Long.MAX_VALUE overflows once it is
+                // added to _nextEmitTime, which makes every later call look overdue (unthrottled
+                // emission), and latencies measured against it overflow as well.
+                _periodNano = Long.MAX_VALUE - 1;
+                _emitAmount = 0;
+            }
+        }
+
+        @Override
+        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+            _collector = collector;
+            _rand = ThreadLocalRandom.current();
+            _nextEmitTime = System.nanoTime();
+            _emitsLeft = _emitAmount;
+            _histo = new HistogramMetric(3600000000000L, 3);
+            context.registerMetric("comp-lat-histo", _histo, 10); //Update every 10 seconds, so we are not too far behind
+        }
+
+        @Override
+        public void nextTuple() {
+            if (_emitAmount <= 0) {
+                return; // disabled by a non-positive rate
+            }
+            if (_emitsLeft <= 0 && _nextEmitTime <= System.nanoTime()) {
+                // Start the next batch, scheduled one period after the previous one.
+                _emitsLeft = _emitAmount;
+                _nextEmitTime = _nextEmitTime + _periodNano;
+            }
+
+            if (_emitsLeft > 0) {
+                String sentence = SENTENCES[_rand.nextInt(SENTENCES.length)];
+                // Use the batch's scheduled send time, not "now", so measured latency also
+                // includes any time the spout fell behind its schedule.
+                _collector.emit(new Values(sentence), new SentWithTime(sentence, _nextEmitTime - _periodNano));
+                _emitsLeft--;
+            }
+        }
+
+        @Override
+        public void ack(Object id) {
+            long end = System.nanoTime();
+            SentWithTime st = (SentWithTime)id;
+            _histo.recordValue(end-st.time);
+        }
+
+        @Override
+        public void fail(Object id) {
+            SentWithTime st = (SentWithTime)id;
+            _collector.emit(new Values(st.sentence), id);
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("sentence"));
+        }
+    }
+
+    /** Splits a sentence on whitespace and emits (word, 1) for each word. */
+    public static class SplitSentence extends BaseBasicBolt {
+        @Override
+        public void execute(Tuple tuple, BasicOutputCollector collector) {
+            String sentence = tuple.getString(0);
+            for (String word: sentence.split("\\s+")) {
+                collector.emit(new Values(word, 1));
+            }
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word", "count"));
+        }
+    }
+
+    /** Keeps a per-task running count for each word and emits (word, runningCount). */
+    public static class WordCount extends BaseBasicBolt {
+        Map<String, Integer> counts = new HashMap<String, Integer>();
+
+        @Override
+        public void execute(Tuple tuple, BasicOutputCollector collector) {
+            String word = tuple.getString(0);
+            Integer count = counts.get(word);
+            if (count == null) {
+                count = 0;
+            }
+            count++;
+            counts.put(word, count);
+            collector.emit(new Values(word, count));
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word", "count"));
+        }
+    }
+
+    /** Latest used-memory reading for one worker; readings older than 20 seconds count as 0. */
+    private static class MemMeasure {
+        private long _mem = 0;
+        private long _time = 0;
+
+        public synchronized void update(long mem) {
+            _mem = mem;
+            _time = System.currentTimeMillis();
+        }
+
+        public synchronized long get() {
+            return isExpired() ? 0L : _mem;
+        }
+
+        public synchronized boolean isExpired() {
+            return (System.currentTimeMillis() - _time) >= 20000;
+        }
+    }
+
+    // Aggregates filled in by the metrics server callback and drained by printMetrics.
+    private static final Histogram _histo = new Histogram(3600000000000L, 3);
+    private static final AtomicLong _systemCPU = new AtomicLong(0);
+    private static final AtomicLong _userCPU = new AtomicLong(0);
+    private static final AtomicLong _gcCount = new AtomicLong(0);
+    private static final AtomicLong _gcMs = new AtomicLong(0);
+    private static final ConcurrentHashMap<String, MemMeasure> _memoryBytes = new ConcurrentHashMap<String, MemMeasure>();
+
+    /** Sums the non-expired memory readings of all workers, in bytes. */
+    private static long readMemory() {
+        long total = 0;
+        for (MemMeasure mem: _memoryBytes.values()) {
+            total += mem.get();
+        }
+        return total;
+    }
+
+    private static long _prev_acked = 0;
+    private static long _prev_uptime = 0;
+
+    /** Sums the values of a per-stream counter map, treating a missing map or entry as 0. */
+    private static long sumValues(Map<String, Long> perStream) {
+        long total = 0;
+        if (perStream != null) {
+            for (Long value : perStream.values()) {
+                if (value != null) {
+                    total += value;
+                }
+            }
+        }
+        return total;
+    }
+
+    /**
+     * Prints one summary line for the named topology: acks since the previous call, the
+     * all-time failure count of the "spout" component, latency percentiles collected since
+     * the previous call, and CPU/GC/memory figures reported by the workers.
+     *
+     * @throws Exception if the topology cannot be found or the cluster cannot be queried
+     */
+    public static void printMetrics(C client, String name) throws Exception {
+        ClusterSummary summary = client.getClusterInfo();
+        String id = null;
+        for (TopologySummary ts: summary.get_topologies()) {
+            if (name.equals(ts.get_name())) {
+                id = ts.get_id();
+            }
+        }
+        if (id == null) {
+            throw new Exception("Could not find a topology named "+name);
+        }
+        TopologyInfo info = client.getTopologyInfo(id);
+        int uptime = info.get_uptime_secs();
+        long acked = 0;
+        long failed = 0;
+        for (ExecutorSummary exec: info.get_executors()) {
+            if ("spout".equals(exec.get_component_id()) && exec.get_stats() != null) {
+                SpoutStats stats = exec.get_stats().get_specific().get_spout();
+                acked += sumValues(stats.get_acked().get(":all-time"));
+                // Count failures on every stream, not only on streams that also have acks.
+                failed += sumValues(stats.get_failed().get(":all-time"));
+            }
+        }
+        long ackedThisTime = acked - _prev_acked;
+        long thisTime = uptime - _prev_uptime;
+        // Avoid dividing by a zero-length interval (two polls within the same uptime second).
+        double ackedPerSec = thisTime > 0 ? ((double) ackedThisTime) / thisTime : 0.0;
+        long nnpct, nnnpct, min, max;
+        double mean, stddev;
+        synchronized(_histo) {
+            nnpct = _histo.getValueAtPercentile(99.0);
+            nnnpct = _histo.getValueAtPercentile(99.9);
+            min = _histo.getMinValue();
+            max = _histo.getMaxValue();
+            mean = _histo.getMean();
+            stddev = _histo.getStdDeviation();
+            _histo.reset();
+        }
+        long user = _userCPU.getAndSet(0);
+        long sys = _systemCPU.getAndSet(0);
+        long gc = _gcMs.getAndSet(0);
+        double memMB = readMemory() / (1024.0 * 1024.0);
+        System.out.printf("uptime: %,4d acked: %,9d acked/sec: %,10.2f failed: %,8d " +
+                "99%%: %,15d 99.9%%: %,15d min: %,15d max: %,15d mean: %,15.2f " +
+                "stddev: %,15.2f user: %,10d sys: %,10d gc: %,10d mem: %,10.2f%n",
+                uptime, ackedThisTime, ackedPerSec, failed, nnpct, nnnpct,
+                min, max, mean, stddev, user, sys, gc, memMB);
+        _prev_uptime = uptime;
+        _prev_acked = acked;
+    }
+
+    /** Kills the named topology immediately (wait time of 0 seconds). */
+    public static void kill(C client, String name) throws Exception {
+        KillOptions opts = new KillOptions();
+        opts.set_wait_secs(0);
+        client.killTopologyWithOpts(name, opts);
+    }
+
+    public static void main(String[] args) throws Exception {
+        long ratePerSecond = 500;
+        if (args != null && args.length > 0) {
+            ratePerSecond = Long.parseLong(args[0]);
+        }
+
+        int parallelism = 4;
+        if (args != null && args.length > 1) {
+            parallelism = Integer.parseInt(args[1]);
+        }
+
+        int numMins = 5;
+        if (args != null && args.length > 2) {
+            numMins = Integer.parseInt(args[2]);
+        }
+
+        String name = "wc-test";
+        if (args != null && args.length > 3) {
+            name = args[3];
+        }
+
+        Config conf = new Config();
+        HttpForwardingMetricsServer metricServer = new HttpForwardingMetricsServer(conf) {
+            @Override
+            @SuppressWarnings("unchecked") // metric values arrive as untyped maps
+            public void handle(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
+                String worker = taskInfo.srcWorkerHost + ":" + taskInfo.srcWorkerPort;
+                for (DataPoint dp: dataPoints) {
+                    if ("comp-lat-histo".equals(dp.name) && dp.value instanceof Histogram) {
+                        synchronized(_histo) {
+                            _histo.add((Histogram)dp.value);
+                        }
+                    } else if ("CPU".equals(dp.name) && dp.value instanceof Map) {
+                        Map<Object, Object> m = (Map<Object, Object>)dp.value;
+                        Object sys = m.get("sys-ms");
+                        if (sys instanceof Number) {
+                            _systemCPU.getAndAdd(((Number)sys).longValue());
+                        }
+                        Object user = m.get("user-ms");
+                        if (user instanceof Number) {
+                            _userCPU.getAndAdd(((Number)user).longValue());
+                        }
+                    } else if (dp.name.startsWith("GC/") && dp.value instanceof Map) {
+                        Map<Object, Object> m = (Map<Object, Object>)dp.value;
+                        Object count = m.get("count");
+                        if (count instanceof Number) {
+                            _gcCount.getAndAdd(((Number)count).longValue());
+                        }
+                        Object time = m.get("timeMs");
+                        if (time instanceof Number) {
+                            _gcMs.getAndAdd(((Number)time).longValue());
+                        }
+                    } else if (dp.name.startsWith("memory/") && dp.value instanceof Map) {
+                        Map<Object, Object> m = (Map<Object, Object>)dp.value;
+                        Object val = m.get("usedBytes");
+                        if (val instanceof Number) {
+                            MemMeasure mm = _memoryBytes.get(worker);
+                            if (mm == null) {
+                                mm = new MemMeasure();
+                                MemMeasure tmp = _memoryBytes.putIfAbsent(worker, mm);
+                                mm = tmp == null ? mm : tmp;
+                            }
+                            mm.update(((Number)val).longValue());
+                        }
+                    }
+                }
+            }
+        };
+
+        metricServer.serve();
+        String url = metricServer.getUrl();
+
+        C cluster = new C(conf);
+        conf.setNumWorkers(parallelism);
+        conf.registerMetricsConsumer(org.apache.storm.metric.LoggingMetricsConsumer.class);
+        conf.registerMetricsConsumer(org.apache.storm.metric.HttpForwardingMetricsConsumer.class, url, 1);
+        Map<String, String> workerMetrics = new HashMap<String, String>();
+        if (!cluster.isLocal()) {
+            //sigar uses JNI and does not work in local mode
+            workerMetrics.put("CPU", "org.apache.storm.metrics.sigar.CPUMetric");
+        }
+        conf.put(Config.TOPOLOGY_WORKER_METRICS, workerMetrics);
+        conf.put(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS, 10);
+        conf.put(Config.TOPOLOGY_WORKER_GC_CHILDOPTS,
+                "-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:NewSize=128m -XX:CMSInitiatingOccupancyFraction=70 -XX:-CMSConcurrentMTEnabled");
+        conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx2g");
+
+        TopologyBuilder builder = new TopologyBuilder();
+
+        int numEach = 4 * parallelism;
+        // Each spout instance gets an equal (integer) share of the total rate; a share of 0
+        // disables that spout.
+        builder.setSpout("spout", new FastRandomSentenceSpout(ratePerSecond/numEach), numEach);
+
+        builder.setBolt("split", new SplitSentence(), numEach).shuffleGrouping("spout");
+        builder.setBolt("count", new WordCount(), numEach).fieldsGrouping("split", new Fields("word"));
+
+        try {
+            cluster.submitTopology(name, conf, builder.createTopology());
+
+            for (int i = 0; i < numMins * 2; i++) {
+                Thread.sleep(30 * 1000);
+                printMetrics(cluster, name);
+            }
+        } finally {
+            kill(cluster, name);
+        }
+        System.exit(0);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalGlobalCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalGlobalCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalGlobalCount.java
new file mode 100644
index 0000000..312f83e
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalGlobalCount.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.coordination.BatchOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.testing.MemoryTransactionalSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBatchBolt;
+import org.apache.storm.topology.base.BaseTransactionalBolt;
+import org.apache.storm.transactional.ICommitter;
+import org.apache.storm.transactional.TransactionAttempt;
+import org.apache.storm.transactional.TransactionalTopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This is a basic example of a transactional topology. It keeps a count of the number of tuples seen so far in a
+ * database. The source of data and the databases are mocked out as in memory maps for demonstration purposes.
+ *
+ * @see <a href="http://storm.apache.org/documentation/Transactional-topologies.html">Transactional topologies</a>
+ */
+public class TransactionalGlobalCount {
+    /** Passed to {@code MemoryTransactionalSpout} as the number of tuples to take per partition per batch. */
+    public static final int PARTITION_TAKE_PER_BATCH = 3;
+
+    /** Mock input data: partition index -> list of single-field ("word") tuples. */
+    public static final Map<Integer, List<List<Object>>> DATA = new HashMap<Integer, List<List<Object>>>() {{
+        put(0, new ArrayList<List<Object>>() {{
+            add(new Values("cat"));
+            add(new Values("dog"));
+            add(new Values("chicken"));
+            add(new Values("cat"));
+            add(new Values("dog"));
+            add(new Values("apple"));
+        }});
+        put(1, new ArrayList<List<Object>>() {{
+            add(new Values("cat"));
+            add(new Values("dog"));
+            add(new Values("apple"));
+            add(new Values("banana"));
+        }});
+        put(2, new ArrayList<List<Object>>() {{
+            add(new Values("cat"));
+            add(new Values("cat"));
+            add(new Values("cat"));
+            add(new Values("cat"));
+            add(new Values("cat"));
+            add(new Values("dog"));
+            add(new Values("dog"));
+            add(new Values("dog"));
+            add(new Values("dog"));
+        }});
+    }};
+
+    /**
+     * Stored value: the running count plus the id of the transaction that last updated it,
+     * which lets a replayed transaction be recognised and not applied twice.
+     */
+    public static class Value {
+        int count = 0;
+        BigInteger txid;
+    }
+
+    /** In-memory stand-in for a database; written by {@link UpdateGlobalCount}. Not thread-safe. */
+    public static Map<String, Value> DATABASE = new HashMap<String, Value>();
+    public static final String GLOBAL_COUNT_KEY = "GLOBAL-COUNT";
+
+    /** Counts the tuples of one batch seen by this task and emits (id, count) when the batch finishes. */
+    public static class BatchCount extends BaseBatchBolt {
+        Object _id;
+        BatchOutputCollector _collector;
+
+        int _count = 0;
+
+        @Override
+        public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
+            _collector = collector;
+            _id = id;
+        }
+
+        @Override
+        public void execute(Tuple tuple) {
+            _count++;
+        }
+
+        @Override
+        public void finishBatch() {
+            _collector.emit(new Values(_id, _count));
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("id", "count"));
+        }
+    }
+
+    /**
+     * Committer that adds a batch's partial counts to the global count in {@link #DATABASE}
+     * and emits (attempt, newCount).
+     */
+    public static class UpdateGlobalCount extends BaseTransactionalBolt implements ICommitter {
+        TransactionAttempt _attempt;
+        BatchOutputCollector _collector;
+
+        int _sum = 0;
+
+        @Override
+        public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {
+            _collector = collector;
+            _attempt = attempt;
+        }
+
+        @Override
+        public void execute(Tuple tuple) {
+            _sum += tuple.getInteger(1);
+        }
+
+        @Override
+        public void finishBatch() {
+            Value val = DATABASE.get(GLOBAL_COUNT_KEY);
+            Value newval;
+            // Only apply the update if this transaction has not already been applied.
+            if (val == null || !val.txid.equals(_attempt.getTransactionId())) {
+                newval = new Value();
+                newval.txid = _attempt.getTransactionId();
+                if (val == null) {
+                    newval.count = _sum;
+                }
+                else {
+                    newval.count = _sum + val.count;
+                }
+                DATABASE.put(GLOBAL_COUNT_KEY, newval);
+            }
+            else {
+                newval = val;
+            }
+            _collector.emit(new Values(_attempt, newval.count));
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("id", "sum"));
+        }
+    }
+
+    /** Runs the topology in an in-process {@link LocalCluster} for about 3 seconds. */
+    public static void main(String[] args) throws Exception {
+        MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
+        TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("global-count", "spout", spout, 3);
+        builder.setBolt("partial-count", new BatchCount(), 5).noneGrouping("spout");
+        builder.setBolt("sum", new UpdateGlobalCount()).globalGrouping("partial-count");
+
+        LocalCluster cluster = new LocalCluster();
+        try {
+            Config config = new Config();
+            config.setDebug(true);
+            config.setMaxSpoutPending(3);
+
+            cluster.submitTopology("global-count-topology", config, builder.buildTopology());
+
+            Thread.sleep(3000);
+        } finally {
+            // Release the in-process cluster even if submission or the wait fails.
+            cluster.shutdown();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalWords.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalWords.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalWords.java
new file mode 100644
index 0000000..64689b0
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalWords.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.coordination.BatchOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.testing.MemoryTransactionalSpout;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.topology.base.BaseTransactionalBolt;
+import org.apache.storm.transactional.ICommitter;
+import org.apache.storm.transactional.TransactionAttempt;
+import org.apache.storm.transactional.TransactionalTopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class defines a more involved transactional topology than TransactionalGlobalCount. This topology processes a
+ * stream of words and produces two outputs:
+ * <ol>
+ * <li>A count for each word (stored in a database).</li>
+ * <li>The number of words for every bucket of 10 counts. So it stores in the database how many words have appeared
+ * 0-9 times, how many have appeared 10-19 times, and so on.</li>
+ * </ol>
+ * A batch of words can cause the bucket counts to decrement for some buckets and increment for others as words move
+ * between buckets as their counts accumulate.
+ */
+public class TransactionalWords {
+    /** Stored word count, the count before the last applied transaction, and that transaction's id. */
+    public static class CountValue {
+        Integer prev_count = null;
+        int count = 0;
+        BigInteger txid = null;
+    }
+
+    /** Stored bucket population plus the id of the transaction that last updated it. */
+    public static class BucketValue {
+        int count = 0;
+        BigInteger txid;
+    }
+
+    public static final int BUCKET_SIZE = 10;
+
+    // In-memory stand-ins for databases. Not thread-safe.
+    public static Map<String, CountValue> COUNT_DATABASE = new HashMap<String, CountValue>();
+    public static Map<Integer, BucketValue> BUCKET_DATABASE = new HashMap<Integer, BucketValue>();
+
+
+    /** Passed to {@code MemoryTransactionalSpout} as the number of tuples to take per partition per batch. */
+    public static final int PARTITION_TAKE_PER_BATCH = 3;
+
+    /** Mock input data: partition index -> list of single-field ("word") tuples. */
+    public static final Map<Integer, List<List<Object>>> DATA = new HashMap<Integer, List<List<Object>>>() {{
+        put(0, new ArrayList<List<Object>>() {{
+            add(new Values("cat"));
+            add(new Values("dog"));
+            add(new Values("chicken"));
+            add(new Values("cat"));
+            add(new Values("dog"));
+            add(new Values("apple"));
+        }});
+        put(1, new ArrayList<List<Object>>() {{
+            add(new Values("cat"));
+            add(new Values("dog"));
+            add(new Values("apple"));
+            add(new Values("banana"));
+        }});
+        put(2, new ArrayList<List<Object>>() {{
+            add(new Values("cat"));
+            add(new Values("cat"));
+            add(new Values("cat"));
+            add(new Values("cat"));
+            add(new Values("cat"));
+            add(new Values("dog"));
+            add(new Values("dog"));
+            add(new Values("dog"));
+            add(new Values("dog"));
+        }});
+    }};
+
+    /**
+     * Committer that adds a batch's per-word counts to {@link #COUNT_DATABASE} and emits
+     * (attempt, word, newCount, previousCount) for every word in the batch.
+     */
+    public static class KeyedCountUpdater extends BaseTransactionalBolt implements ICommitter {
+        Map<String, Integer> _counts = new HashMap<String, Integer>();
+        BatchOutputCollector _collector;
+        TransactionAttempt _id;
+
+        int _count = 0;
+
+        @Override
+        public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt id) {
+            _collector = collector;
+            _id = id;
+        }
+
+        @Override
+        public void execute(Tuple tuple) {
+            // Field 1 is the word; field 0 is presumably the transaction attempt added by the spout.
+            String key = tuple.getString(1);
+            Integer curr = _counts.get(key);
+            if (curr == null) {
+                curr = 0;
+            }
+            _counts.put(key, curr + 1);
+        }
+
+        @Override
+        public void finishBatch() {
+            for (String key : _counts.keySet()) {
+                CountValue val = COUNT_DATABASE.get(key);
+                CountValue newVal;
+                // Compare against the transaction id (a BigInteger), not the TransactionAttempt
+                // object: otherwise the check never matches and a replayed transaction is
+                // counted twice.
+                if (val == null || !val.txid.equals(_id.getTransactionId())) {
+                    newVal = new CountValue();
+                    newVal.txid = _id.getTransactionId();
+                    if (val != null) {
+                        newVal.prev_count = val.count;
+                        newVal.count = val.count;
+                    }
+                    newVal.count = newVal.count + _counts.get(key);
+                    COUNT_DATABASE.put(key, newVal);
+                }
+                else {
+                    newVal = val;
+                }
+                _collector.emit(new Values(_id, key, newVal.count, newVal.prev_count));
+            }
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("id", "key", "count", "prev-count"));
+        }
+    }
+
+    /**
+     * Translates a word's count change into bucket deltas: +1 for a word's first bucket, or
+     * +1/-1 when the word moves from one bucket to another. Nothing is emitted otherwise.
+     */
+    public static class Bucketize extends BaseBasicBolt {
+        @Override
+        public void execute(Tuple tuple, BasicOutputCollector collector) {
+            TransactionAttempt attempt = (TransactionAttempt) tuple.getValue(0);
+            int curr = tuple.getInteger(2);
+            Integer prev = tuple.getInteger(3);
+
+            int currBucket = curr / BUCKET_SIZE;
+            Integer prevBucket = null;
+            if (prev != null) {
+                prevBucket = prev / BUCKET_SIZE;
+            }
+
+            if (prevBucket == null) {
+                collector.emit(new Values(attempt, currBucket, 1));
+            }
+            else if (currBucket != prevBucket) {
+                collector.emit(new Values(attempt, currBucket, 1));
+                collector.emit(new Values(attempt, prevBucket, -1));
+            }
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("attempt", "bucket", "delta"));
+        }
+    }
+
+    /**
+     * Accumulates a batch's bucket deltas and applies them to {@link #BUCKET_DATABASE},
+     * emitting (attempt, bucket, newCount).
+     * <p>
+     * NOTE(review): the txid check below assumes batches are applied in transaction order,
+     * which Storm guarantees for {@link ICommitter} bolts; this bolt is not a committer —
+     * confirm whether it should implement {@code ICommitter}.
+     */
+    public static class BucketCountUpdater extends BaseTransactionalBolt {
+        Map<Integer, Integer> _accum = new HashMap<Integer, Integer>();
+        BatchOutputCollector _collector;
+        TransactionAttempt _attempt;
+
+        int _count = 0;
+
+        @Override
+        public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {
+            _collector = collector;
+            _attempt = attempt;
+        }
+
+        @Override
+        public void execute(Tuple tuple) {
+            Integer bucket = tuple.getInteger(1);
+            Integer delta = tuple.getInteger(2);
+            Integer curr = _accum.get(bucket);
+            if (curr == null) {
+                curr = 0;
+            }
+            _accum.put(bucket, curr + delta);
+        }
+
+        @Override
+        public void finishBatch() {
+            for (Integer bucket : _accum.keySet()) {
+                BucketValue currVal = BUCKET_DATABASE.get(bucket);
+                BucketValue newVal;
+                if (currVal == null || !currVal.txid.equals(_attempt.getTransactionId())) {
+                    newVal = new BucketValue();
+                    newVal.txid = _attempt.getTransactionId();
+                    newVal.count = _accum.get(bucket);
+                    if (currVal != null) {
+                        newVal.count += currVal.count;
+                    }
+                    BUCKET_DATABASE.put(bucket, newVal);
+                }
+                else {
+                    newVal = currVal;
+                }
+                _collector.emit(new Values(_attempt, bucket, newVal.count));
+            }
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("id", "bucket", "count"));
+        }
+    }
+
+    /** Runs the topology in an in-process {@link LocalCluster} for about 3 seconds. */
+    public static void main(String[] args) throws Exception {
+        MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
+        TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("top-n-words", "spout", spout, 2);
+        builder.setBolt("count", new KeyedCountUpdater(), 5).fieldsGrouping("spout", new Fields("word"));
+        builder.setBolt("bucketize", new Bucketize()).noneGrouping("count");
+        builder.setBolt("buckets", new BucketCountUpdater(), 5).fieldsGrouping("bucketize", new Fields("bucket"));
+
+
+        LocalCluster cluster = new LocalCluster();
+        try {
+            Config config = new Config();
+            config.setDebug(true);
+            config.setMaxSpoutPending(3);
+
+            cluster.submitTopology("top-n-topology", config, builder.buildTopology());
+
+            Thread.sleep(3000);
+        } finally {
+            // Release the in-process cluster even if submission or the wait fails.
+            cluster.shutdown();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
new file mode 100644
index 0000000..e4a5711
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.task.ShellBolt;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.starter.spout.RandomSentenceSpout;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This topology demonstrates Storm's stream groupings and multilang capabilities.
+ */
+public class WordCountTopology {
+ public static class SplitSentence extends ShellBolt implements IRichBolt {
+
+ public SplitSentence() {
+ super("python", "splitsentence.py");
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("word"));
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
+ }
+
+ public static class WordCount extends BaseBasicBolt {
+ Map<String, Integer> counts = new HashMap<String, Integer>();
+
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector collector) {
+ String word = tuple.getString(0);
+ Integer count = counts.get(word);
+ if (count == null)
+ count = 0;
+ count++;
+ counts.put(word, count);
+ collector.emit(new Values(word, count));
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("word", "count"));
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+
+ TopologyBuilder builder = new TopologyBuilder();
+
+ builder.setSpout("spout", new RandomSentenceSpout(), 5);
+
+ builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
+ builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
+
+ Config conf = new Config();
+ conf.setDebug(true);
+
+ if (args != null && args.length > 0) {
+ conf.setNumWorkers(3);
+
+ StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
+ }
+ else {
+ conf.setMaxTaskParallelism(3);
+
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("word-count", conf, builder.createTopology());
+
+ Thread.sleep(10000);
+
+ cluster.shutdown();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopologyNode.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopologyNode.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopologyNode.java
new file mode 100644
index 0000000..431b9d8
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopologyNode.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.spout.ShellSpout;
+import org.apache.storm.task.ShellBolt;
+import org.apache.storm.topology.*;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This topology demonstrates Storm's stream groupings and multilang capabilities.
+ */
+public class WordCountTopologyNode {
+ public static class SplitSentence extends ShellBolt implements IRichBolt {
+
+ public SplitSentence() {
+ super("node", "splitsentence.js");
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("word"));
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
+ }
+
+ public static class RandomSentence extends ShellSpout implements IRichSpout {
+
+ public RandomSentence() {
+ super("node", "randomsentence.js");
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("word"));
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
+ }
+
+ public static class WordCount extends BaseBasicBolt {
+ Map<String, Integer> counts = new HashMap<String, Integer>();
+
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector collector) {
+ String word = tuple.getString(0);
+ Integer count = counts.get(word);
+ if (count == null)
+ count = 0;
+ count++;
+ counts.put(word, count);
+ collector.emit(new Values(word, count));
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("word", "count"));
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+
+ TopologyBuilder builder = new TopologyBuilder();
+
+ builder.setSpout("spout", new RandomSentence(), 5);
+
+ builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
+ builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
+
+ Config conf = new Config();
+ conf.setDebug(true);
+
+
+ if (args != null && args.length > 0) {
+ conf.setNumWorkers(3);
+
+ StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
+ }
+ else {
+ conf.setMaxTaskParallelism(3);
+
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("word-count", conf, builder.createTopology());
+
+ Thread.sleep(10000);
+
+ cluster.shutdown();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/AbstractRankerBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/AbstractRankerBolt.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/AbstractRankerBolt.java
new file mode 100644
index 0000000..9cf9e79
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/AbstractRankerBolt.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.bolt;
+
+import org.apache.storm.Config;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.TupleUtils;
+import org.apache.log4j.Logger;
+import org.apache.storm.starter.tools.Rankings;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This abstract bolt provides the basic behavior of bolts that rank objects according to their count.
+ * <p/>
+ * It uses a template method design pattern for {@link AbstractRankerBolt#execute(Tuple, BasicOutputCollector)} to allow
+ * actual bolt implementations to specify how incoming tuples are processed, i.e. how the objects embedded within those
+ * tuples are retrieved and counted.
+ */
+public abstract class AbstractRankerBolt extends BaseBasicBolt {
+
+ private static final long serialVersionUID = 4931640198501530202L;
+ private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = 2;
+ private static final int DEFAULT_COUNT = 10;
+
+ private final int emitFrequencyInSeconds;
+ private final int count;
+ private final Rankings rankings;
+
+ public AbstractRankerBolt() {
+ this(DEFAULT_COUNT, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
+ }
+
+ public AbstractRankerBolt(int topN) {
+ this(topN, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
+ }
+
+ public AbstractRankerBolt(int topN, int emitFrequencyInSeconds) {
+ if (topN < 1) {
+ throw new IllegalArgumentException("topN must be >= 1 (you requested " + topN + ")");
+ }
+ if (emitFrequencyInSeconds < 1) {
+ throw new IllegalArgumentException(
+ "The emit frequency must be >= 1 seconds (you requested " + emitFrequencyInSeconds + " seconds)");
+ }
+ count = topN;
+ this.emitFrequencyInSeconds = emitFrequencyInSeconds;
+ rankings = new Rankings(count);
+ }
+
+ protected Rankings getRankings() {
+ return rankings;
+ }
+
+ /**
+ * This method functions as a template method (design pattern).
+ */
+ @Override
+ public final void execute(Tuple tuple, BasicOutputCollector collector) {
+ if (TupleUtils.isTick(tuple)) {
+ getLogger().debug("Received tick tuple, triggering emit of current rankings");
+ emitRankings(collector);
+ }
+ else {
+ updateRankingsWithTuple(tuple);
+ }
+ }
+
+ abstract void updateRankingsWithTuple(Tuple tuple);
+
+ private void emitRankings(BasicOutputCollector collector) {
+ collector.emit(new Values(rankings.copy()));
+ getLogger().debug("Rankings: " + rankings);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("rankings"));
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ Map<String, Object> conf = new HashMap<String, Object>();
+ conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
+ return conf;
+ }
+
+ abstract Logger getLogger();
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/IntermediateRankingsBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/IntermediateRankingsBolt.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/IntermediateRankingsBolt.java
new file mode 100644
index 0000000..6950bfb
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/IntermediateRankingsBolt.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.bolt;
+
+import org.apache.storm.tuple.Tuple;
+import org.apache.log4j.Logger;
+import org.apache.storm.starter.tools.Rankable;
+import org.apache.storm.starter.tools.RankableObjectWithFields;
+
+/**
+ * This bolt ranks incoming objects by their count.
+ * <p/>
+ * It assumes the input tuples to adhere to the following format: (object, object_count, additionalField1,
+ * additionalField2, ..., additionalFieldN).
+ */
+public final class IntermediateRankingsBolt extends AbstractRankerBolt {
+
+ private static final long serialVersionUID = -1369800530256637409L;
+ private static final Logger LOG = Logger.getLogger(IntermediateRankingsBolt.class);
+
+ public IntermediateRankingsBolt() {
+ super();
+ }
+
+ public IntermediateRankingsBolt(int topN) {
+ super(topN);
+ }
+
+ public IntermediateRankingsBolt(int topN, int emitFrequencyInSeconds) {
+ super(topN, emitFrequencyInSeconds);
+ }
+
+ @Override
+ void updateRankingsWithTuple(Tuple tuple) {
+ Rankable rankable = RankableObjectWithFields.from(tuple);
+ super.getRankings().updateWith(rankable);
+ }
+
+ @Override
+ Logger getLogger() {
+ return LOG;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/PrinterBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/PrinterBolt.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/PrinterBolt.java
new file mode 100644
index 0000000..993a937
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/PrinterBolt.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.bolt;
+
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Tuple;
+
+
+public class PrinterBolt extends BaseBasicBolt {
+
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector collector) {
+ System.out.println(tuple);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer ofd) {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/RollingCountAggBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/RollingCountAggBolt.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/RollingCountAggBolt.java
new file mode 100644
index 0000000..45300de
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/RollingCountAggBolt.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.bolt;
+
+import org.apache.storm.Config;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.log4j.Logger;
+import org.apache.storm.starter.tools.NthLastModifiedTimeTracker;
+import org.apache.storm.starter.tools.SlidingWindowCounter;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * This bolt aggregates counts from multiple upstream bolts.
+ */
+public class RollingCountAggBolt extends BaseRichBolt {
+ private static final long serialVersionUID = 5537727428628598519L;
+ private static final Logger LOG = Logger.getLogger(RollingCountAggBolt.class);
+ //Mapping of key->upstreamBolt->count
+ private Map<Object, Map<Integer, Long>> counts = new HashMap<Object, Map<Integer, Long>>();
+ private OutputCollector collector;
+
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ }
+
+ @Override
+ public void execute(Tuple tuple) {
+ Object obj = tuple.getValue(0);
+ long count = tuple.getLong(1);
+ int source = tuple.getSourceTask();
+ Map<Integer, Long> subCounts = counts.get(obj);
+ if (subCounts == null) {
+ subCounts = new HashMap<Integer, Long>();
+ counts.put(obj, subCounts);
+ }
+ //Update the current count for this object
+ subCounts.put(source, count);
+ //Output the sum of all the known counts so for this key
+ long sum = 0;
+ for (Long val: subCounts.values()) {
+ sum += val;
+ }
+ collector.emit(new Values(obj, sum));
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("obj", "count"));
+ }
+}