You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/03/20 22:23:04 UTC

[41/50] [abbrv] merge storm-starter into examples

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/src/jvm/storm/starter/spout/RandomSentenceSpout.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/src/jvm/storm/starter/spout/RandomSentenceSpout.java
index 0000000,0000000..e808f83
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/spout/RandomSentenceSpout.java
@@@ -1,0 -1,0 +1,47 @@@
++package storm.starter.spout;
++
++import backtype.storm.spout.SpoutOutputCollector;
++import backtype.storm.task.TopologyContext;
++import backtype.storm.topology.OutputFieldsDeclarer;
++import backtype.storm.topology.base.BaseRichSpout;
++import backtype.storm.tuple.Fields;
++import backtype.storm.tuple.Values;
++import backtype.storm.utils.Utils;
++
++import java.util.Map;
++import java.util.Random;
++
++/**
++ * Spout that emits one sentence, picked at random from a small fixed list, per call to {@link #nextTuple()}.
++ * <p/>
++ * Tuples are emitted without a message id, and {@link #ack(Object)} / {@link #fail(Object)} are no-ops.
++ */
++public class RandomSentenceSpout extends BaseRichSpout {
++  /** The sentences to choose from; kept as a constant so the array is not rebuilt on every nextTuple() call. */
++  private static final String[] SENTENCES = {
++      "the cow jumped over the moon",
++      "an apple a day keeps the doctor away",
++      "four score and seven years ago",
++      "snow white and the seven dwarfs",
++      "i am at two with nature"
++  };
++
++  SpoutOutputCollector _collector;
++  Random _rand;
++
++
++  /**
++   * Remembers the collector to emit to and creates the random number generator.
++   *
++   * @param conf      the configuration handed to this spout (not used here)
++   * @param context   the topology context (not used here)
++   * @param collector the collector that {@link #nextTuple()} emits sentences to
++   */
++  @Override
++  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
++    _collector = collector;
++    _rand = new Random();
++  }
++
++  /**
++   * Calls {@code Utils.sleep(100)} to throttle the spout, then emits a single randomly chosen sentence.
++   */
++  @Override
++  public void nextTuple() {
++    Utils.sleep(100);
++    String sentence = SENTENCES[_rand.nextInt(SENTENCES.length)];
++    _collector.emit(new Values(sentence));
++  }
++
++  /** Does nothing. */
++  @Override
++  public void ack(Object id) {
++  }
++
++  /** Does nothing. */
++  @Override
++  public void fail(Object id) {
++  }
++
++  /**
++   * Declares the single output field of this spout.
++   * <p/>
++   * NOTE(review): the field is named "word" although each tuple carries a whole sentence. The name is left unchanged
++   * because components outside this file may refer to the field by name.
++   */
++  @Override
++  public void declareOutputFields(OutputFieldsDeclarer declarer) {
++    declarer.declare(new Fields("word"));
++  }
++
++}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/src/jvm/storm/starter/spout/TwitterSampleSpout.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/src/jvm/storm/starter/spout/TwitterSampleSpout.java
index 0000000,0000000..0a48cf2
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/spout/TwitterSampleSpout.java
@@@ -1,0 -1,0 +1,105 @@@
++/*
++
++package storm.starter.spout;
++
++import backtype.storm.Config;
++import twitter4j.conf.ConfigurationBuilder;
++import twitter4j.TwitterStream;
++import twitter4j.TwitterStreamFactory;
++import backtype.storm.spout.SpoutOutputCollector;
++import backtype.storm.task.TopologyContext;
++import backtype.storm.topology.OutputFieldsDeclarer;
++import backtype.storm.topology.base.BaseRichSpout;
++import backtype.storm.tuple.Fields;
++import backtype.storm.tuple.Values;
++import backtype.storm.utils.Utils;
++import java.util.Map;
++import java.util.concurrent.LinkedBlockingQueue;
++import twitter4j.Status;
++import twitter4j.StatusDeletionNotice;
++import twitter4j.StatusListener;
++
++public class TwitterSampleSpout extends BaseRichSpout {
++    SpoutOutputCollector _collector;
++    LinkedBlockingQueue<Status> queue = null;
++    TwitterStream _twitterStream;
++    String _username;
++    String _pwd;
++    
++    
++    public TwitterSampleSpout(String username, String pwd) {
++        _username = username;
++        _pwd = pwd;
++    }
++    
++    @Override
++    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
++        queue = new LinkedBlockingQueue<Status>(1000);
++        _collector = collector;
++        StatusListener listener = new StatusListener() {
++
++            @Override
++            public void onStatus(Status status) {
++                queue.offer(status);
++            }
++
++            @Override
++            public void onDeletionNotice(StatusDeletionNotice sdn) {
++            }
++
++            @Override
++            public void onTrackLimitationNotice(int i) {
++            }
++
++            @Override
++            public void onScrubGeo(long l, long l1) {
++            }
++
++            @Override
++            public void onException(Exception e) {
++            }
++            
++        };
++        TwitterStreamFactory fact = new TwitterStreamFactory(new ConfigurationBuilder().setUser(_username).setPassword(_pwd).build());
++        _twitterStream = fact.getInstance();
++        _twitterStream.addListener(listener);
++        _twitterStream.sample();
++    }
++
++    @Override
++    public void nextTuple() {
++        Status ret = queue.poll();
++        if(ret==null) {
++            Utils.sleep(50);
++        } else {
++            _collector.emit(new Values(ret));
++        }
++    }
++
++    @Override
++    public void close() {
++        _twitterStream.shutdown();
++    }
++
++    @Override
++    public Map<String, Object> getComponentConfiguration() {
++        Config ret = new Config();
++        ret.setMaxTaskParallelism(1);
++        return ret;
++    }    
++
++    @Override
++    public void ack(Object id) {
++    }
++
++    @Override
++    public void fail(Object id) {
++    }
++
++    @Override
++    public void declareOutputFields(OutputFieldsDeclarer declarer) {
++        declarer.declare(new Fields("tweet"));
++    }
++    
++}
++*/

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/src/jvm/storm/starter/tools/NthLastModifiedTimeTracker.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/src/jvm/storm/starter/tools/NthLastModifiedTimeTracker.java
index 0000000,0000000..92998a6
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/tools/NthLastModifiedTimeTracker.java
@@@ -1,0 -1,0 +1,53 @@@
++package storm.starter.tools;
++
++import backtype.storm.utils.Time;
++import org.apache.commons.collections.buffer.CircularFifoBuffer;
++
++/**
++ * This class tracks the time-since-last-modify of a "thing" in a rolling fashion.
++ * <p/>
++ * For example, create a 5-slot tracker to track the five most recent time-since-last-modify.
++ * <p/>
++ * You must manually "mark" that the "something" that you want to track -- in terms of modification times -- has just
++ * been modified.
++ */
++public class NthLastModifiedTimeTracker {
++
++  private static final int MILLIS_IN_SEC = 1000;
++
++  private final CircularFifoBuffer lastModifiedTimesMillis;
++
++  public NthLastModifiedTimeTracker(int numTimesToTrack) {
++    if (numTimesToTrack < 1) {
++      throw new IllegalArgumentException(
++          "numTimesToTrack must be greater than zero (you requested " + numTimesToTrack + ")");
++    }
++    lastModifiedTimesMillis = new CircularFifoBuffer(numTimesToTrack);
++    initLastModifiedTimesMillis();
++  }
++
++  private void initLastModifiedTimesMillis() {
++    long nowCached = now();
++    for (int i = 0; i < lastModifiedTimesMillis.maxSize(); i++) {
++      lastModifiedTimesMillis.add(Long.valueOf(nowCached));
++    }
++  }
++
++  private long now() {
++    return Time.currentTimeMillis();
++  }
++
++  public int secondsSinceOldestModification() {
++    long modifiedTimeMillis = ((Long) lastModifiedTimesMillis.get()).longValue();
++    return (int) ((now() - modifiedTimeMillis) / MILLIS_IN_SEC);
++  }
++
++  public void markAsModified() {
++    updateLastModifiedTime();
++  }
++
++  private void updateLastModifiedTime() {
++    lastModifiedTimesMillis.add(now());
++  }
++
++}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/src/jvm/storm/starter/tools/Rankable.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/src/jvm/storm/starter/tools/Rankable.java
index 0000000,0000000..254d920
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/tools/Rankable.java
@@@ -1,0 -1,0 +1,15 @@@
++package storm.starter.tools;
++
++/**
++ * An object paired with a count that can be ordered against other {@code Rankable}s.
++ */
++public interface Rankable extends Comparable<Rankable> {
++
++  /**
++   * @return the object wrapped by this Rankable
++   */
++  Object getObject();
++
++  /**
++   * @return the count associated with the wrapped object
++   */
++  long getCount();
++
++  /**
++   * Note: We do not defensively copy the object wrapped by the Rankable.  It is passed as is.
++   *
++   * @return a defensive copy
++   */
++  Rankable copy();
++}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/src/jvm/storm/starter/tools/RankableObjectWithFields.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/src/jvm/storm/starter/tools/RankableObjectWithFields.java
index 0000000,0000000..3079434
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/tools/RankableObjectWithFields.java
@@@ -1,0 -1,0 +1,131 @@@
++package storm.starter.tools;
++
++import backtype.storm.tuple.Tuple;
++import com.google.common.collect.ImmutableList;
++import com.google.common.collect.Lists;
++
++import java.io.Serializable;
++import java.util.List;
++
++/**
++ * This class wraps an object and its associated count, including any additional data fields.
++ * <p/>
++ * This class can be used, for instance, to track the number of occurrences of an object in a Storm topology.
++ * <p/>
++ * Ordering ({@link #compareTo(Rankable)}) looks at the count only; {@link #equals(Object)} and {@link #hashCode()}
++ * look at the wrapped object and the count.  The additional fields take part in neither.
++ */
++public class RankableObjectWithFields implements Rankable, Serializable {
++
++  private static final long serialVersionUID = -9102878650001058090L;
++  private static final String toStringSeparator = "|";
++
++  private final Object obj;
++  private final long count;
++  private final ImmutableList<Object> fields;
++
++  /**
++   * @param obj         the object to wrap; must not be null
++   * @param count       the count associated with the object; must be >= 0
++   * @param otherFields any additional data fields to carry along (copied into an immutable list)
++   *
++   * @throws IllegalArgumentException if {@code obj} is null or {@code count} is negative
++   */
++  public RankableObjectWithFields(Object obj, long count, Object... otherFields) {
++    if (obj == null) {
++      throw new IllegalArgumentException("The object must not be null");
++    }
++    if (count < 0) {
++      throw new IllegalArgumentException("The count must be >= 0");
++    }
++    this.obj = obj;
++    this.count = count;
++    fields = ImmutableList.copyOf(otherFields);
++
++  }
++
++  /**
++   * Construct a new instance based on the provided {@link Tuple}.
++   * <p/>
++   * This method expects the object to be ranked in the first field (index 0) of the provided tuple, and the number of
++   * occurrences of the object (its count) in the second field (index 1). Any further fields in the tuple will be
++   * extracted and tracked, too. These fields can be accessed via {@link RankableObjectWithFields#getFields()}.
++   *
++   * @param tuple the tuple to read the object, its count (a {@code Long}) and any further fields from
++   *
++   * @return new instance based on the provided tuple
++   */
++  public static RankableObjectWithFields from(Tuple tuple) {
++    List<Object> otherFields = Lists.newArrayList(tuple.getValues());
++    Object obj = otherFields.remove(0);
++    Long count = (Long) otherFields.remove(0);
++    return new RankableObjectWithFields(obj, count, otherFields.toArray());
++  }
++
++  public Object getObject() {
++    return obj;
++  }
++
++  public long getCount() {
++    return count;
++  }
++
++  /**
++   * @return an immutable list of any additional data fields of the object (may be empty but will never be null)
++   */
++  public List<Object> getFields() {
++    return fields;
++  }
++
++  /**
++   * Orders by count only.
++   *
++   * @return 1, -1 or 0 if this count is greater than, smaller than or equal to the other count
++   */
++  @Override
++  public int compareTo(Rankable other) {
++    // Compare directly instead of subtracting: the difference of two longs can overflow and flip its sign.
++    long mine = this.getCount();
++    long theirs = other.getCount();
++    if (mine > theirs) {
++      return 1;
++    }
++    else if (mine < theirs) {
++      return -1;
++    }
++    else {
++      return 0;
++    }
++  }
++
++  @Override
++  public boolean equals(Object o) {
++    if (this == o) {
++      return true;
++    }
++    if (!(o instanceof RankableObjectWithFields)) {
++      return false;
++    }
++    RankableObjectWithFields other = (RankableObjectWithFields) o;
++    return obj.equals(other.obj) && count == other.count;
++  }
++
++  @Override
++  public int hashCode() {
++    int result = 17;
++    int countHash = (int) (count ^ (count >>> 32));
++    result = 31 * result + countHash;
++    result = 31 * result + obj.hashCode();
++    return result;
++  }
++
++  /**
++   * @return the wrapped object, the count and every additional field, separated by "|" and enclosed in brackets
++   */
++  @Override
++  public String toString() {
++    StringBuilder buf = new StringBuilder();
++    buf.append("[");
++    buf.append(obj);
++    buf.append(toStringSeparator);
++    buf.append(count);
++    for (Object field : fields) {
++      buf.append(toStringSeparator);
++      buf.append(field);
++    }
++    buf.append("]");
++    return buf.toString();
++  }
++
++  /**
++   * Note: We do not defensively copy the wrapped object and any accompanying fields.  We do guarantee, however,
++   * to return a defensive (shallow) copy of the List object that is wrapping any accompanying fields.
++   *
++   * @return a new instance with the same wrapped object, the same count and the same additional fields
++   */
++  @Override
++  public Rankable copy() {
++    // Hand the fields over as an array: a List passed to the Object... parameter would be treated as ONE field,
++    // so the copy would end up with a single field that is itself the list of the original fields.
++    return new RankableObjectWithFields(getObject(), getCount(), getFields().toArray());
++  }
++
++}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/src/jvm/storm/starter/tools/Rankings.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/src/jvm/storm/starter/tools/Rankings.java
index 0000000,0000000..5076bf6
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/tools/Rankings.java
@@@ -1,0 -1,0 +1,139 @@@
++package storm.starter.tools;
++
++import com.google.common.collect.ImmutableList;
++import com.google.common.collect.Lists;
++
++import java.io.Serializable;
++import java.util.Collections;
++import java.util.List;
++
++public class Rankings implements Serializable {
++
++  private static final long serialVersionUID = -1549827195410578903L;
++  private static final int DEFAULT_COUNT = 10;
++
++  private final int maxSize;
++  private final List<Rankable> rankedItems = Lists.newArrayList();
++
++  public Rankings() {
++    this(DEFAULT_COUNT);
++  }
++
++  public Rankings(int topN) {
++    if (topN < 1) {
++      throw new IllegalArgumentException("topN must be >= 1");
++    }
++    maxSize = topN;
++  }
++
++  /**
++   * Copy constructor.
++   * @param other
++   */
++  public Rankings(Rankings other) {
++    this(other.maxSize());
++    updateWith(other);
++  }
++
++  /**
++   * @return the maximum possible number (size) of ranked objects this instance can hold
++   */
++  public int maxSize() {
++    return maxSize;
++  }
++
++  /**
++   * @return the number (size) of ranked objects this instance is currently holding
++   */
++  public int size() {
++    return rankedItems.size();
++  }
++
++  /**
++   * The returned defensive copy is only "somewhat" defensive.  We do, for instance, return a defensive copy of the
++   * enclosing List instance, and we do try to defensively copy any contained Rankable objects, too.  However, the
++   * contract of {@link storm.starter.tools.Rankable#copy()} does not guarantee that any Object's embedded within
++   * a Rankable will be defensively copied, too.
++   *
++   * @return a somewhat defensive copy of ranked items
++   */
++  public List<Rankable> getRankings() {
++    List<Rankable> copy = Lists.newLinkedList();
++    for (Rankable r: rankedItems) {
++      copy.add(r.copy());
++    }
++    return ImmutableList.copyOf(copy);
++  }
++
++  public void updateWith(Rankings other) {
++    for (Rankable r : other.getRankings()) {
++      updateWith(r);
++    }
++  }
++
++  public void updateWith(Rankable r) {
++    synchronized(rankedItems) {
++      addOrReplace(r);
++      rerank();
++      shrinkRankingsIfNeeded();
++    }
++  }
++
++  private void addOrReplace(Rankable r) {
++    Integer rank = findRankOf(r);
++    if (rank != null) {
++      rankedItems.set(rank, r);
++    }
++    else {
++      rankedItems.add(r);
++    }
++  }
++
++  private Integer findRankOf(Rankable r) {
++    Object tag = r.getObject();
++    for (int rank = 0; rank < rankedItems.size(); rank++) {
++      Object cur = rankedItems.get(rank).getObject();
++      if (cur.equals(tag)) {
++        return rank;
++      }
++    }
++    return null;
++  }
++
++  private void rerank() {
++    Collections.sort(rankedItems);
++    Collections.reverse(rankedItems);
++  }
++
++  private void shrinkRankingsIfNeeded() {
++    if (rankedItems.size() > maxSize) {
++      rankedItems.remove(maxSize);
++    }
++  }
++
++  /**
++   * Removes ranking entries that have a count of zero.
++   */
++  public void pruneZeroCounts() {
++    int i = 0;
++    while (i < rankedItems.size()) {
++      if (rankedItems.get(i).getCount() == 0) {
++        rankedItems.remove(i);
++      }
++      else {
++        i++;
++      }
++    }
++  }
++
++  public String toString() {
++    return rankedItems.toString();
++  }
++
++  /**
++   * Creates a (defensive) copy of itself.
++   */
++  public Rankings copy() {
++    return new Rankings(this);
++  }
++}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/src/jvm/storm/starter/tools/SlidingWindowCounter.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/src/jvm/storm/starter/tools/SlidingWindowCounter.java
index 0000000,0000000..38becdc
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/tools/SlidingWindowCounter.java
@@@ -1,0 -1,0 +1,102 @@@
++package storm.starter.tools;
++
++import java.io.Serializable;
++import java.util.Map;
++
++/**
++ * This class counts objects in a sliding window fashion.
++ * <p/>
++ * It is designed 1) to give multiple "producer" threads write access to the counter, i.e. being able to increment
++ * counts of objects, and 2) to give a single "consumer" thread (e.g. {@link PeriodicSlidingWindowCounter}) read access
++ * to the counter. Whenever the consumer thread performs a read operation, this class will advance the head slot of the
++ * sliding window counter. This means that the consumer thread indirectly controls where writes of the producer threads
++ * will go to. Also, by itself this class will not advance the head slot.
++ * <p/>
++ * A note for analyzing data based on a sliding window count: During the initial <code>windowLengthInSlots</code>
++ * iterations, this sliding window counter will always return object counts that are equal to or greater than in the
++ * previous iteration. This is the effect of the counter "loading up" at the very start of its existence. Conceptually,
++ * this is the desired behavior.
++ * <p/>
++ * To give an example, using a counter with 5 slots which for the sake of this example represent 1 minute of time each:
++ * <p/>
++ * <pre>
++ * {@code
++ * Sliding window counts of an object X over time
++ *
++ * Minute (timeline):
++ * 1    2   3   4   5   6   7   8
++ *
++ * Observed counts per minute:
++ * 1    1   1   1   0   0   0   0
++ *
++ * Counts returned by counter:
++ * 1    2   3   4   4   3   2   1
++ * }
++ * </pre>
++ * <p/>
++ * As you can see in this example, for the first <code>windowLengthInSlots</code> (here: the first five minutes) the
++ * counter will always return counts equal to or greater than in the previous iteration (1, 2, 3, 4, 4). This initial load
++ * effect needs to be accounted for whenever you want to perform analyses such as trending topics; otherwise your
++ * analysis algorithm might falsely identify the object to be trending as the counter seems to observe continuously
++ * increasing counts. Also, note that during the initial load phase <em>every object</em> will exhibit increasing
++ * counts.
++ * <p/>
++ * On a high-level, the counter exhibits the following behavior: If you asked the example counter after two minutes,
++ * "how often did you count the object during the past five minutes?", then it should reply "I have counted it 2 times
++ * in the past five minutes", implying that it can only account for the last two of those five minutes because the
++ * counter was not running before that time.
++ *
++ * @param <T> The type of those objects we want to count.
++ */
++public final class SlidingWindowCounter<T> implements Serializable {
++
++  private static final long serialVersionUID = -2645063988768785810L;
++
++  private SlotBasedCounter<T> objCounter;
++  private int headSlot;
++  private int tailSlot;
++  private int windowLengthInSlots;
++
++  /**
++   * Creates a counter whose head is slot 0 and whose tail is the slot right after it.
++   *
++   * @param windowLengthInSlots the number of slots that make up the window; must be at least 2
++   *
++   * @throws IllegalArgumentException if fewer than two slots are requested
++   */
++  public SlidingWindowCounter(int windowLengthInSlots) {
++    if (windowLengthInSlots < 2) {
++      throw new IllegalArgumentException(
++          "Window length in slots must be at least two (you requested " + windowLengthInSlots + ")");
++    }
++    this.windowLengthInSlots = windowLengthInSlots;
++    objCounter = new SlotBasedCounter<T>(windowLengthInSlots);
++    headSlot = 0;
++    tailSlot = nextSlot(0);
++  }
++
++  /**
++   * Counts one more occurrence of the object in the current head slot.
++   *
++   * @param obj the object that was observed
++   */
++  public void incrementCount(T obj) {
++    objCounter.incrementCount(obj, headSlot);
++  }
++
++  /**
++   * Takes the (total) counts of all tracked objects, then moves the window forward by one slot.
++   * <p/>
++   * Calling this method means the caller has received the counts of the current window; objects counted afterwards
++   * go into the next slot of the window.
++   *
++   * @return The (total) counts of all tracked objects as they were before the window advanced.
++   */
++  public Map<T, Long> getCountsThenAdvanceWindow() {
++    final Map<T, Long> totals = objCounter.getCounts();
++    objCounter.wipeZeros();
++    // The tail slot is about to become the head, so it has to start out empty.
++    objCounter.wipeSlot(tailSlot);
++    headSlot = tailSlot;
++    tailSlot = nextSlot(headSlot);
++    return totals;
++  }
++
++  /** Returns the index of the slot following the given one, wrapping around at the end of the window. */
++  private int nextSlot(int slot) {
++    return (slot + 1) % windowLengthInSlots;
++  }
++
++}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/src/jvm/storm/starter/tools/SlotBasedCounter.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/src/jvm/storm/starter/tools/SlotBasedCounter.java
index 0000000,0000000..f45ef48
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/tools/SlotBasedCounter.java
@@@ -1,0 -1,0 +1,101 @@@
++package storm.starter.tools;
++
++import java.io.Serializable;
++import java.util.HashMap;
++import java.util.HashSet;
++import java.util.Map;
++import java.util.Set;
++
++/**
++ * Counts how often objects occur, with a separate count per slot for every object.
++ * <p/>
++ * One possible use is as the building block of a sliding window counter.
++ *
++ * @param <T> The type of the objects to be counted.
++ */
++public final class SlotBasedCounter<T> implements Serializable {
++
++  private static final long serialVersionUID = 4858185737378394432L;
++
++  private final Map<T, long[]> objToCounts = new HashMap<T, long[]>();
++  private final int numSlots;
++
++  /**
++   * @param numSlots the number of slots kept per object; must be greater than zero
++   *
++   * @throws IllegalArgumentException if {@code numSlots} is zero or negative
++   */
++  public SlotBasedCounter(int numSlots) {
++    if (numSlots <= 0) {
++      throw new IllegalArgumentException("Number of slots must be greater than zero (you requested " + numSlots + ")");
++    }
++    this.numSlots = numSlots;
++  }
++
++  /**
++   * Adds one to the count of the object in the given slot, starting to track the object if it is new.
++   *
++   * @param obj  the object that was observed
++   * @param slot the index of the slot to count it in
++   */
++  public void incrementCount(T obj, int slot) {
++    long[] perSlot = objToCounts.get(obj);
++    if (perSlot == null) {
++      perSlot = new long[numSlots];
++      objToCounts.put(obj, perSlot);
++    }
++    perSlot[slot] += 1;
++  }
++
++  /**
++   * @param obj  the object to look up
++   * @param slot the index of the slot to read
++   *
++   * @return the count of the object in the given slot, or 0 if the object is not tracked
++   */
++  public long getCount(T obj, int slot) {
++    long[] perSlot = objToCounts.get(obj);
++    return perSlot == null ? 0 : perSlot[slot];
++  }
++
++  /**
++   * @return a new map from every tracked object to the sum of its counts over all slots
++   */
++  public Map<T, Long> getCounts() {
++    Map<T, Long> totals = new HashMap<T, Long>();
++    for (Map.Entry<T, long[]> entry : objToCounts.entrySet()) {
++      totals.put(entry.getKey(), sumOf(entry.getValue()));
++    }
++    return totals;
++  }
++
++  /** Adds up the counts of all slots. */
++  private long sumOf(long[] perSlot) {
++    long sum = 0;
++    for (int i = 0; i < perSlot.length; i++) {
++      sum += perSlot[i];
++    }
++    return sum;
++  }
++
++  /**
++   * Sets the count in the given slot back to zero for every tracked object.
++   *
++   * @param slot the index of the slot to reset
++   */
++  public void wipeSlot(int slot) {
++    for (long[] perSlot : objToCounts.values()) {
++      perSlot[slot] = 0;
++    }
++  }
++
++  /**
++   * Stops tracking every object whose counts add up to zero (to free up memory).
++   */
++  public void wipeZeros() {
++    // Collect first, remove afterwards, so that the map is not modified while it is being iterated.
++    Set<T> withoutCounts = new HashSet<T>();
++    for (Map.Entry<T, long[]> entry : objToCounts.entrySet()) {
++      if (sumOf(entry.getValue()) == 0) {
++        withoutCounts.add(entry.getKey());
++      }
++    }
++    objToCounts.keySet().removeAll(withoutCounts);
++  }
++
++}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/src/jvm/storm/starter/trident/TridentReach.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/src/jvm/storm/starter/trident/TridentReach.java
index 0000000,0000000..a940b40
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/trident/TridentReach.java
@@@ -1,0 -1,0 +1,139 @@@
++package storm.starter.trident;
++
++import backtype.storm.Config;
++import backtype.storm.LocalCluster;
++import backtype.storm.LocalDRPC;
++import backtype.storm.generated.StormTopology;
++import backtype.storm.task.IMetricsContext;
++import backtype.storm.tuple.Fields;
++import backtype.storm.tuple.Values;
++import storm.trident.TridentState;
++import storm.trident.TridentTopology;
++import storm.trident.operation.BaseFunction;
++import storm.trident.operation.CombinerAggregator;
++import storm.trident.operation.TridentCollector;
++import storm.trident.operation.builtin.MapGet;
++import storm.trident.operation.builtin.Sum;
++import storm.trident.state.ReadOnlyState;
++import storm.trident.state.State;
++import storm.trident.state.StateFactory;
++import storm.trident.state.map.ReadOnlyMapState;
++import storm.trident.tuple.TridentTuple;
++
++import java.util.*;
++
++/**
++ * Example that answers "reach" DRPC requests for a URL using two in-memory maps: URL to the people who tweeted it,
++ * and person to that person's followers.
++ */
++public class TridentReach {
++  /** Maps a URL to the list of people who tweeted it. */
++  public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, List<String>>() {{
++    put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan"));
++    put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", "sally", "nathan"));
++    put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john"));
++  }};
++
++  /** Maps a person to the list of that person's followers. */
++  public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, List<String>>() {{
++    put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", "jai"));
++    put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", "vivian"));
++    put("tim", Arrays.asList("alex"));
++    put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", "vivian", "emily", "jordan"));
++    put("adam", Arrays.asList("david", "carissa"));
++    put("mike", Arrays.asList("john", "bob"));
++    put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob"));
++  }};
++
++  /**
++   * Read-only state backed by a plain map; each lookup uses only the first element of the key list.
++   */
++  public static class StaticSingleKeyMapState extends ReadOnlyState implements ReadOnlyMapState<Object> {
++    /** Creates {@link StaticSingleKeyMapState} instances that all wrap the same map. */
++    public static class Factory implements StateFactory {
++      Map _map;
++
++      public Factory(Map map) {
++        _map = map;
++      }
++
++      @Override
++      public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
++        return new StaticSingleKeyMapState(_map);
++      }
++
++    }
++
++    Map _map;
++
++    public StaticSingleKeyMapState(Map map) {
++      _map = map;
++    }
++
++
++    /**
++     * @param keys the keys to look up; only element 0 of each key is used
++     *
++     * @return one value per key, in the same order, with null for keys that are not in the map
++     */
++    @Override
++    public List<Object> multiGet(List<List<Object>> keys) {
++      List<Object> ret = new ArrayList<Object>(keys.size());
++      for (List<Object> key : keys) {
++        Object singleKey = key.get(0);
++        ret.add(_map.get(singleKey));
++      }
++      return ret;
++    }
++
++  }
++
++  /**
++   * Aggregator that yields 1 no matter which or how many tuples it sees.
++   * <p/>
++   * NOTE(review): {@link #zero()} returns 1 as well, not 0 -- presumably on purpose, since the topology below only
++   * applies this aggregator to groups; confirm before reusing it elsewhere.
++   */
++  public static class One implements CombinerAggregator<Integer> {
++    @Override
++    public Integer init(TridentTuple tuple) {
++      return 1;
++    }
++
++    @Override
++    public Integer combine(Integer val1, Integer val2) {
++      return 1;
++    }
++
++    @Override
++    public Integer zero() {
++      return 1;
++    }
++  }
++
++  /**
++   * Emits one tuple per element of the list found in the first field; emits nothing if that field is null.
++   */
++  public static class ExpandList extends BaseFunction {
++
++    @Override
++    public void execute(TridentTuple tuple, TridentCollector collector) {
++      List l = (List) tuple.getValue(0);
++      if (l != null) {
++        for (Object o : l) {
++          collector.emit(new Values(o));
++        }
++      }
++    }
++
++  }
++
++  /**
++   * Builds the topology behind the "reach" DRPC function: look up the tweeters of the URL, expand them, look up
++   * the followers of each tweeter, expand them, reduce every follower group to a single 1 and sum those up.
++   *
++   * @param drpc the local DRPC server the "reach" stream is attached to
++   *
++   * @return the built topology
++   */
++  public static StormTopology buildTopology(LocalDRPC drpc) {
++    TridentTopology topology = new TridentTopology();
++    TridentState urlToTweeters = topology.newStaticState(new StaticSingleKeyMapState.Factory(TWEETERS_DB));
++    TridentState tweetersToFollowers = topology.newStaticState(new StaticSingleKeyMapState.Factory(FOLLOWERS_DB));
++
++
++    topology.newDRPCStream("reach", drpc).stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields(
++        "tweeters")).each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter")).shuffle().stateQuery(
++        tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers")).each(new Fields("followers"),
++        new ExpandList(), new Fields("follower")).groupBy(new Fields("follower")).aggregate(new One(), new Fields(
++        "one")).aggregate(new Fields("one"), new Sum(), new Fields("reach"));
++    return topology.build();
++  }
++
++  /**
++   * Runs the topology on a local cluster, prints the reach of three URLs and shuts everything down again.
++   * <p/>
++   * The cluster and the DRPC server are shut down even if submitting the topology or a request fails.
++   */
++  public static void main(String[] args) throws Exception {
++    LocalDRPC drpc = new LocalDRPC();
++    try {
++      Config conf = new Config();
++      LocalCluster cluster = new LocalCluster();
++      try {
++        cluster.submitTopology("reach", conf, buildTopology(drpc));
++
++        Thread.sleep(2000);
++
++        System.out.println("REACH: " + drpc.execute("reach", "aaa"));
++        System.out.println("REACH: " + drpc.execute("reach", "foo.com/blog/1"));
++        System.out.println("REACH: " + drpc.execute("reach", "engineering.twitter.com/blog/5"));
++      }
++      finally {
++        cluster.shutdown();
++      }
++    }
++    finally {
++      drpc.shutdown();
++    }
++  }
++}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/src/jvm/storm/starter/trident/TridentWordCount.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/src/jvm/storm/starter/trident/TridentWordCount.java
index 0000000,0000000..c56280c
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/trident/TridentWordCount.java
@@@ -1,0 -1,0 +1,68 @@@
++package storm.starter.trident;
++
++import backtype.storm.Config;
++import backtype.storm.LocalCluster;
++import backtype.storm.LocalDRPC;
++import backtype.storm.StormSubmitter;
++import backtype.storm.generated.StormTopology;
++import backtype.storm.tuple.Fields;
++import backtype.storm.tuple.Values;
++import storm.trident.TridentState;
++import storm.trident.TridentTopology;
++import storm.trident.operation.BaseFunction;
++import storm.trident.operation.TridentCollector;
++import storm.trident.operation.builtin.Count;
++import storm.trident.operation.builtin.FilterNull;
++import storm.trident.operation.builtin.MapGet;
++import storm.trident.operation.builtin.Sum;
++import storm.trident.testing.FixedBatchSpout;
++import storm.trident.testing.MemoryMapState;
++import storm.trident.tuple.TridentTuple;
++
++
++/**
++ * Example Trident topology that keeps a persistent count per word and exposes the counts through a DRPC function
++ * named "words".
++ *
++ * <p>Started without arguments it runs on an in-process cluster and prints the DRPC result for a fixed query 100
++ * times; started with one argument it submits the topology under that name via {@link StormSubmitter}.
++ */
++public class TridentWordCount {
++  /**
++   * Splits the string in field 0 of the input tuple on single spaces and emits one tuple per resulting token.
++   */
++  public static class Split extends BaseFunction {
++    @Override
++    public void execute(TridentTuple tuple, TridentCollector collector) {
++      String sentence = tuple.getString(0);
++      for (String word : sentence.split(" ")) {
++        collector.emit(new Values(word));
++      }
++    }
++  }
++
++  /**
++   * Builds the word count topology.
++   *
++   * <p>A cycling {@link FixedBatchSpout} feeds five fixed sentences (batch size argument 3) into a stream that is
++   * split into words, grouped by word and counted into a {@link MemoryMapState}. The "words" DRPC stream splits its
++   * argument the same way, looks every word up in that state, drops tuples whose "count" is filtered out by
++   * {@link FilterNull} and sums the remaining counts into the field "sum".
++   *
++   * @param drpc DRPC handle handed to {@code newDRPCStream}; {@code main} passes {@code null} when submitting to
++   *             a real cluster
++   * @return the result of {@code TridentTopology.build()}
++   */
++  public static StormTopology buildTopology(LocalDRPC drpc) {
++    FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
++        new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
++        new Values("how many apples can you eat"), new Values("to be or not to be the person"));
++    spout.setCycle(true);
++
++    TridentTopology topology = new TridentTopology();
++    TridentState wordCounts = topology.newStream("spout1", spout)
++        .parallelismHint(16)
++        .each(new Fields("sentence"), new Split(), new Fields("word"))
++        .groupBy(new Fields("word"))
++        .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
++        .parallelismHint(16);
++
++    topology.newDRPCStream("words", drpc)
++        .each(new Fields("args"), new Split(), new Fields("word"))
++        .groupBy(new Fields("word"))
++        .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
++        .each(new Fields("count"), new FilterNull())
++        .aggregate(new Fields("count"), new Sum(), new Fields("sum"));
++    return topology.build();
++  }
++
++  /**
++   * Entry point; see the class description for the two modes.
++   *
++   * <p>In local mode the cluster and the DRPC handle are shut down once the query loop ends, whether it finishes
++   * normally or with an exception, so the example does not leave them running.
++   *
++   * @param args empty for local mode, otherwise {@code args[0]} is the topology name used for submission
++   * @throws Exception whatever submission, the DRPC calls or {@code Thread.sleep} throw
++   */
++  public static void main(String[] args) throws Exception {
++    Config conf = new Config();
++    conf.setMaxSpoutPending(20);
++    if (args.length == 0) {
++      LocalDRPC drpc = new LocalDRPC();
++      LocalCluster cluster = new LocalCluster();
++      try {
++        cluster.submitTopology("wordCounter", conf, buildTopology(drpc));
++        for (int i = 0; i < 100; i++) {
++          System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped"));
++          Thread.sleep(1000);
++        }
++      }
++      finally {
++        // Nested finally: a failing cluster shutdown must not prevent the DRPC shutdown.
++        try {
++          cluster.shutdown();
++        }
++        finally {
++          drpc.shutdown();
++        }
++      }
++    }
++    else {
++      conf.setNumWorkers(3);
++      StormSubmitter.submitTopology(args[0], conf, buildTopology(null));
++    }
++  }
++}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/src/jvm/storm/starter/util/StormRunner.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/src/jvm/storm/starter/util/StormRunner.java
index 0000000,0000000..30c4f50
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/util/StormRunner.java
@@@ -1,0 -1,0 +1,22 @@@
++package storm.starter.util;
++
++import backtype.storm.Config;
++import backtype.storm.LocalCluster;
++import backtype.storm.generated.StormTopology;
++
++/**
++ * Static helper for running a topology on an in-process {@link LocalCluster} for a fixed amount of time.
++ */
++public final class StormRunner {
++
++  private static final int MILLIS_IN_SEC = 1000;
++
++  private StormRunner() {
++  }
++
++  /**
++   * Submits the topology to a new local cluster, sleeps for the requested time, kills the topology and shuts the
++   * cluster down.
++   *
++   * <p>The cluster is shut down in a {@code finally} block, so it is also released when the submission fails or
++   * the sleep is interrupted.
++   *
++   * @param topology the topology to submit
++   * @param topologyName name under which the topology is submitted and later killed
++   * @param conf configuration passed along with the submission
++   * @param runtimeInSeconds number of seconds to sleep before the topology is killed
++   * @throws InterruptedException if the calling thread is interrupted while sleeping
++   */
++  public static void runTopologyLocally(StormTopology topology, String topologyName, Config conf, int runtimeInSeconds)
++      throws InterruptedException {
++    LocalCluster cluster = new LocalCluster();
++    try {
++      cluster.submitTopology(topologyName, conf, topology);
++      // Widen to long before multiplying so that a large runtime cannot overflow int.
++      Thread.sleep((long) runtimeInSeconds * MILLIS_IN_SEC);
++      cluster.killTopology(topologyName);
++    }
++    finally {
++      cluster.shutdown();
++    }
++  }
++}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/src/jvm/storm/starter/util/TupleHelpers.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/src/jvm/storm/starter/util/TupleHelpers.java
index 0000000,0000000..a46480e
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/util/TupleHelpers.java
@@@ -1,0 -1,0 +1,16 @@@
++package storm.starter.util;
++
++import backtype.storm.Constants;
++import backtype.storm.tuple.Tuple;
++
++/**
++ * Static helpers for inspecting {@link Tuple} instances.
++ */
++public final class TupleHelpers {
++
++  private TupleHelpers() {
++  }
++
++  /**
++   * Tells whether a tuple is a tick tuple, i.e. whether its source component equals
++   * {@link Constants#SYSTEM_COMPONENT_ID} and its source stream equals {@link Constants#SYSTEM_TICK_STREAM_ID}.
++   *
++   * <p>The constants are the receivers of {@code equals}, so a tuple that reports {@code null} for either value
++   * (for instance a mock that has not been stubbed) yields {@code false} instead of a
++   * {@code NullPointerException}.
++   *
++   * @param tuple the tuple to inspect; must not be {@code null}
++   * @return {@code true} if both the source component and the source stream match the tick tuple constants
++   */
++  public static boolean isTickTuple(Tuple tuple) {
++    return Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
++        && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
++  }
++
++}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/storm-starter.iml
----------------------------------------------------------------------
diff --cc examples/storm-starter/storm-starter.iml
index 0000000,0000000..a4e5116
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/storm-starter.iml
@@@ -1,0 -1,0 +1,82 @@@
++<?xml version="1.0" encoding="UTF-8"?>
++<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
++  <component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_6" inherit-compiler-output="false">
++    <output url="file://$MODULE_DIR$/target/classes" />
++    <output-test url="file://$MODULE_DIR$/target/test-classes" />
++    <exclude-output />
++    <content url="file://$MODULE_DIR$">
++      <sourceFolder url="file://$MODULE_DIR$/src/jvm" isTestSource="false" />
++      <sourceFolder url="file://$MODULE_DIR$/multilang" isTestSource="false" />
++      <sourceFolder url="file://$MODULE_DIR$/test/jvm" isTestSource="true" />
++      <excludeFolder url="file://$MODULE_DIR$/target" />
++    </content>
++    <orderEntry type="inheritedJdk" />
++    <orderEntry type="sourceFolder" forTests="false" />
++    <orderEntry type="library" scope="TEST" name="Maven: junit:junit:3.8.1" level="project" />
++    <orderEntry type="library" scope="TEST" name="Maven: org.testng:testng:6.8.5" level="project" />
++    <orderEntry type="library" scope="TEST" name="Maven: org.beanshell:bsh:2.0b4" level="project" />
++    <orderEntry type="library" scope="TEST" name="Maven: com.beust:jcommander:1.27" level="project" />
++    <orderEntry type="library" scope="TEST" name="Maven: org.yaml:snakeyaml:1.6" level="project" />
++    <orderEntry type="library" scope="TEST" name="Maven: org.mockito:mockito-all:1.9.0" level="project" />
++    <orderEntry type="library" scope="TEST" name="Maven: org.easytesting:fest-assert-core:2.0M8" level="project" />
++    <orderEntry type="library" scope="TEST" name="Maven: org.easytesting:fest-util:1.2.3" level="project" />
++    <orderEntry type="library" scope="TEST" name="Maven: org.jmock:jmock:2.6.0" level="project" />
++    <orderEntry type="library" scope="TEST" name="Maven: org.hamcrest:hamcrest-core:1.1" level="project" />
++    <orderEntry type="library" scope="TEST" name="Maven: org.hamcrest:hamcrest-library:1.1" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: storm:storm:0.9.0.1" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: storm:storm-console-logging:0.9.0.1" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: storm:storm-core:0.9.0.1" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: org.clojure:clojure:1.4.0" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: commons-io:commons-io:1.4" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.commons:commons-exec:1.1" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: storm:libthrift7:0.7.0-2" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: commons-lang:commons-lang:2.5" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: javax.servlet:servlet-api:2.5" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.httpcomponents:httpclient:4.1.1" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.httpcomponents:httpcore:4.1" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: commons-logging:commons-logging:1.1.1" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: commons-codec:commons-codec:1.4" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: clj-time:clj-time:0.4.1" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: joda-time:joda-time:2.0" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: com.netflix.curator:curator-framework:1.0.1" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: com.netflix.curator:curator-client:1.0.1" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: org.slf4j:slf4j-api:1.6.5" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.zookeeper:zookeeper:3.3.3" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: jline:jline:0.9.94" level="project" />
++    <orderEntry type="library" name="Maven: com.google.guava:guava:15.0" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: backtype:jzmq:2.1.0" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: com.googlecode.json-simple:json-simple:1.1" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: compojure:compojure:1.1.3" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: org.clojure:core.incubator:0.1.0" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: org.clojure:tools.macro:0.1.0" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: clout:clout:1.0.1" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: ring:ring-core:1.1.5" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: commons-fileupload:commons-fileupload:1.2.1" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: hiccup:hiccup:0.3.6" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: ring:ring-devel:0.3.11" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: clj-stacktrace:clj-stacktrace:0.2.2" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: ring:ring-jetty-adapter:0.3.11" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: ring:ring-servlet:0.3.11" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: org.mortbay.jetty:jetty:6.1.26" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: org.mortbay.jetty:jetty-util:6.1.26" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: org.mortbay.jetty:servlet-api:2.5-20081211" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: org.clojure:tools.logging:0.2.3" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: org.clojure:math.numeric-tower:0.0.1" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: storm:carbonite:1.5.0" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: com.esotericsoftware.kryo:kryo:2.17" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: com.esotericsoftware.reflectasm:reflectasm:shaded:1.07" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: org.ow2.asm:asm:4.0" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: com.esotericsoftware.minlog:minlog:1.2" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: org.objenesis:objenesis:1.2" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: storm:tools.cli:0.2.2" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: com.googlecode.disruptor:disruptor:2.10.1" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: storm:jgrapht:0.8.3" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: ch.qos.logback:logback-classic:1.0.6" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: ch.qos.logback:logback-core:1.0.6" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: org.slf4j:log4j-over-slf4j:1.6.6" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: storm:storm-netty:0.9.0.1" level="project" />
++    <orderEntry type="library" scope="PROVIDED" name="Maven: io.netty:netty:3.6.3.Final" level="project" />
++    <orderEntry type="library" name="Maven: commons-collections:commons-collections:3.2.1" level="project" />
++  </component>
++</module>
++

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/test/jvm/storm/starter/bolt/IntermediateRankingsBoltTest.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/test/jvm/storm/starter/bolt/IntermediateRankingsBoltTest.java
index 0000000,0000000..cbbee3c
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/test/jvm/storm/starter/bolt/IntermediateRankingsBoltTest.java
@@@ -1,0 -1,0 +1,129 @@@
++package storm.starter.bolt;
++
++import backtype.storm.Config;
++import backtype.storm.topology.BasicOutputCollector;
++import backtype.storm.topology.OutputFieldsDeclarer;
++import backtype.storm.tuple.Fields;
++import backtype.storm.tuple.Tuple;
++import backtype.storm.tuple.Values;
++import com.google.common.collect.Lists;
++import org.testng.annotations.DataProvider;
++import org.testng.annotations.Test;
++import storm.starter.tools.MockTupleHelpers;
++
++import java.util.Map;
++
++import static org.fest.assertions.api.Assertions.assertThat;
++import static org.mockito.Matchers.any;
++import static org.mockito.Mockito.*;
++
++/**
++ * Unit tests for {@link IntermediateRankingsBolt}: constructor argument validation, reaction to tick tuples and
++ * normal tuples, output field declaration and the tick tuple frequency in the component configuration.
++ */
++public class IntermediateRankingsBoltTest {
++
++  private static final String ANY_NON_SYSTEM_COMPONENT_ID = "irrelevant_component_id";
++  private static final String ANY_NON_SYSTEM_STREAM_ID = "irrelevant_stream_id";
++  private static final Object ANY_OBJECT = new Object();
++  private static final int ANY_TOPN = 10;
++  private static final long ANY_COUNT = 42;
++
++  /**
++   * Creates a mocked non-system tuple whose {@code getValues()} returns the given object and count, in that order.
++   *
++   * @param obj first value of the tuple
++   * @param count second value of the tuple
++   * @return the stubbed mock tuple
++   */
++  private Tuple mockRankableTuple(Object obj, long count) {
++    Tuple tuple = MockTupleHelpers.mockTuple(ANY_NON_SYSTEM_COMPONENT_ID, ANY_NON_SYSTEM_STREAM_ID);
++    // Stub with the arguments rather than with fixed constants so that callers get the tuple they asked for.
++    when(tuple.getValues()).thenReturn(Lists.newArrayList(obj, count));
++    return tuple;
++  }
++
++  @DataProvider
++  public Object[][] illegalTopN() {
++    return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
++  }
++
++  @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalTopN")
++  public void negativeOrZeroTopNShouldThrowIAE(int topN) {
++    new IntermediateRankingsBolt(topN);
++  }
++
++  @DataProvider
++  public Object[][] illegalEmitFrequency() {
++    return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
++  }
++
++  @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalEmitFrequency")
++  public void negativeOrZeroEmitFrequencyShouldThrowIAE(int emitFrequencyInSeconds) {
++    new IntermediateRankingsBolt(ANY_TOPN, emitFrequencyInSeconds);
++  }
++
++  @DataProvider
++  public Object[][] legalTopN() {
++    return new Object[][]{ { 1 }, { 2 }, { 3 }, { 20 } };
++  }
++
++  @Test(dataProvider = "legalTopN")
++  public void positiveTopNShouldBeOk(int topN) {
++    new IntermediateRankingsBolt(topN);
++  }
++
++  @DataProvider
++  public Object[][] legalEmitFrequency() {
++    return new Object[][]{ { 1 }, { 2 }, { 3 }, { 20 } };
++  }
++
++  @Test(dataProvider = "legalEmitFrequency")
++  public void positiveEmitFrequencyShouldBeOk(int emitFrequencyInSeconds) {
++    new IntermediateRankingsBolt(ANY_TOPN, emitFrequencyInSeconds);
++  }
++
++  @Test
++  public void shouldEmitSomethingIfTickTupleIsReceived() {
++    // given
++    Tuple tickTuple = MockTupleHelpers.mockTickTuple();
++    BasicOutputCollector collector = mock(BasicOutputCollector.class);
++    IntermediateRankingsBolt bolt = new IntermediateRankingsBolt();
++
++    // when
++    bolt.execute(tickTuple, collector);
++
++    // then
++    verify(collector).emit(any(Values.class));
++  }
++
++  @Test
++  public void shouldEmitNothingIfNormalTupleIsReceived() {
++    // given
++    Tuple normalTuple = mockRankableTuple(ANY_OBJECT, ANY_COUNT);
++    BasicOutputCollector collector = mock(BasicOutputCollector.class);
++    IntermediateRankingsBolt bolt = new IntermediateRankingsBolt();
++
++    // when
++    bolt.execute(normalTuple, collector);
++
++    // then
++    verifyZeroInteractions(collector);
++  }
++
++  @Test
++  public void shouldDeclareOutputFields() {
++    // given
++    OutputFieldsDeclarer declarer = mock(OutputFieldsDeclarer.class);
++    IntermediateRankingsBolt bolt = new IntermediateRankingsBolt();
++
++    // when
++    bolt.declareOutputFields(declarer);
++
++    // then
++    verify(declarer, times(1)).declare(any(Fields.class));
++  }
++
++  @Test
++  public void shouldSetTickTupleFrequencyInComponentConfigurationToNonZeroValue() {
++    // given
++    IntermediateRankingsBolt bolt = new IntermediateRankingsBolt();
++
++    // when
++    Map<String, Object> componentConfig = bolt.getComponentConfiguration();
++
++    // then
++    assertThat(componentConfig).containsKey(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
++    Integer emitFrequencyInSeconds = (Integer) componentConfig.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
++    assertThat(emitFrequencyInSeconds).isGreaterThan(0);
++  }
++}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/test/jvm/storm/starter/bolt/RollingCountBoltTest.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/test/jvm/storm/starter/bolt/RollingCountBoltTest.java
index 0000000,0000000..c9a516a
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/test/jvm/storm/starter/bolt/RollingCountBoltTest.java
@@@ -1,0 -1,0 +1,96 @@@
++package storm.starter.bolt;
++
++import backtype.storm.Config;
++import backtype.storm.task.OutputCollector;
++import backtype.storm.task.TopologyContext;
++import backtype.storm.topology.OutputFieldsDeclarer;
++import backtype.storm.tuple.Fields;
++import backtype.storm.tuple.Tuple;
++import backtype.storm.tuple.Values;
++import org.testng.annotations.Test;
++import storm.starter.tools.MockTupleHelpers;
++
++import java.util.Map;
++
++import static org.fest.assertions.api.Assertions.assertThat;
++import static org.mockito.Matchers.any;
++import static org.mockito.Mockito.*;
++
++/**
++ * Unit tests for {@link RollingCountBolt}: reaction to tick tuples with and without previously counted objects,
++ * output field declaration and the tick tuple frequency in the component configuration.
++ */
++public class RollingCountBoltTest {
++
++  private static final String ANY_NON_SYSTEM_COMPONENT_ID = "irrelevant_component_id";
++  private static final String ANY_NON_SYSTEM_STREAM_ID = "irrelevant_stream_id";
++
++  /**
++   * Creates a mocked non-system tuple whose value at index 0 is the given object.
++   */
++  private Tuple mockNormalTuple(Object obj) {
++    Tuple normal = MockTupleHelpers.mockTuple(ANY_NON_SYSTEM_COMPONENT_ID, ANY_NON_SYSTEM_STREAM_ID);
++    when(normal.getValue(0)).thenReturn(obj);
++    return normal;
++  }
++
++  /**
++   * Creates a bolt and prepares it with a mocked configuration map, a mocked context and the given collector.
++   */
++  @SuppressWarnings("rawtypes")
++  private RollingCountBolt newPreparedBolt(OutputCollector collector) {
++    RollingCountBolt preparedBolt = new RollingCountBolt();
++    Map mockedConf = mock(Map.class);
++    TopologyContext mockedContext = mock(TopologyContext.class);
++    preparedBolt.prepare(mockedConf, mockedContext, collector);
++    return preparedBolt;
++  }
++
++  @Test
++  public void shouldEmitNothingIfNoObjectHasBeenCountedYetAndTickTupleIsReceived() {
++    // given
++    OutputCollector collector = mock(OutputCollector.class);
++    RollingCountBolt bolt = newPreparedBolt(collector);
++    Tuple tick = MockTupleHelpers.mockTickTuple();
++
++    // when
++    bolt.execute(tick);
++
++    // then
++    verifyZeroInteractions(collector);
++  }
++
++  @Test
++  public void shouldEmitSomethingIfAtLeastOneObjectWasCountedAndTickTupleIsReceived() {
++    // given
++    OutputCollector collector = mock(OutputCollector.class);
++    RollingCountBolt bolt = newPreparedBolt(collector);
++    Tuple counted = mockNormalTuple(new Object());
++    Tuple tick = MockTupleHelpers.mockTickTuple();
++
++    // when
++    bolt.execute(counted);
++    bolt.execute(tick);
++
++    // then
++    verify(collector).emit(any(Values.class));
++  }
++
++  @Test
++  public void shouldDeclareOutputFields() {
++    // given
++    RollingCountBolt bolt = new RollingCountBolt();
++    OutputFieldsDeclarer fieldsDeclarer = mock(OutputFieldsDeclarer.class);
++
++    // when
++    bolt.declareOutputFields(fieldsDeclarer);
++
++    // then
++    verify(fieldsDeclarer, times(1)).declare(any(Fields.class));
++  }
++
++  @Test
++  public void shouldSetTickTupleFrequencyInComponentConfigurationToNonZeroValue() {
++    // given
++    RollingCountBolt bolt = new RollingCountBolt();
++
++    // when
++    Map<String, Object> configuration = bolt.getComponentConfiguration();
++
++    // then
++    assertThat(configuration).containsKey(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
++    Integer tickFrequencyInSeconds = (Integer) configuration.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
++    assertThat(tickFrequencyInSeconds).isGreaterThan(0);
++  }
++}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/test/jvm/storm/starter/bolt/TotalRankingsBoltTest.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/test/jvm/storm/starter/bolt/TotalRankingsBoltTest.java
index 0000000,0000000..a8bed45
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/test/jvm/storm/starter/bolt/TotalRankingsBoltTest.java
@@@ -1,0 -1,0 +1,130 @@@
++package storm.starter.bolt;
++
++import backtype.storm.Config;
++import backtype.storm.topology.BasicOutputCollector;
++import backtype.storm.topology.OutputFieldsDeclarer;
++import backtype.storm.tuple.Fields;
++import backtype.storm.tuple.Tuple;
++import backtype.storm.tuple.Values;
++import org.testng.annotations.DataProvider;
++import org.testng.annotations.Test;
++import storm.starter.tools.MockTupleHelpers;
++import storm.starter.tools.Rankings;
++
++import java.util.Map;
++
++import static org.fest.assertions.api.Assertions.assertThat;
++import static org.mockito.Matchers.any;
++import static org.mockito.Mockito.*;
++
++/**
++ * Unit tests for {@link TotalRankingsBolt}: constructor argument validation, reaction to tick tuples and normal
++ * tuples, output field declaration and the tick tuple frequency in the component configuration.
++ */
++public class TotalRankingsBoltTest {
++
++  private static final String ANY_NON_SYSTEM_COMPONENT_ID = "irrelevant_component_id";
++  private static final String ANY_NON_SYSTEM_STREAM_ID = "irrelevant_stream_id";
++  private static final int ANY_TOPN = 10;
++
++  /**
++   * Creates a mocked non-system tuple whose value at index 0 is a mocked {@link Rankings}.
++   *
++   * <p>The helper takes no arguments: the rankings object is a plain mock, so there is nothing a caller could
++   * configure.
++   *
++   * @return the stubbed mock tuple
++   */
++  private Tuple mockRankingsTuple() {
++    Tuple tuple = MockTupleHelpers.mockTuple(ANY_NON_SYSTEM_COMPONENT_ID, ANY_NON_SYSTEM_STREAM_ID);
++    Rankings rankings = mock(Rankings.class);
++    when(tuple.getValue(0)).thenReturn(rankings);
++    return tuple;
++  }
++
++  @DataProvider
++  public Object[][] illegalTopN() {
++    return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
++  }
++
++  @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalTopN")
++  public void negativeOrZeroTopNShouldThrowIAE(int topN) {
++    new TotalRankingsBolt(topN);
++  }
++
++  @DataProvider
++  public Object[][] illegalEmitFrequency() {
++    return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
++  }
++
++  @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalEmitFrequency")
++  public void negativeOrZeroEmitFrequencyShouldThrowIAE(int emitFrequencyInSeconds) {
++    new TotalRankingsBolt(ANY_TOPN, emitFrequencyInSeconds);
++  }
++
++  @DataProvider
++  public Object[][] legalTopN() {
++    return new Object[][]{ { 1 }, { 2 }, { 3 }, { 20 } };
++  }
++
++  @Test(dataProvider = "legalTopN")
++  public void positiveTopNShouldBeOk(int topN) {
++    new TotalRankingsBolt(topN);
++  }
++
++  @DataProvider
++  public Object[][] legalEmitFrequency() {
++    return new Object[][]{ { 1 }, { 2 }, { 3 }, { 20 } };
++  }
++
++  @Test(dataProvider = "legalEmitFrequency")
++  public void positiveEmitFrequencyShouldBeOk(int emitFrequencyInSeconds) {
++    new TotalRankingsBolt(ANY_TOPN, emitFrequencyInSeconds);
++  }
++
++  @Test
++  public void shouldEmitSomethingIfTickTupleIsReceived() {
++    // given
++    Tuple tickTuple = MockTupleHelpers.mockTickTuple();
++    BasicOutputCollector collector = mock(BasicOutputCollector.class);
++    TotalRankingsBolt bolt = new TotalRankingsBolt();
++
++    // when
++    bolt.execute(tickTuple, collector);
++
++    // then
++    verify(collector).emit(any(Values.class));
++  }
++
++  @Test
++  public void shouldEmitNothingIfNormalTupleIsReceived() {
++    // given
++    Tuple normalTuple = mockRankingsTuple();
++    BasicOutputCollector collector = mock(BasicOutputCollector.class);
++    TotalRankingsBolt bolt = new TotalRankingsBolt();
++
++    // when
++    bolt.execute(normalTuple, collector);
++
++    // then
++    verifyZeroInteractions(collector);
++  }
++
++  @Test
++  public void shouldDeclareOutputFields() {
++    // given
++    OutputFieldsDeclarer declarer = mock(OutputFieldsDeclarer.class);
++    TotalRankingsBolt bolt = new TotalRankingsBolt();
++
++    // when
++    bolt.declareOutputFields(declarer);
++
++    // then
++    verify(declarer, times(1)).declare(any(Fields.class));
++  }
++
++  @Test
++  public void shouldSetTickTupleFrequencyInComponentConfigurationToNonZeroValue() {
++    // given
++    TotalRankingsBolt bolt = new TotalRankingsBolt();
++
++    // when
++    Map<String, Object> componentConfig = bolt.getComponentConfiguration();
++
++    // then
++    assertThat(componentConfig).containsKey(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
++    Integer emitFrequencyInSeconds = (Integer) componentConfig.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
++    assertThat(emitFrequencyInSeconds).isGreaterThan(0);
++  }
++}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
index 0000000,0000000..fd7d921
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
@@@ -1,0 -1,0 +1,23 @@@
++package storm.starter.tools;
++
++import backtype.storm.Constants;
++import backtype.storm.tuple.Tuple;
++
++import static org.mockito.Mockito.*;
++
++/**
++ * Factory methods for Mockito-backed {@link Tuple} instances used by the tests.
++ */
++public final class MockTupleHelpers {
++
++  private MockTupleHelpers() {
++  }
++
++  /**
++   * Creates a mocked tuple that reports the given source component and source stream.
++   *
++   * @param componentId value returned by {@code getSourceComponent()}
++   * @param streamId value returned by {@code getSourceStreamId()}
++   * @return the stubbed mock
++   */
++  public static Tuple mockTuple(String componentId, String streamId) {
++    Tuple mocked = mock(Tuple.class);
++    when(mocked.getSourceStreamId()).thenReturn(streamId);
++    when(mocked.getSourceComponent()).thenReturn(componentId);
++    return mocked;
++  }
++
++  /**
++   * Creates a mocked tuple whose source component and source stream are the system component and the system tick
++   * stream from {@link Constants}.
++   *
++   * @return the stubbed mock
++   */
++  public static Tuple mockTickTuple() {
++    String systemComponent = Constants.SYSTEM_COMPONENT_ID;
++    String tickStream = Constants.SYSTEM_TICK_STREAM_ID;
++    return mockTuple(systemComponent, tickStream);
++  }
++}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/test/jvm/storm/starter/tools/NthLastModifiedTimeTrackerTest.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/test/jvm/storm/starter/tools/NthLastModifiedTimeTrackerTest.java
index 0000000,0000000..c79d382
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/test/jvm/storm/starter/tools/NthLastModifiedTimeTrackerTest.java
@@@ -1,0 -1,0 +1,108 @@@
++package storm.starter.tools;
++
++import backtype.storm.utils.Time;
++import org.testng.annotations.DataProvider;
++import org.testng.annotations.Test;
++
++import static org.fest.assertions.api.Assertions.assertThat;
++
++/**
++ * Unit tests for {@link NthLastModifiedTimeTracker}, driven by the simulated clock of {@link Time}.
++ *
++ * <p>Tests that switch the simulated clock on switch it off again in a {@code finally} block, so a failing
++ * assertion cannot leave simulation enabled for tests that run afterwards.
++ */
++public class NthLastModifiedTimeTrackerTest {
++
++  private static final int ANY_NUM_TIMES_TO_TRACK = 3;
++  private static final int MILLIS_IN_SEC = 1000;
++
++  @DataProvider
++  public Object[][] illegalNumTimesData() {
++    return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
++  }
++
++  @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalNumTimesData")
++  public void negativeOrZeroNumTimesToTrackShouldThrowIAE(int numTimesToTrack) {
++    new NthLastModifiedTimeTracker(numTimesToTrack);
++  }
++
++  @DataProvider
++  public Object[][] legalNumTimesData() {
++    return new Object[][]{ { 1 }, { 2 }, { 3 }, { 20 } };
++  }
++
++  @Test(dataProvider = "legalNumTimesData")
++  public void positiveNumTimesToTrackShouldBeOk(int numTimesToTrack) {
++    new NthLastModifiedTimeTracker(numTimesToTrack);
++  }
++
++  @DataProvider
++  public Object[][] whenNotYetMarkedAsModifiedData() {
++    return new Object[][]{ { 0 }, { 1 }, { 2 }, { 3 }, { 4 }, { 5 }, { 8 }, { 10 } };
++  }
++
++  @Test(dataProvider = "whenNotYetMarkedAsModifiedData")
++  public void shouldReturnCorrectModifiedTimeEvenWhenNotYetMarkedAsModified(int secondsToAdvance) {
++    // given
++    Time.startSimulating();
++    try {
++      NthLastModifiedTimeTracker tracker = new NthLastModifiedTimeTracker(ANY_NUM_TIMES_TO_TRACK);
++
++      // when
++      advanceSimulatedTimeBy(secondsToAdvance);
++      int seconds = tracker.secondsSinceOldestModification();
++
++      // then
++      assertThat(seconds).isEqualTo(secondsToAdvance);
++    }
++    finally {
++      // cleanup
++      Time.stopSimulating();
++    }
++  }
++
++  /**
++   * Each row is: number of times to track, seconds to advance before each modification, expected result of
++   * {@code secondsSinceOldestModification()} after each modification.
++   */
++  @DataProvider
++  public Object[][] simulatedTrackerIterations() {
++    return new Object[][]{ { 1, new int[]{ 0, 1 }, new int[]{ 0, 0 } }, { 1, new int[]{ 0, 2 }, new int[]{ 0, 0 } },
++        { 2, new int[]{ 2, 2 }, new int[]{ 2, 2 } }, { 2, new int[]{ 0, 4 }, new int[]{ 0, 4 } },
++        { 1, new int[]{ 1, 1, 1, 1, 1, 1, 1 }, new int[]{ 0, 0, 0, 0, 0, 0, 0 } },
++        { 1, new int[]{ 1, 2, 3, 4, 5, 6, 7 }, new int[]{ 0, 0, 0, 0, 0, 0, 0 } },
++        { 2, new int[]{ 1, 1, 1, 1, 1, 1, 1 }, new int[]{ 1, 1, 1, 1, 1, 1, 1 } },
++        { 2, new int[]{ 2, 2, 2, 2, 2, 2, 2 }, new int[]{ 2, 2, 2, 2, 2, 2, 2 } },
++        { 2, new int[]{ 1, 2, 3, 4, 5, 6, 7 }, new int[]{ 1, 2, 3, 4, 5, 6, 7 } },
++        { 3, new int[]{ 1, 1, 1, 1, 1, 1, 1 }, new int[]{ 1, 2, 2, 2, 2, 2, 2 } },
++        { 3, new int[]{ 1, 2, 3, 4, 5, 6, 7 }, new int[]{ 1, 3, 5, 7, 9, 11, 13 } },
++        { 3, new int[]{ 2, 2, 2, 2, 2, 2, 2 }, new int[]{ 2, 4, 4, 4, 4, 4, 4 } },
++        { 4, new int[]{ 1, 1, 1, 1, 1, 1, 1 }, new int[]{ 1, 2, 3, 3, 3, 3, 3 } },
++        { 4, new int[]{ 1, 2, 3, 4, 5, 6, 7 }, new int[]{ 1, 3, 6, 9, 12, 15, 18 } },
++        { 4, new int[]{ 2, 2, 2, 2, 2, 2, 2 }, new int[]{ 2, 4, 6, 6, 6, 6, 6 } },
++        { 5, new int[]{ 1, 1, 1, 1, 1, 1, 1 }, new int[]{ 1, 2, 3, 4, 4, 4, 4 } },
++        { 5, new int[]{ 1, 2, 3, 4, 5, 6, 7 }, new int[]{ 1, 3, 6, 10, 14, 18, 22 } },
++        { 5, new int[]{ 2, 2, 2, 2, 2, 2, 2 }, new int[]{ 2, 4, 6, 8, 8, 8, 8 } },
++        { 6, new int[]{ 1, 1, 1, 1, 1, 1, 1 }, new int[]{ 1, 2, 3, 4, 5, 5, 5 } },
++        { 6, new int[]{ 1, 2, 3, 4, 5, 6, 7 }, new int[]{ 1, 3, 6, 10, 15, 20, 25 } },
++        { 6, new int[]{ 2, 2, 2, 2, 2, 2, 2 }, new int[]{ 2, 4, 6, 8, 10, 10, 10 } },
++        { 3, new int[]{ 1, 2, 3 }, new int[]{ 1, 3, 5 } } };
++  }
++
++  @Test(dataProvider = "simulatedTrackerIterations")
++  public void shouldReturnCorrectModifiedTimeWhenMarkedAsModified(int numTimesToTrack,
++      int[] secondsToAdvancePerIteration, int[] expLastModifiedTimes) {
++    // given
++    Time.startSimulating();
++    try {
++      NthLastModifiedTimeTracker tracker = new NthLastModifiedTimeTracker(numTimesToTrack);
++
++      // Sized by the array that is iterated, so a data row with mismatching lengths fails the assertion below
++      // instead of running past the end of the result array.
++      int[] modifiedTimes = new int[secondsToAdvancePerIteration.length];
++
++      // when
++      for (int i = 0; i < secondsToAdvancePerIteration.length; i++) {
++        advanceSimulatedTimeBy(secondsToAdvancePerIteration[i]);
++        tracker.markAsModified();
++        modifiedTimes[i] = tracker.secondsSinceOldestModification();
++      }
++
++      // then
++      assertThat(modifiedTimes).isEqualTo(expLastModifiedTimes);
++    }
++    finally {
++      // cleanup
++      Time.stopSimulating();
++    }
++  }
++
++  /**
++   * Advances the simulated clock by the given number of seconds.
++   */
++  private void advanceSimulatedTimeBy(int seconds) {
++    // Widen to long before multiplying so that a large value cannot overflow int.
++    Time.advanceTime((long) seconds * MILLIS_IN_SEC);
++  }
++}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/test/jvm/storm/starter/tools/RankableObjectWithFieldsTest.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/test/jvm/storm/starter/tools/RankableObjectWithFieldsTest.java
index 0000000,0000000..82a89e6
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/test/jvm/storm/starter/tools/RankableObjectWithFieldsTest.java
@@@ -1,0 -1,0 +1,235 @@@
++package storm.starter.tools;
++
++import backtype.storm.tuple.Tuple;
++import com.google.common.collect.Lists;
++import org.testng.annotations.DataProvider;
++import org.testng.annotations.Test;
++
++import java.util.ArrayList;
++import java.util.List;
++
++import static org.fest.assertions.api.Assertions.assertThat;
++import static org.mockito.Mockito.*;
++import static org.testng.Assert.assertFalse;
++import static org.testng.Assert.assertTrue;
++
++public class RankableObjectWithFieldsTest {
++
++  private static final Object ANY_OBJECT = new Object();
++  private static final long ANY_COUNT = 271;
++  private static final String ANY_FIELD = "someAdditionalField";
++  private static final int GREATER_THAN = 1;
++  private static final int EQUAL_TO = 0;
++  private static final int SMALLER_THAN = -1;
++
++  @Test(expectedExceptions = IllegalArgumentException.class)
++  public void constructorWithNullObjectAndNoFieldsShouldThrowIAE() {
++    new RankableObjectWithFields(null, ANY_COUNT);
++  }
++
++  @Test(expectedExceptions = IllegalArgumentException.class)
++  public void constructorWithNullObjectAndFieldsShouldThrowIAE() {
++    Object someAdditionalField = new Object();
++    new RankableObjectWithFields(null, ANY_COUNT, someAdditionalField);
++  }
++
++  @Test(expectedExceptions = IllegalArgumentException.class)
++  public void constructorWithNegativeCountAndNoFieldsShouldThrowIAE() {
++    new RankableObjectWithFields(ANY_OBJECT, -1);
++  }
++
++  @Test(expectedExceptions = IllegalArgumentException.class)
++  public void constructorWithNegativeCountAndFieldsShouldThrowIAE() {
++    Object someAdditionalField = new Object();
++    new RankableObjectWithFields(ANY_OBJECT, -1, someAdditionalField);
++  }
++
++  @Test
++  public void shouldBeEqualToItself() {
++    RankableObjectWithFields r = new RankableObjectWithFields(ANY_OBJECT, ANY_COUNT);
++    assertThat(r).isEqualTo(r);
++  }
++
++  @DataProvider
++  public Object[][] otherClassesData() {
++    return new Object[][]{ { new String("foo") }, { new Object() }, { Integer.valueOf(4) }, { Lists.newArrayList(7, 8,
++        9) } };
++  }
++
++  @Test(dataProvider = "otherClassesData")
++  public void shouldNotBeEqualToInstancesOfOtherClasses(Object notARankable) {
++    RankableObjectWithFields r = new RankableObjectWithFields(ANY_OBJECT, ANY_COUNT);
++    assertFalse(r.equals(notARankable), r + " is equal to " + notARankable + " but it should not be");
++  }
++
++  @DataProvider
++  public Object[][] falseDuplicatesData() {
++    return new Object[][]{ { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("foo", 1) },
++        { new RankableObjectWithFields("foo", 1), new RankableObjectWithFields("Foo", 1) },
++        { new RankableObjectWithFields("foo", 1), new RankableObjectWithFields("FOO", 1) },
++        { new RankableObjectWithFields("foo", 1), new RankableObjectWithFields("bar", 1) },
++        { new RankableObjectWithFields("", 0), new RankableObjectWithFields("", 1) }, { new RankableObjectWithFields("",
++        1), new RankableObjectWithFields("bar", 1) } };
++  }
++
++  @Test(dataProvider = "falseDuplicatesData")
++  public void shouldNotBeEqualToFalseDuplicates(RankableObjectWithFields r, RankableObjectWithFields falseDuplicate) {
++    assertFalse(r.equals(falseDuplicate), r + " is equal to " + falseDuplicate + " but it should not be");
++  }
++
++  @Test(dataProvider = "falseDuplicatesData")
++  public void shouldHaveDifferentHashCodeThanFalseDuplicates(RankableObjectWithFields r,
++      RankableObjectWithFields falseDuplicate) {
++    assertThat(r.hashCode()).isNotEqualTo(falseDuplicate.hashCode());
++  }
++
++  @DataProvider
++  public Object[][] trueDuplicatesData() {
++    return new Object[][]{ { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("foo", 0) },
++        { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("foo", 0, "someOtherField") },
++        { new RankableObjectWithFields("foo", 0, "someField"), new RankableObjectWithFields("foo", 0,
++            "someOtherField") } };
++  }
++
++  @Test(dataProvider = "trueDuplicatesData")
++  public void shouldBeEqualToTrueDuplicates(RankableObjectWithFields r, RankableObjectWithFields trueDuplicate) {
++    assertTrue(r.equals(trueDuplicate), r + " is not equal to " + trueDuplicate + " but it should be");
++  }
++
++  @Test(dataProvider = "trueDuplicatesData")
++  public void shouldHaveSameHashCodeAsTrueDuplicates(RankableObjectWithFields r,
++      RankableObjectWithFields trueDuplicate) {
++    assertThat(r.hashCode()).isEqualTo(trueDuplicate.hashCode());
++  }
++
++  @DataProvider
++  public Object[][] compareToData() {
++    return new Object[][]{ { new RankableObjectWithFields("foo", 1000), new RankableObjectWithFields("foo", 0),
++        GREATER_THAN }, { new RankableObjectWithFields("foo", 1), new RankableObjectWithFields("foo", 0),
++        GREATER_THAN }, { new RankableObjectWithFields("foo", 1000), new RankableObjectWithFields("bar", 0),
++        GREATER_THAN }, { new RankableObjectWithFields("foo", 1), new RankableObjectWithFields("bar", 0),
++        GREATER_THAN }, { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("foo", 0), EQUAL_TO },
++        { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("bar", 0), EQUAL_TO },
++        { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("foo", 1000), SMALLER_THAN },
++        { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("foo", 1), SMALLER_THAN },
++        { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("bar", 1), SMALLER_THAN },
++        { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("bar", 1000), SMALLER_THAN }, };
++  }
++
++  @Test(dataProvider = "compareToData")
++  public void verifyCompareTo(RankableObjectWithFields first, RankableObjectWithFields second, int expCompareToValue) {
++    assertThat(first.compareTo(second)).isEqualTo(expCompareToValue);
++  }
++
++  @DataProvider
++  public Object[][] toStringData() {
++    return new Object[][]{ { new String("foo"), 0L }, { new String("BAR"), 8L } };
++  }
++
++  @Test(dataProvider = "toStringData")
++  public void toStringShouldContainStringRepresentationsOfObjectAndCount(Object obj, long count) {
++    // given
++    RankableObjectWithFields r = new RankableObjectWithFields(obj, count);
++
++    // when
++    String strRepresentation = r.toString();
++
++    // then
++    assertThat(strRepresentation).contains(obj.toString()).contains("" + count);
++  }
++
++  @Test
++  public void shouldReturnTheObject() {
++    // given
++    RankableObjectWithFields r = new RankableObjectWithFields(ANY_OBJECT, ANY_COUNT, ANY_FIELD);
++
++    // when
++    Object obj = r.getObject();
++
++    // then
++    assertThat(obj).isEqualTo(ANY_OBJECT);
++  }
++
++  @Test
++  public void shouldReturnTheCount() {
++    // given
++    RankableObjectWithFields r = new RankableObjectWithFields(ANY_OBJECT, ANY_COUNT, ANY_FIELD);
++
++    // when
++    long count = r.getCount();
++
++    // then
++    assertThat(count).isEqualTo(ANY_COUNT);
++  }
++
++  @DataProvider
++  public Object[][] fieldsData() {
++    return new Object[][]{ { ANY_OBJECT, ANY_COUNT, new Object[]{ ANY_FIELD } },
++        { "quux", 42L, new Object[]{ "one", "two", "three" } } };
++  }
++
++  @Test(dataProvider = "fieldsData")
++  public void shouldReturnTheFields(Object obj, long count, Object[] fields) {
++    // given
++    RankableObjectWithFields r = new RankableObjectWithFields(obj, count, fields);
++
++    // when
++    List<Object> actualFields = r.getFields();
++
++    // then
++    assertThat(actualFields).isEqualTo(Lists.newArrayList(fields));
++  }
++
++  @Test(expectedExceptions = UnsupportedOperationException.class)
++  public void fieldsShouldBeImmutable() {
++    // given
++    RankableObjectWithFields r = new RankableObjectWithFields(ANY_OBJECT, ANY_COUNT, ANY_FIELD);
++
++    // when
++    List<Object> fields = r.getFields();
++    // try to modify the list, which should fail
++    fields.remove(0);
++
++    // then (exception)
++  }
++
++  @Test
++  public void shouldCreateRankableObjectFromTuple() {
++    // given
++    Tuple tuple = mock(Tuple.class);
++    List<Object> tupleValues = Lists.newArrayList(ANY_OBJECT, ANY_COUNT, ANY_FIELD);
++    when(tuple.getValues()).thenReturn(tupleValues);
++
++    // when
++    RankableObjectWithFields r = RankableObjectWithFields.from(tuple);
++
++    // then
++    assertThat(r.getObject()).isEqualTo(ANY_OBJECT);
++    assertThat(r.getCount()).isEqualTo(ANY_COUNT);
++    List<Object> fields = new ArrayList<Object>();
++    fields.add(ANY_FIELD);
++    assertThat(r.getFields()).isEqualTo(fields);
++
++  }
++
++  @DataProvider
++  public Object[][] copyData() {
++    return new Object[][]{ { new RankableObjectWithFields("foo", 0) }, { new RankableObjectWithFields("foo", 3,
++        "someOtherField") }, { new RankableObjectWithFields("foo", 0, "someField") } };
++  }
++
++  // TODO: What would be a good test to ensure that RankableObjectWithFields is at least somewhat defensively copied?
++  //       The contract of Rankable#copy() returns a Rankable value, not a RankableObjectWithFields.
++  @Test(dataProvider = "copyData")
++  public void copyShouldReturnCopy(RankableObjectWithFields original) {
++    // given
++
++    // when
++    Rankable copy = original.copy();
++
++    // then
++    assertThat(copy.getObject()).isEqualTo(original.getObject());
++    assertThat(copy.getCount()).isEqualTo(original.getCount());
++  }
++
++}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/test/jvm/storm/starter/tools/RankingsTest.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/test/jvm/storm/starter/tools/RankingsTest.java
index 0000000,0000000..e1f8290
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/test/jvm/storm/starter/tools/RankingsTest.java
@@@ -1,0 -1,0 +1,351 @@@
++package storm.starter.tools;
++
++import com.google.common.base.Throwables;
++import com.google.common.collect.ImmutableList;
++import com.google.common.collect.Lists;
++import org.jmock.lib.concurrent.Blitzer;
++import org.testng.annotations.DataProvider;
++import org.testng.annotations.Test;
++
++import java.util.List;
++
++import static org.fest.assertions.api.Assertions.assertThat;
++
++/**
++ * Unit tests for {@link Rankings}: constructor validation, copying, the size limit, ranking order
++ * after updates, handling of duplicates, pruning of zero counts, and concurrent updates.
++ */
++public class RankingsTest {
++
++  private static final int ANY_TOPN = 42;
++  private static final Rankable ANY_RANKABLE = new RankableObjectWithFields("someObject", ANY_TOPN);
++  // A rankable with a count of zero, used by the pruneZeroCounts() related tests.
++  private static final Rankable ZERO = new RankableObjectWithFields("ZERO_COUNT", 0);
++  // Rankables with strictly increasing counts (A=1 ... H=8); expected rankings list them from H down to A.
++  private static final Rankable A = new RankableObjectWithFields("A", 1);
++  private static final Rankable B = new RankableObjectWithFields("B", 2);
++  private static final Rankable C = new RankableObjectWithFields("C", 3);
++  private static final Rankable D = new RankableObjectWithFields("D", 4);
++  private static final Rankable E = new RankableObjectWithFields("E", 5);
++  private static final Rankable F = new RankableObjectWithFields("F", 6);
++  private static final Rankable G = new RankableObjectWithFields("G", 7);
++  private static final Rankable H = new RankableObjectWithFields("H", 8);
++
++  @DataProvider
++  public Object[][] illegalTopNData() {
++    return new Object[][]{ { 0 }, { -1 }, { -2 }, { -10 } };
++  }
++
++  @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalTopNData")
++  public void constructorWithNegativeOrZeroTopNShouldThrowIAE(int topN) {
++    new Rankings(topN);
++  }
++
++  /** Each row: topN, and the rankables the rankings are updated with before being copied. */
++  @DataProvider
++  public Object[][] copyRankingsData() {
++    return new Object[][]{
++        { 5, Lists.newArrayList(A, B, C) },
++        { 2, Lists.newArrayList(A, B, C, D) },
++        { 1, Lists.newArrayList() },
++        { 1, Lists.newArrayList(A) },
++        { 1, Lists.newArrayList(A, B) } };
++  }
++
++  @Test(dataProvider = "copyRankingsData")
++  public void copyConstructorShouldReturnCopy(int topN, List<Rankable> rankables) {
++    // given
++    Rankings rankings = new Rankings(topN);
++    for (Rankable r : rankables) {
++      rankings.updateWith(r);
++    }
++
++    // when
++    Rankings copy = new Rankings(rankings);
++
++    // then
++    assertThat(copy.maxSize()).isEqualTo(rankings.maxSize());
++    assertThat(copy.getRankings()).isEqualTo(rankings.getRankings());
++  }
++
++  /**
++   * Each row: topN, the rankables the original is updated with, and the changes that are applied
++   * to the copy only.
++   */
++  @DataProvider
++  public Object[][] defensiveCopyRankingsData() {
++    return new Object[][]{
++        { 5, Lists.newArrayList(A, B, C), Lists.newArrayList(D) },
++        { 2, Lists.newArrayList(A, B, C, D), Lists.newArrayList(E, F) },
++        { 1, Lists.newArrayList(), Lists.newArrayList(A) },
++        { 1, Lists.newArrayList(A), Lists.newArrayList(B) },
++        { 1, Lists.newArrayList(ZERO), Lists.newArrayList(B) },
++        { 1, Lists.newArrayList(ZERO), Lists.newArrayList() } };
++  }
++
++  @Test(dataProvider = "defensiveCopyRankingsData")
++  public void copyConstructorShouldReturnDefensiveCopy(int topN, List<Rankable> rankables, List<Rankable> changes) {
++    // given
++    Rankings original = new Rankings(topN);
++    for (Rankable r : rankables) {
++      original.updateWith(r);
++    }
++    int expSize = original.size();
++    List<Rankable> expRankings = original.getRankings();
++
++    // when
++    Rankings copy = new Rankings(original);
++    for (Rankable r : changes) {
++      copy.updateWith(r);
++    }
++    // Same steps as copyShouldReturnDefensiveCopy(): pruning the copy must not affect the original
++    // either, which is what the ZERO rows of the data provider are for.
++    copy.pruneZeroCounts();
++
++    // then
++    assertThat(original.size()).isEqualTo(expSize);
++    assertThat(original.getRankings()).isEqualTo(expRankings);
++  }
++
++  @DataProvider
++  public Object[][] legalTopNData() {
++    return new Object[][]{ { 1 }, { 2 }, { 1000 }, { 1000000 } };
++  }
++
++  @Test(dataProvider = "legalTopNData")
++  public void constructorWithPositiveTopNShouldBeOk(int topN) {
++    // given/when
++    Rankings rankings = new Rankings(topN);
++
++    // then
++    assertThat(rankings.maxSize()).isEqualTo(topN);
++  }
++
++  @Test
++  public void shouldHaveDefaultConstructor() {
++    new Rankings();
++  }
++
++  @Test
++  public void defaultConstructorShouldSetPositiveTopN() {
++    // given/when
++    Rankings rankings = new Rankings();
++
++    // then
++    assertThat(rankings.maxSize()).isGreaterThan(0);
++  }
++
++  /** Each row: topN, and a list of rankables that is longer than topN. */
++  @DataProvider
++  public Object[][] rankingsGrowData() {
++    return new Object[][]{
++        { 2, Lists.newArrayList(
++            new RankableObjectWithFields("A", 1),
++            new RankableObjectWithFields("B", 2),
++            new RankableObjectWithFields("C", 3)) },
++        { 2, Lists.newArrayList(
++            new RankableObjectWithFields("A", 1),
++            new RankableObjectWithFields("B", 2),
++            new RankableObjectWithFields("C", 3),
++            new RankableObjectWithFields("D", 4)) } };
++  }
++
++  @Test(dataProvider = "rankingsGrowData")
++  public void sizeOfRankingsShouldNotGrowBeyondTopN(int topN, List<Rankable> rankables) {
++    // sanity check of the provided test data
++    assertThat(rankables.size()).overridingErrorMessage(
++        "The supplied test data is not correct: the number of rankables <%d> should be greater than <%d>",
++        rankables.size(), topN).isGreaterThan(topN);
++
++    // given
++    Rankings rankings = new Rankings(topN);
++
++    // when
++    for (Rankable r : rankables) {
++      rankings.updateWith(r);
++    }
++
++    // then
++    assertThat(rankings.size()).isLessThanOrEqualTo(rankings.maxSize());
++  }
++
++  /** Each row: rankables in the order they are fed to the rankings, and the expected resulting rankings. */
++  @DataProvider
++  public Object[][] simulatedRankingsData() {
++    return new Object[][]{
++        { Lists.newArrayList(A), Lists.newArrayList(A) },
++        { Lists.newArrayList(B, D, A, C), Lists.newArrayList(D, C, B, A) },
++        { Lists.newArrayList(B, F, A, C, D, E), Lists.newArrayList(F, E, D, C, B, A) },
++        { Lists.newArrayList(G, B, F, A, C, D, E, H), Lists.newArrayList(H, G, F, E, D, C, B, A) } };
++  }
++
++  @Test(dataProvider = "simulatedRankingsData")
++  public void shouldCorrectlyRankWhenUpdatedWithRankables(List<Rankable> unsorted, List<Rankable> expSorted) {
++    // given
++    Rankings rankings = new Rankings(unsorted.size());
++
++    // when
++    for (Rankable r : unsorted) {
++      rankings.updateWith(r);
++    }
++
++    // then
++    assertThat(rankings.getRankings()).isEqualTo(expSorted);
++  }
++
++  @Test(dataProvider = "simulatedRankingsData")
++  public void shouldCorrectlyRankWhenEmptyAndUpdatedWithOtherRankings(List<Rankable> unsorted,
++      List<Rankable> expSorted) {
++    // given
++    Rankings rankings = new Rankings(unsorted.size());
++    Rankings otherRankings = new Rankings(rankings.maxSize());
++    for (Rankable r : unsorted) {
++      otherRankings.updateWith(r);
++    }
++
++    // when
++    rankings.updateWith(otherRankings);
++
++    // then
++    assertThat(rankings.getRankings()).isEqualTo(expSorted);
++  }
++
++  @Test(dataProvider = "simulatedRankingsData")
++  public void shouldCorrectlyRankWhenUpdatedWithEmptyOtherRankings(List<Rankable> unsorted, List<Rankable> expSorted) {
++    // given
++    Rankings rankings = new Rankings(unsorted.size());
++    for (Rankable r : unsorted) {
++      rankings.updateWith(r);
++    }
++    Rankings emptyRankings = new Rankings(ANY_TOPN);
++
++    // when
++    rankings.updateWith(emptyRankings);
++
++    // then
++    assertThat(rankings.getRankings()).isEqualTo(expSorted);
++  }
++
++  /**
++   * Each row: rankables for the rankings under test, rankables for the other rankings that are
++   * merged in, and the expected rankings after the merge.
++   */
++  @DataProvider
++  public Object[][] simulatedRankingsAndOtherRankingsData() {
++    return new Object[][]{
++        { Lists.newArrayList(A), Lists.newArrayList(A), Lists.newArrayList(A) },
++        { Lists.newArrayList(A, C), Lists.newArrayList(B, D), Lists.newArrayList(D, C, B, A) },
++        { Lists.newArrayList(B, F, A), Lists.newArrayList(C, D, E), Lists.newArrayList(F, E, D, C, B, A) },
++        { Lists.newArrayList(G, B, F, A, C), Lists.newArrayList(D, E, H),
++            Lists.newArrayList(H, G, F, E, D, C, B, A) } };
++  }
++
++  @Test(dataProvider = "simulatedRankingsAndOtherRankingsData")
++  public void shouldCorrectlyRankWhenNotEmptyAndUpdatedWithOtherRankings(List<Rankable> unsorted,
++      List<Rankable> unsortedForOtherRankings, List<Rankable> expSorted) {
++    // given
++    Rankings rankings = new Rankings(expSorted.size());
++    for (Rankable r : unsorted) {
++      rankings.updateWith(r);
++    }
++    Rankings otherRankings = new Rankings(unsortedForOtherRankings.size());
++    for (Rankable r : unsortedForOtherRankings) {
++      otherRankings.updateWith(r);
++    }
++
++    // when
++    rankings.updateWith(otherRankings);
++
++    // then
++    assertThat(rankings.getRankings()).isEqualTo(expSorted);
++  }
++
++  /** Each row holds a list of rankables that all refer to the same object (possibly with different counts). */
++  @DataProvider
++  public Object[][] duplicatesData() {
++    Rankable A1 = new RankableObjectWithFields("A", 1);
++    Rankable A2 = new RankableObjectWithFields("A", 2);
++    Rankable A3 = new RankableObjectWithFields("A", 3);
++    return new Object[][]{
++        { Lists.newArrayList(ANY_RANKABLE, ANY_RANKABLE, ANY_RANKABLE) },
++        { Lists.newArrayList(A1, A2, A3) }, };
++  }
++
++  @Test(dataProvider = "duplicatesData")
++  public void shouldNotRankDuplicateObjectsMoreThanOnce(List<Rankable> duplicates) {
++    // given
++    Rankings rankings = new Rankings(duplicates.size());
++
++    // when
++    for (Rankable r : duplicates) {
++      rankings.updateWith(r);
++    }
++
++    // then
++    assertThat(rankings.size()).isEqualTo(1);
++  }
++
++  /** Each row: rankables fed to the rankings, and the expected rankings after pruneZeroCounts(). */
++  @DataProvider
++  public Object[][] removeZeroRankingsData() {
++    return new Object[][]{
++        { Lists.newArrayList(A, ZERO), Lists.newArrayList(A) },
++        { Lists.newArrayList(A), Lists.newArrayList(A) },
++        { Lists.newArrayList(ZERO, A), Lists.newArrayList(A) },
++        { Lists.newArrayList(ZERO), Lists.newArrayList() },
++        { Lists.newArrayList(ZERO, new RankableObjectWithFields("ZERO2", 0)), Lists.newArrayList() },
++        { Lists.newArrayList(B, ZERO, new RankableObjectWithFields("ZERO2", 0), D,
++            new RankableObjectWithFields("ZERO3", 0), new RankableObjectWithFields("ZERO4", 0), C),
++            Lists.newArrayList(D, C, B) },
++        { Lists.newArrayList(A, ZERO, B), Lists.newArrayList(B, A) } };
++  }
++
++  @Test(dataProvider = "removeZeroRankingsData")
++  public void shouldRemoveZeroCounts(List<Rankable> unsorted, List<Rankable> expSorted) {
++    // given
++    Rankings rankings = new Rankings(unsorted.size());
++    for (Rankable r : unsorted) {
++      rankings.updateWith(r);
++    }
++
++    // when
++    rankings.pruneZeroCounts();
++
++    // then
++    assertThat(rankings.getRankings()).isEqualTo(expSorted);
++  }
++
++  /**
++   * Runs updateWith() through a {@link Blitzer} and fails if any of the updates threw a
++   * RuntimeException.
++   *
++   * @throws InterruptedException propagated from {@code Blitzer.blitz()}
++   */
++  @Test
++  public void updatingWithNewRankablesShouldBeThreadSafe() throws InterruptedException {
++    // given
++    final List<Rankable> entries = ImmutableList.of(A, B, C, D);
++    final Rankings rankings = new Rankings(entries.size());
++
++    // We are capturing exceptions thrown in Blitzer's child threads into this data structure so that we can properly
++    // pass/fail this test.  The reason is that Blitzer doesn't report exceptions, which is a known bug in Blitzer
++    // (JMOCK-263).  See https://github.com/jmock-developers/jmock-library/issues/22 for more information.
++    final List<Exception> exceptions = Lists.newArrayList();
++    Blitzer blitzer = new Blitzer(1000);
++
++    // when
++    try {
++      blitzer.blitz(new Runnable() {
++        public void run() {
++          for (Rankable r : entries) {
++            try {
++              rankings.updateWith(r);
++            }
++            catch (RuntimeException e) {
++              // the list itself is not thread-safe, so guard every write to it
++              synchronized(exceptions) {
++                exceptions.add(e);
++              }
++            }
++          }
++        }
++      });
++    }
++    finally {
++      // shut the Blitzer down even when blitz() throws (e.g. InterruptedException)
++      blitzer.shutdown();
++    }
++
++    // then
++    // print the captured stack traces first so that a failure can be diagnosed from the test output
++    for (Exception e : exceptions) {
++      System.err.println(Throwables.getStackTraceAsString(e));
++    }
++    assertThat(exceptions).isEmpty();
++  }
++
++  @Test(dataProvider = "copyRankingsData")
++  public void copyShouldReturnCopy(int topN, List<Rankable> rankables) {
++    // given
++    Rankings rankings = new Rankings(topN);
++    for (Rankable r : rankables) {
++      rankings.updateWith(r);
++    }
++
++    // when
++    Rankings copy = rankings.copy();
++
++    // then
++    assertThat(copy.maxSize()).isEqualTo(rankings.maxSize());
++    assertThat(copy.getRankings()).isEqualTo(rankings.getRankings());
++  }
++
++  @Test(dataProvider = "defensiveCopyRankingsData")
++  public void copyShouldReturnDefensiveCopy(int topN, List<Rankable> rankables, List<Rankable> changes) {
++    // given
++    Rankings original = new Rankings(topN);
++    for (Rankable r : rankables) {
++      original.updateWith(r);
++    }
++    int expSize = original.size();
++    List<Rankable> expRankings = original.getRankings();
++
++    // when
++    Rankings copy = original.copy();
++    for (Rankable r : changes) {
++      copy.updateWith(r);
++    }
++    copy.pruneZeroCounts();
++
++    // then
++    assertThat(original.size()).isEqualTo(expSize);
++    assertThat(original.getRankings()).isEqualTo(expRankings);
++  }
++
++}