You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/03/20 22:22:45 UTC

[22/50] [abbrv] Reformat Java code to use 2 instead of 4 spaces (to match Clojure style)

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/bolt/SingleJoinBolt.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/bolt/SingleJoinBolt.java b/src/jvm/storm/starter/bolt/SingleJoinBolt.java
index c353b7a..e12f005 100644
--- a/src/jvm/storm/starter/bolt/SingleJoinBolt.java
+++ b/src/jvm/storm/starter/bolt/SingleJoinBolt.java
@@ -9,90 +9,89 @@ import backtype.storm.topology.base.BaseRichBolt;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.utils.TimeCacheMap;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+
+import java.util.*;
 
 public class SingleJoinBolt extends BaseRichBolt {
-    OutputCollector _collector;
-    Fields _idFields;
-    Fields _outFields;
-    int _numSources;
-    TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>> _pending;
-    Map<String, GlobalStreamId> _fieldLocations;
-    
-    public SingleJoinBolt(Fields outFields) {
-        _outFields = outFields;
-    }
-    
-    @Override
-    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
-        _fieldLocations = new HashMap<String, GlobalStreamId>();
-        _collector = collector;
-        int timeout = ((Number) conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
-        _pending = new TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>>(timeout, new ExpireCallback());
-        _numSources = context.getThisSources().size();
-        Set<String> idFields = null;
-        for(GlobalStreamId source: context.getThisSources().keySet()) {
-            Fields fields = context.getComponentOutputFields(source.get_componentId(), source.get_streamId());
-            Set<String> setFields = new HashSet<String>(fields.toList());
-            if(idFields==null) idFields = setFields;
-            else idFields.retainAll(setFields);
-            
-            for(String outfield: _outFields) {
-                for(String sourcefield: fields) {
-                    if(outfield.equals(sourcefield)) {
-                        _fieldLocations.put(outfield, source);
-                    }
-                }
-            }
-        }
-        _idFields = new Fields(new ArrayList<String>(idFields));
-        
-        if(_fieldLocations.size()!=_outFields.size()) {
-            throw new RuntimeException("Cannot find all outfields among sources");
+  OutputCollector _collector;
+  Fields _idFields;
+  Fields _outFields;
+  int _numSources;
+  TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>> _pending;
+  Map<String, GlobalStreamId> _fieldLocations;
+
+  public SingleJoinBolt(Fields outFields) {
+    _outFields = outFields;
+  }
+
+  @Override
+  public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
+    _fieldLocations = new HashMap<String, GlobalStreamId>();
+    _collector = collector;
+    int timeout = ((Number) conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
+    _pending = new TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>>(timeout, new ExpireCallback());
+    _numSources = context.getThisSources().size();
+    Set<String> idFields = null;
+    for (GlobalStreamId source : context.getThisSources().keySet()) {
+      Fields fields = context.getComponentOutputFields(source.get_componentId(), source.get_streamId());
+      Set<String> setFields = new HashSet<String>(fields.toList());
+      if (idFields == null)
+        idFields = setFields;
+      else
+        idFields.retainAll(setFields);
+
+      for (String outfield : _outFields) {
+        for (String sourcefield : fields) {
+          if (outfield.equals(sourcefield)) {
+            _fieldLocations.put(outfield, source);
+          }
         }
+      }
     }
+    _idFields = new Fields(new ArrayList<String>(idFields));
 
-    @Override
-    public void execute(Tuple tuple) {
-        List<Object> id = tuple.select(_idFields);
-        GlobalStreamId streamId = new GlobalStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId());
-        if(!_pending.containsKey(id)) {
-            _pending.put(id, new HashMap<GlobalStreamId, Tuple>());            
-        }
-        Map<GlobalStreamId, Tuple> parts = _pending.get(id);
-        if(parts.containsKey(streamId)) throw new RuntimeException("Received same side of single join twice");
-        parts.put(streamId, tuple);
-        if(parts.size()==_numSources) {
-            _pending.remove(id);
-            List<Object> joinResult = new ArrayList<Object>();
-            for(String outField: _outFields) {
-                GlobalStreamId loc = _fieldLocations.get(outField);
-                joinResult.add(parts.get(loc).getValueByField(outField));
-            }
-            _collector.emit(new ArrayList<Tuple>(parts.values()), joinResult);
-            
-            for(Tuple part: parts.values()) {
-                _collector.ack(part);
-            }
-        }
+    if (_fieldLocations.size() != _outFields.size()) {
+      throw new RuntimeException("Cannot find all outfields among sources");
+    }
+  }
+
+  @Override
+  public void execute(Tuple tuple) {
+    List<Object> id = tuple.select(_idFields);
+    GlobalStreamId streamId = new GlobalStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId());
+    if (!_pending.containsKey(id)) {
+      _pending.put(id, new HashMap<GlobalStreamId, Tuple>());
     }
+    Map<GlobalStreamId, Tuple> parts = _pending.get(id);
+    if (parts.containsKey(streamId))
+      throw new RuntimeException("Received same side of single join twice");
+    parts.put(streamId, tuple);
+    if (parts.size() == _numSources) {
+      _pending.remove(id);
+      List<Object> joinResult = new ArrayList<Object>();
+      for (String outField : _outFields) {
+        GlobalStreamId loc = _fieldLocations.get(outField);
+        joinResult.add(parts.get(loc).getValueByField(outField));
+      }
+      _collector.emit(new ArrayList<Tuple>(parts.values()), joinResult);
+
+      for (Tuple part : parts.values()) {
+        _collector.ack(part);
+      }
+    }
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declare(_outFields);
+  }
 
+  private class ExpireCallback implements TimeCacheMap.ExpiredCallback<List<Object>, Map<GlobalStreamId, Tuple>> {
     @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(_outFields);
-    }    
-    
-    private class ExpireCallback implements TimeCacheMap.ExpiredCallback<List<Object>, Map<GlobalStreamId, Tuple>> {
-        @Override
-        public void expire(List<Object> id, Map<GlobalStreamId, Tuple> tuples) {
-            for(Tuple tuple: tuples.values()) {
-                _collector.fail(tuple);
-            }
-        }        
+    public void expire(List<Object> id, Map<GlobalStreamId, Tuple> tuples) {
+      for (Tuple tuple : tuples.values()) {
+        _collector.fail(tuple);
+      }
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/bolt/TotalRankingsBolt.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/bolt/TotalRankingsBolt.java b/src/jvm/storm/starter/bolt/TotalRankingsBolt.java
index 84ff69e..846ff49 100644
--- a/src/jvm/storm/starter/bolt/TotalRankingsBolt.java
+++ b/src/jvm/storm/starter/bolt/TotalRankingsBolt.java
@@ -1,43 +1,41 @@
 package storm.starter.bolt;
 
+import backtype.storm.tuple.Tuple;
 import org.apache.log4j.Logger;
-
 import storm.starter.tools.Rankings;
-import backtype.storm.tuple.Tuple;
 
 /**
  * This bolt merges incoming {@link Rankings}.
- * 
+ * <p/>
  * It can be used to merge intermediate rankings generated by {@link IntermediateRankingsBolt} into a final,
  * consolidated ranking. To do so, configure this bolt with a globalGrouping on {@link IntermediateRankingsBolt}.
- * 
  */
 public final class TotalRankingsBolt extends AbstractRankerBolt {
 
-    private static final long serialVersionUID = -8447525895532302198L;
-    private static final Logger LOG = Logger.getLogger(TotalRankingsBolt.class);
+  private static final long serialVersionUID = -8447525895532302198L;
+  private static final Logger LOG = Logger.getLogger(TotalRankingsBolt.class);
 
-    public TotalRankingsBolt() {
-        super();
-    }
+  public TotalRankingsBolt() {
+    super();
+  }
 
-    public TotalRankingsBolt(int topN) {
-        super(topN);
-    }
+  public TotalRankingsBolt(int topN) {
+    super(topN);
+  }
 
-    public TotalRankingsBolt(int topN, int emitFrequencyInSeconds) {
-        super(topN, emitFrequencyInSeconds);
-    }
+  public TotalRankingsBolt(int topN, int emitFrequencyInSeconds) {
+    super(topN, emitFrequencyInSeconds);
+  }
 
-    @Override
-    void updateRankingsWithTuple(Tuple tuple) {
-        Rankings rankingsToBeMerged = (Rankings) tuple.getValue(0);
-        super.getRankings().updateWith(rankingsToBeMerged);
-    }
+  @Override
+  void updateRankingsWithTuple(Tuple tuple) {
+    Rankings rankingsToBeMerged = (Rankings) tuple.getValue(0);
+    super.getRankings().updateWith(rankingsToBeMerged);
+  }
 
-    @Override
-    Logger getLogger() {
-        return LOG;
-    }
+  @Override
+  Logger getLogger() {
+    return LOG;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/spout/RandomSentenceSpout.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/spout/RandomSentenceSpout.java b/src/jvm/storm/starter/spout/RandomSentenceSpout.java
index af6c151..e808f83 100644
--- a/src/jvm/storm/starter/spout/RandomSentenceSpout.java
+++ b/src/jvm/storm/starter/spout/RandomSentenceSpout.java
@@ -7,44 +7,41 @@ import backtype.storm.topology.base.BaseRichSpout;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Values;
 import backtype.storm.utils.Utils;
+
 import java.util.Map;
 import java.util.Random;
 
 public class RandomSentenceSpout extends BaseRichSpout {
-    SpoutOutputCollector _collector;
-    Random _rand;    
-    
-
-    @Override
-    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-        _collector = collector;
-        _rand = new Random();
-    }
-
-    @Override
-    public void nextTuple() {
-        Utils.sleep(100);
-        String[] sentences = new String[] {
-            "the cow jumped over the moon",
-            "an apple a day keeps the doctor away",
-            "four score and seven years ago",
-            "snow white and the seven dwarfs",
-            "i am at two with nature"};
-        String sentence = sentences[_rand.nextInt(sentences.length)];
-        _collector.emit(new Values(sentence));
-    }        
-
-    @Override
-    public void ack(Object id) {
-    }
-
-    @Override
-    public void fail(Object id) {
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields("word"));
-    }
-    
+  SpoutOutputCollector _collector;
+  Random _rand;
+
+
+  @Override
+  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+    _collector = collector;
+    _rand = new Random();
+  }
+
+  @Override
+  public void nextTuple() {
+    Utils.sleep(100);
+    String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
+        "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
+    String sentence = sentences[_rand.nextInt(sentences.length)];
+    _collector.emit(new Values(sentence));
+  }
+
+  @Override
+  public void ack(Object id) {
+  }
+
+  @Override
+  public void fail(Object id) {
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declare(new Fields("word"));
+  }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/tools/NthLastModifiedTimeTracker.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/tools/NthLastModifiedTimeTracker.java b/src/jvm/storm/starter/tools/NthLastModifiedTimeTracker.java
index 4b85cd2..92998a6 100644
--- a/src/jvm/storm/starter/tools/NthLastModifiedTimeTracker.java
+++ b/src/jvm/storm/starter/tools/NthLastModifiedTimeTracker.java
@@ -1,55 +1,53 @@
 package storm.starter.tools;
 
-import org.apache.commons.collections.buffer.CircularFifoBuffer;
-
 import backtype.storm.utils.Time;
+import org.apache.commons.collections.buffer.CircularFifoBuffer;
 
 /**
  * This class tracks the time-since-last-modify of a "thing" in a rolling fashion.
- * 
+ * <p/>
  * For example, create a 5-slot tracker to track the five most recent time-since-last-modify.
- * 
+ * <p/>
  * You must manually "mark" that the "something" that you want to track -- in terms of modification times -- has just
  * been modified.
- * 
  */
 public class NthLastModifiedTimeTracker {
 
-    private static final int MILLIS_IN_SEC = 1000;
+  private static final int MILLIS_IN_SEC = 1000;
 
-    private final CircularFifoBuffer lastModifiedTimesMillis;
+  private final CircularFifoBuffer lastModifiedTimesMillis;
 
-    public NthLastModifiedTimeTracker(int numTimesToTrack) {
-        if (numTimesToTrack < 1) {
-            throw new IllegalArgumentException("numTimesToTrack must be greater than zero (you requested "
-                + numTimesToTrack + ")");
-        }
-        lastModifiedTimesMillis = new CircularFifoBuffer(numTimesToTrack);
-        initLastModifiedTimesMillis();
+  public NthLastModifiedTimeTracker(int numTimesToTrack) {
+    if (numTimesToTrack < 1) {
+      throw new IllegalArgumentException(
+          "numTimesToTrack must be greater than zero (you requested " + numTimesToTrack + ")");
     }
-
-    private void initLastModifiedTimesMillis() {
-        long nowCached = now();
-        for (int i = 0; i < lastModifiedTimesMillis.maxSize(); i++) {
-            lastModifiedTimesMillis.add(Long.valueOf(nowCached));
-        }
+    lastModifiedTimesMillis = new CircularFifoBuffer(numTimesToTrack);
+    initLastModifiedTimesMillis();
+  }
+
+  private void initLastModifiedTimesMillis() {
+    long nowCached = now();
+    for (int i = 0; i < lastModifiedTimesMillis.maxSize(); i++) {
+      lastModifiedTimesMillis.add(Long.valueOf(nowCached));
     }
+  }
 
-    private long now() {
-        return Time.currentTimeMillis();
-    }
+  private long now() {
+    return Time.currentTimeMillis();
+  }
 
-    public int secondsSinceOldestModification() {
-        long modifiedTimeMillis = ((Long) lastModifiedTimesMillis.get()).longValue();
-        return (int) ((now() - modifiedTimeMillis) / MILLIS_IN_SEC);
-    }
+  public int secondsSinceOldestModification() {
+    long modifiedTimeMillis = ((Long) lastModifiedTimesMillis.get()).longValue();
+    return (int) ((now() - modifiedTimeMillis) / MILLIS_IN_SEC);
+  }
 
-    public void markAsModified() {
-        updateLastModifiedTime();
-    }
+  public void markAsModified() {
+    updateLastModifiedTime();
+  }
 
-    private void updateLastModifiedTime() {
-        lastModifiedTimesMillis.add(now());
-    }
+  private void updateLastModifiedTime() {
+    lastModifiedTimesMillis.add(now());
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/tools/Rankable.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/tools/Rankable.java b/src/jvm/storm/starter/tools/Rankable.java
index 03f742f..36ba086 100644
--- a/src/jvm/storm/starter/tools/Rankable.java
+++ b/src/jvm/storm/starter/tools/Rankable.java
@@ -2,8 +2,8 @@ package storm.starter.tools;
 
 public interface Rankable extends Comparable<Rankable> {
 
-    Object getObject();
+  Object getObject();
 
-    long getCount();
+  long getCount();
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/tools/RankableObjectWithFields.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/tools/RankableObjectWithFields.java b/src/jvm/storm/starter/tools/RankableObjectWithFields.java
index 38db6e9..1fdad37 100644
--- a/src/jvm/storm/starter/tools/RankableObjectWithFields.java
+++ b/src/jvm/storm/starter/tools/RankableObjectWithFields.java
@@ -1,119 +1,118 @@
 package storm.starter.tools;
 
-import java.io.Serializable;
-import java.util.List;
-
 import backtype.storm.tuple.Tuple;
-
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
+import java.io.Serializable;
+import java.util.List;
+
 /**
  * This class wraps an objects and its associated count, including any additional data fields.
- * 
+ * <p/>
  * This class can be used, for instance, to track the number of occurrences of an object in a Storm topology.
- * 
  */
 public class RankableObjectWithFields implements Rankable, Serializable {
 
-    private static final long serialVersionUID = -9102878650001058090L;
-    private static final String toStringSeparator = "|";
-
-    private final Object obj;
-    private final long count;
-    private final ImmutableList<Object> fields;
+  private static final long serialVersionUID = -9102878650001058090L;
+  private static final String toStringSeparator = "|";
 
-    public RankableObjectWithFields(Object obj, long count, Object... otherFields) {
-        if (obj == null) {
-            throw new IllegalArgumentException("The object must not be null");
-        }
-        if (count < 0) {
-            throw new IllegalArgumentException("The count must be >= 0");
-        }
-        this.obj = obj;
-        this.count = count;
-        fields = ImmutableList.copyOf(otherFields);
+  private final Object obj;
+  private final long count;
+  private final ImmutableList<Object> fields;
 
+  public RankableObjectWithFields(Object obj, long count, Object... otherFields) {
+    if (obj == null) {
+      throw new IllegalArgumentException("The object must not be null");
     }
-
-    /**
-     * Construct a new instance based on the provided {@link Tuple}.
-     * 
-     * This method expects the object to be ranked in the first field (index 0) of the provided tuple, and the number of
-     * occurrences of the object (its count) in the second field (index 1). Any further fields in the tuple will be
-     * extracted and tracked, too. These fields can be accessed via {@link RankableObjectWithFields#getFields()}.
-     * 
-     * @param tuple
-     * @return
-     */
-    public static RankableObjectWithFields from(Tuple tuple) {
-        List<Object> otherFields = Lists.newArrayList(tuple.getValues());
-        Object obj = otherFields.remove(0);
-        Long count = (Long) otherFields.remove(0);
-        return new RankableObjectWithFields(obj, count, otherFields.toArray());
-    }
-
-    public Object getObject() {
-        return obj;
+    if (count < 0) {
+      throw new IllegalArgumentException("The count must be >= 0");
     }
-
-    public long getCount() {
-        return count;
+    this.obj = obj;
+    this.count = count;
+    fields = ImmutableList.copyOf(otherFields);
+
+  }
+
+  /**
+   * Construct a new instance based on the provided {@link Tuple}.
+   * <p/>
+   * This method expects the object to be ranked in the first field (index 0) of the provided tuple, and the number of
+   * occurrences of the object (its count) in the second field (index 1). Any further fields in the tuple will be
+   * extracted and tracked, too. These fields can be accessed via {@link RankableObjectWithFields#getFields()}.
+   *
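+   * @param tuple the tuple to convert; value 0 is the object to be ranked, value 1 is its count as a {@code Long}
+   *
+   * @return a new {@link RankableObjectWithFields} holding the object, its count, and any remaining tuple values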
+   */
+  public static RankableObjectWithFields from(Tuple tuple) {
+    List<Object> otherFields = Lists.newArrayList(tuple.getValues());
+    Object obj = otherFields.remove(0);
+    Long count = (Long) otherFields.remove(0);
+    return new RankableObjectWithFields(obj, count, otherFields.toArray());
+  }
+
+  public Object getObject() {
+    return obj;
+  }
+
+  public long getCount() {
+    return count;
+  }
+
+  /**
+   * @return an immutable list of any additional data fields of the object (may be empty but will never be null)
+   */
+  public List<Object> getFields() {
+    return fields;
+  }
+
+  @Override
+  public int compareTo(Rankable other) {
+    long delta = this.getCount() - other.getCount();
+    if (delta > 0) {
+      return 1;
     }
-
-    /**
-     * @return an immutable list of any additional data fields of the object (may be empty but will never be null)
-     */
-    public List<Object> getFields() {
-        return fields;
+    else if (delta < 0) {
+      return -1;
     }
-
-    @Override
-    public int compareTo(Rankable other) {
-        long delta = this.getCount() - other.getCount();
-        if (delta > 0) {
-            return 1;
-        }
-        else if (delta < 0) {
-            return -1;
-        }
-        else {
-            return 0;
-        }
+    else {
+      return 0;
     }
+  }
 
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (!(o instanceof RankableObjectWithFields)) {
-            return false;
-        }
-        RankableObjectWithFields other = (RankableObjectWithFields) o;
-        return obj.equals(other.obj) && count == other.count;
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
     }
-
-    @Override
-    public int hashCode() {
-        int result = 17;
-        int countHash = (int) (count ^ (count >>> 32));
-        result = 31 * result + countHash;
-        result = 31 * result + obj.hashCode();
-        return result;
+    if (!(o instanceof RankableObjectWithFields)) {
+      return false;
     }
-
-    public String toString() {
-        StringBuffer buf = new StringBuffer();
-        buf.append("[");
-        buf.append(obj);
-        buf.append(toStringSeparator);
-        buf.append(count);
-        for (Object field : fields) {
-            buf.append(toStringSeparator);
-            buf.append(field);
-        }
-        buf.append("]");
-        return buf.toString();
+    RankableObjectWithFields other = (RankableObjectWithFields) o;
+    return obj.equals(other.obj) && count == other.count;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = 17;
+    int countHash = (int) (count ^ (count >>> 32));
+    result = 31 * result + countHash;
+    result = 31 * result + obj.hashCode();
+    return result;
+  }
+
+  public String toString() {
+    StringBuffer buf = new StringBuffer();
+    buf.append("[");
+    buf.append(obj);
+    buf.append(toStringSeparator);
+    buf.append(count);
+    for (Object field : fields) {
+      buf.append(toStringSeparator);
+      buf.append(field);
     }
+    buf.append("]");
+    return buf.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/tools/Rankings.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/tools/Rankings.java b/src/jvm/storm/starter/tools/Rankings.java
index d6fd34c..a3f421a 100644
--- a/src/jvm/storm/starter/tools/Rankings.java
+++ b/src/jvm/storm/starter/tools/Rankings.java
@@ -1,95 +1,95 @@
 package storm.starter.tools;
 
+import com.google.common.collect.Lists;
+
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
 
-import com.google.common.collect.Lists;
-
 public class Rankings implements Serializable {
 
-    private static final long serialVersionUID = -1549827195410578903L;
-    private static final int DEFAULT_COUNT = 10;
-
-    private final int maxSize;
-    private final List<Rankable> rankedItems = Lists.newArrayList();
-
-    public Rankings() {
-        this(DEFAULT_COUNT);
-    }
+  private static final long serialVersionUID = -1549827195410578903L;
+  private static final int DEFAULT_COUNT = 10;
 
-    public Rankings(int topN) {
-        if (topN < 1) {
-            throw new IllegalArgumentException("topN must be >= 1");
-        }
-        maxSize = topN;
-    }
+  private final int maxSize;
+  private final List<Rankable> rankedItems = Lists.newArrayList();
 
-    /**
-     * @return the maximum possible number (size) of ranked objects this instance can hold
-     */
-    public int maxSize() {
-        return maxSize;
-    }
+  public Rankings() {
+    this(DEFAULT_COUNT);
+  }
 
-    /**
-     * @return the number (size) of ranked objects this instance is currently holding
-     */
-    public int size() {
-        return rankedItems.size();
+  public Rankings(int topN) {
+    if (topN < 1) {
+      throw new IllegalArgumentException("topN must be >= 1");
     }
-
-    public List<Rankable> getRankings() {
-        return Lists.newArrayList(rankedItems);
+    maxSize = topN;
+  }
+
+  /**
+   * @return the maximum possible number (size) of ranked objects this instance can hold
+   */
+  public int maxSize() {
+    return maxSize;
+  }
+
+  /**
+   * @return the number (size) of ranked objects this instance is currently holding
+   */
+  public int size() {
+    return rankedItems.size();
+  }
+
+  public List<Rankable> getRankings() {
+    return Lists.newArrayList(rankedItems);
+  }
+
+  public void updateWith(Rankings other) {
+    for (Rankable r : other.getRankings()) {
+      updateWith(r);
     }
+  }
 
-    public void updateWith(Rankings other) {
-        for (Rankable r : other.getRankings()) {
-            updateWith(r);
-        }
+  public void updateWith(Rankable r) {
+    synchronized(rankedItems) {
+      addOrReplace(r);
+      rerank();
+      shrinkRankingsIfNeeded();
     }
+  }
 
-    public void updateWith(Rankable r) {
-        synchronized(rankedItems) {
-            addOrReplace(r);
-            rerank();
-            shrinkRankingsIfNeeded();
-        }
+  private void addOrReplace(Rankable r) {
+    Integer rank = findRankOf(r);
+    if (rank != null) {
+      rankedItems.set(rank, r);
     }
-
-    private void addOrReplace(Rankable r) {
-        Integer rank = findRankOf(r);
-        if (rank != null) {
-            rankedItems.set(rank, r);
-        }
-        else {
-            rankedItems.add(r);
-        }
+    else {
+      rankedItems.add(r);
     }
-
-    private Integer findRankOf(Rankable r) {
-        Object tag = r.getObject();
-        for (int rank = 0; rank < rankedItems.size(); rank++) {
-            Object cur = rankedItems.get(rank).getObject();
-            if (cur.equals(tag)) {
-                return rank;
-            }
-        }
-        return null;
+  }
+
+  private Integer findRankOf(Rankable r) {
+    Object tag = r.getObject();
+    for (int rank = 0; rank < rankedItems.size(); rank++) {
+      Object cur = rankedItems.get(rank).getObject();
+      if (cur.equals(tag)) {
+        return rank;
+      }
     }
+    return null;
+  }
 
-    private void rerank() {
-        Collections.sort(rankedItems);
-        Collections.reverse(rankedItems);
-    }
+  private void rerank() {
+    Collections.sort(rankedItems);
+    Collections.reverse(rankedItems);
+  }
 
-    private void shrinkRankingsIfNeeded() {
-        if (rankedItems.size() > maxSize) {
-            rankedItems.remove(maxSize);
-        }
+  private void shrinkRankingsIfNeeded() {
+    if (rankedItems.size() > maxSize) {
+      rankedItems.remove(maxSize);
     }
+  }
 
-    public String toString() {
-        return rankedItems.toString();
-    }
+  public String toString() {
+    return rankedItems.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/tools/SlidingWindowCounter.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/tools/SlidingWindowCounter.java b/src/jvm/storm/starter/tools/SlidingWindowCounter.java
index b20af00..dff6a59 100644
--- a/src/jvm/storm/starter/tools/SlidingWindowCounter.java
+++ b/src/jvm/storm/starter/tools/SlidingWindowCounter.java
@@ -5,99 +5,98 @@ import java.util.Map;
 
 /**
  * This class counts objects in a sliding window fashion.
- * 
+ * <p/>
  * It is designed 1) to give multiple "producer" threads write access to the counter, i.e. being able to increment
  * counts of objects, and 2) to give a single "consumer" thread (e.g. {@link PeriodicSlidingWindowCounter}) read access
  * to the counter. Whenever the consumer thread performs a read operation, this class will advance the head slot of the
  * sliding window counter. This means that the consumer thread indirectly controls where writes of the producer threads
  * will go to. Also, by itself this class will not advance the head slot.
- * 
+ * <p/>
  * A note for analyzing data based on a sliding window count: During the initial <code>windowLengthInSlots</code>
  * iterations, this sliding window counter will always return object counts that are equal or greater than in the
  * previous iteration. This is the effect of the counter "loading up" at the very start of its existence. Conceptually,
  * this is the desired behavior.
- * 
+ * <p/>
  * To give an example, using a counter with 5 slots which for the sake of this example represent 1 minute of time each:
- * 
+ * <p/>
  * <pre>
  * {@code
  * Sliding window counts of an object X over time
- * 
+ *
  * Minute (timeline):
  * 1    2   3   4   5   6   7   8
- * 
+ *
  * Observed counts per minute:
  * 1    1   1   1   0   0   0   0
- * 
+ *
  * Counts returned by counter:
  * 1    2   3   4   4   3   2   1
  * }
  * </pre>
- * 
+ * <p/>
  * As you can see in this example, for the first <code>windowLengthInSlots</code> (here: the first five minutes) the
  * counter will always return counts equal or greater than in the previous iteration (1, 2, 3, 4, 4). This initial load
  * effect needs to be accounted for whenever you want to perform analyses such as trending topics; otherwise your
  * analysis algorithm might falsely identify the object to be trending as the counter seems to observe continuously
  * increasing counts. Also, note that during the initial load phase <em>every object</em> will exhibit increasing
  * counts.
- * 
+ * <p/>
  * On a high-level, the counter exhibits the following behavior: If you asked the example counter after two minutes,
- * "how often did you count the object during the past five minutes?", then it should reply
- * "I have counted it 2 times in the past five minutes", implying that it can only account for the last two of those
- * five minutes because the counter was not running before that time.
- * 
- * @param <T>
- *            The type of those objects we want to count.
+ * "how often did you count the object during the past five minutes?", then it should reply "I have counted it 2 times
+ * in the past five minutes", implying that it can only account for the last two of those five minutes because the
+ * counter was not running before that time.
+ *
+ * @param <T> The type of those objects we want to count.
  */
 public final class SlidingWindowCounter<T> implements Serializable {
 
-    private static final long serialVersionUID = -2645063988768785810L;
+  private static final long serialVersionUID = -2645063988768785810L;
 
-    private SlotBasedCounter<T> objCounter;
-    private int headSlot;
-    private int tailSlot;
-    private int windowLengthInSlots;
+  private SlotBasedCounter<T> objCounter;
+  private int headSlot;
+  private int tailSlot;
+  private int windowLengthInSlots;
 
-    public SlidingWindowCounter(int windowLengthInSlots) {
-        if (windowLengthInSlots < 2) {
-            throw new IllegalArgumentException("Window length in slots must be at least two (you requested "
-                + windowLengthInSlots + ")");
-        }
-        this.windowLengthInSlots = windowLengthInSlots;
-        this.objCounter = new SlotBasedCounter<T>(this.windowLengthInSlots);
-
-        this.headSlot = 0;
-        this.tailSlot = slotAfter(headSlot);
+  public SlidingWindowCounter(int windowLengthInSlots) {
+    if (windowLengthInSlots < 2) {
+      throw new IllegalArgumentException(
+          "Window length in slots must be at least two (you requested " + windowLengthInSlots + ")");
     }
+    this.windowLengthInSlots = windowLengthInSlots;
+    this.objCounter = new SlotBasedCounter<T>(this.windowLengthInSlots);
 
-    public void incrementCount(T obj) {
-        objCounter.incrementCount(obj, headSlot);
-    }
+    this.headSlot = 0;
+    this.tailSlot = slotAfter(headSlot);
+  }
 
-    /**
-     * Return the current (total) counts of all tracked objects, then advance the window.
-     * 
-     * Whenever this method is called, we consider the counts of the current sliding window to be available to and
-     * successfully processed "upstream" (i.e. by the caller). Knowing this we will start counting any subsequent
-     * objects within the next "chunk" of the sliding window.
-     * 
-     * @return
-     */
-    public Map<T, Long> getCountsThenAdvanceWindow() {
-        Map<T, Long> counts = objCounter.getCounts();
-        objCounter.wipeZeros();
-        objCounter.wipeSlot(tailSlot);
-        advanceHead();
-        return counts;
-    }
+  public void incrementCount(T obj) {
+    objCounter.incrementCount(obj, headSlot);
+  }
 
-    private void advanceHead() {
-        headSlot = tailSlot;
-        tailSlot = slotAfter(tailSlot);
-    }
+  /**
+   * Return the current (total) counts of all tracked objects, then advance the window.
+   * <p/>
+   * Whenever this method is called, we consider the counts of the current sliding window to be available to and
+   * successfully processed "upstream" (i.e. by the caller). Knowing this we will start counting any subsequent
+   * objects within the next "chunk" of the sliding window.
+   *
+   * @return
+   */
+  public Map<T, Long> getCountsThenAdvanceWindow() {
+    Map<T, Long> counts = objCounter.getCounts();
+    objCounter.wipeZeros();
+    objCounter.wipeSlot(tailSlot);
+    advanceHead();
+    return counts;
+  }
 
-    private int slotAfter(int slot) {
-        return (slot + 1) % windowLengthInSlots;
-    }
+  private void advanceHead() {
+    headSlot = tailSlot;
+    tailSlot = slotAfter(tailSlot);
+  }
+
+  private int slotAfter(int slot) {
+    return (slot + 1) % windowLengthInSlots;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/tools/SlotBasedCounter.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/tools/SlotBasedCounter.java b/src/jvm/storm/starter/tools/SlotBasedCounter.java
index 5624a54..f45ef48 100644
--- a/src/jvm/storm/starter/tools/SlotBasedCounter.java
+++ b/src/jvm/storm/starter/tools/SlotBasedCounter.java
@@ -8,96 +8,94 @@ import java.util.Set;
 
 /**
  * This class provides per-slot counts of the occurrences of objects.
- * 
+ * <p/>
  * It can be used, for instance, as a building block for implementing sliding window counting of objects.
- * 
- * @param <T>
- *            The type of those objects we want to count.
+ *
+ * @param <T> The type of those objects we want to count.
  */
 public final class SlotBasedCounter<T> implements Serializable {
 
-    private static final long serialVersionUID = 4858185737378394432L;
+  private static final long serialVersionUID = 4858185737378394432L;
 
-    private final Map<T, long[]> objToCounts = new HashMap<T, long[]>();
-    private final int numSlots;
+  private final Map<T, long[]> objToCounts = new HashMap<T, long[]>();
+  private final int numSlots;
 
-    public SlotBasedCounter(int numSlots) {
-        if (numSlots <= 0) {
-            throw new IllegalArgumentException("Number of slots must be greater than zero (you requested " + numSlots
-                + ")");
-        }
-        this.numSlots = numSlots;
+  public SlotBasedCounter(int numSlots) {
+    if (numSlots <= 0) {
+      throw new IllegalArgumentException("Number of slots must be greater than zero (you requested " + numSlots + ")");
     }
+    this.numSlots = numSlots;
+  }
 
-    public void incrementCount(T obj, int slot) {
-        long[] counts = objToCounts.get(obj);
-        if (counts == null) {
-            counts = new long[this.numSlots];
-            objToCounts.put(obj, counts);
-        }
-        counts[slot]++;
+  public void incrementCount(T obj, int slot) {
+    long[] counts = objToCounts.get(obj);
+    if (counts == null) {
+      counts = new long[this.numSlots];
+      objToCounts.put(obj, counts);
     }
+    counts[slot]++;
+  }
 
-    public long getCount(T obj, int slot) {
-        long[] counts = objToCounts.get(obj);
-        if (counts == null) {
-            return 0;
-        }
-        else {
-            return counts[slot];
-        }
+  public long getCount(T obj, int slot) {
+    long[] counts = objToCounts.get(obj);
+    if (counts == null) {
+      return 0;
     }
-
-    public Map<T, Long> getCounts() {
-        Map<T, Long> result = new HashMap<T, Long>();
-        for (T obj : objToCounts.keySet()) {
-            result.put(obj, computeTotalCount(obj));
-        }
-        return result;
+    else {
+      return counts[slot];
     }
+  }
 
-    private long computeTotalCount(T obj) {
-        long[] curr = objToCounts.get(obj);
-        long total = 0;
-        for (long l : curr) {
-            total += l;
-        }
-        return total;
+  public Map<T, Long> getCounts() {
+    Map<T, Long> result = new HashMap<T, Long>();
+    for (T obj : objToCounts.keySet()) {
+      result.put(obj, computeTotalCount(obj));
     }
+    return result;
+  }
 
-    /**
-     * Reset the slot count of any tracked objects to zero for the given slot.
-     * 
-     * @param slot
-     */
-    public void wipeSlot(int slot) {
-        for (T obj : objToCounts.keySet()) {
-            resetSlotCountToZero(obj, slot);
-        }
+  private long computeTotalCount(T obj) {
+    long[] curr = objToCounts.get(obj);
+    long total = 0;
+    for (long l : curr) {
+      total += l;
     }
+    return total;
+  }
 
-    private void resetSlotCountToZero(T obj, int slot) {
-        long[] counts = objToCounts.get(obj);
-        counts[slot] = 0;
+  /**
+   * Reset the slot count of any tracked objects to zero for the given slot.
+   *
+   * @param slot
+   */
+  public void wipeSlot(int slot) {
+    for (T obj : objToCounts.keySet()) {
+      resetSlotCountToZero(obj, slot);
     }
+  }
 
-    private boolean shouldBeRemovedFromCounter(T obj) {
-        return computeTotalCount(obj) == 0;
-    }
+  private void resetSlotCountToZero(T obj, int slot) {
+    long[] counts = objToCounts.get(obj);
+    counts[slot] = 0;
+  }
 
-    /**
-     * Remove any object from the counter whose total count is zero (to free up memory).
-     */
-    public void wipeZeros() {
-        Set<T> objToBeRemoved = new HashSet<T>();
-        for (T obj : objToCounts.keySet()) {
-            if (shouldBeRemovedFromCounter(obj)) {
-                objToBeRemoved.add(obj);
-            }
-        }
-        for (T obj : objToBeRemoved) {
-            objToCounts.remove(obj);
-        }
+  private boolean shouldBeRemovedFromCounter(T obj) {
+    return computeTotalCount(obj) == 0;
+  }
+
+  /**
+   * Remove any object from the counter whose total count is zero (to free up memory).
+   */
+  public void wipeZeros() {
+    Set<T> objToBeRemoved = new HashSet<T>();
+    for (T obj : objToCounts.keySet()) {
+      if (shouldBeRemovedFromCounter(obj)) {
+        objToBeRemoved.add(obj);
+      }
+    }
+    for (T obj : objToBeRemoved) {
+      objToCounts.remove(obj);
     }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/trident/TridentReach.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/trident/TridentReach.java b/src/jvm/storm/starter/trident/TridentReach.java
index ec10148..a940b40 100644
--- a/src/jvm/storm/starter/trident/TridentReach.java
+++ b/src/jvm/storm/starter/trident/TridentReach.java
@@ -3,15 +3,10 @@ package storm.starter.trident;
 import backtype.storm.Config;
 import backtype.storm.LocalCluster;
 import backtype.storm.LocalDRPC;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
 import backtype.storm.generated.StormTopology;
 import backtype.storm.task.IMetricsContext;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
 import storm.trident.TridentState;
 import storm.trident.TridentTopology;
 import storm.trident.operation.BaseFunction;
@@ -25,126 +20,120 @@ import storm.trident.state.StateFactory;
 import storm.trident.state.map.ReadOnlyMapState;
 import storm.trident.tuple.TridentTuple;
 
+import java.util.*;
+
 public class TridentReach {
-    public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, List<String>>() {{
-       put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan"));
-       put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", "sally", "nathan"));
-       put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john"));
-    }};
-
-    public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, List<String>>() {{
-        put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", "jai"));
-        put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", "vivian"));
-        put("tim", Arrays.asList("alex"));
-        put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", "vivian", "emily", "jordan"));
-        put("adam", Arrays.asList("david", "carissa"));
-        put("mike", Arrays.asList("john", "bob"));
-        put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob"));
-    }};
-
-    public static class StaticSingleKeyMapState extends ReadOnlyState implements ReadOnlyMapState<Object> {
-        public static class Factory implements StateFactory {
-            Map _map;
-
-            public Factory(Map map) {
-                _map = map;
-            }
-
-            @Override
-            public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
-                return new StaticSingleKeyMapState(_map);
-            }
+  public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, List<String>>() {{
+    put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan"));
+    put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", "sally", "nathan"));
+    put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john"));
+  }};
+
+  public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, List<String>>() {{
+    put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", "jai"));
+    put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", "vivian"));
+    put("tim", Arrays.asList("alex"));
+    put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", "vivian", "emily", "jordan"));
+    put("adam", Arrays.asList("david", "carissa"));
+    put("mike", Arrays.asList("john", "bob"));
+    put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob"));
+  }};
+
+  public static class StaticSingleKeyMapState extends ReadOnlyState implements ReadOnlyMapState<Object> {
+    public static class Factory implements StateFactory {
+      Map _map;
+
+      public Factory(Map map) {
+        _map = map;
+      }
+
+      @Override
+      public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+        return new StaticSingleKeyMapState(_map);
+      }
 
-        }
+    }
 
-        Map _map;
+    Map _map;
 
-        public StaticSingleKeyMapState(Map map) {
-            _map = map;
-        }
+    public StaticSingleKeyMapState(Map map) {
+      _map = map;
+    }
 
 
-        @Override
-        public List<Object> multiGet(List<List<Object>> keys) {
-            List<Object> ret = new ArrayList();
-            for(List<Object> key: keys) {
-                Object singleKey = key.get(0);
-                ret.add(_map.get(singleKey));
-            }
-            return ret;
-        }
+    @Override
+    public List<Object> multiGet(List<List<Object>> keys) {
+      List<Object> ret = new ArrayList();
+      for (List<Object> key : keys) {
+        Object singleKey = key.get(0);
+        ret.add(_map.get(singleKey));
+      }
+      return ret;
+    }
 
+  }
+
+  public static class One implements CombinerAggregator<Integer> {
+    @Override
+    public Integer init(TridentTuple tuple) {
+      return 1;
     }
 
-    public static class One implements CombinerAggregator<Integer> {
-        @Override
-        public Integer init(TridentTuple tuple) {
-            return 1;
-        }
+    @Override
+    public Integer combine(Integer val1, Integer val2) {
+      return 1;
+    }
 
-        @Override
-        public Integer combine(Integer val1, Integer val2) {
-            return 1;
-        }
+    @Override
+    public Integer zero() {
+      return 1;
+    }
+  }
 
-        @Override
-        public Integer zero() {
-            return 1;
+  public static class ExpandList extends BaseFunction {
+
+    @Override
+    public void execute(TridentTuple tuple, TridentCollector collector) {
+      List l = (List) tuple.getValue(0);
+      if (l != null) {
+        for (Object o : l) {
+          collector.emit(new Values(o));
         }
+      }
     }
 
-    public static class ExpandList extends BaseFunction {
+  }
 
-        @Override
-        public void execute(TridentTuple tuple, TridentCollector collector) {
-            List l = (List) tuple.getValue(0);
-            if(l!=null) {
-                for(Object o: l) {
-                    collector.emit(new Values(o));
-                }
-            }
-        }
+  public static StormTopology buildTopology(LocalDRPC drpc) {
+    TridentTopology topology = new TridentTopology();
+    TridentState urlToTweeters = topology.newStaticState(new StaticSingleKeyMapState.Factory(TWEETERS_DB));
+    TridentState tweetersToFollowers = topology.newStaticState(new StaticSingleKeyMapState.Factory(FOLLOWERS_DB));
 
-    }
 
-    public static StormTopology buildTopology(LocalDRPC drpc) {
-        TridentTopology topology = new TridentTopology();
-        TridentState urlToTweeters =
-                topology.newStaticState(
-                    new StaticSingleKeyMapState.Factory(TWEETERS_DB));
-        TridentState tweetersToFollowers =
-                topology.newStaticState(
-                    new StaticSingleKeyMapState.Factory(FOLLOWERS_DB));
-
-
-        topology.newDRPCStream("reach", drpc)
-                .stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters"))
-                .each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter"))
-                .shuffle()
-                .stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers"))
-                .each(new Fields("followers"), new ExpandList(), new Fields("follower"))
-                .groupBy(new Fields("follower"))
-                .aggregate(new One(), new Fields("one"))
-                .aggregate(new Fields("one"), new Sum(), new Fields("reach"));
-        return topology.build();
-    }
+    topology.newDRPCStream("reach", drpc).stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields(
+        "tweeters")).each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter")).shuffle().stateQuery(
+        tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers")).each(new Fields("followers"),
+        new ExpandList(), new Fields("follower")).groupBy(new Fields("follower")).aggregate(new One(), new Fields(
+        "one")).aggregate(new Fields("one"), new Sum(), new Fields("reach"));
+    return topology.build();
+  }
 
-    public static void main(String[] args) throws Exception {
-        LocalDRPC drpc = new LocalDRPC();
+  public static void main(String[] args) throws Exception {
+    LocalDRPC drpc = new LocalDRPC();
 
-        Config conf = new Config();
-        LocalCluster cluster = new LocalCluster();
+    Config conf = new Config();
+    LocalCluster cluster = new LocalCluster();
 
-        cluster.submitTopology("reach", conf, buildTopology(drpc));
+    cluster.submitTopology("reach", conf, buildTopology(drpc));
 
-        Thread.sleep(2000);
+    Thread.sleep(2000);
 
-        System.out.println("REACH: " + drpc.execute("reach", "aaa"));
-        System.out.println("REACH: " + drpc.execute("reach", "foo.com/blog/1"));
-        System.out.println("REACH: " + drpc.execute("reach", "engineering.twitter.com/blog/5"));
+    System.out.println("REACH: " + drpc.execute("reach", "aaa"));
+    System.out.println("REACH: " + drpc.execute("reach", "foo.com/blog/1"));
+    System.out.println("REACH: " + drpc.execute("reach", "engineering.twitter.com/blog/5"));
 
 
-        cluster.shutdown();
-        drpc.shutdown();
-    }
+    cluster.shutdown();
+    drpc.shutdown();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/trident/TridentWordCount.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/trident/TridentWordCount.java b/src/jvm/storm/starter/trident/TridentWordCount.java
index 268d41c..c56280c 100644
--- a/src/jvm/storm/starter/trident/TridentWordCount.java
+++ b/src/jvm/storm/starter/trident/TridentWordCount.java
@@ -1,6 +1,5 @@
 package storm.starter.trident;
 
-import storm.trident.testing.FixedBatchSpout;
 import backtype.storm.Config;
 import backtype.storm.LocalCluster;
 import backtype.storm.LocalDRPC;
@@ -16,65 +15,54 @@ import storm.trident.operation.builtin.Count;
 import storm.trident.operation.builtin.FilterNull;
 import storm.trident.operation.builtin.MapGet;
 import storm.trident.operation.builtin.Sum;
-import storm.trident.planner.processor.StateQueryProcessor;
+import storm.trident.testing.FixedBatchSpout;
 import storm.trident.testing.MemoryMapState;
 import storm.trident.tuple.TridentTuple;
 
 
-public class TridentWordCount {    
-    public static class Split extends BaseFunction {
-        @Override
-        public void execute(TridentTuple tuple, TridentCollector collector) {
-            String sentence = tuple.getString(0);
-            for(String word: sentence.split(" ")) {
-                collector.emit(new Values(word));                
-            }
-        }
+public class TridentWordCount {
+  public static class Split extends BaseFunction {
+    @Override
+    public void execute(TridentTuple tuple, TridentCollector collector) {
+      String sentence = tuple.getString(0);
+      for (String word : sentence.split(" ")) {
+        collector.emit(new Values(word));
+      }
     }
-    
-    public static StormTopology buildTopology(LocalDRPC drpc) {
-        FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
-                new Values("the cow jumped over the moon"),
-                new Values("the man went to the store and bought some candy"),
-                new Values("four score and seven years ago"),
-                new Values("how many apples can you eat"),
-                new Values("to be or not to be the person"));
-        spout.setCycle(true);
-        
-        TridentTopology topology = new TridentTopology();        
-        TridentState wordCounts =
-              topology.newStream("spout1", spout)
-                .parallelismHint(16)
-                .each(new Fields("sentence"), new Split(), new Fields("word"))
-                .groupBy(new Fields("word"))
-                .persistentAggregate(new MemoryMapState.Factory(),
-                                     new Count(), new Fields("count"))         
-                .parallelismHint(16);
-                
-        topology.newDRPCStream("words", drpc)
-                .each(new Fields("args"), new Split(), new Fields("word"))
-                .groupBy(new Fields("word"))
-                .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
-                .each(new Fields("count"), new FilterNull())
-                .aggregate(new Fields("count"), new Sum(), new Fields("sum"))
-                ;
-        return topology.build();
+  }
+
+  public static StormTopology buildTopology(LocalDRPC drpc) {
+    FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
+        new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
+        new Values("how many apples can you eat"), new Values("to be or not to be the person"));
+    spout.setCycle(true);
+
+    TridentTopology topology = new TridentTopology();
+    TridentState wordCounts = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),
+        new Split(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(),
+        new Count(), new Fields("count")).parallelismHint(16);
+
+    topology.newDRPCStream("words", drpc).each(new Fields("args"), new Split(), new Fields("word")).groupBy(new Fields(
+        "word")).stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")).each(new Fields("count"),
+        new FilterNull()).aggregate(new Fields("count"), new Sum(), new Fields("sum"));
+    return topology.build();
+  }
+
+  public static void main(String[] args) throws Exception {
+    Config conf = new Config();
+    conf.setMaxSpoutPending(20);
+    if (args.length == 0) {
+      LocalDRPC drpc = new LocalDRPC();
+      LocalCluster cluster = new LocalCluster();
+      cluster.submitTopology("wordCounter", conf, buildTopology(drpc));
+      for (int i = 0; i < 100; i++) {
+        System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped"));
+        Thread.sleep(1000);
+      }
     }
-    
-    public static void main(String[] args) throws Exception {
-        Config conf = new Config();
-        conf.setMaxSpoutPending(20);
-        if(args.length==0) {
-            LocalDRPC drpc = new LocalDRPC();
-            LocalCluster cluster = new LocalCluster();
-            cluster.submitTopology("wordCounter", conf, buildTopology(drpc));
-            for(int i=0; i<100; i++) {
-                System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped"));
-                Thread.sleep(1000);
-            }
-        } else {
-            conf.setNumWorkers(3);
-            StormSubmitter.submitTopology(args[0], conf, buildTopology(null));        
-        }
+    else {
+      conf.setNumWorkers(3);
+      StormSubmitter.submitTopology(args[0], conf, buildTopology(null));
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/util/StormRunner.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/util/StormRunner.java b/src/jvm/storm/starter/util/StormRunner.java
index 09b4062..30c4f50 100644
--- a/src/jvm/storm/starter/util/StormRunner.java
+++ b/src/jvm/storm/starter/util/StormRunner.java
@@ -6,17 +6,17 @@ import backtype.storm.generated.StormTopology;
 
 public final class StormRunner {
 
-    private static final int MILLIS_IN_SEC = 1000;
+  private static final int MILLIS_IN_SEC = 1000;
 
-    private StormRunner() {
-    }
+  private StormRunner() {
+  }
 
-    public static void runTopologyLocally(StormTopology topology, String topologyName, Config conf, int runtimeInSeconds)
-            throws InterruptedException {
-        LocalCluster cluster = new LocalCluster();
-        cluster.submitTopology(topologyName, conf, topology);
-        Thread.sleep((long) runtimeInSeconds * MILLIS_IN_SEC);
-        cluster.killTopology(topologyName);
-        cluster.shutdown();
-    }
+  public static void runTopologyLocally(StormTopology topology, String topologyName, Config conf, int runtimeInSeconds)
+      throws InterruptedException {
+    LocalCluster cluster = new LocalCluster();
+    cluster.submitTopology(topologyName, conf, topology);
+    Thread.sleep((long) runtimeInSeconds * MILLIS_IN_SEC);
+    cluster.killTopology(topologyName);
+    cluster.shutdown();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/util/TupleHelpers.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/util/TupleHelpers.java b/src/jvm/storm/starter/util/TupleHelpers.java
index 58693d7..a46480e 100644
--- a/src/jvm/storm/starter/util/TupleHelpers.java
+++ b/src/jvm/storm/starter/util/TupleHelpers.java
@@ -5,12 +5,12 @@ import backtype.storm.tuple.Tuple;
 
 public final class TupleHelpers {
 
-    private TupleHelpers() {
-    }
+  private TupleHelpers() {
+  }
 
-    public static boolean isTickTuple(Tuple tuple) {
-        return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
-            && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
-    }
+  public static boolean isTickTuple(Tuple tuple) {
+    return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && tuple.getSourceStreamId().equals(
+        Constants.SYSTEM_TICK_STREAM_ID);
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/test/jvm/storm/starter/bolt/IntermediateRankingsBoltTest.java
----------------------------------------------------------------------
diff --git a/test/jvm/storm/starter/bolt/IntermediateRankingsBoltTest.java b/test/jvm/storm/starter/bolt/IntermediateRankingsBoltTest.java
index c51d081..cbbee3c 100644
--- a/test/jvm/storm/starter/bolt/IntermediateRankingsBoltTest.java
+++ b/test/jvm/storm/starter/bolt/IntermediateRankingsBoltTest.java
@@ -1,135 +1,129 @@
 package storm.starter.bolt;
 
-import static org.fest.assertions.api.Assertions.assertThat;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
-import static org.mockito.Mockito.when;
-
-import java.util.Map;
-
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-import storm.starter.tools.MockTupleHelpers;
 import backtype.storm.Config;
 import backtype.storm.topology.BasicOutputCollector;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
-
 import com.google.common.collect.Lists;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+import storm.starter.tools.MockTupleHelpers;
+
+import java.util.Map;
+
+import static org.fest.assertions.api.Assertions.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
 
 public class IntermediateRankingsBoltTest {
 
-    private static final String ANY_NON_SYSTEM_COMPONENT_ID = "irrelevant_component_id";
-    private static final String ANY_NON_SYSTEM_STREAM_ID = "irrelevant_stream_id";
-    private static final Object ANY_OBJECT = new Object();
-    private static final int ANY_TOPN = 10;
-    private static final long ANY_COUNT = 42;
-
-    private Tuple mockRankableTuple(Object obj, long count) {
-        Tuple tuple = MockTupleHelpers.mockTuple(ANY_NON_SYSTEM_COMPONENT_ID, ANY_NON_SYSTEM_STREAM_ID);
-        when(tuple.getValues()).thenReturn(Lists.newArrayList(ANY_OBJECT, ANY_COUNT));
-        return tuple;
-    }
-
-    @DataProvider
-    public Object[][] illegalTopN() {
-        return new Object[][] { { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalTopN")
-    public void negativeOrZeroTopNShouldThrowIAE(int topN) {
-        new IntermediateRankingsBolt(topN);
-    }
-
-    @DataProvider
-    public Object[][] illegalEmitFrequency() {
-        return new Object[][] { { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalEmitFrequency")
-    public void negativeOrZeroEmitFrequencyShouldThrowIAE(int emitFrequencyInSeconds) {
-        new IntermediateRankingsBolt(ANY_TOPN, emitFrequencyInSeconds);
-    }
-
-    @DataProvider
-    public Object[][] legalTopN() {
-        return new Object[][] { { 1 }, { 2 }, { 3 }, { 20 } };
-    }
-
-    @Test(dataProvider = "legalTopN")
-    public void positiveTopNShouldBeOk(int topN) {
-        new IntermediateRankingsBolt(topN);
-    }
-
-    @DataProvider
-    public Object[][] legalEmitFrequency() {
-        return new Object[][] { { 1 }, { 2 }, { 3 }, { 20 } };
-    }
-
-    @Test(dataProvider = "legalEmitFrequency")
-    public void positiveEmitFrequencyShouldBeOk(int emitFrequencyInSeconds) {
-        new IntermediateRankingsBolt(ANY_TOPN, emitFrequencyInSeconds);
-    }
-
-    @Test
-    public void shouldEmitSomethingIfTickTupleIsReceived() {
-        // given
-        Tuple tickTuple = MockTupleHelpers.mockTickTuple();
-        BasicOutputCollector collector = mock(BasicOutputCollector.class);
-        IntermediateRankingsBolt bolt = new IntermediateRankingsBolt();
-
-        // when
-        bolt.execute(tickTuple, collector);
-
-        // then
-        // verifyZeroInteractions(collector);
-        verify(collector).emit(any(Values.class));
-    }
-
-    @Test
-    public void shouldEmitNothingIfNormalTupleIsReceived() {
-        // given
-        Tuple normalTuple = mockRankableTuple(ANY_OBJECT, ANY_COUNT);
-        BasicOutputCollector collector = mock(BasicOutputCollector.class);
-        IntermediateRankingsBolt bolt = new IntermediateRankingsBolt();
-
-        // when
-        bolt.execute(normalTuple, collector);
-
-        // then
-        verifyZeroInteractions(collector);
-    }
-
-    @Test
-    public void shouldDeclareOutputFields() {
-        // given
-        OutputFieldsDeclarer declarer = mock(OutputFieldsDeclarer.class);
-        IntermediateRankingsBolt bolt = new IntermediateRankingsBolt();
-
-        // when
-        bolt.declareOutputFields(declarer);
-
-        // then
-        verify(declarer, times(1)).declare(any(Fields.class));
-    }
-
-    @Test
-    public void shouldSetTickTupleFrequencyInComponentConfigurationToNonZeroValue() {
-        // given
-        IntermediateRankingsBolt bolt = new IntermediateRankingsBolt();
-
-        // when
-        Map<String, Object> componentConfig = bolt.getComponentConfiguration();
-
-        // then
-        assertThat(componentConfig).containsKey(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
-        Integer emitFrequencyInSeconds = (Integer) componentConfig.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
-        assertThat(emitFrequencyInSeconds).isGreaterThan(0);
-    }
+  private static final String ANY_NON_SYSTEM_COMPONENT_ID = "irrelevant_component_id";
+  private static final String ANY_NON_SYSTEM_STREAM_ID = "irrelevant_stream_id";
+  private static final Object ANY_OBJECT = new Object();
+  private static final int ANY_TOPN = 10;
+  private static final long ANY_COUNT = 42;
+
+  private Tuple mockRankableTuple(Object obj, long count) {
+    Tuple tuple = MockTupleHelpers.mockTuple(ANY_NON_SYSTEM_COMPONENT_ID, ANY_NON_SYSTEM_STREAM_ID);
+    when(tuple.getValues()).thenReturn(Lists.newArrayList(ANY_OBJECT, ANY_COUNT));
+    return tuple;
+  }
+
+  @DataProvider
+  public Object[][] illegalTopN() {
+    return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalTopN")
+  public void negativeOrZeroTopNShouldThrowIAE(int topN) {
+    new IntermediateRankingsBolt(topN);
+  }
+
+  @DataProvider
+  public Object[][] illegalEmitFrequency() {
+    return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalEmitFrequency")
+  public void negativeOrZeroEmitFrequencyShouldThrowIAE(int emitFrequencyInSeconds) {
+    new IntermediateRankingsBolt(ANY_TOPN, emitFrequencyInSeconds);
+  }
+
+  @DataProvider
+  public Object[][] legalTopN() {
+    return new Object[][]{ { 1 }, { 2 }, { 3 }, { 20 } };
+  }
+
+  @Test(dataProvider = "legalTopN")
+  public void positiveTopNShouldBeOk(int topN) {
+    new IntermediateRankingsBolt(topN);
+  }
+
+  @DataProvider
+  public Object[][] legalEmitFrequency() {
+    return new Object[][]{ { 1 }, { 2 }, { 3 }, { 20 } };
+  }
+
+  @Test(dataProvider = "legalEmitFrequency")
+  public void positiveEmitFrequencyShouldBeOk(int emitFrequencyInSeconds) {
+    new IntermediateRankingsBolt(ANY_TOPN, emitFrequencyInSeconds);
+  }
+
+  @Test
+  public void shouldEmitSomethingIfTickTupleIsReceived() {
+    // given
+    Tuple tickTuple = MockTupleHelpers.mockTickTuple();
+    BasicOutputCollector collector = mock(BasicOutputCollector.class);
+    IntermediateRankingsBolt bolt = new IntermediateRankingsBolt();
+
+    // when
+    bolt.execute(tickTuple, collector);
+
+    // then
+    // verifyZeroInteractions(collector);
+    verify(collector).emit(any(Values.class));
+  }
+
+  @Test
+  public void shouldEmitNothingIfNormalTupleIsReceived() {
+    // given
+    Tuple normalTuple = mockRankableTuple(ANY_OBJECT, ANY_COUNT);
+    BasicOutputCollector collector = mock(BasicOutputCollector.class);
+    IntermediateRankingsBolt bolt = new IntermediateRankingsBolt();
+
+    // when
+    bolt.execute(normalTuple, collector);
+
+    // then
+    verifyZeroInteractions(collector);
+  }
+
+  @Test
+  public void shouldDeclareOutputFields() {
+    // given
+    OutputFieldsDeclarer declarer = mock(OutputFieldsDeclarer.class);
+    IntermediateRankingsBolt bolt = new IntermediateRankingsBolt();
+
+    // when
+    bolt.declareOutputFields(declarer);
+
+    // then
+    verify(declarer, times(1)).declare(any(Fields.class));
+  }
+
+  @Test
+  public void shouldSetTickTupleFrequencyInComponentConfigurationToNonZeroValue() {
+    // given
+    IntermediateRankingsBolt bolt = new IntermediateRankingsBolt();
+
+    // when
+    Map<String, Object> componentConfig = bolt.getComponentConfiguration();
+
+    // then
+    assertThat(componentConfig).containsKey(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
+    Integer emitFrequencyInSeconds = (Integer) componentConfig.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
+    assertThat(emitFrequencyInSeconds).isGreaterThan(0);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/test/jvm/storm/starter/bolt/RollingCountBoltTest.java
----------------------------------------------------------------------
diff --git a/test/jvm/storm/starter/bolt/RollingCountBoltTest.java b/test/jvm/storm/starter/bolt/RollingCountBoltTest.java
index ae19041..c9a516a 100644
--- a/test/jvm/storm/starter/bolt/RollingCountBoltTest.java
+++ b/test/jvm/storm/starter/bolt/RollingCountBoltTest.java
@@ -1,18 +1,5 @@
 package storm.starter.bolt;
 
-import static org.fest.assertions.api.Assertions.assertThat;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
-import static org.mockito.Mockito.when;
-
-import java.util.Map;
-
-import org.testng.annotations.Test;
-
-import storm.starter.tools.MockTupleHelpers;
 import backtype.storm.Config;
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
@@ -20,82 +7,90 @@ import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
+import org.testng.annotations.Test;
+import storm.starter.tools.MockTupleHelpers;
+
+import java.util.Map;
+
+import static org.fest.assertions.api.Assertions.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
 
 public class RollingCountBoltTest {
 
-    private static final String ANY_NON_SYSTEM_COMPONENT_ID = "irrelevant_component_id";
-    private static final String ANY_NON_SYSTEM_STREAM_ID = "irrelevant_stream_id";
-
-    private Tuple mockNormalTuple(Object obj) {
-        Tuple tuple = MockTupleHelpers.mockTuple(ANY_NON_SYSTEM_COMPONENT_ID, ANY_NON_SYSTEM_STREAM_ID);
-        when(tuple.getValue(0)).thenReturn(obj);
-        return tuple;
-    }
-
-    @SuppressWarnings("rawtypes")
-    @Test
-    public void shouldEmitNothingIfNoObjectHasBeenCountedYetAndTickTupleIsReceived() {
-        // given
-        Tuple tickTuple = MockTupleHelpers.mockTickTuple();
-        RollingCountBolt bolt = new RollingCountBolt();
-        Map conf = mock(Map.class);
-        TopologyContext context = mock(TopologyContext.class);
-        OutputCollector collector = mock(OutputCollector.class);
-        bolt.prepare(conf, context, collector);
-
-        // when
-        bolt.execute(tickTuple);
-
-        // then
-        verifyZeroInteractions(collector);
-    }
-
-    @SuppressWarnings("rawtypes")
-    @Test
-    public void shouldEmitSomethingIfAtLeastOneObjectWasCountedAndTickTupleIsReceived() {
-        // given
-        Tuple normalTuple = mockNormalTuple(new Object());
-        Tuple tickTuple = MockTupleHelpers.mockTickTuple();
-
-        RollingCountBolt bolt = new RollingCountBolt();
-        Map conf = mock(Map.class);
-        TopologyContext context = mock(TopologyContext.class);
-        OutputCollector collector = mock(OutputCollector.class);
-        bolt.prepare(conf, context, collector);
-
-        // when
-        bolt.execute(normalTuple);
-        bolt.execute(tickTuple);
-
-        // then
-        verify(collector).emit(any(Values.class));
-    }
-
-    @Test
-    public void shouldDeclareOutputFields() {
-        // given
-        OutputFieldsDeclarer declarer = mock(OutputFieldsDeclarer.class);
-        RollingCountBolt bolt = new RollingCountBolt();
-
-        // when
-        bolt.declareOutputFields(declarer);
-
-        // then
-        verify(declarer, times(1)).declare(any(Fields.class));
-
-    }
-
-    @Test
-    public void shouldSetTickTupleFrequencyInComponentConfigurationToNonZeroValue() {
-        // given
-        RollingCountBolt bolt = new RollingCountBolt();
-
-        // when
-        Map<String, Object> componentConfig = bolt.getComponentConfiguration();
-
-        // then
-        assertThat(componentConfig).containsKey(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
-        Integer emitFrequencyInSeconds = (Integer) componentConfig.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
-        assertThat(emitFrequencyInSeconds).isGreaterThan(0);
-    }
+  private static final String ANY_NON_SYSTEM_COMPONENT_ID = "irrelevant_component_id";
+  private static final String ANY_NON_SYSTEM_STREAM_ID = "irrelevant_stream_id";
+
+  private Tuple mockNormalTuple(Object obj) {
+    Tuple tuple = MockTupleHelpers.mockTuple(ANY_NON_SYSTEM_COMPONENT_ID, ANY_NON_SYSTEM_STREAM_ID);
+    when(tuple.getValue(0)).thenReturn(obj);
+    return tuple;
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Test
+  public void shouldEmitNothingIfNoObjectHasBeenCountedYetAndTickTupleIsReceived() {
+    // given
+    Tuple tickTuple = MockTupleHelpers.mockTickTuple();
+    RollingCountBolt bolt = new RollingCountBolt();
+    Map conf = mock(Map.class);
+    TopologyContext context = mock(TopologyContext.class);
+    OutputCollector collector = mock(OutputCollector.class);
+    bolt.prepare(conf, context, collector);
+
+    // when
+    bolt.execute(tickTuple);
+
+    // then
+    verifyZeroInteractions(collector);
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Test
+  public void shouldEmitSomethingIfAtLeastOneObjectWasCountedAndTickTupleIsReceived() {
+    // given
+    Tuple normalTuple = mockNormalTuple(new Object());
+    Tuple tickTuple = MockTupleHelpers.mockTickTuple();
+
+    RollingCountBolt bolt = new RollingCountBolt();
+    Map conf = mock(Map.class);
+    TopologyContext context = mock(TopologyContext.class);
+    OutputCollector collector = mock(OutputCollector.class);
+    bolt.prepare(conf, context, collector);
+
+    // when
+    bolt.execute(normalTuple);
+    bolt.execute(tickTuple);
+
+    // then
+    verify(collector).emit(any(Values.class));
+  }
+
+  @Test
+  public void shouldDeclareOutputFields() {
+    // given
+    OutputFieldsDeclarer declarer = mock(OutputFieldsDeclarer.class);
+    RollingCountBolt bolt = new RollingCountBolt();
+
+    // when
+    bolt.declareOutputFields(declarer);
+
+    // then
+    verify(declarer, times(1)).declare(any(Fields.class));
+
+  }
+
+  @Test
+  public void shouldSetTickTupleFrequencyInComponentConfigurationToNonZeroValue() {
+    // given
+    RollingCountBolt bolt = new RollingCountBolt();
+
+    // when
+    Map<String, Object> componentConfig = bolt.getComponentConfiguration();
+
+    // then
+    assertThat(componentConfig).containsKey(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
+    Integer emitFrequencyInSeconds = (Integer) componentConfig.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
+    assertThat(emitFrequencyInSeconds).isGreaterThan(0);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/test/jvm/storm/starter/bolt/TotalRankingsBoltTest.java
----------------------------------------------------------------------
diff --git a/test/jvm/storm/starter/bolt/TotalRankingsBoltTest.java b/test/jvm/storm/starter/bolt/TotalRankingsBoltTest.java
index 69f5319..a8bed45 100644
--- a/test/jvm/storm/starter/bolt/TotalRankingsBoltTest.java
+++ b/test/jvm/storm/starter/bolt/TotalRankingsBoltTest.java
@@ -1,135 +1,130 @@
 package storm.starter.bolt;
 
-import static org.fest.assertions.api.Assertions.assertThat;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
-import static org.mockito.Mockito.when;
-
-import java.util.Map;
-
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-import storm.starter.tools.MockTupleHelpers;
-import storm.starter.tools.Rankings;
 import backtype.storm.Config;
 import backtype.storm.topology.BasicOutputCollector;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+import storm.starter.tools.MockTupleHelpers;
+import storm.starter.tools.Rankings;
+
+import java.util.Map;
+
+import static org.fest.assertions.api.Assertions.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
 
 public class TotalRankingsBoltTest {
 
-    private static final String ANY_NON_SYSTEM_COMPONENT_ID = "irrelevant_component_id";
-    private static final String ANY_NON_SYSTEM_STREAM_ID = "irrelevant_stream_id";
-    private static final Object ANY_OBJECT = new Object();
-    private static final int ANY_TOPN = 10;
-    private static final long ANY_COUNT = 42;
-
-    private Tuple mockRankingsTuple(Object obj, long count) {
-        Tuple tuple = MockTupleHelpers.mockTuple(ANY_NON_SYSTEM_COMPONENT_ID, ANY_NON_SYSTEM_STREAM_ID);
-        Rankings rankings = mock(Rankings.class);
-        when(tuple.getValue(0)).thenReturn(rankings);
-        return tuple;
-    }
-
-    @DataProvider
-    public Object[][] illegalTopN() {
-        return new Object[][] { { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalTopN")
-    public void negativeOrZeroTopNShouldThrowIAE(int topN) {
-        new TotalRankingsBolt(topN);
-    }
-
-    @DataProvider
-    public Object[][] illegalEmitFrequency() {
-        return new Object[][] { { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalEmitFrequency")
-    public void negativeOrZeroEmitFrequencyShouldThrowIAE(int emitFrequencyInSeconds) {
-        new TotalRankingsBolt(ANY_TOPN, emitFrequencyInSeconds);
-    }
-
-    @DataProvider
-    public Object[][] legalTopN() {
-        return new Object[][] { { 1 }, { 2 }, { 3 }, { 20 } };
-    }
-
-    @Test(dataProvider = "legalTopN")
-    public void positiveTopNShouldBeOk(int topN) {
-        new TotalRankingsBolt(topN);
-    }
-
-    @DataProvider
-    public Object[][] legalEmitFrequency() {
-        return new Object[][] { { 1 }, { 2 }, { 3 }, { 20 } };
-    }
-
-    @Test(dataProvider = "legalEmitFrequency")
-    public void positiveEmitFrequencyShouldBeOk(int emitFrequencyInSeconds) {
-        new TotalRankingsBolt(ANY_TOPN, emitFrequencyInSeconds);
-    }
-
-    @Test
-    public void shouldEmitSomethingIfTickTupleIsReceived() {
-        // given
-        Tuple tickTuple = MockTupleHelpers.mockTickTuple();
-        BasicOutputCollector collector = mock(BasicOutputCollector.class);
-        TotalRankingsBolt bolt = new TotalRankingsBolt();
-
-        // when
-        bolt.execute(tickTuple, collector);
-
-        // then
-        // verifyZeroInteractions(collector);
-        verify(collector).emit(any(Values.class));
-    }
-
-    @Test
-    public void shouldEmitNothingIfNormalTupleIsReceived() {
-        // given
-        Tuple normalTuple = mockRankingsTuple(ANY_OBJECT, ANY_COUNT);
-        BasicOutputCollector collector = mock(BasicOutputCollector.class);
-        TotalRankingsBolt bolt = new TotalRankingsBolt();
-
-        // when
-        bolt.execute(normalTuple, collector);
-
-        // then
-        verifyZeroInteractions(collector);
-    }
-
-    @Test
-    public void shouldDeclareOutputFields() {
-        // given
-        OutputFieldsDeclarer declarer = mock(OutputFieldsDeclarer.class);
-        TotalRankingsBolt bolt = new TotalRankingsBolt();
-
-        // when
-        bolt.declareOutputFields(declarer);
-
-        // then
-        verify(declarer, times(1)).declare(any(Fields.class));
-    }
-
-    @Test
-    public void shouldSetTickTupleFrequencyInComponentConfigurationToNonZeroValue() {
-        // given
-        TotalRankingsBolt bolt = new TotalRankingsBolt();
-
-        // when
-        Map<String, Object> componentConfig = bolt.getComponentConfiguration();
-
-        // then
-        assertThat(componentConfig).containsKey(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
-        Integer emitFrequencyInSeconds = (Integer) componentConfig.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
-        assertThat(emitFrequencyInSeconds).isGreaterThan(0);
-    }
+  private static final String ANY_NON_SYSTEM_COMPONENT_ID = "irrelevant_component_id";
+  private static final String ANY_NON_SYSTEM_STREAM_ID = "irrelevant_stream_id";
+  private static final Object ANY_OBJECT = new Object();
+  private static final int ANY_TOPN = 10;
+  private static final long ANY_COUNT = 42;
+
+  private Tuple mockRankingsTuple(Object obj, long count) {
+    Tuple tuple = MockTupleHelpers.mockTuple(ANY_NON_SYSTEM_COMPONENT_ID, ANY_NON_SYSTEM_STREAM_ID);
+    Rankings rankings = mock(Rankings.class);
+    when(tuple.getValue(0)).thenReturn(rankings);
+    return tuple;
+  }
+
+  @DataProvider
+  public Object[][] illegalTopN() {
+    return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalTopN")
+  public void negativeOrZeroTopNShouldThrowIAE(int topN) {
+    new TotalRankingsBolt(topN);
+  }
+
+  @DataProvider
+  public Object[][] illegalEmitFrequency() {
+    return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalEmitFrequency")
+  public void negativeOrZeroEmitFrequencyShouldThrowIAE(int emitFrequencyInSeconds) {
+    new TotalRankingsBolt(ANY_TOPN, emitFrequencyInSeconds);
+  }
+
+  @DataProvider
+  public Object[][] legalTopN() {
+    return new Object[][]{ { 1 }, { 2 }, { 3 }, { 20 } };
+  }
+
+  @Test(dataProvider = "legalTopN")
+  public void positiveTopNShouldBeOk(int topN) {
+    new TotalRankingsBolt(topN);
+  }
+
+  @DataProvider
+  public Object[][] legalEmitFrequency() {
+    return new Object[][]{ { 1 }, { 2 }, { 3 }, { 20 } };
+  }
+
+  @Test(dataProvider = "legalEmitFrequency")
+  public void positiveEmitFrequencyShouldBeOk(int emitFrequencyInSeconds) {
+    new TotalRankingsBolt(ANY_TOPN, emitFrequencyInSeconds);
+  }
+
+  @Test
+  public void shouldEmitSomethingIfTickTupleIsReceived() {
+    // given
+    Tuple tickTuple = MockTupleHelpers.mockTickTuple();
+    BasicOutputCollector collector = mock(BasicOutputCollector.class);
+    TotalRankingsBolt bolt = new TotalRankingsBolt();
+
+    // when
+    bolt.execute(tickTuple, collector);
+
+    // then
+    // verifyZeroInteractions(collector);
+    verify(collector).emit(any(Values.class));
+  }
+
+  @Test
+  public void shouldEmitNothingIfNormalTupleIsReceived() {
+    // given
+    Tuple normalTuple = mockRankingsTuple(ANY_OBJECT, ANY_COUNT);
+    BasicOutputCollector collector = mock(BasicOutputCollector.class);
+    TotalRankingsBolt bolt = new TotalRankingsBolt();
+
+    // when
+    bolt.execute(normalTuple, collector);
+
+    // then
+    verifyZeroInteractions(collector);
+  }
+
+  @Test
+  public void shouldDeclareOutputFields() {
+    // given
+    OutputFieldsDeclarer declarer = mock(OutputFieldsDeclarer.class);
+    TotalRankingsBolt bolt = new TotalRankingsBolt();
+
+    // when
+    bolt.declareOutputFields(declarer);
+
+    // then
+    verify(declarer, times(1)).declare(any(Fields.class));
+  }
+
+  @Test
+  public void shouldSetTickTupleFrequencyInComponentConfigurationToNonZeroValue() {
+    // given
+    TotalRankingsBolt bolt = new TotalRankingsBolt();
+
+    // when
+    Map<String, Object> componentConfig = bolt.getComponentConfiguration();
+
+    // then
+    assertThat(componentConfig).containsKey(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
+    Integer emitFrequencyInSeconds = (Integer) componentConfig.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
+    assertThat(emitFrequencyInSeconds).isGreaterThan(0);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/test/jvm/storm/starter/tools/MockTupleHelpers.java
----------------------------------------------------------------------
diff --git a/test/jvm/storm/starter/tools/MockTupleHelpers.java b/test/jvm/storm/starter/tools/MockTupleHelpers.java
index d0a9e41..fd7d921 100644
--- a/test/jvm/storm/starter/tools/MockTupleHelpers.java
+++ b/test/jvm/storm/starter/tools/MockTupleHelpers.java
@@ -1,23 +1,23 @@
 package storm.starter.tools;
 
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 import backtype.storm.Constants;
 import backtype.storm.tuple.Tuple;
 
+import static org.mockito.Mockito.*;
+
 public final class MockTupleHelpers {
 
-    private MockTupleHelpers() {
-    }
+  private MockTupleHelpers() {
+  }
 
-    public static Tuple mockTickTuple() {
-        return mockTuple(Constants.SYSTEM_COMPONENT_ID, Constants.SYSTEM_TICK_STREAM_ID);
-    }
+  public static Tuple mockTickTuple() {
+    return mockTuple(Constants.SYSTEM_COMPONENT_ID, Constants.SYSTEM_TICK_STREAM_ID);
+  }
 
-    public static Tuple mockTuple(String componentId, String streamId) {
-        Tuple tuple = mock(Tuple.class);
-        when(tuple.getSourceComponent()).thenReturn(componentId);
-        when(tuple.getSourceStreamId()).thenReturn(streamId);
-        return tuple;
-    }
+  public static Tuple mockTuple(String componentId, String streamId) {
+    Tuple tuple = mock(Tuple.class);
+    when(tuple.getSourceComponent()).thenReturn(componentId);
+    when(tuple.getSourceStreamId()).thenReturn(streamId);
+    return tuple;
+  }
 }