You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:05:25 UTC

[47/51] [partial] storm git commit: Update JStorm to latest release 2.1.0

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/coordination/BatchSubtopologyBuilder.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/coordination/BatchSubtopologyBuilder.java b/jstorm-core/src/main/java/backtype/storm/coordination/BatchSubtopologyBuilder.java
index 2a77f3b..ce4c955 100755
--- a/jstorm-core/src/main/java/backtype/storm/coordination/BatchSubtopologyBuilder.java
+++ b/jstorm-core/src/main/java/backtype/storm/coordination/BatchSubtopologyBuilder.java
@@ -42,87 +42,83 @@ public class BatchSubtopologyBuilder {
     Map<String, Component> _bolts = new HashMap<String, Component>();
     Component _masterBolt;
     String _masterId;
-    
+
     public BatchSubtopologyBuilder(String masterBoltId, IBasicBolt masterBolt, Number boltParallelism) {
         Integer p = boltParallelism == null ? null : boltParallelism.intValue();
         _masterBolt = new Component(new BasicBoltExecutor(masterBolt), p);
         _masterId = masterBoltId;
     }
-    
+
     public BatchSubtopologyBuilder(String masterBoltId, IBasicBolt masterBolt) {
         this(masterBoltId, masterBolt, null);
     }
-    
+
     public BoltDeclarer getMasterDeclarer() {
         return new BoltDeclarerImpl(_masterBolt);
     }
-        
+
     public BoltDeclarer setBolt(String id, IBatchBolt bolt) {
         return setBolt(id, bolt, null);
     }
-    
+
     public BoltDeclarer setBolt(String id, IBatchBolt bolt, Number parallelism) {
         return setBolt(id, new BatchBoltExecutor(bolt), parallelism);
-    }     
-    
+    }
+
     public BoltDeclarer setBolt(String id, IBasicBolt bolt) {
         return setBolt(id, bolt, null);
-    }    
-    
+    }
+
     public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism) {
         return setBolt(id, new BasicBoltExecutor(bolt), parallelism);
     }
-    
+
     private BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism) {
         Integer p = null;
-        if(parallelism!=null) p = parallelism.intValue();
+        if (parallelism != null)
+            p = parallelism.intValue();
         Component component = new Component(bolt, p);
         _bolts.put(id, component);
         return new BoltDeclarerImpl(component);
     }
-    
+
     public void extendTopology(TopologyBuilder builder) {
         BoltDeclarer declarer = builder.setBolt(_masterId, new CoordinatedBolt(_masterBolt.bolt), _masterBolt.parallelism);
-        for(InputDeclaration decl: _masterBolt.declarations) {
+        for (InputDeclaration decl : _masterBolt.declarations) {
             decl.declare(declarer);
         }
-        for(Map conf: _masterBolt.componentConfs) {
+        for (Map conf : _masterBolt.componentConfs) {
             declarer.addConfigurations(conf);
         }
-        for(String id: _bolts.keySet()) {
+        for (String id : _bolts.keySet()) {
             Component component = _bolts.get(id);
             Map<String, SourceArgs> coordinatedArgs = new HashMap<String, SourceArgs>();
-            for(String c: componentBoltSubscriptions(component)) {
+            for (String c : componentBoltSubscriptions(component)) {
                 SourceArgs source;
-                if(c.equals(_masterId)) {
+                if (c.equals(_masterId)) {
                     source = SourceArgs.single();
                 } else {
                     source = SourceArgs.all();
                 }
-                coordinatedArgs.put(c, source);                    
+                coordinatedArgs.put(c, source);
             }
-            
-
-            BoltDeclarer input = builder.setBolt(id,
-                                                  new CoordinatedBolt(component.bolt,
-                                                                      coordinatedArgs,
-                                                                      null),
-                                                  component.parallelism);
-            for(Map conf: component.componentConfs) {
+
+            BoltDeclarer input = builder.setBolt(id, new CoordinatedBolt(component.bolt, coordinatedArgs, null), component.parallelism);
+            for (Map conf : component.componentConfs) {
                 input.addConfigurations(conf);
             }
-            for(String c: componentBoltSubscriptions(component)) {
+            for (String c : componentBoltSubscriptions(component)) {
                 input.directGrouping(c, Constants.COORDINATED_STREAM_ID);
             }
-            for(InputDeclaration d: component.declarations) {
+            for (InputDeclaration d : component.declarations) {
                 d.declare(input);
             }
-        }        
+        }
     }
-        
+
     private Set<String> componentBoltSubscriptions(Component component) {
         Set<String> ret = new HashSet<String>();
-        for(InputDeclaration d: component.declarations) {
+        for (InputDeclaration d : component.declarations) {
             ret.add(d.getComponent());
         }
         return ret;
@@ -133,25 +129,26 @@ public class BatchSubtopologyBuilder {
         public Integer parallelism;
         public List<InputDeclaration> declarations = new ArrayList<InputDeclaration>();
         public List<Map> componentConfs = new ArrayList<Map>();
-        
+
         public Component(IRichBolt bolt, Integer parallelism) {
             this.bolt = bolt;
             this.parallelism = parallelism;
         }
     }
-    
+
     private static interface InputDeclaration {
         void declare(InputDeclarer declarer);
+
         String getComponent();
     }
-        
+
     private class BoltDeclarerImpl extends BaseConfigurationDeclarer<BoltDeclarer> implements BoltDeclarer {
         Component _component;
-        
+
         public BoltDeclarerImpl(Component component) {
             _component = component;
         }
-        
+
         @Override
         public BoltDeclarer fieldsGrouping(final String component, final Fields fields) {
             addDeclaration(new InputDeclaration() {
@@ -163,7 +160,7 @@ public class BatchSubtopologyBuilder {
                 @Override
                 public String getComponent() {
                     return component;
-                }                
+                }
             });
             return this;
         }
@@ -174,12 +171,12 @@ public class BatchSubtopologyBuilder {
                 @Override
                 public void declare(InputDeclarer declarer) {
                     declarer.fieldsGrouping(component, streamId, fields);
-                }                
+                }
 
                 @Override
                 public String getComponent() {
                     return component;
-                }                
+                }
             });
             return this;
         }
@@ -190,12 +187,12 @@ public class BatchSubtopologyBuilder {
                 @Override
                 public void declare(InputDeclarer declarer) {
                     declarer.globalGrouping(component);
-                }                
+                }
 
                 @Override
                 public String getComponent() {
                     return component;
-                }                
+                }
             });
             return this;
         }
@@ -206,12 +203,12 @@ public class BatchSubtopologyBuilder {
                 @Override
                 public void declare(InputDeclarer declarer) {
                     declarer.globalGrouping(component, streamId);
-                }                
+                }
 
                 @Override
                 public String getComponent() {
                     return component;
-                }                
+                }
             });
             return this;
         }
@@ -222,12 +219,12 @@ public class BatchSubtopologyBuilder {
                 @Override
                 public void declare(InputDeclarer declarer) {
                     declarer.shuffleGrouping(component);
-                }                
+                }
 
                 @Override
                 public String getComponent() {
                     return component;
-                }                
+                }
             });
             return this;
         }
@@ -238,12 +235,12 @@ public class BatchSubtopologyBuilder {
                 @Override
                 public void declare(InputDeclarer declarer) {
                     declarer.shuffleGrouping(component, streamId);
-                }                
+                }
 
                 @Override
                 public String getComponent() {
                     return component;
-                }                
+                }
             });
             return this;
         }
@@ -254,12 +251,12 @@ public class BatchSubtopologyBuilder {
                 @Override
                 public void declare(InputDeclarer declarer) {
                     declarer.localOrShuffleGrouping(component);
-                }                
+                }
 
                 @Override
                 public String getComponent() {
                     return component;
-                }                
+                }
             });
             return this;
         }
@@ -270,8 +267,8 @@ public class BatchSubtopologyBuilder {
                 @Override
                 public void declare(InputDeclarer declarer) {
                     declarer.localOrShuffleGrouping(component, streamId);
-                }                
-                
+                }
+
                 @Override
                 public String getComponent() {
                     return component;
@@ -279,7 +276,7 @@ public class BatchSubtopologyBuilder {
             });
             return this;
         }
-        
+
         @Override
         public BoltDeclarer localFirstGrouping(final String componentId) {
             addDeclaration(new InputDeclaration() {
@@ -287,7 +284,7 @@ public class BatchSubtopologyBuilder {
                 public void declare(InputDeclarer declarer) {
                     declarer.localFirstGrouping(componentId);
                 }
-                
+
                 @Override
                 public String getComponent() {
                     return componentId;
@@ -295,7 +292,7 @@ public class BatchSubtopologyBuilder {
             });
             return this;
         }
-        
+
         @Override
         public BoltDeclarer localFirstGrouping(final String component, final String streamId) {
             addDeclaration(new InputDeclaration() {
@@ -303,27 +300,27 @@ public class BatchSubtopologyBuilder {
                 public void declare(InputDeclarer declarer) {
                     declarer.localFirstGrouping(component, streamId);
                 }
-                
+
                 @Override
                 public String getComponent() {
                     return component;
-                }                
+                }
             });
             return this;
         }
-        
+
         @Override
         public BoltDeclarer noneGrouping(final String component) {
             addDeclaration(new InputDeclaration() {
                 @Override
                 public void declare(InputDeclarer declarer) {
                     declarer.noneGrouping(component);
-                }                
+                }
 
                 @Override
                 public String getComponent() {
                     return component;
-                }                
+                }
             });
             return this;
         }
@@ -334,12 +331,12 @@ public class BatchSubtopologyBuilder {
                 @Override
                 public void declare(InputDeclarer declarer) {
                     declarer.noneGrouping(component, streamId);
-                }                
+                }
 
                 @Override
                 public String getComponent() {
                     return component;
-                }                
+                }
             });
             return this;
         }
