You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/03/20 22:22:45 UTC
[22/50] [abbrv] Reformat Java code to use 2 instead of 4 spaces (to
match Clojure style)
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/bolt/SingleJoinBolt.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/bolt/SingleJoinBolt.java b/src/jvm/storm/starter/bolt/SingleJoinBolt.java
index c353b7a..e12f005 100644
--- a/src/jvm/storm/starter/bolt/SingleJoinBolt.java
+++ b/src/jvm/storm/starter/bolt/SingleJoinBolt.java
@@ -9,90 +9,89 @@ import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.TimeCacheMap;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+
+import java.util.*;
public class SingleJoinBolt extends BaseRichBolt {
- OutputCollector _collector;
- Fields _idFields;
- Fields _outFields;
- int _numSources;
- TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>> _pending;
- Map<String, GlobalStreamId> _fieldLocations;
-
- public SingleJoinBolt(Fields outFields) {
- _outFields = outFields;
- }
-
- @Override
- public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
- _fieldLocations = new HashMap<String, GlobalStreamId>();
- _collector = collector;
- int timeout = ((Number) conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
- _pending = new TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>>(timeout, new ExpireCallback());
- _numSources = context.getThisSources().size();
- Set<String> idFields = null;
- for(GlobalStreamId source: context.getThisSources().keySet()) {
- Fields fields = context.getComponentOutputFields(source.get_componentId(), source.get_streamId());
- Set<String> setFields = new HashSet<String>(fields.toList());
- if(idFields==null) idFields = setFields;
- else idFields.retainAll(setFields);
-
- for(String outfield: _outFields) {
- for(String sourcefield: fields) {
- if(outfield.equals(sourcefield)) {
- _fieldLocations.put(outfield, source);
- }
- }
- }
- }
- _idFields = new Fields(new ArrayList<String>(idFields));
-
- if(_fieldLocations.size()!=_outFields.size()) {
- throw new RuntimeException("Cannot find all outfields among sources");
+ OutputCollector _collector;
+ Fields _idFields;
+ Fields _outFields;
+ int _numSources;
+ TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>> _pending;
+ Map<String, GlobalStreamId> _fieldLocations;
+
+ public SingleJoinBolt(Fields outFields) {
+ _outFields = outFields;
+ }
+
+ @Override
+ public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
+ _fieldLocations = new HashMap<String, GlobalStreamId>();
+ _collector = collector;
+ int timeout = ((Number) conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
+ _pending = new TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>>(timeout, new ExpireCallback());
+ _numSources = context.getThisSources().size();
+ Set<String> idFields = null;
+ for (GlobalStreamId source : context.getThisSources().keySet()) {
+ Fields fields = context.getComponentOutputFields(source.get_componentId(), source.get_streamId());
+ Set<String> setFields = new HashSet<String>(fields.toList());
+ if (idFields == null)
+ idFields = setFields;
+ else
+ idFields.retainAll(setFields);
+
+ for (String outfield : _outFields) {
+ for (String sourcefield : fields) {
+ if (outfield.equals(sourcefield)) {
+ _fieldLocations.put(outfield, source);
+ }
}
+ }
}
+ _idFields = new Fields(new ArrayList<String>(idFields));
- @Override
- public void execute(Tuple tuple) {
- List<Object> id = tuple.select(_idFields);
- GlobalStreamId streamId = new GlobalStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId());
- if(!_pending.containsKey(id)) {
- _pending.put(id, new HashMap<GlobalStreamId, Tuple>());
- }
- Map<GlobalStreamId, Tuple> parts = _pending.get(id);
- if(parts.containsKey(streamId)) throw new RuntimeException("Received same side of single join twice");
- parts.put(streamId, tuple);
- if(parts.size()==_numSources) {
- _pending.remove(id);
- List<Object> joinResult = new ArrayList<Object>();
- for(String outField: _outFields) {
- GlobalStreamId loc = _fieldLocations.get(outField);
- joinResult.add(parts.get(loc).getValueByField(outField));
- }
- _collector.emit(new ArrayList<Tuple>(parts.values()), joinResult);
-
- for(Tuple part: parts.values()) {
- _collector.ack(part);
- }
- }
+ if (_fieldLocations.size() != _outFields.size()) {
+ throw new RuntimeException("Cannot find all outfields among sources");
+ }
+ }
+
+ @Override
+ public void execute(Tuple tuple) {
+ List<Object> id = tuple.select(_idFields);
+ GlobalStreamId streamId = new GlobalStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId());
+ if (!_pending.containsKey(id)) {
+ _pending.put(id, new HashMap<GlobalStreamId, Tuple>());
}
+ Map<GlobalStreamId, Tuple> parts = _pending.get(id);
+ if (parts.containsKey(streamId))
+ throw new RuntimeException("Received same side of single join twice");
+ parts.put(streamId, tuple);
+ if (parts.size() == _numSources) {
+ _pending.remove(id);
+ List<Object> joinResult = new ArrayList<Object>();
+ for (String outField : _outFields) {
+ GlobalStreamId loc = _fieldLocations.get(outField);
+ joinResult.add(parts.get(loc).getValueByField(outField));
+ }
+ _collector.emit(new ArrayList<Tuple>(parts.values()), joinResult);
+
+ for (Tuple part : parts.values()) {
+ _collector.ack(part);
+ }
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(_outFields);
+ }
+ private class ExpireCallback implements TimeCacheMap.ExpiredCallback<List<Object>, Map<GlobalStreamId, Tuple>> {
@Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(_outFields);
- }
-
- private class ExpireCallback implements TimeCacheMap.ExpiredCallback<List<Object>, Map<GlobalStreamId, Tuple>> {
- @Override
- public void expire(List<Object> id, Map<GlobalStreamId, Tuple> tuples) {
- for(Tuple tuple: tuples.values()) {
- _collector.fail(tuple);
- }
- }
+ public void expire(List<Object> id, Map<GlobalStreamId, Tuple> tuples) {
+ for (Tuple tuple : tuples.values()) {
+ _collector.fail(tuple);
+ }
}
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/bolt/TotalRankingsBolt.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/bolt/TotalRankingsBolt.java b/src/jvm/storm/starter/bolt/TotalRankingsBolt.java
index 84ff69e..846ff49 100644
--- a/src/jvm/storm/starter/bolt/TotalRankingsBolt.java
+++ b/src/jvm/storm/starter/bolt/TotalRankingsBolt.java
@@ -1,43 +1,41 @@
package storm.starter.bolt;
+import backtype.storm.tuple.Tuple;
import org.apache.log4j.Logger;
-
import storm.starter.tools.Rankings;
-import backtype.storm.tuple.Tuple;
/**
* This bolt merges incoming {@link Rankings}.
- *
+ * <p/>
* It can be used to merge intermediate rankings generated by {@link IntermediateRankingsBolt} into a final,
* consolidated ranking. To do so, configure this bolt with a globalGrouping on {@link IntermediateRankingsBolt}.
- *
*/
public final class TotalRankingsBolt extends AbstractRankerBolt {
- private static final long serialVersionUID = -8447525895532302198L;
- private static final Logger LOG = Logger.getLogger(TotalRankingsBolt.class);
+ private static final long serialVersionUID = -8447525895532302198L;
+ private static final Logger LOG = Logger.getLogger(TotalRankingsBolt.class);
- public TotalRankingsBolt() {
- super();
- }
+ public TotalRankingsBolt() {
+ super();
+ }
- public TotalRankingsBolt(int topN) {
- super(topN);
- }
+ public TotalRankingsBolt(int topN) {
+ super(topN);
+ }
- public TotalRankingsBolt(int topN, int emitFrequencyInSeconds) {
- super(topN, emitFrequencyInSeconds);
- }
+ public TotalRankingsBolt(int topN, int emitFrequencyInSeconds) {
+ super(topN, emitFrequencyInSeconds);
+ }
- @Override
- void updateRankingsWithTuple(Tuple tuple) {
- Rankings rankingsToBeMerged = (Rankings) tuple.getValue(0);
- super.getRankings().updateWith(rankingsToBeMerged);
- }
+ @Override
+ void updateRankingsWithTuple(Tuple tuple) {
+ Rankings rankingsToBeMerged = (Rankings) tuple.getValue(0);
+ super.getRankings().updateWith(rankingsToBeMerged);
+ }
- @Override
- Logger getLogger() {
- return LOG;
- }
+ @Override
+ Logger getLogger() {
+ return LOG;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/spout/RandomSentenceSpout.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/spout/RandomSentenceSpout.java b/src/jvm/storm/starter/spout/RandomSentenceSpout.java
index af6c151..e808f83 100644
--- a/src/jvm/storm/starter/spout/RandomSentenceSpout.java
+++ b/src/jvm/storm/starter/spout/RandomSentenceSpout.java
@@ -7,44 +7,41 @@ import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
+
import java.util.Map;
import java.util.Random;
public class RandomSentenceSpout extends BaseRichSpout {
- SpoutOutputCollector _collector;
- Random _rand;
-
-
- @Override
- public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
- _collector = collector;
- _rand = new Random();
- }
-
- @Override
- public void nextTuple() {
- Utils.sleep(100);
- String[] sentences = new String[] {
- "the cow jumped over the moon",
- "an apple a day keeps the doctor away",
- "four score and seven years ago",
- "snow white and the seven dwarfs",
- "i am at two with nature"};
- String sentence = sentences[_rand.nextInt(sentences.length)];
- _collector.emit(new Values(sentence));
- }
-
- @Override
- public void ack(Object id) {
- }
-
- @Override
- public void fail(Object id) {
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("word"));
- }
-
+ SpoutOutputCollector _collector;
+ Random _rand;
+
+
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ _collector = collector;
+ _rand = new Random();
+ }
+
+ @Override
+ public void nextTuple() {
+ Utils.sleep(100);
+ String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
+ "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
+ String sentence = sentences[_rand.nextInt(sentences.length)];
+ _collector.emit(new Values(sentence));
+ }
+
+ @Override
+ public void ack(Object id) {
+ }
+
+ @Override
+ public void fail(Object id) {
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("word"));
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/tools/NthLastModifiedTimeTracker.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/tools/NthLastModifiedTimeTracker.java b/src/jvm/storm/starter/tools/NthLastModifiedTimeTracker.java
index 4b85cd2..92998a6 100644
--- a/src/jvm/storm/starter/tools/NthLastModifiedTimeTracker.java
+++ b/src/jvm/storm/starter/tools/NthLastModifiedTimeTracker.java
@@ -1,55 +1,53 @@
package storm.starter.tools;
-import org.apache.commons.collections.buffer.CircularFifoBuffer;
-
import backtype.storm.utils.Time;
+import org.apache.commons.collections.buffer.CircularFifoBuffer;
/**
* This class tracks the time-since-last-modify of a "thing" in a rolling fashion.
- *
+ * <p/>
* For example, create a 5-slot tracker to track the five most recent time-since-last-modify.
- *
+ * <p/>
* You must manually "mark" that the "something" that you want to track -- in terms of modification times -- has just
* been modified.
- *
*/
public class NthLastModifiedTimeTracker {
- private static final int MILLIS_IN_SEC = 1000;
+ private static final int MILLIS_IN_SEC = 1000;
- private final CircularFifoBuffer lastModifiedTimesMillis;
+ private final CircularFifoBuffer lastModifiedTimesMillis;
- public NthLastModifiedTimeTracker(int numTimesToTrack) {
- if (numTimesToTrack < 1) {
- throw new IllegalArgumentException("numTimesToTrack must be greater than zero (you requested "
- + numTimesToTrack + ")");
- }
- lastModifiedTimesMillis = new CircularFifoBuffer(numTimesToTrack);
- initLastModifiedTimesMillis();
+ public NthLastModifiedTimeTracker(int numTimesToTrack) {
+ if (numTimesToTrack < 1) {
+ throw new IllegalArgumentException(
+ "numTimesToTrack must be greater than zero (you requested " + numTimesToTrack + ")");
}
-
- private void initLastModifiedTimesMillis() {
- long nowCached = now();
- for (int i = 0; i < lastModifiedTimesMillis.maxSize(); i++) {
- lastModifiedTimesMillis.add(Long.valueOf(nowCached));
- }
+ lastModifiedTimesMillis = new CircularFifoBuffer(numTimesToTrack);
+ initLastModifiedTimesMillis();
+ }
+
+ private void initLastModifiedTimesMillis() {
+ long nowCached = now();
+ for (int i = 0; i < lastModifiedTimesMillis.maxSize(); i++) {
+ lastModifiedTimesMillis.add(Long.valueOf(nowCached));
}
+ }
- private long now() {
- return Time.currentTimeMillis();
- }
+ private long now() {
+ return Time.currentTimeMillis();
+ }
- public int secondsSinceOldestModification() {
- long modifiedTimeMillis = ((Long) lastModifiedTimesMillis.get()).longValue();
- return (int) ((now() - modifiedTimeMillis) / MILLIS_IN_SEC);
- }
+ public int secondsSinceOldestModification() {
+ long modifiedTimeMillis = ((Long) lastModifiedTimesMillis.get()).longValue();
+ return (int) ((now() - modifiedTimeMillis) / MILLIS_IN_SEC);
+ }
- public void markAsModified() {
- updateLastModifiedTime();
- }
+ public void markAsModified() {
+ updateLastModifiedTime();
+ }
- private void updateLastModifiedTime() {
- lastModifiedTimesMillis.add(now());
- }
+ private void updateLastModifiedTime() {
+ lastModifiedTimesMillis.add(now());
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/tools/Rankable.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/tools/Rankable.java b/src/jvm/storm/starter/tools/Rankable.java
index 03f742f..36ba086 100644
--- a/src/jvm/storm/starter/tools/Rankable.java
+++ b/src/jvm/storm/starter/tools/Rankable.java
@@ -2,8 +2,8 @@ package storm.starter.tools;
public interface Rankable extends Comparable<Rankable> {
- Object getObject();
+ Object getObject();
- long getCount();
+ long getCount();
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/tools/RankableObjectWithFields.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/tools/RankableObjectWithFields.java b/src/jvm/storm/starter/tools/RankableObjectWithFields.java
index 38db6e9..1fdad37 100644
--- a/src/jvm/storm/starter/tools/RankableObjectWithFields.java
+++ b/src/jvm/storm/starter/tools/RankableObjectWithFields.java
@@ -1,119 +1,118 @@
package storm.starter.tools;
-import java.io.Serializable;
-import java.util.List;
-
import backtype.storm.tuple.Tuple;
-
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
+import java.io.Serializable;
+import java.util.List;
+
/**
* This class wraps an objects and its associated count, including any additional data fields.
- *
+ * <p/>
* This class can be used, for instance, to track the number of occurrences of an object in a Storm topology.
- *
*/
public class RankableObjectWithFields implements Rankable, Serializable {
- private static final long serialVersionUID = -9102878650001058090L;
- private static final String toStringSeparator = "|";
-
- private final Object obj;
- private final long count;
- private final ImmutableList<Object> fields;
+ private static final long serialVersionUID = -9102878650001058090L;
+ private static final String toStringSeparator = "|";
- public RankableObjectWithFields(Object obj, long count, Object... otherFields) {
- if (obj == null) {
- throw new IllegalArgumentException("The object must not be null");
- }
- if (count < 0) {
- throw new IllegalArgumentException("The count must be >= 0");
- }
- this.obj = obj;
- this.count = count;
- fields = ImmutableList.copyOf(otherFields);
+ private final Object obj;
+ private final long count;
+ private final ImmutableList<Object> fields;
+ public RankableObjectWithFields(Object obj, long count, Object... otherFields) {
+ if (obj == null) {
+ throw new IllegalArgumentException("The object must not be null");
}
-
- /**
- * Construct a new instance based on the provided {@link Tuple}.
- *
- * This method expects the object to be ranked in the first field (index 0) of the provided tuple, and the number of
- * occurrences of the object (its count) in the second field (index 1). Any further fields in the tuple will be
- * extracted and tracked, too. These fields can be accessed via {@link RankableObjectWithFields#getFields()}.
- *
- * @param tuple
- * @return
- */
- public static RankableObjectWithFields from(Tuple tuple) {
- List<Object> otherFields = Lists.newArrayList(tuple.getValues());
- Object obj = otherFields.remove(0);
- Long count = (Long) otherFields.remove(0);
- return new RankableObjectWithFields(obj, count, otherFields.toArray());
- }
-
- public Object getObject() {
- return obj;
+ if (count < 0) {
+ throw new IllegalArgumentException("The count must be >= 0");
}
-
- public long getCount() {
- return count;
+ this.obj = obj;
+ this.count = count;
+ fields = ImmutableList.copyOf(otherFields);
+
+ }
+
+ /**
+ * Construct a new instance based on the provided {@link Tuple}.
+ * <p/>
+ * This method expects the object to be ranked in the first field (index 0) of the provided tuple, and the number of
+ * occurrences of the object (its count) in the second field (index 1). Any further fields in the tuple will be
+ * extracted and tracked, too. These fields can be accessed via {@link RankableObjectWithFields#getFields()}.
+ *
+ * @param tuple
+ *
+ * @return
+ */
+ public static RankableObjectWithFields from(Tuple tuple) {
+ List<Object> otherFields = Lists.newArrayList(tuple.getValues());
+ Object obj = otherFields.remove(0);
+ Long count = (Long) otherFields.remove(0);
+ return new RankableObjectWithFields(obj, count, otherFields.toArray());
+ }
+
+ public Object getObject() {
+ return obj;
+ }
+
+ public long getCount() {
+ return count;
+ }
+
+ /**
+ * @return an immutable list of any additional data fields of the object (may be empty but will never be null)
+ */
+ public List<Object> getFields() {
+ return fields;
+ }
+
+ @Override
+ public int compareTo(Rankable other) {
+ long delta = this.getCount() - other.getCount();
+ if (delta > 0) {
+ return 1;
}
-
- /**
- * @return an immutable list of any additional data fields of the object (may be empty but will never be null)
- */
- public List<Object> getFields() {
- return fields;
+ else if (delta < 0) {
+ return -1;
}
-
- @Override
- public int compareTo(Rankable other) {
- long delta = this.getCount() - other.getCount();
- if (delta > 0) {
- return 1;
- }
- else if (delta < 0) {
- return -1;
- }
- else {
- return 0;
- }
+ else {
+ return 0;
}
+ }
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof RankableObjectWithFields)) {
- return false;
- }
- RankableObjectWithFields other = (RankableObjectWithFields) o;
- return obj.equals(other.obj) && count == other.count;
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
}
-
- @Override
- public int hashCode() {
- int result = 17;
- int countHash = (int) (count ^ (count >>> 32));
- result = 31 * result + countHash;
- result = 31 * result + obj.hashCode();
- return result;
+ if (!(o instanceof RankableObjectWithFields)) {
+ return false;
}
-
- public String toString() {
- StringBuffer buf = new StringBuffer();
- buf.append("[");
- buf.append(obj);
- buf.append(toStringSeparator);
- buf.append(count);
- for (Object field : fields) {
- buf.append(toStringSeparator);
- buf.append(field);
- }
- buf.append("]");
- return buf.toString();
+ RankableObjectWithFields other = (RankableObjectWithFields) o;
+ return obj.equals(other.obj) && count == other.count;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = 17;
+ int countHash = (int) (count ^ (count >>> 32));
+ result = 31 * result + countHash;
+ result = 31 * result + obj.hashCode();
+ return result;
+ }
+
+ public String toString() {
+ StringBuffer buf = new StringBuffer();
+ buf.append("[");
+ buf.append(obj);
+ buf.append(toStringSeparator);
+ buf.append(count);
+ for (Object field : fields) {
+ buf.append(toStringSeparator);
+ buf.append(field);
}
+ buf.append("]");
+ return buf.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/tools/Rankings.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/tools/Rankings.java b/src/jvm/storm/starter/tools/Rankings.java
index d6fd34c..a3f421a 100644
--- a/src/jvm/storm/starter/tools/Rankings.java
+++ b/src/jvm/storm/starter/tools/Rankings.java
@@ -1,95 +1,95 @@
package storm.starter.tools;
+import com.google.common.collect.Lists;
+
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
-import com.google.common.collect.Lists;
-
public class Rankings implements Serializable {
- private static final long serialVersionUID = -1549827195410578903L;
- private static final int DEFAULT_COUNT = 10;
-
- private final int maxSize;
- private final List<Rankable> rankedItems = Lists.newArrayList();
-
- public Rankings() {
- this(DEFAULT_COUNT);
- }
+ private static final long serialVersionUID = -1549827195410578903L;
+ private static final int DEFAULT_COUNT = 10;
- public Rankings(int topN) {
- if (topN < 1) {
- throw new IllegalArgumentException("topN must be >= 1");
- }
- maxSize = topN;
- }
+ private final int maxSize;
+ private final List<Rankable> rankedItems = Lists.newArrayList();
- /**
- * @return the maximum possible number (size) of ranked objects this instance can hold
- */
- public int maxSize() {
- return maxSize;
- }
+ public Rankings() {
+ this(DEFAULT_COUNT);
+ }
- /**
- * @return the number (size) of ranked objects this instance is currently holding
- */
- public int size() {
- return rankedItems.size();
+ public Rankings(int topN) {
+ if (topN < 1) {
+ throw new IllegalArgumentException("topN must be >= 1");
}
-
- public List<Rankable> getRankings() {
- return Lists.newArrayList(rankedItems);
+ maxSize = topN;
+ }
+
+ /**
+ * @return the maximum possible number (size) of ranked objects this instance can hold
+ */
+ public int maxSize() {
+ return maxSize;
+ }
+
+ /**
+ * @return the number (size) of ranked objects this instance is currently holding
+ */
+ public int size() {
+ return rankedItems.size();
+ }
+
+ public List<Rankable> getRankings() {
+ return Lists.newArrayList(rankedItems);
+ }
+
+ public void updateWith(Rankings other) {
+ for (Rankable r : other.getRankings()) {
+ updateWith(r);
}
+ }
- public void updateWith(Rankings other) {
- for (Rankable r : other.getRankings()) {
- updateWith(r);
- }
+ public void updateWith(Rankable r) {
+ synchronized(rankedItems) {
+ addOrReplace(r);
+ rerank();
+ shrinkRankingsIfNeeded();
}
+ }
- public void updateWith(Rankable r) {
- synchronized(rankedItems) {
- addOrReplace(r);
- rerank();
- shrinkRankingsIfNeeded();
- }
+ private void addOrReplace(Rankable r) {
+ Integer rank = findRankOf(r);
+ if (rank != null) {
+ rankedItems.set(rank, r);
}
-
- private void addOrReplace(Rankable r) {
- Integer rank = findRankOf(r);
- if (rank != null) {
- rankedItems.set(rank, r);
- }
- else {
- rankedItems.add(r);
- }
+ else {
+ rankedItems.add(r);
}
-
- private Integer findRankOf(Rankable r) {
- Object tag = r.getObject();
- for (int rank = 0; rank < rankedItems.size(); rank++) {
- Object cur = rankedItems.get(rank).getObject();
- if (cur.equals(tag)) {
- return rank;
- }
- }
- return null;
+ }
+
+ private Integer findRankOf(Rankable r) {
+ Object tag = r.getObject();
+ for (int rank = 0; rank < rankedItems.size(); rank++) {
+ Object cur = rankedItems.get(rank).getObject();
+ if (cur.equals(tag)) {
+ return rank;
+ }
}
+ return null;
+ }
- private void rerank() {
- Collections.sort(rankedItems);
- Collections.reverse(rankedItems);
- }
+ private void rerank() {
+ Collections.sort(rankedItems);
+ Collections.reverse(rankedItems);
+ }
- private void shrinkRankingsIfNeeded() {
- if (rankedItems.size() > maxSize) {
- rankedItems.remove(maxSize);
- }
+ private void shrinkRankingsIfNeeded() {
+ if (rankedItems.size() > maxSize) {
+ rankedItems.remove(maxSize);
}
+ }
- public String toString() {
- return rankedItems.toString();
- }
+ public String toString() {
+ return rankedItems.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/tools/SlidingWindowCounter.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/tools/SlidingWindowCounter.java b/src/jvm/storm/starter/tools/SlidingWindowCounter.java
index b20af00..dff6a59 100644
--- a/src/jvm/storm/starter/tools/SlidingWindowCounter.java
+++ b/src/jvm/storm/starter/tools/SlidingWindowCounter.java
@@ -5,99 +5,98 @@ import java.util.Map;
/**
* This class counts objects in a sliding window fashion.
- *
+ * <p/>
* It is designed 1) to give multiple "producer" threads write access to the counter, i.e. being able to increment
* counts of objects, and 2) to give a single "consumer" thread (e.g. {@link PeriodicSlidingWindowCounter}) read access
* to the counter. Whenever the consumer thread performs a read operation, this class will advance the head slot of the
* sliding window counter. This means that the consumer thread indirectly controls where writes of the producer threads
* will go to. Also, by itself this class will not advance the head slot.
- *
+ * <p/>
* A note for analyzing data based on a sliding window count: During the initial <code>windowLengthInSlots</code>
* iterations, this sliding window counter will always return object counts that are equal or greater than in the
* previous iteration. This is the effect of the counter "loading up" at the very start of its existence. Conceptually,
* this is the desired behavior.
- *
+ * <p/>
* To give an example, using a counter with 5 slots which for the sake of this example represent 1 minute of time each:
- *
+ * <p/>
* <pre>
* {@code
* Sliding window counts of an object X over time
- *
+ *
* Minute (timeline):
* 1 2 3 4 5 6 7 8
- *
+ *
* Observed counts per minute:
* 1 1 1 1 0 0 0 0
- *
+ *
* Counts returned by counter:
* 1 2 3 4 4 3 2 1
* }
* </pre>
- *
+ * <p/>
* As you can see in this example, for the first <code>windowLengthInSlots</code> (here: the first five minutes) the
* counter will always return counts equal or greater than in the previous iteration (1, 2, 3, 4, 4). This initial load
* effect needs to be accounted for whenever you want to perform analyses such as trending topics; otherwise your
* analysis algorithm might falsely identify the object to be trending as the counter seems to observe continuously
* increasing counts. Also, note that during the initial load phase <em>every object</em> will exhibit increasing
* counts.
- *
+ * <p/>
* On a high-level, the counter exhibits the following behavior: If you asked the example counter after two minutes,
- * "how often did you count the object during the past five minutes?", then it should reply
- * "I have counted it 2 times in the past five minutes", implying that it can only account for the last two of those
- * five minutes because the counter was not running before that time.
- *
- * @param <T>
- * The type of those objects we want to count.
+ * "how often did you count the object during the past five minutes?", then it should reply "I have counted it 2 times
+ * in the past five minutes", implying that it can only account for the last two of those five minutes because the
+ * counter was not running before that time.
+ *
+ * @param <T> The type of those objects we want to count.
*/
public final class SlidingWindowCounter<T> implements Serializable {
- private static final long serialVersionUID = -2645063988768785810L;
+ private static final long serialVersionUID = -2645063988768785810L;
- private SlotBasedCounter<T> objCounter;
- private int headSlot;
- private int tailSlot;
- private int windowLengthInSlots;
+ private SlotBasedCounter<T> objCounter;
+ private int headSlot;
+ private int tailSlot;
+ private int windowLengthInSlots;
- public SlidingWindowCounter(int windowLengthInSlots) {
- if (windowLengthInSlots < 2) {
- throw new IllegalArgumentException("Window length in slots must be at least two (you requested "
- + windowLengthInSlots + ")");
- }
- this.windowLengthInSlots = windowLengthInSlots;
- this.objCounter = new SlotBasedCounter<T>(this.windowLengthInSlots);
-
- this.headSlot = 0;
- this.tailSlot = slotAfter(headSlot);
+ public SlidingWindowCounter(int windowLengthInSlots) {
+ if (windowLengthInSlots < 2) {
+ throw new IllegalArgumentException(
+ "Window length in slots must be at least two (you requested " + windowLengthInSlots + ")");
}
+ this.windowLengthInSlots = windowLengthInSlots;
+ this.objCounter = new SlotBasedCounter<T>(this.windowLengthInSlots);
- public void incrementCount(T obj) {
- objCounter.incrementCount(obj, headSlot);
- }
+ this.headSlot = 0;
+ this.tailSlot = slotAfter(headSlot);
+ }
- /**
- * Return the current (total) counts of all tracked objects, then advance the window.
- *
- * Whenever this method is called, we consider the counts of the current sliding window to be available to and
- * successfully processed "upstream" (i.e. by the caller). Knowing this we will start counting any subsequent
- * objects within the next "chunk" of the sliding window.
- *
- * @return
- */
- public Map<T, Long> getCountsThenAdvanceWindow() {
- Map<T, Long> counts = objCounter.getCounts();
- objCounter.wipeZeros();
- objCounter.wipeSlot(tailSlot);
- advanceHead();
- return counts;
- }
+ public void incrementCount(T obj) {
+ objCounter.incrementCount(obj, headSlot);
+ }
- private void advanceHead() {
- headSlot = tailSlot;
- tailSlot = slotAfter(tailSlot);
- }
+ /**
+ * Return the current (total) counts of all tracked objects, then advance the window.
+ * <p/>
+ * Whenever this method is called, we consider the counts of the current sliding window to be available to and
+ * successfully processed "upstream" (i.e. by the caller). Knowing this we will start counting any subsequent
+ * objects within the next "chunk" of the sliding window.
+ *
+ * @return
+ */
+ public Map<T, Long> getCountsThenAdvanceWindow() {
+ Map<T, Long> counts = objCounter.getCounts();
+ objCounter.wipeZeros();
+ objCounter.wipeSlot(tailSlot);
+ advanceHead();
+ return counts;
+ }
- private int slotAfter(int slot) {
- return (slot + 1) % windowLengthInSlots;
- }
+ private void advanceHead() {
+ headSlot = tailSlot;
+ tailSlot = slotAfter(tailSlot);
+ }
+
+ private int slotAfter(int slot) {
+ return (slot + 1) % windowLengthInSlots;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/tools/SlotBasedCounter.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/tools/SlotBasedCounter.java b/src/jvm/storm/starter/tools/SlotBasedCounter.java
index 5624a54..f45ef48 100644
--- a/src/jvm/storm/starter/tools/SlotBasedCounter.java
+++ b/src/jvm/storm/starter/tools/SlotBasedCounter.java
@@ -8,96 +8,94 @@ import java.util.Set;
/**
* This class provides per-slot counts of the occurrences of objects.
- *
+ * <p/>
* It can be used, for instance, as a building block for implementing sliding window counting of objects.
- *
- * @param <T>
- * The type of those objects we want to count.
+ *
+ * @param <T> The type of those objects we want to count.
*/
public final class SlotBasedCounter<T> implements Serializable {
- private static final long serialVersionUID = 4858185737378394432L;
+ private static final long serialVersionUID = 4858185737378394432L;
- private final Map<T, long[]> objToCounts = new HashMap<T, long[]>();
- private final int numSlots;
+ private final Map<T, long[]> objToCounts = new HashMap<T, long[]>();
+ private final int numSlots;
- public SlotBasedCounter(int numSlots) {
- if (numSlots <= 0) {
- throw new IllegalArgumentException("Number of slots must be greater than zero (you requested " + numSlots
- + ")");
- }
- this.numSlots = numSlots;
+ public SlotBasedCounter(int numSlots) {
+ if (numSlots <= 0) {
+ throw new IllegalArgumentException("Number of slots must be greater than zero (you requested " + numSlots + ")");
}
+ this.numSlots = numSlots;
+ }
- public void incrementCount(T obj, int slot) {
- long[] counts = objToCounts.get(obj);
- if (counts == null) {
- counts = new long[this.numSlots];
- objToCounts.put(obj, counts);
- }
- counts[slot]++;
+ public void incrementCount(T obj, int slot) {
+ long[] counts = objToCounts.get(obj);
+ if (counts == null) {
+ counts = new long[this.numSlots];
+ objToCounts.put(obj, counts);
}
+ counts[slot]++;
+ }
- public long getCount(T obj, int slot) {
- long[] counts = objToCounts.get(obj);
- if (counts == null) {
- return 0;
- }
- else {
- return counts[slot];
- }
+ public long getCount(T obj, int slot) {
+ long[] counts = objToCounts.get(obj);
+ if (counts == null) {
+ return 0;
}
-
- public Map<T, Long> getCounts() {
- Map<T, Long> result = new HashMap<T, Long>();
- for (T obj : objToCounts.keySet()) {
- result.put(obj, computeTotalCount(obj));
- }
- return result;
+ else {
+ return counts[slot];
}
+ }
- private long computeTotalCount(T obj) {
- long[] curr = objToCounts.get(obj);
- long total = 0;
- for (long l : curr) {
- total += l;
- }
- return total;
+ public Map<T, Long> getCounts() {
+ Map<T, Long> result = new HashMap<T, Long>();
+ for (T obj : objToCounts.keySet()) {
+ result.put(obj, computeTotalCount(obj));
}
+ return result;
+ }
- /**
- * Reset the slot count of any tracked objects to zero for the given slot.
- *
- * @param slot
- */
- public void wipeSlot(int slot) {
- for (T obj : objToCounts.keySet()) {
- resetSlotCountToZero(obj, slot);
- }
+ private long computeTotalCount(T obj) {
+ long[] curr = objToCounts.get(obj);
+ long total = 0;
+ for (long l : curr) {
+ total += l;
}
+ return total;
+ }
- private void resetSlotCountToZero(T obj, int slot) {
- long[] counts = objToCounts.get(obj);
- counts[slot] = 0;
+ /**
+ * Reset the slot count of any tracked objects to zero for the given slot.
+ *
+ * @param slot
+ */
+ public void wipeSlot(int slot) {
+ for (T obj : objToCounts.keySet()) {
+ resetSlotCountToZero(obj, slot);
}
+ }
- private boolean shouldBeRemovedFromCounter(T obj) {
- return computeTotalCount(obj) == 0;
- }
+ private void resetSlotCountToZero(T obj, int slot) {
+ long[] counts = objToCounts.get(obj);
+ counts[slot] = 0;
+ }
- /**
- * Remove any object from the counter whose total count is zero (to free up memory).
- */
- public void wipeZeros() {
- Set<T> objToBeRemoved = new HashSet<T>();
- for (T obj : objToCounts.keySet()) {
- if (shouldBeRemovedFromCounter(obj)) {
- objToBeRemoved.add(obj);
- }
- }
- for (T obj : objToBeRemoved) {
- objToCounts.remove(obj);
- }
+ private boolean shouldBeRemovedFromCounter(T obj) {
+ return computeTotalCount(obj) == 0;
+ }
+
+ /**
+ * Remove any object from the counter whose total count is zero (to free up memory).
+ */
+ public void wipeZeros() {
+ Set<T> objToBeRemoved = new HashSet<T>();
+ for (T obj : objToCounts.keySet()) {
+ if (shouldBeRemovedFromCounter(obj)) {
+ objToBeRemoved.add(obj);
+ }
+ }
+ for (T obj : objToBeRemoved) {
+ objToCounts.remove(obj);
}
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/trident/TridentReach.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/trident/TridentReach.java b/src/jvm/storm/starter/trident/TridentReach.java
index ec10148..a940b40 100644
--- a/src/jvm/storm/starter/trident/TridentReach.java
+++ b/src/jvm/storm/starter/trident/TridentReach.java
@@ -3,15 +3,10 @@ package storm.starter.trident;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
import backtype.storm.generated.StormTopology;
import backtype.storm.task.IMetricsContext;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.operation.BaseFunction;
@@ -25,126 +20,120 @@ import storm.trident.state.StateFactory;
import storm.trident.state.map.ReadOnlyMapState;
import storm.trident.tuple.TridentTuple;
+import java.util.*;
+
public class TridentReach {
- public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, List<String>>() {{
- put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan"));
- put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", "sally", "nathan"));
- put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john"));
- }};
-
- public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, List<String>>() {{
- put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", "jai"));
- put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", "vivian"));
- put("tim", Arrays.asList("alex"));
- put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", "vivian", "emily", "jordan"));
- put("adam", Arrays.asList("david", "carissa"));
- put("mike", Arrays.asList("john", "bob"));
- put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob"));
- }};
-
- public static class StaticSingleKeyMapState extends ReadOnlyState implements ReadOnlyMapState<Object> {
- public static class Factory implements StateFactory {
- Map _map;
-
- public Factory(Map map) {
- _map = map;
- }
-
- @Override
- public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
- return new StaticSingleKeyMapState(_map);
- }
+ public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, List<String>>() {{
+ put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan"));
+ put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", "sally", "nathan"));
+ put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john"));
+ }};
+
+ public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, List<String>>() {{
+ put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", "jai"));
+ put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", "vivian"));
+ put("tim", Arrays.asList("alex"));
+ put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", "vivian", "emily", "jordan"));
+ put("adam", Arrays.asList("david", "carissa"));
+ put("mike", Arrays.asList("john", "bob"));
+ put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob"));
+ }};
+
+ public static class StaticSingleKeyMapState extends ReadOnlyState implements ReadOnlyMapState<Object> {
+ public static class Factory implements StateFactory {
+ Map _map;
+
+ public Factory(Map map) {
+ _map = map;
+ }
+
+ @Override
+ public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+ return new StaticSingleKeyMapState(_map);
+ }
- }
+ }
- Map _map;
+ Map _map;
- public StaticSingleKeyMapState(Map map) {
- _map = map;
- }
+ public StaticSingleKeyMapState(Map map) {
+ _map = map;
+ }
- @Override
- public List<Object> multiGet(List<List<Object>> keys) {
- List<Object> ret = new ArrayList();
- for(List<Object> key: keys) {
- Object singleKey = key.get(0);
- ret.add(_map.get(singleKey));
- }
- return ret;
- }
+ @Override
+ public List<Object> multiGet(List<List<Object>> keys) {
+ List<Object> ret = new ArrayList();
+ for (List<Object> key : keys) {
+ Object singleKey = key.get(0);
+ ret.add(_map.get(singleKey));
+ }
+ return ret;
+ }
+ }
+
+ public static class One implements CombinerAggregator<Integer> {
+ @Override
+ public Integer init(TridentTuple tuple) {
+ return 1;
}
- public static class One implements CombinerAggregator<Integer> {
- @Override
- public Integer init(TridentTuple tuple) {
- return 1;
- }
+ @Override
+ public Integer combine(Integer val1, Integer val2) {
+ return 1;
+ }
- @Override
- public Integer combine(Integer val1, Integer val2) {
- return 1;
- }
+ @Override
+ public Integer zero() {
+ return 1;
+ }
+ }
- @Override
- public Integer zero() {
- return 1;
+ public static class ExpandList extends BaseFunction {
+
+ @Override
+ public void execute(TridentTuple tuple, TridentCollector collector) {
+ List l = (List) tuple.getValue(0);
+ if (l != null) {
+ for (Object o : l) {
+ collector.emit(new Values(o));
}
+ }
}
- public static class ExpandList extends BaseFunction {
+ }
- @Override
- public void execute(TridentTuple tuple, TridentCollector collector) {
- List l = (List) tuple.getValue(0);
- if(l!=null) {
- for(Object o: l) {
- collector.emit(new Values(o));
- }
- }
- }
+ public static StormTopology buildTopology(LocalDRPC drpc) {
+ TridentTopology topology = new TridentTopology();
+ TridentState urlToTweeters = topology.newStaticState(new StaticSingleKeyMapState.Factory(TWEETERS_DB));
+ TridentState tweetersToFollowers = topology.newStaticState(new StaticSingleKeyMapState.Factory(FOLLOWERS_DB));
- }
- public static StormTopology buildTopology(LocalDRPC drpc) {
- TridentTopology topology = new TridentTopology();
- TridentState urlToTweeters =
- topology.newStaticState(
- new StaticSingleKeyMapState.Factory(TWEETERS_DB));
- TridentState tweetersToFollowers =
- topology.newStaticState(
- new StaticSingleKeyMapState.Factory(FOLLOWERS_DB));
-
-
- topology.newDRPCStream("reach", drpc)
- .stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters"))
- .each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter"))
- .shuffle()
- .stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers"))
- .each(new Fields("followers"), new ExpandList(), new Fields("follower"))
- .groupBy(new Fields("follower"))
- .aggregate(new One(), new Fields("one"))
- .aggregate(new Fields("one"), new Sum(), new Fields("reach"));
- return topology.build();
- }
+ topology.newDRPCStream("reach", drpc).stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields(
+ "tweeters")).each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter")).shuffle().stateQuery(
+ tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers")).each(new Fields("followers"),
+ new ExpandList(), new Fields("follower")).groupBy(new Fields("follower")).aggregate(new One(), new Fields(
+ "one")).aggregate(new Fields("one"), new Sum(), new Fields("reach"));
+ return topology.build();
+ }
- public static void main(String[] args) throws Exception {
- LocalDRPC drpc = new LocalDRPC();
+ public static void main(String[] args) throws Exception {
+ LocalDRPC drpc = new LocalDRPC();
- Config conf = new Config();
- LocalCluster cluster = new LocalCluster();
+ Config conf = new Config();
+ LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("reach", conf, buildTopology(drpc));
+ cluster.submitTopology("reach", conf, buildTopology(drpc));
- Thread.sleep(2000);
+ Thread.sleep(2000);
- System.out.println("REACH: " + drpc.execute("reach", "aaa"));
- System.out.println("REACH: " + drpc.execute("reach", "foo.com/blog/1"));
- System.out.println("REACH: " + drpc.execute("reach", "engineering.twitter.com/blog/5"));
+ System.out.println("REACH: " + drpc.execute("reach", "aaa"));
+ System.out.println("REACH: " + drpc.execute("reach", "foo.com/blog/1"));
+ System.out.println("REACH: " + drpc.execute("reach", "engineering.twitter.com/blog/5"));
- cluster.shutdown();
- drpc.shutdown();
- }
+ cluster.shutdown();
+ drpc.shutdown();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/trident/TridentWordCount.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/trident/TridentWordCount.java b/src/jvm/storm/starter/trident/TridentWordCount.java
index 268d41c..c56280c 100644
--- a/src/jvm/storm/starter/trident/TridentWordCount.java
+++ b/src/jvm/storm/starter/trident/TridentWordCount.java
@@ -1,6 +1,5 @@
package storm.starter.trident;
-import storm.trident.testing.FixedBatchSpout;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
@@ -16,65 +15,54 @@ import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Sum;
-import storm.trident.planner.processor.StateQueryProcessor;
+import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.MemoryMapState;
import storm.trident.tuple.TridentTuple;
-public class TridentWordCount {
- public static class Split extends BaseFunction {
- @Override
- public void execute(TridentTuple tuple, TridentCollector collector) {
- String sentence = tuple.getString(0);
- for(String word: sentence.split(" ")) {
- collector.emit(new Values(word));
- }
- }
+public class TridentWordCount {
+ public static class Split extends BaseFunction {
+ @Override
+ public void execute(TridentTuple tuple, TridentCollector collector) {
+ String sentence = tuple.getString(0);
+ for (String word : sentence.split(" ")) {
+ collector.emit(new Values(word));
+ }
}
-
- public static StormTopology buildTopology(LocalDRPC drpc) {
- FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
- new Values("the cow jumped over the moon"),
- new Values("the man went to the store and bought some candy"),
- new Values("four score and seven years ago"),
- new Values("how many apples can you eat"),
- new Values("to be or not to be the person"));
- spout.setCycle(true);
-
- TridentTopology topology = new TridentTopology();
- TridentState wordCounts =
- topology.newStream("spout1", spout)
- .parallelismHint(16)
- .each(new Fields("sentence"), new Split(), new Fields("word"))
- .groupBy(new Fields("word"))
- .persistentAggregate(new MemoryMapState.Factory(),
- new Count(), new Fields("count"))
- .parallelismHint(16);
-
- topology.newDRPCStream("words", drpc)
- .each(new Fields("args"), new Split(), new Fields("word"))
- .groupBy(new Fields("word"))
- .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
- .each(new Fields("count"), new FilterNull())
- .aggregate(new Fields("count"), new Sum(), new Fields("sum"))
- ;
- return topology.build();
+ }
+
+ public static StormTopology buildTopology(LocalDRPC drpc) {
+ FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
+ new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
+ new Values("how many apples can you eat"), new Values("to be or not to be the person"));
+ spout.setCycle(true);
+
+ TridentTopology topology = new TridentTopology();
+ TridentState wordCounts = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),
+ new Split(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(),
+ new Count(), new Fields("count")).parallelismHint(16);
+
+ topology.newDRPCStream("words", drpc).each(new Fields("args"), new Split(), new Fields("word")).groupBy(new Fields(
+ "word")).stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")).each(new Fields("count"),
+ new FilterNull()).aggregate(new Fields("count"), new Sum(), new Fields("sum"));
+ return topology.build();
+ }
+
+ public static void main(String[] args) throws Exception {
+ Config conf = new Config();
+ conf.setMaxSpoutPending(20);
+ if (args.length == 0) {
+ LocalDRPC drpc = new LocalDRPC();
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("wordCounter", conf, buildTopology(drpc));
+ for (int i = 0; i < 100; i++) {
+ System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped"));
+ Thread.sleep(1000);
+ }
}
-
- public static void main(String[] args) throws Exception {
- Config conf = new Config();
- conf.setMaxSpoutPending(20);
- if(args.length==0) {
- LocalDRPC drpc = new LocalDRPC();
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("wordCounter", conf, buildTopology(drpc));
- for(int i=0; i<100; i++) {
- System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped"));
- Thread.sleep(1000);
- }
- } else {
- conf.setNumWorkers(3);
- StormSubmitter.submitTopology(args[0], conf, buildTopology(null));
- }
+ else {
+ conf.setNumWorkers(3);
+ StormSubmitter.submitTopology(args[0], conf, buildTopology(null));
}
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/util/StormRunner.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/util/StormRunner.java b/src/jvm/storm/starter/util/StormRunner.java
index 09b4062..30c4f50 100644
--- a/src/jvm/storm/starter/util/StormRunner.java
+++ b/src/jvm/storm/starter/util/StormRunner.java
@@ -6,17 +6,17 @@ import backtype.storm.generated.StormTopology;
public final class StormRunner {
- private static final int MILLIS_IN_SEC = 1000;
+ private static final int MILLIS_IN_SEC = 1000;
- private StormRunner() {
- }
+ private StormRunner() {
+ }
- public static void runTopologyLocally(StormTopology topology, String topologyName, Config conf, int runtimeInSeconds)
- throws InterruptedException {
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology(topologyName, conf, topology);
- Thread.sleep((long) runtimeInSeconds * MILLIS_IN_SEC);
- cluster.killTopology(topologyName);
- cluster.shutdown();
- }
+ public static void runTopologyLocally(StormTopology topology, String topologyName, Config conf, int runtimeInSeconds)
+ throws InterruptedException {
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology(topologyName, conf, topology);
+ Thread.sleep((long) runtimeInSeconds * MILLIS_IN_SEC);
+ cluster.killTopology(topologyName);
+ cluster.shutdown();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/util/TupleHelpers.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/util/TupleHelpers.java b/src/jvm/storm/starter/util/TupleHelpers.java
index 58693d7..a46480e 100644
--- a/src/jvm/storm/starter/util/TupleHelpers.java
+++ b/src/jvm/storm/starter/util/TupleHelpers.java
@@ -5,12 +5,12 @@ import backtype.storm.tuple.Tuple;
public final class TupleHelpers {
- private TupleHelpers() {
- }
+ private TupleHelpers() {
+ }
- public static boolean isTickTuple(Tuple tuple) {
- return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
- && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
- }
+ public static boolean isTickTuple(Tuple tuple) {
+ return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && tuple.getSourceStreamId().equals(
+ Constants.SYSTEM_TICK_STREAM_ID);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/test/jvm/storm/starter/bolt/IntermediateRankingsBoltTest.java
----------------------------------------------------------------------
diff --git a/test/jvm/storm/starter/bolt/IntermediateRankingsBoltTest.java b/test/jvm/storm/starter/bolt/IntermediateRankingsBoltTest.java
index c51d081..cbbee3c 100644
--- a/test/jvm/storm/starter/bolt/IntermediateRankingsBoltTest.java
+++ b/test/jvm/storm/starter/bolt/IntermediateRankingsBoltTest.java
@@ -1,135 +1,129 @@
package storm.starter.bolt;
-import static org.fest.assertions.api.Assertions.assertThat;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
-import static org.mockito.Mockito.when;
-
-import java.util.Map;
-
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-import storm.starter.tools.MockTupleHelpers;
import backtype.storm.Config;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
-
import com.google.common.collect.Lists;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+import storm.starter.tools.MockTupleHelpers;
+
+import java.util.Map;
+
+import static org.fest.assertions.api.Assertions.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
public class IntermediateRankingsBoltTest {
- private static final String ANY_NON_SYSTEM_COMPONENT_ID = "irrelevant_component_id";
- private static final String ANY_NON_SYSTEM_STREAM_ID = "irrelevant_stream_id";
- private static final Object ANY_OBJECT = new Object();
- private static final int ANY_TOPN = 10;
- private static final long ANY_COUNT = 42;
-
- private Tuple mockRankableTuple(Object obj, long count) {
- Tuple tuple = MockTupleHelpers.mockTuple(ANY_NON_SYSTEM_COMPONENT_ID, ANY_NON_SYSTEM_STREAM_ID);
- when(tuple.getValues()).thenReturn(Lists.newArrayList(ANY_OBJECT, ANY_COUNT));
- return tuple;
- }
-
- @DataProvider
- public Object[][] illegalTopN() {
- return new Object[][] { { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalTopN")
- public void negativeOrZeroTopNShouldThrowIAE(int topN) {
- new IntermediateRankingsBolt(topN);
- }
-
- @DataProvider
- public Object[][] illegalEmitFrequency() {
- return new Object[][] { { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalEmitFrequency")
- public void negativeOrZeroEmitFrequencyShouldThrowIAE(int emitFrequencyInSeconds) {
- new IntermediateRankingsBolt(ANY_TOPN, emitFrequencyInSeconds);
- }
-
- @DataProvider
- public Object[][] legalTopN() {
- return new Object[][] { { 1 }, { 2 }, { 3 }, { 20 } };
- }
-
- @Test(dataProvider = "legalTopN")
- public void positiveTopNShouldBeOk(int topN) {
- new IntermediateRankingsBolt(topN);
- }
-
- @DataProvider
- public Object[][] legalEmitFrequency() {
- return new Object[][] { { 1 }, { 2 }, { 3 }, { 20 } };
- }
-
- @Test(dataProvider = "legalEmitFrequency")
- public void positiveEmitFrequencyShouldBeOk(int emitFrequencyInSeconds) {
- new IntermediateRankingsBolt(ANY_TOPN, emitFrequencyInSeconds);
- }
-
- @Test
- public void shouldEmitSomethingIfTickTupleIsReceived() {
- // given
- Tuple tickTuple = MockTupleHelpers.mockTickTuple();
- BasicOutputCollector collector = mock(BasicOutputCollector.class);
- IntermediateRankingsBolt bolt = new IntermediateRankingsBolt();
-
- // when
- bolt.execute(tickTuple, collector);
-
- // then
- // verifyZeroInteractions(collector);
- verify(collector).emit(any(Values.class));
- }
-
- @Test
- public void shouldEmitNothingIfNormalTupleIsReceived() {
- // given
- Tuple normalTuple = mockRankableTuple(ANY_OBJECT, ANY_COUNT);
- BasicOutputCollector collector = mock(BasicOutputCollector.class);
- IntermediateRankingsBolt bolt = new IntermediateRankingsBolt();
-
- // when
- bolt.execute(normalTuple, collector);
-
- // then
- verifyZeroInteractions(collector);
- }
-
- @Test
- public void shouldDeclareOutputFields() {
- // given
- OutputFieldsDeclarer declarer = mock(OutputFieldsDeclarer.class);
- IntermediateRankingsBolt bolt = new IntermediateRankingsBolt();
-
- // when
- bolt.declareOutputFields(declarer);
-
- // then
- verify(declarer, times(1)).declare(any(Fields.class));
- }
-
- @Test
- public void shouldSetTickTupleFrequencyInComponentConfigurationToNonZeroValue() {
- // given
- IntermediateRankingsBolt bolt = new IntermediateRankingsBolt();
-
- // when
- Map<String, Object> componentConfig = bolt.getComponentConfiguration();
-
- // then
- assertThat(componentConfig).containsKey(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
- Integer emitFrequencyInSeconds = (Integer) componentConfig.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
- assertThat(emitFrequencyInSeconds).isGreaterThan(0);
- }
+ private static final String ANY_NON_SYSTEM_COMPONENT_ID = "irrelevant_component_id";
+ private static final String ANY_NON_SYSTEM_STREAM_ID = "irrelevant_stream_id";
+ private static final Object ANY_OBJECT = new Object();
+ private static final int ANY_TOPN = 10;
+ private static final long ANY_COUNT = 42;
+
+ private Tuple mockRankableTuple(Object obj, long count) {
+ Tuple tuple = MockTupleHelpers.mockTuple(ANY_NON_SYSTEM_COMPONENT_ID, ANY_NON_SYSTEM_STREAM_ID);
+ when(tuple.getValues()).thenReturn(Lists.newArrayList(ANY_OBJECT, ANY_COUNT));
+ return tuple;
+ }
+
+ @DataProvider
+ public Object[][] illegalTopN() {
+ return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalTopN")
+ public void negativeOrZeroTopNShouldThrowIAE(int topN) {
+ new IntermediateRankingsBolt(topN);
+ }
+
+ @DataProvider
+ public Object[][] illegalEmitFrequency() {
+ return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalEmitFrequency")
+ public void negativeOrZeroEmitFrequencyShouldThrowIAE(int emitFrequencyInSeconds) {
+ new IntermediateRankingsBolt(ANY_TOPN, emitFrequencyInSeconds);
+ }
+
+ @DataProvider
+ public Object[][] legalTopN() {
+ return new Object[][]{ { 1 }, { 2 }, { 3 }, { 20 } };
+ }
+
+ @Test(dataProvider = "legalTopN")
+ public void positiveTopNShouldBeOk(int topN) {
+ new IntermediateRankingsBolt(topN);
+ }
+
+ @DataProvider
+ public Object[][] legalEmitFrequency() {
+ return new Object[][]{ { 1 }, { 2 }, { 3 }, { 20 } };
+ }
+
+ @Test(dataProvider = "legalEmitFrequency")
+ public void positiveEmitFrequencyShouldBeOk(int emitFrequencyInSeconds) {
+ new IntermediateRankingsBolt(ANY_TOPN, emitFrequencyInSeconds);
+ }
+
+ @Test
+ public void shouldEmitSomethingIfTickTupleIsReceived() {
+ // given
+ Tuple tickTuple = MockTupleHelpers.mockTickTuple();
+ BasicOutputCollector collector = mock(BasicOutputCollector.class);
+ IntermediateRankingsBolt bolt = new IntermediateRankingsBolt();
+
+ // when
+ bolt.execute(tickTuple, collector);
+
+ // then
+ // verifyZeroInteractions(collector);
+ verify(collector).emit(any(Values.class));
+ }
+
+ @Test
+ public void shouldEmitNothingIfNormalTupleIsReceived() {
+ // given
+ Tuple normalTuple = mockRankableTuple(ANY_OBJECT, ANY_COUNT);
+ BasicOutputCollector collector = mock(BasicOutputCollector.class);
+ IntermediateRankingsBolt bolt = new IntermediateRankingsBolt();
+
+ // when
+ bolt.execute(normalTuple, collector);
+
+ // then
+ verifyZeroInteractions(collector);
+ }
+
+ @Test
+ public void shouldDeclareOutputFields() {
+ // given
+ OutputFieldsDeclarer declarer = mock(OutputFieldsDeclarer.class);
+ IntermediateRankingsBolt bolt = new IntermediateRankingsBolt();
+
+ // when
+ bolt.declareOutputFields(declarer);
+
+ // then
+ verify(declarer, times(1)).declare(any(Fields.class));
+ }
+
+ @Test
+ public void shouldSetTickTupleFrequencyInComponentConfigurationToNonZeroValue() {
+ // given
+ IntermediateRankingsBolt bolt = new IntermediateRankingsBolt();
+
+ // when
+ Map<String, Object> componentConfig = bolt.getComponentConfiguration();
+
+ // then
+ assertThat(componentConfig).containsKey(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
+ Integer emitFrequencyInSeconds = (Integer) componentConfig.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
+ assertThat(emitFrequencyInSeconds).isGreaterThan(0);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/test/jvm/storm/starter/bolt/RollingCountBoltTest.java
----------------------------------------------------------------------
diff --git a/test/jvm/storm/starter/bolt/RollingCountBoltTest.java b/test/jvm/storm/starter/bolt/RollingCountBoltTest.java
index ae19041..c9a516a 100644
--- a/test/jvm/storm/starter/bolt/RollingCountBoltTest.java
+++ b/test/jvm/storm/starter/bolt/RollingCountBoltTest.java
@@ -1,18 +1,5 @@
package storm.starter.bolt;
-import static org.fest.assertions.api.Assertions.assertThat;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
-import static org.mockito.Mockito.when;
-
-import java.util.Map;
-
-import org.testng.annotations.Test;
-
-import storm.starter.tools.MockTupleHelpers;
import backtype.storm.Config;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
@@ -20,82 +7,90 @@ import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
+import org.testng.annotations.Test;
+import storm.starter.tools.MockTupleHelpers;
+
+import java.util.Map;
+
+import static org.fest.assertions.api.Assertions.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
public class RollingCountBoltTest {
- private static final String ANY_NON_SYSTEM_COMPONENT_ID = "irrelevant_component_id";
- private static final String ANY_NON_SYSTEM_STREAM_ID = "irrelevant_stream_id";
-
- private Tuple mockNormalTuple(Object obj) {
- Tuple tuple = MockTupleHelpers.mockTuple(ANY_NON_SYSTEM_COMPONENT_ID, ANY_NON_SYSTEM_STREAM_ID);
- when(tuple.getValue(0)).thenReturn(obj);
- return tuple;
- }
-
- @SuppressWarnings("rawtypes")
- @Test
- public void shouldEmitNothingIfNoObjectHasBeenCountedYetAndTickTupleIsReceived() {
- // given
- Tuple tickTuple = MockTupleHelpers.mockTickTuple();
- RollingCountBolt bolt = new RollingCountBolt();
- Map conf = mock(Map.class);
- TopologyContext context = mock(TopologyContext.class);
- OutputCollector collector = mock(OutputCollector.class);
- bolt.prepare(conf, context, collector);
-
- // when
- bolt.execute(tickTuple);
-
- // then
- verifyZeroInteractions(collector);
- }
-
- @SuppressWarnings("rawtypes")
- @Test
- public void shouldEmitSomethingIfAtLeastOneObjectWasCountedAndTickTupleIsReceived() {
- // given
- Tuple normalTuple = mockNormalTuple(new Object());
- Tuple tickTuple = MockTupleHelpers.mockTickTuple();
-
- RollingCountBolt bolt = new RollingCountBolt();
- Map conf = mock(Map.class);
- TopologyContext context = mock(TopologyContext.class);
- OutputCollector collector = mock(OutputCollector.class);
- bolt.prepare(conf, context, collector);
-
- // when
- bolt.execute(normalTuple);
- bolt.execute(tickTuple);
-
- // then
- verify(collector).emit(any(Values.class));
- }
-
- @Test
- public void shouldDeclareOutputFields() {
- // given
- OutputFieldsDeclarer declarer = mock(OutputFieldsDeclarer.class);
- RollingCountBolt bolt = new RollingCountBolt();
-
- // when
- bolt.declareOutputFields(declarer);
-
- // then
- verify(declarer, times(1)).declare(any(Fields.class));
-
- }
-
- @Test
- public void shouldSetTickTupleFrequencyInComponentConfigurationToNonZeroValue() {
- // given
- RollingCountBolt bolt = new RollingCountBolt();
-
- // when
- Map<String, Object> componentConfig = bolt.getComponentConfiguration();
-
- // then
- assertThat(componentConfig).containsKey(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
- Integer emitFrequencyInSeconds = (Integer) componentConfig.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
- assertThat(emitFrequencyInSeconds).isGreaterThan(0);
- }
+ private static final String ANY_NON_SYSTEM_COMPONENT_ID = "irrelevant_component_id";
+ private static final String ANY_NON_SYSTEM_STREAM_ID = "irrelevant_stream_id";
+
+ private Tuple mockNormalTuple(Object obj) {
+ Tuple tuple = MockTupleHelpers.mockTuple(ANY_NON_SYSTEM_COMPONENT_ID, ANY_NON_SYSTEM_STREAM_ID);
+ when(tuple.getValue(0)).thenReturn(obj);
+ return tuple;
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void shouldEmitNothingIfNoObjectHasBeenCountedYetAndTickTupleIsReceived() {
+ // given
+ Tuple tickTuple = MockTupleHelpers.mockTickTuple();
+ RollingCountBolt bolt = new RollingCountBolt();
+ Map conf = mock(Map.class);
+ TopologyContext context = mock(TopologyContext.class);
+ OutputCollector collector = mock(OutputCollector.class);
+ bolt.prepare(conf, context, collector);
+
+ // when
+ bolt.execute(tickTuple);
+
+ // then
+ verifyZeroInteractions(collector);
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void shouldEmitSomethingIfAtLeastOneObjectWasCountedAndTickTupleIsReceived() {
+ // given
+ Tuple normalTuple = mockNormalTuple(new Object());
+ Tuple tickTuple = MockTupleHelpers.mockTickTuple();
+
+ RollingCountBolt bolt = new RollingCountBolt();
+ Map conf = mock(Map.class);
+ TopologyContext context = mock(TopologyContext.class);
+ OutputCollector collector = mock(OutputCollector.class);
+ bolt.prepare(conf, context, collector);
+
+ // when
+ bolt.execute(normalTuple);
+ bolt.execute(tickTuple);
+
+ // then
+ verify(collector).emit(any(Values.class));
+ }
+
+ @Test
+ public void shouldDeclareOutputFields() {
+ // given
+ OutputFieldsDeclarer declarer = mock(OutputFieldsDeclarer.class);
+ RollingCountBolt bolt = new RollingCountBolt();
+
+ // when
+ bolt.declareOutputFields(declarer);
+
+ // then
+ verify(declarer, times(1)).declare(any(Fields.class));
+
+ }
+
+ @Test
+ public void shouldSetTickTupleFrequencyInComponentConfigurationToNonZeroValue() {
+ // given
+ RollingCountBolt bolt = new RollingCountBolt();
+
+ // when
+ Map<String, Object> componentConfig = bolt.getComponentConfiguration();
+
+ // then
+ assertThat(componentConfig).containsKey(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
+ Integer emitFrequencyInSeconds = (Integer) componentConfig.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
+ assertThat(emitFrequencyInSeconds).isGreaterThan(0);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/test/jvm/storm/starter/bolt/TotalRankingsBoltTest.java
----------------------------------------------------------------------
diff --git a/test/jvm/storm/starter/bolt/TotalRankingsBoltTest.java b/test/jvm/storm/starter/bolt/TotalRankingsBoltTest.java
index 69f5319..a8bed45 100644
--- a/test/jvm/storm/starter/bolt/TotalRankingsBoltTest.java
+++ b/test/jvm/storm/starter/bolt/TotalRankingsBoltTest.java
@@ -1,135 +1,130 @@
package storm.starter.bolt;
-import static org.fest.assertions.api.Assertions.assertThat;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
-import static org.mockito.Mockito.when;
-
-import java.util.Map;
-
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-import storm.starter.tools.MockTupleHelpers;
-import storm.starter.tools.Rankings;
import backtype.storm.Config;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+import storm.starter.tools.MockTupleHelpers;
+import storm.starter.tools.Rankings;
+
+import java.util.Map;
+
+import static org.fest.assertions.api.Assertions.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
public class TotalRankingsBoltTest {
- private static final String ANY_NON_SYSTEM_COMPONENT_ID = "irrelevant_component_id";
- private static final String ANY_NON_SYSTEM_STREAM_ID = "irrelevant_stream_id";
- private static final Object ANY_OBJECT = new Object();
- private static final int ANY_TOPN = 10;
- private static final long ANY_COUNT = 42;
-
- private Tuple mockRankingsTuple(Object obj, long count) {
- Tuple tuple = MockTupleHelpers.mockTuple(ANY_NON_SYSTEM_COMPONENT_ID, ANY_NON_SYSTEM_STREAM_ID);
- Rankings rankings = mock(Rankings.class);
- when(tuple.getValue(0)).thenReturn(rankings);
- return tuple;
- }
-
- @DataProvider
- public Object[][] illegalTopN() {
- return new Object[][] { { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalTopN")
- public void negativeOrZeroTopNShouldThrowIAE(int topN) {
- new TotalRankingsBolt(topN);
- }
-
- @DataProvider
- public Object[][] illegalEmitFrequency() {
- return new Object[][] { { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalEmitFrequency")
- public void negativeOrZeroEmitFrequencyShouldThrowIAE(int emitFrequencyInSeconds) {
- new TotalRankingsBolt(ANY_TOPN, emitFrequencyInSeconds);
- }
-
- @DataProvider
- public Object[][] legalTopN() {
- return new Object[][] { { 1 }, { 2 }, { 3 }, { 20 } };
- }
-
- @Test(dataProvider = "legalTopN")
- public void positiveTopNShouldBeOk(int topN) {
- new TotalRankingsBolt(topN);
- }
-
- @DataProvider
- public Object[][] legalEmitFrequency() {
- return new Object[][] { { 1 }, { 2 }, { 3 }, { 20 } };
- }
-
- @Test(dataProvider = "legalEmitFrequency")
- public void positiveEmitFrequencyShouldBeOk(int emitFrequencyInSeconds) {
- new TotalRankingsBolt(ANY_TOPN, emitFrequencyInSeconds);
- }
-
- @Test
- public void shouldEmitSomethingIfTickTupleIsReceived() {
- // given
- Tuple tickTuple = MockTupleHelpers.mockTickTuple();
- BasicOutputCollector collector = mock(BasicOutputCollector.class);
- TotalRankingsBolt bolt = new TotalRankingsBolt();
-
- // when
- bolt.execute(tickTuple, collector);
-
- // then
- // verifyZeroInteractions(collector);
- verify(collector).emit(any(Values.class));
- }
-
- @Test
- public void shouldEmitNothingIfNormalTupleIsReceived() {
- // given
- Tuple normalTuple = mockRankingsTuple(ANY_OBJECT, ANY_COUNT);
- BasicOutputCollector collector = mock(BasicOutputCollector.class);
- TotalRankingsBolt bolt = new TotalRankingsBolt();
-
- // when
- bolt.execute(normalTuple, collector);
-
- // then
- verifyZeroInteractions(collector);
- }
-
- @Test
- public void shouldDeclareOutputFields() {
- // given
- OutputFieldsDeclarer declarer = mock(OutputFieldsDeclarer.class);
- TotalRankingsBolt bolt = new TotalRankingsBolt();
-
- // when
- bolt.declareOutputFields(declarer);
-
- // then
- verify(declarer, times(1)).declare(any(Fields.class));
- }
-
- @Test
- public void shouldSetTickTupleFrequencyInComponentConfigurationToNonZeroValue() {
- // given
- TotalRankingsBolt bolt = new TotalRankingsBolt();
-
- // when
- Map<String, Object> componentConfig = bolt.getComponentConfiguration();
-
- // then
- assertThat(componentConfig).containsKey(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
- Integer emitFrequencyInSeconds = (Integer) componentConfig.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
- assertThat(emitFrequencyInSeconds).isGreaterThan(0);
- }
+ private static final String ANY_NON_SYSTEM_COMPONENT_ID = "irrelevant_component_id";
+ private static final String ANY_NON_SYSTEM_STREAM_ID = "irrelevant_stream_id";
+ private static final Object ANY_OBJECT = new Object();
+ private static final int ANY_TOPN = 10;
+ private static final long ANY_COUNT = 42;
+
+ private Tuple mockRankingsTuple(Object obj, long count) {
+ Tuple tuple = MockTupleHelpers.mockTuple(ANY_NON_SYSTEM_COMPONENT_ID, ANY_NON_SYSTEM_STREAM_ID);
+ Rankings rankings = mock(Rankings.class);
+ when(tuple.getValue(0)).thenReturn(rankings);
+ return tuple;
+ }
+
+ @DataProvider
+ public Object[][] illegalTopN() {
+ return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalTopN")
+ public void negativeOrZeroTopNShouldThrowIAE(int topN) {
+ new TotalRankingsBolt(topN);
+ }
+
+ @DataProvider
+ public Object[][] illegalEmitFrequency() {
+ return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalEmitFrequency")
+ public void negativeOrZeroEmitFrequencyShouldThrowIAE(int emitFrequencyInSeconds) {
+ new TotalRankingsBolt(ANY_TOPN, emitFrequencyInSeconds);
+ }
+
+ @DataProvider
+ public Object[][] legalTopN() {
+ return new Object[][]{ { 1 }, { 2 }, { 3 }, { 20 } };
+ }
+
+ @Test(dataProvider = "legalTopN")
+ public void positiveTopNShouldBeOk(int topN) {
+ new TotalRankingsBolt(topN);
+ }
+
+ @DataProvider
+ public Object[][] legalEmitFrequency() {
+ return new Object[][]{ { 1 }, { 2 }, { 3 }, { 20 } };
+ }
+
+ @Test(dataProvider = "legalEmitFrequency")
+ public void positiveEmitFrequencyShouldBeOk(int emitFrequencyInSeconds) {
+ new TotalRankingsBolt(ANY_TOPN, emitFrequencyInSeconds);
+ }
+
+ @Test
+ public void shouldEmitSomethingIfTickTupleIsReceived() {
+ // given
+ Tuple tickTuple = MockTupleHelpers.mockTickTuple();
+ BasicOutputCollector collector = mock(BasicOutputCollector.class);
+ TotalRankingsBolt bolt = new TotalRankingsBolt();
+
+ // when
+ bolt.execute(tickTuple, collector);
+
+ // then
+ // verifyZeroInteractions(collector);
+ verify(collector).emit(any(Values.class));
+ }
+
+ @Test
+ public void shouldEmitNothingIfNormalTupleIsReceived() {
+ // given
+ Tuple normalTuple = mockRankingsTuple(ANY_OBJECT, ANY_COUNT);
+ BasicOutputCollector collector = mock(BasicOutputCollector.class);
+ TotalRankingsBolt bolt = new TotalRankingsBolt();
+
+ // when
+ bolt.execute(normalTuple, collector);
+
+ // then
+ verifyZeroInteractions(collector);
+ }
+
+ @Test
+ public void shouldDeclareOutputFields() {
+ // given
+ OutputFieldsDeclarer declarer = mock(OutputFieldsDeclarer.class);
+ TotalRankingsBolt bolt = new TotalRankingsBolt();
+
+ // when
+ bolt.declareOutputFields(declarer);
+
+ // then
+ verify(declarer, times(1)).declare(any(Fields.class));
+ }
+
+ @Test
+ public void shouldSetTickTupleFrequencyInComponentConfigurationToNonZeroValue() {
+ // given
+ TotalRankingsBolt bolt = new TotalRankingsBolt();
+
+ // when
+ Map<String, Object> componentConfig = bolt.getComponentConfiguration();
+
+ // then
+ assertThat(componentConfig).containsKey(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
+ Integer emitFrequencyInSeconds = (Integer) componentConfig.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
+ assertThat(emitFrequencyInSeconds).isGreaterThan(0);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/test/jvm/storm/starter/tools/MockTupleHelpers.java
----------------------------------------------------------------------
diff --git a/test/jvm/storm/starter/tools/MockTupleHelpers.java b/test/jvm/storm/starter/tools/MockTupleHelpers.java
index d0a9e41..fd7d921 100644
--- a/test/jvm/storm/starter/tools/MockTupleHelpers.java
+++ b/test/jvm/storm/starter/tools/MockTupleHelpers.java
@@ -1,23 +1,23 @@
package storm.starter.tools;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
import backtype.storm.Constants;
import backtype.storm.tuple.Tuple;
+import static org.mockito.Mockito.*;
+
public final class MockTupleHelpers {
- private MockTupleHelpers() {
- }
+ private MockTupleHelpers() {
+ }
- public static Tuple mockTickTuple() {
- return mockTuple(Constants.SYSTEM_COMPONENT_ID, Constants.SYSTEM_TICK_STREAM_ID);
- }
+ public static Tuple mockTickTuple() {
+ return mockTuple(Constants.SYSTEM_COMPONENT_ID, Constants.SYSTEM_TICK_STREAM_ID);
+ }
- public static Tuple mockTuple(String componentId, String streamId) {
- Tuple tuple = mock(Tuple.class);
- when(tuple.getSourceComponent()).thenReturn(componentId);
- when(tuple.getSourceStreamId()).thenReturn(streamId);
- return tuple;
- }
+ public static Tuple mockTuple(String componentId, String streamId) {
+ Tuple tuple = mock(Tuple.class);
+ when(tuple.getSourceComponent()).thenReturn(componentId);
+ when(tuple.getSourceStreamId()).thenReturn(streamId);
+ return tuple;
+ }
}