You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/03/20 22:23:04 UTC
[41/50] [abbrv] merge storm-starter into examples
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/src/jvm/storm/starter/spout/RandomSentenceSpout.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/src/jvm/storm/starter/spout/RandomSentenceSpout.java
index 0000000,0000000..e808f83
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/spout/RandomSentenceSpout.java
@@@ -1,0 -1,0 +1,47 @@@
++package storm.starter.spout;
++
++import backtype.storm.spout.SpoutOutputCollector;
++import backtype.storm.task.TopologyContext;
++import backtype.storm.topology.OutputFieldsDeclarer;
++import backtype.storm.topology.base.BaseRichSpout;
++import backtype.storm.tuple.Fields;
++import backtype.storm.tuple.Values;
++import backtype.storm.utils.Utils;
++
++import java.util.Map;
++import java.util.Random;
++
++/**
++ * Example spout that emits one sentence, picked at random from a small fixed list, per call to
++ * {@link #nextTuple()}.
++ * <p/>
++ * Tuples are emitted without a message id, and {@link #ack(Object)} / {@link #fail(Object)} do nothing.
++ */
++public class RandomSentenceSpout extends BaseRichSpout {
++  SpoutOutputCollector _collector;
++  Random _rand;
++
++  /**
++   * Stores the collector and creates the random number generator used to pick sentences.
++   */
++  @Override
++  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
++    _rand = new Random();
++    _collector = collector;
++  }
++
++  /**
++   * Sleeps for 100 ms, then emits a single randomly selected sentence as a one-field tuple.
++   */
++  @Override
++  public void nextTuple() {
++    Utils.sleep(100);
++    final String[] candidates = {
++        "the cow jumped over the moon",
++        "an apple a day keeps the doctor away",
++        "four score and seven years ago",
++        "snow white and the seven dwarfs",
++        "i am at two with nature"
++    };
++    final int pick = _rand.nextInt(candidates.length);
++    _collector.emit(new Values(candidates[pick]));
++  }
++
++  /** No-op: this spout does not track emitted tuples. */
++  @Override
++  public void ack(Object id) {
++  }
++
++  /** No-op: this spout does not replay failed tuples. */
++  @Override
++  public void fail(Object id) {
++  }
++
++  /**
++   * Declares the single output field, named "word" (it carries a whole sentence).
++   */
++  @Override
++  public void declareOutputFields(OutputFieldsDeclarer declarer) {
++    declarer.declare(new Fields("word"));
++  }
++
++}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/src/jvm/storm/starter/spout/TwitterSampleSpout.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/src/jvm/storm/starter/spout/TwitterSampleSpout.java
index 0000000,0000000..0a48cf2
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/spout/TwitterSampleSpout.java
@@@ -1,0 -1,0 +1,105 @@@
++/*
++
++package storm.starter.spout;
++
++import backtype.storm.Config;
++import twitter4j.conf.ConfigurationBuilder;
++import twitter4j.TwitterStream;
++import twitter4j.TwitterStreamFactory;
++import backtype.storm.spout.SpoutOutputCollector;
++import backtype.storm.task.TopologyContext;
++import backtype.storm.topology.OutputFieldsDeclarer;
++import backtype.storm.topology.base.BaseRichSpout;
++import backtype.storm.tuple.Fields;
++import backtype.storm.tuple.Values;
++import backtype.storm.utils.Utils;
++import java.util.Map;
++import java.util.concurrent.LinkedBlockingQueue;
++import twitter4j.Status;
++import twitter4j.StatusDeletionNotice;
++import twitter4j.StatusListener;
++
++public class TwitterSampleSpout extends BaseRichSpout {
++ SpoutOutputCollector _collector;
++ LinkedBlockingQueue<Status> queue = null;
++ TwitterStream _twitterStream;
++ String _username;
++ String _pwd;
++
++
++ public TwitterSampleSpout(String username, String pwd) {
++ _username = username;
++ _pwd = pwd;
++ }
++
++ @Override
++ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
++ queue = new LinkedBlockingQueue<Status>(1000);
++ _collector = collector;
++ StatusListener listener = new StatusListener() {
++
++ @Override
++ public void onStatus(Status status) {
++ queue.offer(status);
++ }
++
++ @Override
++ public void onDeletionNotice(StatusDeletionNotice sdn) {
++ }
++
++ @Override
++ public void onTrackLimitationNotice(int i) {
++ }
++
++ @Override
++ public void onScrubGeo(long l, long l1) {
++ }
++
++ @Override
++ public void onException(Exception e) {
++ }
++
++ };
++ TwitterStreamFactory fact = new TwitterStreamFactory(new ConfigurationBuilder().setUser(_username).setPassword(_pwd).build());
++ _twitterStream = fact.getInstance();
++ _twitterStream.addListener(listener);
++ _twitterStream.sample();
++ }
++
++ @Override
++ public void nextTuple() {
++ Status ret = queue.poll();
++ if(ret==null) {
++ Utils.sleep(50);
++ } else {
++ _collector.emit(new Values(ret));
++ }
++ }
++
++ @Override
++ public void close() {
++ _twitterStream.shutdown();
++ }
++
++ @Override
++ public Map<String, Object> getComponentConfiguration() {
++ Config ret = new Config();
++ ret.setMaxTaskParallelism(1);
++ return ret;
++ }
++
++ @Override
++ public void ack(Object id) {
++ }
++
++ @Override
++ public void fail(Object id) {
++ }
++
++ @Override
++ public void declareOutputFields(OutputFieldsDeclarer declarer) {
++ declarer.declare(new Fields("tweet"));
++ }
++
++}
++*/
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/src/jvm/storm/starter/tools/NthLastModifiedTimeTracker.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/src/jvm/storm/starter/tools/NthLastModifiedTimeTracker.java
index 0000000,0000000..92998a6
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/tools/NthLastModifiedTimeTracker.java
@@@ -1,0 -1,0 +1,53 @@@
++package storm.starter.tools;
++
++import backtype.storm.utils.Time;
++import org.apache.commons.collections.buffer.CircularFifoBuffer;
++
++/**
++ * This class tracks the time-since-last-modify of a "thing" in a rolling fashion.
++ * <p/>
++ * For example, create a 5-slot tracker to track the five most recent time-since-last-modify.
++ * <p/>
++ * You must manually "mark" that the "something" that you want to track -- in terms of modification times -- has just
++ * been modified.
++ */
++public class NthLastModifiedTimeTracker {
++
++  private static final int MILLIS_IN_SEC = 1000;
++
++  /** Fixed-size buffer of modification timestamps (as {@link Long} millis); adding to a full buffer evicts the oldest. */
++  private final CircularFifoBuffer lastModifiedTimesMillis;
++
++  /**
++   * Creates a tracker and pre-fills every slot with the current time, so the buffer is full from the start.
++   *
++   * @param numTimesToTrack number of most recent modification times to remember; must be at least 1
++   * @throws IllegalArgumentException if {@code numTimesToTrack} is less than 1
++   */
++  public NthLastModifiedTimeTracker(int numTimesToTrack) {
++    if (numTimesToTrack < 1) {
++      throw new IllegalArgumentException(
++          "numTimesToTrack must be greater than zero (you requested " + numTimesToTrack + ")");
++    }
++    lastModifiedTimesMillis = new CircularFifoBuffer(numTimesToTrack);
++    final long creationTimeMillis = now();
++    for (int slot = 0; slot < lastModifiedTimesMillis.maxSize(); slot++) {
++      lastModifiedTimesMillis.add(Long.valueOf(creationTimeMillis));
++    }
++  }
++
++  /** Current time in milliseconds as reported by Storm's {@link Time} utility. */
++  private long now() {
++    return Time.currentTimeMillis();
++  }
++
++  /**
++   * @return whole seconds elapsed between the oldest timestamp still held in the buffer and now
++   */
++  public int secondsSinceOldestModification() {
++    final long oldestMillis = ((Long) lastModifiedTimesMillis.get()).longValue();
++    final long elapsedMillis = now() - oldestMillis;
++    return (int) (elapsedMillis / MILLIS_IN_SEC);
++  }
++
++  /**
++   * Records the current time as the most recent modification, pushing the oldest tracked time out of the buffer.
++   */
++  public void markAsModified() {
++    lastModifiedTimesMillis.add(now());
++  }
++
++}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/src/jvm/storm/starter/tools/Rankable.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/src/jvm/storm/starter/tools/Rankable.java
index 0000000,0000000..254d920
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/tools/Rankable.java
@@@ -1,0 -1,0 +1,15 @@@
++package storm.starter.tools;
++
++/**
++ * An object paired with a count that can be compared against other {@code Rankable}s.
++ */
++public interface Rankable extends Comparable<Rankable> {
++
++ /**
++ * @return the wrapped object that is being ranked
++ */
++ Object getObject();
++
++ /**
++ * @return the count associated with the wrapped object
++ */
++ long getCount();
++
++ /**
++ * Creates a copy of this Rankable.
++ * <p/>
++ * Note: We do not defensively copy the object wrapped by the Rankable. It is passed as is.
++ *
++ * @return a defensive copy
++ */
++ Rankable copy();
++}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/src/jvm/storm/starter/tools/RankableObjectWithFields.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/src/jvm/storm/starter/tools/RankableObjectWithFields.java
index 0000000,0000000..3079434
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/tools/RankableObjectWithFields.java
@@@ -1,0 -1,0 +1,131 @@@
++package storm.starter.tools;
++
++import backtype.storm.tuple.Tuple;
++import com.google.common.collect.ImmutableList;
++import com.google.common.collect.Lists;
++
++import java.io.Serializable;
++import java.util.List;
++
++/**
++ * This class wraps an object and its associated count, including any additional data fields.
++ * <p/>
++ * This class can be used, for instance, to track the number of occurrences of an object in a Storm topology.
++ */
++public class RankableObjectWithFields implements Rankable, Serializable {
++
++  private static final long serialVersionUID = -9102878650001058090L;
++  private static final String toStringSeparator = "|";
++
++  private final Object obj;
++  private final long count;
++  private final ImmutableList<Object> fields;
++
++  /**
++   * @param obj the object to be ranked; must not be null
++   * @param count the count associated with the object; must be >= 0
++   * @param otherFields any additional data fields to carry along (copied into an immutable list)
++   * @throws IllegalArgumentException if {@code obj} is null or {@code count} is negative
++   */
++  public RankableObjectWithFields(Object obj, long count, Object... otherFields) {
++    if (obj == null) {
++      throw new IllegalArgumentException("The object must not be null");
++    }
++    if (count < 0) {
++      throw new IllegalArgumentException("The count must be >= 0");
++    }
++    this.obj = obj;
++    this.count = count;
++    fields = ImmutableList.copyOf(otherFields);
++  }
++
++  /**
++   * Construct a new instance based on the provided {@link Tuple}.
++   * <p/>
++   * This method expects the object to be ranked in the first field (index 0) of the provided tuple, and the number of
++   * occurrences of the object (its count) in the second field (index 1). Any further fields in the tuple will be
++   * extracted and tracked, too. These fields can be accessed via {@link RankableObjectWithFields#getFields()}.
++   *
++   * @param tuple the tuple to read the object, its count, and any additional fields from
++   *
++   * @return new instance based on the provided tuple
++   */
++  public static RankableObjectWithFields from(Tuple tuple) {
++    List<Object> otherFields = Lists.newArrayList(tuple.getValues());
++    Object obj = otherFields.remove(0);
++    Long count = (Long) otherFields.remove(0);
++    return new RankableObjectWithFields(obj, count, otherFields.toArray());
++  }
++
++  @Override
++  public Object getObject() {
++    return obj;
++  }
++
++  @Override
++  public long getCount() {
++    return count;
++  }
++
++  /**
++   * @return an immutable list of any additional data fields of the object (may be empty but will never be null)
++   */
++  public List<Object> getFields() {
++    return fields;
++  }
++
++  /**
++   * Orders by count only. Note that this is not consistent with {@link #equals(Object)}, which also compares the
++   * wrapped object.
++   */
++  @Override
++  public int compareTo(Rankable other) {
++    long delta = this.getCount() - other.getCount();
++    if (delta > 0) {
++      return 1;
++    }
++    else if (delta < 0) {
++      return -1;
++    }
++    else {
++      return 0;
++    }
++  }
++
++  /**
++   * Two instances are equal when both the wrapped object and the count are equal; the additional fields are ignored.
++   */
++  @Override
++  public boolean equals(Object o) {
++    if (this == o) {
++      return true;
++    }
++    if (!(o instanceof RankableObjectWithFields)) {
++      return false;
++    }
++    RankableObjectWithFields other = (RankableObjectWithFields) o;
++    return obj.equals(other.obj) && count == other.count;
++  }
++
++  @Override
++  public int hashCode() {
++    int result = 17;
++    int countHash = (int) (count ^ (count >>> 32));
++    result = 31 * result + countHash;
++    result = 31 * result + obj.hashCode();
++    return result;
++  }
++
++  /**
++   * @return {@code [obj|count|field1|...|fieldN]}
++   */
++  @Override
++  public String toString() {
++    // Local, single-threaded use: StringBuilder produces the same text as StringBuffer without locking.
++    StringBuilder buf = new StringBuilder();
++    buf.append("[");
++    buf.append(obj);
++    buf.append(toStringSeparator);
++    buf.append(count);
++    for (Object field : fields) {
++      buf.append(toStringSeparator);
++      buf.append(field);
++    }
++    buf.append("]");
++    return buf.toString();
++  }
++
++  /**
++   * Note: We do not defensively copy the wrapped object and any accompanying fields. We do guarantee, however,
++   * to return a defensive (shallow) copy of the List object that is wrapping any accompanying fields.
++   *
++   * @return a new instance with the same object, count, and field elements
++   */
++  @Override
++  public Rankable copy() {
++    // The fields must be handed over as an array: passing the List itself to the varargs constructor would wrap it
++    // as a single element, so the copy would end up with one field (the whole list) instead of the original fields.
++    return new RankableObjectWithFields(getObject(), getCount(), getFields().toArray());
++  }
++
++}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/src/jvm/storm/starter/tools/Rankings.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/src/jvm/storm/starter/tools/Rankings.java
index 0000000,0000000..5076bf6
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/tools/Rankings.java
@@@ -1,0 -1,0 +1,139 @@@
++package storm.starter.tools;
++
++import com.google.common.collect.ImmutableList;
++import com.google.common.collect.Lists;
++
++import java.io.Serializable;
++import java.util.Collections;
++import java.util.List;
++
++/**
++ * A bounded list of {@link Rankable}s kept in descending order, holding at most {@link #maxSize()} entries.
++ * <p/>
++ * Only {@link #updateWith(Rankable)} synchronizes on the internal list; the other methods access it without locking.
++ */
++public class Rankings implements Serializable {
++
++  private static final long serialVersionUID = -1549827195410578903L;
++  private static final int DEFAULT_COUNT = 10;
++
++  private final int maxSize;
++  private final List<Rankable> rankedItems = Lists.newArrayList();
++
++  /**
++   * Creates rankings that hold up to 10 entries.
++   */
++  public Rankings() {
++    this(DEFAULT_COUNT);
++  }
++
++  /**
++   * @param topN maximum number of entries to keep; must be at least 1
++   * @throws IllegalArgumentException if {@code topN} is less than 1
++   */
++  public Rankings(int topN) {
++    if (topN < 1) {
++      throw new IllegalArgumentException("topN must be >= 1");
++    }
++    maxSize = topN;
++  }
++
++  /**
++   * Copy constructor.
++   * @param other the rankings whose maximum size and entries are copied
++   */
++  public Rankings(Rankings other) {
++    this(other.maxSize());
++    updateWith(other);
++  }
++
++  /**
++   * @return the maximum possible number (size) of ranked objects this instance can hold
++   */
++  public int maxSize() {
++    return maxSize;
++  }
++
++  /**
++   * @return the number (size) of ranked objects this instance is currently holding
++   */
++  public int size() {
++    return rankedItems.size();
++  }
++
++  /**
++   * The returned defensive copy is only "somewhat" defensive. We do, for instance, return a defensive copy of the
++   * enclosing List instance, and we do try to defensively copy any contained Rankable objects, too. However, the
++   * contract of {@link storm.starter.tools.Rankable#copy()} does not guarantee that any Object's embedded within
++   * a Rankable will be defensively copied, too.
++   *
++   * @return a somewhat defensive copy of ranked items
++   */
++  public List<Rankable> getRankings() {
++    final ImmutableList.Builder<Rankable> snapshot = ImmutableList.builder();
++    for (Rankable item : rankedItems) {
++      snapshot.add(item.copy());
++    }
++    return snapshot.build();
++  }
++
++  /**
++   * Merges every entry of {@code other} into this instance, one at a time.
++   */
++  public void updateWith(Rankings other) {
++    for (Rankable item : other.getRankings()) {
++      updateWith(item);
++    }
++  }
++
++  /**
++   * Adds {@code r}, or replaces the existing entry that wraps an equal object, then re-sorts and drops the
++   * lowest-ranked entry if the list has grown beyond {@link #maxSize()}.
++   */
++  public void updateWith(Rankable r) {
++    synchronized (rankedItems) {
++      final int existing = indexOfEntryFor(r);
++      if (existing < 0) {
++        rankedItems.add(r);
++      }
++      else {
++        rankedItems.set(existing, r);
++      }
++
++      // Ascending sort followed by a reversal yields descending order; done in two steps on purpose because
++      // this combination determines how entries that compare as equal end up ordered.
++      Collections.sort(rankedItems);
++      Collections.reverse(rankedItems);
++
++      // At most one entry was added above, so removing the single element past the limit is sufficient.
++      if (rankedItems.size() > maxSize) {
++        rankedItems.remove(maxSize);
++      }
++    }
++  }
++
++  /**
++   * @return index of the entry whose wrapped object equals the one wrapped by {@code r}, or -1 if there is none
++   */
++  private int indexOfEntryFor(Rankable r) {
++    final Object tag = r.getObject();
++    int rank = 0;
++    for (Rankable item : rankedItems) {
++      if (item.getObject().equals(tag)) {
++        return rank;
++      }
++      rank++;
++    }
++    return -1;
++  }
++
++  /**
++   * Removes ranking entries that have a count of zero.
++   */
++  public void pruneZeroCounts() {
++    // Walk backwards so that removing an entry does not shift the indices still to be visited.
++    for (int i = rankedItems.size() - 1; i >= 0; i--) {
++      if (rankedItems.get(i).getCount() == 0) {
++        rankedItems.remove(i);
++      }
++    }
++  }
++
++  public String toString() {
++    return rankedItems.toString();
++  }
++
++  /**
++   * Creates a (defensive) copy of itself.
++   */
++  public Rankings copy() {
++    return new Rankings(this);
++  }
++}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/src/jvm/storm/starter/tools/SlidingWindowCounter.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/src/jvm/storm/starter/tools/SlidingWindowCounter.java
index 0000000,0000000..38becdc
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/tools/SlidingWindowCounter.java
@@@ -1,0 -1,0 +1,102 @@@
++package storm.starter.tools;
++
++import java.io.Serializable;
++import java.util.Map;
++
++/**
++ * This class counts objects in a sliding window fashion.
++ * <p/>
++ * It is designed 1) to give multiple "producer" threads write access to the counter, i.e. being able to increment
++ * counts of objects, and 2) to give a single "consumer" thread (e.g. {@link PeriodicSlidingWindowCounter}) read access
++ * to the counter. Whenever the consumer thread performs a read operation, this class will advance the head slot of the
++ * sliding window counter. This means that the consumer thread indirectly controls where writes of the producer threads
++ * will go to. Also, by itself this class will not advance the head slot.
++ * <p/>
++ * A note for analyzing data based on a sliding window count: During the initial <code>windowLengthInSlots</code>
++ * iterations, this sliding window counter will always return object counts that are equal or greater than in the
++ * previous iteration. This is the effect of the counter "loading up" at the very start of its existence. Conceptually,
++ * this is the desired behavior.
++ * <p/>
++ * To give an example, using a counter with 5 slots which for the sake of this example represent 1 minute of time each:
++ * <p/>
++ * <pre>
++ * {@code
++ * Sliding window counts of an object X over time
++ *
++ * Minute (timeline):
++ * 1 2 3 4 5 6 7 8
++ *
++ * Observed counts per minute:
++ * 1 1 1 1 0 0 0 0
++ *
++ * Counts returned by counter:
++ * 1 2 3 4 4 3 2 1
++ * }
++ * </pre>
++ * <p/>
++ * As you can see in this example, for the first <code>windowLengthInSlots</code> (here: the first five minutes) the
++ * counter will always return counts equal or greater than in the previous iteration (1, 2, 3, 4, 4). This initial load
++ * effect needs to be accounted for whenever you want to perform analyses such as trending topics; otherwise your
++ * analysis algorithm might falsely identify the object to be trending as the counter seems to observe continuously
++ * increasing counts. Also, note that during the initial load phase <em>every object</em> will exhibit increasing
++ * counts.
++ * <p/>
++ * On a high-level, the counter exhibits the following behavior: If you asked the example counter after two minutes,
++ * "how often did you count the object during the past five minutes?", then it should reply "I have counted it 2 times
++ * in the past five minutes", implying that it can only account for the last two of those five minutes because the
++ * counter was not running before that time.
++ *
++ * @param <T> The type of those objects we want to count.
++ */
++public final class SlidingWindowCounter<T> implements Serializable {
++
++  private static final long serialVersionUID = -2645063988768785810L;
++
++  private SlotBasedCounter<T> objCounter;
++  /** Slot that currently receives increments. */
++  private int headSlot;
++  /** Slot that is wiped on the next read and then becomes the new head. */
++  private int tailSlot;
++  private int windowLengthInSlots;
++
++  /**
++   * @param windowLengthInSlots number of slots the window is divided into; must be at least 2
++   * @throws IllegalArgumentException if {@code windowLengthInSlots} is less than 2
++   */
++  public SlidingWindowCounter(int windowLengthInSlots) {
++    if (windowLengthInSlots < 2) {
++      throw new IllegalArgumentException(
++          "Window length in slots must be at least two (you requested " + windowLengthInSlots + ")");
++    }
++    this.windowLengthInSlots = windowLengthInSlots;
++    objCounter = new SlotBasedCounter<T>(windowLengthInSlots);
++    headSlot = 0;
++    tailSlot = slotAfter(headSlot);
++  }
++
++  /**
++   * Increments the count of {@code obj} in the current head slot.
++   */
++  public void incrementCount(T obj) {
++    objCounter.incrementCount(obj, headSlot);
++  }
++
++  /**
++   * Return the current (total) counts of all tracked objects, then advance the window.
++   * <p/>
++   * Whenever this method is called, we consider the counts of the current sliding window to be available to and
++   * successfully processed "upstream" (i.e. by the caller). Knowing this we will start counting any subsequent
++   * objects within the next "chunk" of the sliding window.
++   *
++   * @return The current (total) counts of all tracked objects.
++   */
++  public Map<T, Long> getCountsThenAdvanceWindow() {
++    final Map<T, Long> totals = objCounter.getCounts();
++    objCounter.wipeZeros();
++    objCounter.wipeSlot(tailSlot);
++
++    // Advance the window: the freshly wiped tail becomes the new head, and the tail moves one slot further.
++    headSlot = tailSlot;
++    tailSlot = slotAfter(tailSlot);
++    return totals;
++  }
++
++  /** Index of the slot following {@code slot}, wrapping around at the end of the window. */
++  private int slotAfter(int slot) {
++    return (slot + 1) % windowLengthInSlots;
++  }
++
++}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/src/jvm/storm/starter/tools/SlotBasedCounter.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/src/jvm/storm/starter/tools/SlotBasedCounter.java
index 0000000,0000000..f45ef48
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/tools/SlotBasedCounter.java
@@@ -1,0 -1,0 +1,101 @@@
++package storm.starter.tools;
++
++import java.io.Serializable;
++import java.util.HashMap;
++import java.util.HashSet;
++import java.util.Map;
++import java.util.Set;
++
++/**
++ * This class provides per-slot counts of the occurrences of objects.
++ * <p/>
++ * It can be used, for instance, as a building block for implementing sliding window counting of objects.
++ *
++ * @param <T> The type of those objects we want to count.
++ */
++public final class SlotBasedCounter<T> implements Serializable {
++
++  private static final long serialVersionUID = 4858185737378394432L;
++
++  /** Maps each tracked object to its per-slot counts; every array has {@code numSlots} elements. */
++  private final Map<T, long[]> objToCounts = new HashMap<T, long[]>();
++  private final int numSlots;
++
++  /**
++   * @param numSlots number of slots to keep per object; must be greater than zero
++   * @throws IllegalArgumentException if {@code numSlots} is zero or negative
++   */
++  public SlotBasedCounter(int numSlots) {
++    if (numSlots <= 0) {
++      throw new IllegalArgumentException("Number of slots must be greater than zero (you requested " + numSlots + ")");
++    }
++    this.numSlots = numSlots;
++  }
++
++  /**
++   * Adds one to the count of {@code obj} in the given slot, starting to track the object if it is new.
++   */
++  public void incrementCount(T obj, int slot) {
++    long[] perSlot = objToCounts.get(obj);
++    if (perSlot == null) {
++      perSlot = new long[numSlots];
++      objToCounts.put(obj, perSlot);
++    }
++    perSlot[slot]++;
++  }
++
++  /**
++   * @return the count of {@code obj} in the given slot, or 0 if the object is not tracked
++   */
++  public long getCount(T obj, int slot) {
++    final long[] perSlot = objToCounts.get(obj);
++    return perSlot == null ? 0 : perSlot[slot];
++  }
++
++  /**
++   * @return a new map from every tracked object to the sum of its counts across all slots
++   */
++  public Map<T, Long> getCounts() {
++    final Map<T, Long> totals = new HashMap<T, Long>();
++    for (Map.Entry<T, long[]> tracked : objToCounts.entrySet()) {
++      totals.put(tracked.getKey(), sumOf(tracked.getValue()));
++    }
++    return totals;
++  }
++
++  /** Sum of all per-slot counts of a single object. */
++  private long sumOf(long[] perSlot) {
++    long total = 0;
++    for (long slotCount : perSlot) {
++      total += slotCount;
++    }
++    return total;
++  }
++
++  /**
++   * Reset the slot count of any tracked objects to zero for the given slot.
++   *
++   * @param slot index of the slot to reset
++   */
++  public void wipeSlot(int slot) {
++    for (long[] perSlot : objToCounts.values()) {
++      perSlot[slot] = 0;
++    }
++  }
++
++  /**
++   * Remove any object from the counter whose total count is zero (to free up memory).
++   */
++  public void wipeZeros() {
++    // Collect first, remove afterwards, so the map is not modified while it is being iterated.
++    final Set<T> exhausted = new HashSet<T>();
++    for (Map.Entry<T, long[]> tracked : objToCounts.entrySet()) {
++      if (sumOf(tracked.getValue()) == 0) {
++        exhausted.add(tracked.getKey());
++      }
++    }
++    objToCounts.keySet().removeAll(exhausted);
++  }
++
++}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/src/jvm/storm/starter/trident/TridentReach.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/src/jvm/storm/starter/trident/TridentReach.java
index 0000000,0000000..a940b40
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/trident/TridentReach.java
@@@ -1,0 -1,0 +1,139 @@@
++package storm.starter.trident;
++
++import backtype.storm.Config;
++import backtype.storm.LocalCluster;
++import backtype.storm.LocalDRPC;
++import backtype.storm.generated.StormTopology;
++import backtype.storm.task.IMetricsContext;
++import backtype.storm.tuple.Fields;
++import backtype.storm.tuple.Values;
++import storm.trident.TridentState;
++import storm.trident.TridentTopology;
++import storm.trident.operation.BaseFunction;
++import storm.trident.operation.CombinerAggregator;
++import storm.trident.operation.TridentCollector;
++import storm.trident.operation.builtin.MapGet;
++import storm.trident.operation.builtin.Sum;
++import storm.trident.state.ReadOnlyState;
++import storm.trident.state.State;
++import storm.trident.state.StateFactory;
++import storm.trident.state.map.ReadOnlyMapState;
++import storm.trident.tuple.TridentTuple;
++
++import java.util.*;
++
++/**
++ * Example Trident topology, served over DRPC under the function name "reach", that is built from two static
++ * in-memory lookup tables and run on a local cluster by {@link #main(String[])}.
++ */
++public class TridentReach {
++ // Static lookup table: URL -> names (presumably the people who tweeted that URL).
++ public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, List<String>>() {{
++ put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan"));
++ put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", "sally", "nathan"));
++ put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john"));
++ }};
++
++ // Static lookup table: name -> names (presumably that person's followers).
++ public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, List<String>>() {{
++ put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", "jai"));
++ put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", "vivian"));
++ put("tim", Arrays.asList("alex"));
++ put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", "vivian", "emily", "jordan"));
++ put("adam", Arrays.asList("david", "carissa"));
++ put("mike", Arrays.asList("john", "bob"));
++ put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob"));
++ }};
++
++ /**
++ * Read-only state backed by a plain map, queried with the first element of each key list.
++ */
++ public static class StaticSingleKeyMapState extends ReadOnlyState implements ReadOnlyMapState<Object> {
++ /**
++ * Creates a {@link StaticSingleKeyMapState} around the map given at construction time; the arguments of
++ * {@code makeState} are ignored.
++ */
++ public static class Factory implements StateFactory {
++ Map _map;
++
++ public Factory(Map map) {
++ _map = map;
++ }
++
++ @Override
++ public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
++ return new StaticSingleKeyMapState(_map);
++ }
++
++ }
++
++ Map _map;
++
++ public StaticSingleKeyMapState(Map map) {
++ _map = map;
++ }
++
++
++ /**
++ * Looks up the first element of every key list in the backing map.
++ *
++ * @return one entry per key, in the same order; an entry is null when the map has no value for that key
++ */
++ @Override
++ public List<Object> multiGet(List<List<Object>> keys) {
++ List<Object> ret = new ArrayList();
++ for (List<Object> key : keys) {
++ // Only the first component of the key is used for the lookup.
++ Object singleKey = key.get(0);
++ ret.add(_map.get(singleKey));
++ }
++ return ret;
++ }
++
++ }
++
++ /**
++ * Combiner aggregator whose result is always 1, regardless of the tuples or partial values it is given.
++ */
++ public static class One implements CombinerAggregator<Integer> {
++ @Override
++ public Integer init(TridentTuple tuple) {
++ return 1;
++ }
++
++ @Override
++ public Integer combine(Integer val1, Integer val2) {
++ return 1;
++ }
++
++ @Override
++ public Integer zero() {
++ return 1;
++ }
++ }
++
++ /**
++ * Emits one tuple per element of the list found in the first field of the input tuple; emits nothing when that
++ * field is null.
++ */
++ public static class ExpandList extends BaseFunction {
++
++ @Override
++ public void execute(TridentTuple tuple, TridentCollector collector) {
++ List l = (List) tuple.getValue(0);
++ if (l != null) {
++ for (Object o : l) {
++ collector.emit(new Values(o));
++ }
++ }
++ }
++
++ }
++
++ /**
++ * Builds the "reach" DRPC topology on top of two static states backed by {@link #TWEETERS_DB} and
++ * {@link #FOLLOWERS_DB}.
++ *
++ * @param drpc the local DRPC instance the "reach" stream is attached to
++ * @return the assembled topology
++ */
++ public static StormTopology buildTopology(LocalDRPC drpc) {
++ TridentTopology topology = new TridentTopology();
++ TridentState urlToTweeters = topology.newStaticState(new StaticSingleKeyMapState.Factory(TWEETERS_DB));
++ TridentState tweetersToFollowers = topology.newStaticState(new StaticSingleKeyMapState.Factory(FOLLOWERS_DB));
++
++
++ // Pipeline, in call order: args -> tweeters (lookup) -> one tuple per tweeter -> followers (lookup)
++ // -> one tuple per follower -> group by follower and reduce each group to "one" -> sum of "one" as "reach".
++ topology.newDRPCStream("reach", drpc).stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields(
++ "tweeters")).each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter")).shuffle().stateQuery(
++ tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers")).each(new Fields("followers"),
++ new ExpandList(), new Fields("follower")).groupBy(new Fields("follower")).aggregate(new One(), new Fields(
++ "one")).aggregate(new Fields("one"), new Sum(), new Fields("reach"));
++ return topology.build();
++ }
++
++ /**
++ * Runs the topology on a local cluster, prints the result of three "reach" queries, then shuts everything down.
++ */
++ public static void main(String[] args) throws Exception {
++ LocalDRPC drpc = new LocalDRPC();
++
++ Config conf = new Config();
++ LocalCluster cluster = new LocalCluster();
++
++ cluster.submitTopology("reach", conf, buildTopology(drpc));
++
++ // Fixed 2 second pause before issuing queries (presumably to let the topology start up).
++ Thread.sleep(2000);
++
++ // "aaa" is not a key of TWEETERS_DB; the other two arguments are.
++ System.out.println("REACH: " + drpc.execute("reach", "aaa"));
++ System.out.println("REACH: " + drpc.execute("reach", "foo.com/blog/1"));
++ System.out.println("REACH: " + drpc.execute("reach", "engineering.twitter.com/blog/5"));
++
++
++ cluster.shutdown();
++ drpc.shutdown();
++ }
++}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/src/jvm/storm/starter/trident/TridentWordCount.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/src/jvm/storm/starter/trident/TridentWordCount.java
index 0000000,0000000..c56280c
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/trident/TridentWordCount.java
@@@ -1,0 -1,0 +1,68 @@@
++package storm.starter.trident;
++
++import backtype.storm.Config;
++import backtype.storm.LocalCluster;
++import backtype.storm.LocalDRPC;
++import backtype.storm.StormSubmitter;
++import backtype.storm.generated.StormTopology;
++import backtype.storm.tuple.Fields;
++import backtype.storm.tuple.Values;
++import storm.trident.TridentState;
++import storm.trident.TridentTopology;
++import storm.trident.operation.BaseFunction;
++import storm.trident.operation.TridentCollector;
++import storm.trident.operation.builtin.Count;
++import storm.trident.operation.builtin.FilterNull;
++import storm.trident.operation.builtin.MapGet;
++import storm.trident.operation.builtin.Sum;
++import storm.trident.testing.FixedBatchSpout;
++import storm.trident.testing.MemoryMapState;
++import storm.trident.tuple.TridentTuple;
++
++
++/**
++ * Example Trident topology that keeps per-word counts of a cycling set of sentences in memory and answers "words"
++ * DRPC queries against those counts.
++ */
++public class TridentWordCount {
++ /**
++ * Splits the string in the first field of the input tuple on single spaces and emits one tuple per piece.
++ */
++ public static class Split extends BaseFunction {
++ @Override
++ public void execute(TridentTuple tuple, TridentCollector collector) {
++ String sentence = tuple.getString(0);
++ for (String word : sentence.split(" ")) {
++ collector.emit(new Values(word));
++ }
++ }
++ }
++
++ /**
++ * Builds the word-count topology: a counting stream fed by a fixed, cycling spout, plus a "words" DRPC stream that
++ * queries the resulting state.
++ *
++ * @param drpc the local DRPC instance to attach the "words" stream to; {@link #main(String[])} passes null when
++ * submitting to a cluster
++ * @return the assembled topology
++ */
++ public static StormTopology buildTopology(LocalDRPC drpc) {
++ // Spout emitting the five sentences below over and over, with a configured batch size of 3.
++ FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
++ new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
++ new Values("how many apples can you eat"), new Values("to be or not to be the person"));
++ spout.setCycle(true);
++
++ TridentTopology topology = new TridentTopology();
++ // sentence -> words -> group by word -> count per word, persisted in an in-memory map state.
++ TridentState wordCounts = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),
++ new Split(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(),
++ new Count(), new Fields("count")).parallelismHint(16);
++
++ // DRPC args -> words -> look up each word's count -> drop words without a count -> sum the counts.
++ topology.newDRPCStream("words", drpc).each(new Fields("args"), new Split(), new Fields("word")).groupBy(new Fields(
++ "word")).stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")).each(new Fields("count"),
++ new FilterNull()).aggregate(new Fields("count"), new Sum(), new Fields("sum"));
++ return topology.build();
++ }
++
++ /**
++ * With no arguments: runs the topology on a local cluster and prints the result of the same DRPC query 100 times,
++ * one second apart. Otherwise: submits the topology to a cluster under the name given in {@code args[0]}.
++ */
++ public static void main(String[] args) throws Exception {
++ Config conf = new Config();
++ conf.setMaxSpoutPending(20);
++ if (args.length == 0) {
++ LocalDRPC drpc = new LocalDRPC();
++ LocalCluster cluster = new LocalCluster();
++ cluster.submitTopology("wordCounter", conf, buildTopology(drpc));
++ for (int i = 0; i < 100; i++) {
++ System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped"));
++ Thread.sleep(1000);
++ }
++ }
++ else {
++ conf.setNumWorkers(3);
++ StormSubmitter.submitTopology(args[0], conf, buildTopology(null));
++ }
++ }
++}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/src/jvm/storm/starter/util/StormRunner.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/src/jvm/storm/starter/util/StormRunner.java
index 0000000,0000000..30c4f50
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/util/StormRunner.java
@@@ -1,0 -1,0 +1,22 @@@
++package storm.starter.util;
++
++import backtype.storm.Config;
++import backtype.storm.LocalCluster;
++import backtype.storm.generated.StormTopology;
++
++/**
++ * Helper for running a topology in an in-process {@link LocalCluster} for a fixed amount of time.
++ */
++public final class StormRunner {
++
++ private static final int MILLIS_IN_SEC = 1000;
++
++ private StormRunner() {
++ }
++
++ /**
++ * Submits the topology to a new local cluster, sleeps for the requested time, kills the topology and shuts the
++ * cluster down.
++ *
++ * @param topology the topology to run
++ * @param topologyName name under which the topology is submitted and later killed
++ * @param conf configuration handed to the cluster on submission
++ * @param runtimeInSeconds how long to sleep before the topology is killed
++ * @throws InterruptedException if the calling thread is interrupted while sleeping
++ */
++ public static void runTopologyLocally(StormTopology topology, String topologyName, Config conf, int runtimeInSeconds)
++ throws InterruptedException {
++ LocalCluster cluster = new LocalCluster();
++ try {
++ cluster.submitTopology(topologyName, conf, topology);
++ // Widen to long before multiplying so large runtimes cannot overflow int.
++ Thread.sleep((long) runtimeInSeconds * MILLIS_IN_SEC);
++ cluster.killTopology(topologyName);
++ }
++ finally {
++ // Always tear the cluster down, including when submission fails or the sleep is interrupted.
++ cluster.shutdown();
++ }
++ }
++}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/src/jvm/storm/starter/util/TupleHelpers.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/src/jvm/storm/starter/util/TupleHelpers.java
index 0000000,0000000..a46480e
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/util/TupleHelpers.java
@@@ -1,0 -1,0 +1,16 @@@
++package storm.starter.util;
++
++import backtype.storm.Constants;
++import backtype.storm.tuple.Tuple;
++
++/**
++ * Static helpers for inspecting {@link Tuple} instances.
++ */
++public final class TupleHelpers {
++
++ private TupleHelpers() {
++ }
++
++ /**
++ * Tells whether a tuple is a tick tuple, i.e. whether its source component and source stream equal
++ * {@link Constants#SYSTEM_COMPONENT_ID} and {@link Constants#SYSTEM_TICK_STREAM_ID}.
++ *
++ * @param tuple the tuple to inspect; must not be {@code null}
++ * @return {@code true} if both identifiers match, {@code false} otherwise
++ */
++ public static boolean isTickTuple(Tuple tuple) {
++ // Constant-first comparison: a tuple without a source component or stream id yields false instead of an NPE.
++ return Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
++ && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
++ }
++
++}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/storm-starter.iml
----------------------------------------------------------------------
diff --cc examples/storm-starter/storm-starter.iml
index 0000000,0000000..a4e5116
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/storm-starter.iml
@@@ -1,0 -1,0 +1,82 @@@
++<?xml version="1.0" encoding="UTF-8"?>
++<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
++ <component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_6" inherit-compiler-output="false">
++ <output url="file://$MODULE_DIR$/target/classes" />
++ <output-test url="file://$MODULE_DIR$/target/test-classes" />
++ <exclude-output />
++ <content url="file://$MODULE_DIR$">
++ <sourceFolder url="file://$MODULE_DIR$/src/jvm" isTestSource="false" />
++ <sourceFolder url="file://$MODULE_DIR$/multilang" isTestSource="false" />
++ <sourceFolder url="file://$MODULE_DIR$/test/jvm" isTestSource="true" />
++ <excludeFolder url="file://$MODULE_DIR$/target" />
++ </content>
++ <orderEntry type="inheritedJdk" />
++ <orderEntry type="sourceFolder" forTests="false" />
++ <orderEntry type="library" scope="TEST" name="Maven: junit:junit:3.8.1" level="project" />
++ <orderEntry type="library" scope="TEST" name="Maven: org.testng:testng:6.8.5" level="project" />
++ <orderEntry type="library" scope="TEST" name="Maven: org.beanshell:bsh:2.0b4" level="project" />
++ <orderEntry type="library" scope="TEST" name="Maven: com.beust:jcommander:1.27" level="project" />
++ <orderEntry type="library" scope="TEST" name="Maven: org.yaml:snakeyaml:1.6" level="project" />
++ <orderEntry type="library" scope="TEST" name="Maven: org.mockito:mockito-all:1.9.0" level="project" />
++ <orderEntry type="library" scope="TEST" name="Maven: org.easytesting:fest-assert-core:2.0M8" level="project" />
++ <orderEntry type="library" scope="TEST" name="Maven: org.easytesting:fest-util:1.2.3" level="project" />
++ <orderEntry type="library" scope="TEST" name="Maven: org.jmock:jmock:2.6.0" level="project" />
++ <orderEntry type="library" scope="TEST" name="Maven: org.hamcrest:hamcrest-core:1.1" level="project" />
++ <orderEntry type="library" scope="TEST" name="Maven: org.hamcrest:hamcrest-library:1.1" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: storm:storm:0.9.0.1" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: storm:storm-console-logging:0.9.0.1" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: storm:storm-core:0.9.0.1" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: org.clojure:clojure:1.4.0" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: commons-io:commons-io:1.4" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.commons:commons-exec:1.1" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: storm:libthrift7:0.7.0-2" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: commons-lang:commons-lang:2.5" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: javax.servlet:servlet-api:2.5" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.httpcomponents:httpclient:4.1.1" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.httpcomponents:httpcore:4.1" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: commons-logging:commons-logging:1.1.1" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: commons-codec:commons-codec:1.4" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: clj-time:clj-time:0.4.1" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: joda-time:joda-time:2.0" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: com.netflix.curator:curator-framework:1.0.1" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: com.netflix.curator:curator-client:1.0.1" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: org.slf4j:slf4j-api:1.6.5" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.zookeeper:zookeeper:3.3.3" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: jline:jline:0.9.94" level="project" />
++ <orderEntry type="library" name="Maven: com.google.guava:guava:15.0" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: backtype:jzmq:2.1.0" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: com.googlecode.json-simple:json-simple:1.1" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: compojure:compojure:1.1.3" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: org.clojure:core.incubator:0.1.0" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: org.clojure:tools.macro:0.1.0" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: clout:clout:1.0.1" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: ring:ring-core:1.1.5" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: commons-fileupload:commons-fileupload:1.2.1" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: hiccup:hiccup:0.3.6" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: ring:ring-devel:0.3.11" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: clj-stacktrace:clj-stacktrace:0.2.2" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: ring:ring-jetty-adapter:0.3.11" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: ring:ring-servlet:0.3.11" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: org.mortbay.jetty:jetty:6.1.26" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: org.mortbay.jetty:jetty-util:6.1.26" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: org.mortbay.jetty:servlet-api:2.5-20081211" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: org.clojure:tools.logging:0.2.3" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: org.clojure:math.numeric-tower:0.0.1" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: storm:carbonite:1.5.0" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: com.esotericsoftware.kryo:kryo:2.17" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: com.esotericsoftware.reflectasm:reflectasm:shaded:1.07" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: org.ow2.asm:asm:4.0" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: com.esotericsoftware.minlog:minlog:1.2" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: org.objenesis:objenesis:1.2" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: storm:tools.cli:0.2.2" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: com.googlecode.disruptor:disruptor:2.10.1" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: storm:jgrapht:0.8.3" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: ch.qos.logback:logback-classic:1.0.6" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: ch.qos.logback:logback-core:1.0.6" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: org.slf4j:log4j-over-slf4j:1.6.6" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: storm:storm-netty:0.9.0.1" level="project" />
++ <orderEntry type="library" scope="PROVIDED" name="Maven: io.netty:netty:3.6.3.Final" level="project" />
++ <orderEntry type="library" name="Maven: commons-collections:commons-collections:3.2.1" level="project" />
++ </component>
++</module>
++
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/test/jvm/storm/starter/bolt/IntermediateRankingsBoltTest.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/test/jvm/storm/starter/bolt/IntermediateRankingsBoltTest.java
index 0000000,0000000..cbbee3c
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/test/jvm/storm/starter/bolt/IntermediateRankingsBoltTest.java
@@@ -1,0 -1,0 +1,129 @@@
++package storm.starter.bolt;
++
++import backtype.storm.Config;
++import backtype.storm.topology.BasicOutputCollector;
++import backtype.storm.topology.OutputFieldsDeclarer;
++import backtype.storm.tuple.Fields;
++import backtype.storm.tuple.Tuple;
++import backtype.storm.tuple.Values;
++import com.google.common.collect.Lists;
++import org.testng.annotations.DataProvider;
++import org.testng.annotations.Test;
++import storm.starter.tools.MockTupleHelpers;
++
++import java.util.Map;
++
++import static org.fest.assertions.api.Assertions.assertThat;
++import static org.mockito.Matchers.any;
++import static org.mockito.Mockito.*;
++
++/**
++ * Unit tests for {@link IntermediateRankingsBolt}: constructor argument validation, emit behavior for tick versus
++ * normal tuples, declared output fields and the tick-tuple frequency in the component configuration.
++ */
++public class IntermediateRankingsBoltTest {
++
++ private static final String ANY_NON_SYSTEM_COMPONENT_ID = "irrelevant_component_id";
++ private static final String ANY_NON_SYSTEM_STREAM_ID = "irrelevant_stream_id";
++ private static final Object ANY_OBJECT = new Object();
++ private static final int ANY_TOPN = 10;
++ private static final long ANY_COUNT = 42;
++
++ /**
++ * Creates a mocked non-system tuple whose values are the given object followed by the given count.
++ *
++ * @param obj first value of the tuple
++ * @param count second value of the tuple
++ * @return the mocked tuple
++ */
++ private Tuple mockRankableTuple(Object obj, long count) {
++ Tuple tuple = MockTupleHelpers.mockTuple(ANY_NON_SYSTEM_COMPONENT_ID, ANY_NON_SYSTEM_STREAM_ID);
++ // Use the arguments rather than the class constants so that callers control the tuple's content.
++ when(tuple.getValues()).thenReturn(Lists.newArrayList(obj, count));
++ return tuple;
++ }
++
++ @DataProvider
++ public Object[][] illegalTopN() {
++ return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
++ }
++
++ @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalTopN")
++ public void negativeOrZeroTopNShouldThrowIAE(int topN) {
++ new IntermediateRankingsBolt(topN);
++ }
++
++ @DataProvider
++ public Object[][] illegalEmitFrequency() {
++ return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
++ }
++
++ @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalEmitFrequency")
++ public void negativeOrZeroEmitFrequencyShouldThrowIAE(int emitFrequencyInSeconds) {
++ new IntermediateRankingsBolt(ANY_TOPN, emitFrequencyInSeconds);
++ }
++
++ @DataProvider
++ public Object[][] legalTopN() {
++ return new Object[][]{ { 1 }, { 2 }, { 3 }, { 20 } };
++ }
++
++ @Test(dataProvider = "legalTopN")
++ public void positiveTopNShouldBeOk(int topN) {
++ new IntermediateRankingsBolt(topN);
++ }
++
++ @DataProvider
++ public Object[][] legalEmitFrequency() {
++ return new Object[][]{ { 1 }, { 2 }, { 3 }, { 20 } };
++ }
++
++ @Test(dataProvider = "legalEmitFrequency")
++ public void positiveEmitFrequencyShouldBeOk(int emitFrequencyInSeconds) {
++ new IntermediateRankingsBolt(ANY_TOPN, emitFrequencyInSeconds);
++ }
++
++ @Test
++ public void shouldEmitSomethingIfTickTupleIsReceived() {
++ // given
++ Tuple tickTuple = MockTupleHelpers.mockTickTuple();
++ BasicOutputCollector collector = mock(BasicOutputCollector.class);
++ IntermediateRankingsBolt bolt = new IntermediateRankingsBolt();
++
++ // when
++ bolt.execute(tickTuple, collector);
++
++ // then
++ verify(collector).emit(any(Values.class));
++ }
++
++ @Test
++ public void shouldEmitNothingIfNormalTupleIsReceived() {
++ // given
++ Tuple normalTuple = mockRankableTuple(ANY_OBJECT, ANY_COUNT);
++ BasicOutputCollector collector = mock(BasicOutputCollector.class);
++ IntermediateRankingsBolt bolt = new IntermediateRankingsBolt();
++
++ // when
++ bolt.execute(normalTuple, collector);
++
++ // then
++ verifyZeroInteractions(collector);
++ }
++
++ @Test
++ public void shouldDeclareOutputFields() {
++ // given
++ OutputFieldsDeclarer declarer = mock(OutputFieldsDeclarer.class);
++ IntermediateRankingsBolt bolt = new IntermediateRankingsBolt();
++
++ // when
++ bolt.declareOutputFields(declarer);
++
++ // then
++ verify(declarer, times(1)).declare(any(Fields.class));
++ }
++
++ @Test
++ public void shouldSetTickTupleFrequencyInComponentConfigurationToNonZeroValue() {
++ // given
++ IntermediateRankingsBolt bolt = new IntermediateRankingsBolt();
++
++ // when
++ Map<String, Object> componentConfig = bolt.getComponentConfiguration();
++
++ // then
++ assertThat(componentConfig).containsKey(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
++ Integer emitFrequencyInSeconds = (Integer) componentConfig.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
++ assertThat(emitFrequencyInSeconds).isGreaterThan(0);
++ }
++}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/test/jvm/storm/starter/bolt/RollingCountBoltTest.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/test/jvm/storm/starter/bolt/RollingCountBoltTest.java
index 0000000,0000000..c9a516a
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/test/jvm/storm/starter/bolt/RollingCountBoltTest.java
@@@ -1,0 -1,0 +1,96 @@@
++package storm.starter.bolt;
++
++import backtype.storm.Config;
++import backtype.storm.task.OutputCollector;
++import backtype.storm.task.TopologyContext;
++import backtype.storm.topology.OutputFieldsDeclarer;
++import backtype.storm.tuple.Fields;
++import backtype.storm.tuple.Tuple;
++import backtype.storm.tuple.Values;
++import org.testng.annotations.Test;
++import storm.starter.tools.MockTupleHelpers;
++
++import java.util.Map;
++
++import static org.fest.assertions.api.Assertions.assertThat;
++import static org.mockito.Matchers.any;
++import static org.mockito.Mockito.*;
++
++/**
++ * Unit tests for {@link RollingCountBolt}: emit behavior on tick tuples, declared output fields and the tick-tuple
++ * frequency in the component configuration.
++ */
++public class RollingCountBoltTest {
++
++ private static final String ANY_NON_SYSTEM_COMPONENT_ID = "irrelevant_component_id";
++ private static final String ANY_NON_SYSTEM_STREAM_ID = "irrelevant_stream_id";
++
++ /** Builds a mocked non-system tuple that returns {@code payload} as its first value. */
++ private Tuple mockNormalTuple(Object payload) {
++ Tuple mocked = MockTupleHelpers.mockTuple(ANY_NON_SYSTEM_COMPONENT_ID, ANY_NON_SYSTEM_STREAM_ID);
++ when(mocked.getValue(0)).thenReturn(payload);
++ return mocked;
++ }
++
++ /** Creates a bolt prepared with a mocked configuration, a mocked context and the given collector. */
++ @SuppressWarnings("rawtypes")
++ private RollingCountBolt preparedBolt(OutputCollector collector) {
++ RollingCountBolt bolt = new RollingCountBolt();
++ Map conf = mock(Map.class);
++ TopologyContext context = mock(TopologyContext.class);
++ bolt.prepare(conf, context, collector);
++ return bolt;
++ }
++
++ @Test
++ public void shouldEmitNothingIfNoObjectHasBeenCountedYetAndTickTupleIsReceived() {
++ // given
++ Tuple tick = MockTupleHelpers.mockTickTuple();
++ OutputCollector collector = mock(OutputCollector.class);
++ RollingCountBolt bolt = preparedBolt(collector);
++
++ // when
++ bolt.execute(tick);
++
++ // then
++ verifyZeroInteractions(collector);
++ }
++
++ @Test
++ public void shouldEmitSomethingIfAtLeastOneObjectWasCountedAndTickTupleIsReceived() {
++ // given
++ Tuple counted = mockNormalTuple(new Object());
++ Tuple tick = MockTupleHelpers.mockTickTuple();
++ OutputCollector collector = mock(OutputCollector.class);
++ RollingCountBolt bolt = preparedBolt(collector);
++
++ // when
++ bolt.execute(counted);
++ bolt.execute(tick);
++
++ // then
++ verify(collector).emit(any(Values.class));
++ }
++
++ @Test
++ public void shouldDeclareOutputFields() {
++ // given
++ RollingCountBolt bolt = new RollingCountBolt();
++ OutputFieldsDeclarer declarer = mock(OutputFieldsDeclarer.class);
++
++ // when
++ bolt.declareOutputFields(declarer);
++
++ // then (verify without a mode checks for exactly one invocation)
++ verify(declarer).declare(any(Fields.class));
++ }
++
++ @Test
++ public void shouldSetTickTupleFrequencyInComponentConfigurationToNonZeroValue() {
++ // given
++ RollingCountBolt bolt = new RollingCountBolt();
++
++ // when
++ Map<String, Object> config = bolt.getComponentConfiguration();
++
++ // then
++ assertThat(config).containsKey(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
++ Integer tickFrequency = (Integer) config.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
++ assertThat(tickFrequency).isGreaterThan(0);
++ }
++}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/test/jvm/storm/starter/bolt/TotalRankingsBoltTest.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/test/jvm/storm/starter/bolt/TotalRankingsBoltTest.java
index 0000000,0000000..a8bed45
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/test/jvm/storm/starter/bolt/TotalRankingsBoltTest.java
@@@ -1,0 -1,0 +1,130 @@@
++package storm.starter.bolt;
++
++import backtype.storm.Config;
++import backtype.storm.topology.BasicOutputCollector;
++import backtype.storm.topology.OutputFieldsDeclarer;
++import backtype.storm.tuple.Fields;
++import backtype.storm.tuple.Tuple;
++import backtype.storm.tuple.Values;
++import org.testng.annotations.DataProvider;
++import org.testng.annotations.Test;
++import storm.starter.tools.MockTupleHelpers;
++import storm.starter.tools.Rankings;
++
++import java.util.Map;
++
++import static org.fest.assertions.api.Assertions.assertThat;
++import static org.mockito.Matchers.any;
++import static org.mockito.Mockito.*;
++
++/**
++ * Unit tests for {@link TotalRankingsBolt}: constructor argument validation, emit behavior for tick versus normal
++ * tuples, declared output fields and the tick-tuple frequency in the component configuration.
++ */
++public class TotalRankingsBoltTest {
++
++ private static final String ANY_NON_SYSTEM_COMPONENT_ID = "irrelevant_component_id";
++ private static final String ANY_NON_SYSTEM_STREAM_ID = "irrelevant_stream_id";
++ private static final int ANY_TOPN = 10;
++
++ /**
++ * Creates a mocked non-system tuple whose first value is a mocked {@link Rankings} instance.
++ *
++ * @return the mocked tuple
++ */
++ private Tuple mockRankingsTuple() {
++ Tuple tuple = MockTupleHelpers.mockTuple(ANY_NON_SYSTEM_COMPONENT_ID, ANY_NON_SYSTEM_STREAM_ID);
++ Rankings rankings = mock(Rankings.class);
++ when(tuple.getValue(0)).thenReturn(rankings);
++ return tuple;
++ }
++
++ @DataProvider
++ public Object[][] illegalTopN() {
++ return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
++ }
++
++ @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalTopN")
++ public void negativeOrZeroTopNShouldThrowIAE(int topN) {
++ new TotalRankingsBolt(topN);
++ }
++
++ @DataProvider
++ public Object[][] illegalEmitFrequency() {
++ return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
++ }
++
++ @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalEmitFrequency")
++ public void negativeOrZeroEmitFrequencyShouldThrowIAE(int emitFrequencyInSeconds) {
++ new TotalRankingsBolt(ANY_TOPN, emitFrequencyInSeconds);
++ }
++
++ @DataProvider
++ public Object[][] legalTopN() {
++ return new Object[][]{ { 1 }, { 2 }, { 3 }, { 20 } };
++ }
++
++ @Test(dataProvider = "legalTopN")
++ public void positiveTopNShouldBeOk(int topN) {
++ new TotalRankingsBolt(topN);
++ }
++
++ @DataProvider
++ public Object[][] legalEmitFrequency() {
++ return new Object[][]{ { 1 }, { 2 }, { 3 }, { 20 } };
++ }
++
++ @Test(dataProvider = "legalEmitFrequency")
++ public void positiveEmitFrequencyShouldBeOk(int emitFrequencyInSeconds) {
++ new TotalRankingsBolt(ANY_TOPN, emitFrequencyInSeconds);
++ }
++
++ @Test
++ public void shouldEmitSomethingIfTickTupleIsReceived() {
++ // given
++ Tuple tickTuple = MockTupleHelpers.mockTickTuple();
++ BasicOutputCollector collector = mock(BasicOutputCollector.class);
++ TotalRankingsBolt bolt = new TotalRankingsBolt();
++
++ // when
++ bolt.execute(tickTuple, collector);
++
++ // then
++ verify(collector).emit(any(Values.class));
++ }
++
++ @Test
++ public void shouldEmitNothingIfNormalTupleIsReceived() {
++ // given
++ Tuple normalTuple = mockRankingsTuple();
++ BasicOutputCollector collector = mock(BasicOutputCollector.class);
++ TotalRankingsBolt bolt = new TotalRankingsBolt();
++
++ // when
++ bolt.execute(normalTuple, collector);
++
++ // then
++ verifyZeroInteractions(collector);
++ }
++
++ @Test
++ public void shouldDeclareOutputFields() {
++ // given
++ OutputFieldsDeclarer declarer = mock(OutputFieldsDeclarer.class);
++ TotalRankingsBolt bolt = new TotalRankingsBolt();
++
++ // when
++ bolt.declareOutputFields(declarer);
++
++ // then
++ verify(declarer, times(1)).declare(any(Fields.class));
++ }
++
++ @Test
++ public void shouldSetTickTupleFrequencyInComponentConfigurationToNonZeroValue() {
++ // given
++ TotalRankingsBolt bolt = new TotalRankingsBolt();
++
++ // when
++ Map<String, Object> componentConfig = bolt.getComponentConfiguration();
++
++ // then
++ assertThat(componentConfig).containsKey(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
++ Integer emitFrequencyInSeconds = (Integer) componentConfig.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
++ assertThat(emitFrequencyInSeconds).isGreaterThan(0);
++ }
++}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
index 0000000,0000000..fd7d921
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
@@@ -1,0 -1,0 +1,23 @@@
++package storm.starter.tools;
++
++import backtype.storm.Constants;
++import backtype.storm.tuple.Tuple;
++
++import static org.mockito.Mockito.*;
++
++/**
++ * Factory methods for Mockito-backed {@link Tuple} test doubles.
++ */
++public final class MockTupleHelpers {
++
++ private MockTupleHelpers() {
++ }
++
++ /**
++ * Returns a mocked tuple whose source component and stream are {@link Constants#SYSTEM_COMPONENT_ID} and
++ * {@link Constants#SYSTEM_TICK_STREAM_ID}.
++ */
++ public static Tuple mockTickTuple() {
++ return mockTuple(Constants.SYSTEM_COMPONENT_ID, Constants.SYSTEM_TICK_STREAM_ID);
++ }
++
++ /**
++ * Returns a mocked tuple that reports the given source component and source stream.
++ *
++ * @param componentId value returned by {@code getSourceComponent()}
++ * @param streamId value returned by {@code getSourceStreamId()}
++ */
++ public static Tuple mockTuple(String componentId, String streamId) {
++ Tuple mocked = mock(Tuple.class);
++ when(mocked.getSourceStreamId()).thenReturn(streamId);
++ when(mocked.getSourceComponent()).thenReturn(componentId);
++ return mocked;
++ }
++}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/test/jvm/storm/starter/tools/NthLastModifiedTimeTrackerTest.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/test/jvm/storm/starter/tools/NthLastModifiedTimeTrackerTest.java
index 0000000,0000000..c79d382
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/test/jvm/storm/starter/tools/NthLastModifiedTimeTrackerTest.java
@@@ -1,0 -1,0 +1,108 @@@
++package storm.starter.tools;
++
++import backtype.storm.utils.Time;
++import org.testng.annotations.DataProvider;
++import org.testng.annotations.Test;
++
++import static org.fest.assertions.api.Assertions.assertThat;
++
++/**
++ * Unit tests for {@link NthLastModifiedTimeTracker}, driven by the simulated clock of {@link Time}.
++ */
++public class NthLastModifiedTimeTrackerTest {
++
++ private static final int ANY_NUM_TIMES_TO_TRACK = 3;
++ private static final int MILLIS_IN_SEC = 1000;
++
++ @DataProvider
++ public Object[][] illegalNumTimesData() {
++ return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
++ }
++
++ @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalNumTimesData")
++ public void negativeOrZeroNumTimesToTrackShouldThrowIAE(int numTimesToTrack) {
++ new NthLastModifiedTimeTracker(numTimesToTrack);
++ }
++
++ @DataProvider
++ public Object[][] legalNumTimesData() {
++ return new Object[][]{ { 1 }, { 2 }, { 3 }, { 20 } };
++ }
++
++ @Test(dataProvider = "legalNumTimesData")
++ public void positiveNumTimesToTrackShouldBeOk(int numTimesToTrack) {
++ new NthLastModifiedTimeTracker(numTimesToTrack);
++ }
++
++ @DataProvider
++ public Object[][] whenNotYetMarkedAsModifiedData() {
++ return new Object[][]{ { 0 }, { 1 }, { 2 }, { 3 }, { 4 }, { 5 }, { 8 }, { 10 } };
++ }
++
++ @Test(dataProvider = "whenNotYetMarkedAsModifiedData")
++ public void shouldReturnCorrectModifiedTimeEvenWhenNotYetMarkedAsModified(int secondsToAdvance) {
++ // given
++ Time.startSimulating();
++ try {
++ NthLastModifiedTimeTracker tracker = new NthLastModifiedTimeTracker(ANY_NUM_TIMES_TO_TRACK);
++
++ // when
++ advanceSimulatedTimeBy(secondsToAdvance);
++ int seconds = tracker.secondsSinceOldestModification();
++
++ // then
++ assertThat(seconds).isEqualTo(secondsToAdvance);
++ }
++ finally {
++ // cleanup: must also run when the assertion fails, otherwise simulated time stays on for later tests
++ Time.stopSimulating();
++ }
++ }
++
++ /**
++ * Each row is: number of times to track, seconds to advance before each modification, and the expected
++ * seconds since the oldest tracked modification after each step.
++ */
++ @DataProvider
++ public Object[][] simulatedTrackerIterations() {
++ return new Object[][]{ { 1, new int[]{ 0, 1 }, new int[]{ 0, 0 } }, { 1, new int[]{ 0, 2 }, new int[]{ 0, 0 } },
++ { 2, new int[]{ 2, 2 }, new int[]{ 2, 2 } }, { 2, new int[]{ 0, 4 }, new int[]{ 0, 4 } },
++ { 1, new int[]{ 1, 1, 1, 1, 1, 1, 1 }, new int[]{ 0, 0, 0, 0, 0, 0, 0 } },
++ { 1, new int[]{ 1, 2, 3, 4, 5, 6, 7 }, new int[]{ 0, 0, 0, 0, 0, 0, 0 } },
++ { 2, new int[]{ 1, 1, 1, 1, 1, 1, 1 }, new int[]{ 1, 1, 1, 1, 1, 1, 1 } },
++ { 2, new int[]{ 2, 2, 2, 2, 2, 2, 2 }, new int[]{ 2, 2, 2, 2, 2, 2, 2 } },
++ { 2, new int[]{ 1, 2, 3, 4, 5, 6, 7 }, new int[]{ 1, 2, 3, 4, 5, 6, 7 } },
++ { 3, new int[]{ 1, 1, 1, 1, 1, 1, 1 }, new int[]{ 1, 2, 2, 2, 2, 2, 2 } },
++ { 3, new int[]{ 1, 2, 3, 4, 5, 6, 7 }, new int[]{ 1, 3, 5, 7, 9, 11, 13 } },
++ { 3, new int[]{ 2, 2, 2, 2, 2, 2, 2 }, new int[]{ 2, 4, 4, 4, 4, 4, 4 } },
++ { 4, new int[]{ 1, 1, 1, 1, 1, 1, 1 }, new int[]{ 1, 2, 3, 3, 3, 3, 3 } },
++ { 4, new int[]{ 1, 2, 3, 4, 5, 6, 7 }, new int[]{ 1, 3, 6, 9, 12, 15, 18 } },
++ { 4, new int[]{ 2, 2, 2, 2, 2, 2, 2 }, new int[]{ 2, 4, 6, 6, 6, 6, 6 } },
++ { 5, new int[]{ 1, 1, 1, 1, 1, 1, 1 }, new int[]{ 1, 2, 3, 4, 4, 4, 4 } },
++ { 5, new int[]{ 1, 2, 3, 4, 5, 6, 7 }, new int[]{ 1, 3, 6, 10, 14, 18, 22 } },
++ { 5, new int[]{ 2, 2, 2, 2, 2, 2, 2 }, new int[]{ 2, 4, 6, 8, 8, 8, 8 } },
++ { 6, new int[]{ 1, 1, 1, 1, 1, 1, 1 }, new int[]{ 1, 2, 3, 4, 5, 5, 5 } },
++ { 6, new int[]{ 1, 2, 3, 4, 5, 6, 7 }, new int[]{ 1, 3, 6, 10, 15, 20, 25 } },
++ { 6, new int[]{ 2, 2, 2, 2, 2, 2, 2 }, new int[]{ 2, 4, 6, 8, 10, 10, 10 } },
++ { 3, new int[]{ 1, 2, 3 }, new int[]{ 1, 3, 5 } } };
++ }
++
++ @Test(dataProvider = "simulatedTrackerIterations")
++ public void shouldReturnCorrectModifiedTimeWhenMarkedAsModified(int numTimesToTrack,
++ int[] secondsToAdvancePerIteration, int[] expLastModifiedTimes) {
++ // given
++ Time.startSimulating();
++ try {
++ NthLastModifiedTimeTracker tracker = new NthLastModifiedTimeTracker(numTimesToTrack);
++
++ int[] modifiedTimes = new int[expLastModifiedTimes.length];
++
++ // when
++ int i = 0;
++ for (int secondsToAdvance : secondsToAdvancePerIteration) {
++ advanceSimulatedTimeBy(secondsToAdvance);
++ tracker.markAsModified();
++ modifiedTimes[i] = tracker.secondsSinceOldestModification();
++ i++;
++ }
++
++ // then
++ assertThat(modifiedTimes).isEqualTo(expLastModifiedTimes);
++ }
++ finally {
++ // cleanup: must also run when the assertion fails, otherwise simulated time stays on for later tests
++ Time.stopSimulating();
++ }
++ }
++
++ /** Moves the simulated clock forward by the given number of seconds. */
++ private void advanceSimulatedTimeBy(int seconds) {
++ // Widen to long before multiplying so the millisecond value cannot overflow int.
++ Time.advanceTime((long) seconds * MILLIS_IN_SEC);
++ }
++}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/test/jvm/storm/starter/tools/RankableObjectWithFieldsTest.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/test/jvm/storm/starter/tools/RankableObjectWithFieldsTest.java
index 0000000,0000000..82a89e6
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/test/jvm/storm/starter/tools/RankableObjectWithFieldsTest.java
@@@ -1,0 -1,0 +1,235 @@@
++package storm.starter.tools;
++
++import backtype.storm.tuple.Tuple;
++import com.google.common.collect.Lists;
++import org.testng.annotations.DataProvider;
++import org.testng.annotations.Test;
++
++import java.util.ArrayList;
++import java.util.List;
++
++import static org.fest.assertions.api.Assertions.assertThat;
++import static org.mockito.Mockito.*;
++import static org.testng.Assert.assertFalse;
++import static org.testng.Assert.assertTrue;
++
++public class RankableObjectWithFieldsTest {
++
++ private static final Object ANY_OBJECT = new Object();
++ private static final long ANY_COUNT = 271;
++ private static final String ANY_FIELD = "someAdditionalField";
++ private static final int GREATER_THAN = 1;
++ private static final int EQUAL_TO = 0;
++ private static final int SMALLER_THAN = -1;
++
++ @Test(expectedExceptions = IllegalArgumentException.class)
++ public void constructorWithNullObjectAndNoFieldsShouldThrowIAE() {
++ new RankableObjectWithFields(null, ANY_COUNT);
++ }
++
++ @Test(expectedExceptions = IllegalArgumentException.class)
++ public void constructorWithNullObjectAndFieldsShouldThrowIAE() {
++ Object someAdditionalField = new Object();
++ new RankableObjectWithFields(null, ANY_COUNT, someAdditionalField);
++ }
++
++ @Test(expectedExceptions = IllegalArgumentException.class)
++ public void constructorWithNegativeCountAndNoFieldsShouldThrowIAE() {
++ new RankableObjectWithFields(ANY_OBJECT, -1);
++ }
++
++ @Test(expectedExceptions = IllegalArgumentException.class)
++ public void constructorWithNegativeCountAndFieldsShouldThrowIAE() {
++ Object someAdditionalField = new Object();
++ new RankableObjectWithFields(ANY_OBJECT, -1, someAdditionalField);
++ }
++
++ @Test
++ public void shouldBeEqualToItself() {
++ RankableObjectWithFields r = new RankableObjectWithFields(ANY_OBJECT, ANY_COUNT);
++ assertThat(r).isEqualTo(r);
++ }
++
++ @DataProvider
++ public Object[][] otherClassesData() {
++ return new Object[][]{ { new String("foo") }, { new Object() }, { Integer.valueOf(4) }, { Lists.newArrayList(7, 8,
++ 9) } };
++ }
++
++ @Test(dataProvider = "otherClassesData")
++ public void shouldNotBeEqualToInstancesOfOtherClasses(Object notARankable) {
++ RankableObjectWithFields r = new RankableObjectWithFields(ANY_OBJECT, ANY_COUNT);
++ assertFalse(r.equals(notARankable), r + " is equal to " + notARankable + " but it should not be");
++ }
++
++ @DataProvider
++ public Object[][] falseDuplicatesData() {
++ return new Object[][]{ { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("foo", 1) },
++ { new RankableObjectWithFields("foo", 1), new RankableObjectWithFields("Foo", 1) },
++ { new RankableObjectWithFields("foo", 1), new RankableObjectWithFields("FOO", 1) },
++ { new RankableObjectWithFields("foo", 1), new RankableObjectWithFields("bar", 1) },
++ { new RankableObjectWithFields("", 0), new RankableObjectWithFields("", 1) }, { new RankableObjectWithFields("",
++ 1), new RankableObjectWithFields("bar", 1) } };
++ }
++
++ @Test(dataProvider = "falseDuplicatesData")
++ public void shouldNotBeEqualToFalseDuplicates(RankableObjectWithFields r, RankableObjectWithFields falseDuplicate) {
++ assertFalse(r.equals(falseDuplicate), r + " is equal to " + falseDuplicate + " but it should not be");
++ }
++
++ @Test(dataProvider = "falseDuplicatesData")
++ public void shouldHaveDifferentHashCodeThanFalseDuplicates(RankableObjectWithFields r,
++ RankableObjectWithFields falseDuplicate) {
++ assertThat(r.hashCode()).isNotEqualTo(falseDuplicate.hashCode());
++ }
++
++ @DataProvider
++ public Object[][] trueDuplicatesData() {
++ return new Object[][]{ { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("foo", 0) },
++ { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("foo", 0, "someOtherField") },
++ { new RankableObjectWithFields("foo", 0, "someField"), new RankableObjectWithFields("foo", 0,
++ "someOtherField") } };
++ }
++
++ @Test(dataProvider = "trueDuplicatesData")
++ public void shouldBeEqualToTrueDuplicates(RankableObjectWithFields r, RankableObjectWithFields trueDuplicate) {
++ assertTrue(r.equals(trueDuplicate), r + " is not equal to " + trueDuplicate + " but it should be");
++ }
++
++ @Test(dataProvider = "trueDuplicatesData")
++ public void shouldHaveSameHashCodeAsTrueDuplicates(RankableObjectWithFields r,
++ RankableObjectWithFields trueDuplicate) {
++ assertThat(r.hashCode()).isEqualTo(trueDuplicate.hashCode());
++ }
++
++ @DataProvider
++ public Object[][] compareToData() {
++ return new Object[][]{ { new RankableObjectWithFields("foo", 1000), new RankableObjectWithFields("foo", 0),
++ GREATER_THAN }, { new RankableObjectWithFields("foo", 1), new RankableObjectWithFields("foo", 0),
++ GREATER_THAN }, { new RankableObjectWithFields("foo", 1000), new RankableObjectWithFields("bar", 0),
++ GREATER_THAN }, { new RankableObjectWithFields("foo", 1), new RankableObjectWithFields("bar", 0),
++ GREATER_THAN }, { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("foo", 0), EQUAL_TO },
++ { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("bar", 0), EQUAL_TO },
++ { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("foo", 1000), SMALLER_THAN },
++ { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("foo", 1), SMALLER_THAN },
++ { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("bar", 1), SMALLER_THAN },
++ { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("bar", 1000), SMALLER_THAN }, };
++ }
++
++ @Test(dataProvider = "compareToData")
++ public void verifyCompareTo(RankableObjectWithFields first, RankableObjectWithFields second, int expCompareToValue) {
++ assertThat(first.compareTo(second)).isEqualTo(expCompareToValue);
++ }
++
++ @DataProvider
++ public Object[][] toStringData() {
++ return new Object[][]{ { new String("foo"), 0L }, { new String("BAR"), 8L } };
++ }
++
++ @Test(dataProvider = "toStringData")
++ public void toStringShouldContainStringRepresentationsOfObjectAndCount(Object obj, long count) {
++ // given
++ RankableObjectWithFields r = new RankableObjectWithFields(obj, count);
++
++ // when
++ String strRepresentation = r.toString();
++
++ // then
++ assertThat(strRepresentation).contains(obj.toString()).contains("" + count);
++ }
++
++ @Test
++ public void shouldReturnTheObject() {
++ // given
++ RankableObjectWithFields r = new RankableObjectWithFields(ANY_OBJECT, ANY_COUNT, ANY_FIELD);
++
++ // when
++ Object obj = r.getObject();
++
++ // then
++ assertThat(obj).isEqualTo(ANY_OBJECT);
++ }
++
++ @Test
++ public void shouldReturnTheCount() {
++ // given
++ RankableObjectWithFields r = new RankableObjectWithFields(ANY_OBJECT, ANY_COUNT, ANY_FIELD);
++
++ // when
++ long count = r.getCount();
++
++ // then
++ assertThat(count).isEqualTo(ANY_COUNT);
++ }
++
++ @DataProvider
++ public Object[][] fieldsData() {
++ return new Object[][]{ { ANY_OBJECT, ANY_COUNT, new Object[]{ ANY_FIELD } },
++ { "quux", 42L, new Object[]{ "one", "two", "three" } } };
++ }
++
++ @Test(dataProvider = "fieldsData")
++ public void shouldReturnTheFields(Object obj, long count, Object[] fields) {
++ // given
++ RankableObjectWithFields r = new RankableObjectWithFields(obj, count, fields);
++
++ // when
++ List<Object> actualFields = r.getFields();
++
++ // then
++ assertThat(actualFields).isEqualTo(Lists.newArrayList(fields));
++ }
++
++ @Test(expectedExceptions = UnsupportedOperationException.class)
++ public void fieldsShouldBeImmutable() {
++ // given
++ RankableObjectWithFields r = new RankableObjectWithFields(ANY_OBJECT, ANY_COUNT, ANY_FIELD);
++
++ // when
++ List<Object> fields = r.getFields();
++ // try to modify the list, which should fail
++ fields.remove(0);
++
++ // then (exception)
++ }
++
++ @Test
++ public void shouldCreateRankableObjectFromTuple() {
++ // given
++ Tuple tuple = mock(Tuple.class);
++ List<Object> tupleValues = Lists.newArrayList(ANY_OBJECT, ANY_COUNT, ANY_FIELD);
++ when(tuple.getValues()).thenReturn(tupleValues);
++
++ // when
++ RankableObjectWithFields r = RankableObjectWithFields.from(tuple);
++
++ // then
++ assertThat(r.getObject()).isEqualTo(ANY_OBJECT);
++ assertThat(r.getCount()).isEqualTo(ANY_COUNT);
++ List<Object> fields = new ArrayList<Object>();
++ fields.add(ANY_FIELD);
++ assertThat(r.getFields()).isEqualTo(fields);
++
++ }
++
++ @DataProvider
++ public Object[][] copyData() {
++ return new Object[][]{ { new RankableObjectWithFields("foo", 0) }, { new RankableObjectWithFields("foo", 3,
++ "someOtherField") }, { new RankableObjectWithFields("foo", 0, "someField") } };
++ }
++
++ // TODO: What would be a good test to ensure that RankableObjectWithFields is at least somewhat defensively copied?
++ // The contract of Rankable#copy() returns a Rankable value, not a RankableObjectWithFields.
++ @Test(dataProvider = "copyData")
++ public void copyShouldReturnCopy(RankableObjectWithFields original) {
++ // given
++
++ // when
++ Rankable copy = original.copy();
++
++ // then
++ assertThat(copy.getObject()).isEqualTo(original.getObject());
++ assertThat(copy.getCount()).isEqualTo(original.getCount());
++ }
++
++}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/test/jvm/storm/starter/tools/RankingsTest.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/test/jvm/storm/starter/tools/RankingsTest.java
index 0000000,0000000..e1f8290
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/test/jvm/storm/starter/tools/RankingsTest.java
@@@ -1,0 -1,0 +1,351 @@@
++package storm.starter.tools;
++
++import com.google.common.base.Throwables;
++import com.google.common.collect.ImmutableList;
++import com.google.common.collect.Lists;
++import org.jmock.lib.concurrent.Blitzer;
++import org.testng.annotations.DataProvider;
++import org.testng.annotations.Test;
++
++import java.util.List;
++
++import static org.fest.assertions.api.Assertions.assertThat;
++
++/**
++ * Tests for {@link Rankings}: constructor argument checks, copying, size limit, ordering, merging with other
++ * rankings, handling of duplicates, pruning of zero counts and concurrent updates.
++ */
++public class RankingsTest {
++
++  private static final int ANY_TOPN = 42;
++  private static final Rankable ANY_RANKABLE = new RankableObjectWithFields("someObject", ANY_TOPN);
++  private static final Rankable ZERO = new RankableObjectWithFields("ZERO_COUNT", 0);
++  // A..H carry strictly increasing counts 1..8, so the expected ranking order is H, G, ..., A.
++  private static final Rankable A = new RankableObjectWithFields("A", 1);
++  private static final Rankable B = new RankableObjectWithFields("B", 2);
++  private static final Rankable C = new RankableObjectWithFields("C", 3);
++  private static final Rankable D = new RankableObjectWithFields("D", 4);
++  private static final Rankable E = new RankableObjectWithFields("E", 5);
++  private static final Rankable F = new RankableObjectWithFields("F", 6);
++  private static final Rankable G = new RankableObjectWithFields("G", 7);
++  private static final Rankable H = new RankableObjectWithFields("H", 8);
++
++  @DataProvider
++  public Object[][] illegalTopNData() {
++    return new Object[][]{ { 0 }, { -1 }, { -2 }, { -10 } };
++  }
++
++  @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalTopNData")
++  public void constructorWithNegativeOrZeroTopNShouldThrowIAE(int topN) {
++    new Rankings(topN);
++  }
++
++  /** Rows of (topN, rankables used to fill the rankings before they are copied). */
++  @DataProvider
++  public Object[][] copyRankingsData() {
++    return new Object[][]{
++        { 5, Lists.newArrayList(A, B, C) },
++        { 2, Lists.newArrayList(A, B, C, D) },
++        { 1, Lists.newArrayList() },
++        { 1, Lists.newArrayList(A) },
++        { 1, Lists.newArrayList(A, B) } };
++  }
++
++  @Test(dataProvider = "copyRankingsData")
++  public void copyConstructorShouldReturnCopy(int topN, List<Rankable> rankables) {
++    // given
++    Rankings rankings = new Rankings(topN);
++    for (Rankable r : rankables) {
++      rankings.updateWith(r);
++    }
++
++    // when
++    Rankings copy = new Rankings(rankings);
++
++    // then
++    assertThat(copy.maxSize()).isEqualTo(rankings.maxSize());
++    assertThat(copy.getRankings()).isEqualTo(rankings.getRankings());
++  }
++
++  /** Rows of (topN, rankables used to fill the original, rankables applied to the copy afterwards). */
++  @DataProvider
++  public Object[][] defensiveCopyRankingsData() {
++    return new Object[][]{
++        { 5, Lists.newArrayList(A, B, C), Lists.newArrayList(D) },
++        { 2, Lists.newArrayList(A, B, C, D), Lists.newArrayList(E, F) },
++        { 1, Lists.newArrayList(), Lists.newArrayList(A) },
++        { 1, Lists.newArrayList(A), Lists.newArrayList(B) },
++        { 1, Lists.newArrayList(ZERO), Lists.newArrayList(B) },
++        { 1, Lists.newArrayList(ZERO), Lists.newArrayList() } };
++  }
++
++  @Test(dataProvider = "defensiveCopyRankingsData")
++  public void copyConstructorShouldReturnDefensiveCopy(int topN, List<Rankable> rankables, List<Rankable> changes) {
++    // given
++    Rankings original = new Rankings(topN);
++    for (Rankable r : rankables) {
++      original.updateWith(r);
++    }
++    int expSize = original.size();
++    List<Rankable> expRankings = original.getRankings();
++
++    // when
++    Rankings copy = new Rankings(original);
++    for (Rankable r : changes) {
++      copy.updateWith(r);
++    }
++    // Same as in copyShouldReturnDefensiveCopy(): without pruning, the data rows that only contain ZERO and
++    // have no changes would not modify the copy at all.
++    copy.pruneZeroCounts();
++
++    // then
++    assertThat(original.size()).isEqualTo(expSize);
++    assertThat(original.getRankings()).isEqualTo(expRankings);
++  }
++
++  @DataProvider
++  public Object[][] legalTopNData() {
++    return new Object[][]{ { 1 }, { 2 }, { 1000 }, { 1000000 } };
++  }
++
++  @Test(dataProvider = "legalTopNData")
++  public void constructorWithPositiveTopNShouldBeOk(int topN) {
++    // given/when
++    Rankings rankings = new Rankings(topN);
++
++    // then
++    assertThat(rankings.maxSize()).isEqualTo(topN);
++  }
++
++  @Test
++  public void shouldHaveDefaultConstructor() {
++    new Rankings();
++  }
++
++  @Test
++  public void defaultConstructorShouldSetPositiveTopN() {
++    // given/when
++    Rankings rankings = new Rankings();
++
++    // then
++    assertThat(rankings.maxSize()).isGreaterThan(0);
++  }
++
++  /** Rows of (topN, rankables); every row must supply more rankables than topN. */
++  @DataProvider
++  public Object[][] rankingsGrowData() {
++    return new Object[][]{
++        { 2, Lists.newArrayList(new RankableObjectWithFields("A", 1), new RankableObjectWithFields("B", 2),
++            new RankableObjectWithFields("C", 3)) },
++        { 2, Lists.newArrayList(new RankableObjectWithFields("A", 1), new RankableObjectWithFields("B", 2),
++            new RankableObjectWithFields("C", 3), new RankableObjectWithFields("D", 4)) } };
++  }
++
++  @Test(dataProvider = "rankingsGrowData")
++  public void sizeOfRankingsShouldNotGrowBeyondTopN(int topN, List<Rankable> rankables) {
++    // sanity check of the provided test data
++    assertThat(rankables.size()).overridingErrorMessage(
++        "The supplied test data is not correct: the number of rankables <%d> should be greater than <%d>",
++        rankables.size(), topN).isGreaterThan(topN);
++
++    // given
++    Rankings rankings = new Rankings(topN);
++
++    // when
++    for (Rankable r : rankables) {
++      rankings.updateWith(r);
++    }
++
++    // then
++    assertThat(rankings.size()).isLessThanOrEqualTo(rankings.maxSize());
++  }
++
++  /** Rows of (rankables in arbitrary order, the same rankables in expected ranking order). */
++  @DataProvider
++  public Object[][] simulatedRankingsData() {
++    return new Object[][]{
++        { Lists.newArrayList(A), Lists.newArrayList(A) },
++        { Lists.newArrayList(B, D, A, C), Lists.newArrayList(D, C, B, A) },
++        { Lists.newArrayList(B, F, A, C, D, E), Lists.newArrayList(F, E, D, C, B, A) },
++        { Lists.newArrayList(G, B, F, A, C, D, E, H), Lists.newArrayList(H, G, F, E, D, C, B, A) } };
++  }
++
++  @Test(dataProvider = "simulatedRankingsData")
++  public void shouldCorrectlyRankWhenUpdatedWithRankables(List<Rankable> unsorted, List<Rankable> expSorted) {
++    // given
++    Rankings rankings = new Rankings(unsorted.size());
++
++    // when
++    for (Rankable r : unsorted) {
++      rankings.updateWith(r);
++    }
++
++    // then
++    assertThat(rankings.getRankings()).isEqualTo(expSorted);
++  }
++
++  @Test(dataProvider = "simulatedRankingsData")
++  public void shouldCorrectlyRankWhenEmptyAndUpdatedWithOtherRankings(List<Rankable> unsorted,
++      List<Rankable> expSorted) {
++    // given
++    Rankings rankings = new Rankings(unsorted.size());
++    Rankings otherRankings = new Rankings(rankings.maxSize());
++    for (Rankable r : unsorted) {
++      otherRankings.updateWith(r);
++    }
++
++    // when
++    rankings.updateWith(otherRankings);
++
++    // then
++    assertThat(rankings.getRankings()).isEqualTo(expSorted);
++  }
++
++  @Test(dataProvider = "simulatedRankingsData")
++  public void shouldCorrectlyRankWhenUpdatedWithEmptyOtherRankings(List<Rankable> unsorted, List<Rankable> expSorted) {
++    // given
++    Rankings rankings = new Rankings(unsorted.size());
++    for (Rankable r : unsorted) {
++      rankings.updateWith(r);
++    }
++    Rankings emptyRankings = new Rankings(ANY_TOPN);
++
++    // when
++    rankings.updateWith(emptyRankings);
++
++    // then
++    assertThat(rankings.getRankings()).isEqualTo(expSorted);
++  }
++
++  /** Rows of (rankables for the first rankings, rankables for the other rankings, expected merged order). */
++  @DataProvider
++  public Object[][] simulatedRankingsAndOtherRankingsData() {
++    return new Object[][]{
++        { Lists.newArrayList(A), Lists.newArrayList(A), Lists.newArrayList(A) },
++        { Lists.newArrayList(A, C), Lists.newArrayList(B, D), Lists.newArrayList(D, C, B, A) },
++        { Lists.newArrayList(B, F, A), Lists.newArrayList(C, D, E), Lists.newArrayList(F, E, D, C, B, A) },
++        { Lists.newArrayList(G, B, F, A, C), Lists.newArrayList(D, E, H),
++            Lists.newArrayList(H, G, F, E, D, C, B, A) } };
++  }
++
++  @Test(dataProvider = "simulatedRankingsAndOtherRankingsData")
++  public void shouldCorrectlyRankWhenNotEmptyAndUpdatedWithOtherRankings(List<Rankable> unsorted,
++      List<Rankable> unsortedForOtherRankings, List<Rankable> expSorted) {
++    // given
++    Rankings rankings = new Rankings(expSorted.size());
++    for (Rankable r : unsorted) {
++      rankings.updateWith(r);
++    }
++    Rankings otherRankings = new Rankings(unsortedForOtherRankings.size());
++    for (Rankable r : unsortedForOtherRankings) {
++      otherRankings.updateWith(r);
++    }
++
++    // when
++    rankings.updateWith(otherRankings);
++
++    // then
++    assertThat(rankings.getRankings()).isEqualTo(expSorted);
++  }
++
++  /** Lists in which every element wraps the same object (same instance, or same object with different counts). */
++  @DataProvider
++  public Object[][] duplicatesData() {
++    Rankable A1 = new RankableObjectWithFields("A", 1);
++    Rankable A2 = new RankableObjectWithFields("A", 2);
++    Rankable A3 = new RankableObjectWithFields("A", 3);
++    return new Object[][]{
++        { Lists.newArrayList(ANY_RANKABLE, ANY_RANKABLE, ANY_RANKABLE) },
++        { Lists.newArrayList(A1, A2, A3) }, };
++  }
++
++  @Test(dataProvider = "duplicatesData")
++  public void shouldNotRankDuplicateObjectsMoreThanOnce(List<Rankable> duplicates) {
++    // given
++    Rankings rankings = new Rankings(duplicates.size());
++
++    // when
++    for (Rankable r : duplicates) {
++      rankings.updateWith(r);
++    }
++
++    // then
++    assertThat(rankings.size()).isEqualTo(1);
++  }
++
++  /** Rows of (rankables in arbitrary order, expected rankings after zero counts have been pruned). */
++  @DataProvider
++  public Object[][] removeZeroRankingsData() {
++    return new Object[][]{
++        { Lists.newArrayList(A, ZERO), Lists.newArrayList(A) },
++        { Lists.newArrayList(A), Lists.newArrayList(A) },
++        { Lists.newArrayList(ZERO, A), Lists.newArrayList(A) },
++        { Lists.newArrayList(ZERO), Lists.newArrayList() },
++        { Lists.newArrayList(ZERO, new RankableObjectWithFields("ZERO2", 0)), Lists.newArrayList() },
++        { Lists.newArrayList(B, ZERO, new RankableObjectWithFields("ZERO2", 0), D,
++            new RankableObjectWithFields("ZERO3", 0), new RankableObjectWithFields("ZERO4", 0), C),
++            Lists.newArrayList(D, C, B) },
++        { Lists.newArrayList(A, ZERO, B), Lists.newArrayList(B, A) } };
++  }
++
++  @Test(dataProvider = "removeZeroRankingsData")
++  public void shouldRemoveZeroCounts(List<Rankable> unsorted, List<Rankable> expSorted) {
++    // given
++    Rankings rankings = new Rankings(unsorted.size());
++    for (Rankable r : unsorted) {
++      rankings.updateWith(r);
++    }
++
++    // when
++    rankings.pruneZeroCounts();
++
++    // then
++    assertThat(rankings.getRankings()).isEqualTo(expSorted);
++  }
++
++  @Test
++  public void updatingWithNewRankablesShouldBeThreadSafe() throws InterruptedException {
++    // given
++    final List<Rankable> entries = ImmutableList.of(A, B, C, D);
++    final Rankings rankings = new Rankings(entries.size());
++
++    // We are capturing exceptions thrown in Blitzer's child threads into this data structure so that we can properly
++    // pass/fail this test. The reason is that Blitzer doesn't report exceptions, which is a known bug in Blitzer
++    // (JMOCK-263). See https://github.com/jmock-developers/jmock-library/issues/22 for more information.
++    final List<Exception> exceptions = Lists.newArrayList();
++    Blitzer blitzer = new Blitzer(1000);
++
++    // when
++    try {
++      blitzer.blitz(new Runnable() {
++        @Override
++        public void run() {
++          for (Rankable r : entries) {
++            try {
++              rankings.updateWith(r);
++            }
++            catch (RuntimeException e) {
++              // the list itself is not thread-safe, hence the explicit lock
++              synchronized(exceptions) {
++                exceptions.add(e);
++              }
++            }
++          }
++        }
++      });
++    }
++    finally {
++      // shut the blitzer down even if blitz() throws (it declares InterruptedException)
++      blitzer.shutdown();
++    }
++
++    // then
++    // print every captured stack trace first so that a failure of the assertion below can be diagnosed
++    for (Exception e : exceptions) {
++      System.err.println(Throwables.getStackTraceAsString(e));
++    }
++    assertThat(exceptions).isEmpty();
++  }
++
++  @Test(dataProvider = "copyRankingsData")
++  public void copyShouldReturnCopy(int topN, List<Rankable> rankables) {
++    // given
++    Rankings rankings = new Rankings(topN);
++    for (Rankable r : rankables) {
++      rankings.updateWith(r);
++    }
++
++    // when
++    Rankings copy = rankings.copy();
++
++    // then
++    assertThat(copy.maxSize()).isEqualTo(rankings.maxSize());
++    assertThat(copy.getRankings()).isEqualTo(rankings.getRankings());
++  }
++
++  @Test(dataProvider = "defensiveCopyRankingsData")
++  public void copyShouldReturnDefensiveCopy(int topN, List<Rankable> rankables, List<Rankable> changes) {
++    // given
++    Rankings original = new Rankings(topN);
++    for (Rankable r : rankables) {
++      original.updateWith(r);
++    }
++    int expSize = original.size();
++    List<Rankable> expRankings = original.getRankings();
++
++    // when
++    Rankings copy = original.copy();
++    for (Rankable r : changes) {
++      copy.updateWith(r);
++    }
++    copy.pruneZeroCounts();
++
++    // then
++    assertThat(original.size()).isEqualTo(expSize);
++    assertThat(original.getRankings()).isEqualTo(expRankings);
++  }
++
++}