@@ -350,12 +347,12 @@ public class BatchSubtopologyBuilder {
                 @Override
                 public void declare(InputDeclarer declarer) {
                     declarer.allGrouping(component);
-                }                
+                }
 
                 @Override
                 public String getComponent() {
                     return component;
-                }                
+                }
             });
             return this;
         }
@@ -366,12 +363,12 @@ public class BatchSubtopologyBuilder {
                 @Override
                 public void declare(InputDeclarer declarer) {
                     declarer.allGrouping(component, streamId);
-                }                
+                }
 
                 @Override
                 public String getComponent() {
                     return component;
-                }                
+                }
             });
             return this;
         }
@@ -382,12 +379,12 @@ public class BatchSubtopologyBuilder {
                 @Override
                 public void declare(InputDeclarer declarer) {
                     declarer.directGrouping(component);
-                }                
+                }
 
                 @Override
                 public String getComponent() {
                     return component;
-                }                
+                }
             });
             return this;
         }
@@ -398,12 +395,12 @@ public class BatchSubtopologyBuilder {
                 @Override
                 public void declare(InputDeclarer declarer) {
                     declarer.directGrouping(component, streamId);
-                }                
+                }
 
                 @Override
                 public String getComponent() {
                     return component;
-                }                
+                }
             });
             return this;
         }
@@ -417,21 +414,21 @@ public class BatchSubtopologyBuilder {
         public BoltDeclarer partialKeyGrouping(String componentId, String streamId, Fields fields) {
             return customGrouping(componentId, streamId, new PartialKeyGrouping(fields));
         }
-        
+
         @Override
         public BoltDeclarer customGrouping(final String component, final CustomStreamGrouping grouping) {
             addDeclaration(new InputDeclaration() {
                 @Override
                 public void declare(InputDeclarer declarer) {
                     declarer.customGrouping(component, grouping);
-                }                
+                }
 
                 @Override
                 public String getComponent() {
                     return component;
-                }                
+                }
             });
-            return this;        
+            return this;
         }
 
         @Override
@@ -440,12 +437,12 @@ public class BatchSubtopologyBuilder {
                 @Override
                 public void declare(InputDeclarer declarer) {
                     declarer.customGrouping(component, streamId, grouping);
-                }                
+                }
 
                 @Override
                 public String getComponent() {
                     return component;
-                }                
+                }
             });
             return this;
         }
@@ -456,16 +453,16 @@ public class BatchSubtopologyBuilder {
                 @Override
                 public void declare(InputDeclarer declarer) {
                     declarer.grouping(stream, grouping);
-                }                
+                }
 
                 @Override
                 public String getComponent() {
                     return stream.get_componentId();
-                }                
+                }
             });
             return this;
         }
