You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/11 21:57:45 UTC

[49/53] [abbrv] [partial] storm git commit: STORM-1202: Migrate APIs to org.apache.storm, but try to provide some form of backwards compatability

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/ReachTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/ReachTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/ReachTopology.java
new file mode 100644
index 0000000..13b2121
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/ReachTopology.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalDRPC;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.coordination.BatchOutputCollector;
+import org.apache.storm.drpc.LinearDRPCTopologyBuilder;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.topology.base.BaseBatchBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.*;
+
+/**
+ * This is a good example of doing complex Distributed RPC on top of Storm. This program creates a topology that can
+ * compute the reach for any URL on Twitter in realtime by parallelizing the whole computation.
+ * <p/>
+ * Reach is the number of unique people exposed to a URL on Twitter. To compute reach, you have to get all the people
+ * who tweeted the URL, get all the followers of all those people, unique that set of followers, and then count the
+ * unique set. It's an intense computation that can involve thousands of database calls and tens of millions of follower
+ * records.
+ * <p/>
+ * This Storm topology does every piece of that computation in parallel, turning what would be a computation that takes
+ * minutes on a single machine into one that takes just a couple seconds.
+ * <p/>
+ * For the purposes of demonstration, this topology replaces the use of actual DBs with in-memory hashmaps.
+ *
+ * @see <a href="http://storm.apache.org/documentation/Distributed-RPC.html">Distributed RPC</a>
+ */
+public class ReachTopology {
+  /**
+   * In-memory stand-in for a "who tweeted this URL" database: URL -> people who tweeted it.
+   */
+  public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, List<String>>();
+
+  /**
+   * In-memory stand-in for a follower database: person -> that person's followers.
+   */
+  public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, List<String>>();
+
+  // Populated in a static initializer instead of "double brace" initialization, which would
+  // create an extra anonymous HashMap subclass for each map.
+  static {
+    TWEETERS_DB.put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan"));
+    TWEETERS_DB.put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", "sally", "nathan"));
+    TWEETERS_DB.put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john"));
+
+    FOLLOWERS_DB.put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", "jai"));
+    FOLLOWERS_DB.put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", "vivian"));
+    FOLLOWERS_DB.put("tim", Arrays.asList("alex"));
+    FOLLOWERS_DB.put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", "vivian", "emily", "jordan"));
+    FOLLOWERS_DB.put("adam", Arrays.asList("david", "carissa"));
+    FOLLOWERS_DB.put("mike", Arrays.asList("john", "bob"));
+    FOLLOWERS_DB.put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob"));
+  }
+
+  /**
+   * Reads the request id (field 0) and URL (field 1) from the incoming tuple and emits one
+   * (id, tweeter) tuple for every tweeter listed for that URL in {@link #TWEETERS_DB}.
+   * Nothing is emitted for a URL that is not in the map.
+   */
+  public static class GetTweeters extends BaseBasicBolt {
+    @Override
+    public void execute(Tuple tuple, BasicOutputCollector collector) {
+      Object id = tuple.getValue(0);
+      String url = tuple.getString(1);
+      List<String> tweeters = TWEETERS_DB.get(url);
+      if (tweeters != null) {
+        for (String tweeter : tweeters) {
+          collector.emit(new Values(id, tweeter));
+        }
+      }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      declarer.declare(new Fields("id", "tweeter"));
+    }
+  }
+
+  /**
+   * Reads the request id (field 0) and tweeter (field 1) from the incoming tuple and emits one
+   * (id, follower) tuple for every follower listed for that tweeter in {@link #FOLLOWERS_DB}.
+   * Nothing is emitted for a tweeter that is not in the map.
+   */
+  public static class GetFollowers extends BaseBasicBolt {
+    @Override
+    public void execute(Tuple tuple, BasicOutputCollector collector) {
+      Object id = tuple.getValue(0);
+      String tweeter = tuple.getString(1);
+      List<String> followers = FOLLOWERS_DB.get(tweeter);
+      if (followers != null) {
+        for (String follower : followers) {
+          collector.emit(new Values(id, follower));
+        }
+      }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      declarer.declare(new Fields("id", "follower"));
+    }
+  }
+
+  /**
+   * Collects the followers (field 1 of each tuple) it receives into a set and, when the batch is
+   * finished, emits (id, number of distinct followers seen) using the id handed to
+   * {@link #prepare}.
+   */
+  public static class PartialUniquer extends BaseBatchBolt {
+    BatchOutputCollector _collector;
+    Object _id;
+    Set<String> _followers = new HashSet<String>();
+
+    @Override
+    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
+      _collector = collector;
+      _id = id;
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+      // The set drops duplicates, so each follower is counted once by this instance.
+      _followers.add(tuple.getString(1));
+    }
+
+    @Override
+    public void finishBatch() {
+      _collector.emit(new Values(_id, _followers.size()));
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      declarer.declare(new Fields("id", "partial-count"));
+    }
+  }
+
+  /**
+   * Sums the integer in field 1 of every tuple it receives and, when the batch is finished,
+   * emits (id, sum) using the id handed to {@link #prepare}.
+   */
+  public static class CountAggregator extends BaseBatchBolt {
+    BatchOutputCollector _collector;
+    Object _id;
+    int _count = 0;
+
+    @Override
+    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
+      _collector = collector;
+      _id = id;
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+      _count += tuple.getInteger(1);
+    }
+
+    @Override
+    public void finishBatch() {
+      _collector.emit(new Values(_id, _count));
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      declarer.declare(new Fields("id", "reach"));
+    }
+  }
+
+  /**
+   * Builds the linear DRPC topology for the "reach" function:
+   * GetTweeters -> GetFollowers -> PartialUniquer -> CountAggregator.
+   *
+   * @return the configured builder; callers turn it into a local or remote topology
+   */
+  public static LinearDRPCTopologyBuilder construct() {
+    LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");
+    builder.addBolt(new GetTweeters(), 4);
+    builder.addBolt(new GetFollowers(), 12).shuffleGrouping();
+    // Grouping on (id, follower) sends every occurrence of the same follower for the same
+    // request to the same PartialUniquer, so the partial counts can simply be added up.
+    builder.addBolt(new PartialUniquer(), 6).fieldsGrouping(new Fields("id", "follower"));
+    builder.addBolt(new CountAggregator(), 3).fieldsGrouping(new Fields("id"));
+    return builder;
+  }
+
+  /**
+   * Runs the reach topology.
+   *
+   * With no arguments the topology runs in an in-process {@link LocalCluster}, the "reach"
+   * function is invoked for three sample URLs and the results are printed. Otherwise the
+   * topology is submitted to a real cluster under the name given by {@code args[0]}.
+   *
+   * @param args optional; {@code args[0]} is the topology name for remote submission
+   * @throws Exception if submitting the topology or executing a DRPC request fails
+   */
+  public static void main(String[] args) throws Exception {
+    LinearDRPCTopologyBuilder builder = construct();
+
+    Config conf = new Config();
+
+    if (args == null || args.length == 0) {
+      conf.setMaxTaskParallelism(3);
+      LocalDRPC drpc = new LocalDRPC();
+      try {
+        LocalCluster cluster = new LocalCluster();
+        // Nested try/finally keeps the original shutdown order (cluster first, then DRPC) and
+        // guarantees both are shut down even if submission or a DRPC call throws.
+        try {
+          cluster.submitTopology("reach-drpc", conf, builder.createLocalTopology(drpc));
+
+          String[] urlsToTry = new String[]{ "foo.com/blog/1", "engineering.twitter.com/blog/5", "notaurl.com" };
+          for (String url : urlsToTry) {
+            System.out.println("Reach of " + url + ": " + drpc.execute("reach", url));
+          }
+        } finally {
+          cluster.shutdown();
+        }
+      } finally {
+        drpc.shutdown();
+      }
+    }
+    else {
+      conf.setNumWorkers(6);
+      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createRemoteTopology());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java
new file mode 100644
index 0000000..d4aa304
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.testing.TestWordSpout;
+import org.apache.storm.topology.BoltDeclarer;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.SpoutDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+
+import java.util.Map;
+
+/**
+ * Example topology that attaches CPU and memory requirements to its spout and bolts and sets
+ * the per-worker heap limit, topology priority and scheduling strategy on the configuration.
+ */
+public class ResourceAwareExampleTopology {
+  /**
+   * Appends "!!!" to the string in field 0 of each input tuple, emits the result with the
+   * input tuple passed as the anchor, and then acks the input tuple.
+   */
+  public static class ExclamationBolt extends BaseRichBolt {
+    OutputCollector _collector;
+
+    @Override
+    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
+      _collector = collector;
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+      _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
+      _collector.ack(tuple);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      declarer.declare(new Fields("word"));
+    }
+  }
+
+  /**
+   * Builds the word -> exclaim1 -> exclaim2 topology and runs it.
+   *
+   * With at least one argument the topology is submitted to a cluster under the name
+   * {@code args[0]}; otherwise it runs for ten seconds in an in-process {@link LocalCluster}.
+   *
+   * @param args optional; {@code args[0]} is the topology name for remote submission
+   * @throws Exception if submitting the topology fails
+   */
+  public static void main(String[] args) throws Exception {
+    TopologyBuilder builder = new TopologyBuilder();
+
+    SpoutDeclarer spout =  builder.setSpout("word", new TestWordSpout(), 10);
+    // set the CPU requirement
+    spout.setCPULoad(20);
+    // set the on-heap and off-heap memory requirements
+    spout.setMemoryLoad(64, 16);
+
+    BoltDeclarer bolt1 = builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
+    // Sets the CPU requirement. It is not necessary to set both CPU and memory;
+    // for requirements that are not set, a default value will be used.
+    bolt1.setCPULoad(15);
+
+    BoltDeclarer bolt2 = builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
+    // only a memory requirement is set for this bolt
+    bolt2.setMemoryLoad(100);
+
+    Config conf = new Config();
+    conf.setDebug(true);
+
+    // Limits the maximum amount of memory (in MB) allocated to one worker process.
+    // Can be used to spread executors across multiple workers.
+    conf.setTopologyWorkerMaxHeapSize(1024.0);
+
+    // Topology priority: 0 is the highest priority, and importance decreases as the number increases.
+    // Recommended range is 0-29, but no hard limit is set.
+    conf.setTopologyPriority(29);
+
+    // Set strategy to schedule topology. If not specified, default to org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy
+    conf.setTopologyStrategy(org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class);
+
+    if (args != null && args.length > 0) {
+      conf.setNumWorkers(3);
+
+      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
+    }
+    else {
+
+      LocalCluster cluster = new LocalCluster();
+      // try/finally so the in-process cluster is shut down even if submit or kill throws.
+      try {
+        cluster.submitTopology("test", conf, builder.createTopology());
+        Utils.sleep(10000);
+        cluster.killTopology("test");
+      } finally {
+        cluster.shutdown();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/RollingTopWords.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/RollingTopWords.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/RollingTopWords.java
new file mode 100644
index 0000000..b5ee161
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/RollingTopWords.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter;
+
+import org.apache.storm.Config;
+import org.apache.storm.testing.TestWordSpout;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.log4j.Logger;
+import org.apache.storm.starter.bolt.IntermediateRankingsBolt;
+import org.apache.storm.starter.bolt.RollingCountBolt;
+import org.apache.storm.starter.bolt.TotalRankingsBolt;
+import org.apache.storm.starter.util.StormRunner;
+
+/**
+ * This topology does a continuous computation of the top N words that the topology has seen in terms of cardinality.
+ * The top N computation is done in a completely scalable way, and a similar approach could be used to compute things
+ * like trending topics or trending images on Twitter.
+ */
+public class RollingTopWords {
+
+  private static final Logger LOG = Logger.getLogger(RollingTopWords.class);
+  private static final int DEFAULT_RUNTIME_IN_SECONDS = 60;
+  private static final int TOP_N = 5;
+
+  private final TopologyBuilder builder;
+  private final String topologyName;
+  private final Config topologyConfig;
+  private final int runtimeInSeconds;
+
+  /**
+   * Creates the topology definition; nothing is submitted until {@link #runLocally()} or
+   * {@link #runRemotely()} is called.
+   *
+   * @param topologyName name under which the topology will be run
+   * @throws InterruptedException declared by the wiring step
+   */
+  public RollingTopWords(String topologyName) throws InterruptedException {
+    this.topologyName = topologyName;
+    this.runtimeInSeconds = DEFAULT_RUNTIME_IN_SECONDS;
+    this.topologyConfig = createTopologyConfiguration();
+    this.builder = new TopologyBuilder();
+
+    wireTopology();
+  }
+
+  /** Returns a fresh configuration with debug output switched on. */
+  private static Config createTopologyConfiguration() {
+    Config debugConfig = new Config();
+    debugConfig.setDebug(true);
+    return debugConfig;
+  }
+
+  /** Registers the word spout and the counter, intermediate-ranker and final-ranker bolts. */
+  private void wireTopology() throws InterruptedException {
+    String words = "wordGenerator";
+    String counter = "counter";
+    String partialRanker = "intermediateRanker";
+    String finalRanker = "finalRanker";
+
+    builder.setSpout(words, new TestWordSpout(), 5);
+    builder.setBolt(counter, new RollingCountBolt(9, 3), 4)
+        .fieldsGrouping(words, new Fields("word"));
+    builder.setBolt(partialRanker, new IntermediateRankingsBolt(TOP_N), 4)
+        .fieldsGrouping(counter, new Fields("obj"));
+    builder.setBolt(finalRanker, new TotalRankingsBolt(TOP_N))
+        .globalGrouping(partialRanker);
+  }
+
+  /** Runs the topology through {@code StormRunner.runTopologyLocally} with the default runtime. */
+  public void runLocally() throws InterruptedException {
+    StormRunner.runTopologyLocally(builder.createTopology(), topologyName, topologyConfig, runtimeInSeconds);
+  }
+
+  /** Submits the topology through {@code StormRunner.runTopologyRemotely}. */
+  public void runRemotely() throws Exception {
+    StormRunner.runTopologyRemotely(builder.createTopology(), topologyName, topologyConfig);
+  }
+
+  /**
+   * Entry point: builds the topology and runs it locally or remotely.
+   *
+   * Usage: "RollingTopWords [topology-name] [local|remote]"
+   *
+   * Without arguments the topology runs locally under the name "slidingWindowCounts".
+   *
+   * Examples:
+   *
+   * ```
+   *
+   * # Local mode (LocalCluster), topology name "slidingWindowCounts"
+   * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.RollingTopWords
+   *
+   * # Local mode (LocalCluster), topology name "foobar"
+   * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.RollingTopWords foobar
+   *
+   * # Local mode (LocalCluster), topology name "foobar"
+   * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.RollingTopWords foobar local
+   *
+   * # Remote/cluster mode, topology name "production-topology"
+   * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.RollingTopWords production-topology remote
+   * ```
+   *
+   * @param args optional topology name followed by an optional mode; only "remote"
+   *             (case-insensitive) selects cluster mode, anything else runs locally
+   * @throws Exception if running or submitting the topology fails
+   */
+  public static void main(String[] args) throws Exception {
+    String topologyName = args.length >= 1 ? args[0] : "slidingWindowCounts";
+    boolean remote = args.length >= 2 && args[1].equalsIgnoreCase("remote");
+
+    LOG.info("Topology name: " + topologyName);
+    RollingTopWords topology = new RollingTopWords(topologyName);
+    if (remote) {
+      LOG.info("Running in remote (cluster) mode");
+      topology.runRemotely();
+    }
+    else {
+      LOG.info("Running in local mode");
+      topology.runLocally();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/SingleJoinExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/SingleJoinExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/SingleJoinExample.java
new file mode 100644
index 0000000..b153372
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/SingleJoinExample.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.testing.FeederSpout;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.starter.bolt.SingleJoinBolt;
+
+/**
+ * Runs a small in-process topology in which two {@link FeederSpout}s, one emitting
+ * (id, gender) and one emitting (id, age), feed a {@link SingleJoinBolt} constructed with the
+ * fields (gender, age). Both inputs are grouped on "id".
+ */
+public class SingleJoinExample {
+  /**
+   * Submits the topology to a {@link LocalCluster}, feeds ten gender tuples (ids 0..9) and
+   * ten age tuples (ids 9..0), waits two seconds and shuts the cluster down.
+   *
+   * @param args ignored
+   */
+  public static void main(String[] args) {
+    FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender"));
+    FeederSpout ageSpout = new FeederSpout(new Fields("id", "age"));
+
+    TopologyBuilder builder = new TopologyBuilder();
+    builder.setSpout("gender", genderSpout);
+    builder.setSpout("age", ageSpout);
+    builder.setBolt("join", new SingleJoinBolt(new Fields("gender", "age"))).fieldsGrouping("gender", new Fields("id"))
+        .fieldsGrouping("age", new Fields("id"));
+
+    Config conf = new Config();
+    conf.setDebug(true);
+
+    LocalCluster cluster = new LocalCluster();
+    // try/finally so the in-process cluster is shut down even if submitting or feeding throws.
+    try {
+      cluster.submitTopology("join-example", conf, builder.createTopology());
+
+      for (int i = 0; i < 10; i++) {
+        String gender = (i % 2 == 0) ? "male" : "female";
+        genderSpout.feed(new Values(i, gender));
+      }
+
+      // Ages are fed in the opposite id order from the genders.
+      for (int i = 9; i >= 0; i--) {
+        ageSpout.feed(new Values(i, i + 20));
+      }
+
+      Utils.sleep(2000);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java
new file mode 100644
index 0000000..3addc15
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter;
+
+import org.apache.storm.Config;
+import org.apache.storm.testing.TestWordSpout;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.log4j.Logger;
+import org.apache.storm.starter.bolt.IntermediateRankingsBolt;
+import org.apache.storm.starter.bolt.RollingCountBolt;
+import org.apache.storm.starter.bolt.RollingCountAggBolt;
+import org.apache.storm.starter.bolt.TotalRankingsBolt;
+import org.apache.storm.starter.util.StormRunner;
+
+/**
+ * This topology does a continuous computation of the top N words that the topology has seen in terms of cardinality.
+ * The top N computation is done in a completely scalable way, and a similar approach could be used to compute things
+ * like trending topics or trending images on Twitter. It takes an approach that assumes that some words will be much
+ * more common than other words, and uses partialKeyGrouping to better balance the skewed load.
+ */
+public class SkewedRollingTopWords {
+  private static final Logger LOG = Logger.getLogger(SkewedRollingTopWords.class);
+  private static final int DEFAULT_RUNTIME_IN_SECONDS = 60;
+  private static final int TOP_N = 5;
+
+  private final TopologyBuilder builder;
+  private final String topologyName;
+  private final Config topologyConfig;
+  private final int runtimeInSeconds;
+
+  /**
+   * Creates the topology definition; nothing is submitted until {@link #runLocally()} or
+   * {@link #runRemotely()} is called.
+   *
+   * @param topologyName name under which the topology will be run
+   * @throws InterruptedException declared by the wiring step
+   */
+  public SkewedRollingTopWords(String topologyName) throws InterruptedException {
+    builder = new TopologyBuilder();
+    this.topologyName = topologyName;
+    topologyConfig = createTopologyConfiguration();
+    runtimeInSeconds = DEFAULT_RUNTIME_IN_SECONDS;
+
+    wireTopology();
+  }
+
+  /** Returns a fresh configuration with debug output switched on. */
+  private static Config createTopologyConfiguration() {
+    Config conf = new Config();
+    conf.setDebug(true);
+    return conf;
+  }
+
+  /**
+   * Registers the word spout and the counter, aggregator, intermediate-ranker and final-ranker
+   * bolts. The counter uses partialKeyGrouping on "word", and a separate aggregator bolt,
+   * fields-grouped on "obj", sits between the counter and the rankers.
+   */
+  private void wireTopology() throws InterruptedException {
+    String spoutId = "wordGenerator";
+    String counterId = "counter";
+    String aggId = "aggregator";
+    String intermediateRankerId = "intermediateRanker";
+    String totalRankerId = "finalRanker";
+    builder.setSpout(spoutId, new TestWordSpout(), 5);
+    builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).partialKeyGrouping(spoutId, new Fields("word"));
+    builder.setBolt(aggId, new RollingCountAggBolt(), 4).fieldsGrouping(counterId, new Fields("obj"));
+    builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(aggId, new Fields("obj"));
+    builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);
+  }
+
+  /** Runs the topology through {@code StormRunner.runTopologyLocally} with the default runtime. */
+  public void runLocally() throws InterruptedException {
+    StormRunner.runTopologyLocally(builder.createTopology(), topologyName, topologyConfig, runtimeInSeconds);
+  }
+
+  /** Submits the topology through {@code StormRunner.runTopologyRemotely}. */
+  public void runRemotely() throws Exception {
+    StormRunner.runTopologyRemotely(builder.createTopology(), topologyName, topologyConfig);
+  }
+
+  /**
+   * Submits (runs) the topology.
+   *
+   * Usage: "SkewedRollingTopWords [topology-name] [local|remote]"
+   *
+   * By default, the topology is run locally under the name "slidingWindowCounts".
+   *
+   * Examples:
+   *
+   * ```
+   *
+   * # Runs in local mode (LocalCluster), with topology name "slidingWindowCounts"
+   * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.SkewedRollingTopWords
+   *
+   * # Runs in local mode (LocalCluster), with topology name "foobar"
+   * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.SkewedRollingTopWords foobar
+   *
+   * # Runs in local mode (LocalCluster), with topology name "foobar"
+   * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.SkewedRollingTopWords foobar local
+   *
+   * # Runs in remote/cluster mode, with topology name "production-topology"
+   * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.SkewedRollingTopWords production-topology remote
+   * ```
+   *
+   * @param args First positional argument (optional) is topology name, second positional argument (optional) defines
+   *             whether to run the topology locally ("local") or remotely, i.e. on a real cluster ("remote").
+   * @throws Exception if running or submitting the topology fails
+   */
+  public static void main(String[] args) throws Exception {
+    String topologyName = "slidingWindowCounts";
+    if (args.length >= 1) {
+      topologyName = args[0];
+    }
+    boolean runLocally = true;
+    if (args.length >= 2 && args[1].equalsIgnoreCase("remote")) {
+      runLocally = false;
+    }
+
+    LOG.info("Topology name: " + topologyName);
+    SkewedRollingTopWords rtw = new SkewedRollingTopWords(topologyName);
+    if (runLocally) {
+      LOG.info("Running in local mode");
+      rtw.runLocally();
+    }
+    else {
+      LOG.info("Running in remote (cluster) mode");
+      rtw.runRemotely();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingTupleTsTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingTupleTsTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingTupleTsTopology.java
new file mode 100644
index 0000000..90744f2
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingTupleTsTopology.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.starter.bolt.PrinterBolt;
+import org.apache.storm.starter.bolt.SlidingWindowSumBolt;
+import org.apache.storm.starter.spout.RandomIntegerSpout;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.storm.topology.base.BaseWindowedBolt.Duration;
+
+/**
+ * Windowing based on tuple timestamp (e.g. the time when the tuple is generated
+ * rather than when it's processed).
+ */
+public class SlidingTupleTsTopology {
+    /**
+     * Wires integer spout -> sliding-window sum bolt -> printer bolt and runs the topology.
+     *
+     * With at least one argument the topology is submitted to a cluster under the name
+     * {@code args[0]}; otherwise it runs for 40 seconds in an in-process {@link LocalCluster}.
+     *
+     * @param args optional; {@code args[0]} is the topology name for remote submission
+     * @throws Exception if submitting the topology fails
+     */
+    public static void main(String[] args) throws Exception {
+        TopologyBuilder builder = new TopologyBuilder();
+        // The window is driven by the tuples' "ts" field instead of processing time.
+        BaseWindowedBolt bolt = new SlidingWindowSumBolt()
+                .withWindow(new Duration(5, TimeUnit.SECONDS), new Duration(3, TimeUnit.SECONDS))
+                .withTimestampField("ts")
+                .withLag(new Duration(5, TimeUnit.SECONDS));
+        builder.setSpout("integer", new RandomIntegerSpout(), 1);
+        builder.setBolt("slidingsum", bolt, 1).shuffleGrouping("integer");
+        builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("slidingsum");
+        Config conf = new Config();
+        conf.setDebug(true);
+
+        if (args != null && args.length > 0) {
+            conf.setNumWorkers(1);
+            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
+        } else {
+            LocalCluster cluster = new LocalCluster();
+            // try/finally so the in-process cluster is shut down even if submit or kill throws.
+            try {
+                cluster.submitTopology("test", conf, builder.createTopology());
+                Utils.sleep(40000);
+                cluster.killTopology("test");
+            } finally {
+                cluster.shutdown();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingWindowTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingWindowTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingWindowTopology.java
new file mode 100644
index 0000000..cedcec5
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingWindowTopology.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.windowing.TupleWindow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.starter.bolt.PrinterBolt;
+import org.apache.storm.starter.bolt.SlidingWindowSumBolt;
+import org.apache.storm.starter.spout.RandomIntegerSpout;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.storm.topology.base.BaseWindowedBolt.Count;
+
+/**
+ * A sample topology that demonstrates the usage of {@link org.apache.storm.topology.IWindowedBolt}
+ * to calculate a sliding window sum, followed by a tumbling window average of those sums.
+ */
+public class SlidingWindowTopology {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SlidingWindowTopology.class);
+
+    /**
+     * Computes the average of the first field of all tuples in a tumbling window.
+     * The average is computed with integer division, so any fractional part is dropped.
+     */
+    private static class TumblingWindowAvgBolt extends BaseWindowedBolt {
+        private OutputCollector collector;
+
+        @Override
+        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+            this.collector = collector;
+        }
+
+        /**
+         * Emits one tuple holding the average of the window, or nothing when the window is empty.
+         *
+         * @param inputWindow the tuples that make up the current window
+         */
+        @Override
+        public void execute(TupleWindow inputWindow) {
+            List<Tuple> tuplesInWindow = inputWindow.get();
+            // Parameterized logging: the message is only built when debug logging is enabled.
+            LOG.debug("Events in current window: {}", tuplesInWindow.size());
+            if (tuplesInWindow.isEmpty()) {
+                // Nothing to average; also avoids dividing by zero below.
+                return;
+            }
+            /*
+             * Since this is a tumbling window calculation,
+             * we use all the tuples in the window to compute the avg.
+             */
+            int sum = 0;
+            for (Tuple tuple : tuplesInWindow) {
+                sum += (int) tuple.getValue(0);
+            }
+            collector.emit(new Values(sum / tuplesInWindow.size()));
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("avg"));
+        }
+    }
+
+
+    /**
+     * Submits the topology to a cluster when a topology name is passed as the first
+     * argument; otherwise runs it in an in-process cluster for 40 seconds.
+     *
+     * @param args optional; {@code args[0]} is the name under which the topology is submitted
+     * @throws Exception if submitting or killing the topology fails
+     */
+    public static void main(String[] args) throws Exception {
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout("integer", new RandomIntegerSpout(), 1);
+        // Count-based sliding window: Count(30) and Count(10) are (window length, sliding interval).
+        builder.setBolt("slidingsum", new SlidingWindowSumBolt().withWindow(new Count(30), new Count(10)), 1)
+                .shuffleGrouping("integer");
+        // Count-based tumbling window over the emitted sums.
+        builder.setBolt("tumblingavg", new TumblingWindowAvgBolt().withTumblingWindow(new Count(3)), 1)
+                .shuffleGrouping("slidingsum");
+        builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("tumblingavg");
+        Config conf = new Config();
+        conf.setDebug(true);
+        if (args != null && args.length > 0) {
+            conf.setNumWorkers(1);
+            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
+        } else {
+            LocalCluster cluster = new LocalCluster();
+            try {
+                cluster.submitTopology("test", conf, builder.createTopology());
+                Utils.sleep(40000);
+                cluster.killTopology("test");
+            } finally {
+                // Always tear the in-process cluster down, even when submit or kill throws.
+                cluster.shutdown();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/ThroughputVsLatency.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/ThroughputVsLatency.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/ThroughputVsLatency.java
new file mode 100644
index 0000000..8ee48c9
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/ThroughputVsLatency.java
@@ -0,0 +1,432 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.metric.HttpForwardingMetricsServer;
+import org.apache.storm.metric.HttpForwardingMetricsConsumer;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.metric.api.IMetricsConsumer.TaskInfo;
+import org.apache.storm.metric.api.IMetricsConsumer.DataPoint;
+import org.apache.storm.generated.*;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.NimbusClient;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.StormSubmitter;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.storm.metrics.hdrhistogram.HistogramMetric;
+import org.HdrHistogram.Histogram;
+
+/**
+ * WordCount but the spout goes at a predefined rate and we collect
+ * proper latency statistics.
+ */
+public class ThroughputVsLatency {
+  /**
+   * Message id attached to each emitted sentence. It keeps the sentence text and a
+   * nanosecond timestamp so the spout can re-emit on failure and measure latency on ack.
+   */
+  private static class SentWithTime {
+    public final long time;
+    public final String sentence;
+
+    /**
+     * @param sentence the sentence that was emitted
+     * @param time timestamp in nanoseconds associated with the emission
+     */
+    SentWithTime(String sentence, long time) {
+      this.time = time;
+      this.sentence = sentence;
+    }
+  }
+
+  /**
+   * Small facade that hides whether the topology runs in an in-process
+   * {@link LocalCluster} or on a remote cluster reached through a Nimbus client.
+   * Exactly one of {@code _local} and {@code _client} is set by the constructor.
+   */
+  public static class C {
+    LocalCluster _local = null;
+    Nimbus.Client _client = null;
+
+    /**
+     * Reads the storm configuration, overlays {@code conf} on top of it and, depending
+     * on the boolean {@code "run.local"} entry, creates a local cluster or a Nimbus client.
+     *
+     * @param conf extra configuration entries; may be null
+     */
+    public C(Map conf) {
+      Map merged = Utils.readStormConfig();
+      if (conf != null) {
+        merged.putAll(conf);
+      }
+      Boolean runLocal = (Boolean)merged.get("run.local");
+      if (runLocal == null || !runLocal) {
+        _client = NimbusClient.getConfiguredClient(merged).getClient();
+      } else {
+        _local = new LocalCluster();
+      }
+    }
+
+    /** Returns the cluster summary from whichever backend is in use. */
+    public ClusterSummary getClusterInfo() throws Exception {
+      if (_local == null) {
+        return _client.getClusterInfo();
+      }
+      return _local.getClusterInfo();
+    }
+
+    /** Returns the topology info for the given topology id from whichever backend is in use. */
+    public TopologyInfo getTopologyInfo(String id) throws Exception {
+      if (_local == null) {
+        return _client.getTopologyInfo(id);
+      }
+      return _local.getTopologyInfo(id);
+    }
+
+    /** Kills the named topology with the given options on whichever backend is in use. */
+    public void killTopologyWithOpts(String name, KillOptions opts) throws Exception {
+      if (_local == null) {
+        _client.killTopologyWithOpts(name, opts);
+        return;
+      }
+      _local.killTopologyWithOpts(name, opts);
+    }
+
+    /** Submits the topology to the local cluster, or through {@code StormSubmitter} otherwise. */
+    public void submitTopology(String name, Map stormConf, StormTopology topology) throws Exception {
+      if (_local == null) {
+        StormSubmitter.submitTopology(name, stormConf, topology);
+        return;
+      }
+      _local.submitTopology(name, stormConf, topology);
+    }
+
+    /** @return true when an in-process local cluster is being used */
+    public boolean isLocal() {
+      return _local != null;
+    }
+  }
+
+  /**
+   * Spout that emits random sentences at a fixed target rate and records the
+   * completion latency of every acked tuple in a histogram metric.
+   * <p>
+   * Emission is paced in batches: once per {@code _periodNano} nanoseconds the spout
+   * becomes eligible to emit another {@code _emitAmount} tuples. The message id of each
+   * tuple carries the scheduled start time of its batch, so the recorded latency
+   * also covers the time a tuple waited before it was actually emitted.
+   */
+  public static class FastRandomSentenceSpout extends BaseRichSpout {
+    static final String[] SENTENCES = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
+          "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
+
+    SpoutOutputCollector _collector;
+    long _periodNano;
+    long _emitAmount;
+    Random _rand;
+    long _nextEmitTime;
+    long _emitsLeft;
+    // Scheduled start (System.nanoTime() based) of the batch that is currently being emitted.
+    long _batchTime;
+    HistogramMetric _histo;
+
+    /**
+     * @param ratePerSecond target number of tuples per second for this spout instance.
+     *        A value <= 0 disables periodic emission: after the tuples emitted at
+     *        start-up the next batch is scheduled effectively never.
+     */
+    public FastRandomSentenceSpout(long ratePerSecond) {
+        if (ratePerSecond > 0) {
+            _periodNano = Math.max(1, 1000000000/ratePerSecond);
+            _emitAmount = Math.max(1, (long)((ratePerSecond / 1000000000.0) * _periodNano));
+        } else {
+            _periodNano = Long.MAX_VALUE - 1;
+            _emitAmount = 1;
+        }
+    }
+
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+      _collector = collector;
+      _rand = ThreadLocalRandom.current();
+      _nextEmitTime = System.nanoTime();
+      // The first batch is due right now, so that is its scheduled time.
+      _batchTime = _nextEmitTime;
+      _emitsLeft = _emitAmount;
+      _histo = new HistogramMetric(3600000000000L, 3);
+      context.registerMetric("comp-lat-histo", _histo, 10); //Update every 10 seconds, so we are not too far behind
+    }
+
+    @Override
+    public void nextTuple() {
+      if (_emitsLeft <= 0 && _nextEmitTime <= System.nanoTime()) {
+          _emitsLeft = _emitAmount;
+          _batchTime = _nextEmitTime;
+          // _periodNano is always >= 1, so a sum below the old value means the addition
+          // overflowed (period of Long.MAX_VALUE - 1 for rate <= 0). Saturate instead of
+          // wrapping to a time in the past, which would make the spout emit unthrottled.
+          long following = _nextEmitTime + _periodNano;
+          _nextEmitTime = following < _nextEmitTime ? Long.MAX_VALUE : following;
+      }
+
+      if (_emitsLeft > 0) {
+          String sentence = SENTENCES[_rand.nextInt(SENTENCES.length)];
+          _collector.emit(new Values(sentence), new SentWithTime(sentence, _batchTime));
+          _emitsLeft--;
+      }
+    }
+
+    /** Records the time elapsed since the tuple's scheduled emit time into the latency histogram. */
+    @Override
+    public void ack(Object id) {
+      long end = System.nanoTime();
+      SentWithTime st = (SentWithTime)id;
+      _histo.recordValue(end-st.time);
+    }
+
+    /** Re-emits the failed sentence with the same message id, so its original timestamp is kept. */
+    @Override
+    public void fail(Object id) {
+      SentWithTime st = (SentWithTime)id;
+      _collector.emit(new Values(st.sentence), id);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      declarer.declare(new Fields("sentence"));
+    }
+  }
+
+  public static class SplitSentence extends BaseBasicBolt {
+    @Override
+    public void execute(Tuple tuple, BasicOutputCollector collector) {
+      String sentence = tuple.getString(0);
+      for (String word: sentence.split("\\s+")) {
+          collector.emit(new Values(word, 1));
+      }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      declarer.declare(new Fields("word", "count"));
+    }
+  }
+
+  /**
+   * Keeps an in-memory running count per word and emits the updated
+   * {@code (word, count)} pair for every incoming tuple.
+   */
+  public static class WordCount extends BaseBasicBolt {
+    Map<String, Integer> counts = new HashMap<String, Integer>();
+
+    @Override
+    public void execute(Tuple tuple, BasicOutputCollector collector) {
+      String word = tuple.getString(0);
+      Integer previous = counts.get(word);
+      // First sighting of a word starts at 1, otherwise bump the stored count.
+      Integer updated = (previous == null) ? Integer.valueOf(1) : Integer.valueOf(previous + 1);
+      counts.put(word, updated);
+      collector.emit(new Values(word, updated));
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      Fields outputFields = new Fields("word", "count");
+      declarer.declare(outputFields);
+    }
+  }
+
+  private static class MemMeasure {
+    private long _mem = 0;
+    private long _time = 0;
+
+    public synchronized void update(long mem) {
+        _mem = mem;
+        _time = System.currentTimeMillis();
+    }
+
+    public synchronized long get() {
+        return isExpired() ? 0l : _mem;
+    }
+
+    public synchronized boolean isExpired() {
+        return (System.currentTimeMillis() - _time) >= 20000;
+    }
+  }
+
+  // Aggregated completion-latency histogram. Every access in this class happens inside
+  // synchronized(_histo), since it is filled by the metrics handler and drained by printMetrics.
+  private static final Histogram _histo = new Histogram(3600000000000L, 3);
+  // "sys-ms" / "user-ms" values summed from the workers' "CPU" metric; reset to 0 on every print.
+  private static final AtomicLong _systemCPU = new AtomicLong(0);
+  private static final AtomicLong _userCPU = new AtomicLong(0);
+  // "count" / "timeMs" values summed from the workers' "GC/..." metrics.
+  // NOTE(review): _gcCount is only ever added to in this class; it is never read or reset.
+  private static final AtomicLong _gcCount = new AtomicLong(0);
+  private static final AtomicLong _gcMs = new AtomicLong(0);
+  // Latest "usedBytes" reading per worker, keyed by "host:port" of the reporting worker.
+  private static final ConcurrentHashMap<String, MemMeasure> _memoryBytes = new ConcurrentHashMap<String, MemMeasure>();
+
+  /**
+   * Sums the current memory readings of all known workers; expired readings count as 0.
+   *
+   * @return total used bytes across workers
+   */
+  private static long readMemory() {
+    long sum = 0;
+    for (Map.Entry<String, MemMeasure> entry: _memoryBytes.entrySet()) {
+      sum += entry.getValue().get();
+    }
+    return sum;
+  }
+
+  // Totals seen by the previous printMetrics call; used to turn all-time counters into per-interval values.
+  private static long _prev_acked = 0;
+  private static long _prev_uptime = 0;
+
+  /**
+   * Adds up all non-null values of a per-stream counter map.
+   *
+   * @param perStream counter values keyed by stream; may be null
+   * @return the sum, or 0 for a null map
+   */
+  private static long sumValues(Map<String, Long> perStream) {
+    long total = 0;
+    if (perStream != null) {
+      for (Long value: perStream.values()) {
+        if (value != null) {
+          total += value;
+        }
+      }
+    }
+    return total;
+  }
+
+  /**
+   * Prints one line of statistics for the named topology: acked tuples and ack rate since
+   * the previous call, all-time failures of the "spout" component, latency percentiles from
+   * the shared histogram, and the CPU, GC and memory figures collected by the metrics server.
+   * The histogram and the CPU/GC counters are reset as part of the call.
+   *
+   * @param client access to the cluster the topology runs on
+   * @param name name of the topology to report on
+   * @throws Exception if no topology with that name is running, or if the cluster cannot be queried
+   */
+  public static void printMetrics(C client, String name) throws Exception {
+    ClusterSummary summary = client.getClusterInfo();
+    String id = null;
+    for (TopologySummary ts: summary.get_topologies()) {
+      if (name.equals(ts.get_name())) {
+        id = ts.get_id();
+      }
+    }
+    if (id == null) {
+      throw new Exception("Could not find a topology named "+name);
+    }
+    TopologyInfo info = client.getTopologyInfo(id);
+    int uptime = info.get_uptime_secs();
+    long acked = 0;
+    long failed = 0;
+    for (ExecutorSummary exec: info.get_executors()) {
+      // Skip executors without stats instead of failing on them with a NullPointerException.
+      if ("spout".equals(exec.get_component_id()) && exec.get_stats() != null) {
+        SpoutStats stats = exec.get_stats().get_specific().get_spout();
+        // Sum acked and failed independently, so failures on a stream that has
+        // no acks at all are still counted.
+        acked += sumValues(stats.get_acked().get(":all-time"));
+        failed += sumValues(stats.get_failed().get(":all-time"));
+      }
+    }
+    long ackedThisTime = acked - _prev_acked;
+    long thisTime = uptime - _prev_uptime;
+    // Guard against a zero-length interval, which would print Infinity or NaN.
+    double ackedPerSec = thisTime > 0 ? ((double)ackedThisTime)/thisTime : 0.0;
+    long nnpct, nnnpct, min, max;
+    double mean, stddev;
+    synchronized(_histo) {
+      nnpct = _histo.getValueAtPercentile(99.0);
+      nnnpct = _histo.getValueAtPercentile(99.9);
+      min = _histo.getMinValue();
+      max = _histo.getMaxValue();
+      mean = _histo.getMean();
+      stddev = _histo.getStdDeviation();
+      _histo.reset();
+    }
+    long user = _userCPU.getAndSet(0);
+    long sys = _systemCPU.getAndSet(0);
+    long gc = _gcMs.getAndSet(0);
+    double memMB = readMemory() / (1024.0 * 1024.0);
+    System.out.printf("uptime: %,4d acked: %,9d acked/sec: %,10.2f failed: %,8d " +
+                      "99%%: %,15d 99.9%%: %,15d min: %,15d max: %,15d mean: %,15.2f " +
+                      "stddev: %,15.2f user: %,10d sys: %,10d gc: %,10d mem: %,10.2f\n",
+                       uptime, ackedThisTime, ackedPerSec, failed, nnpct, nnnpct,
+                       min, max, mean, stddev, user, sys, gc, memMB);
+    _prev_uptime = uptime;
+    _prev_acked = acked;
+  }
+
+  /**
+   * Kills the named topology with a wait time of 0 seconds.
+   *
+   * @param client access to the cluster the topology runs on
+   * @param name name of the topology to kill
+   * @throws Exception if the kill request fails
+   */
+  public static void kill(C client, String name) throws Exception {
+    KillOptions noWait = new KillOptions();
+    noWait.set_wait_secs(0);
+    client.killTopologyWithOpts(name, noWait);
+  }
+
+  /**
+   * Runs the word-count topology at a fixed sentence rate and prints a metrics line every
+   * 30 seconds.
+   * <p>
+   * Optional positional arguments: {@code [ratePerSecond] [parallelism] [numMins] [name]},
+   * defaulting to 500, 4, 5 and "wc-test".
+   *
+   * @param args see above
+   * @throws Exception if an argument is not a number, or the topology cannot be submitted,
+   *         queried or killed
+   */
+  public static void main(String[] args) throws Exception {
+    long ratePerSecond = 500;
+    if (args != null && args.length > 0) {
+        ratePerSecond = Long.parseLong(args[0]);
+    }
+
+    int parallelism = 4;
+    if (args != null && args.length > 1) {
+        parallelism = Integer.parseInt(args[1]);
+    }
+
+    int numMins = 5;
+    if (args != null && args.length > 2) {
+        numMins = Integer.parseInt(args[2]);
+    }
+
+    String name = "wc-test";
+    if (args != null && args.length > 3) {
+        name = args[3];
+    }
+
+    Config conf = new Config();
+    // Receives the metrics forwarded by the workers and folds them into the static accumulators.
+    HttpForwardingMetricsServer metricServer = new HttpForwardingMetricsServer(conf) {
+        @Override
+        public void handle(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
+            String worker = taskInfo.srcWorkerHost + ":" + taskInfo.srcWorkerPort;
+            for (DataPoint dp: dataPoints) {
+                if ("comp-lat-histo".equals(dp.name) && dp.value instanceof Histogram) {
+                    synchronized(_histo) {
+                        _histo.add((Histogram)dp.value);
+                    }
+                } else if ("CPU".equals(dp.name) && dp.value instanceof Map) {
+                   Map<Object, Object> m = (Map<Object, Object>)dp.value;
+                   Object sys = m.get("sys-ms");
+                   if (sys instanceof Number) {
+                       _systemCPU.getAndAdd(((Number)sys).longValue());
+                   }
+                   Object user = m.get("user-ms");
+                   if (user instanceof Number) {
+                       _userCPU.getAndAdd(((Number)user).longValue());
+                   }
+                } else if (dp.name.startsWith("GC/") && dp.value instanceof Map) {
+                   Map<Object, Object> m = (Map<Object, Object>)dp.value;
+                   Object count = m.get("count");
+                   if (count instanceof Number) {
+                       _gcCount.getAndAdd(((Number)count).longValue());
+                   }
+                   Object time = m.get("timeMs");
+                   if (time instanceof Number) {
+                       _gcMs.getAndAdd(((Number)time).longValue());
+                   }
+                } else if (dp.name.startsWith("memory/") && dp.value instanceof Map) {
+                   Map<Object, Object> m = (Map<Object, Object>)dp.value;
+                   Object val = m.get("usedBytes");
+                   if (val instanceof Number) {
+                       MemMeasure mm = _memoryBytes.get(worker);
+                       if (mm == null) {
+                         mm = new MemMeasure();
+                         // Another thread may have registered this worker first; use its instance then.
+                         MemMeasure tmp = _memoryBytes.putIfAbsent(worker, mm);
+                         mm = tmp == null ? mm : tmp; 
+                       }
+                       mm.update(((Number)val).longValue());
+                   }
+                }
+            }
+        }
+    };
+
+    metricServer.serve();
+    String url = metricServer.getUrl();
+
+    C cluster = new C(conf);
+    conf.setNumWorkers(parallelism);
+    conf.registerMetricsConsumer(org.apache.storm.metric.LoggingMetricsConsumer.class);
+    conf.registerMetricsConsumer(org.apache.storm.metric.HttpForwardingMetricsConsumer.class, url, 1);
+    Map<String, String> workerMetrics = new HashMap<String, String>();
+    if (!cluster.isLocal()) {
+      //sigar uses JNI and does not work in local mode
+      workerMetrics.put("CPU", "org.apache.storm.metrics.sigar.CPUMetric");
+    }
+    conf.put(Config.TOPOLOGY_WORKER_METRICS, workerMetrics);
+    conf.put(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS, 10);
+    conf.put(Config.TOPOLOGY_WORKER_GC_CHILDOPTS,
+      "-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:NewSize=128m -XX:CMSInitiatingOccupancyFraction=70 -XX:-CMSConcurrentMTEnabled");
+    conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx2g");
+
+    TopologyBuilder builder = new TopologyBuilder();
+
+    int numEach = 4 * parallelism;
+    // The total rate is divided evenly (integer division) between the spout instances.
+    builder.setSpout("spout", new FastRandomSentenceSpout(ratePerSecond/numEach), numEach);
+
+    builder.setBolt("split", new SplitSentence(), numEach).shuffleGrouping("spout");
+    builder.setBolt("count", new WordCount(), numEach).fieldsGrouping("split", new Fields("word"));
+
+    // Submit outside the try block: when the submission itself fails (for example because
+    // the name is already taken) we must not kill a topology we did not start, and the
+    // submit error must not be masked by a failing kill.
+    cluster.submitTopology(name, conf, builder.createTopology());
+    try {
+        for (int i = 0; i < numMins * 2; i++) {
+            Thread.sleep(30 * 1000);
+            printMetrics(cluster, name);
+        }
+    } finally {
+        kill(cluster, name);
+    }
+    System.exit(0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalGlobalCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalGlobalCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalGlobalCount.java
new file mode 100644
index 0000000..312f83e
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalGlobalCount.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.coordination.BatchOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.testing.MemoryTransactionalSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBatchBolt;
+import org.apache.storm.topology.base.BaseTransactionalBolt;
+import org.apache.storm.transactional.ICommitter;
+import org.apache.storm.transactional.TransactionAttempt;
+import org.apache.storm.transactional.TransactionalTopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This is a basic example of a transactional topology. It keeps a count of the number of tuples seen so far in a
+ * database. The source of data and the databases are mocked out as in memory maps for demonstration purposes.
+ *
+ * @see <a href="http://storm.apache.org/documentation/Transactional-topologies.html">Transactional topologies</a>
+ */
+public class TransactionalGlobalCount {
+  public static final int PARTITION_TAKE_PER_BATCH = 3;
+  public static final Map<Integer, List<List<Object>>> DATA = new HashMap<Integer, List<List<Object>>>() {{
+    put(0, new ArrayList<List<Object>>() {{
+      add(new Values("cat"));
+      add(new Values("dog"));
+      add(new Values("chicken"));
+      add(new Values("cat"));
+      add(new Values("dog"));
+      add(new Values("apple"));
+    }});
+    put(1, new ArrayList<List<Object>>() {{
+      add(new Values("cat"));
+      add(new Values("dog"));
+      add(new Values("apple"));
+      add(new Values("banana"));
+    }});
+    put(2, new ArrayList<List<Object>>() {{
+      add(new Values("cat"));
+      add(new Values("cat"));
+      add(new Values("cat"));
+      add(new Values("cat"));
+      add(new Values("cat"));
+      add(new Values("dog"));
+      add(new Values("dog"));
+      add(new Values("dog"));
+      add(new Values("dog"));
+    }});
+  }};
+
+  /** Row of the mocked database: a count together with the id of the transaction that wrote it. */
+  public static class Value {
+    BigInteger txid;
+    int count = 0;
+  }
+
+  public static Map<String, Value> DATABASE = new HashMap<String, Value>();
+  public static final String GLOBAL_COUNT_KEY = "GLOBAL-COUNT";
+
+  public static class BatchCount extends BaseBatchBolt {
+    Object _id;
+    BatchOutputCollector _collector;
+
+    int _count = 0;
+
+    @Override
+    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
+      _collector = collector;
+      _id = id;
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+      _count++;
+    }
+
+    @Override
+    public void finishBatch() {
+      _collector.emit(new Values(_id, _count));
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      declarer.declare(new Fields("id", "count"));
+    }
+  }
+
+  /**
+   * Committer bolt that sums the partial counts of one transaction and adds the sum to the
+   * global count held in {@code DATABASE}. The stored value remembers the id of the
+   * transaction that wrote it, so a batch whose id is already stored is not added again.
+   */
+  public static class UpdateGlobalCount extends BaseTransactionalBolt implements ICommitter {
+    TransactionAttempt _attempt;
+    BatchOutputCollector _collector;
+
+    int _sum = 0;
+
+    @Override
+    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {
+      _attempt = attempt;
+      _collector = collector;
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+      // Field 1 carries the partial count.
+      _sum += tuple.getInteger(1);
+    }
+
+    @Override
+    public void finishBatch() {
+      Value stored = DATABASE.get(GLOBAL_COUNT_KEY);
+      Value result;
+      if (stored != null && stored.txid.equals(_attempt.getTransactionId())) {
+        // This transaction has already been applied; reuse the stored value unchanged.
+        result = stored;
+      } else {
+        result = new Value();
+        result.txid = _attempt.getTransactionId();
+        result.count = (stored == null) ? _sum : _sum + stored.count;
+        DATABASE.put(GLOBAL_COUNT_KEY, result);
+      }
+      _collector.emit(new Values(_attempt, result.count));
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      Fields outputFields = new Fields("id", "sum");
+      declarer.declare(outputFields);
+    }
+  }
+
+  /**
+   * Runs the global-count topology in an in-process cluster for three seconds.
+   *
+   * @param args unused
+   * @throws Exception if the topology cannot be submitted or the wait is interrupted
+   */
+  public static void main(String[] args) throws Exception {
+    MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
+    TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("global-count", "spout", spout, 3);
+    builder.setBolt("partial-count", new BatchCount(), 5).noneGrouping("spout");
+    builder.setBolt("sum", new UpdateGlobalCount()).globalGrouping("partial-count");
+
+    Config config = new Config();
+    config.setDebug(true);
+    config.setMaxSpoutPending(3);
+
+    LocalCluster cluster = new LocalCluster();
+    try {
+      cluster.submitTopology("global-count-topology", config, builder.buildTopology());
+
+      Thread.sleep(3000);
+    } finally {
+      // Always tear the in-process cluster down, even when submit or sleep throws.
+      cluster.shutdown();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalWords.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalWords.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalWords.java
new file mode 100644
index 0000000..64689b0
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalWords.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.coordination.BatchOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.testing.MemoryTransactionalSpout;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.topology.base.BaseTransactionalBolt;
+import org.apache.storm.transactional.ICommitter;
+import org.apache.storm.transactional.TransactionAttempt;
+import org.apache.storm.transactional.TransactionalTopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class defines a more involved transactional topology than TransactionalGlobalCount. This topology processes a
+ * stream of words and produces two outputs:
+ * <p/>
+ * 1. A count for each word (stored in a database) 2. The number of words for every bucket of 10 counts. So it stores in
+ * the database how many words have appeared 0-9 times, how many have appeared 10-19 times, and so on.
+ * <p/>
+ * A batch of words can cause the bucket counts to decrement for some buckets and increment for others as words move
+ * between buckets as their counts accumulate.
+ */
+public class TransactionalWords {
+  /**
+   * Record stored per word in {@link #COUNT_DATABASE}.
+   */
+  public static class CountValue {
+    // Count the word had before the transaction in txid was applied; null if there was no earlier record.
+    Integer prev_count = null;
+    int count = 0;
+    // Id of the transaction that wrote this record.
+    BigInteger txid = null;
+  }
+
+  /**
+   * Record stored per bucket in {@link #BUCKET_DATABASE}.
+   */
+  public static class BucketValue {
+    int count = 0;
+    // Id of the transaction that wrote this record.
+    BigInteger txid;
+  }
+
+  /** Width of one bucket: a word with count c falls into bucket c / BUCKET_SIZE. */
+  public static final int BUCKET_SIZE = 10;
+
+  // In-memory stand-ins for an external database, keyed by word and by bucket number respectively.
+  public static Map<String, CountValue> COUNT_DATABASE = new HashMap<String, CountValue>();
+  public static Map<Integer, BucketValue> BUCKET_DATABASE = new HashMap<Integer, BucketValue>();
+
+
+  public static final int PARTITION_TAKE_PER_BATCH = 3;
+
+  /** Input words, keyed by partition number, handed to the MemoryTransactionalSpout in {@link #main(String[])}. */
+  public static final Map<Integer, List<List<Object>>> DATA = new HashMap<Integer, List<List<Object>>>() {{
+    put(0, new ArrayList<List<Object>>() {{
+      add(new Values("cat"));
+      add(new Values("dog"));
+      add(new Values("chicken"));
+      add(new Values("cat"));
+      add(new Values("dog"));
+      add(new Values("apple"));
+    }});
+    put(1, new ArrayList<List<Object>>() {{
+      add(new Values("cat"));
+      add(new Values("dog"));
+      add(new Values("apple"));
+      add(new Values("banana"));
+    }});
+    put(2, new ArrayList<List<Object>>() {{
+      add(new Values("cat"));
+      add(new Values("cat"));
+      add(new Values("cat"));
+      add(new Values("cat"));
+      add(new Values("cat"));
+      add(new Values("dog"));
+      add(new Values("dog"));
+      add(new Values("dog"));
+      add(new Values("dog"));
+    }});
+  }};
+
+  /**
+   * Counts the keys seen in one batch and, when the batch finishes, folds those counts into {@link #COUNT_DATABASE}.
+   * <p/>
+   * Each stored record carries the id of the transaction that wrote it, so a batch whose transaction id is already
+   * stored for a key is not applied to that key a second time. Emits (id, key, count, prev-count) per key.
+   */
+  public static class KeyedCountUpdater extends BaseTransactionalBolt implements ICommitter {
+    // Occurrences per key within the current batch.
+    Map<String, Integer> _counts = new HashMap<String, Integer>();
+    BatchOutputCollector _collector;
+    TransactionAttempt _id;
+
+    // Not referenced anywhere in this class; kept so the class layout stays unchanged.
+    int _count = 0;
+
+    @Override
+    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt id) {
+      _collector = collector;
+      _id = id;
+    }
+
+    /**
+     * Adds one occurrence of the tuple's key (the string at index 1) to the batch-local counts.
+     */
+    @Override
+    public void execute(Tuple tuple) {
+      String key = tuple.getString(1);
+      Integer seen = _counts.get(key);
+      _counts.put(key, seen == null ? 1 : seen + 1);
+    }
+
+    /**
+     * Applies the batch-local counts to {@link #COUNT_DATABASE} and emits the resulting count and previous count for
+     * every key seen in the batch.
+     */
+    @Override
+    public void finishBatch() {
+      BigInteger txid = _id.getTransactionId();
+      for (Map.Entry<String, Integer> entry : _counts.entrySet()) {
+        String key = entry.getKey();
+        CountValue val = COUNT_DATABASE.get(key);
+        CountValue newVal;
+        // Compare transaction ids (BigInteger to BigInteger). Comparing the stored txid with the
+        // TransactionAttempt itself can never be equal, which would re-apply a replayed batch.
+        if (val == null || !val.txid.equals(txid)) {
+          newVal = new CountValue();
+          newVal.txid = txid;
+          if (val != null) {
+            newVal.prev_count = val.count;
+            newVal.count = val.count;
+          }
+          newVal.count += entry.getValue();
+          COUNT_DATABASE.put(key, newVal);
+        }
+        else {
+          // This transaction was already applied to the key: re-emit the stored result unchanged.
+          newVal = val;
+        }
+        _collector.emit(new Values(_id, key, newVal.count, newVal.prev_count));
+      }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      declarer.declare(new Fields("id", "key", "count", "prev-count"));
+    }
+  }
+
+  /**
+   * Turns a (id, key, count, prev-count) tuple into bucket deltas: +1 for the bucket the key is in now and, if the key
+   * was previously in a different bucket, -1 for that bucket. Nothing is emitted when the bucket did not change.
+   */
+  public static class Bucketize extends BaseBasicBolt {
+    @Override
+    public void execute(Tuple tuple, BasicOutputCollector collector) {
+      TransactionAttempt attempt = (TransactionAttempt) tuple.getValue(0);
+      int curr = tuple.getInteger(2);
+      Integer prev = tuple.getInteger(3);
+
+      int currBucket = curr / BUCKET_SIZE;
+      if (prev == null) {
+        // No previous count: the key enters its bucket for the first time.
+        collector.emit(new Values(attempt, currBucket, 1));
+        return;
+      }
+
+      int prevBucket = prev / BUCKET_SIZE;
+      if (currBucket != prevBucket) {
+        collector.emit(new Values(attempt, currBucket, 1));
+        collector.emit(new Values(attempt, prevBucket, -1));
+      }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      declarer.declare(new Fields("attempt", "bucket", "delta"));
+    }
+  }
+
+  /**
+   * Sums the deltas received per bucket within a batch and, when the batch finishes, applies them to
+   * {@link #BUCKET_DATABASE}, skipping buckets whose stored record already carries this transaction id.
+   */
+  public static class BucketCountUpdater extends BaseTransactionalBolt {
+    // Net delta per bucket within the current batch.
+    Map<Integer, Integer> _accum = new HashMap<Integer, Integer>();
+    BatchOutputCollector _collector;
+    TransactionAttempt _attempt;
+
+    // Not referenced anywhere in this class; kept so the class layout stays unchanged.
+    int _count = 0;
+
+    @Override
+    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {
+      _collector = collector;
+      _attempt = attempt;
+    }
+
+    /**
+     * Adds the tuple's delta (index 2) to the running total of its bucket (index 1).
+     */
+    @Override
+    public void execute(Tuple tuple) {
+      Integer bucket = tuple.getInteger(1);
+      Integer delta = tuple.getInteger(2);
+      Integer soFar = _accum.get(bucket);
+      _accum.put(bucket, (soFar == null ? 0 : soFar) + delta);
+    }
+
+    /**
+     * Applies the accumulated deltas to {@link #BUCKET_DATABASE} and emits the resulting count for every bucket
+     * touched in the batch.
+     */
+    @Override
+    public void finishBatch() {
+      BigInteger txid = _attempt.getTransactionId();
+      for (Map.Entry<Integer, Integer> entry : _accum.entrySet()) {
+        Integer bucket = entry.getKey();
+        BucketValue currVal = BUCKET_DATABASE.get(bucket);
+        BucketValue newVal;
+        if (currVal == null || !currVal.txid.equals(txid)) {
+          newVal = new BucketValue();
+          newVal.txid = txid;
+          newVal.count = entry.getValue();
+          if (currVal != null) {
+            newVal.count += currVal.count;
+          }
+          BUCKET_DATABASE.put(bucket, newVal);
+        }
+        else {
+          // Already applied for this transaction id: re-emit the stored result unchanged.
+          newVal = currVal;
+        }
+        _collector.emit(new Values(_attempt, bucket, newVal.count));
+      }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      declarer.declare(new Fields("id", "bucket", "count"));
+    }
+  }
+
+  /**
+   * Wires spout -> count -> bucketize -> buckets, runs the topology on an in-process {@link LocalCluster} for three
+   * seconds and then shuts the cluster down.
+   *
+   * @param args ignored
+   * @throws Exception if the topology cannot be submitted or the wait is interrupted
+   */
+  public static void main(String[] args) throws Exception {
+    MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
+    TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("top-n-words", "spout", spout, 2);
+    builder.setBolt("count", new KeyedCountUpdater(), 5).fieldsGrouping("spout", new Fields("word"));
+    builder.setBolt("bucketize", new Bucketize()).noneGrouping("count");
+    builder.setBolt("buckets", new BucketCountUpdater(), 5).fieldsGrouping("bucketize", new Fields("bucket"));
+
+    Config config = new Config();
+    config.setDebug(true);
+    config.setMaxSpoutPending(3);
+
+    LocalCluster cluster = new LocalCluster();
+    try {
+      cluster.submitTopology("top-n-topology", config, builder.buildTopology());
+
+      Thread.sleep(3000);
+    }
+    finally {
+      // Shut the cluster down even when submission fails or the sleep is interrupted.
+      cluster.shutdown();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
new file mode 100644
index 0000000..e4a5711
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.task.ShellBolt;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.starter.spout.RandomSentenceSpout;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This topology demonstrates Storm's stream groupings and multilang capabilities.
+ */
+public class WordCountTopology {
+  public static class SplitSentence extends ShellBolt implements IRichBolt {
+
+    public SplitSentence() {
+      super("python", "splitsentence.py");
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      declarer.declare(new Fields("word"));
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+      return null;
+    }
+  }
+
+  public static class WordCount extends BaseBasicBolt {
+    Map<String, Integer> counts = new HashMap<String, Integer>();
+
+    @Override
+    public void execute(Tuple tuple, BasicOutputCollector collector) {
+      String word = tuple.getString(0);
+      Integer count = counts.get(word);
+      if (count == null)
+        count = 0;
+      count++;
+      counts.put(word, count);
+      collector.emit(new Values(word, count));
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      declarer.declare(new Fields("word", "count"));
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+
+    TopologyBuilder builder = new TopologyBuilder();
+
+    builder.setSpout("spout", new RandomSentenceSpout(), 5);
+
+    builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
+    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
+
+    Config conf = new Config();
+    conf.setDebug(true);
+
+    if (args != null && args.length > 0) {
+      conf.setNumWorkers(3);
+
+      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
+    }
+    else {
+      conf.setMaxTaskParallelism(3);
+
+      LocalCluster cluster = new LocalCluster();
+      cluster.submitTopology("word-count", conf, builder.createTopology());
+
+      Thread.sleep(10000);
+
+      cluster.shutdown();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopologyNode.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopologyNode.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopologyNode.java
new file mode 100644
index 0000000..431b9d8
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopologyNode.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.spout.ShellSpout;
+import org.apache.storm.task.ShellBolt;
+import org.apache.storm.topology.*;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This topology demonstrates Storm's stream groupings and multilang capabilities.
+ */
+public class WordCountTopologyNode {
+  public static class SplitSentence extends ShellBolt implements IRichBolt {
+
+    public SplitSentence() {
+      super("node", "splitsentence.js");
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      declarer.declare(new Fields("word"));
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+      return null;
+    }
+  }
+
+    public static class RandomSentence extends ShellSpout implements IRichSpout {
+
+        public RandomSentence() {
+            super("node", "randomsentence.js");
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word"));
+        }
+
+        @Override
+        public Map<String, Object> getComponentConfiguration() {
+            return null;
+        }
+    }
+
+  public static class WordCount extends BaseBasicBolt {
+    Map<String, Integer> counts = new HashMap<String, Integer>();
+
+    @Override
+    public void execute(Tuple tuple, BasicOutputCollector collector) {
+      String word = tuple.getString(0);
+      Integer count = counts.get(word);
+      if (count == null)
+        count = 0;
+      count++;
+      counts.put(word, count);
+      collector.emit(new Values(word, count));
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      declarer.declare(new Fields("word", "count"));
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+
+    TopologyBuilder builder = new TopologyBuilder();
+
+    builder.setSpout("spout", new RandomSentence(), 5);
+
+    builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
+    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
+
+    Config conf = new Config();
+    conf.setDebug(true);
+
+
+    if (args != null && args.length > 0) {
+      conf.setNumWorkers(3);
+
+      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
+    }
+    else {
+      conf.setMaxTaskParallelism(3);
+
+      LocalCluster cluster = new LocalCluster();
+      cluster.submitTopology("word-count", conf, builder.createTopology());
+
+      Thread.sleep(10000);
+
+      cluster.shutdown();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/AbstractRankerBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/AbstractRankerBolt.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/AbstractRankerBolt.java
new file mode 100644
index 0000000..9cf9e79
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/AbstractRankerBolt.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.bolt;
+
+import org.apache.storm.Config;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.TupleUtils;
+import org.apache.log4j.Logger;
+import org.apache.storm.starter.tools.Rankings;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Base class for bolts that maintain a ranking of objects by count.
+ * <p/>
+ * {@link AbstractRankerBolt#execute(Tuple, BasicOutputCollector)} is a template method: tick tuples trigger an emit of
+ * the current rankings, while every other tuple is handed to {@link #updateRankingsWithTuple(Tuple)}, which concrete
+ * bolts implement to decide how the object carried by the tuple is extracted and ranked.
+ */
+public abstract class AbstractRankerBolt extends BaseBasicBolt {
+
+  private static final long serialVersionUID = 4931640198501530202L;
+  private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = 2;
+  private static final int DEFAULT_COUNT = 10;
+
+  private final int emitFrequencyInSeconds;
+  private final int count;
+  private final Rankings rankings;
+
+  /**
+   * Creates a ranker with the default size (10) and the default emit frequency (2 seconds).
+   */
+  public AbstractRankerBolt() {
+    this(DEFAULT_COUNT, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
+  }
+
+  /**
+   * Creates a ranker of the given size with the default emit frequency (2 seconds).
+   *
+   * @param topN size passed to the underlying {@link Rankings}; must be at least 1
+   */
+  public AbstractRankerBolt(int topN) {
+    this(topN, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
+  }
+
+  /**
+   * @param topN                   size passed to the underlying {@link Rankings}; must be at least 1
+   * @param emitFrequencyInSeconds tick tuple frequency requested for this bolt; must be at least 1
+   * @throws IllegalArgumentException if either argument is smaller than 1
+   */
+  public AbstractRankerBolt(int topN, int emitFrequencyInSeconds) {
+    if (topN <= 0) {
+      String message = "topN must be >= 1 (you requested " + topN + ")";
+      throw new IllegalArgumentException(message);
+    }
+    if (emitFrequencyInSeconds <= 0) {
+      String message =
+          "The emit frequency must be >= 1 seconds (you requested " + emitFrequencyInSeconds + " seconds)";
+      throw new IllegalArgumentException(message);
+    }
+    this.count = topN;
+    this.emitFrequencyInSeconds = emitFrequencyInSeconds;
+    this.rankings = new Rankings(this.count);
+  }
+
+  protected Rankings getRankings() {
+    return rankings;
+  }
+
+  /**
+   * Template method: ordinary tuples update the rankings, tick tuples cause the rankings to be emitted.
+   */
+  @Override
+  public final void execute(Tuple tuple, BasicOutputCollector collector) {
+    if (!TupleUtils.isTick(tuple)) {
+      updateRankingsWithTuple(tuple);
+      return;
+    }
+    getLogger().debug("Received tick tuple, triggering emit of current rankings");
+    emitRankings(collector);
+  }
+
+  /**
+   * Folds the object carried by {@code tuple} into the rankings.
+   */
+  abstract void updateRankingsWithTuple(Tuple tuple);
+
+  // Emits a copy of the rankings (not the live instance), then logs the current rankings at debug level.
+  private void emitRankings(BasicOutputCollector collector) {
+    Values snapshot = new Values(rankings.copy());
+    collector.emit(snapshot);
+    getLogger().debug("Rankings: " + rankings);
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declare(new Fields("rankings"));
+  }
+
+  /**
+   * @return a configuration that sets {@link Config#TOPOLOGY_TICK_TUPLE_FREQ_SECS} to this bolt's emit frequency
+   */
+  @Override
+  public Map<String, Object> getComponentConfiguration() {
+    Map<String, Object> tickConf = new HashMap<String, Object>();
+    tickConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
+    return tickConf;
+  }
+
+  /**
+   * @return the logger this bolt writes its debug messages to
+   */
+  abstract Logger getLogger();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/IntermediateRankingsBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/IntermediateRankingsBolt.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/IntermediateRankingsBolt.java
new file mode 100644
index 0000000..6950bfb
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/IntermediateRankingsBolt.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.bolt;
+
+import org.apache.storm.tuple.Tuple;
+import org.apache.log4j.Logger;
+import org.apache.storm.starter.tools.Rankable;
+import org.apache.storm.starter.tools.RankableObjectWithFields;
+
+/**
+ * Ranker bolt that ranks incoming objects by their count.
+ * <p/>
+ * Input tuples are expected to have the layout (object, object_count, additionalField1, additionalField2, ...,
+ * additionalFieldN).
+ */
+public final class IntermediateRankingsBolt extends AbstractRankerBolt {
+
+  private static final long serialVersionUID = -1369800530256637409L;
+  private static final Logger LOG = Logger.getLogger(IntermediateRankingsBolt.class);
+
+  /**
+   * Uses the defaults of {@link AbstractRankerBolt}.
+   */
+  public IntermediateRankingsBolt() {
+    super();
+  }
+
+  /**
+   * @param topN forwarded to {@link AbstractRankerBolt#AbstractRankerBolt(int)}
+   */
+  public IntermediateRankingsBolt(int topN) {
+    super(topN);
+  }
+
+  /**
+   * @param topN                   forwarded to {@link AbstractRankerBolt#AbstractRankerBolt(int, int)}
+   * @param emitFrequencyInSeconds forwarded to {@link AbstractRankerBolt#AbstractRankerBolt(int, int)}
+   */
+  public IntermediateRankingsBolt(int topN, int emitFrequencyInSeconds) {
+    super(topN, emitFrequencyInSeconds);
+  }
+
+  /**
+   * Converts the tuple into a {@link Rankable} and merges it into the rankings.
+   */
+  @Override
+  void updateRankingsWithTuple(Tuple tuple) {
+    Rankable fromTuple = RankableObjectWithFields.from(tuple);
+    getRankings().updateWith(fromTuple);
+  }
+
+  @Override
+  Logger getLogger() {
+    return LOG;
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/PrinterBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/PrinterBolt.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/PrinterBolt.java
new file mode 100644
index 0000000..993a937
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/PrinterBolt.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.bolt;
+
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Tuple;
+
+
+/**
+ * Bolt that prints every tuple it receives to standard output and declares no output fields.
+ */
+public class PrinterBolt extends BaseBasicBolt {
+
+  /**
+   * Writes the tuple's string form to {@code System.out}; nothing is emitted.
+   */
+  @Override
+  public void execute(Tuple tuple, BasicOutputCollector collector) {
+    System.out.println(String.valueOf(tuple));
+  }
+
+  /**
+   * Intentionally empty: this bolt has no output stream.
+   */
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer ofd) {
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/RollingCountAggBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/RollingCountAggBolt.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/RollingCountAggBolt.java
new file mode 100644
index 0000000..45300de
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/RollingCountAggBolt.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.bolt;
+
+import org.apache.storm.Config;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.log4j.Logger;
+import org.apache.storm.starter.tools.NthLastModifiedTimeTracker;
+import org.apache.storm.starter.tools.SlidingWindowCounter;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * This bolt aggregates counts from multiple upstream bolts.
+ * <p/>
+ * For every object it remembers the latest count reported by each source task and, whenever a tuple arrives, emits the
+ * sum of the latest counts known for that object.
+ */
+public class RollingCountAggBolt extends BaseRichBolt {
+  private static final long serialVersionUID = 5537727428628598519L;
+  private static final Logger LOG = Logger.getLogger(RollingCountAggBolt.class);
+  // object -> (source task id -> latest count received from that task)
+  private Map<Object, Map<Integer, Long>> counts = new HashMap<Object, Map<Integer, Long>>();
+  private OutputCollector collector;
+
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+    this.collector = collector;
+  }
+
+  /**
+   * Records the count (index 1) that the tuple's source task reports for the object (index 0), then emits the object
+   * together with the sum over all source tasks seen so far for it.
+   */
+  // NOTE(review): the input tuple is neither acked nor used as an anchor here - confirm this is intended.
+  @Override
+  public void execute(Tuple tuple) {
+    Object key = tuple.getValue(0);
+    long latest = tuple.getLong(1);
+    int sourceTask = tuple.getSourceTask();
+
+    Map<Integer, Long> perTask = counts.get(key);
+    if (perTask == null) {
+      perTask = new HashMap<Integer, Long>();
+      counts.put(key, perTask);
+    }
+    // Replace (rather than add to) whatever this task reported previously.
+    perTask.put(sourceTask, latest);
+
+    // Sum of all counts known so far for this key.
+    long total = 0;
+    for (long reported : perTask.values()) {
+      total += reported;
+    }
+    collector.emit(new Values(key, total));
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declare(new Fields("obj", "count"));
+  }
+}