Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/11 21:57:42 UTC

[46/53] [abbrv] [partial] storm git commit: STORM-1202: Migrate APIs to org.apache.storm, but try to provide some form of backwards compatibility
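
Note that the examples below are deleted as part of the move away from the backtype.storm packages. A minimal sketch of what the post-migration API surface looks like, assuming the rename simply mirrors the old package layout (org.apache.storm in place of backtype.storm; the illustrative class name is not part of this commit):

    import org.apache.storm.Config;
    import org.apache.storm.topology.TopologyBuilder;

    // Sketch only: the topology wiring is identical to the deleted examples below,
    // with every backtype.storm.* import replaced by its org.apache.storm.* counterpart.
    public class MigrationImportSketch {
      public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        Config conf = new Config();
        conf.setDebug(true);
        System.out.println("post-migration classes: " + builder.getClass().getName()
            + ", " + conf.getClass().getName());
      }
    }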

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java b/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java
deleted file mode 100644
index 4c6680e..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java
+++ /dev/null
@@ -1,432 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.metric.HttpForwardingMetricsServer;
-import backtype.storm.metric.HttpForwardingMetricsConsumer;
-import backtype.storm.metric.api.IMetric;
-import backtype.storm.metric.api.IMetricsConsumer.TaskInfo;
-import backtype.storm.metric.api.IMetricsConsumer.DataPoint;
-import backtype.storm.generated.*;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.storm.metrics.hdrhistogram.HistogramMetric;
-import org.HdrHistogram.Histogram;
-
-/**
- * WordCount but the spout goes at a predefined rate and we collect
- * proper latency statistics.
- */
-public class ThroughputVsLatency {
-  private static class SentWithTime {
-    public final String sentence;
-    public final long time;
-
-    SentWithTime(String sentence, long time) {
-        this.sentence = sentence;
-        this.time = time;
-    }
-  }
-
-  public static class C {
-    LocalCluster _local = null;
-    Nimbus.Client _client = null;
-
-    public C(Map conf) {
-      Map clusterConf = Utils.readStormConfig();
-      if (conf != null) {
-        clusterConf.putAll(conf);
-      }
-      Boolean isLocal = (Boolean)clusterConf.get("run.local");
-      if (isLocal != null && isLocal) {
-        _local = new LocalCluster();
-      } else {
-        _client = NimbusClient.getConfiguredClient(clusterConf).getClient();
-      }
-    }
-
-    public ClusterSummary getClusterInfo() throws Exception {
-      if (_local != null) {
-        return _local.getClusterInfo();
-      } else {
-        return _client.getClusterInfo();
-      }
-    }
-
-    public TopologyInfo getTopologyInfo(String id) throws Exception {
-      if (_local != null) {
-        return _local.getTopologyInfo(id);
-      } else {
-        return _client.getTopologyInfo(id);
-      }
-    }
-
-    public void killTopologyWithOpts(String name, KillOptions opts) throws Exception {
-      if (_local != null) {
-        _local.killTopologyWithOpts(name, opts);
-      } else {
-        _client.killTopologyWithOpts(name, opts);
-      }
-    }
-
-    public void submitTopology(String name, Map stormConf, StormTopology topology) throws Exception {
-      if (_local != null) {
-        _local.submitTopology(name, stormConf, topology);
-      } else {
-        StormSubmitter.submitTopology(name, stormConf, topology);
-      }
-    }
-
-    public boolean isLocal() {
-      return _local != null;
-    }
-  }
-
-  public static class FastRandomSentenceSpout extends BaseRichSpout {
-    static final String[] SENTENCES = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
-          "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
-
-    SpoutOutputCollector _collector;
-    long _periodNano;
-    long _emitAmount;
-    Random _rand;
-    long _nextEmitTime;
-    long _emitsLeft;
-    HistogramMetric _histo;
-
-    public FastRandomSentenceSpout(long ratePerSecond) {
-        if (ratePerSecond > 0) {
-            _periodNano = Math.max(1, 1000000000/ratePerSecond);
-            _emitAmount = Math.max(1, (long)((ratePerSecond / 1000000000.0) * _periodNano));
-        } else {
-            _periodNano = Long.MAX_VALUE - 1;
-            _emitAmount = 1;
-        }
-    }
-
-    @Override
-    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-      _collector = collector;
-      _rand = ThreadLocalRandom.current();
-      _nextEmitTime = System.nanoTime();
-      _emitsLeft = _emitAmount;
-      _histo = new HistogramMetric(3600000000000L, 3);
-      context.registerMetric("comp-lat-histo", _histo, 10); //Update every 10 seconds, so we are not too far behind
-    }
-
-    @Override
-    public void nextTuple() {
-      if (_emitsLeft <= 0 && _nextEmitTime <= System.nanoTime()) {
-          _emitsLeft = _emitAmount;
-          _nextEmitTime = _nextEmitTime + _periodNano;
-      }
-
-      if (_emitsLeft > 0) {
-          String sentence = SENTENCES[_rand.nextInt(SENTENCES.length)];
-          _collector.emit(new Values(sentence), new SentWithTime(sentence, _nextEmitTime - _periodNano));
-          _emitsLeft--;
-      }
-    }
-
-    @Override
-    public void ack(Object id) {
-      long end = System.nanoTime();
-      SentWithTime st = (SentWithTime)id;
-      _histo.recordValue(end-st.time);
-    }
-
-    @Override
-    public void fail(Object id) {
-      SentWithTime st = (SentWithTime)id;
-      _collector.emit(new Values(st.sentence), id);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("sentence"));
-    }
-  }
-
-  public static class SplitSentence extends BaseBasicBolt {
-    @Override
-    public void execute(Tuple tuple, BasicOutputCollector collector) {
-      String sentence = tuple.getString(0);
-      for (String word: sentence.split("\\s+")) {
-          collector.emit(new Values(word, 1));
-      }
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("word", "count"));
-    }
-  }
-
-  public static class WordCount extends BaseBasicBolt {
-    Map<String, Integer> counts = new HashMap<String, Integer>();
-
-    @Override
-    public void execute(Tuple tuple, BasicOutputCollector collector) {
-      String word = tuple.getString(0);
-      Integer count = counts.get(word);
-      if (count == null)
-        count = 0;
-      count++;
-      counts.put(word, count);
-      collector.emit(new Values(word, count));
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("word", "count"));
-    }
-  }
-
-  private static class MemMeasure {
-    private long _mem = 0;
-    private long _time = 0;
-
-    public synchronized void update(long mem) {
-        _mem = mem;
-        _time = System.currentTimeMillis();
-    }
-
-    public synchronized long get() {
-        return isExpired() ? 0l : _mem;
-    }
-
-    public synchronized boolean isExpired() {
-        return (System.currentTimeMillis() - _time) >= 20000;
-    }
-  }
-
-  private static final Histogram _histo = new Histogram(3600000000000L, 3);
-  private static final AtomicLong _systemCPU = new AtomicLong(0);
-  private static final AtomicLong _userCPU = new AtomicLong(0);
-  private static final AtomicLong _gcCount = new AtomicLong(0);
-  private static final AtomicLong _gcMs = new AtomicLong(0);
-  private static final ConcurrentHashMap<String, MemMeasure> _memoryBytes = new ConcurrentHashMap<String, MemMeasure>();
-
-  private static long readMemory() {
-    long total = 0;
-    for (MemMeasure mem: _memoryBytes.values()) {
-      total += mem.get();
-    }
-    return total;
-  }
-
-  private static long _prev_acked = 0;
-  private static long _prev_uptime = 0;
-
-  public static void printMetrics(C client, String name) throws Exception {
-    ClusterSummary summary = client.getClusterInfo();
-    String id = null;
-    for (TopologySummary ts: summary.get_topologies()) {
-      if (name.equals(ts.get_name())) {
-        id = ts.get_id();
-      }
-    }
-    if (id == null) {
-      throw new Exception("Could not find a topology named "+name);
-    }
-    TopologyInfo info = client.getTopologyInfo(id);
-    int uptime = info.get_uptime_secs();
-    long acked = 0;
-    long failed = 0;
-    for (ExecutorSummary exec: info.get_executors()) {
-      if ("spout".equals(exec.get_component_id())) {
-        SpoutStats stats = exec.get_stats().get_specific().get_spout();
-        Map<String, Long> failedMap = stats.get_failed().get(":all-time");
-        Map<String, Long> ackedMap = stats.get_acked().get(":all-time");
-        if (ackedMap != null) {
-          for (String key: ackedMap.keySet()) {
-            if (failedMap != null) {
-              Long tmp = failedMap.get(key);
-              if (tmp != null) {
-                  failed += tmp;
-              }
-            }
-            long ackVal = ackedMap.get(key);
-            acked += ackVal;
-          }
-        }
-      }
-    }
-    long ackedThisTime = acked - _prev_acked;
-    long thisTime = uptime - _prev_uptime;
-    long nnpct, nnnpct, min, max;
-    double mean, stddev;
-    synchronized(_histo) {
-      nnpct = _histo.getValueAtPercentile(99.0);
-      nnnpct = _histo.getValueAtPercentile(99.9);
-      min = _histo.getMinValue();
-      max = _histo.getMaxValue();
-      mean = _histo.getMean();
-      stddev = _histo.getStdDeviation();
-      _histo.reset();
-    }
-    long user = _userCPU.getAndSet(0);
-    long sys = _systemCPU.getAndSet(0);
-    long gc = _gcMs.getAndSet(0);
-    double memMB = readMemory() / (1024.0 * 1024.0);
-    System.out.printf("uptime: %,4d acked: %,9d acked/sec: %,10.2f failed: %,8d " +
-                      "99%%: %,15d 99.9%%: %,15d min: %,15d max: %,15d mean: %,15.2f " +
-                      "stddev: %,15.2f user: %,10d sys: %,10d gc: %,10d mem: %,10.2f\n",
-                       uptime, ackedThisTime, (((double)ackedThisTime)/thisTime), failed, nnpct, nnnpct,
-                       min, max, mean, stddev, user, sys, gc, memMB);
-    _prev_uptime = uptime;
-    _prev_acked = acked;
-  }
-
-  public static void kill(C client, String name) throws Exception {
-    KillOptions opts = new KillOptions();
-    opts.set_wait_secs(0);
-    client.killTopologyWithOpts(name, opts);
-  }
-
-  public static void main(String[] args) throws Exception {
-    long ratePerSecond = 500;
-    if (args != null && args.length > 0) {
-        ratePerSecond = Long.valueOf(args[0]);
-    }
-
-    int parallelism = 4;
-    if (args != null && args.length > 1) {
-        parallelism = Integer.valueOf(args[1]);
-    }
-
-    int numMins = 5;
-    if (args != null && args.length > 2) {
-        numMins = Integer.valueOf(args[2]);
-    }
-
-    String name = "wc-test";
-    if (args != null && args.length > 3) {
-        name = args[3];
-    }
-
-    Config conf = new Config();
-    HttpForwardingMetricsServer metricServer = new HttpForwardingMetricsServer(conf) {
-        @Override
-        public void handle(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
-            String worker = taskInfo.srcWorkerHost + ":" + taskInfo.srcWorkerPort;
-            for (DataPoint dp: dataPoints) {
-                if ("comp-lat-histo".equals(dp.name) && dp.value instanceof Histogram) {
-                    synchronized(_histo) {
-                        _histo.add((Histogram)dp.value);
-                    }
-                } else if ("CPU".equals(dp.name) && dp.value instanceof Map) {
-                   Map<Object, Object> m = (Map<Object, Object>)dp.value;
-                   Object sys = m.get("sys-ms");
-                   if (sys instanceof Number) {
-                       _systemCPU.getAndAdd(((Number)sys).longValue());
-                   }
-                   Object user = m.get("user-ms");
-                   if (user instanceof Number) {
-                       _userCPU.getAndAdd(((Number)user).longValue());
-                   }
-                } else if (dp.name.startsWith("GC/") && dp.value instanceof Map) {
-                   Map<Object, Object> m = (Map<Object, Object>)dp.value;
-                   Object count = m.get("count");
-                   if (count instanceof Number) {
-                       _gcCount.getAndAdd(((Number)count).longValue());
-                   }
-                   Object time = m.get("timeMs");
-                   if (time instanceof Number) {
-                       _gcMs.getAndAdd(((Number)time).longValue());
-                   }
-                } else if (dp.name.startsWith("memory/") && dp.value instanceof Map) {
-                   Map<Object, Object> m = (Map<Object, Object>)dp.value;
-                   Object val = m.get("usedBytes");
-                   if (val instanceof Number) {
-                       MemMeasure mm = _memoryBytes.get(worker);
-                       if (mm == null) {
-                         mm = new MemMeasure();
-                         MemMeasure tmp = _memoryBytes.putIfAbsent(worker, mm);
-                         mm = tmp == null ? mm : tmp; 
-                       }
-                       mm.update(((Number)val).longValue());
-                   }
-                }
-            }
-        }
-    };
-
-    metricServer.serve();
-    String url = metricServer.getUrl();
-
-    C cluster = new C(conf);
-    conf.setNumWorkers(parallelism);
-    conf.registerMetricsConsumer(backtype.storm.metric.LoggingMetricsConsumer.class);
-    conf.registerMetricsConsumer(backtype.storm.metric.HttpForwardingMetricsConsumer.class, url, 1);
-    Map<String, String> workerMetrics = new HashMap<String, String>();
-    if (!cluster.isLocal()) {
-      //sigar uses JNI and does not work in local mode
-      workerMetrics.put("CPU", "org.apache.storm.metrics.sigar.CPUMetric");
-    }
-    conf.put(Config.TOPOLOGY_WORKER_METRICS, workerMetrics);
-    conf.put(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS, 10);
-    conf.put(Config.TOPOLOGY_WORKER_GC_CHILDOPTS,
-      "-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:NewSize=128m -XX:CMSInitiatingOccupancyFraction=70 -XX:-CMSConcurrentMTEnabled");
-    conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx2g");
-
-    TopologyBuilder builder = new TopologyBuilder();
-
-    int numEach = 4 * parallelism;
-    builder.setSpout("spout", new FastRandomSentenceSpout(ratePerSecond/numEach), numEach);
-
-    builder.setBolt("split", new SplitSentence(), numEach).shuffleGrouping("spout");
-    builder.setBolt("count", new WordCount(), numEach).fieldsGrouping("split", new Fields("word"));
-
-    try {
-        cluster.submitTopology(name, conf, builder.createTopology());
-
-        for (int i = 0; i < numMins * 2; i++) {
-            Thread.sleep(30 * 1000);
-            printMetrics(cluster, name);
-        }
-    } finally {
-        kill(cluster, name);
-    }
-    System.exit(0);
-  }
-}
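
A quick way to see how FastRandomSentenceSpout paces itself: the constructor above turns the requested rate into a fixed period in nanoseconds plus a number of tuples to release per period, and nextTuple() only emits while the current period still has budget. The same arithmetic as a standalone sketch (illustrative values; prints what the default 500 tuples/sec setting works out to):

    public class SpoutRateSketch {
      public static void main(String[] args) {
        long ratePerSecond = 500;  // default used by ThroughputVsLatency.main
        long periodNano = Math.max(1, 1000000000 / ratePerSecond);
        long emitAmount = Math.max(1, (long) ((ratePerSecond / 1000000000.0) * periodNano));
        // 500 tuples/sec -> periodNano = 2,000,000 ns (2 ms) and emitAmount = 1,
        // i.e. one tuple is released every 2 ms and nextTuple() idles in between.
        System.out.printf("period = %,d ns, %d tuple(s) per period%n", periodNano, emitAmount);
      }
    }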

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/TransactionalGlobalCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/TransactionalGlobalCount.java b/examples/storm-starter/src/jvm/storm/starter/TransactionalGlobalCount.java
deleted file mode 100644
index 706afd1..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/TransactionalGlobalCount.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.coordination.BatchOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.testing.MemoryTransactionalSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseBatchBolt;
-import backtype.storm.topology.base.BaseTransactionalBolt;
-import backtype.storm.transactional.ICommitter;
-import backtype.storm.transactional.TransactionAttempt;
-import backtype.storm.transactional.TransactionalTopologyBuilder;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This is a basic example of a transactional topology. It keeps a count of the number of tuples seen so far in a
- * database. The source of data and the databases are mocked out as in memory maps for demonstration purposes.
- *
- * @see <a href="http://storm.apache.org/documentation/Transactional-topologies.html">Transactional topologies</a>
- */
-public class TransactionalGlobalCount {
-  public static final int PARTITION_TAKE_PER_BATCH = 3;
-  public static final Map<Integer, List<List<Object>>> DATA = new HashMap<Integer, List<List<Object>>>() {{
-    put(0, new ArrayList<List<Object>>() {{
-      add(new Values("cat"));
-      add(new Values("dog"));
-      add(new Values("chicken"));
-      add(new Values("cat"));
-      add(new Values("dog"));
-      add(new Values("apple"));
-    }});
-    put(1, new ArrayList<List<Object>>() {{
-      add(new Values("cat"));
-      add(new Values("dog"));
-      add(new Values("apple"));
-      add(new Values("banana"));
-    }});
-    put(2, new ArrayList<List<Object>>() {{
-      add(new Values("cat"));
-      add(new Values("cat"));
-      add(new Values("cat"));
-      add(new Values("cat"));
-      add(new Values("cat"));
-      add(new Values("dog"));
-      add(new Values("dog"));
-      add(new Values("dog"));
-      add(new Values("dog"));
-    }});
-  }};
-
-  public static class Value {
-    int count = 0;
-    BigInteger txid;
-  }
-
-  public static Map<String, Value> DATABASE = new HashMap<String, Value>();
-  public static final String GLOBAL_COUNT_KEY = "GLOBAL-COUNT";
-
-  public static class BatchCount extends BaseBatchBolt {
-    Object _id;
-    BatchOutputCollector _collector;
-
-    int _count = 0;
-
-    @Override
-    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
-      _collector = collector;
-      _id = id;
-    }
-
-    @Override
-    public void execute(Tuple tuple) {
-      _count++;
-    }
-
-    @Override
-    public void finishBatch() {
-      _collector.emit(new Values(_id, _count));
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("id", "count"));
-    }
-  }
-
-  public static class UpdateGlobalCount extends BaseTransactionalBolt implements ICommitter {
-    TransactionAttempt _attempt;
-    BatchOutputCollector _collector;
-
-    int _sum = 0;
-
-    @Override
-    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {
-      _collector = collector;
-      _attempt = attempt;
-    }
-
-    @Override
-    public void execute(Tuple tuple) {
-      _sum += tuple.getInteger(1);
-    }
-
-    @Override
-    public void finishBatch() {
-      Value val = DATABASE.get(GLOBAL_COUNT_KEY);
-      Value newval;
-      if (val == null || !val.txid.equals(_attempt.getTransactionId())) {
-        newval = new Value();
-        newval.txid = _attempt.getTransactionId();
-        if (val == null) {
-          newval.count = _sum;
-        }
-        else {
-          newval.count = _sum + val.count;
-        }
-        DATABASE.put(GLOBAL_COUNT_KEY, newval);
-      }
-      else {
-        newval = val;
-      }
-      _collector.emit(new Values(_attempt, newval.count));
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("id", "sum"));
-    }
-  }
-
-  public static void main(String[] args) throws Exception {
-    MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
-    TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("global-count", "spout", spout, 3);
-    builder.setBolt("partial-count", new BatchCount(), 5).noneGrouping("spout");
-    builder.setBolt("sum", new UpdateGlobalCount()).globalGrouping("partial-count");
-
-    LocalCluster cluster = new LocalCluster();
-
-    Config config = new Config();
-    config.setDebug(true);
-    config.setMaxSpoutPending(3);
-
-    cluster.submitTopology("global-count-topology", config, builder.buildTopology());
-
-    Thread.sleep(3000);
-    cluster.shutdown();
-  }
-}
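
The heart of the transactional pattern above is UpdateGlobalCount.finishBatch(): a committer only applies a batch when the transaction id stored in the database differs from the current attempt's id, so a replayed batch is not double-counted. The same check as a stripped-down sketch with no Storm classes (names and values are illustrative only):

    import java.math.BigInteger;
    import java.util.HashMap;
    import java.util.Map;

    public class IdempotentCommitSketch {
      static class Value { int count; BigInteger txid; }
      static final Map<String, Value> DB = new HashMap<>();

      // Apply a partial sum for the given transaction id exactly once.
      static void commit(String key, int partialSum, BigInteger txid) {
        Value val = DB.get(key);
        if (val == null || !val.txid.equals(txid)) {   // unseen txid: apply the batch
          Value newVal = new Value();
          newVal.txid = txid;
          newVal.count = (val == null ? 0 : val.count) + partialSum;
          DB.put(key, newVal);
        }                                              // same txid: a replay, skip it
      }

      public static void main(String[] args) {
        commit("GLOBAL-COUNT", 10, BigInteger.ONE);
        commit("GLOBAL-COUNT", 10, BigInteger.ONE);    // replayed batch is ignored
        System.out.println(DB.get("GLOBAL-COUNT").count);  // prints 10
      }
    }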

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/TransactionalWords.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/TransactionalWords.java b/examples/storm-starter/src/jvm/storm/starter/TransactionalWords.java
deleted file mode 100644
index 4d5ba1b..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/TransactionalWords.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.coordination.BatchOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.testing.MemoryTransactionalSpout;
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.topology.base.BaseTransactionalBolt;
-import backtype.storm.transactional.ICommitter;
-import backtype.storm.transactional.TransactionAttempt;
-import backtype.storm.transactional.TransactionalTopologyBuilder;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This class defines a more involved transactional topology than TransactionalGlobalCount. This topology processes a
- * stream of words and produces two outputs:
- * <p/>
- * 1. A count for each word (stored in a database) 2. The number of words for every bucket of 10 counts. So it stores in
- * the database how many words have appeared 0-9 times, how many have appeared 10-19 times, and so on.
- * <p/>
- * A batch of words can cause the bucket counts to decrement for some buckets and increment for others as words move
- * between buckets as their counts accumulate.
- */
-public class TransactionalWords {
-  public static class CountValue {
-    Integer prev_count = null;
-    int count = 0;
-    BigInteger txid = null;
-  }
-
-  public static class BucketValue {
-    int count = 0;
-    BigInteger txid;
-  }
-
-  public static final int BUCKET_SIZE = 10;
-
-  public static Map<String, CountValue> COUNT_DATABASE = new HashMap<String, CountValue>();
-  public static Map<Integer, BucketValue> BUCKET_DATABASE = new HashMap<Integer, BucketValue>();
-
-
-  public static final int PARTITION_TAKE_PER_BATCH = 3;
-
-  public static final Map<Integer, List<List<Object>>> DATA = new HashMap<Integer, List<List<Object>>>() {{
-    put(0, new ArrayList<List<Object>>() {{
-      add(new Values("cat"));
-      add(new Values("dog"));
-      add(new Values("chicken"));
-      add(new Values("cat"));
-      add(new Values("dog"));
-      add(new Values("apple"));
-    }});
-    put(1, new ArrayList<List<Object>>() {{
-      add(new Values("cat"));
-      add(new Values("dog"));
-      add(new Values("apple"));
-      add(new Values("banana"));
-    }});
-    put(2, new ArrayList<List<Object>>() {{
-      add(new Values("cat"));
-      add(new Values("cat"));
-      add(new Values("cat"));
-      add(new Values("cat"));
-      add(new Values("cat"));
-      add(new Values("dog"));
-      add(new Values("dog"));
-      add(new Values("dog"));
-      add(new Values("dog"));
-    }});
-  }};
-
-  public static class KeyedCountUpdater extends BaseTransactionalBolt implements ICommitter {
-    Map<String, Integer> _counts = new HashMap<String, Integer>();
-    BatchOutputCollector _collector;
-    TransactionAttempt _id;
-
-    int _count = 0;
-
-    @Override
-    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt id) {
-      _collector = collector;
-      _id = id;
-    }
-
-    @Override
-    public void execute(Tuple tuple) {
-      String key = tuple.getString(1);
-      Integer curr = _counts.get(key);
-      if (curr == null)
-        curr = 0;
-      _counts.put(key, curr + 1);
-    }
-
-    @Override
-    public void finishBatch() {
-      for (String key : _counts.keySet()) {
-        CountValue val = COUNT_DATABASE.get(key);
-        CountValue newVal;
-        if (val == null || !val.txid.equals(_id.getTransactionId())) {
-          newVal = new CountValue();
-          newVal.txid = _id.getTransactionId();
-          if (val != null) {
-            newVal.prev_count = val.count;
-            newVal.count = val.count;
-          }
-          newVal.count = newVal.count + _counts.get(key);
-          COUNT_DATABASE.put(key, newVal);
-        }
-        else {
-          newVal = val;
-        }
-        _collector.emit(new Values(_id, key, newVal.count, newVal.prev_count));
-      }
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("id", "key", "count", "prev-count"));
-    }
-  }
-
-  public static class Bucketize extends BaseBasicBolt {
-    @Override
-    public void execute(Tuple tuple, BasicOutputCollector collector) {
-      TransactionAttempt attempt = (TransactionAttempt) tuple.getValue(0);
-      int curr = tuple.getInteger(2);
-      Integer prev = tuple.getInteger(3);
-
-      int currBucket = curr / BUCKET_SIZE;
-      Integer prevBucket = null;
-      if (prev != null) {
-        prevBucket = prev / BUCKET_SIZE;
-      }
-
-      if (prevBucket == null) {
-        collector.emit(new Values(attempt, currBucket, 1));
-      }
-      else if (currBucket != prevBucket) {
-        collector.emit(new Values(attempt, currBucket, 1));
-        collector.emit(new Values(attempt, prevBucket, -1));
-      }
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("attempt", "bucket", "delta"));
-    }
-  }
-
-  public static class BucketCountUpdater extends BaseTransactionalBolt {
-    Map<Integer, Integer> _accum = new HashMap<Integer, Integer>();
-    BatchOutputCollector _collector;
-    TransactionAttempt _attempt;
-
-    int _count = 0;
-
-    @Override
-    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {
-      _collector = collector;
-      _attempt = attempt;
-    }
-
-    @Override
-    public void execute(Tuple tuple) {
-      Integer bucket = tuple.getInteger(1);
-      Integer delta = tuple.getInteger(2);
-      Integer curr = _accum.get(bucket);
-      if (curr == null)
-        curr = 0;
-      _accum.put(bucket, curr + delta);
-    }
-
-    @Override
-    public void finishBatch() {
-      for (Integer bucket : _accum.keySet()) {
-        BucketValue currVal = BUCKET_DATABASE.get(bucket);
-        BucketValue newVal;
-        if (currVal == null || !currVal.txid.equals(_attempt.getTransactionId())) {
-          newVal = new BucketValue();
-          newVal.txid = _attempt.getTransactionId();
-          newVal.count = _accum.get(bucket);
-          if (currVal != null)
-            newVal.count += currVal.count;
-          BUCKET_DATABASE.put(bucket, newVal);
-        }
-        else {
-          newVal = currVal;
-        }
-        _collector.emit(new Values(_attempt, bucket, newVal.count));
-      }
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("id", "bucket", "count"));
-    }
-  }
-
-  public static void main(String[] args) throws Exception {
-    MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
-    TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("top-n-words", "spout", spout, 2);
-    builder.setBolt("count", new KeyedCountUpdater(), 5).fieldsGrouping("spout", new Fields("word"));
-    builder.setBolt("bucketize", new Bucketize()).noneGrouping("count");
-    builder.setBolt("buckets", new BucketCountUpdater(), 5).fieldsGrouping("bucketize", new Fields("bucket"));
-
-
-    LocalCluster cluster = new LocalCluster();
-
-    Config config = new Config();
-    config.setDebug(true);
-    config.setMaxSpoutPending(3);
-
-    cluster.submitTopology("top-n-topology", config, builder.buildTopology());
-
-    Thread.sleep(3000);
-    cluster.shutdown();
-  }
-}
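
Bucketize above only emits when a word crosses a bucket boundary: +1 for the bucket the word's count moves into and -1 for the bucket it leaves, which is how BucketCountUpdater keeps the per-bucket totals consistent. The boundary decision as a small worked example with the same BUCKET_SIZE of 10 (illustrative counts):

    public class BucketDeltaSketch {
      static final int BUCKET_SIZE = 10;

      public static void main(String[] args) {
        Integer prev = 9;  // word's count before this batch
        int curr = 12;     // word's count after this batch

        int currBucket = curr / BUCKET_SIZE;                              // 12 -> bucket 1
        Integer prevBucket = (prev == null) ? null : prev / BUCKET_SIZE;  // 9  -> bucket 0

        if (prevBucket == null) {
          System.out.println("+1 for bucket " + currBucket);              // word seen for the first time
        } else if (currBucket != prevBucket) {
          System.out.println("+1 for bucket " + currBucket + ", -1 for bucket " + prevBucket);
        } else {
          System.out.println("no bucket change, nothing to emit");
        }
      }
    }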

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java b/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java
deleted file mode 100644
index 7260beb..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.task.ShellBolt;
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import storm.starter.spout.RandomSentenceSpout;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * This topology demonstrates Storm's stream groupings and multilang capabilities.
- */
-public class WordCountTopology {
-  public static class SplitSentence extends ShellBolt implements IRichBolt {
-
-    public SplitSentence() {
-      super("python", "splitsentence.py");
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("word"));
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-      return null;
-    }
-  }
-
-  public static class WordCount extends BaseBasicBolt {
-    Map<String, Integer> counts = new HashMap<String, Integer>();
-
-    @Override
-    public void execute(Tuple tuple, BasicOutputCollector collector) {
-      String word = tuple.getString(0);
-      Integer count = counts.get(word);
-      if (count == null)
-        count = 0;
-      count++;
-      counts.put(word, count);
-      collector.emit(new Values(word, count));
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("word", "count"));
-    }
-  }
-
-  public static void main(String[] args) throws Exception {
-
-    TopologyBuilder builder = new TopologyBuilder();
-
-    builder.setSpout("spout", new RandomSentenceSpout(), 5);
-
-    builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
-    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
-
-    Config conf = new Config();
-    conf.setDebug(true);
-
-    if (args != null && args.length > 0) {
-      conf.setNumWorkers(3);
-
-      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
-    }
-    else {
-      conf.setMaxTaskParallelism(3);
-
-      LocalCluster cluster = new LocalCluster();
-      cluster.submitTopology("word-count", conf, builder.createTopology());
-
-      Thread.sleep(10000);
-
-      cluster.shutdown();
-    }
-  }
-}
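
The SplitSentence bolt above hands each sentence to a Python subprocess ("python", "splitsentence.py") through ShellBolt, which is what makes this the multilang variant of word count. For comparison, the pure-Java splitter used elsewhere in this commit (see ThroughputVsLatency) boils down to the following sketch, written against the old backtype.storm API that this file still imports:

    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    // Java-only splitter: one sentence in, one tuple per word out,
    // declaring the same single "word" field as the ShellBolt version.
    public class JavaSplitSentence extends BaseBasicBolt {
      @Override
      public void execute(Tuple tuple, BasicOutputCollector collector) {
        for (String word : tuple.getString(0).split("\\s+")) {
          collector.emit(new Values(word));
        }
      }

      @Override
      public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
      }
    }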

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/WordCountTopologyNode.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/WordCountTopologyNode.java b/examples/storm-starter/src/jvm/storm/starter/WordCountTopologyNode.java
deleted file mode 100644
index 3fe982f..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/WordCountTopologyNode.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.spout.ShellSpout;
-import backtype.storm.task.ShellBolt;
-import backtype.storm.topology.*;
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * This topology demonstrates Storm's stream groupings and multilang capabilities.
- */
-public class WordCountTopologyNode {
-  public static class SplitSentence extends ShellBolt implements IRichBolt {
-
-    public SplitSentence() {
-      super("node", "splitsentence.js");
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("word"));
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-      return null;
-    }
-  }
-
-    public static class RandomSentence extends ShellSpout implements IRichSpout {
-
-        public RandomSentence() {
-            super("node", "randomsentence.js");
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("word"));
-        }
-
-        @Override
-        public Map<String, Object> getComponentConfiguration() {
-            return null;
-        }
-    }
-
-  public static class WordCount extends BaseBasicBolt {
-    Map<String, Integer> counts = new HashMap<String, Integer>();
-
-    @Override
-    public void execute(Tuple tuple, BasicOutputCollector collector) {
-      String word = tuple.getString(0);
-      Integer count = counts.get(word);
-      if (count == null)
-        count = 0;
-      count++;
-      counts.put(word, count);
-      collector.emit(new Values(word, count));
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("word", "count"));
-    }
-  }
-
-  public static void main(String[] args) throws Exception {
-
-    TopologyBuilder builder = new TopologyBuilder();
-
-    builder.setSpout("spout", new RandomSentence(), 5);
-
-    builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
-    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
-
-    Config conf = new Config();
-    conf.setDebug(true);
-
-
-    if (args != null && args.length > 0) {
-      conf.setNumWorkers(3);
-
-      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
-    }
-    else {
-      conf.setMaxTaskParallelism(3);
-
-      LocalCluster cluster = new LocalCluster();
-      cluster.submitTopology("word-count", conf, builder.createTopology());
-
-      Thread.sleep(10000);
-
-      cluster.shutdown();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
deleted file mode 100644
index 64ceb29..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter.bolt;
-
-import backtype.storm.Config;
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.TupleUtils;
-import org.apache.log4j.Logger;
-import storm.starter.tools.Rankings;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * This abstract bolt provides the basic behavior of bolts that rank objects according to their count.
- * <p/>
- * It uses a template method design pattern for {@link AbstractRankerBolt#execute(Tuple, BasicOutputCollector)} to allow
- * actual bolt implementations to specify how incoming tuples are processed, i.e. how the objects embedded within those
- * tuples are retrieved and counted.
- */
-public abstract class AbstractRankerBolt extends BaseBasicBolt {
-
-  private static final long serialVersionUID = 4931640198501530202L;
-  private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = 2;
-  private static final int DEFAULT_COUNT = 10;
-
-  private final int emitFrequencyInSeconds;
-  private final int count;
-  private final Rankings rankings;
-
-  public AbstractRankerBolt() {
-    this(DEFAULT_COUNT, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
-  }
-
-  public AbstractRankerBolt(int topN) {
-    this(topN, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
-  }
-
-  public AbstractRankerBolt(int topN, int emitFrequencyInSeconds) {
-    if (topN < 1) {
-      throw new IllegalArgumentException("topN must be >= 1 (you requested " + topN + ")");
-    }
-    if (emitFrequencyInSeconds < 1) {
-      throw new IllegalArgumentException(
-          "The emit frequency must be >= 1 seconds (you requested " + emitFrequencyInSeconds + " seconds)");
-    }
-    count = topN;
-    this.emitFrequencyInSeconds = emitFrequencyInSeconds;
-    rankings = new Rankings(count);
-  }
-
-  protected Rankings getRankings() {
-    return rankings;
-  }
-
-  /**
-   * This method functions as a template method (design pattern).
-   */
-  @Override
-  public final void execute(Tuple tuple, BasicOutputCollector collector) {
-    if (TupleUtils.isTick(tuple)) {
-      getLogger().debug("Received tick tuple, triggering emit of current rankings");
-      emitRankings(collector);
-    }
-    else {
-      updateRankingsWithTuple(tuple);
-    }
-  }
-
-  abstract void updateRankingsWithTuple(Tuple tuple);
-
-  private void emitRankings(BasicOutputCollector collector) {
-    collector.emit(new Values(rankings.copy()));
-    getLogger().debug("Rankings: " + rankings);
-  }
-
-  @Override
-  public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    declarer.declare(new Fields("rankings"));
-  }
-
-  @Override
-  public Map<String, Object> getComponentConfiguration() {
-    Map<String, Object> conf = new HashMap<String, Object>();
-    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
-    return conf;
-  }
-
-  abstract Logger getLogger();
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/bolt/IntermediateRankingsBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/IntermediateRankingsBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/IntermediateRankingsBolt.java
deleted file mode 100644
index d1805ff..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/bolt/IntermediateRankingsBolt.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter.bolt;
-
-import backtype.storm.tuple.Tuple;
-import org.apache.log4j.Logger;
-import storm.starter.tools.Rankable;
-import storm.starter.tools.RankableObjectWithFields;
-
-/**
- * This bolt ranks incoming objects by their count.
- * <p/>
- * It assumes that the input tuples adhere to the following format: (object, object_count, additionalField1,
- * additionalField2, ..., additionalFieldN).
- */
-public final class IntermediateRankingsBolt extends AbstractRankerBolt {
-
-  private static final long serialVersionUID = -1369800530256637409L;
-  private static final Logger LOG = Logger.getLogger(IntermediateRankingsBolt.class);
-
-  public IntermediateRankingsBolt() {
-    super();
-  }
-
-  public IntermediateRankingsBolt(int topN) {
-    super(topN);
-  }
-
-  public IntermediateRankingsBolt(int topN, int emitFrequencyInSeconds) {
-    super(topN, emitFrequencyInSeconds);
-  }
-
-  @Override
-  void updateRankingsWithTuple(Tuple tuple) {
-    Rankable rankable = RankableObjectWithFields.from(tuple);
-    super.getRankings().updateWith(rankable);
-  }
-
-  @Override
-  Logger getLogger() {
-    return LOG;
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/bolt/PrinterBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/PrinterBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/PrinterBolt.java
deleted file mode 100644
index 58fc8ca..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/bolt/PrinterBolt.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter.bolt;
-
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.tuple.Tuple;
-
-
-public class PrinterBolt extends BaseBasicBolt {
-
-  @Override
-  public void execute(Tuple tuple, BasicOutputCollector collector) {
-    System.out.println(tuple);
-  }
-
-  @Override
-  public void declareOutputFields(OutputFieldsDeclarer ofd) {
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountAggBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountAggBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountAggBolt.java
deleted file mode 100644
index e222a97..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountAggBolt.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter.bolt;
-
-import backtype.storm.Config;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import org.apache.log4j.Logger;
-import storm.starter.tools.NthLastModifiedTimeTracker;
-import storm.starter.tools.SlidingWindowCounter;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-/**
- * This bolt aggregates counts from multiple upstream bolts.
- */
-public class RollingCountAggBolt extends BaseRichBolt {
-  private static final long serialVersionUID = 5537727428628598519L;
-  private static final Logger LOG = Logger.getLogger(RollingCountAggBolt.class);
-  //Mapping of key->upstreamBolt->count
-  private Map<Object, Map<Integer, Long>> counts = new HashMap<Object, Map<Integer, Long>>();
-  private OutputCollector collector;
-
-
-  @SuppressWarnings("rawtypes")
-  @Override
-  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-    this.collector = collector;
-  }
-
-  @Override
-  public void execute(Tuple tuple) {
-    Object obj = tuple.getValue(0);
-    long count = tuple.getLong(1);
-    int source = tuple.getSourceTask();
-    Map<Integer, Long> subCounts = counts.get(obj);
-    if (subCounts == null) {
-      subCounts = new HashMap<Integer, Long>();
-      counts.put(obj, subCounts);
-    }
-    //Update the current count for this object
-    subCounts.put(source, count);
-    //Output the sum of all the known counts so far for this key
-    long sum = 0;
-    for (Long val: subCounts.values()) {
-      sum += val;
-    }
-    collector.emit(new Values(obj, sum));
-  }
-
-  @Override
-  public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    declarer.declare(new Fields("obj", "count"));
-  }
-}
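
RollingCountAggBolt above keeps the latest rolling count per upstream task and re-emits the sum, so a newer report from the same task overwrites the old one instead of being added to it. The aggregation rule as a tiny sketch (task ids and counts are illustrative):

    import java.util.HashMap;
    import java.util.Map;

    public class PerSourceAggSketch {
      public static void main(String[] args) {
        // Latest rolling count reported by each upstream RollingCountBolt task for one word.
        Map<Integer, Long> subCounts = new HashMap<>();
        subCounts.put(7, 40L);  // task 7 reports 40
        subCounts.put(8, 25L);  // task 8 reports 25
        subCounts.put(7, 42L);  // a newer report from task 7 replaces the old value
        long sum = 0;
        for (long v : subCounts.values()) {
          sum += v;
        }
        System.out.println("aggregated count = " + sum);  // 67
      }
    }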

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
deleted file mode 100644
index 31f7ee2..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter.bolt;
-
-import backtype.storm.Config;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.TupleUtils;
-import org.apache.log4j.Logger;
-import storm.starter.tools.NthLastModifiedTimeTracker;
-import storm.starter.tools.SlidingWindowCounter;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-/**
- * This bolt performs rolling counts of incoming objects, i.e. sliding window based counting.
- * <p/>
- * The bolt is configured by two parameters, the length of the sliding window in seconds (which influences the output
- * data of the bolt, i.e. how it will count objects) and the emit frequency in seconds (which influences how often the
- * bolt will output the latest window counts). For instance, if the window length is set to an equivalent of five
- * minutes and the emit frequency to one minute, then the bolt will output the latest five-minute sliding window every
- * minute.
- * <p/>
- * The bolt emits a rolling count tuple per object, consisting of the object itself, its latest rolling count, and the
- * actual duration of the sliding window. The latter is included in case the expected sliding window length (as
- * configured by the user) is different from the actual length, e.g. due to high system load. Note that the actual
- * window length is tracked and calculated for the window, and not individually for each object within a window.
- * <p/>
- * Note: During the startup phase you will usually observe that the bolt warns you about the actual sliding window
- * length being smaller than the expected length. This behavior is expected and is caused by the way the sliding window
- * counts are initially "loaded up". You can safely ignore this warning during startup (e.g. you will see this warning
- * during the first ~ five minutes of startup time if the window length is set to five minutes).
- */
-public class RollingCountBolt extends BaseRichBolt {
-
-  private static final long serialVersionUID = 5537727428628598519L;
-  private static final Logger LOG = Logger.getLogger(RollingCountBolt.class);
-  private static final int NUM_WINDOW_CHUNKS = 5;
-  private static final int DEFAULT_SLIDING_WINDOW_IN_SECONDS = NUM_WINDOW_CHUNKS * 60;
-  private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = DEFAULT_SLIDING_WINDOW_IN_SECONDS / NUM_WINDOW_CHUNKS;
-  private static final String WINDOW_LENGTH_WARNING_TEMPLATE =
-      "Actual window length is %d seconds when it should be %d seconds"
-          + " (you can safely ignore this warning during the startup phase)";
-
-  private final SlidingWindowCounter<Object> counter;
-  private final int windowLengthInSeconds;
-  private final int emitFrequencyInSeconds;
-  private OutputCollector collector;
-  private NthLastModifiedTimeTracker lastModifiedTracker;
-
-  public RollingCountBolt() {
-    this(DEFAULT_SLIDING_WINDOW_IN_SECONDS, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
-  }
-
-  public RollingCountBolt(int windowLengthInSeconds, int emitFrequencyInSeconds) {
-    this.windowLengthInSeconds = windowLengthInSeconds;
-    this.emitFrequencyInSeconds = emitFrequencyInSeconds;
-    counter = new SlidingWindowCounter<Object>(deriveNumWindowChunksFrom(this.windowLengthInSeconds,
-        this.emitFrequencyInSeconds));
-  }
-
-  private int deriveNumWindowChunksFrom(int windowLengthInSeconds, int windowUpdateFrequencyInSeconds) {
-    return windowLengthInSeconds / windowUpdateFrequencyInSeconds;
-  }
-
-  @SuppressWarnings("rawtypes")
-  @Override
-  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-    this.collector = collector;
-    lastModifiedTracker = new NthLastModifiedTimeTracker(deriveNumWindowChunksFrom(this.windowLengthInSeconds,
-        this.emitFrequencyInSeconds));
-  }
-
-  @Override
-  public void execute(Tuple tuple) {
-    if (TupleUtils.isTick(tuple)) {
-      LOG.debug("Received tick tuple, triggering emit of current window counts");
-      emitCurrentWindowCounts();
-    }
-    else {
-      countObjAndAck(tuple);
-    }
-  }
-
-  private void emitCurrentWindowCounts() {
-    Map<Object, Long> counts = counter.getCountsThenAdvanceWindow();
-    int actualWindowLengthInSeconds = lastModifiedTracker.secondsSinceOldestModification();
-    lastModifiedTracker.markAsModified();
-    if (actualWindowLengthInSeconds != windowLengthInSeconds) {
-      LOG.warn(String.format(WINDOW_LENGTH_WARNING_TEMPLATE, actualWindowLengthInSeconds, windowLengthInSeconds));
-    }
-    emit(counts, actualWindowLengthInSeconds);
-  }
-
-  private void emit(Map<Object, Long> counts, int actualWindowLengthInSeconds) {
-    for (Entry<Object, Long> entry : counts.entrySet()) {
-      Object obj = entry.getKey();
-      Long count = entry.getValue();
-      collector.emit(new Values(obj, count, actualWindowLengthInSeconds));
-    }
-  }
-
-  private void countObjAndAck(Tuple tuple) {
-    Object obj = tuple.getValue(0);
-    counter.incrementCount(obj);
-    collector.ack(tuple);
-  }
-
-  @Override
-  public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    declarer.declare(new Fields("obj", "count", "actualWindowLengthInSeconds"));
-  }
-
-  @Override
-  public Map<String, Object> getComponentConfiguration() {
-    Map<String, Object> conf = new HashMap<String, Object>();
-    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
-    return conf;
-  }
-}
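
The javadoc above spells out the bolt's two knobs (window length and emit frequency) and its tick-tuple-driven emits. A rough sketch of how such a bolt is typically wired into a topology follows; the component names, parallelism, and TestWordSpout are illustrative, and the imports assume the org.apache.storm package layout this commit migrates to.

    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.testing.TestWordSpout;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.starter.bolt.RollingCountBolt;

    public class RollingCountWiring {
      public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("words", new TestWordSpout(), 2);
        // 540-second (9-minute) sliding window, latest counts emitted every 3 seconds.
        // Fields grouping keeps all tuples for a given word on the same bolt task.
        builder.setBolt("rollingCount", new RollingCountBolt(540, 3), 4)
               .fieldsGrouping("words", new Fields("word"));

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("rolling-count-demo", new Config(), builder.createTopology());
      }
    }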

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/bolt/SingleJoinBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/SingleJoinBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/SingleJoinBolt.java
deleted file mode 100644
index 85a7a26..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/bolt/SingleJoinBolt.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter.bolt;
-
-import backtype.storm.Config;
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.utils.TimeCacheMap;
-
-import java.util.*;
-
-public class SingleJoinBolt extends BaseRichBolt {
-  OutputCollector _collector;
-  Fields _idFields;
-  Fields _outFields;
-  int _numSources;
-  TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>> _pending;
-  Map<String, GlobalStreamId> _fieldLocations;
-
-  public SingleJoinBolt(Fields outFields) {
-    _outFields = outFields;
-  }
-
-  @Override
-  public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
-    _fieldLocations = new HashMap<String, GlobalStreamId>();
-    _collector = collector;
-    int timeout = ((Number) conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
-    _pending = new TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>>(timeout, new ExpireCallback());
-    _numSources = context.getThisSources().size();
-    Set<String> idFields = null;
-    for (GlobalStreamId source : context.getThisSources().keySet()) {
-      Fields fields = context.getComponentOutputFields(source.get_componentId(), source.get_streamId());
-      Set<String> setFields = new HashSet<String>(fields.toList());
-      if (idFields == null)
-        idFields = setFields;
-      else
-        idFields.retainAll(setFields);
-
-      for (String outfield : _outFields) {
-        for (String sourcefield : fields) {
-          if (outfield.equals(sourcefield)) {
-            _fieldLocations.put(outfield, source);
-          }
-        }
-      }
-    }
-    _idFields = new Fields(new ArrayList<String>(idFields));
-
-    if (_fieldLocations.size() != _outFields.size()) {
-      throw new RuntimeException("Cannot find all outfields among sources");
-    }
-  }
-
-  @Override
-  public void execute(Tuple tuple) {
-    List<Object> id = tuple.select(_idFields);
-    GlobalStreamId streamId = new GlobalStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId());
-    if (!_pending.containsKey(id)) {
-      _pending.put(id, new HashMap<GlobalStreamId, Tuple>());
-    }
-    Map<GlobalStreamId, Tuple> parts = _pending.get(id);
-    if (parts.containsKey(streamId))
-      throw new RuntimeException("Received same side of single join twice");
-    parts.put(streamId, tuple);
-    if (parts.size() == _numSources) {
-      _pending.remove(id);
-      List<Object> joinResult = new ArrayList<Object>();
-      for (String outField : _outFields) {
-        GlobalStreamId loc = _fieldLocations.get(outField);
-        joinResult.add(parts.get(loc).getValueByField(outField));
-      }
-      _collector.emit(new ArrayList<Tuple>(parts.values()), joinResult);
-
-      for (Tuple part : parts.values()) {
-        _collector.ack(part);
-      }
-    }
-  }
-
-  @Override
-  public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    declarer.declare(_outFields);
-  }
-
-  private class ExpireCallback implements TimeCacheMap.ExpiredCallback<List<Object>, Map<GlobalStreamId, Tuple>> {
-    @Override
-    public void expire(List<Object> id, Map<GlobalStreamId, Tuple> tuples) {
-      for (Tuple tuple : tuples.values()) {
-        _collector.fail(tuple);
-      }
-    }
-  }
-}
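
This bolt has no class-level javadoc, but its prepare() derives the join key as the intersection of the fields declared by all source streams and joins one tuple per source on that key, expiring pending tuples after Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS. A minimal wiring sketch under that reading; the FeederSpout inputs and field names are illustrative, and the imports assume the post-migration packages.

    import org.apache.storm.testing.FeederSpout;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.starter.bolt.SingleJoinBolt;

    public class SingleJoinWiring {
      public static TopologyBuilder build() {
        // Hypothetical inputs: one stream emits ("id", "gender"), the other ("id", "age").
        FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender"));
        FeederSpout ageSpout = new FeederSpout(new Fields("id", "age"));

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("gender", genderSpout);
        builder.setSpout("age", ageSpout);
        // The two streams share only the "id" field, so the bolt joins on "id" and
        // emits the requested ("gender", "age") output fields. Incomplete joins are
        // failed once TOPOLOGY_MESSAGE_TIMEOUT_SECS elapses.
        builder.setBolt("join", new SingleJoinBolt(new Fields("gender", "age")))
               .fieldsGrouping("gender", new Fields("id"))
               .fieldsGrouping("age", new Fields("id"));
        return builder;
      }
    }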

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/bolt/SlidingWindowSumBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/SlidingWindowSumBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/SlidingWindowSumBolt.java
deleted file mode 100644
index ef3a0b8..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/bolt/SlidingWindowSumBolt.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter.bolt;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseWindowedBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import backtype.storm.windowing.TupleWindow;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * Computes sliding window sum
- */
-public class SlidingWindowSumBolt extends BaseWindowedBolt {
-    private static final Logger LOG = LoggerFactory.getLogger(SlidingWindowSumBolt.class);
-
-    private int sum = 0;
-    private OutputCollector collector;
-
-    @Override
-    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-        this.collector = collector;
-    }
-
-    @Override
-    public void execute(TupleWindow inputWindow) {
-        /*
-         * The inputWindow gives a view of
-         * (a) all the events in the window
-         * (b) events that expired since last activation of the window
-         * (c) events that newly arrived since last activation of the window
-         */
-        List<Tuple> tuplesInWindow = inputWindow.get();
-        List<Tuple> newTuples = inputWindow.getNew();
-        List<Tuple> expiredTuples = inputWindow.getExpired();
-
-        LOG.debug("Events in current window: " + tuplesInWindow.size());
-        /*
-         * Instead of iterating over all the tuples in the window to compute
-         * the sum, the values for the new events are added and old events are
-         * subtracted. Similar optimizations might be possible in other
-         * windowing computations.
-         */
-        for (Tuple tuple : newTuples) {
-            sum += (int) tuple.getValue(0);
-        }
-        for (Tuple tuple : expiredTuples) {
-            sum -= (int) tuple.getValue(0);
-        }
-        collector.emit(new Values(sum));
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields("sum"));
-    }
-}
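
The inline comments above describe the three views a TupleWindow exposes (all, new, and expired tuples) and the incremental-sum trick. A minimal sketch of attaching such a windowed bolt to a stream; the 30s/10s window, component names, and post-migration imports are assumptions.

    import java.util.concurrent.TimeUnit;

    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.topology.base.BaseWindowedBolt.Duration;
    import org.apache.storm.starter.bolt.SlidingWindowSumBolt;
    import org.apache.storm.starter.spout.RandomIntegerSpout;

    public class SlidingSumWiring {
      public static TopologyBuilder build() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("integers", new RandomIntegerSpout());
        // Sum the values seen in the last 30 seconds, re-evaluated every 10 seconds.
        // Each activation only touches the new and expired tuples, per the comments above.
        builder.setBolt("slidingSum",
                new SlidingWindowSumBolt().withWindow(new Duration(30, TimeUnit.SECONDS),
                                                      new Duration(10, TimeUnit.SECONDS)))
               .shuffleGrouping("integers");
        return builder;
      }
    }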

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/bolt/TotalRankingsBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/TotalRankingsBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/TotalRankingsBolt.java
deleted file mode 100644
index 0e1bb05..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/bolt/TotalRankingsBolt.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter.bolt;
-
-import backtype.storm.tuple.Tuple;
-import org.apache.log4j.Logger;
-import storm.starter.tools.Rankings;
-
-/**
- * This bolt merges incoming {@link Rankings}.
- * <p/>
- * It can be used to merge intermediate rankings generated by {@link IntermediateRankingsBolt} into a final,
- * consolidated ranking. To do so, configure this bolt with a globalGrouping on {@link IntermediateRankingsBolt}.
- */
-public final class TotalRankingsBolt extends AbstractRankerBolt {
-
-  private static final long serialVersionUID = -8447525895532302198L;
-  private static final Logger LOG = Logger.getLogger(TotalRankingsBolt.class);
-
-  public TotalRankingsBolt() {
-    super();
-  }
-
-  public TotalRankingsBolt(int topN) {
-    super(topN);
-  }
-
-  public TotalRankingsBolt(int topN, int emitFrequencyInSeconds) {
-    super(topN, emitFrequencyInSeconds);
-  }
-
-  @Override
-  void updateRankingsWithTuple(Tuple tuple) {
-    Rankings rankingsToBeMerged = (Rankings) tuple.getValue(0);
-    super.getRankings().updateWith(rankingsToBeMerged);
-    super.getRankings().pruneZeroCounts();
-  }
-
-  @Override
-  Logger getLogger() {
-    return LOG;
-  }
-
-}
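
The javadoc above says to merge intermediate rankings into the final ranking via a globalGrouping on IntermediateRankingsBolt. A minimal sketch of that pipeline in the style of RollingTopWords; parallelism, component names, and the post-migration imports are assumptions.

    import org.apache.storm.testing.TestWordSpout;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.starter.bolt.IntermediateRankingsBolt;
    import org.apache.storm.starter.bolt.RollingCountBolt;
    import org.apache.storm.starter.bolt.TotalRankingsBolt;

    public class RankingsWiring {
      public static TopologyBuilder build(int topN) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("words", new TestWordSpout(), 2);
        builder.setBolt("rollingCount", new RollingCountBolt(540, 3), 4)
               .fieldsGrouping("words", new Fields("word"));
        // Partial rankings, partitioned by the counted object so each task ranks its own keys.
        builder.setBolt("intermediateRankings", new IntermediateRankingsBolt(topN), 4)
               .fieldsGrouping("rollingCount", new Fields("obj"));
        // Global grouping funnels every partial ranking into a single TotalRankingsBolt task,
        // which merges them into the final consolidated ranking.
        builder.setBolt("totalRankings", new TotalRankingsBolt(topN))
               .globalGrouping("intermediateRankings");
        return builder;
      }
    }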

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/spout/RandomIntegerSpout.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/spout/RandomIntegerSpout.java b/examples/storm-starter/src/jvm/storm/starter/spout/RandomIntegerSpout.java
deleted file mode 100644
index 5778c8e..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/spout/RandomIntegerSpout.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter.spout;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-
-import java.util.Map;
-import java.util.Random;
-
-/**
- * Emits a random integer and a timestamp value (set to one day in the past)
- * every 100 ms. The ts field can be used in tuple-timestamp-based windowing.
- */
-public class RandomIntegerSpout extends BaseRichSpout {
-    private SpoutOutputCollector collector;
-    private Random rand;
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields("value", "ts"));
-    }
-
-    @Override
-    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-        this.collector = collector;
-        this.rand = new Random();
-    }
-
-    @Override
-    public void nextTuple() {
-        Utils.sleep(100);
-        collector.emit(new Values(rand.nextInt(1000), System.currentTimeMillis() - (24 * 60 * 60 * 1000)));
-    }
-}
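
The javadoc above notes that the ts field can drive tuple-timestamp-based windows. One possible use, sketched against BaseWindowedBolt's event-time options (withTimestampField/withLag); the window size, lag, and post-migration imports are assumptions.

    import java.util.concurrent.TimeUnit;

    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.topology.base.BaseWindowedBolt.Duration;
    import org.apache.storm.starter.bolt.SlidingWindowSumBolt;
    import org.apache.storm.starter.spout.RandomIntegerSpout;

    public class EventTimeWiring {
      public static TopologyBuilder build() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("integers", new RandomIntegerSpout());
        // Window boundaries are computed from the emitted "ts" field rather than arrival time;
        // withLag bounds how far out of order tuples may arrive and still be windowed.
        builder.setBolt("tumblingSum",
                new SlidingWindowSumBolt()
                    .withTumblingWindow(new Duration(10, TimeUnit.SECONDS))
                    .withTimestampField("ts")
                    .withLag(new Duration(5, TimeUnit.SECONDS)))
               .shuffleGrouping("integers");
        return builder;
      }
    }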

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/spout/RandomSentenceSpout.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/spout/RandomSentenceSpout.java b/examples/storm-starter/src/jvm/storm/starter/spout/RandomSentenceSpout.java
deleted file mode 100644
index 813b10c..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/spout/RandomSentenceSpout.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter.spout;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-
-import java.util.Map;
-import java.util.Random;
-
-public class RandomSentenceSpout extends BaseRichSpout {
-  SpoutOutputCollector _collector;
-  Random _rand;
-
-
-  @Override
-  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-    _collector = collector;
-    _rand = new Random();
-  }
-
-  @Override
-  public void nextTuple() {
-    Utils.sleep(100);
-    String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
-        "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
-    String sentence = sentences[_rand.nextInt(sentences.length)];
-    _collector.emit(new Values(sentence));
-  }
-
-  @Override
-  public void ack(Object id) {
-  }
-
-  @Override
-  public void fail(Object id) {
-  }
-
-  @Override
-  public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    declarer.declare(new Fields("word"));
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/spout/TwitterSampleSpout.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/spout/TwitterSampleSpout.java b/examples/storm-starter/src/jvm/storm/starter/spout/TwitterSampleSpout.java
deleted file mode 100644
index 40f8d72..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/spout/TwitterSampleSpout.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package storm.starter.spout;
-
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import twitter4j.FilterQuery;
-import twitter4j.StallWarning;
-import twitter4j.Status;
-import twitter4j.StatusDeletionNotice;
-import twitter4j.StatusListener;
-import twitter4j.TwitterStream;
-import twitter4j.TwitterStreamFactory;
-import twitter4j.auth.AccessToken;
-import twitter4j.conf.ConfigurationBuilder;
-
-import backtype.storm.Config;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-
-@SuppressWarnings("serial")
-public class TwitterSampleSpout extends BaseRichSpout {
-
-	SpoutOutputCollector _collector;
-	LinkedBlockingQueue<Status> queue = null;
-	TwitterStream _twitterStream;
-	String consumerKey;
-	String consumerSecret;
-	String accessToken;
-	String accessTokenSecret;
-	String[] keyWords;
-
-	public TwitterSampleSpout(String consumerKey, String consumerSecret,
-			String accessToken, String accessTokenSecret, String[] keyWords) {
-		this.consumerKey = consumerKey;
-		this.consumerSecret = consumerSecret;
-		this.accessToken = accessToken;
-		this.accessTokenSecret = accessTokenSecret;
-		this.keyWords = keyWords;
-	}
-
-	public TwitterSampleSpout() {
-		// No-op default constructor; credentials and keywords are normally supplied via the other constructor.
-	}
-
-	@Override
-	public void open(Map conf, TopologyContext context,
-			SpoutOutputCollector collector) {
-		queue = new LinkedBlockingQueue<Status>(1000);
-		_collector = collector;
-
-		StatusListener listener = new StatusListener() {
-
-			@Override
-			public void onStatus(Status status) {
-			
-				queue.offer(status);
-			}
-
-			@Override
-			public void onDeletionNotice(StatusDeletionNotice sdn) {
-			}
-
-			@Override
-			public void onTrackLimitationNotice(int i) {
-			}
-
-			@Override
-			public void onScrubGeo(long l, long l1) {
-			}
-
-			@Override
-			public void onException(Exception ex) {
-			}
-
-			@Override
-			public void onStallWarning(StallWarning arg0) {
-				// No-op: stall warnings are ignored, like the other optional callbacks above.
-			}
-
-		};
-
-		// Keep the stream in the field so close() can shut it down.
-		_twitterStream = new TwitterStreamFactory(
-				new ConfigurationBuilder().setJSONStoreEnabled(true).build())
-				.getInstance();
-
-		_twitterStream.addListener(listener);
-		_twitterStream.setOAuthConsumer(consumerKey, consumerSecret);
-		AccessToken token = new AccessToken(accessToken, accessTokenSecret);
-		_twitterStream.setOAuthAccessToken(token);
-
-		if (keyWords.length == 0) {
-			_twitterStream.sample();
-		} else {
-			FilterQuery query = new FilterQuery().track(keyWords);
-			_twitterStream.filter(query);
-		}
-
-	}
-
-	@Override
-	public void nextTuple() {
-		Status ret = queue.poll();
-		if (ret == null) {
-			Utils.sleep(50);
-		} else {
-			_collector.emit(new Values(ret));
-
-		}
-	}
-
-	@Override
-	public void close() {
-		_twitterStream.shutdown();
-	}
-
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		Config ret = new Config();
-		ret.setMaxTaskParallelism(1);
-		return ret;
-	}
-
-	@Override
-	public void ack(Object id) {
-	}
-
-	@Override
-	public void fail(Object id) {
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		declarer.declare(new Fields("tweet"));
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/tools/NthLastModifiedTimeTracker.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/tools/NthLastModifiedTimeTracker.java b/examples/storm-starter/src/jvm/storm/starter/tools/NthLastModifiedTimeTracker.java
deleted file mode 100644
index 08df8cf..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/tools/NthLastModifiedTimeTracker.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter.tools;
-
-import backtype.storm.utils.Time;
-import org.apache.commons.collections.buffer.CircularFifoBuffer;
-
-/**
- * This class tracks the time since a "thing" was last modified, in a rolling fashion.
- * <p/>
- * For example, a 5-slot tracker remembers the five most recent modification times.
- * <p/>
- * You must manually "mark" the thing you want to track whenever it is modified; the tracker only records the times
- * of those marks.
- */
-public class NthLastModifiedTimeTracker {
-
-  private static final int MILLIS_IN_SEC = 1000;
-
-  private final CircularFifoBuffer lastModifiedTimesMillis;
-
-  public NthLastModifiedTimeTracker(int numTimesToTrack) {
-    if (numTimesToTrack < 1) {
-      throw new IllegalArgumentException(
-          "numTimesToTrack must be greater than zero (you requested " + numTimesToTrack + ")");
-    }
-    lastModifiedTimesMillis = new CircularFifoBuffer(numTimesToTrack);
-    initLastModifiedTimesMillis();
-  }
-
-  private void initLastModifiedTimesMillis() {
-    long nowCached = now();
-    for (int i = 0; i < lastModifiedTimesMillis.maxSize(); i++) {
-      lastModifiedTimesMillis.add(Long.valueOf(nowCached));
-    }
-  }
-
-  private long now() {
-    return Time.currentTimeMillis();
-  }
-
-  public int secondsSinceOldestModification() {
-    long modifiedTimeMillis = ((Long) lastModifiedTimesMillis.get()).longValue();
-    return (int) ((now() - modifiedTimeMillis) / MILLIS_IN_SEC);
-  }
-
-  public void markAsModified() {
-    updateLastModifiedTime();
-  }
-
-  private void updateLastModifiedTime() {
-    lastModifiedTimesMillis.add(now());
-  }
-
-}
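
The javadoc above describes the manual "mark" contract. A minimal usage sketch, assuming the post-migration package name; the five slots and one-second sleeps are arbitrary.

    import org.apache.storm.starter.tools.NthLastModifiedTimeTracker;

    public class TrackerUsage {
      public static void main(String[] args) throws InterruptedException {
        // Five slots: the tracker reports the seconds elapsed since the oldest of the last five marks.
        NthLastModifiedTimeTracker tracker = new NthLastModifiedTimeTracker(5);
        for (int i = 0; i < 5; i++) {
          Thread.sleep(1000);        // simulate work between modifications
          tracker.markAsModified();  // record that the tracked thing just changed
        }
        // Prints roughly 4: now minus the oldest of the five remembered marks.
        System.out.println(tracker.secondsSinceOldestModification());
      }
    }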

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/tools/Rankable.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/tools/Rankable.java b/examples/storm-starter/src/jvm/storm/starter/tools/Rankable.java
deleted file mode 100644
index 85e3d1d..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/tools/Rankable.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter.tools;
-
-public interface Rankable extends Comparable<Rankable> {
-
-  Object getObject();
-
-  long getCount();
-
-  /**
-   * Note: the object wrapped by the Rankable is not defensively copied; it is shared with the copy as-is.
-   *
-   * @return a copy of this Rankable (the wrapped object itself is not copied)
-   */
-  Rankable copy();
-}
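
To make the copy() contract above concrete, here is a minimal illustrative implementation (not the project's RankableObjectWithFields; the import assumes the post-migration package): the Rankable itself is duplicated while the wrapped object is shared.

    import org.apache.storm.starter.tools.Rankable;

    public class SimpleRankable implements Rankable {
      private final Object obj;
      private final long count;

      public SimpleRankable(Object obj, long count) {
        this.obj = obj;
        this.count = count;
      }

      @Override
      public Object getObject() {
        return obj;
      }

      @Override
      public long getCount() {
        return count;
      }

      @Override
      public Rankable copy() {
        // New Rankable instance, but the wrapped object is shared, matching the note above.
        return new SimpleRankable(obj, count);
      }

      @Override
      public int compareTo(Rankable other) {
        return Long.compare(count, other.getCount());
      }
    }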