-        
+
         private void addDeclaration(InputDeclaration declaration) {
             _component.declarations.add(declaration);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/coordination/CoordinatedBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/coordination/CoordinatedBolt.java b/jstorm-core/src/main/java/backtype/storm/coordination/CoordinatedBolt.java
index 6f337a6..39a158d 100755
--- a/jstorm-core/src/main/java/backtype/storm/coordination/CoordinatedBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/coordination/CoordinatedBolt.java
@@ -45,8 +45,7 @@ import org.slf4j.LoggerFactory;
 import static backtype.storm.utils.Utils.get;
 
 /**
- * Coordination requires the request ids to be globally unique for awhile. This is so it doesn't get confused
- * in the case of retries.
+ * Coordination requires the request ids to be globally unique for a while. This is so it doesn't get confused in the case of retries.
  */
 public class CoordinatedBolt implements IRichBolt {
     public static Logger LOG = LoggerFactory.getLogger(CoordinatedBolt.class);
@@ -58,8 +57,7 @@ public class CoordinatedBolt implements IRichBolt {
     public static interface TimeoutCallback {
         void timeoutId(Object id);
     }
-    
-    
+
     public static class SourceArgs implements Serializable {
         public boolean singleCount;
 
@@ -74,7 +72,7 @@ public class CoordinatedBolt implements IRichBolt {
         public static SourceArgs all() {
             return new SourceArgs(false);
         }
-        
+
         @Override
         public String toString() {
             return "<Single: " + singleCount + ">";
@@ -101,14 +99,14 @@ public class CoordinatedBolt implements IRichBolt {
 
         public void ack(Tuple tuple) {
             Object id = tuple.getValue(0);
-            synchronized(_tracked) {
+            synchronized (_tracked) {
                 TrackingInfo track = _tracked.get(id);
                 if (track != null)
                     track.receivedTuples++;
             }
             boolean failed = checkFinishId(tuple, TupleType.REGULAR);
-            if(failed) {
-                _delegate.fail(tuple);                
+            if (failed) {
+                _delegate.fail(tuple);
             } else {
                 _delegate.ack(tuple);
             }
@@ -116,7 +114,7 @@ public class CoordinatedBolt implements IRichBolt {
 
         public void fail(Tuple tuple) {
             Object id = tuple.getValue(0);
-            synchronized(_tracked) {
+            synchronized (_tracked) {
                 TrackingInfo track = _tracked.get(id);
                 if (track != null)
                     track.failed = true;
@@ -124,18 +122,17 @@ public class CoordinatedBolt implements IRichBolt {
             checkFinishId(tuple, TupleType.REGULAR);
             _delegate.fail(tuple);
         }
-        
+
         public void reportError(Throwable error) {
             _delegate.reportError(error);
         }
 
-
         private void updateTaskCounts(Object id, List<Integer> tasks) {
-            synchronized(_tracked) {
+            synchronized (_tracked) {
                 TrackingInfo track = _tracked.get(id);
                 if (track != null) {
                     Map<Integer, Integer> taskEmittedTuples = track.taskEmittedTuples;
-                    for(Integer task: tasks) {
+                    for (Integer task : tasks) {
                         int newCount = get(taskEmittedTuples, task, 0) + 1;
                         taskEmittedTuples.put(task, newCount);
                     }
@@ -161,34 +158,30 @@ public class CoordinatedBolt implements IRichBolt {
         boolean receivedId = false;
         boolean finished = false;
         List<Tuple> ackTuples = new ArrayList<Tuple>();
-        
+
         @Override
         public String toString() {
-            return "reportCount: " + reportCount + "\n" +
-                   "expectedTupleCount: " + expectedTupleCount + "\n" +
-                   "receivedTuples: " + receivedTuples + "\n" +
-                   "failed: " + failed + "\n" +
-                   taskEmittedTuples.toString();
+            return "reportCount: " + reportCount + "\n" + "expectedTupleCount: " + expectedTupleCount + "\n" + "receivedTuples: " + receivedTuples + "\n"
+                    + "failed: " + failed + "\n" + taskEmittedTuples.toString();
         }
     }
 
-    
     public static class IdStreamSpec implements Serializable {
         GlobalStreamId _id;
-        
+
         public GlobalStreamId getGlobalStreamId() {
             return _id;
         }
 
         public static IdStreamSpec makeDetectSpec(String component, String stream) {
             return new IdStreamSpec(component, stream);
-        }        
-        
+        }
+
         protected IdStreamSpec(String component, String stream) {
             _id = new GlobalStreamId(component, stream);
         }
     }
-    
+
     public CoordinatedBolt(IRichBolt delegate) {
         this(delegate, null, null);
     }
@@ -196,37 +189,35 @@ public class CoordinatedBolt implements IRichBolt {
     public CoordinatedBolt(IRichBolt delegate, String sourceComponent, SourceArgs sourceArgs, IdStreamSpec idStreamSpec) {
         this(delegate, singleSourceArgs(sourceComponent, sourceArgs), idStreamSpec);
     }
-    
+
     public CoordinatedBolt(IRichBolt delegate, Map<String, SourceArgs> sourceArgs, IdStreamSpec idStreamSpec) {
         _sourceArgs = sourceArgs;
-        if(_sourceArgs==null) _sourceArgs = new HashMap<String, SourceArgs>();
+        if (_sourceArgs == null)
+            _sourceArgs = new HashMap<String, SourceArgs>();
         _delegate = delegate;
         _idStreamSpec = idStreamSpec;
     }
-    
+
     public void prepare(Map config, TopologyContext context, OutputCollector collector) {
         TimeCacheMap.ExpiredCallback<Object, TrackingInfo> callback = null;
-        if(_delegate instanceof TimeoutCallback) {
+        if (_delegate instanceof TimeoutCallback) {
             callback = new TimeoutItems();
         }
         _tracked = new TimeCacheMap<Object, TrackingInfo>(context.maxTopologyMessageTimeout(), callback);
         _collector = collector;
         _delegate.prepare(config, context, new OutputCollector(new CoordinatedOutputCollector(collector)));
-        for(String component: Utils.get(context.getThisTargets(),
-                                        Constants.COORDINATED_STREAM_ID,
-                                        new HashMap<String, Grouping>())
-                                        .keySet()) {
-            for(Integer task: context.getComponentTasks(component)) {
+        for (String component : Utils.get(context.getThisTargets(), Constants.COORDINATED_STREAM_ID, new HashMap<String, Grouping>()).keySet()) {
+            for (Integer task : context.getComponentTasks(component)) {
                 _countOutTasks.add(task);
             }
         }
-        if(!_sourceArgs.isEmpty()) {
+        if (!_sourceArgs.isEmpty()) {
             _numSourceReports = 0;
-            for(Entry<String, SourceArgs> entry: _sourceArgs.entrySet()) {
-                if(entry.getValue().singleCount) {
-                    _numSourceReports+=1;
+            for (Entry<String, SourceArgs> entry : _sourceArgs.entrySet()) {
+                if (entry.getValue().singleCount) {
+                    _numSourceReports += 1;
                 } else {
-                    _numSourceReports+=context.getComponentTasks(entry.getKey()).size();
+                    _numSourceReports += context.getComponentTasks(entry.getKey()).size();
                 }
             }
         }
@@ -235,57 +226,56 @@ public class CoordinatedBolt implements IRichBolt {
     private boolean checkFinishId(Tuple tup, TupleType type) {
         Object id = tup.getValue(0);
         boolean failed = false;
-        
-        synchronized(_tracked) {
+
+        synchronized (_tracked) {
             TrackingInfo track = _tracked.get(id);
             try {
-                if(track!=null) {
+                if (track != null) {
                     boolean delayed = false;
-                    if(_idStreamSpec==null && type == TupleType.COORD || _idStreamSpec!=null && type==TupleType.ID) {
+                    if (_idStreamSpec == null && type == TupleType.COORD || _idStreamSpec != null && type == TupleType.ID) {
                         track.ackTuples.add(tup);
                         delayed = true;
                     }
-                    if(track.failed) {
+                    if (track.failed) {
                         failed = true;
-                        for(Tuple t: track.ackTuples) {
+                        for (Tuple t : track.ackTuples) {
                             _collector.fail(t);
                         }
                         _tracked.remove(id);
-                    } else if(track.receivedId
-                             && (_sourceArgs.isEmpty() ||
-                                  track.reportCount==_numSourceReports &&
-                                  track.expectedTupleCount == track.receivedTuples)){
-                        if(_delegate instanceof FinishedCallback) {
-                            ((FinishedCallback)_delegate).finishedId(id);
+                    } else if (track.receivedId
+                            && (_sourceArgs.isEmpty() || track.reportCount == _numSourceReports && track.expectedTupleCount == track.receivedTuples)) {
+                        if (_delegate instanceof FinishedCallback) {
+                            ((FinishedCallback) _delegate).finishedId(id);
                         }
-                        if(!(_sourceArgs.isEmpty() || type!=TupleType.REGULAR)) {
+                        if (!(_sourceArgs.isEmpty() || type != TupleType.REGULAR)) {
                             throw new IllegalStateException("Coordination condition met on a non-coordinating tuple. Should be impossible");
                         }
                         Iterator<Integer> outTasks = _countOutTasks.iterator();
-                        while(outTasks.hasNext()) {
+                        while (outTasks.hasNext()) {
                             int task = outTasks.next();
                             int numTuples = get(track.taskEmittedTuples, task, 0);
                             _collector.emitDirect(task, Constants.COORDINATED_STREAM_ID, tup, new Values(id, numTuples));
                         }
-                        for(Tuple t: track.ackTuples) {
+                        for (Tuple t : track.ackTuples) {
                             _collector.ack(t);
                         }
                         track.finished = true;
                         _tracked.remove(id);
                     }
-                    if(!delayed && type!=TupleType.REGULAR) {
-                        if(track.failed) {
+                    if (!delayed && type != TupleType.REGULAR) {
+                        if (track.failed) {
                             _collector.fail(tup);
                         } else {
-                            _collector.ack(tup);                            
+                            _collector.ack(tup);
                         }
                     }
                 } else {
-                    if(type!=TupleType.REGULAR) _collector.fail(tup);
+                    if (type != TupleType.REGULAR)
+                        _collector.fail(tup);
                 }
-            } catch(FailedException e) {
+            } catch (FailedException e) {
                 LOG.error("Failed to finish batch", e);
-                for(Tuple t: track.ackTuples) {
+                for (Tuple t : track.ackTuples) {
                     _collector.fail(t);
                 }
                 _tracked.remove(id);
@@ -299,29 +289,30 @@ public class CoordinatedBolt implements IRichBolt {
         Object id = tuple.getValue(0);
         TrackingInfo track;
         TupleType type = getTupleType(tuple);
-        synchronized(_tracked) {
+        synchronized (_tracked) {
             track = _tracked.get(id);
-            if(track==null) {
+            if (track == null) {
                 track = new TrackingInfo();
-                if(_idStreamSpec==null) track.receivedId = true;
+                if (_idStreamSpec == null)
+                    track.receivedId = true;
                 _tracked.put(id, track);
             }
         }
-        
-        if(type==TupleType.ID) {
-            synchronized(_tracked) {
+
+        if (type == TupleType.ID) {
+            synchronized (_tracked) {
                 track.receivedId = true;
             }
-            checkFinishId(tuple, type);            
-        } else if(type==TupleType.COORD) {
+            checkFinishId(tuple, type);
+        } else if (type == TupleType.COORD) {
             int count = (Integer) tuple.getValue(1);
-            synchronized(_tracked) {
+            synchronized (_tracked) {
                 track.reportCount++;
-                track.expectedTupleCount+=count;
+                track.expectedTupleCount += count;
             }
             checkFinishId(tuple, type);
-        } else {            
-            synchronized(_tracked) {
+        } else {
+            synchronized (_tracked) {
                 _delegate.execute(tuple);
             }
         }
@@ -341,42 +332,38 @@ public class CoordinatedBolt implements IRichBolt {
     public Map<String, Object> getComponentConfiguration() {
         return _delegate.getComponentConfiguration();
     }
-    
+
     private static Map<String, SourceArgs> singleSourceArgs(String sourceComponent, SourceArgs sourceArgs) {
         Map<String, SourceArgs> ret = new HashMap<String, SourceArgs>();
         ret.put(sourceComponent, sourceArgs);
         return ret;
     }
-    
+
     private class TimeoutItems implements TimeCacheMap.ExpiredCallback<Object, TrackingInfo> {
         @Override
         public void expire(Object id, TrackingInfo val) {
-            synchronized(_tracked) {
+            synchronized (_tracked) {
                 // the combination of the lock and the finished flag ensure that
                 // an id is never timed out if it has been finished
                 val.failed = true;
-                if(!val.finished) {
+                if (!val.finished) {
                     ((TimeoutCallback) _delegate).timeoutId(id);
                 }
             }
         }
     }
-    
+
     private TupleType getTupleType(Tuple tuple) {
-        if(_idStreamSpec!=null
-                && tuple.getSourceGlobalStreamid().equals(_idStreamSpec._id)) {
+        if (_idStreamSpec != null && tuple.getSourceGlobalStreamid().equals(_idStreamSpec._id)) {
             return TupleType.ID;
-        } else if(!_sourceArgs.isEmpty()
-                && tuple.getSourceStreamId().equals(Constants.COORDINATED_STREAM_ID)) {
+        } else if (!_sourceArgs.isEmpty() && tuple.getSourceStreamId().equals(Constants.COORDINATED_STREAM_ID)) {
             return TupleType.COORD;
         } else {
             return TupleType.REGULAR;
         }
     }
-    
+
     static enum TupleType {
-        REGULAR,
-        ID,
-        COORD
+        REGULAR, ID, COORD
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/coordination/IBatchBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/coordination/IBatchBolt.java b/jstorm-core/src/main/java/backtype/storm/coordination/IBatchBolt.java
index ee5d9bd..9a1abfa 100755
--- a/jstorm-core/src/main/java/backtype/storm/coordination/IBatchBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/coordination/IBatchBolt.java
@@ -25,6 +25,8 @@ import java.util.Map;
 
 public interface IBatchBolt<T> extends Serializable, IComponent {
     void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, T id);
+
     void execute(Tuple tuple);
+
     void finishBatch();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/drpc/DRPCInvocationsClient.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/drpc/DRPCInvocationsClient.java b/jstorm-core/src/main/java/backtype/storm/drpc/DRPCInvocationsClient.java
index 624db3e..d10872f 100755
--- a/jstorm-core/src/main/java/backtype/storm/drpc/DRPCInvocationsClient.java
+++ b/jstorm-core/src/main/java/backtype/storm/drpc/DRPCInvocationsClient.java
@@ -17,23 +17,22 @@
  */
 package backtype.storm.drpc;
 
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
-
+import backtype.storm.generated.AuthorizationException;
 import backtype.storm.generated.DRPCRequest;
 import backtype.storm.generated.DistributedRPCInvocations;
-import backtype.storm.generated.AuthorizationException;
 import backtype.storm.security.auth.ThriftClient;
 import backtype.storm.security.auth.ThriftConnectionType;
-import org.apache.thrift.transport.TTransportException;
 import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
 public class DRPCInvocationsClient extends ThriftClient implements DistributedRPCInvocations.Iface {
     public static Logger LOG = LoggerFactory.getLogger(DRPCInvocationsClient.class);
-    private final AtomicReference<DistributedRPCInvocations.Client> client =
-       new AtomicReference<DistributedRPCInvocations.Client>();
+    private final AtomicReference<DistributedRPCInvocations.Client> client = new AtomicReference<DistributedRPCInvocations.Client>();
     private String host;
     private int port;
 
@@ -43,14 +42,14 @@ public class DRPCInvocationsClient extends ThriftClient implements DistributedRP
         this.port = port;
         client.set(new DistributedRPCInvocations.Client(_protocol));
     }
-        
+
     public String getHost() {
         return host;
     }
-    
+
     public int getPort() {
         return port;
-    }       
+    }
 
     public void reconnectClient() throws TException {
         if (client.get() == null) {
@@ -70,9 +69,9 @@ public class DRPCInvocationsClient extends ThriftClient implements DistributedRP
                 throw new TException("Client is not connected...");
             }
             c.result(id, result);
-        } catch(AuthorizationException aze) {
+        } catch (AuthorizationException aze) {
             throw aze;
-        } catch(TException e) {
+        } catch (TException e) {
             client.compareAndSet(c, null);
             throw e;
         }
@@ -85,24 +84,24 @@ public class DRPCInvocationsClient extends ThriftClient implements DistributedRP
                 throw new TException("Client is not connected...");
             }
             return c.fetchRequest(func);
-        } catch(AuthorizationException aze) {
+        } catch (AuthorizationException aze) {
             throw aze;
-        } catch(TException e) {
+        } catch (TException e) {
             client.compareAndSet(c, null);
             throw e;
         }
-    }    
+    }
 
-    public void failRequest(String id) throws TException, AuthorizationException {
+    public void failRequest(String id) throws TException {
         DistributedRPCInvocations.Client c = client.get();
         try {
             if (c == null) {
                 throw new TException("Client is not connected...");
             }
             c.failRequest(id);
-        } catch(AuthorizationException aze) {
+        } catch (AuthorizationException aze) {
             throw aze;
-        } catch(TException e) {
+        } catch (TException e) {
             client.compareAndSet(c, null);
             throw e;
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/drpc/DRPCSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/drpc/DRPCSpout.java b/jstorm-core/src/main/java/backtype/storm/drpc/DRPCSpout.java
index 4ed24d4..c490efd 100644
--- a/jstorm-core/src/main/java/backtype/storm/drpc/DRPCSpout.java
+++ b/jstorm-core/src/main/java/backtype/storm/drpc/DRPCSpout.java
@@ -17,25 +17,6 @@
  */
 package backtype.storm.drpc;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.thrift.TException;
-import org.json.simple.JSONValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.alibaba.jstorm.utils.NetWorkUtils;
-
 import backtype.storm.Config;
 import backtype.storm.ILocalDRPC;
 import backtype.storm.generated.AuthorizationException;
@@ -50,31 +31,38 @@ import backtype.storm.tuple.Values;
 import backtype.storm.utils.ExtendedThreadPoolExecutor;
 import backtype.storm.utils.ServiceRegistry;
 import backtype.storm.utils.Utils;
+import com.alibaba.jstorm.utils.NetWorkUtils;
+import org.apache.thrift.TException;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.*;
 
 public class DRPCSpout extends BaseRichSpout {
-    //ANY CHANGE TO THIS CODE MUST BE SERIALIZABLE COMPATIBLE OR THERE WILL BE PROBLEMS
+    // ANY CHANGE TO THIS CODE MUST BE SERIALIZABLE COMPATIBLE OR THERE WILL BE PROBLEMS
     static final long serialVersionUID = 2387848310969237877L;
 
     public static Logger LOG = LoggerFactory.getLogger(DRPCSpout.class);
-    
+
     SpoutOutputCollector _collector;
     List<DRPCInvocationsClient> _clients = new ArrayList<DRPCInvocationsClient>();
     transient LinkedList<Future<Void>> _futures = null;
     transient ExecutorService _backround = null;
     String _function;
     String _local_drpc_id = null;
-    
+
     private static class DRPCMessageId {
         String id;
         int index;
-        
+
         public DRPCMessageId(String id, int index) {
             this.id = id;
             this.index = index;
         }
     }
-    
-    
+
     public DRPCSpout(String function) {
         _function = function;
     }
@@ -83,7 +71,7 @@ public class DRPCSpout extends BaseRichSpout {
         _function = function;
         _local_drpc_id = drpc.getServiceId();
     }
-   
+
     private class Adder implements Callable<Void> {
         private String server;
         private int port;
@@ -129,16 +117,12 @@ public class DRPCSpout extends BaseRichSpout {
             }
         }
     }
-    
-    
- 
+
     @Override
     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
         _collector = collector;
-        if(_local_drpc_id==null) {
-            _backround = new ExtendedThreadPoolExecutor(0, Integer.MAX_VALUE,
-                60L, TimeUnit.SECONDS,
-                new SynchronousQueue<Runnable>());
+        if (_local_drpc_id == null) {
+            _backround = new ExtendedThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
             _futures = new LinkedList<Future<Void>>();
 
             int numTasks = context.getComponentTasks(context.getThisComponentId()).size();
@@ -146,26 +130,26 @@ public class DRPCSpout extends BaseRichSpout {
 
             int port = Utils.getInt(conf.get(Config.DRPC_INVOCATIONS_PORT));
             List<String> servers = NetWorkUtils.host2Ip((List<String>) conf.get(Config.DRPC_SERVERS));
-            
-            if(servers == null || servers.isEmpty()) {
-                throw new RuntimeException("No DRPC servers configured for topology");   
+
+            if (servers == null || servers.isEmpty()) {
+                throw new RuntimeException("No DRPC servers configured for topology");
             }
-            
+
             if (numTasks < servers.size()) {
-                for (String s: servers) {
+                for (String s : servers) {
                     _futures.add(_backround.submit(new Adder(s, port, conf)));
                 }
-            } else {        
+            } else {
                 int i = index % servers.size();
                 _futures.add(_backround.submit(new Adder(servers.get(i), port, conf)));
             }
         }
-        
+
     }
 
     @Override
     public void close() {
-        for(DRPCInvocationsClient client: _clients) {
+        for (DRPCInvocationsClient client : _clients) {
             client.close();
         }
     }
@@ -173,12 +157,12 @@ public class DRPCSpout extends BaseRichSpout {
     @Override
     public void nextTuple() {
         boolean gotRequest = false;
-        if(_local_drpc_id==null) {
+        if (_local_drpc_id == null) {
             int size = 0;
             synchronized (_clients) {
-                size = _clients.size(); //This will only ever grow, so no need to worry about falling off the end
+                size = _clients.size(); // This will only ever grow, so no need to worry about falling off the end
             }
-            for(int i=0; i<size; i++) {
+            for (int i = 0; i < size; i++) {
                 DRPCInvocationsClient client;
                 synchronized (_clients) {
                     client = _clients.get(i);
@@ -188,7 +172,7 @@ public class DRPCSpout extends BaseRichSpout {
                 }
                 try {
                     DRPCRequest req = client.fetchRequest(_function);
-                    if(req.get_request_id().length() > 0) {
+                    if (req.get_request_id().length() > 0) {
                         Map returnInfo = new HashMap();
                         returnInfo.put("id", req.get_request_id());
                         returnInfo.put("host", client.getHost());
@@ -210,10 +194,10 @@ public class DRPCSpout extends BaseRichSpout {
             checkFutures();
         } else {
             DistributedRPCInvocations.Iface drpc = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(_local_drpc_id);
-            if(drpc!=null) { // can happen during shutdown of drpc while topology is still up
+            if (drpc != null) { // can happen during shutdown of drpc while topology is still up
                 try {
                     DRPCRequest req = drpc.fetchRequest(_function);
-                    if(req.get_request_id().length() > 0) {
+                    if (req.get_request_id().length() > 0) {
                         Map returnInfo = new HashMap();
                         returnInfo.put("id", req.get_request_id());
                         returnInfo.put("host", _local_drpc_id);
@@ -228,7 +212,7 @@ public class DRPCSpout extends BaseRichSpout {
                 }
             }
         }
-        if(!gotRequest) {
+        if (!gotRequest) {
             Utils.sleep(1);
         }
     }
@@ -241,8 +225,8 @@ public class DRPCSpout extends BaseRichSpout {
     public void fail(Object msgId) {
         DRPCMessageId did = (DRPCMessageId) msgId;
         DistributedRPCInvocations.Iface client;
-        
-        if(_local_drpc_id == null) {
+
+        if (_local_drpc_id == null) {
             client = _clients.get(did.index);
         } else {
             client = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(_local_drpc_id);
@@ -259,5 +243,5 @@ public class DRPCSpout extends BaseRichSpout {
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
         declarer.declare(new Fields("args", "return-info"));
-    }    
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/drpc/JoinResult.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/drpc/JoinResult.java b/jstorm-core/src/main/java/backtype/storm/drpc/JoinResult.java
index b74b97e..e9195e7 100755
--- a/jstorm-core/src/main/java/backtype/storm/drpc/JoinResult.java
+++ b/jstorm-core/src/main/java/backtype/storm/drpc/JoinResult.java
@@ -31,7 +31,6 @@ import java.util.Map;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public class JoinResult extends BaseRichBolt {
     public static Logger LOG = LoggerFactory.getLogger(JoinResult.class);
 
@@ -43,27 +42,27 @@ public class JoinResult extends BaseRichBolt {
     public JoinResult(String returnComponent) {
         this.returnComponent = returnComponent;
     }
- 
+
     public void prepare(Map map, TopologyContext context, OutputCollector collector) {
         _collector = collector;
     }
 
     public void execute(Tuple tuple) {
         Object requestId = tuple.getValue(0);
-        if(tuple.getSourceComponent().equals(returnComponent)) {
+        if (tuple.getSourceComponent().equals(returnComponent)) {
             returns.put(requestId, tuple);
         } else {
             results.put(requestId, tuple);
         }
 
-        if(returns.containsKey(requestId) && results.containsKey(requestId)) {
+        if (returns.containsKey(requestId) && results.containsKey(requestId)) {
             Tuple result = results.remove(requestId);
             Tuple returner = returns.remove(requestId);
             LOG.debug(result.getValue(1).toString());
             List<Tuple> anchors = new ArrayList<Tuple>();
             anchors.add(result);
-            anchors.add(returner);            
-            _collector.emit(anchors, new Values(""+result.getValue(1), returner.getValue(1)));
+            anchors.add(returner);
+            _collector.emit(anchors, new Values("" + result.getValue(1), returner.getValue(1)));
             _collector.ack(result);
             _collector.ack(returner);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/drpc/KeyedFairBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/drpc/KeyedFairBolt.java b/jstorm-core/src/main/java/backtype/storm/drpc/KeyedFairBolt.java
index 113163d..2294c54 100755
--- a/jstorm-core/src/main/java/backtype/storm/drpc/KeyedFairBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/drpc/KeyedFairBolt.java
@@ -29,7 +29,6 @@ import backtype.storm.utils.KeyedRoundRobinQueue;
 import java.util.HashMap;
 import java.util.Map;
 
-
 public class KeyedFairBolt implements IRichBolt, FinishedCallback {
     IRichBolt _delegate;
     KeyedRoundRobinQueue<Tuple> _rrQueue;
@@ -39,14 +38,13 @@ public class KeyedFairBolt implements IRichBolt, FinishedCallback {
     public KeyedFairBolt(IRichBolt delegate) {
         _delegate = delegate;
     }
-    
+
     public KeyedFairBolt(IBasicBolt delegate) {
         this(new BasicBoltExecutor(delegate));
     }
-    
-    
+
     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-        if(_delegate instanceof FinishedCallback) {
+        if (_delegate instanceof FinishedCallback) {
             _callback = (FinishedCallback) _delegate;
         }
         _delegate.prepare(stormConf, context, collector);
@@ -54,7 +52,7 @@ public class KeyedFairBolt implements IRichBolt, FinishedCallback {
         _executor = new Thread(new Runnable() {
             public void run() {
                 try {
-                    while(true) {
+                    while (true) {
                         _delegate.execute(_rrQueue.take());
                     }
                 } catch (InterruptedException e) {
@@ -81,7 +79,7 @@ public class KeyedFairBolt implements IRichBolt, FinishedCallback {
     }
 
     public void finishedId(Object id) {
-        if(_callback!=null) {
+        if (_callback != null) {
             _callback.finishedId(id);
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/drpc/LinearDRPCInputDeclarer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/drpc/LinearDRPCInputDeclarer.java b/jstorm-core/src/main/java/backtype/storm/drpc/LinearDRPCInputDeclarer.java
index d03075e..ddcac35 100755
--- a/jstorm-core/src/main/java/backtype/storm/drpc/LinearDRPCInputDeclarer.java
+++ b/jstorm-core/src/main/java/backtype/storm/drpc/LinearDRPCInputDeclarer.java
@@ -23,30 +23,39 @@ import backtype.storm.tuple.Fields;
 
 public interface LinearDRPCInputDeclarer extends ComponentConfigurationDeclarer<LinearDRPCInputDeclarer> {
     public LinearDRPCInputDeclarer fieldsGrouping(Fields fields);
+
     public LinearDRPCInputDeclarer fieldsGrouping(String streamId, Fields fields);
 
     public LinearDRPCInputDeclarer globalGrouping();
+
     public LinearDRPCInputDeclarer globalGrouping(String streamId);
 
     public LinearDRPCInputDeclarer shuffleGrouping();
+
     public LinearDRPCInputDeclarer shuffleGrouping(String streamId);
 
     public LinearDRPCInputDeclarer localOrShuffleGrouping();
+
     public LinearDRPCInputDeclarer localOrShuffleGrouping(String streamId);
-    
+
     public LinearDRPCInputDeclarer noneGrouping();
+
     public LinearDRPCInputDeclarer noneGrouping(String streamId);
 
     public LinearDRPCInputDeclarer allGrouping();
+
     public LinearDRPCInputDeclarer allGrouping(String streamId);
 
     public LinearDRPCInputDeclarer directGrouping();
+
     public LinearDRPCInputDeclarer directGrouping(String streamId);
 
     public LinearDRPCInputDeclarer partialKeyGrouping(Fields fields);
+
     public LinearDRPCInputDeclarer partialKeyGrouping(String streamId, Fields fields);
 
     public LinearDRPCInputDeclarer customGrouping(CustomStreamGrouping grouping);
+
     public LinearDRPCInputDeclarer customGrouping(String streamId, CustomStreamGrouping grouping);
-    
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/drpc/LinearDRPCTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/drpc/LinearDRPCTopologyBuilder.java b/jstorm-core/src/main/java/backtype/storm/drpc/LinearDRPCTopologyBuilder.java
index ebbbc6d..e8c202e 100755
--- a/jstorm-core/src/main/java/backtype/storm/drpc/LinearDRPCTopologyBuilder.java
+++ b/jstorm-core/src/main/java/backtype/storm/drpc/LinearDRPCTopologyBuilder.java
@@ -43,39 +43,38 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-
 // Trident subsumes the functionality provided by this class, so it's deprecated
 @Deprecated
-public class LinearDRPCTopologyBuilder {    
+public class LinearDRPCTopologyBuilder {
     String _function;
     List<Component> _components = new ArrayList<Component>();
-    
-    
+
     public LinearDRPCTopologyBuilder(String function) {
         _function = function;
     }
-        
+
     public LinearDRPCInputDeclarer addBolt(IBatchBolt bolt, Number parallelism) {
         return addBolt(new BatchBoltExecutor(bolt), parallelism);
     }
-    
+
     public LinearDRPCInputDeclarer addBolt(IBatchBolt bolt) {
         return addBolt(bolt, 1);
     }
-    
+
     @Deprecated
     public LinearDRPCInputDeclarer addBolt(IRichBolt bolt, Number parallelism) {
-        if(parallelism==null) parallelism = 1; 
+        if (parallelism == null)
+            parallelism = 1;
         Component component = new Component(bolt, parallelism.intValue());
         _components.add(component);
         return new InputDeclarerImpl(component);
     }
-    
+
     @Deprecated
     public LinearDRPCInputDeclarer addBolt(IRichBolt bolt) {
         return addBolt(bolt, null);
     }
-    
+
     public LinearDRPCInputDeclarer addBolt(IBasicBolt bolt, Number parallelism) {
         return addBolt(new BasicBoltExecutor(bolt), parallelism);
     }
@@ -83,125 +82,119 @@ public class LinearDRPCTopologyBuilder {
     public LinearDRPCInputDeclarer addBolt(IBasicBolt bolt) {
         return addBolt(bolt, null);
     }
-        
+
     public StormTopology createLocalTopology(ILocalDRPC drpc) {
         return createTopology(new DRPCSpout(_function, drpc));
     }
-    
+
     public StormTopology createRemoteTopology() {
         return createTopology(new DRPCSpout(_function));
     }
-    
-    
+
     private StormTopology createTopology(DRPCSpout spout) {
         final String SPOUT_ID = "spout";
         final String PREPARE_ID = "prepare-request";
-        
+
         TopologyBuilder builder = new TopologyBuilder();
         builder.setSpout(SPOUT_ID, spout);
-        builder.setBolt(PREPARE_ID, new PrepareRequest())
-                .noneGrouping(SPOUT_ID);
-        int i=0;
-        for(; i<_components.size();i++) {
+        builder.setBolt(PREPARE_ID, new PrepareRequest()).noneGrouping(SPOUT_ID);
+        int i = 0;
+        for (; i < _components.size(); i++) {
             Component component = _components.get(i);
-            
+
             Map<String, SourceArgs> source = new HashMap<String, SourceArgs>();
-            if (i==1) {
-                source.put(boltId(i-1), SourceArgs.single());
-            } else if (i>=2) {
-                source.put(boltId(i-1), SourceArgs.all());
+            if (i == 1) {
+                source.put(boltId(i - 1), SourceArgs.single());
+            } else if (i >= 2) {
+                source.put(boltId(i - 1), SourceArgs.all());
             }
             IdStreamSpec idSpec = null;
-            if(i==_components.size()-1 && component.bolt instanceof FinishedCallback) {
+            if (i == _components.size() - 1 && component.bolt instanceof FinishedCallback) {
                 idSpec = IdStreamSpec.makeDetectSpec(PREPARE_ID, PrepareRequest.ID_STREAM);
             }
-            BoltDeclarer declarer = builder.setBolt(
-                    boltId(i),
-                    new CoordinatedBolt(component.bolt, source, idSpec),
-                    component.parallelism);
-            
-            for(Map conf: component.componentConfs) {
+            BoltDeclarer declarer = builder.setBolt(boltId(i), new CoordinatedBolt(component.bolt, source, idSpec), component.parallelism);
+
+            for (Map conf : component.componentConfs) {
                 declarer.addConfigurations(conf);
             }
-            
-            if(idSpec!=null) {
+
+            if (idSpec != null) {
                 declarer.fieldsGrouping(idSpec.getGlobalStreamId().get_componentId(), PrepareRequest.ID_STREAM, new Fields("request"));
             }
-            if(i==0 && component.declarations.isEmpty()) {
+            if (i == 0 && component.declarations.isEmpty()) {
                 declarer.noneGrouping(PREPARE_ID, PrepareRequest.ARGS_STREAM);
             } else {
                 String prevId;
-                if(i==0) {
+                if (i == 0) {
                     prevId = PREPARE_ID;
                 } else {
-                    prevId = boltId(i-1);
+                    prevId = boltId(i - 1);
                 }
-                for(InputDeclaration declaration: component.declarations) {
+                for (InputDeclaration declaration : component.declarations) {
                     declaration.declare(prevId, declarer);
                 }
             }
-            if(i>0) {
-                declarer.directGrouping(boltId(i-1), Constants.COORDINATED_STREAM_ID); 
+            if (i > 0) {
+                declarer.directGrouping(boltId(i - 1), Constants.COORDINATED_STREAM_ID);
             }
         }
-        
-        IRichBolt lastBolt = _components.get(_components.size()-1).bolt;
+
+        IRichBolt lastBolt = _components.get(_components.size() - 1).bolt;
         OutputFieldsGetter getter = new OutputFieldsGetter();
         lastBolt.declareOutputFields(getter);
         Map<String, StreamInfo> streams = getter.getFieldsDeclaration();
-        if(streams.size()!=1) {
+        if (streams.size() != 1) {
             throw new RuntimeException("Must declare exactly one stream from last bolt in LinearDRPCTopology");
         }
         String outputStream = streams.keySet().iterator().next();
         List<String> fields = streams.get(outputStream).get_output_fields();
-        if(fields.size()!=2) {
-            throw new RuntimeException("Output stream of last component in LinearDRPCTopology must contain exactly two fields. The first should be the request id, and the second should be the result.");
+        if (fields.size() != 2) {
+            throw new RuntimeException(
+                    "Output stream of last component in LinearDRPCTopology must contain exactly two fields. The first should be the request id, and the second should be the result.");
         }
 
-        builder.setBolt("JoinResult", new JoinResult(PREPARE_ID))
-                .fieldsGrouping(boltId(i-1), outputStream, new Fields(fields.get(0)))
+        builder.setBolt("JoinResult", new JoinResult(PREPARE_ID)).fieldsGrouping(boltId(i - 1), outputStream, new Fields(fields.get(0)))
                 .fieldsGrouping(PREPARE_ID, PrepareRequest.RETURN_STREAM, new Fields("request"));
         i++;
-        builder.setBolt("ReturnResults", new ReturnResults())
-                .noneGrouping("JoinResult");
+        builder.setBolt("ReturnResults", new ReturnResults()).noneGrouping("JoinResult");
         return builder.createTopology();
     }
-    
+
     private static String boltId(int index) {
         return "bolt" + index;
     }
-    
+
     private static class Component {
         public IRichBolt bolt;
         public int parallelism;
         public List<Map> componentConfs;
         public List<InputDeclaration> declarations = new ArrayList<InputDeclaration>();
-        
+
         public Component(IRichBolt bolt, int parallelism) {
             this.bolt = bolt;
             this.parallelism = parallelism;
             this.componentConfs = new ArrayList();
         }
     }
-    
+
     private static interface InputDeclaration {
         public void declare(String prevComponent, InputDeclarer declarer);
     }
-    
+
     private class InputDeclarerImpl extends BaseConfigurationDeclarer<LinearDRPCInputDeclarer> implements LinearDRPCInputDeclarer {
         Component _component;
-        
+
         public InputDeclarerImpl(Component component) {
             _component = component;
         }
-        
+
         @Override
         public LinearDRPCInputDeclarer fieldsGrouping(final Fields fields) {
             addDeclaration(new InputDeclaration() {
                 @Override
                 public void declare(String prevComponent, InputDeclarer declarer) {
                     declarer.fieldsGrouping(prevComponent, fields);
-                }                
+                }
             });
             return this;
         }
@@ -212,7 +205,7 @@ public class LinearDRPCTopologyBuilder {
                 @Override
                 public void declare(String prevComponent, InputDeclarer declarer) {
                     declarer.fieldsGrouping(prevComponent, streamId, fields);
-                }                
+                }
             });
             return this;
         }
@@ -223,7 +216,7 @@ public class LinearDRPCTopologyBuilder {
                 @Override
                 public void declare(String prevComponent, InputDeclarer declarer) {
                     declarer.globalGrouping(prevComponent);
-                }                
+                }
             });
             return this;
         }
@@ -234,7 +227,7 @@ public class LinearDRPCTopologyBuilder {
                 @Override
                 public void declare(String prevComponent, InputDeclarer declarer) {
                     declarer.globalGrouping(prevComponent, streamId);
-                }                
+                }
             });
             return this;
         }
@@ -245,7 +238,7 @@ public class LinearDRPCTopologyBuilder {
                 @Override
                 public void declare(String prevComponent, InputDeclarer declarer) {
                     declarer.shuffleGrouping(prevComponent);
-                }                
+                }
             });
             return this;
         }
@@ -256,7 +249,7 @@ public class LinearDRPCTopologyBuilder {
                 @Override
                 public void declare(String prevComponent, InputDeclarer declarer) {
                     declarer.shuffleGrouping(prevComponent, streamId);
-                }                
+                }
             });
             return this;
         }
@@ -267,7 +260,7 @@ public class LinearDRPCTopologyBuilder {
                 @Override
                 public void declare(String prevComponent, InputDeclarer declarer) {
                     declarer.localOrShuffleGrouping(prevComponent);
-                }                
+                }
             });
             return this;
         }
@@ -278,18 +271,18 @@ public class LinearDRPCTopologyBuilder {
                 @Override
                 public void declare(String prevComponent, InputDeclarer declarer) {
                     declarer.localOrShuffleGrouping(prevComponent, streamId);
-                }                
+                }
             });
             return this;
         }
-        
+
         @Override
         public LinearDRPCInputDeclarer noneGrouping() {
             addDeclaration(new InputDeclaration() {
                 @Override
                 public void declare(String prevComponent, InputDeclarer declarer) {
                     declarer.noneGrouping(prevComponent);
-                }                
+                }
             });
             return this;
         }
@@ -300,7 +293,7 @@ public class LinearDRPCTopologyBuilder {
                 @Override
                 public void declare(String prevComponent, InputDeclarer declarer) {
                     declarer.noneGrouping(prevComponent, streamId);
-                }                
+                }
             });
             return this;
         }
@@ -311,7 +304,7 @@ public class LinearDRPCTopologyBuilder {
                 @Override
                 public void declare(String prevComponent, InputDeclarer declarer) {
                     declarer.allGrouping(prevComponent);
-                }                
+                }
             });
             return this;
         }
@@ -322,7 +315,7 @@ public class LinearDRPCTopologyBuilder {
                 @Override
                 public void declare(String prevComponent, InputDeclarer declarer) {
                     declarer.allGrouping(prevComponent, streamId);
-                }                
+                }
             });
             return this;
         }
@@ -333,7 +326,7 @@ public class LinearDRPCTopologyBuilder {
                 @Override
                 public void declare(String prevComponent, InputDeclarer declarer) {
                     declarer.directGrouping(prevComponent);
-                }                
+                }
             });
             return this;
         }
@@ -344,7 +337,7 @@ public class LinearDRPCTopologyBuilder {
                 @Override
                 public void declare(String prevComponent, InputDeclarer declarer) {
                     declarer.directGrouping(prevComponent, streamId);
-                }                
+                }
             });
             return this;
         }
@@ -365,7 +358,7 @@ public class LinearDRPCTopologyBuilder {
                 @Override
                 public void declare(String prevComponent, InputDeclarer declarer) {
                     declarer.customGrouping(prevComponent, grouping);
-                }                
+                }
             });
             return this;
         }
@@ -376,11 +369,11 @@ public class LinearDRPCTopologyBuilder {
                 @Override
                 public void declare(String prevComponent, InputDeclarer declarer) {
                     declarer.customGrouping(prevComponent, streamId, grouping);
-                }                
+                }
             });
             return this;
         }
-        
+
         private void addDeclaration(InputDeclaration declaration) {
             _component.declarations.add(declaration);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/drpc/PrepareRequest.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/drpc/PrepareRequest.java b/jstorm-core/src/main/java/backtype/storm/drpc/PrepareRequest.java
index bd32169..fea8b36 100755
--- a/jstorm-core/src/main/java/backtype/storm/drpc/PrepareRequest.java
+++ b/jstorm-core/src/main/java/backtype/storm/drpc/PrepareRequest.java
@@ -28,7 +28,6 @@ import java.util.Map;
 import java.util.Random;
 import backtype.storm.utils.Utils;
 
-
 public class PrepareRequest extends BaseBasicBolt {
     public static final String ARGS_STREAM = Utils.DEFAULT_STREAM_ID;
     public static final String RETURN_STREAM = "ret";

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/drpc/ReturnResults.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/drpc/ReturnResults.java b/jstorm-core/src/main/java/backtype/storm/drpc/ReturnResults.java
index 2ca517e..129e2b3 100644
--- a/jstorm-core/src/main/java/backtype/storm/drpc/ReturnResults.java
+++ b/jstorm-core/src/main/java/backtype/storm/drpc/ReturnResults.java
@@ -37,15 +37,14 @@ import org.apache.thrift.TException;
 import org.apache.thrift.transport.TTransportException;
 import org.json.simple.JSONValue;
 
-
 public class ReturnResults extends BaseRichBolt {
-    //ANY CHANGE TO THIS CODE MUST BE SERIALIZABLE COMPATIBLE OR THERE WILL BE PROBLEMS
+    // ANY CHANGE TO THIS CODE MUST BE SERIALIZABLE COMPATIBLE OR THERE WILL BE PROBLEMS
     static final long serialVersionUID = -774882142710631591L;
 
     public static final Logger LOG = LoggerFactory.getLogger(ReturnResults.class);
     OutputCollector _collector;
     boolean local;
-    Map _conf; 
+    Map _conf;
     Map<List, DRPCInvocationsClient> _clients = new HashMap<List, DRPCInvocationsClient>();
 
     @Override
@@ -59,22 +58,24 @@ public class ReturnResults extends BaseRichBolt {
     public void execute(Tuple input) {
         String result = (String) input.getValue(0);
         String returnInfo = (String) input.getValue(1);
-        //LOG.info("Receive one message, resultInfo:{}, result:{}", returnInfo, result);
-        if(returnInfo!=null) {
+        // LOG.info("Receive one message, resultInfo:{}, result:{}", returnInfo, result);
+        if (returnInfo != null) {
             Map retMap = (Map) JSONValue.parse(returnInfo);
             final String host = (String) retMap.get("host");
             final int port = Utils.getInt(retMap.get("port"));
             String id = (String) retMap.get("id");
             DistributedRPCInvocations.Iface client;
-            if(local) {
+            if (local) {
                 client = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(host);
             } else {
-                List server = new ArrayList() {{
-                    add(host);
-                    add(port);
-                }};
-            
-                if(!_clients.containsKey(server)) {
+                List server = new ArrayList() {
+                    {
+                        add(host);
+                        add(port);
+                    }
+                };
+
+                if (!_clients.containsKey(server)) {
                     try {
                         _clients.put(server, new DRPCInvocationsClient(_conf, host, port));
                     } catch (TTransportException ex) {
@@ -83,7 +84,7 @@ public class ReturnResults extends BaseRichBolt {
                 }
                 client = _clients.get(server);
             }
- 
+
             try {
                 client.result(id, result);
                 _collector.ack(input);
@@ -93,29 +94,29 @@ public class ReturnResults extends BaseRichBolt {
                 if (client instanceof DRPCInvocationsClient) {
                     try {
                         LOG.info("reconnecting... ");
-                        ((DRPCInvocationsClient)client).reconnectClient(); //Blocking call
+                        ((DRPCInvocationsClient) client).reconnectClient(); // Blocking call
                     } catch (TException e2) {
                         throw new RuntimeException(e2);
                     }
                 }
-            } catch(TException e) {
+            } catch (TException e) {
                 LOG.error("Failed to return results to DRPC server", e);
                 _collector.fail(input);
                 if (client instanceof DRPCInvocationsClient) {
                     try {
                         LOG.info("reconnecting... ");
-                        ((DRPCInvocationsClient)client).reconnectClient(); //Blocking call
+                        ((DRPCInvocationsClient) client).reconnectClient(); // Blocking call
                     } catch (TException e2) {
                         throw new RuntimeException(e2);
                     }
                 }
             }
         }
-    }    
+    }
 
     @Override
     public void cleanup() {
-        for(DRPCInvocationsClient c: _clients.values()) {
+        for (DRPCInvocationsClient c : _clients.values()) {
             c.close();
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/generated/AlreadyAliveException.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/generated/AlreadyAliveException.java b/jstorm-core/src/main/java/backtype/storm/generated/AlreadyAliveException.java
index 06eadaf..533b112 100644
--- a/jstorm-core/src/main/java/backtype/storm/generated/AlreadyAliveException.java
+++ b/jstorm-core/src/main/java/backtype/storm/generated/AlreadyAliveException.java
@@ -34,7 +34,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-7-27")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-20")
 public class AlreadyAliveException extends TException implements org.apache.thrift.TBase<AlreadyAliveException, AlreadyAliveException._Fields>, java.io.Serializable, Cloneable, Comparable<AlreadyAliveException> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("AlreadyAliveException");
 
@@ -264,11 +264,11 @@ public class AlreadyAliveException extends TException implements org.apache.thri
     return _Fields.findByThriftId(fieldId);
   }
 
-  public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+  public void read(org.apache.thrift.protocol.TProtocol iprot) throws TException {
     schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
   }
 
-  public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+  public void write(org.apache.thrift.protocol.TProtocol oprot) throws TException {
     schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
   }
 
@@ -288,10 +288,10 @@ public class AlreadyAliveException extends TException implements org.apache.thri
     return sb.toString();
   }
 
-  public void validate() throws org.apache.thrift.TException {
+  public void validate() throws TException {
     // check for required fields
     if (!is_set_msg()) {
-      throw new org.apache.thrift.protocol.TProtocolException("Required field 'msg' is unset! Struct:" + toString());
+      throw new TProtocolException("Required field 'msg' is unset! Struct:" + toString());
     }
 
     // check for sub-struct validity
@@ -300,7 +300,7 @@ public class AlreadyAliveException extends TException implements org.apache.thri
   private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
     try {
       write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
-    } catch (org.apache.thrift.TException te) {
+    } catch (TException te) {
       throw new java.io.IOException(te);
     }
   }
@@ -308,7 +308,7 @@ public class AlreadyAliveException extends TException implements org.apache.thri
   private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
     try {
       read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
-    } catch (org.apache.thrift.TException te) {
+    } catch (TException te) {
       throw new java.io.IOException(te);
     }
   }
@@ -321,7 +321,7 @@ public class AlreadyAliveException extends TException implements org.apache.thri
 
   private static class AlreadyAliveExceptionStandardScheme extends StandardScheme<AlreadyAliveException> {
 
-    public void read(org.apache.thrift.protocol.TProtocol iprot, AlreadyAliveException struct) throws org.apache.thrift.TException {
+    public void read(org.apache.thrift.protocol.TProtocol iprot, AlreadyAliveException struct) throws TException {
       org.apache.thrift.protocol.TField schemeField;
       iprot.readStructBegin();
       while (true)
@@ -348,7 +348,7 @@ public class AlreadyAliveException extends TException implements org.apache.thri
       struct.validate();
     }
 
-    public void write(org.apache.thrift.protocol.TProtocol oprot, AlreadyAliveException struct) throws org.apache.thrift.TException {
+    public void write(org.apache.thrift.protocol.TProtocol oprot, AlreadyAliveException struct) throws TException {
       struct.validate();
 
       oprot.writeStructBegin(STRUCT_DESC);
@@ -372,13 +372,13 @@ public class AlreadyAliveException extends TException implements org.apache.thri
   private static class AlreadyAliveExceptionTupleScheme extends TupleScheme<AlreadyAliveException> {
 
     @Override
-    public void write(org.apache.thrift.protocol.TProtocol prot, AlreadyAliveException struct) throws org.apache.thrift.TException {
+    public void write(org.apache.thrift.protocol.TProtocol prot, AlreadyAliveException struct) throws TException {
       TTupleProtocol oprot = (TTupleProtocol) prot;
       oprot.writeString(struct.msg);
     }
 
     @Override
-    public void read(org.apache.thrift.protocol.TProtocol prot, AlreadyAliveException struct) throws org.apache.thrift.TException {
+    public void read(org.apache.thrift.protocol.TProtocol prot, AlreadyAliveException struct) throws TException {
       TTupleProtocol iprot = (TTupleProtocol) prot;
       struct.msg = iprot.readString();
       struct.set_msg_isSet(true);

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/generated/AuthorizationException.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/generated/AuthorizationException.java b/jstorm-core/src/main/java/backtype/storm/generated/AuthorizationException.java
index 02f72f0..0822f50 100644
--- a/jstorm-core/src/main/java/backtype/storm/generated/AuthorizationException.java
+++ b/jstorm-core/src/main/java/backtype/storm/generated/AuthorizationException.java
@@ -34,7 +34,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-7-27")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-20")
 public class AuthorizationException extends TException implements org.apache.thrift.TBase<AuthorizationException, AuthorizationException._Fields>, java.io.Serializable, Cloneable, Comparable<AuthorizationException> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("AuthorizationException");
 
@@ -264,11 +264,11 @@ public class AuthorizationException extends TException implements org.apache.thr
     return _Fields.findByThriftId(fieldId);
   }
 
-  public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+  public void read(org.apache.thrift.protocol.TProtocol iprot) throws TException {
     schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
   }
 
-  public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+  public void write(org.apache.thrift.protocol.TProtocol oprot) throws TException {
     schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
   }
 
@@ -288,10 +288,10 @@ public class AuthorizationException extends TException implements org.apache.thr
     return sb.toString();
   }
 
-  public void validate() throws org.apache.thrift.TException {
+  public void validate() throws TException {
     // check for required fields
     if (!is_set_msg()) {
-      throw new org.apache.thrift.protocol.TProtocolException("Required field 'msg' is unset! Struct:" + toString());
+      throw new TProtocolException("Required field 'msg' is unset! Struct:" + toString());
     }
 
     // check for sub-struct validity
@@ -300,7 +300,7 @@ public class AuthorizationException extends TException implements org.apache.thr
   private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
     try {
       write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
-    } catch (org.apache.thrift.TException te) {
+    } catch (TException te) {
       throw new java.io.IOException(te);
     }
   }
@@ -308,7 +308,7 @@ public class AuthorizationException extends TException implements org.apache.thr
   private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
     try {
       read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
-    } catch (org.apache.thrift.TException te) {
+    } catch (TException te) {
       throw new java.io.IOException(te);
     }
   }
@@ -321,7 +321,7 @@ public class AuthorizationException extends TException implements org.apache.thr
 
   private static class AuthorizationExceptionStandardScheme extends StandardScheme<AuthorizationException> {
 
-    public void read(org.apache.thrift.protocol.TProtocol iprot, AuthorizationException struct) throws org.apache.thrift.TException {
+    public void read(org.apache.thrift.protocol.TProtocol iprot, AuthorizationException struct) throws TException {
       org.apache.thrift.protocol.TField schemeField;
       iprot.readStructBegin();
       while (true)
@@ -348,7 +348,7 @@ public class AuthorizationException extends TException implements org.apache.thr
       struct.validate();
     }
 
-    public void write(org.apache.thrift.protocol.TProtocol oprot, AuthorizationException struct) throws org.apache.thrift.TException {
+    public void write(org.apache.thrift.protocol.TProtocol oprot, AuthorizationException struct) throws TException {
       struct.validate();
 
       oprot.writeStructBegin(STRUCT_DESC);
@@ -372,13 +372,13 @@ public class AuthorizationException extends TException implements org.apache.thr
   private static class AuthorizationExceptionTupleScheme extends TupleScheme<AuthorizationException> {
 
     @Override
-    public void write(org.apache.thrift.protocol.TProtocol prot, AuthorizationException struct) throws org.apache.thrift.TException {
+    public void write(org.apache.thrift.protocol.TProtocol prot, AuthorizationException struct) throws TException {
       TTupleProtocol oprot = (TTupleProtocol) prot;
       oprot.writeString(struct.msg);
     }
 
     @Override
-    public void read(org.apache.thrift.protocol.TProtocol prot, AuthorizationException struct) throws org.apache.thrift.TException {
+    public void read(org.apache.thrift.protocol.TProtocol prot, AuthorizationException struct) throws TException {
       TTupleProtocol iprot = (TTupleProtocol) prot;
       struct.msg = iprot.readString();
       struct.set_msg_isSet(true);

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/generated/Bolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/generated/Bolt.java b/jstorm-core/src/main/java/backtype/storm/generated/Bolt.java
index e3d0a07..9241322 100644
--- a/jstorm-core/src/main/java/backtype/storm/generated/Bolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/generated/Bolt.java
@@ -34,7 +34,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-7-27")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-20")
 public class Bolt implements org.apache.thrift.TBase<Bolt, Bolt._Fields>, java.io.Serializable, Cloneable, Comparable<Bolt> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("Bolt");
 
@@ -337,11 +337,11 @@ public class Bolt implements org.apache.thrift.TBase<Bolt, Bolt._Fields>, java.i
     return _Fields.findByThriftId(fieldId);
   }
 
-  public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+  public void read(org.apache.thrift.protocol.TProtocol iprot) throws TException {
     schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
   }
 
-  public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+  public void write(org.apache.thrift.protocol.TProtocol oprot) throws TException {
     schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
   }
 
@@ -369,14 +369,14 @@ public class Bolt implements org.apache.thrift.TBase<Bolt, Bolt._Fields>, java.i
     return sb.toString();
   }
 
-  public void validate() throws org.apache.thrift.TException {
+  public void validate() throws TException {
     // check for required fields
     if (!is_set_bolt_object()) {
-      throw new org.apache.thrift.protocol.TProtocolException("Required field 'bolt_object' is unset! Struct:" + toString());
+      throw new TProtocolException("Required field 'bolt_object' is unset! Struct:" + toString());
     }
 
     if (!is_set_common()) {
-      throw new org.apache.thrift.protocol.TProtocolException("Required field 'common' is unset! Struct:" + toString());
+      throw new TProtocolException("Required field 'common' is unset! Struct:" + toString());
     }
 
     // check for sub-struct validity
@@ -388,7 +388,7 @@ public class Bolt implements org.apache.thrift.TBase<Bolt, Bolt._Fields>, java.i
   private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
     try {
       write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
-    } catch (org.apache.thrift.TException te) {
+    } catch (TException te) {
       throw new java.io.IOException(te);
     }
   }
@@ -396,7 +396,7 @@ public class Bolt implements org.apache.thrift.TBase<Bolt, Bolt._Fields>, java.i
   private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
     try {
       read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
-    } catch (org.apache.thrift.TException te) {
+    } catch (TException te) {
       throw new java.io.IOException(te);
     }
   }
@@ -409,7 +409,7 @@ public class Bolt implements org.apache.thrift.TBase<Bolt, Bolt._Fields>, java.i
 
   private static class BoltStandardScheme extends StandardScheme<Bolt> {
 
-    public void read(org.apache.thrift.protocol.TProtocol iprot, Bolt struct) throws org.apache.thrift.TException {
+    public void read(org.apache.thrift.protocol.TProtocol iprot, Bolt struct) throws TException {
       org.apache.thrift.protocol.TField schemeField;
       iprot.readStructBegin();
       while (true)
@@ -446,7 +446,7 @@ public class Bolt implements org.apache.thrift.TBase<Bolt, Bolt._Fields>, java.i
       struct.validate();
     }
 
-    public void write(org.apache.thrift.protocol.TProtocol oprot, Bolt struct) throws org.apache.thrift.TException {
+    public void write(org.apache.thrift.protocol.TProtocol oprot, Bolt struct) throws TException {
       struct.validate();
 
       oprot.writeStructBegin(STRUCT_DESC);
@@ -475,14 +475,14 @@ public class Bolt implements org.apache.thrift.TBase<Bolt, Bolt._Fields>, java.i
   private static class BoltTupleScheme extends TupleScheme<Bolt> {
 
     @Override
-    public void write(org.apache.thrift.protocol.TProtocol prot, Bolt struct) throws org.apache.thrift.TException {
+    public void write(org.apache.thrift.protocol.TProtocol prot, Bolt struct) throws TException {
       TTupleProtocol oprot = (TTupleProtocol) prot;
       struct.bolt_object.write(oprot);
       struct.common.write(oprot);
     }
 
     @Override
-    public void read(org.apache.thrift.protocol.TProtocol prot, Bolt struct) throws org.apache.thrift.TException {
+    public void read(org.apache.thrift.protocol.TProtocol prot, Bolt struct) throws TException {
       TTupleProtocol iprot = (TTupleProtocol) prot;
       struct.bolt_object = new ComponentObject();
       struct.bolt_object.read(iprot);