Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:05:07 UTC

[29/51] [partial] storm git commit: Update JStorm to latest release 2.1.0

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/TestEventLogSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/TestEventLogSpout.java b/jstorm-core/src/main/java/backtype/storm/testing/TestEventLogSpout.java
index 1570aeb..aaf6875 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/TestEventLogSpout.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/TestEventLogSpout.java
@@ -36,103 +36,103 @@ import backtype.storm.tuple.Values;
 
 public class TestEventLogSpout extends BaseRichSpout {
     public static Logger LOG = LoggerFactory.getLogger(TestEventLogSpout.class);
-    
+
     private static final Map<String, Integer> acked = new HashMap<String, Integer>();
     private static final Map<String, Integer> failed = new HashMap<String, Integer>();
-    
+
     private String uid;
     private long totalCount;
-    
+
     SpoutOutputCollector _collector;
     private long eventId = 0;
     private long myCount;
     private int source;
-    
+
     public static int getNumAcked(String stormId) {
-        synchronized(acked) {
+        synchronized (acked) {
             return get(acked, stormId, 0);
         }
     }
 
     public static int getNumFailed(String stormId) {
-        synchronized(failed) {
+        synchronized (failed) {
             return get(failed, stormId, 0);
         }
     }
-    
+
     public TestEventLogSpout(long totalCount) {
         this.uid = UUID.randomUUID().toString();
-        
-        synchronized(acked) {
+
+        synchronized (acked) {
             acked.put(uid, 0);
         }
-        synchronized(failed) {
+        synchronized (failed) {
             failed.put(uid, 0);
         }
-        
+
         this.totalCount = totalCount;
     }
-        
+
     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
         _collector = collector;
         this.source = context.getThisTaskId();
         long taskCount = context.getComponentTasks(context.getThisComponentId()).size();
         myCount = totalCount / taskCount;
     }
-    
+
     public void close() {
-        
+
     }
-    
+
     public void cleanup() {
-        synchronized(acked) {            
+        synchronized (acked) {
             acked.remove(uid);
-        } 
-        synchronized(failed) {            
+        }
+        synchronized (failed) {
             failed.remove(uid);
         }
     }
-    
+
     public boolean completed() {
-        
+
         int ackedAmt;
         int failedAmt;
-        
-        synchronized(acked) {
+
+        synchronized (acked) {
             ackedAmt = acked.get(uid);
         }
-        synchronized(failed) {
+        synchronized (failed) {
             failedAmt = failed.get(uid);
         }
         int totalEmitted = ackedAmt + failedAmt;
-        
+
         if (totalEmitted >= totalCount) {
             return true;
         }
         return false;
     }
-        
+
     public void nextTuple() {
-        if (eventId < myCount) { 
+        if (eventId < myCount) {
             eventId++;
             _collector.emit(new Values(source, eventId), eventId);
-        }        
+        }
     }
-    
+
     public void ack(Object msgId) {
-        synchronized(acked) {
+        synchronized (acked) {
             int curr = get(acked, uid, 0);
-            acked.put(uid, curr+1);
+            acked.put(uid, curr + 1);
         }
     }
 
     public void fail(Object msgId) {
-        synchronized(failed) {
+        synchronized (failed) {
             int curr = get(failed, uid, 0);
-            failed.put(uid, curr+1);
+            failed.put(uid, curr + 1);
         }
     }
-    
+
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
         declarer.declare(new Fields("source", "eventId"));
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/TestEventOrderCheckBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/TestEventOrderCheckBolt.java b/jstorm-core/src/main/java/backtype/storm/testing/TestEventOrderCheckBolt.java
index 1f80362..8286d0b 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/TestEventOrderCheckBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/TestEventOrderCheckBolt.java
@@ -36,7 +36,7 @@ import backtype.storm.tuple.Values;
 
 public class TestEventOrderCheckBolt extends BaseRichBolt {
     public static Logger LOG = LoggerFactory.getLogger(TestEventOrderCheckBolt.class);
-    
+
     private int _count;
     OutputCollector _collector;
     Map<Integer, Long> recentEventId = new HashMap<Integer, Long>();
@@ -52,8 +52,9 @@ public class TestEventOrderCheckBolt extends BaseRichBolt {
         Long recentEvent = recentEventId.get(sourceId);
 
         if (null != recentEvent && eventId <= recentEvent) {
-            String error = "Error: event id is not in strict order! event source Id: "
-                    + sourceId + ", last event Id: " + recentEvent + ", current event Id: " + eventId;
+            String error =
+                    "Error: event id is not in strict order! event source Id: " + sourceId + ", last event Id: " + recentEvent + ", current event Id: "
+                            + eventId;
 
             _collector.emit(input, new Values(error));
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/TestGlobalCount.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/TestGlobalCount.java b/jstorm-core/src/main/java/backtype/storm/testing/TestGlobalCount.java
index 5ef464a..45f48e4 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/TestGlobalCount.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/TestGlobalCount.java
@@ -28,7 +28,6 @@ import backtype.storm.tuple.Values;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public class TestGlobalCount extends BaseRichBolt {
     public static Logger LOG = LoggerFactory.getLogger(TestWordCounter.class);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/TestJob.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/TestJob.java b/jstorm-core/src/main/java/backtype/storm/testing/TestJob.java
index d41c36a..099a8db 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/TestJob.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/TestJob.java
@@ -20,10 +20,7 @@ package backtype.storm.testing;
 import backtype.storm.ILocalCluster;
 
 /**
- * This is the core interface for the storm java testing, usually
- * we put our java unit testing logic in the run method. A sample
- * code will be:
- * <code>
+ * This is the core interface for the storm java testing, usually we put our java unit testing logic in the run method. A sample code will be: <code>
  * Testing.withSimulatedTimeLocalCluster(new TestJob() {
  *     public void run(Cluster cluster) {
  *         // your testing logic here.
@@ -31,11 +28,10 @@ import backtype.storm.ILocalCluster;
  * });
  */
 public interface TestJob {
-	/**
-	 * run the testing logic with the cluster.
-	 * 
-	 * @param cluster the cluster which created by <code>Testing.withSimulatedTimeLocalCluster</code>
-	 *        and <code>Testing.withTrackedCluster</code>.
-	 */
+    /**
+     * run the testing logic with the cluster.
+     * 
+     * @param cluster the cluster which created by <code>Testing.withSimulatedTimeLocalCluster</code> and <code>Testing.withTrackedCluster</code>.
+     */
     public void run(ILocalCluster cluster) throws Exception;
 }
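
A minimal usage sketch for the TestJob interface above (illustrative only, not part of this commit). It assumes the backtype.storm.Testing helper that the javadoc references; note that the interface's run method takes an ILocalCluster, even though the javadoc sample abbreviates the parameter type.

    import backtype.storm.ILocalCluster;
    import backtype.storm.Testing;
    import backtype.storm.testing.TestJob;

    public class TestJobSketch {
        public static void main(String[] args) throws Exception {
            // The helper spins up a simulated-time local cluster, runs the job,
            // and tears the cluster down afterwards.
            Testing.withSimulatedTimeLocalCluster(new TestJob() {
                public void run(ILocalCluster cluster) throws Exception {
                    // unit-testing logic goes here, e.g. submit a topology via
                    // cluster.submitTopology(...) and assert on its output
                }
            });
        }
    }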

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/TestPlannerBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/TestPlannerBolt.java b/jstorm-core/src/main/java/backtype/storm/testing/TestPlannerBolt.java
index 0d30b26..769f1cf 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/TestPlannerBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/TestPlannerBolt.java
@@ -25,16 +25,15 @@ import java.util.Map;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.base.BaseRichBolt;
 
-
 public class TestPlannerBolt extends BaseRichBolt {
     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
 
     }
-    
+
     public void execute(Tuple input) {
 
     }
-        
+
     public Fields getOutputFields() {
         return new Fields("field1", "field2");
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/TestPlannerSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/TestPlannerSpout.java b/jstorm-core/src/main/java/backtype/storm/testing/TestPlannerSpout.java
index f4c27c0..bcacd4d 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/TestPlannerSpout.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/TestPlannerSpout.java
@@ -27,11 +27,10 @@ import backtype.storm.tuple.Fields;
 import backtype.storm.utils.Utils;
 import java.util.HashMap;
 
-
 public class TestPlannerSpout extends BaseRichSpout {
     boolean _isDistributed;
     Fields _outFields;
-    
+
     public TestPlannerSpout(Fields outFields, boolean isDistributed) {
         _isDistributed = isDistributed;
         _outFields = outFields;
@@ -40,34 +39,33 @@ public class TestPlannerSpout extends BaseRichSpout {
     public TestPlannerSpout(boolean isDistributed) {
         this(new Fields("field1", "field2"), isDistributed);
     }
-        
+
     public TestPlannerSpout(Fields outFields) {
         this(outFields, true);
     }
-    
+
     public Fields getOutputFields() {
         return _outFields;
     }
 
-    
     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-        
+
     }
-    
+
     public void close() {
-        
+
     }
-    
+
     public void nextTuple() {
         Utils.sleep(100);
     }
-    
-    public void ack(Object msgId){
-        
+
+    public void ack(Object msgId) {
+
     }
 
-    public void fail(Object msgId){
-        
+    public void fail(Object msgId) {
+
     }
 
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
@@ -77,9 +75,9 @@ public class TestPlannerSpout extends BaseRichSpout {
     @Override
     public Map<String, Object> getComponentConfiguration() {
         Map<String, Object> ret = new HashMap<String, Object>();
-        if(!_isDistributed) {
+        if (!_isDistributed) {
             ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
         }
         return ret;
-    }       
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/TestSerObject.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/TestSerObject.java b/jstorm-core/src/main/java/backtype/storm/testing/TestSerObject.java
index 1c7706f..5416ef0 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/TestSerObject.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/TestSerObject.java
@@ -22,12 +22,12 @@ import java.io.Serializable;
 public class TestSerObject implements Serializable {
     public int f1;
     public int f2;
-    
+
     public TestSerObject(int f1, int f2) {
         this.f1 = f1;
         this.f2 = f2;
     }
-    
+
     @Override
     public int hashCode() {
         final int prime = 31;
@@ -36,7 +36,7 @@ public class TestSerObject implements Serializable {
         result = prime * result + f2;
         return result;
     }
-    
+
     @Override
     public boolean equals(Object obj) {
         if (this == obj)
@@ -52,5 +52,5 @@ public class TestSerObject implements Serializable {
             return false;
         return true;
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/TestWordCounter.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/TestWordCounter.java b/jstorm-core/src/main/java/backtype/storm/testing/TestWordCounter.java
index 551b054..c6b32b5 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/TestWordCounter.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/TestWordCounter.java
@@ -29,29 +29,28 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import static backtype.storm.utils.Utils.tuple;
 
-
 public class TestWordCounter extends BaseBasicBolt {
     public static Logger LOG = LoggerFactory.getLogger(TestWordCounter.class);
 
     Map<String, Integer> _counts;
-    
+
     public void prepare(Map stormConf, TopologyContext context) {
         _counts = new HashMap<String, Integer>();
     }
-    
+
     public void execute(Tuple input, BasicOutputCollector collector) {
         String word = (String) input.getValues().get(0);
         int count = 0;
-        if(_counts.containsKey(word)) {
+        if (_counts.containsKey(word)) {
             count = _counts.get(word);
         }
         count++;
         _counts.put(word, count);
         collector.emit(tuple(word, count));
     }
-    
+
     public void cleanup() {
-        
+
     }
 
     public void declareOutputFields(OutputFieldsDeclarer declarer) {

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/TestWordSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/TestWordSpout.java b/jstorm-core/src/main/java/backtype/storm/testing/TestWordSpout.java
index 745bf71..d5603a1 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/TestWordSpout.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/TestWordSpout.java
@@ -31,7 +31,6 @@ import java.util.Random;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public class TestWordSpout extends BaseRichSpout {
     public static Logger LOG = LoggerFactory.getLogger(TestWordSpout.class);
     boolean _isDistributed;
@@ -44,43 +43,43 @@ public class TestWordSpout extends BaseRichSpout {
     public TestWordSpout(boolean isDistributed) {
         _isDistributed = isDistributed;
     }
-        
+
     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
         _collector = collector;
     }
-    
+
     public void close() {
-        
+
     }
-        
+
     public void nextTuple() {
         Utils.sleep(100);
-        final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
+        final String[] words = new String[] { "nathan", "mike", "jackson", "golda", "bertels" };
         final Random rand = new Random();
         final String word = words[rand.nextInt(words.length)];
         _collector.emit(new Values(word));
     }
-    
+
     public void ack(Object msgId) {
 
     }
 
     public void fail(Object msgId) {
-        
+
     }
-    
+
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
         declarer.declare(new Fields("word"));
     }
 
     @Override
     public Map<String, Object> getComponentConfiguration() {
-        if(!_isDistributed) {
+        if (!_isDistributed) {
             Map<String, Object> ret = new HashMap<String, Object>();
             ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
             return ret;
         } else {
             return null;
         }
-    }    
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/TrackedTopology.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/TrackedTopology.java b/jstorm-core/src/main/java/backtype/storm/testing/TrackedTopology.java
index f2691b7..60506b5 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/TrackedTopology.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/TrackedTopology.java
@@ -23,12 +23,12 @@ import java.util.Map;
 import backtype.storm.generated.StormTopology;
 import clojure.lang.Keyword;
 
-public class TrackedTopology extends HashMap{
-	public TrackedTopology(Map map) {
-		super(map);
-	}
-	
-	public StormTopology getTopology() {
-		return (StormTopology)get(Keyword.intern("topology"));
-	}
+public class TrackedTopology extends HashMap {
+    public TrackedTopology(Map map) {
+        super(map);
+    }
+
+    public StormTopology getTopology() {
+        return (StormTopology) get(Keyword.intern("topology"));
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/TupleCaptureBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/TupleCaptureBolt.java b/jstorm-core/src/main/java/backtype/storm/testing/TupleCaptureBolt.java
index e163576..9635887 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/TupleCaptureBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/TupleCaptureBolt.java
@@ -28,7 +28,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
-
 public class TupleCaptureBolt implements IRichBolt {
     public static transient Map<String, Map<String, List<FixedTuple>>> emitted_tuples = new HashMap<String, Map<String, List<FixedTuple>>>();
 
@@ -47,8 +46,8 @@ public class TupleCaptureBolt implements IRichBolt {
     public void execute(Tuple input) {
         String component = input.getSourceComponent();
         Map<String, List<FixedTuple>> captured = emitted_tuples.get(_name);
-        if(!captured.containsKey(component)) {
-           captured.put(component, new ArrayList<FixedTuple>());
+        if (!captured.containsKey(component)) {
+            captured.put(component, new ArrayList<FixedTuple>());
         }
         captured.get(component).add(new FixedTuple(input.getSourceStreamId(), input.getValues()));
         _collector.ack(input);
@@ -60,7 +59,7 @@ public class TupleCaptureBolt implements IRichBolt {
 
     public void cleanup() {
     }
-    
+
     public Map<String, List<FixedTuple>> getAndRemoveResults() {
         return emitted_tuples.remove(_name);
     }
@@ -70,7 +69,7 @@ public class TupleCaptureBolt implements IRichBolt {
         emitted_tuples.get(_name).clear();
         return ret;
     }
-    
+
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/BaseConfigurationDeclarer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/BaseConfigurationDeclarer.java b/jstorm-core/src/main/java/backtype/storm/topology/BaseConfigurationDeclarer.java
index 0c67324..a6614fc 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/BaseConfigurationDeclarer.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/BaseConfigurationDeclarer.java
@@ -36,19 +36,22 @@ public abstract class BaseConfigurationDeclarer<T extends ComponentConfiguration
 
     @Override
     public T setMaxTaskParallelism(Number val) {
-        if(val!=null) val = val.intValue();
+        if (val != null)
+            val = val.intValue();
         return addConfiguration(Config.TOPOLOGY_MAX_TASK_PARALLELISM, val);
     }
 
     @Override
     public T setMaxSpoutPending(Number val) {
-        if(val!=null) val = val.intValue();
+        if (val != null)
+            val = val.intValue();
         return addConfiguration(Config.TOPOLOGY_MAX_SPOUT_PENDING, val);
     }
-    
+
     @Override
     public T setNumTasks(Number val) {
-        if(val!=null) val = val.intValue();
+        if (val != null)
+            val = val.intValue();
         return addConfiguration(Config.TOPOLOGY_TASKS, val);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/BasicBoltExecutor.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/BasicBoltExecutor.java b/jstorm-core/src/main/java/backtype/storm/topology/BasicBoltExecutor.java
index 6c9cdc1..ea437c5 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/BasicBoltExecutor.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/BasicBoltExecutor.java
@@ -25,11 +25,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class BasicBoltExecutor implements IRichBolt {
-    public static Logger LOG = LoggerFactory.getLogger(BasicBoltExecutor.class);    
-    
+    public static Logger LOG = LoggerFactory.getLogger(BasicBoltExecutor.class);
+
     private IBasicBolt _bolt;
     private transient BasicOutputCollector _collector;
-    
+
     public BasicBoltExecutor(IBasicBolt bolt) {
         _bolt = bolt;
     }
@@ -38,7 +38,6 @@ public class BasicBoltExecutor implements IRichBolt {
         _bolt.declareOutputFields(declarer);
     }
 
-    
     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
         _bolt.prepare(stormConf, context);
         _collector = new BasicOutputCollector(collector);
@@ -49,8 +48,8 @@ public class BasicBoltExecutor implements IRichBolt {
         try {
             _bolt.execute(input, _collector);
             _collector.getOutputter().ack(input);
-        } catch(FailedException e) {
-            if(e instanceof ReportedFailedException) {
+        } catch (FailedException e) {
+            if (e instanceof ReportedFailedException) {
                 _collector.reportError(e);
             }
             _collector.getOutputter().fail(input);

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/BasicOutputCollector.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/BasicOutputCollector.java b/jstorm-core/src/main/java/backtype/storm/topology/BasicOutputCollector.java
index be1c242..e48f159 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/BasicOutputCollector.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/BasicOutputCollector.java
@@ -23,7 +23,6 @@ import backtype.storm.tuple.Tuple;
 import backtype.storm.utils.Utils;
 import java.util.List;
 
-
 public class BasicOutputCollector implements IBasicOutputCollector {
     private OutputCollector out;
     private Tuple inputTuple;

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/BoltDeclarer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/BoltDeclarer.java b/jstorm-core/src/main/java/backtype/storm/topology/BoltDeclarer.java
index 0c4b200..8fe05e2 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/BoltDeclarer.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/BoltDeclarer.java
@@ -19,8 +19,9 @@ package backtype.storm.topology;
 
 /**
  * BoltDeclarer includes grouping APIs for storm topology.
+ * 
  * @see <a href="https://storm.apache.org/documentation/Concepts.html">Concepts -Stream groupings-</a>
  */
 public interface BoltDeclarer extends InputDeclarer<BoltDeclarer>, ComponentConfigurationDeclarer<BoltDeclarer> {
-    
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/ComponentConfigurationDeclarer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/ComponentConfigurationDeclarer.java b/jstorm-core/src/main/java/backtype/storm/topology/ComponentConfigurationDeclarer.java
index d05dda0..49d78e5 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/ComponentConfigurationDeclarer.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/ComponentConfigurationDeclarer.java
@@ -21,9 +21,14 @@ import java.util.Map;
 
 public interface ComponentConfigurationDeclarer<T extends ComponentConfigurationDeclarer> {
     T addConfigurations(Map conf);
+
     T addConfiguration(String config, Object value);
+
     T setDebug(boolean debug);
+
     T setMaxTaskParallelism(Number val);
+
     T setMaxSpoutPending(Number val);
+
     T setNumTasks(Number val);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/FailedException.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/FailedException.java b/jstorm-core/src/main/java/backtype/storm/topology/FailedException.java
index e174b5a..6c26bbf 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/FailedException.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/FailedException.java
@@ -21,11 +21,11 @@ public class FailedException extends RuntimeException {
     public FailedException() {
         super();
     }
-    
+
     public FailedException(String msg) {
         super(msg);
     }
-    
+
     public FailedException(String msg, Throwable cause) {
         super(msg, cause);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/IBasicBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/IBasicBolt.java b/jstorm-core/src/main/java/backtype/storm/topology/IBasicBolt.java
index 3b24f4e..81741df 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/IBasicBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/IBasicBolt.java
@@ -23,11 +23,13 @@ import java.util.Map;
 
 public interface IBasicBolt extends IComponent {
     void prepare(Map stormConf, TopologyContext context);
+
     /**
      * Process the input tuple and optionally emit new tuples based on the input tuple.
      * 
      * All acking is managed for you. Throw a FailedException if you want to fail the tuple.
      */
     void execute(Tuple input, BasicOutputCollector collector);
+
     void cleanup();
 }
\ No newline at end of file
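
Illustrative only (not in the diff): a minimal basic bolt following the IBasicBolt contract above. BasicBoltExecutor, reformatted earlier in this commit, acks the input tuple automatically; throwing FailedException is how the bolt fails it. The class and field names here are invented for the example.

    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.FailedException;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    public class UpperCaseBolt extends BaseBasicBolt {
        public void execute(Tuple input, BasicOutputCollector collector) {
            String word = input.getString(0);
            if (word == null) {
                // acking is managed by the framework; a FailedException fails the tuple
                throw new FailedException("missing word");
            }
            collector.emit(new Values(word.toUpperCase()));
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("upper"));
        }
    }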

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/IBasicOutputCollector.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/IBasicOutputCollector.java b/jstorm-core/src/main/java/backtype/storm/topology/IBasicOutputCollector.java
index 92d60d2..85008c2 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/IBasicOutputCollector.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/IBasicOutputCollector.java
@@ -21,6 +21,8 @@ import java.util.List;
 
 public interface IBasicOutputCollector {
     List<Integer> emit(String streamId, List<Object> tuple);
+
     void emitDirect(int taskId, String streamId, List<Object> tuple);
+
     void reportError(Throwable t);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/IComponent.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/IComponent.java b/jstorm-core/src/main/java/backtype/storm/topology/IComponent.java
index 560c96f..1d0865d 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/IComponent.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/IComponent.java
@@ -21,23 +21,21 @@ import java.io.Serializable;
 import java.util.Map;
 
 /**
- * Common methods for all possible components in a topology. This interface is used
- * when defining topologies using the Java API. 
+ * Common methods for all possible components in a topology. This interface is used when defining topologies using the Java API.
  */
 public interface IComponent extends Serializable {
 
     /**
      * Declare the output schema for all the streams of this topology.
-     *
+     * 
      * @param declarer this is used to declare output stream ids, output fields, and whether or not each output stream is a direct stream
      */
     void declareOutputFields(OutputFieldsDeclarer declarer);
 
     /**
-     * Declare configuration specific to this component. Only a subset of the "topology.*" configs can
-     * be overridden. The component configuration can be further overridden when constructing the 
-     * topology using {@link TopologyBuilder}
-     *
+     * Declare configuration specific to this component. Only a subset of the "topology.*" configs can be overridden. The component configuration can be further
+     * overridden when constructing the topology using {@link TopologyBuilder}
+     * 
      */
     Map<String, Object> getComponentConfiguration();
 

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/IConfig.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/IConfig.java b/jstorm-core/src/main/java/backtype/storm/topology/IConfig.java
deleted file mode 100644
index 3ce9da7..0000000
--- a/jstorm-core/src/main/java/backtype/storm/topology/IConfig.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.topology;
-
-import java.util.Map;
-
-/*
- * This interface is used to notify the update of user configuration
- * for bolt and spout 
- */
-public interface IConfig {
-    public void updateConf(Map conf);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/IDynamicComponent.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/IDynamicComponent.java b/jstorm-core/src/main/java/backtype/storm/topology/IDynamicComponent.java
new file mode 100644
index 0000000..573ca99
--- /dev/null
+++ b/jstorm-core/src/main/java/backtype/storm/topology/IDynamicComponent.java
@@ -0,0 +1,13 @@
+package backtype.storm.topology;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/*
+ * This interface is used to notify the update of user configuration
+ * for bolt and spout 
+ */
+
+public interface IDynamicComponent extends Serializable {
+    public void update(Map conf);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/IRichBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/IRichBolt.java b/jstorm-core/src/main/java/backtype/storm/topology/IRichBolt.java
index d35244e..d44619c 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/IRichBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/IRichBolt.java
@@ -20,9 +20,8 @@ package backtype.storm.topology;
 import backtype.storm.task.IBolt;
 
 /**
- * When writing topologies using Java, {@link IRichBolt} and {@link IRichSpout} are the main interfaces
- * to use to implement components of the topology.
- *
+ * When writing topologies using Java, {@link IRichBolt} and {@link IRichSpout} are the main interfaces to use to implement components of the topology.
+ * 
  */
 public interface IRichBolt extends IBolt, IComponent {
 

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/IRichSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/IRichSpout.java b/jstorm-core/src/main/java/backtype/storm/topology/IRichSpout.java
index b088641..e1bdc02 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/IRichSpout.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/IRichSpout.java
@@ -20,9 +20,8 @@ package backtype.storm.topology;
 import backtype.storm.spout.ISpout;
 
 /**
- * When writing topologies using Java, {@link IRichBolt} and {@link IRichSpout} are the main interfaces
- * to use to implement components of the topology.
- *
+ * When writing topologies using Java, {@link IRichBolt} and {@link IRichSpout} are the main interfaces to use to implement components of the topology.
+ * 
  */
 public interface IRichSpout extends ISpout, IComponent {
 

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/IRichStateSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/IRichStateSpout.java b/jstorm-core/src/main/java/backtype/storm/topology/IRichStateSpout.java
index edcc0ff..a22acd4 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/IRichStateSpout.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/IRichStateSpout.java
@@ -19,7 +19,6 @@ package backtype.storm.topology;
 
 import backtype.storm.state.IStateSpout;
 
-
 public interface IRichStateSpout extends IStateSpout, IComponent {
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/InputDeclarer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/InputDeclarer.java b/jstorm-core/src/main/java/backtype/storm/topology/InputDeclarer.java
index 33540de..54f2702 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/InputDeclarer.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/InputDeclarer.java
@@ -22,10 +22,10 @@ import backtype.storm.generated.Grouping;
 import backtype.storm.grouping.CustomStreamGrouping;
 import backtype.storm.tuple.Fields;
 
-
 public interface InputDeclarer<T extends InputDeclarer> {
     /**
      * The stream is partitioned by the fields specified in the grouping.
+     * 
      * @param componentId
      * @param fields
      * @return
@@ -34,6 +34,7 @@ public interface InputDeclarer<T extends InputDeclarer> {
 
     /**
      * The stream is partitioned by the fields specified in the grouping.
+     * 
      * @param componentId
      * @param streamId
      * @param fields
@@ -42,16 +43,16 @@ public interface InputDeclarer<T extends InputDeclarer> {
     public T fieldsGrouping(String componentId, String streamId, Fields fields);
 
     /**
-     * The entire stream goes to a single one of the bolt's tasks.
-     * Specifically, it goes to the task with the lowest id.
+     * The entire stream goes to a single one of the bolt's tasks. Specifically, it goes to the task with the lowest id.
+     * 
      * @param componentId
      * @return
      */
     public T globalGrouping(String componentId);
 
     /**
-     * The entire stream goes to a single one of the bolt's tasks.
-     * Specifically, it goes to the task with the lowest id.
+     * The entire stream goes to a single one of the bolt's tasks. Specifically, it goes to the task with the lowest id.
+     * 
      * @param componentId
      * @param streamId
      * @return
@@ -59,16 +60,16 @@ public interface InputDeclarer<T extends InputDeclarer> {
     public T globalGrouping(String componentId, String streamId);
 
     /**
-     * Tuples are randomly distributed across the bolt's tasks in a way such that
-     * each bolt is guaranteed to get an equal number of tuples.
+     * Tuples are randomly distributed across the bolt's tasks in a way such that each bolt is guaranteed to get an equal number of tuples.
+     * 
      * @param componentId
      * @return
      */
     public T shuffleGrouping(String componentId);
 
     /**
-     * Tuples are randomly distributed across the bolt's tasks in a way such that
-     * each bolt is guaranteed to get an equal number of tuples.
+     * Tuples are randomly distributed across the bolt's tasks in a way such that each bolt is guaranteed to get an equal number of tuples.
+     * 
      * @param componentId
      * @param streamId
      * @return
@@ -76,29 +77,31 @@ public interface InputDeclarer<T extends InputDeclarer> {
     public T shuffleGrouping(String componentId, String streamId);
 
     /**
-     * If the target bolt has one or more tasks in the same worker process,
-     * tuples will be shuffled to just those in-process tasks.
-     * Otherwise, this acts like a normal shuffle grouping.
+     * If the target bolt has one or more tasks in the same worker process, tuples will be shuffled to just those in-process tasks. Otherwise, this acts like a
+     * normal shuffle grouping.
+     * 
      * @param componentId
      * @return
      */
     public T localOrShuffleGrouping(String componentId);
 
     /**
-     * If the target bolt has one or more tasks in the same worker process,
-     * tuples will be shuffled to just those in-process tasks.
-     * Otherwise, this acts like a normal shuffle grouping.
+     * If the target bolt has one or more tasks in the same worker process, tuples will be shuffled to just those in-process tasks. Otherwise, this acts like a
+     * normal shuffle grouping.
+     * 
      * @param componentId
      * @param streamId
      * @return
      */
     public T localOrShuffleGrouping(String componentId, String streamId);
-    
+
     public T localFirstGrouping(String componentId);
-    
+
     public T localFirstGrouping(String componentId, String streamId);
+
     /**
      * This grouping specifies that you don't care how the stream is grouped.
+     * 
      * @param componentId
      * @return
      */
@@ -106,6 +109,7 @@ public interface InputDeclarer<T extends InputDeclarer> {
 
     /**
      * This grouping specifies that you don't care how the stream is grouped.
+     * 
      * @param componentId
      * @param streamId
      * @return
@@ -114,6 +118,7 @@ public interface InputDeclarer<T extends InputDeclarer> {
 
     /**
      * The stream is replicated across all the bolt's tasks. Use this grouping with care.
+     * 
      * @param componentId
      * @return
      */
@@ -121,6 +126,7 @@ public interface InputDeclarer<T extends InputDeclarer> {
 
     /**
      * The stream is replicated across all the bolt's tasks. Use this grouping with care.
+     * 
      * @param componentId
      * @param streamId
      * @return
@@ -128,16 +134,16 @@ public interface InputDeclarer<T extends InputDeclarer> {
     public T allGrouping(String componentId, String streamId);
 
     /**
-     * A stream grouped this way means that the producer of the tuple decides
-     * which task of the consumer will receive this tuple.
+     * A stream grouped this way means that the producer of the tuple decides which task of the consumer will receive this tuple.
+     * 
      * @param componentId
      * @return
      */
     public T directGrouping(String componentId);
 
     /**
-     * A stream grouped this way means that the producer of the tuple decides
-     * which task of the consumer will receive this tuple.
+     * A stream grouped this way means that the producer of the tuple decides which task of the consumer will receive this tuple.
+     * 
      * @param componentId
      * @param streamId
      * @return
@@ -145,9 +151,9 @@ public interface InputDeclarer<T extends InputDeclarer> {
     public T directGrouping(String componentId, String streamId);
 
     /**
-     * Tuples are passed to two hashing functions and each target task is
-     * decided based on the comparison of the state of candidate nodes.
-     * @see   https://melmeric.files.wordpress.com/2014/11/the-power-of-both-choices-practical-load-balancing-for-distributed-stream-processing-engines.pdf
+     * Tuples are passed to two hashing functions and each target task is decided based on the comparison of the state of candidate nodes.
+     * 
+     * @see https://melmeric.files.wordpress.com/2014/11/the-power-of-both-choices-practical-load-balancing-for-distributed-stream-processing-engines.pdf
      * @param componentId
      * @param fields
      * @return
@@ -155,9 +161,9 @@ public interface InputDeclarer<T extends InputDeclarer> {
     public T partialKeyGrouping(String componentId, Fields fields);
 
     /**
-     * Tuples are passed to two hashing functions and each target task is
-     * decided based on the comparison of the state of candidate nodes.
-     * @see   https://melmeric.files.wordpress.com/2014/11/the-power-of-both-choices-practical-load-balancing-for-distributed-stream-processing-engines.pdf
+     * Tuples are passed to two hashing functions and each target task is decided based on the comparison of the state of candidate nodes.
+     * 
+     * @see https://melmeric.files.wordpress.com/2014/11/the-power-of-both-choices-practical-load-balancing-for-distributed-stream-processing-engines.pdf
      * @param componentId
      * @param streamId
      * @param fields
@@ -167,6 +173,7 @@ public interface InputDeclarer<T extends InputDeclarer> {
 
     /**
      * A custom stream grouping by implementing the CustomStreamGrouping interface.
+     * 
      * @param componentId
      * @param grouping
      * @return
@@ -175,13 +182,14 @@ public interface InputDeclarer<T extends InputDeclarer> {
 
     /**
      * A custom stream grouping by implementing the CustomStreamGrouping interface.
+     * 
      * @param componentId
      * @param streamId
      * @param grouping
      * @return
      */
     public T customGrouping(String componentId, String streamId, CustomStreamGrouping grouping);
-    
+
     public T grouping(GlobalStreamId id, Grouping grouping);
-    
+
 }
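
For orientation (again illustrative, not part of the commit): the grouping methods documented above are called on the BoltDeclarer returned by TopologyBuilder.setBolt, since BoltDeclarer extends InputDeclarer. A compact sketch using test components touched elsewhere in this commit; the component ids are arbitrary.

    import backtype.storm.generated.StormTopology;
    import backtype.storm.testing.TestGlobalCount;
    import backtype.storm.testing.TestWordCounter;
    import backtype.storm.testing.TestWordSpout;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;

    public class GroupingSketch {
        public static StormTopology build() {
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("words", new TestWordSpout(true), 2);
            // fieldsGrouping: tuples with the same "word" value go to the same task
            builder.setBolt("count", new TestWordCounter(), 3)
                    .fieldsGrouping("words", new Fields("word"));
            // globalGrouping: the whole stream goes to this bolt's lowest-id task
            builder.setBolt("total", new TestGlobalCount())
                    .globalGrouping("count");
            return builder.createTopology();
        }
    }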

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/OutputFieldsDeclarer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/OutputFieldsDeclarer.java b/jstorm-core/src/main/java/backtype/storm/topology/OutputFieldsDeclarer.java
index 2ac4794..d5ca7ca 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/OutputFieldsDeclarer.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/OutputFieldsDeclarer.java
@@ -19,14 +19,15 @@ package backtype.storm.topology;
 
 import backtype.storm.tuple.Fields;
 
-
 public interface OutputFieldsDeclarer {
     /**
      * Uses default stream id.
      */
     public void declare(Fields fields);
+
     public void declare(boolean direct, Fields fields);
-    
+
     public void declareStream(String streamId, Fields fields);
+
     public void declareStream(String streamId, boolean direct, Fields fields);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/OutputFieldsGetter.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/OutputFieldsGetter.java b/jstorm-core/src/main/java/backtype/storm/topology/OutputFieldsGetter.java
index 0e7fd59..1fdcf86 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/OutputFieldsGetter.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/OutputFieldsGetter.java
@@ -39,13 +39,12 @@ public class OutputFieldsGetter implements OutputFieldsDeclarer {
     }
 
     public void declareStream(String streamId, boolean direct, Fields fields) {
-        if(_fields.containsKey(streamId)) {
+        if (_fields.containsKey(streamId)) {
             throw new IllegalArgumentException("Fields for " + streamId + " already set");
         }
         _fields.put(streamId, new StreamInfo(fields.toList(), direct));
     }
 
-
     public Map<String, StreamInfo> getFieldsDeclaration() {
         return _fields;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/ReportedFailedException.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/ReportedFailedException.java b/jstorm-core/src/main/java/backtype/storm/topology/ReportedFailedException.java
index 4e4ebe4..c90a545 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/ReportedFailedException.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/ReportedFailedException.java
@@ -21,11 +21,11 @@ public class ReportedFailedException extends FailedException {
     public ReportedFailedException() {
         super();
     }
-    
+
     public ReportedFailedException(String msg) {
         super(msg);
     }
-    
+
     public ReportedFailedException(String msg, Throwable cause) {
         super(msg, cause);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/SpoutDeclarer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/SpoutDeclarer.java b/jstorm-core/src/main/java/backtype/storm/topology/SpoutDeclarer.java
index c0d8254..9c5ec34 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/SpoutDeclarer.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/SpoutDeclarer.java
@@ -18,5 +18,5 @@
 package backtype.storm.topology;
 
 public interface SpoutDeclarer extends ComponentConfigurationDeclarer<SpoutDeclarer> {
-    
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/TopologyBuilder.java b/jstorm-core/src/main/java/backtype/storm/topology/TopologyBuilder.java
index c04e449..2b546e3 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/TopologyBuilder.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/TopologyBuilder.java
@@ -18,108 +18,90 @@
 package backtype.storm.topology;
 
 import backtype.storm.Config;
-import backtype.storm.generated.Bolt;
-import backtype.storm.generated.ComponentCommon;
-import backtype.storm.generated.ComponentObject;
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.generated.Grouping;
-import backtype.storm.generated.NullStruct;
-import backtype.storm.generated.SpoutSpec;
-import backtype.storm.generated.StateSpoutSpec;
-import backtype.storm.generated.StormTopology;
+import backtype.storm.generated.*;
 import backtype.storm.grouping.CustomStreamGrouping;
 import backtype.storm.grouping.PartialKeyGrouping;
 import backtype.storm.tuple.Fields;
 import backtype.storm.utils.Utils;
+import org.json.simple.JSONValue;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
-import org.json.simple.JSONValue;
 
 /**
- * TopologyBuilder exposes the Java API for specifying a topology for Storm
- * to execute. Topologies are Thrift structures in the end, but since the Thrift API
- * is so verbose, TopologyBuilder greatly eases the process of creating topologies.
- * The template for creating and submitting a topology looks something like:
- *
+ * TopologyBuilder exposes the Java API for specifying a topology for Storm to execute. Topologies are Thrift structures in the end, but since the Thrift API is
+ * so verbose, TopologyBuilder greatly eases the process of creating topologies. The template for creating and submitting a topology looks something like:
+ * 
  * <pre>
  * TopologyBuilder builder = new TopologyBuilder();
- *
- * builder.setSpout("1", new TestWordSpout(true), 5);
- * builder.setSpout("2", new TestWordSpout(true), 3);
- * builder.setBolt("3", new TestWordCounter(), 3)
- *          .fieldsGrouping("1", new Fields("word"))
- *          .fieldsGrouping("2", new Fields("word"));
- * builder.setBolt("4", new TestGlobalCount())
- *          .globalGrouping("1");
- *
+ * 
+ * builder.setSpout(&quot;1&quot;, new TestWordSpout(true), 5);
+ * builder.setSpout(&quot;2&quot;, new TestWordSpout(true), 3);
+ * builder.setBolt(&quot;3&quot;, new TestWordCounter(), 3).fieldsGrouping(&quot;1&quot;, new Fields(&quot;word&quot;)).fieldsGrouping(&quot;2&quot;, new Fields(&quot;word&quot;));
+ * builder.setBolt(&quot;4&quot;, new TestGlobalCount()).globalGrouping(&quot;1&quot;);
+ * 
  * Map conf = new HashMap();
  * conf.put(Config.TOPOLOGY_WORKERS, 4);
  * 
- * StormSubmitter.submitTopology("mytopology", conf, builder.createTopology());
+ * StormSubmitter.submitTopology(&quot;mytopology&quot;, conf, builder.createTopology());
  * </pre>
- *
- * Running the exact same topology in local mode (in process), and configuring it to log all tuples
- * emitted, looks like the following. Note that it lets the topology run for 10 seconds
- * before shutting down the local cluster.
- *
+ * 
+ * Running the exact same topology in local mode (in process), and configuring it to log all tuples emitted, looks like the following. Note that it lets the
+ * topology run for 10 seconds before shutting down the local cluster.
+ * 
  * <pre>
  * TopologyBuilder builder = new TopologyBuilder();
- *
- * builder.setSpout("1", new TestWordSpout(true), 5);
- * builder.setSpout("2", new TestWordSpout(true), 3);
- * builder.setBolt("3", new TestWordCounter(), 3)
- *          .fieldsGrouping("1", new Fields("word"))
- *          .fieldsGrouping("2", new Fields("word"));
- * builder.setBolt("4", new TestGlobalCount())
- *          .globalGrouping("1");
- *
+ * 
+ * builder.setSpout(&quot;1&quot;, new TestWordSpout(true), 5);
+ * builder.setSpout(&quot;2&quot;, new TestWordSpout(true), 3);
+ * builder.setBolt(&quot;3&quot;, new TestWordCounter(), 3).fieldsGrouping(&quot;1&quot;, new Fields(&quot;word&quot;)).fieldsGrouping(&quot;2&quot;, new Fields(&quot;word&quot;));
+ * builder.setBolt(&quot;4&quot;, new TestGlobalCount()).globalGrouping(&quot;1&quot;);
+ * 
  * Map conf = new HashMap();
  * conf.put(Config.TOPOLOGY_WORKERS, 4);
  * conf.put(Config.TOPOLOGY_DEBUG, true);
- *
+ * 
  * LocalCluster cluster = new LocalCluster();
- * cluster.submitTopology("mytopology", conf, builder.createTopology());
+ * cluster.submitTopology(&quot;mytopology&quot;, conf, builder.createTopology());
  * Utils.sleep(10000);
  * cluster.shutdown();
  * </pre>
- *
- * <p>The pattern for TopologyBuilder is to map component ids to components using the setSpout
- * and setBolt methods. Those methods return objects that are then used to declare
- * the inputs for that component.</p>
+ * 
+ * <p>
+ * The pattern for TopologyBuilder is to map component ids to components using the setSpout and setBolt methods. Those methods return objects that are then used
+ * to declare the inputs for that component.
+ * </p>
  */
 public class TopologyBuilder {
     private Map<String, IRichBolt> _bolts = new HashMap<String, IRichBolt>();
     private Map<String, IRichSpout> _spouts = new HashMap<String, IRichSpout>();
     private Map<String, ComponentCommon> _commons = new HashMap<String, ComponentCommon>();
 
-//    private Map<String, Map<GlobalStreamId, Grouping>> _inputs = new HashMap<String, Map<GlobalStreamId, Grouping>>();
+    // private Map<String, Map<GlobalStreamId, Grouping>> _inputs = new HashMap<String, Map<GlobalStreamId, Grouping>>();
 
     private Map<String, StateSpoutSpec> _stateSpouts = new HashMap<String, StateSpoutSpec>();
-    
-    
+
     public StormTopology createTopology() {
         Map<String, Bolt> boltSpecs = new HashMap<String, Bolt>();
         Map<String, SpoutSpec> spoutSpecs = new HashMap<String, SpoutSpec>();
-        for(String boltId: _bolts.keySet()) {
+        for (String boltId : _bolts.keySet()) {
             IRichBolt bolt = _bolts.get(boltId);
             ComponentCommon common = getComponentCommon(boltId, bolt);
             boltSpecs.put(boltId, new Bolt(ComponentObject.serialized_java(Utils.javaSerialize(bolt)), common));
         }
-        for(String spoutId: _spouts.keySet()) {
+        for (String spoutId : _spouts.keySet()) {
             IRichSpout spout = _spouts.get(spoutId);
             ComponentCommon common = getComponentCommon(spoutId, spout);
             spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.javaSerialize(spout)), common));
-            
+
         }
-        return new StormTopology(spoutSpecs,
-                                 boltSpecs,
-                                 new HashMap<String, StateSpoutSpec>());
+        return new StormTopology(spoutSpecs, boltSpecs, new HashMap<String, StateSpoutSpec>());
     }
 
     /**
      * Define a new bolt in this topology with parallelism of just one thread.
-     *
+     * 
      * @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
      * @param bolt the bolt
      * @return use the returned object to declare the inputs to this component
@@ -130,10 +112,11 @@ public class TopologyBuilder {
 
     /**
      * Define a new bolt in this topology with the specified amount of parallelism.
-     *
+     * 
      * @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
      * @param bolt the bolt
-     * @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somewhere around the cluster.
+     * @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somewhere around
+     *            the cluster.
      * @return use the returned object to declare the inputs to this component
      */
     public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint) {
@@ -144,11 +127,9 @@ public class TopologyBuilder {
     }
 
     /**
-     * Define a new bolt in this topology. This defines a basic bolt, which is a
-     * simpler to use but more restricted kind of bolt. Basic bolts are intended
-     * for non-aggregation processing and automate the anchoring/acking process to
-     * achieve proper reliability in the topology.
-     *
+     * Define a new bolt in this topology. This defines a basic bolt, which is a simpler to use but more restricted kind of bolt. Basic bolts are intended for
+     * non-aggregation processing and automate the anchoring/acking process to achieve proper reliability in the topology.
+     * 
      * @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
      * @param bolt the basic bolt
      * @return use the returned object to declare the inputs to this component
@@ -158,14 +139,13 @@ public class TopologyBuilder {
     }
 
     /**
-     * Define a new bolt in this topology. This defines a basic bolt, which is a
-     * simpler to use but more restricted kind of bolt. Basic bolts are intended
-     * for non-aggregation processing and automate the anchoring/acking process to
-     * achieve proper reliability in the topology.
-     *
+     * Define a new bolt in this topology. This defines a basic bolt, which is a simpler to use but more restricted kind of bolt. Basic bolts are intended for
+     * non-aggregation processing and automate the anchoring/acking process to achieve proper reliability in the topology.
+     * 
      * @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
      * @param bolt the basic bolt
-     * @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somwehere around the cluster.
+     * @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somewhere around
+     *            the cluster.
      * @return use the returned object to declare the inputs to this component
      */
     public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism_hint) {
@@ -174,7 +154,7 @@ public class TopologyBuilder {
 
     /**
      * Define a new spout in this topology.
-     *
+     * 
      * @param id the id of this component. This id is referenced by other components that want to consume this spout's outputs.
      * @param spout the spout
      */
@@ -183,12 +163,12 @@ public class TopologyBuilder {
     }
 
     /**
-     * Define a new spout in this topology with the specified parallelism. If the spout declares
-     * itself as non-distributed, the parallelism_hint will be ignored and only one task
-     * will be allocated to this component.
-     *
+     * Define a new spout in this topology with the specified parallelism. If the spout declares itself as non-distributed, the parallelism_hint will be ignored
+     * and only one task will be allocated to this component.
+     * 
      * @param id the id of this component. This id is referenced by other components that want to consume this spout's outputs.
-     * @param parallelism_hint the number of tasks that should be assigned to execute this spout. Each task will run on a thread in a process somwehere around the cluster.
+     * @param parallelism_hint the number of tasks that should be assigned to execute this spout. Each task will run on a thread in a process somewhere around
+     *            the cluster.
      * @param spout the spout
      */
     public SpoutDeclarer setSpout(String id, IRichSpout spout, Number parallelism_hint) {
@@ -207,51 +187,51 @@ public class TopologyBuilder {
         // TODO: finish
     }
 
-
     private void validateUnusedId(String id) {
-        if(_bolts.containsKey(id)) {
+        if (_bolts.containsKey(id)) {
             throw new IllegalArgumentException("Bolt has already been declared for id " + id);
         }
-        if(_spouts.containsKey(id)) {
+        if (_spouts.containsKey(id)) {
             throw new IllegalArgumentException("Spout has already been declared for id " + id);
         }
-        if(_stateSpouts.containsKey(id)) {
+        if (_stateSpouts.containsKey(id)) {
             throw new IllegalArgumentException("State spout has already been declared for id " + id);
         }
     }
 
     private ComponentCommon getComponentCommon(String id, IComponent component) {
         ComponentCommon ret = new ComponentCommon(_commons.get(id));
-        
+
         OutputFieldsGetter getter = new OutputFieldsGetter();
         component.declareOutputFields(getter);
         ret.set_streams(getter.getFieldsDeclaration());
-        return ret;        
+        return ret;
     }
-    
+
     private void initCommon(String id, IComponent component, Number parallelism) {
         ComponentCommon common = new ComponentCommon();
         common.set_inputs(new HashMap<GlobalStreamId, Grouping>());
-        if(parallelism!=null) {
+        if (parallelism != null) {
             common.set_parallelism_hint(parallelism.intValue());
-        }else {
+        } else {
             common.set_parallelism_hint(1);
         }
         Map conf = component.getComponentConfiguration();
-        if(conf!=null) common.set_json_conf(JSONValue.toJSONString(conf));
+        if (conf != null)
+            common.set_json_conf(JSONValue.toJSONString(conf));
         _commons.put(id, common);
     }
 
     protected class ConfigGetter<T extends ComponentConfigurationDeclarer> extends BaseConfigurationDeclarer<T> {
         String _id;
-        
+
         public ConfigGetter(String id) {
             _id = id;
         }
-        
+
         @Override
         public T addConfigurations(Map conf) {
-            if(conf!=null && conf.containsKey(Config.TOPOLOGY_KRYO_REGISTER)) {
+            if (conf != null && conf.containsKey(Config.TOPOLOGY_KRYO_REGISTER)) {
                 throw new IllegalArgumentException("Cannot set serializations for a component using fluent API");
             }
             String currConf = _commons.get(_id).get_json_conf();
@@ -259,13 +239,13 @@ public class TopologyBuilder {
             return (T) this;
         }
     }
-    
+
     protected class SpoutGetter extends ConfigGetter<SpoutDeclarer> implements SpoutDeclarer {
         public SpoutGetter(String id) {
             super(id);
-        }        
+        }
     }
-    
+
     protected class BoltGetter extends ConfigGetter<BoltDeclarer> implements BoltDeclarer {
         private String _boltId;
 
@@ -305,17 +285,17 @@ public class TopologyBuilder {
         public BoltDeclarer localOrShuffleGrouping(String componentId, String streamId) {
             return grouping(componentId, streamId, Grouping.local_or_shuffle(new NullStruct()));
         }
-        
+
         @Override
         public BoltDeclarer localFirstGrouping(String componentId) {
             return localFirstGrouping(componentId, Utils.DEFAULT_STREAM_ID);
         }
-        
+
         @Override
         public BoltDeclarer localFirstGrouping(String componentId, String streamId) {
             return grouping(componentId, streamId, Grouping.localFirst(new NullStruct()));
         }
-        
+
         public BoltDeclarer noneGrouping(String componentId) {
             return noneGrouping(componentId, Utils.DEFAULT_STREAM_ID);
         }
@@ -368,17 +348,20 @@ public class TopologyBuilder {
         @Override
         public BoltDeclarer grouping(GlobalStreamId id, Grouping grouping) {
             return grouping(id.get_componentId(), id.get_streamId(), grouping);
-        }        
+        }
     }
-    
+
     private static Map parseJson(String json) {
-        if(json==null) return new HashMap();
-        else return (Map) JSONValue.parse(json);
+        if (json == null)
+            return new HashMap();
+        else
+            return (Map) JSONValue.parse(json);
     }
-    
+
     private static String mergeIntoJson(Map into, Map newMap) {
         Map res = new HashMap(into);
-        if(newMap!=null) res.putAll(newMap);
+        if (newMap != null)
+            res.putAll(newMap);
         return JSONValue.toJSONString(res);
     }
 }
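
For reference, the builder API reformatted above is typically used along these lines. This is a minimal illustrative sketch, not part of the commit; WordSpout (an IRichSpout) and WordCountBolt (an IBasicBolt) are hypothetical placeholder components:

import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class WordCountTopologyFactory {
    public static StormTopology build() {
        TopologyBuilder builder = new TopologyBuilder();
        // WordSpout and WordCountBolt are hypothetical placeholders, not classes from this commit.
        // The spout gets a parallelism hint of 2 tasks.
        builder.setSpout("words", new WordSpout(), 2);
        // A basic bolt: anchoring and acking are handled automatically by the framework.
        builder.setBolt("count", new WordCountBolt(), 4)
               .fieldsGrouping("words", new Fields("word"));
        return builder.createTopology();
    }
}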

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/base/BaseBasicBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/base/BaseBasicBolt.java b/jstorm-core/src/main/java/backtype/storm/topology/base/BaseBasicBolt.java
index e585ee6..eb13e56 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/base/BaseBasicBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/base/BaseBasicBolt.java
@@ -29,5 +29,5 @@ public abstract class BaseBasicBolt extends BaseComponent implements IBasicBolt
 
     @Override
     public void cleanup() {
-    }    
+    }
 }
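
A concrete basic bolt just extends the abstract class above and implements execute and declareOutputFields. A small sketch follows; the SplitSentence name and its single output field are chosen purely for illustration:

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

// Hypothetical example bolt; not part of this commit.
public class SplitSentence extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // For basic bolts the framework anchors and acks emitted tuples automatically.
        for (String word : tuple.getString(0).split(" ")) {
            collector.emit(new Values(word));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}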

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/base/BaseBatchBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/base/BaseBatchBolt.java b/jstorm-core/src/main/java/backtype/storm/topology/base/BaseBatchBolt.java
index 3206941..43d21a3 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/base/BaseBatchBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/base/BaseBatchBolt.java
@@ -21,5 +21,5 @@ import backtype.storm.coordination.IBatchBolt;
 import java.util.Map;
 
 public abstract class BaseBatchBolt<T> extends BaseComponent implements IBatchBolt<T> {
- 
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/base/BaseComponent.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/base/BaseComponent.java b/jstorm-core/src/main/java/backtype/storm/topology/base/BaseComponent.java
index 8afcdaa..1206abc 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/base/BaseComponent.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/base/BaseComponent.java
@@ -24,5 +24,5 @@ public abstract class BaseComponent implements IComponent {
     @Override
     public Map<String, Object> getComponentConfiguration() {
         return null;
-    }    
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/base/BaseOpaquePartitionedTransactionalSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/base/BaseOpaquePartitionedTransactionalSpout.java b/jstorm-core/src/main/java/backtype/storm/topology/base/BaseOpaquePartitionedTransactionalSpout.java
index 2d20a48..64c3887 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/base/BaseOpaquePartitionedTransactionalSpout.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/base/BaseOpaquePartitionedTransactionalSpout.java
@@ -19,7 +19,6 @@ package backtype.storm.topology.base;
 
 import backtype.storm.transactional.partitioned.IOpaquePartitionedTransactionalSpout;
 
-
 public abstract class BaseOpaquePartitionedTransactionalSpout<T> extends BaseComponent implements IOpaquePartitionedTransactionalSpout<T> {
-    
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/base/BaseRichBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/base/BaseRichBolt.java b/jstorm-core/src/main/java/backtype/storm/topology/base/BaseRichBolt.java
index 266736e..ebf31eb 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/base/BaseRichBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/base/BaseRichBolt.java
@@ -22,5 +22,5 @@ import backtype.storm.topology.IRichBolt;
 public abstract class BaseRichBolt extends BaseComponent implements IRichBolt {
     @Override
     public void cleanup() {
-    }    
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/base/BaseRichSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/base/BaseRichSpout.java b/jstorm-core/src/main/java/backtype/storm/topology/base/BaseRichSpout.java
index 37513b7..18f1f2c 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/base/BaseRichSpout.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/base/BaseRichSpout.java
@@ -24,7 +24,7 @@ package backtype.storm.topology.base;
 import backtype.storm.topology.IRichSpout;
 
 /**
- *
+ * 
  * @author nathan
  */
 public abstract class BaseRichSpout extends BaseComponent implements IRichSpout {

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/base/BaseTransactionalBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/base/BaseTransactionalBolt.java b/jstorm-core/src/main/java/backtype/storm/topology/base/BaseTransactionalBolt.java
index b6451e9..246b3a3 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/base/BaseTransactionalBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/base/BaseTransactionalBolt.java
@@ -20,5 +20,5 @@ package backtype.storm.topology.base;
 import backtype.storm.transactional.TransactionAttempt;
 
 public abstract class BaseTransactionalBolt extends BaseBatchBolt<TransactionAttempt> {
-    
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/transactional/ICommitter.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/transactional/ICommitter.java b/jstorm-core/src/main/java/backtype/storm/transactional/ICommitter.java
index 859bad2..0e91178 100755
--- a/jstorm-core/src/main/java/backtype/storm/transactional/ICommitter.java
+++ b/jstorm-core/src/main/java/backtype/storm/transactional/ICommitter.java
@@ -18,9 +18,8 @@
 package backtype.storm.transactional;
 
 /**
- * This marks an IBatchBolt within a transactional topology as a committer. This causes the 
- * finishBatch method to be called in order of the transactions.
+ * This marks an IBatchBolt within a transactional topology as a committer. This causes the finishBatch method to be called in order of the transactions.
  */
 public interface ICommitter {
-    
+
 }
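
A batch bolt opts into ordered commits simply by carrying this marker interface. The following rough sketch assumes a hypothetical GlobalCountCommitter whose persistence step is only a commented placeholder, not a real API:

import java.util.Map;

import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseTransactionalBolt;
import backtype.storm.transactional.ICommitter;
import backtype.storm.transactional.TransactionAttempt;
import backtype.storm.tuple.Tuple;

// Hypothetical committer bolt; the class name and persistence step are illustrative.
public class GlobalCountCommitter extends BaseTransactionalBolt implements ICommitter {
    transient TransactionAttempt _attempt;
    transient int _count;

    @Override
    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {
        _attempt = attempt;
        _count = 0;
    }

    @Override
    public void execute(Tuple tuple) {
        _count++;
    }

    @Override
    public void finishBatch() {
        // Because this bolt is an ICommitter, finishBatch runs in transaction
        // order, so a write keyed by _attempt.getTransactionId() would see a
        // strictly ordered sequence of commits.
        // persistCount(_attempt.getTransactionId(), _count);  // placeholder, no real API
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This committer emits nothing downstream.
    }
}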

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/transactional/ICommitterTransactionalSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/transactional/ICommitterTransactionalSpout.java b/jstorm-core/src/main/java/backtype/storm/transactional/ICommitterTransactionalSpout.java
index 5441ee2..1cd448c 100755
--- a/jstorm-core/src/main/java/backtype/storm/transactional/ICommitterTransactionalSpout.java
+++ b/jstorm-core/src/main/java/backtype/storm/transactional/ICommitterTransactionalSpout.java
@@ -20,12 +20,11 @@ package backtype.storm.transactional;
 import backtype.storm.task.TopologyContext;
 import java.util.Map;
 
-
 public interface ICommitterTransactionalSpout<X> extends ITransactionalSpout<X> {
     public interface Emitter extends ITransactionalSpout.Emitter {
         void commit(TransactionAttempt attempt);
-    } 
-    
+    }
+
     @Override
-    public Emitter getEmitter(Map conf, TopologyContext context);    
+    public Emitter getEmitter(Map conf, TopologyContext context);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/transactional/ITransactionalSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/transactional/ITransactionalSpout.java b/jstorm-core/src/main/java/backtype/storm/transactional/ITransactionalSpout.java
index 3207493..528eda7 100755
--- a/jstorm-core/src/main/java/backtype/storm/transactional/ITransactionalSpout.java
+++ b/jstorm-core/src/main/java/backtype/storm/transactional/ITransactionalSpout.java
@@ -26,69 +26,62 @@ import java.util.Map;
 public interface ITransactionalSpout<T> extends IComponent {
     public interface Coordinator<X> {
         /**
-         * Create metadata for this particular transaction id which has never
-         * been emitted before. The metadata should contain whatever is necessary
-         * to be able to replay the exact batch for the transaction at a later point.
+         * Create metadata for this particular transaction id which has never been emitted before. The metadata should contain whatever is necessary to be able
+         * to replay the exact batch for the transaction at a later point.
          * 
          * The metadata is stored in Zookeeper.
          * 
-         * Storm uses the Kryo serializations configured in the component configuration 
-         * for this spout to serialize and deserialize the metadata.
+         * Storm uses the Kryo serializations configured in the component configuration for this spout to serialize and deserialize the metadata.
          * 
          * @param txid The id of the transaction.
          * @param prevMetadata The metadata of the previous transaction
          * @return the metadata for this new transaction
          */
         X initializeTransaction(BigInteger txid, X prevMetadata);
-        
+
         /**
          * Returns true if it's ok to start a new transaction, false otherwise (will skip this transaction).
          * 
-         * You should sleep here if you want a delay between asking for the next transaction (this will be called 
-         * repeatedly in a loop).
+         * You should sleep here if you want a delay between asking for the next transaction (this will be called repeatedly in a loop).
          */
         boolean isReady();
-        
+
         /**
          * Release any resources from this coordinator.
          */
         void close();
     }
-    
+
     public interface Emitter<X> {
         /**
-         * Emit a batch for the specified transaction attempt and metadata for the transaction. The metadata
-         * was created by the Coordinator in the initializeTranaction method. This method must always emit
-         * the same batch of tuples across all tasks for the same transaction id.
+         * Emit a batch for the specified transaction attempt and metadata for the transaction. The metadata was created by the Coordinator in the
+         * initializeTransaction method. This method must always emit the same batch of tuples across all tasks for the same transaction id.
          * 
          * The first field of all emitted tuples must contain the provided TransactionAttempt.
          * 
          */
         void emitBatch(TransactionAttempt tx, X coordinatorMeta, BatchOutputCollector collector);
-        
+
         /**
-         * Any state for transactions prior to the provided transaction id can be safely cleaned up, so this
-         * method should clean up that state.
+         * Any state for transactions prior to the provided transaction id can be safely cleaned up, so this method should clean up that state.
          */
         void cleanupBefore(BigInteger txid);
-        
+
         /**
          * Release any resources held by this emitter.
          */
         void close();
     }
-    
+
     /**
-     * The coordinator for a TransactionalSpout runs in a single thread and indicates when batches
-     * of tuples should be emitted and when transactions should commit. The Coordinator that you provide 
-     * in a TransactionalSpout provides metadata for each transaction so that the transactions can be replayed.
+     * The coordinator for a TransactionalSpout runs in a single thread and indicates when batches of tuples should be emitted and when transactions should
+     * commit. The Coordinator that you provide in a TransactionalSpout provides metadata for each transaction so that the transactions can be replayed.
      */
     Coordinator<T> getCoordinator(Map conf, TopologyContext context);
 
     /**
-     * The emitter for a TransactionalSpout runs as many tasks across the cluster. Emitters are responsible for
-     * emitting batches of tuples for a transaction and must ensure that the same batch of tuples is always
-     * emitted for the same transaction id.
-     */    
+     * The emitter for a TransactionalSpout runs as multiple tasks across the cluster. Emitters are responsible for emitting batches of tuples for a transaction and
+     * must ensure that the same batch of tuples is always emitted for the same transaction id.
+     */
     Emitter<T> getEmitter(Map conf, TopologyContext context);
 }
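
To make the split between the two inner interfaces concrete, here is a heavily simplified sketch; the fixed-size, offset-based batches are illustrative only and not how a production spout would source its data:

import java.math.BigInteger;
import java.util.Map;

import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseComponent;
import backtype.storm.transactional.ITransactionalSpout;
import backtype.storm.transactional.TransactionAttempt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

// Hypothetical example spout; not part of this commit.
public class FixedBatchTransactionalSpout extends BaseComponent implements ITransactionalSpout<Integer> {
    static final int BATCH_SIZE = 10;

    class FixedCoordinator implements Coordinator<Integer> {
        @Override
        public Integer initializeTransaction(BigInteger txid, Integer prevMetadata) {
            // The metadata here is just the starting offset of the batch; Storm
            // stores it in Zookeeper so the exact batch can be replayed later.
            return prevMetadata == null ? 0 : prevMetadata + BATCH_SIZE;
        }

        @Override
        public boolean isReady() {
            return true; // always ok to start the next transaction in this sketch
        }

        @Override
        public void close() {
        }
    }

    class FixedEmitter implements Emitter<Integer> {
        @Override
        public void emitBatch(TransactionAttempt tx, Integer startOffset, BatchOutputCollector collector) {
            // The first field of every emitted tuple must be the TransactionAttempt.
            for (int i = 0; i < BATCH_SIZE; i++) {
                collector.emit(new Values(tx, startOffset + i));
            }
        }

        @Override
        public void cleanupBefore(BigInteger txid) {
            // Nothing to clean up for this in-memory example.
        }

        @Override
        public void close() {
        }
    }

    @Override
    public Coordinator<Integer> getCoordinator(Map conf, TopologyContext context) {
        return new FixedCoordinator();
    }

    @Override
    public Emitter<Integer> getEmitter(Map conf, TopologyContext context) {
        return new FixedEmitter();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("tx", "value"));
    }
}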

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/transactional/TransactionAttempt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/transactional/TransactionAttempt.java b/jstorm-core/src/main/java/backtype/storm/transactional/TransactionAttempt.java
index 80bbb0e..e64a2d7 100755
--- a/jstorm-core/src/main/java/backtype/storm/transactional/TransactionAttempt.java
+++ b/jstorm-core/src/main/java/backtype/storm/transactional/TransactionAttempt.java
@@ -22,22 +22,21 @@ import java.math.BigInteger;
 public class TransactionAttempt {
     BigInteger _txid;
     long _attemptId;
-    
-    
+
     // for kryo compatibility
     public TransactionAttempt() {
-        
+
     }
-    
+
     public TransactionAttempt(BigInteger txid, long attemptId) {
         _txid = txid;
         _attemptId = attemptId;
     }
-    
+
     public BigInteger getTransactionId() {
         return _txid;
     }
-    
+
     public long getAttemptId() {
         return _attemptId;
     }
@@ -49,7 +48,8 @@ public class TransactionAttempt {
 
     @Override
     public boolean equals(Object o) {
-        if(!(o instanceof TransactionAttempt)) return false;
+        if (!(o instanceof TransactionAttempt))
+            return false;
         TransactionAttempt other = (TransactionAttempt) o;
         return _txid.equals(other._txid) && _attemptId == other._attemptId;
     }
@@ -57,5 +57,5 @@ public class TransactionAttempt {
     @Override
     public String toString() {
         return "" + _txid + ":" + _attemptId;
-    }    
+    }
 }
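
A short illustration of the semantics above: a replay of the same transaction id gets a new attempt id, so the two attempts compare unequal even though they refer to the same transaction (the values are arbitrary):

import java.math.BigInteger;

import backtype.storm.transactional.TransactionAttempt;

public class TransactionAttemptDemo {
    public static void main(String[] args) {
        TransactionAttempt first = new TransactionAttempt(BigInteger.valueOf(42), 1L);
        TransactionAttempt retry = new TransactionAttempt(BigInteger.valueOf(42), 2L);

        // Same transaction id, different attempt id, so the attempts are not equal.
        System.out.println(first.equals(retry));                                   // false
        System.out.println(first.getTransactionId().equals(retry.getTransactionId())); // true
        System.out.println(first);                                                 // prints "42:1"
    }
}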

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalSpoutBatchExecutor.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalSpoutBatchExecutor.java b/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalSpoutBatchExecutor.java
index 53aacae..9bcd75d 100755
--- a/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalSpoutBatchExecutor.java
+++ b/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalSpoutBatchExecutor.java
@@ -31,18 +31,18 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class TransactionalSpoutBatchExecutor implements IRichBolt {
-    public static Logger LOG = LoggerFactory.getLogger(TransactionalSpoutBatchExecutor.class);    
+    public static Logger LOG = LoggerFactory.getLogger(TransactionalSpoutBatchExecutor.class);
 
     BatchOutputCollectorImpl _collector;
     ITransactionalSpout _spout;
     ITransactionalSpout.Emitter _emitter;
-    
+
     TreeMap<BigInteger, TransactionAttempt> _activeTransactions = new TreeMap<BigInteger, TransactionAttempt>();
 
     public TransactionalSpoutBatchExecutor(ITransactionalSpout spout) {
         _spout = spout;
     }
-    
+
     @Override
     public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
         _collector = new BatchOutputCollectorImpl(collector);
@@ -53,27 +53,27 @@ public class TransactionalSpoutBatchExecutor implements IRichBolt {
     public void execute(Tuple input) {
         TransactionAttempt attempt = (TransactionAttempt) input.getValue(0);
         try {
-            if(input.getSourceStreamId().equals(TransactionalSpoutCoordinator.TRANSACTION_COMMIT_STREAM_ID)) {
-                if(attempt.equals(_activeTransactions.get(attempt.getTransactionId()))) {
+            if (input.getSourceStreamId().equals(TransactionalSpoutCoordinator.TRANSACTION_COMMIT_STREAM_ID)) {
+                if (attempt.equals(_activeTransactions.get(attempt.getTransactionId()))) {
                     ((ICommitterTransactionalSpout.Emitter) _emitter).commit(attempt);
                     _activeTransactions.remove(attempt.getTransactionId());
                     _collector.ack(input);
                 } else {
                     _collector.fail(input);
                 }
-            } else { 
+            } else {
                 _emitter.emitBatch(attempt, input.getValue(1), _collector);
                 _activeTransactions.put(attempt.getTransactionId(), attempt);
                 _collector.ack(input);
                 BigInteger committed = (BigInteger) input.getValue(2);
-                if(committed!=null) {
-                    // valid to delete before what's been committed since 
+                if (committed != null) {
+                    // valid to delete before what's been committed since
                     // those batches will never be accessed again
                     _activeTransactions.headMap(committed).clear();
                     _emitter.cleanupBefore(committed);
                 }
             }
-        } catch(FailedException e) {
+        } catch (FailedException e) {
             LOG.warn("Failed to emit batch for transaction", e);
             _collector.fail(input);
         }