You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:05:06 UTC

[28/51] [partial] storm git commit: Update JStorm to latest release 2.1.0

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalSpoutCoordinator.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalSpoutCoordinator.java b/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalSpoutCoordinator.java
index f7ce534..3768cb1 100755
--- a/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalSpoutCoordinator.java
+++ b/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalSpoutCoordinator.java
@@ -35,40 +35,38 @@ import java.util.Random;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class TransactionalSpoutCoordinator extends BaseRichSpout { 
+public class TransactionalSpoutCoordinator extends BaseRichSpout {
     public static final Logger LOG = LoggerFactory.getLogger(TransactionalSpoutCoordinator.class);
-    
+
     public static final BigInteger INIT_TXID = BigInteger.ONE;
-    
-    
+
     public static final String TRANSACTION_BATCH_STREAM_ID = TransactionalSpoutCoordinator.class.getName() + "/batch";
     public static final String TRANSACTION_COMMIT_STREAM_ID = TransactionalSpoutCoordinator.class.getName() + "/commit";
 
     private static final String CURRENT_TX = "currtx";
     private static final String META_DIR = "meta";
-    
+
     private ITransactionalSpout _spout;
     private ITransactionalSpout.Coordinator _coordinator;
     private TransactionalState _state;
     private RotatingTransactionalState _coordinatorState;
-    
+
     TreeMap<BigInteger, TransactionStatus> _activeTx = new TreeMap<BigInteger, TransactionStatus>();
-    
+
     private SpoutOutputCollector _collector;
     private Random _rand;
     BigInteger _currTransaction;
     int _maxTransactionActive;
     StateInitializer _initializer;
-    
-    
+
     public TransactionalSpoutCoordinator(ITransactionalSpout spout) {
         _spout = spout;
     }
-    
+
     public ITransactionalSpout getSpout() {
         return _spout;
     }
-    
+
     @Override
     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
         _rand = new Random(Utils.secureRandomLong());
@@ -78,7 +76,7 @@ public class TransactionalSpoutCoordinator extends BaseRichSpout {
         _coordinator = _spout.getCoordinator(conf, context);
         _currTransaction = getStoredCurrTransaction(_state);
         Object active = conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING);
-        if(active==null) {
+        if (active == null) {
             _maxTransactionActive = 1;
         } else {
             _maxTransactionActive = Utils.getInt(active);
@@ -100,10 +98,10 @@ public class TransactionalSpoutCoordinator extends BaseRichSpout {
     public void ack(Object msgId) {
         TransactionAttempt tx = (TransactionAttempt) msgId;
         TransactionStatus status = _activeTx.get(tx.getTransactionId());
-        if(status!=null && tx.equals(status.attempt)) {
-            if(status.status==AttemptStatus.PROCESSING) {
+        if (status != null && tx.equals(status.attempt)) {
+            if (status.status == AttemptStatus.PROCESSING) {
                 status.status = AttemptStatus.PROCESSED;
-            } else if(status.status==AttemptStatus.COMMITTING) {
+            } else if (status.status == AttemptStatus.COMMITTING) {
                 _activeTx.remove(tx.getTransactionId());
                 _coordinatorState.cleanupBefore(tx.getTransactionId());
                 _currTransaction = nextTransactionId(tx.getTransactionId());
@@ -117,12 +115,12 @@ public class TransactionalSpoutCoordinator extends BaseRichSpout {
     public void fail(Object msgId) {
         TransactionAttempt tx = (TransactionAttempt) msgId;
         TransactionStatus stored = _activeTx.remove(tx.getTransactionId());
-        if(stored!=null && tx.equals(stored.attempt)) {
+        if (stored != null && tx.equals(stored.attempt)) {
             _activeTx.tailMap(tx.getTransactionId()).clear();
             sync();
         }
     }
-    
+
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
         // in partitioned example, in case an emitter task receives a later transaction than it's emitted so far,
@@ -130,24 +128,23 @@ public class TransactionalSpoutCoordinator extends BaseRichSpout {
         declarer.declareStream(TRANSACTION_BATCH_STREAM_ID, new Fields("tx", "tx-meta", "committed-txid"));
         declarer.declareStream(TRANSACTION_COMMIT_STREAM_ID, new Fields("tx"));
     }
-    
+
     private void sync() {
         // note that sometimes the tuples active may be less than max_spout_pending, e.g.
         // max_spout_pending = 3
         // tx 1, 2, 3 active, tx 2 is acked. there won't be a commit for tx 2 (because tx 1 isn't committed yet),
         // and there won't be a batch for tx 4 because there's max_spout_pending tx active
         TransactionStatus maybeCommit = _activeTx.get(_currTransaction);
-        if(maybeCommit!=null && maybeCommit.status == AttemptStatus.PROCESSED) {
+        if (maybeCommit != null && maybeCommit.status == AttemptStatus.PROCESSED) {
             maybeCommit.status = AttemptStatus.COMMITTING;
             _collector.emit(TRANSACTION_COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt);
         }
-        
+
         try {
-            if(_activeTx.size() < _maxTransactionActive) {
+            if (_activeTx.size() < _maxTransactionActive) {
                 BigInteger curr = _currTransaction;
-                for(int i=0; i<_maxTransactionActive; i++) {
-                    if((_coordinatorState.hasCache(curr) || _coordinator.isReady())
-                            && !_activeTx.containsKey(curr)) {
+                for (int i = 0; i < _maxTransactionActive; i++) {
+                    if ((_coordinatorState.hasCache(curr) || _coordinator.isReady()) && !_activeTx.containsKey(curr)) {
                         TransactionAttempt attempt = new TransactionAttempt(curr, _rand.nextLong());
                         Object state = _coordinatorState.getState(curr, _initializer);
                         _activeTx.put(curr, new TransactionStatus(attempt));
@@ -155,8 +152,8 @@ public class TransactionalSpoutCoordinator extends BaseRichSpout {
                     }
                     curr = nextTransactionId(curr);
                 }
-            }     
-        } catch(FailedException e) {
+            }
+        } catch (FailedException e) {
             LOG.warn("Failed to get metadata for a transaction", e);
         }
     }
@@ -167,17 +164,15 @@ public class TransactionalSpoutCoordinator extends BaseRichSpout {
         ret.setMaxTaskParallelism(1);
         return ret;
     }
-    
+
     private static enum AttemptStatus {
-        PROCESSING,
-        PROCESSED,
-        COMMITTING
+        PROCESSING, PROCESSED, COMMITTING
     }
-    
+
     private static class TransactionStatus {
         TransactionAttempt attempt;
         AttemptStatus status;
-        
+
         public TransactionStatus(TransactionAttempt attempt) {
             this.attempt = attempt;
             this.status = AttemptStatus.PROCESSING;
@@ -186,28 +181,29 @@ public class TransactionalSpoutCoordinator extends BaseRichSpout {
         @Override
         public String toString() {
             return attempt.toString() + " <" + status.toString() + ">";
-        }        
+        }
     }
-    
-    
+
     private BigInteger nextTransactionId(BigInteger id) {
         return id.add(BigInteger.ONE);
     }
-    
+
     private BigInteger previousTransactionId(BigInteger id) {
-        if(id.equals(INIT_TXID)) {
+        if (id.equals(INIT_TXID)) {
             return null;
         } else {
             return id.subtract(BigInteger.ONE);
         }
-    }    
-    
+    }
+
     private BigInteger getStoredCurrTransaction(TransactionalState state) {
         BigInteger ret = (BigInteger) state.getData(CURRENT_TX);
-        if(ret==null) return INIT_TXID;
-        else return ret;
+        if (ret == null)
+            return INIT_TXID;
+        else
+            return ret;
     }
-    
+
     private class StateInitializer implements RotatingTransactionalState.StateInitializer {
         @Override
         public Object init(BigInteger txid, Object lastState) {

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalTopologyBuilder.java b/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalTopologyBuilder.java
index 98d1163..e775eb5 100755
--- a/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalTopologyBuilder.java
+++ b/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalTopologyBuilder.java
@@ -50,8 +50,7 @@ import java.util.Map;
 import java.util.Set;
 
 /**
- * Trident subsumes the functionality provided by transactional topologies, so this 
- * class is deprecated.
+ * Trident subsumes the functionality provided by transactional topologies, so this class is deprecated.
  * 
  */
 @Deprecated
@@ -62,16 +61,16 @@ public class TransactionalTopologyBuilder {
     Map<String, Component> _bolts = new HashMap<String, Component>();
     Integer _spoutParallelism;
     List<Map> _spoutConfs = new ArrayList();
-    
+
     // id is used to store the state of this transactionalspout in zookeeper
-    // it would be very dangerous to have 2 topologies active with the same id in the same cluster    
+    // it would be very dangerous to have 2 topologies active with the same id in the same cluster
     public TransactionalTopologyBuilder(String id, String spoutId, ITransactionalSpout spout, Number spoutParallelism) {
         _id = id;
         _spoutId = spoutId;
         _spout = spout;
         _spoutParallelism = (spoutParallelism == null) ? null : spoutParallelism.intValue();
     }
-    
+
     public TransactionalTopologyBuilder(String id, String spoutId, ITransactionalSpout spout) {
         this(id, spoutId, spout, null);
     }
@@ -79,27 +78,27 @@ public class TransactionalTopologyBuilder {
     public TransactionalTopologyBuilder(String id, String spoutId, IPartitionedTransactionalSpout spout, Number spoutParallelism) {
         this(id, spoutId, new PartitionedTransactionalSpoutExecutor(spout), spoutParallelism);
     }
-    
+
     public TransactionalTopologyBuilder(String id, String spoutId, IPartitionedTransactionalSpout spout) {
         this(id, spoutId, spout, null);
     }
-    
+
     public TransactionalTopologyBuilder(String id, String spoutId, IOpaquePartitionedTransactionalSpout spout, Number spoutParallelism) {
         this(id, spoutId, new OpaquePartitionedTransactionalSpoutExecutor(spout), spoutParallelism);
     }
-    
+
     public TransactionalTopologyBuilder(String id, String spoutId, IOpaquePartitionedTransactionalSpout spout) {
         this(id, spoutId, spout, null);
     }
-    
+
     public SpoutDeclarer getSpoutDeclarer() {
         return new SpoutDeclarerImpl();
     }
-    
+
     public BoltDeclarer setBolt(String id, IBatchBolt bolt) {
         return setBolt(id, bolt, null);
     }
-    
+
     public BoltDeclarer setBolt(String id, IBatchBolt bolt, Number parallelism) {
         return setBolt(id, new BatchBoltExecutor(bolt), parallelism, bolt instanceof ICommitter);
     }
@@ -107,86 +106,79 @@ public class TransactionalTopologyBuilder {
     public BoltDeclarer setCommitterBolt(String id, IBatchBolt bolt) {
         return setCommitterBolt(id, bolt, null);
     }
-    
+
     public BoltDeclarer setCommitterBolt(String id, IBatchBolt bolt, Number parallelism) {
         return setBolt(id, new BatchBoltExecutor(bolt), parallelism, true);
-    }      
-    
+    }
+
     public BoltDeclarer setBolt(String id, IBasicBolt bolt) {
         return setBolt(id, bolt, null);
-    }    
-    
+    }
+
     public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism) {
         return setBolt(id, new BasicBoltExecutor(bolt), parallelism, false);
     }
-    
+
     private BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism, boolean committer) {
         Integer p = null;
-        if(parallelism!=null) p = parallelism.intValue();
+        if (parallelism != null)
+            p = parallelism.intValue();
         Component component = new Component(bolt, p, committer);
         _bolts.put(id, component);
         return new BoltDeclarerImpl(component);
     }
-    
+
     public TopologyBuilder buildTopologyBuilder() {
         String coordinator = _spoutId + "/coordinator";
         TopologyBuilder builder = new TopologyBuilder();
         SpoutDeclarer declarer = builder.setSpout(coordinator, new TransactionalSpoutCoordinator(_spout));
-        for(Map conf: _spoutConfs) {
+        for (Map conf : _spoutConfs) {
             declarer.addConfigurations(conf);
         }
         declarer.addConfiguration(Config.TOPOLOGY_TRANSACTIONAL_ID, _id);
 
-        BoltDeclarer emitterDeclarer = 
-                builder.setBolt(_spoutId,
-                        new CoordinatedBolt(new TransactionalSpoutBatchExecutor(_spout),
-                                             null,
-                                             null),
-                        _spoutParallelism)
-                .allGrouping(coordinator, TransactionalSpoutCoordinator.TRANSACTION_BATCH_STREAM_ID)
-                .addConfiguration(Config.TOPOLOGY_TRANSACTIONAL_ID, _id);
-        if(_spout instanceof ICommitterTransactionalSpout) {
+        BoltDeclarer emitterDeclarer =
+                builder.setBolt(_spoutId, new CoordinatedBolt(new TransactionalSpoutBatchExecutor(_spout), null, null), _spoutParallelism)
+                        .allGrouping(coordinator, TransactionalSpoutCoordinator.TRANSACTION_BATCH_STREAM_ID)
+                        .addConfiguration(Config.TOPOLOGY_TRANSACTIONAL_ID, _id);
+        if (_spout instanceof ICommitterTransactionalSpout) {
             emitterDeclarer.allGrouping(coordinator, TransactionalSpoutCoordinator.TRANSACTION_COMMIT_STREAM_ID);
         }
-        for(String id: _bolts.keySet()) {
+        for (String id : _bolts.keySet()) {
             Component component = _bolts.get(id);
             Map<String, SourceArgs> coordinatedArgs = new HashMap<String, SourceArgs>();
-            for(String c: componentBoltSubscriptions(component)) {
+            for (String c : componentBoltSubscriptions(component)) {
                 coordinatedArgs.put(c, SourceArgs.all());
             }
-            
+
             IdStreamSpec idSpec = null;
-            if(component.committer) {
-                idSpec = IdStreamSpec.makeDetectSpec(coordinator, TransactionalSpoutCoordinator.TRANSACTION_COMMIT_STREAM_ID);          
+            if (component.committer) {
+                idSpec = IdStreamSpec.makeDetectSpec(coordinator, TransactionalSpoutCoordinator.TRANSACTION_COMMIT_STREAM_ID);
             }
-            BoltDeclarer input = builder.setBolt(id,
-                                                  new CoordinatedBolt(component.bolt,
-                                                                      coordinatedArgs,
-                                                                      idSpec),
-                                                  component.parallelism);
-            for(Map conf: component.componentConfs) {
+            BoltDeclarer input = builder.setBolt(id, new CoordinatedBolt(component.bolt, coordinatedArgs, idSpec), component.parallelism);
+            for (Map conf : component.componentConfs) {
                 input.addConfigurations(conf);
             }
-            for(String c: componentBoltSubscriptions(component)) {
+            for (String c : componentBoltSubscriptions(component)) {
                 input.directGrouping(c, Constants.COORDINATED_STREAM_ID);
             }
-            for(InputDeclaration d: component.declarations) {
+            for (InputDeclaration d : component.declarations) {
                 d.declare(input);
             }
-            if(component.committer) {
-                input.allGrouping(coordinator, TransactionalSpoutCoordinator.TRANSACTION_COMMIT_STREAM_ID);                
+            if (component.committer) {
+                input.allGrouping(coordinator, TransactionalSpoutCoordinator.TRANSACTION_COMMIT_STREAM_ID);
             }
         }
         return builder;
     }
-    
+
     public StormTopology buildTopology() {
         return buildTopologyBuilder().createTopology();
     }
-    
+
     private Set<String> componentBoltSubscriptions(Component component) {
         Set<String> ret = new HashSet<String>();
-        for(InputDeclaration d: component.declarations) {
+        for (InputDeclaration d : component.declarations) {
             ret.add(d.getComponent());
         }
         return ret;
@@ -198,34 +190,35 @@ public class TransactionalTopologyBuilder {
         public List<InputDeclaration> declarations = new ArrayList<InputDeclaration>();
         public List<Map> componentConfs = new ArrayList<Map>();
         public boolean committer;
-        
+
         public Component(IRichBolt bolt, Integer parallelism, boolean committer) {
             this.bolt = bolt;
             this.parallelism = parallelism;
             this.committer = committer;
         }
     }
-    
+
     private static interface InputDeclaration {
         void declare(InputDeclarer declarer);
+
         String getComponent();
     }
-    
+
     private class SpoutDeclarerImpl extends BaseConfigurationDeclarer<SpoutDeclarer> implements SpoutDeclarer {
         @Override
         public SpoutDeclarer addConfigurations(Map conf) {
             _spoutConfs.add(conf);
             return this;
-        }        
+        }
     }
-    
+
     private class BoltDeclarerImpl extends BaseConfigurationDeclarer<BoltDeclarer> implements BoltDeclarer {
         Component _component;
-        
+
         public BoltDeclarerImpl(Component component) {
             _component = component;
         }
-        
+
         @Override
         public BoltDeclarer fieldsGrouping(final String component, final Fields fields) {
             addDeclaration(new InputDeclaration() {
@@ -237,7 +230,7 @@ public class TransactionalTopologyBuilder {
                 @Override
                 public String getComponent() {
                     return component;
-                }                
+                }
             });
             return this;
         }
@@ -248,12 +241,12 @@ public class TransactionalTopologyBuilder {
                 @Override
                 public void declare(InputDeclarer declarer) {
                     declarer.fieldsGrouping(component, streamId, fields);
-                }                
+                }
 
                 @Override
                 public String getComponent() {
                     return component;
-                }                
+                }
             });
             return this;
         }
@@ -264,12 +257,12 @@ public class TransactionalTopologyBuilder {
                 @Override
                 public void declare(InputDeclarer declarer) {
                     declarer.globalGrouping(component);
-                }                
+                }
 
                 @Override
                 public String getComponent() {
                     return component;
-                }                
+                }
             });
             return this;
         }
@@ -280,12 +273,12 @@ public class TransactionalTopologyBuilder {
                 @Override
                 public void declare(InputDeclarer declarer) {
                     declarer.globalGrouping(component, streamId);
-                }                
+                }
 
                 @Override
                 public String getComponent() {
                     return component;
-                }                
+                }
             });
             return this;
         }
@@ -296,12 +289,12 @@ public class TransactionalTopologyBuilder {
                 @Override
                 public void declare(InputDeclarer declarer) {
                     declarer.shuffleGrouping(component);
-                }                
+                }
 
                 @Override
                 public String getComponent() {
                     return component;
-                }                
+                }
             });
             return this;
         }
@@ -312,12 +305,12 @@ public class TransactionalTopologyBuilder {
                 @Override
                 public void declare(InputDeclarer declarer) {
                     declarer.shuffleGrouping(component, streamId);
-                }                
+                }
 
                 @Override
                 public String getComponent() {
                     return component;
-                }                
+                }
             });
             return this;
         }
@@ -328,12 +321,12 @@ public class TransactionalTopologyBuilder {
                 @Override
                 public void declare(InputDeclarer declarer) {
                     declarer.localOrShuffleGrouping(component);
-                }                
+                }
 
                 @Override
                 public String getComponent() {
                     return component;
-                }                
+                }
             });
             return this;
         }
@@ -345,7 +338,7 @@ public class TransactionalTopologyBuilder {
                 public void declare(InputDeclarer declarer) {
                     declarer.localOrShuffleGrouping(component, streamId);
                 }
-                
+
                 @Override
                 public String getComponent() {
                     return component;
@@ -353,7 +346,7 @@ public class TransactionalTopologyBuilder {
             });
             return this;
         }
-        
+
         @Override
         public BoltDeclarer localFirstGrouping(final String component) {
             addDeclaration(new InputDeclaration() {
@@ -361,7 +354,7 @@ public class TransactionalTopologyBuilder {
                 public void declare(InputDeclarer declarer) {
                     declarer.localFirstGrouping(component);
                 }
-                
+
                 @Override
                 public String getComponent() {
                     return component;
@@ -369,7 +362,7 @@ public class TransactionalTopologyBuilder {
             });
             return this;
         }
-        
+
         @Override
         public BoltDeclarer localFirstGrouping(final String component, final String streamId) {
             addDeclaration(new InputDeclaration() {
@@ -377,7 +370,7 @@ public class TransactionalTopologyBuilder {
                 public void declare(InputDeclarer declarer) {
                     declarer.localFirstGrouping(component, streamId);
                 }
-                
+
                 @Override
                 public String getComponent() {
                     return component;
@@ -385,19 +378,19 @@ public class TransactionalTopologyBuilder {
             });
             return this;
         }
-        
+
         @Override
         public BoltDeclarer noneGrouping(final String component) {
             addDeclaration(new InputDeclaration() {
                 @Override
                 public void declare(InputDeclarer declarer) {
                     declarer.noneGrouping(component);
-                }                
+                }
 
                 @Override
                 public String getComponent() {
                     return component;
-                }                
+                }
             });
             return this;
         }
@@ -408,12 +401,12 @@ public class TransactionalTopologyBuilder {
                 @Override
                 public void declare(InputDeclarer declarer) {
                     declarer.noneGrouping(component, streamId);
-                }                
+                }
 
                 @Override
                 public String getComponent() {
                     return component;
-                }                
+                }
             });
             return this;
         }
@@ -424,12 +417,12 @@ public class TransactionalTopologyBuilder {
                 @Override
                 public void declare(InputDeclarer declarer) {
                     declarer.allGrouping(component);
-                }                
+                }
 
                 @Override
                 public String getComponent() {
                     return component;
-                }                
+                }
             });
             return this;
         }
@@ -440,12 +433,12 @@ public class TransactionalTopologyBuilder {
                 @Override
                 public void declare(InputDeclarer declarer) {
                     declarer.allGrouping(component, streamId);
-                }                
+                }
 
                 @Override
                 public String getComponent() {
                     return component;
-                }                
+                }
             });
             return this;
         }
@@ -456,12 +449,12 @@ public class TransactionalTopologyBuilder {
                 @Override
                 public void declare(InputDeclarer declarer) {
                     declarer.directGrouping(component);
-                }                
+                }
 
                 @Override
                 public String getComponent() {
                     return component;
-                }                
+                }
             });
             return this;
         }
@@ -472,12 +465,12 @@ public class TransactionalTopologyBuilder {
                 @Override
                 public void declare(InputDeclarer declarer) {
                     declarer.directGrouping(component, streamId);
-                }                
+                }
 
                 @Override
                 public String getComponent() {
                     return component;
-                }                
+                }
             });
             return this;
         }
@@ -498,14 +491,14 @@ public class TransactionalTopologyBuilder {
                 @Override
                 public void declare(InputDeclarer declarer) {
                     declarer.customGrouping(component, grouping);
-                }                
+                }
 
                 @Override
                 public String getComponent() {
                     return component;
-                }                
+                }
             });
-            return this;        
+            return this;
         }
 
         @Override
@@ -514,12 +507,12 @@ public class TransactionalTopologyBuilder {
                 @Override
                 public void declare(InputDeclarer declarer) {
                     declarer.customGrouping(component, streamId, grouping);
-                }                
+                }
 
                 @Override
                 public String getComponent() {
                     return component;
-                }                
+                }
             });
             return this;
         }
@@ -530,16 +523,16 @@ public class TransactionalTopologyBuilder {
                 @Override
                 public void declare(InputDeclarer declarer) {
                     declarer.grouping(stream, grouping);
-                }                
+                }
 
                 @Override
                 public String getComponent() {
                     return stream.get_componentId();
-                }                
+                }
             });
             return this;
         }
-        
+
         private void addDeclaration(InputDeclaration declaration) {
             _component.declarations.add(declaration);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/IOpaquePartitionedTransactionalSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/IOpaquePartitionedTransactionalSpout.java b/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/IOpaquePartitionedTransactionalSpout.java
index 8d1f60b..35fb1c6 100755
--- a/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/IOpaquePartitionedTransactionalSpout.java
+++ b/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/IOpaquePartitionedTransactionalSpout.java
@@ -24,33 +24,34 @@ import backtype.storm.transactional.TransactionAttempt;
 import java.util.Map;
 
 /**
- * This defines a transactional spout which does *not* necessarily
- * replay the same batch every time it emits a batch for a transaction id.
+ * This defines a transactional spout which does *not* necessarily replay the same batch every time it emits a batch for a transaction id.
  */
 public interface IOpaquePartitionedTransactionalSpout<T> extends IComponent {
     public interface Coordinator {
         /**
          * Returns true if its ok to emit start a new transaction, false otherwise (will skip this transaction).
          * 
-         * You should sleep here if you want a delay between asking for the next transaction (this will be called 
-         * repeatedly in a loop).
+         * You should sleep here if you want a delay between asking for the next transaction (this will be called repeatedly in a loop).
          */
         boolean isReady();
+
         void close();
     }
-    
+
     public interface Emitter<X> {
         /**
-         * Emit a batch of tuples for a partition/transaction. 
+         * Emit a batch of tuples for a partition/transaction.
          * 
-         * Return the metadata describing this batch that will be used as lastPartitionMeta
-         * for defining the parameters of the next batch.
+         * Return the metadata describing this batch that will be used as lastPartitionMeta for defining the parameters of the next batch.
          */
         X emitPartitionBatch(TransactionAttempt tx, BatchOutputCollector collector, int partition, X lastPartitionMeta);
+
         int numPartitions();
+
         void close();
     }
-    
-    Emitter<T> getEmitter(Map conf, TopologyContext context);     
-    Coordinator getCoordinator(Map conf, TopologyContext context);     
+
+    Emitter<T> getEmitter(Map conf, TopologyContext context);
+
+    Coordinator getCoordinator(Map conf, TopologyContext context);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/IPartitionedTransactionalSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/IPartitionedTransactionalSpout.java b/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/IPartitionedTransactionalSpout.java
index e428328..7b1e4fb 100755
--- a/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/IPartitionedTransactionalSpout.java
+++ b/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/IPartitionedTransactionalSpout.java
@@ -24,46 +24,43 @@ import backtype.storm.coordination.BatchOutputCollector;
 import java.util.Map;
 
 /**
- * This interface defines a transactional spout that reads its tuples from a partitioned set of 
- * brokers. It automates the storing of metadata for each partition to ensure that the same batch
- * is always emitted for the same transaction id. The partition metadata is stored in Zookeeper.
+ * This interface defines a transactional spout that reads its tuples from a partitioned set of brokers. It automates the storing of metadata for each partition
+ * to ensure that the same batch is always emitted for the same transaction id. The partition metadata is stored in Zookeeper.
  */
 public interface IPartitionedTransactionalSpout<T> extends IComponent {
     public interface Coordinator {
         /**
-         * Return the number of partitions currently in the source of data. The idea is
-         * is that if a new partition is added and a prior transaction is replayed, it doesn't
-         * emit tuples for the new partition because it knows how many partitions were in 
-         * that transaction.
+         * Return the number of partitions currently in the source of data. The idea is that if a new partition is added and a prior transaction is replayed,
+         * it doesn't emit tuples for the new partition because it knows how many partitions were in that transaction.
          */
         int numPartitions();
-        
+
         /**
          * Returns true if its ok to emit start a new transaction, false otherwise (will skip this transaction).
          * 
-         * You should sleep here if you want a delay between asking for the next transaction (this will be called 
-         * repeatedly in a loop).
+         * You should sleep here if you want a delay between asking for the next transaction (this will be called repeatedly in a loop).
          */
         boolean isReady();
-                
+
         void close();
     }
-    
+
     public interface Emitter<X> {
         /**
-         * Emit a batch of tuples for a partition/transaction that's never been emitted before.
-         * Return the metadata that can be used to reconstruct this partition/batch in the future.
+         * Emit a batch of tuples for a partition/transaction that's never been emitted before. Return the metadata that can be used to reconstruct this
+         * partition/batch in the future.
          */
         X emitPartitionBatchNew(TransactionAttempt tx, BatchOutputCollector collector, int partition, X lastPartitionMeta);
 
         /**
-         * Emit a batch of tuples for a partition/transaction that has been emitted before, using
-         * the metadata created when it was first emitted.
+         * Emit a batch of tuples for a partition/transaction that has been emitted before, using the metadata created when it was first emitted.
          */
         void emitPartitionBatch(TransactionAttempt tx, BatchOutputCollector collector, int partition, X partitionMeta);
+
         void close();
     }
-    
+
     Coordinator getCoordinator(Map conf, TopologyContext context);
-    Emitter<T> getEmitter(Map conf, TopologyContext context);      
+
+    Emitter<T> getEmitter(Map conf, TopologyContext context);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/OpaquePartitionedTransactionalSpoutExecutor.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/OpaquePartitionedTransactionalSpoutExecutor.java b/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/OpaquePartitionedTransactionalSpoutExecutor.java
index aabcb7a..4f894d9 100755
--- a/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/OpaquePartitionedTransactionalSpoutExecutor.java
+++ b/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/OpaquePartitionedTransactionalSpoutExecutor.java
@@ -33,17 +33,16 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.TreeMap;
 
-
 public class OpaquePartitionedTransactionalSpoutExecutor implements ICommitterTransactionalSpout<Object> {
     IOpaquePartitionedTransactionalSpout _spout;
-    
+
     public class Coordinator implements ITransactionalSpout.Coordinator<Object> {
         IOpaquePartitionedTransactionalSpout.Coordinator _coordinator;
 
         public Coordinator(Map conf, TopologyContext context) {
             _coordinator = _spout.getCoordinator(conf, context);
         }
-        
+
         @Override
         public Object initializeTransaction(BigInteger txid, Object prevMetadata) {
             return null;
@@ -52,14 +51,14 @@ public class OpaquePartitionedTransactionalSpoutExecutor implements ICommitterTr
         @Override
         public boolean isReady() {
             return _coordinator.isReady();
-        }        
+        }
 
         @Override
         public void close() {
             _coordinator.close();
-        }        
+        }
     }
-    
+
     public class Emitter implements ICommitterTransactionalSpout.Emitter {
         IOpaquePartitionedTransactionalSpout.Emitter _emitter;
         TransactionalState _state;
@@ -67,21 +66,21 @@ public class OpaquePartitionedTransactionalSpoutExecutor implements ICommitterTr
         Map<Integer, RotatingTransactionalState> _partitionStates = new HashMap<Integer, RotatingTransactionalState>();
         int _index;
         int _numTasks;
-        
+
         public Emitter(Map conf, TopologyContext context) {
             _emitter = _spout.getEmitter(conf, context);
             _index = context.getThisTaskIndex();
             _numTasks = context.getComponentTasks(context.getThisComponentId()).size();
-            _state = TransactionalState.newUserState(conf, (String) conf.get(Config.TOPOLOGY_TRANSACTIONAL_ID), getComponentConfiguration()); 
+            _state = TransactionalState.newUserState(conf, (String) conf.get(Config.TOPOLOGY_TRANSACTIONAL_ID), getComponentConfiguration());
             List<String> existingPartitions = _state.list("");
-            for(String p: existingPartitions) {
+            for (String p : existingPartitions) {
                 int partition = Integer.parseInt(p);
-                if((partition - _index) % _numTasks == 0) {
+                if ((partition - _index) % _numTasks == 0) {
                     _partitionStates.put(partition, new RotatingTransactionalState(_state, p));
                 }
             }
         }
-        
+
         @Override
         public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, BatchOutputCollector collector) {
             Map<Integer, Object> metas = new HashMap<Integer, Object>();
@@ -89,21 +88,22 @@ public class OpaquePartitionedTransactionalSpoutExecutor implements ICommitterTr
             int partitions = _emitter.numPartitions();
             Entry<BigInteger, Map<Integer, Object>> entry = _cachedMetas.lowerEntry(tx.getTransactionId());
             Map<Integer, Object> prevCached;
-            if(entry!=null) {
+            if (entry != null) {
                 prevCached = entry.getValue();
             } else {
                 prevCached = new HashMap<Integer, Object>();
             }
-            
-            for(int i=_index; i < partitions; i+=_numTasks) {
+
+            for (int i = _index; i < partitions; i += _numTasks) {
                 RotatingTransactionalState state = _partitionStates.get(i);
-                if(state==null) {
+                if (state == null) {
                     state = new RotatingTransactionalState(_state, "" + i);
                     _partitionStates.put(i, state);
                 }
                 state.removeState(tx.getTransactionId());
                 Object lastMeta = prevCached.get(i);
-                if(lastMeta==null) lastMeta = state.getLastState();
+                if (lastMeta == null)
+                    lastMeta = state.getLastState();
                 Object meta = _emitter.emitPartitionBatch(tx, collector, i, lastMeta);
                 metas.put(i, meta);
             }
@@ -111,16 +111,16 @@ public class OpaquePartitionedTransactionalSpoutExecutor implements ICommitterTr
 
         @Override
         public void cleanupBefore(BigInteger txid) {
-            for(RotatingTransactionalState state: _partitionStates.values()) {
+            for (RotatingTransactionalState state : _partitionStates.values()) {
                 state.cleanupBefore(txid);
-            }            
+            }
         }
 
         @Override
         public void commit(TransactionAttempt attempt) {
             BigInteger txid = attempt.getTransactionId();
             Map<Integer, Object> metas = _cachedMetas.remove(txid);
-            for(Integer partition: metas.keySet()) {
+            for (Integer partition : metas.keySet()) {
                 Object meta = metas.get(partition);
                 _partitionStates.get(partition).overrideState(txid, meta);
             }
@@ -130,12 +130,12 @@ public class OpaquePartitionedTransactionalSpoutExecutor implements ICommitterTr
         public void close() {
             _emitter.close();
         }
-    } 
-    
+    }
+
     public OpaquePartitionedTransactionalSpoutExecutor(IOpaquePartitionedTransactionalSpout spout) {
         _spout = spout;
     }
-    
+
     @Override
     public ITransactionalSpout.Coordinator<Object> getCoordinator(Map conf, TopologyContext context) {
         return new Coordinator(conf, context);
@@ -155,5 +155,5 @@ public class OpaquePartitionedTransactionalSpoutExecutor implements ICommitterTr
     public Map<String, Object> getComponentConfiguration() {
         return _spout.getComponentConfiguration();
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/PartitionedTransactionalSpoutExecutor.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/PartitionedTransactionalSpoutExecutor.java b/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/PartitionedTransactionalSpoutExecutor.java
index 479dda4..8422576 100644
--- a/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/PartitionedTransactionalSpoutExecutor.java
+++ b/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/PartitionedTransactionalSpoutExecutor.java
@@ -29,30 +29,29 @@ import java.math.BigInteger;
 import java.util.HashMap;
 import java.util.Map;
 
-
 public class PartitionedTransactionalSpoutExecutor implements ITransactionalSpout<Integer> {
     IPartitionedTransactionalSpout _spout;
-    
+
     public PartitionedTransactionalSpoutExecutor(IPartitionedTransactionalSpout spout) {
         _spout = spout;
     }
-    
+
     public IPartitionedTransactionalSpout getPartitionedSpout() {
         return _spout;
     }
-    
+
     class Coordinator implements ITransactionalSpout.Coordinator<Integer> {
         private IPartitionedTransactionalSpout.Coordinator _coordinator;
-        
+
         public Coordinator(Map conf, TopologyContext context) {
             _coordinator = _spout.getCoordinator(conf, context);
         }
-        
+
         @Override
         public Integer initializeTransaction(BigInteger txid, Integer prevMetadata) {
             return _coordinator.numPartitions();
         }
-        
+
         @Override
         public boolean isReady() {
             return _coordinator.isReady();
@@ -61,53 +60,51 @@ public class PartitionedTransactionalSpoutExecutor implements ITransactionalSpou
         @Override
         public void close() {
             _coordinator.close();
-        }        
+        }
     }
-    
+
     class Emitter implements ITransactionalSpout.Emitter<Integer> {
         private IPartitionedTransactionalSpout.Emitter _emitter;
         private TransactionalState _state;
         private Map<Integer, RotatingTransactionalState> _partitionStates = new HashMap<Integer, RotatingTransactionalState>();
         private int _index;
         private int _numTasks;
-        
+
         public Emitter(Map conf, TopologyContext context) {
             _emitter = _spout.getEmitter(conf, context);
-            _state = TransactionalState.newUserState(conf, (String) conf.get(Config.TOPOLOGY_TRANSACTIONAL_ID), getComponentConfiguration()); 
+            _state = TransactionalState.newUserState(conf, (String) conf.get(Config.TOPOLOGY_TRANSACTIONAL_ID), getComponentConfiguration());
             _index = context.getThisTaskIndex();
             _numTasks = context.getComponentTasks(context.getThisComponentId()).size();
         }
 
         @Override
-        public void emitBatch(final TransactionAttempt tx, final Integer partitions,
-                final BatchOutputCollector collector) {
-            for(int i=_index; i < partitions; i+=_numTasks) {
-                if(!_partitionStates.containsKey(i)) {
+        public void emitBatch(final TransactionAttempt tx, final Integer partitions, final BatchOutputCollector collector) {
+            for (int i = _index; i < partitions; i += _numTasks) {
+                if (!_partitionStates.containsKey(i)) {
                     _partitionStates.put(i, new RotatingTransactionalState(_state, "" + i));
                 }
                 RotatingTransactionalState state = _partitionStates.get(i);
                 final int partition = i;
-                Object meta = state.getStateOrCreate(tx.getTransactionId(),
-                        new RotatingTransactionalState.StateInitializer() {
+                Object meta = state.getStateOrCreate(tx.getTransactionId(), new RotatingTransactionalState.StateInitializer() {
                     @Override
                     public Object init(BigInteger txid, Object lastState) {
                         return _emitter.emitPartitionBatchNew(tx, collector, partition, lastState);
                     }
                 });
                 // it's null if one of:
-                //   a) a later transaction batch was emitted before this, so we should skip this batch
-                //   b) if didn't exist and was created (in which case the StateInitializer was invoked and 
-                //      it was emitted
-                if(meta!=null) {
+                // a) a later transaction batch was emitted before this, so we should skip this batch
+                // b) it didn't exist and was created (in which case the StateInitializer was invoked and
+                // it was emitted)
+                if (meta != null) {
                     _emitter.emitPartitionBatch(tx, collector, partition, meta);
                 }
             }
-            
+
         }
 
         @Override
         public void cleanupBefore(BigInteger txid) {
-            for(RotatingTransactionalState state: _partitionStates.values()) {
+            for (RotatingTransactionalState state : _partitionStates.values()) {
                 state.cleanupBefore(txid);
             }
         }
@@ -117,7 +114,7 @@ public class PartitionedTransactionalSpoutExecutor implements ITransactionalSpou
             _state.close();
             _emitter.close();
         }
-    }    
+    }
 
     @Override
     public ITransactionalSpout.Coordinator getCoordinator(Map conf, TopologyContext context) {
@@ -138,5 +135,5 @@ public class PartitionedTransactionalSpoutExecutor implements ITransactionalSpou
     public Map<String, Object> getComponentConfiguration() {
         return _spout.getComponentConfiguration();
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/transactional/state/RotatingTransactionalState.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/transactional/state/RotatingTransactionalState.java b/jstorm-core/src/main/java/backtype/storm/transactional/state/RotatingTransactionalState.java
index 20c5cd3..63aced9 100644
--- a/jstorm-core/src/main/java/backtype/storm/transactional/state/RotatingTransactionalState.java
+++ b/jstorm-core/src/main/java/backtype/storm/transactional/state/RotatingTransactionalState.java
@@ -27,19 +27,19 @@ import java.util.SortedMap;
 import java.util.TreeMap;
 
 /**
- * A map from txid to a value. Automatically deletes txids that have been committed. 
+ * A map from txid to a value. Automatically deletes txids that have been committed.
  */
 public class RotatingTransactionalState {
     public static interface StateInitializer {
         Object init(BigInteger txid, Object lastState);
-    }    
+    }
 
     private TransactionalState _state;
     private String _subdir;
     private boolean _strictOrder;
-    
+
     private TreeMap<BigInteger, Object> _curr = new TreeMap<BigInteger, Object>();
-    
+
     public RotatingTransactionalState(TransactionalState state, String subdir, boolean strictOrder) {
         _state = state;
         _subdir = subdir;
@@ -51,32 +51,35 @@ public class RotatingTransactionalState {
     public RotatingTransactionalState(TransactionalState state, String subdir) {
         this(state, subdir, false);
     }
-    
+
     public Object getLastState() {
-        if(_curr.isEmpty()) return null;
-        else return _curr.lastEntry().getValue();
+        if (_curr.isEmpty())
+            return null;
+        else
+            return _curr.lastEntry().getValue();
     }
-    
+
     public void overrideState(BigInteger txid, Object state) {
         _state.setData(txPath(txid), state);
         _curr.put(txid, state);
     }
 
     public void removeState(BigInteger txid) {
-        if(_curr.containsKey(txid)) {
+        if (_curr.containsKey(txid)) {
             _curr.remove(txid);
             _state.delete(txPath(txid));
         }
     }
-    
+
     public Object getState(BigInteger txid, StateInitializer init) {
-        if(!_curr.containsKey(txid)) {
+        if (!_curr.containsKey(txid)) {
             SortedMap<BigInteger, Object> prevMap = _curr.headMap(txid);
-            SortedMap<BigInteger, Object> afterMap = _curr.tailMap(txid);            
-            
+            SortedMap<BigInteger, Object> afterMap = _curr.tailMap(txid);
+
             BigInteger prev = null;
-            if(!prevMap.isEmpty()) prev = prevMap.lastKey();
-            
+            if (!prevMap.isEmpty())
+                prev = prevMap.lastKey();
+
             if (_strictOrder) {
                 if (prev == null && !txid.equals(TransactionalSpoutCoordinator.INIT_TXID)) {
                     throw new IllegalStateException("Trying to initialize transaction for which there should be a previous state");
@@ -88,7 +91,7 @@ public class RotatingTransactionalState {
                     throw new IllegalStateException("Expecting tx state to be initialized in strict order but there are txids after that have state");
                 }
             }
-            
+
             Object data;
             if (afterMap.isEmpty()) {
                 Object prevData;
@@ -106,11 +109,11 @@ public class RotatingTransactionalState {
         }
         return _curr.get(txid);
     }
-    
+
     public boolean hasCache(BigInteger txid) {
         return _curr.containsKey(txid);
     }
-    
+
     /**
      * Returns null if it was created, the value otherwise.
      */
@@ -122,7 +125,7 @@ public class RotatingTransactionalState {
             return null;
         }
     }
-    
+
     public void cleanupBefore(BigInteger txid) {
         Set<BigInteger> toDelete = new HashSet<BigInteger>();
         toDelete.addAll(_curr.headMap(txid).keySet());
@@ -131,21 +134,21 @@ public class RotatingTransactionalState {
             _state.delete(txPath(tx));
         }
     }
-    
+
     private void sync() {
         List<String> txids = _state.list(_subdir);
-        for(String txid_s: txids) {
+        for (String txid_s : txids) {
             Object data = _state.getData(txPath(txid_s));
             _curr.put(new BigInteger(txid_s), data);
         }
     }
-    
+
     private String txPath(BigInteger tx) {
         return txPath(tx.toString());
     }
 
     private String txPath(String tx) {
         return _subdir + "/" + tx;
-    }    
-    
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/transactional/state/TestTransactionalState.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/transactional/state/TestTransactionalState.java b/jstorm-core/src/main/java/backtype/storm/transactional/state/TestTransactionalState.java
index 3d4a463..02b3d0d 100755
--- a/jstorm-core/src/main/java/backtype/storm/transactional/state/TestTransactionalState.java
+++ b/jstorm-core/src/main/java/backtype/storm/transactional/state/TestTransactionalState.java
@@ -32,16 +32,13 @@ import org.apache.zookeeper.data.ACL;
 public class TestTransactionalState extends TransactionalState {
 
     /**
-     * Matching constructor in absence of a default constructor in the parent
-     * class.
+     * Matching constructor in absence of a default constructor in the parent class.
      */
     protected TestTransactionalState(Map conf, String id, Map componentConf, String subroot) {
         super(conf, id, componentConf, subroot);
     }
 
-    public static void createNode(CuratorFramework curator, 
-            String rootDir, byte[] data, List<ACL> acls, CreateMode mode)
-            throws Exception {
-       TransactionalState.createNode(curator, rootDir, data, acls, mode);
+    public static void createNode(CuratorFramework curator, String rootDir, byte[] data, List<ACL> acls, CreateMode mode) throws Exception {
+        TransactionalState.createNode(curator, rootDir, data, acls, mode);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/transactional/state/TransactionalState.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/transactional/state/TransactionalState.java b/jstorm-core/src/main/java/backtype/storm/transactional/state/TransactionalState.java
index 5afcd0a..71d7cc3 100755
--- a/jstorm-core/src/main/java/backtype/storm/transactional/state/TransactionalState.java
+++ b/jstorm-core/src/main/java/backtype/storm/transactional/state/TransactionalState.java
@@ -40,25 +40,23 @@ public class TransactionalState {
     KryoValuesSerializer _ser;
     KryoValuesDeserializer _des;
     List<ACL> _zkAcls = null;
-    
+
     public static TransactionalState newUserState(Map conf, String id, Map componentConf) {
         return new TransactionalState(conf, id, componentConf, "user");
     }
-    
+
     public static TransactionalState newCoordinatorState(Map conf, String id, Map componentConf) {
-        return new TransactionalState(conf, id, componentConf, "coordinator");        
+        return new TransactionalState(conf, id, componentConf, "coordinator");
     }
-    
+
     protected TransactionalState(Map conf, String id, Map componentConf, String subroot) {
         try {
             conf = new HashMap(conf);
             // ensure that the serialization registrations are consistent with the declarations in this spout
-            if(componentConf!=null) {
-                conf.put(Config.TOPOLOGY_KRYO_REGISTER,
-                         componentConf
-                              .get(Config.TOPOLOGY_KRYO_REGISTER));
+            if (componentConf != null) {
+                conf.put(Config.TOPOLOGY_KRYO_REGISTER, componentConf.get(Config.TOPOLOGY_KRYO_REGISTER));
             }
-            String transactionalRoot = (String)conf.get(Config.TRANSACTIONAL_ZOOKEEPER_ROOT);
+            String transactionalRoot = (String) conf.get(Config.TRANSACTIONAL_ZOOKEEPER_ROOT);
             String rootDir = transactionalRoot + "/" + id + "/" + subroot;
             List<String> servers = (List<String>) getWithBackup(conf, Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Config.STORM_ZOOKEEPER_SERVERS);
             Object port = getWithBackup(conf, Config.TRANSACTIONAL_ZOOKEEPER_PORT, Config.STORM_ZOOKEEPER_PORT);
@@ -74,29 +72,24 @@ public class TransactionalState {
             } catch (KeeperException.NodeExistsException e) {
             }
             initter.close();
-                                    
+
             _curator = Utils.newCuratorStarted(conf, servers, port, rootDir, auth);
             _ser = new KryoValuesSerializer(conf);
             _des = new KryoValuesDeserializer(conf);
         } catch (Exception e) {
-           throw new RuntimeException(e);
+            throw new RuntimeException(e);
         }
     }
 
-    protected static String forPath(PathAndBytesable<String> builder, 
-            String path, byte[] data) throws Exception {
-        return (data == null) 
-            ? builder.forPath(path) 
-            : builder.forPath(path, data);
+    protected static String forPath(PathAndBytesable<String> builder, String path, byte[] data) throws Exception {
+        return (data == null) ? builder.forPath(path) : builder.forPath(path, data);
     }
 
-    protected static void createNode(CuratorFramework curator, String path,
-            byte[] data, List<ACL> acls, CreateMode mode) throws Exception {
-        ProtectACLCreateModePathAndBytesable<String> builder =
-            curator.create().creatingParentsIfNeeded();
-    
+    protected static void createNode(CuratorFramework curator, String path, byte[] data, List<ACL> acls, CreateMode mode) throws Exception {
+        ProtectACLCreateModePathAndBytesable<String> builder = curator.create().creatingParentsIfNeeded();
+
         if (acls == null) {
-            if (mode == null ) {
+            if (mode == null) {
                 TransactionalState.forPath(builder, path, data);
             } else {
                 TransactionalState.forPath(builder.withMode(mode), path, data);
@@ -111,17 +104,16 @@ public class TransactionalState {
         path = "/" + path;
         byte[] ser = _ser.serializeObject(obj);
         try {
-            if(_curator.checkExists().forPath(path)!=null) {
+            if (_curator.checkExists().forPath(path) != null) {
                 _curator.setData().forPath(path, ser);
             } else {
-                TransactionalState.createNode(_curator, path, ser, _zkAcls,
-                        CreateMode.PERSISTENT);
+                TransactionalState.createNode(_curator, path, ser, _zkAcls, CreateMode.PERSISTENT);
             }
-        } catch(Exception e) {
+        } catch (Exception e) {
             throw new RuntimeException(e);
-        }        
+        }
     }
-    
+
     public void delete(String path) {
         path = "/" + path;
         try {
@@ -130,44 +122,45 @@ public class TransactionalState {
             throw new RuntimeException(e);
         }
     }
-    
+
     public List<String> list(String path) {
         path = "/" + path;
         try {
-            if(_curator.checkExists().forPath(path)==null) {
+            if (_curator.checkExists().forPath(path) == null) {
                 return new ArrayList<String>();
             } else {
                 return _curator.getChildren().forPath(path);
             }
-        } catch(Exception e) {
+        } catch (Exception e) {
             throw new RuntimeException(e);
-        }   
+        }
     }
-    
+
     public void mkdir(String path) {
         setData(path, 7);
     }
-    
+
     public Object getData(String path) {
         path = "/" + path;
         try {
-            if(_curator.checkExists().forPath(path)!=null) {
+            if (_curator.checkExists().forPath(path) != null) {
                 return _des.deserializeObject(_curator.getData().forPath(path));
             } else {
                 return null;
             }
-        } catch(Exception e) {
+        } catch (Exception e) {
             throw new RuntimeException(e);
         }
     }
-    
+
     public void close() {
         _curator.close();
     }
-    
+
     private Object getWithBackup(Map amap, Object primary, Object backup) {
         Object ret = amap.get(primary);
-        if(ret==null) return amap.get(backup);
+        if (ret == null)
+            return amap.get(backup);
         return ret;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/tuple/BatchTuple.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/tuple/BatchTuple.java b/jstorm-core/src/main/java/backtype/storm/tuple/BatchTuple.java
index 47df545..eb3d0ce 100644
--- a/jstorm-core/src/main/java/backtype/storm/tuple/BatchTuple.java
+++ b/jstorm-core/src/main/java/backtype/storm/tuple/BatchTuple.java
@@ -20,11 +20,10 @@ package backtype.storm.tuple;
 import java.util.ArrayList;
 import java.util.List;
 
-
-public class BatchTuple {
+public class BatchTuple implements ITupleExt{
     private int targetTaskId;
 
-    private List<Tuple> batch;
+    private List<Tuple> batch = new ArrayList<Tuple>();
     private int batchSize;
 
     public BatchTuple() {
@@ -37,15 +36,12 @@ public class BatchTuple {
     }
 
     public void addToBatch(Tuple tuple) {
-        if (batch == null) {
-            batch = new ArrayList<Tuple>();
-        }
         batch.add(tuple);
     }
 
     public boolean isBatchFull() {
         boolean ret = false;
-        if (batch != null && batch.size() >= batchSize)
+        if (batch.size() >= batchSize)
             ret = true;
 
         return ret;
@@ -60,7 +56,7 @@ public class BatchTuple {
     }
 
     public int currBatchSize() {
-        return batch == null ? 0 : batch.size();
+        return batch.size();
     }
 
     public void setTargetTaskId(int taskId) {
@@ -74,4 +70,16 @@ public class BatchTuple {
     public void setBatchSize(int batchSize) {
         this.batchSize = batchSize;
     }
-}
+
+	@Deprecated
+	public long getCreationTimeStamp() {
+		// TODO Auto-generated method stub
+		return 0;
+	}
+
+	@Deprecated
+	public void setCreationTimeStamp(long timeStamp) {
+		// TODO Auto-generated method stub
+		
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/tuple/Fields.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/tuple/Fields.java b/jstorm-core/src/main/java/backtype/storm/tuple/Fields.java
index 9805ba6..6ba1e5c 100644
--- a/jstorm-core/src/main/java/backtype/storm/tuple/Fields.java
+++ b/jstorm-core/src/main/java/backtype/storm/tuple/Fields.java
@@ -28,26 +28,24 @@ import java.io.Serializable;
 public class Fields implements Iterable<String>, Serializable {
     private List<String> _fields;
     private Map<String, Integer> _index = new HashMap<String, Integer>();
-    
+
     public Fields(String... fields) {
         this(Arrays.asList(fields));
     }
-    
+
     public Fields(List<String> fields) {
         _fields = new ArrayList<String>(fields.size());
         for (String field : fields) {
             if (_fields.contains(field))
-                throw new IllegalArgumentException(
-                    String.format("duplicate field '%s'", field)
-                );
+                throw new IllegalArgumentException(String.format("duplicate field '%s'", field));
             _fields.add(field);
         }
         index();
     }
-    
+
     public List<Object> select(Fields selector, List<Object> tuple) {
         List<Object> ret = new ArrayList<Object>(selector.size());
-        for(String s: selector) {
+        for (String s : selector) {
             ret.add(tuple.get(_index.get(s)));
         }
         return ret;
@@ -56,7 +54,7 @@ public class Fields implements Iterable<String>, Serializable {
     public List<String> toList() {
         return new ArrayList<String>(_fields);
     }
-    
+
     public int size() {
         return _fields.size();
     }
@@ -68,27 +66,27 @@ public class Fields implements Iterable<String>, Serializable {
     public Iterator<String> iterator() {
         return _fields.iterator();
     }
-    
+
     /**
      * Returns the position of the specified field.
      */
     public int fieldIndex(String field) {
         Integer ret = _index.get(field);
-        if(ret==null) {
+        if (ret == null) {
             throw new IllegalArgumentException(field + " does not exist");
         }
         return ret;
     }
-    
+
     /**
      * Returns true if this contains the specified name of the field.
      */
     public boolean contains(String field) {
         return _index.containsKey(field);
     }
-    
+
     private void index() {
-        for(int i=0; i<_fields.size(); i++) {
+        for (int i = 0; i < _fields.size(); i++) {
             _index.put(_fields.get(i), i);
         }
     }
@@ -96,5 +94,5 @@ public class Fields implements Iterable<String>, Serializable {
     @Override
     public String toString() {
         return _fields.toString();
-    }    
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/tuple/ITuple.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/tuple/ITuple.java b/jstorm-core/src/main/java/backtype/storm/tuple/ITuple.java
index c85848d..21696b5 100755
--- a/jstorm-core/src/main/java/backtype/storm/tuple/ITuple.java
+++ b/jstorm-core/src/main/java/backtype/storm/tuple/ITuple.java
@@ -52,60 +52,50 @@ public interface ITuple {
     public Object getValue(int i);
 
     /**
-     * Returns the String at position i in the tuple. If that field is not a String,
-     * you will get a runtime error.
+     * Returns the String at position i in the tuple. If that field is not a String, you will get a runtime error.
      */
     public String getString(int i);
 
     /**
-     * Returns the Integer at position i in the tuple. If that field is not an Integer,
-     * you will get a runtime error.
+     * Returns the Integer at position i in the tuple. If that field is not an Integer, you will get a runtime error.
      */
     public Integer getInteger(int i);
 
     /**
-     * Returns the Long at position i in the tuple. If that field is not a Long,
-     * you will get a runtime error.
+     * Returns the Long at position i in the tuple. If that field is not a Long, you will get a runtime error.
      */
     public Long getLong(int i);
 
     /**
-     * Returns the Boolean at position i in the tuple. If that field is not a Boolean,
-     * you will get a runtime error.
+     * Returns the Boolean at position i in the tuple. If that field is not a Boolean, you will get a runtime error.
      */
     public Boolean getBoolean(int i);
 
     /**
-     * Returns the Short at position i in the tuple. If that field is not a Short,
-     * you will get a runtime error.
+     * Returns the Short at position i in the tuple. If that field is not a Short, you will get a runtime error.
      */
     public Short getShort(int i);
 
     /**
-     * Returns the Byte at position i in the tuple. If that field is not a Byte,
-     * you will get a runtime error.
+     * Returns the Byte at position i in the tuple. If that field is not a Byte, you will get a runtime error.
      */
     public Byte getByte(int i);
 
     /**
-     * Returns the Double at position i in the tuple. If that field is not a Double,
-     * you will get a runtime error.
+     * Returns the Double at position i in the tuple. If that field is not a Double, you will get a runtime error.
      */
     public Double getDouble(int i);
 
     /**
-     * Returns the Float at position i in the tuple. If that field is not a Float,
-     * you will get a runtime error.
+     * Returns the Float at position i in the tuple. If that field is not a Float, you will get a runtime error.
      */
     public Float getFloat(int i);
 
     /**
-     * Returns the byte array at position i in the tuple. If that field is not a byte array,
-     * you will get a runtime error.
+     * Returns the byte array at position i in the tuple. If that field is not a byte array, you will get a runtime error.
      */
     public byte[] getBinary(int i);
 
-
     public Object getValueByField(String field);
 
     public String getStringByField(String field);
@@ -131,6 +121,4 @@ public interface ITuple {
      */
     public List<Object> getValues();
 
-
-
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/tuple/ITupleExt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/tuple/ITupleExt.java b/jstorm-core/src/main/java/backtype/storm/tuple/ITupleExt.java
new file mode 100644
index 0000000..92a7157
--- /dev/null
+++ b/jstorm-core/src/main/java/backtype/storm/tuple/ITupleExt.java
@@ -0,0 +1,25 @@
+package backtype.storm.tuple;
+
+public interface ITupleExt {
+    
+    /**
+     * Get Target TaskId
+     * 
+     * @return the target task id
+     */
+    int getTargetTaskId();
+
+    void setTargetTaskId(int targetTaskId);
+
+    /**
+     * Get the timestamp at which the tuple was created
+     * 
+     * @return the creation timestamp
+     */
+    long getCreationTimeStamp();
+
+    /*
+     * Set the creation timestamp of the tuple, in milliseconds
+     */
+    void setCreationTimeStamp(long timeStamp);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/tuple/MessageId.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/tuple/MessageId.java b/jstorm-core/src/main/java/backtype/storm/tuple/MessageId.java
index 688946d..329a4ae 100755
--- a/jstorm-core/src/main/java/backtype/storm/tuple/MessageId.java
+++ b/jstorm-core/src/main/java/backtype/storm/tuple/MessageId.java
@@ -29,12 +29,12 @@ import java.util.Set;
 
 public class MessageId {
     private Map<Long, Long> _anchorsToIds;
-    
+
     @Deprecated
     public static long generateId() {
         return Utils.secureRandomLong();
     }
-    
+
     public static long generateId(Random rand) {
         return rand.nextLong();
     }
@@ -42,17 +42,17 @@ public class MessageId {
     public static MessageId makeUnanchored() {
         return makeId(new HashMap<Long, Long>());
     }
-        
+
     public static MessageId makeId(Map<Long, Long> anchorsToIds) {
         return new MessageId(anchorsToIds);
     }
-        
+
     public static MessageId makeRootId(long id, long val) {
         Map<Long, Long> anchorsToIds = new HashMap<Long, Long>();
         anchorsToIds.put(id, val);
         return new MessageId(anchorsToIds);
     }
-    
+
     protected MessageId(Map<Long, Long> anchorsToIds) {
         _anchorsToIds = anchorsToIds;
     }
@@ -63,8 +63,8 @@ public class MessageId {
 
     public Set<Long> getAnchors() {
         return _anchorsToIds.keySet();
-    }    
-    
+    }
+
     @Override
     public int hashCode() {
         return _anchorsToIds.hashCode();
@@ -72,7 +72,7 @@ public class MessageId {
 
     @Override
     public boolean equals(Object other) {
-        if(other instanceof MessageId) {
+        if (other instanceof MessageId) {
             return _anchorsToIds.equals(((MessageId) other)._anchorsToIds);
         } else {
             return false;
@@ -86,7 +86,7 @@ public class MessageId {
 
     public void serialize(Output out) throws IOException {
         out.writeInt(_anchorsToIds.size(), true);
-        for(Entry<Long, Long> anchorToId: _anchorsToIds.entrySet()) {
+        for (Entry<Long, Long> anchorToId : _anchorsToIds.entrySet()) {
             out.writeLong(anchorToId.getKey());
             out.writeLong(anchorToId.getValue());
         }
@@ -95,7 +95,7 @@ public class MessageId {
     public static MessageId deserialize(Input in) throws IOException {
         int numAnchors = in.readInt(true);
         Map<Long, Long> anchorsToIds = new HashMap<Long, Long>();
-        for(int i=0; i<numAnchors; i++) {
+        for (int i = 0; i < numAnchors; i++) {
             anchorsToIds.put(in.readLong(), in.readLong());
         }
         return new MessageId(anchorsToIds);

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/tuple/Tuple.java b/jstorm-core/src/main/java/backtype/storm/tuple/Tuple.java
index 34dc61a..95253df 100755
--- a/jstorm-core/src/main/java/backtype/storm/tuple/Tuple.java
+++ b/jstorm-core/src/main/java/backtype/storm/tuple/Tuple.java
@@ -21,38 +21,35 @@ import backtype.storm.generated.GlobalStreamId;
 import java.util.List;
 
 /**
- * The tuple is the main data structure in Storm. A tuple is a named list of values, 
- * where each value can be any type. Tuples are dynamically typed -- the types of the fields 
- * do not need to be declared. Tuples have helper methods like getInteger and getString 
- * to get field values without having to cast the result.
+ * The tuple is the main data structure in Storm. A tuple is a named list of values, where each value can be any type. Tuples are dynamically typed -- the types
+ * of the fields do not need to be declared. Tuples have helper methods like getInteger and getString to get field values without having to cast the result.
  * 
- * Storm needs to know how to serialize all the values in a tuple. By default, Storm 
- * knows how to serialize the primitive types, strings, and byte arrays. If you want to 
- * use another type, you'll need to implement and register a serializer for that type.
- * See {@link http://github.com/nathanmarz/storm/wiki/Serialization} for more info.
+ * Storm needs to know how to serialize all the values in a tuple. By default, Storm knows how to serialize the primitive types, strings, and byte arrays. If
+ * you want to use another type, you'll need to implement and register a serializer for that type. See
+ * {@link http://github.com/nathanmarz/storm/wiki/Serialization} for more info.
  */
-public interface Tuple extends ITuple{
+public interface Tuple extends ITuple {
 
     /**
      * Returns the global stream id (component + stream) of this tuple.
      */
     public GlobalStreamId getSourceGlobalStreamid();
-    
+
     /**
      * Gets the id of the component that created this tuple.
      */
     public String getSourceComponent();
-    
+
     /**
      * Gets the id of the task that created this tuple.
      */
     public int getSourceTask();
-    
+
     /**
      * Gets the id of the stream that this tuple was emitted to.
      */
     public String getSourceStreamId();
-    
+
     /**
      * Gets the message id that associated with this tuple.
      */

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/tuple/TupleExt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/tuple/TupleExt.java b/jstorm-core/src/main/java/backtype/storm/tuple/TupleExt.java
index 60676c9..8f004cc 100755
--- a/jstorm-core/src/main/java/backtype/storm/tuple/TupleExt.java
+++ b/jstorm-core/src/main/java/backtype/storm/tuple/TupleExt.java
@@ -17,13 +17,6 @@
  */
 package backtype.storm.tuple;
 
-public interface TupleExt extends Tuple {
-    /**
-     * Get Target TaskId
-     * 
-     * @return
-     */
-    int getTargetTaskId();
+public interface TupleExt extends Tuple, ITupleExt {
     
-    void setTargetTaskId(int targetTaskId);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/tuple/TupleImpl.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/tuple/TupleImpl.java b/jstorm-core/src/main/java/backtype/storm/tuple/TupleImpl.java
index 818eff1..417774e 100755
--- a/jstorm-core/src/main/java/backtype/storm/tuple/TupleImpl.java
+++ b/jstorm-core/src/main/java/backtype/storm/tuple/TupleImpl.java
@@ -41,31 +41,29 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
     private GeneralTopologyContext context;
     private MessageId id;
     private IPersistentMap _meta = null;
-    
+
     public TupleImpl(GeneralTopologyContext context, List<Object> values, int taskId, String streamId, MessageId id) {
         this.values = values;
         this.taskId = taskId;
         this.streamId = streamId;
         this.id = id;
         this.context = context;
-        
+
         String componentId = context.getComponentId(taskId);
         Fields schema = context.getComponentOutputFields(componentId, streamId);
-        if(values.size()!=schema.size()) {
-            throw new IllegalArgumentException(
-                    "Tuple created with wrong number of fields. " +
-                    "Expected " + schema.size() + " fields but got " +
-                    values.size() + " fields");
+        if (values.size() != schema.size()) {
+            throw new IllegalArgumentException("Tuple created with wrong number of fields. " + "Expected " + schema.size() + " fields but got " + values.size()
+                    + " fields");
         }
     }
 
     public TupleImpl(GeneralTopologyContext context, List<Object> values, int taskId, String streamId) {
         this(context, values, taskId, streamId, MessageId.makeUnanchored());
-    }    
-    
+    }
+
     Long _processSampleStartTime = null;
     Long _executeSampleStartTime = null;
-    
+
     public void setProcessSampleStartTime(long ms) {
         _processSampleStartTime = ms;
     }
@@ -73,7 +71,7 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
     public Long getProcessSampleStartTime() {
         return _processSampleStartTime;
     }
-    
+
     public void setExecuteSampleStartTime(long ms) {
         _executeSampleStartTime = ms;
     }
@@ -81,13 +79,13 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
     public Long getExecuteSampleStartTime() {
         return _executeSampleStartTime;
     }
-    
+
     long _outAckVal = 0;
-    
+
     public void updateAckVal(long val) {
         _outAckVal = _outAckVal ^ val;
     }
-    
+
     public long getAckVal() {
         return _outAckVal;
     }
@@ -95,15 +93,15 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
     public int size() {
         return values.size();
     }
-    
+
     public int fieldIndex(String field) {
         return getFields().fieldIndex(field);
     }
-    
+
     public boolean contains(String field) {
         return getFields().contains(field);
     }
-    
+
     public Object getValue(int i) {
         return values.get(i);
     }
@@ -143,8 +141,7 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
     public byte[] getBinary(int i) {
         return (byte[]) values.get(i);
     }
-    
-    
+
     public Object getValueByField(String field) {
         return values.get(fieldIndex(field));
     }
@@ -184,11 +181,11 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
     public byte[] getBinaryByField(String field) {
         return (byte[]) values.get(fieldIndex(field));
     }
-    
+
     public List<Object> getValues() {
         return values;
     }
-    
+
     public Fields getFields() {
         return context.getComponentOutputFields(getSourceComponent(), getSourceStreamId());
     }
@@ -196,37 +193,37 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
     public List<Object> select(Fields selector) {
         return getFields().select(selector, values);
     }
-      
+
     public GlobalStreamId getSourceGlobalStreamid() {
         return new GlobalStreamId(getSourceComponent(), streamId);
     }
-    
+
     public String getSourceComponent() {
         return context.getComponentId(taskId);
     }
-    
+
     public int getSourceTask() {
         return taskId;
     }
-    
+
     public String getSourceStreamId() {
         return streamId;
     }
-    
+
     public MessageId getMessageId() {
         return id;
     }
-    
+
     @Override
     public String toString() {
-        return "source: " + getSourceComponent() + ":" + taskId + ", stream: " + streamId + ", id: "+ id.toString() + ", " + values.toString();
+        return "source: " + getSourceComponent() + ":" + taskId + ", stream: " + streamId + ", id: " + id.toString() + ", " + values.toString();
     }
-    
+
     @Override
     public boolean equals(Object other) {
         return this == other;
-    }    
-    
+    }
+
     @Override
     public int hashCode() {
         return System.identityHashCode(this);
@@ -234,25 +231,25 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
 
     private final Keyword makeKeyword(String name) {
         return Keyword.intern(Symbol.create(name));
-    }    
+    }
 
     /* ILookup */
     @Override
     public Object valAt(Object o) {
         try {
-            if(o instanceof Keyword) {
+            if (o instanceof Keyword) {
                 return getValueByField(((Keyword) o).getName());
-            } else if(o instanceof String) {
+            } else if (o instanceof String) {
                 return getValueByField((String) o);
             }
-        } catch(IllegalArgumentException e) {
+        } catch (IllegalArgumentException e) {
         }
         return null;
     }
 
     /* Seqable */
     public ISeq seq() {
-        if(values.size() > 0) {
+        if (values.size() > 0) {
             return new Seq(getFields().toList(), values, 0);
         }
         return null;
@@ -272,7 +269,7 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
 
         public Seq(IPersistentMap meta, List<String> fields, List<Object> values, int i) {
             super(meta);
-            this.fields= fields;
+            this.fields = fields;
             this.values = values;
             assert i >= 0;
             this.i = i;
@@ -283,16 +280,16 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
         }
 
         public ISeq next() {
-            if(i+1 < fields.size()) {
-                return new Seq(fields, values, i+1);
+            if (i + 1 < fields.size()) {
+                return new Seq(fields, values, i + 1);
             }
             return null;
         }
 
         public int count() {
-            assert fields.size() -i >= 0 : "index out of bounds";
+            assert fields.size() - i >= 0 : "index out of bounds";
             // i being the position in the fields of this seq, the remainder of the seq is the size
-            return fields.size() -i;
+            return fields.size() - i;
         }
 
         public Obj withMeta(IPersistentMap meta) {
@@ -302,7 +299,7 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
 
     /* Indexed */
     public Object nth(int i) {
-        if(i < values.size()) {
+        if (i < values.size()) {
             return values.get(i);
         } else {
             return null;
@@ -311,7 +308,8 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
 
     public Object nth(int i, Object notfound) {
         Object ret = nth(i);
-        if(ret==null) ret = notfound;
+        if (ret == null)
+            ret = notfound;
         return ret;
     }
 
@@ -319,33 +317,32 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
     public int count() {
         return values.size();
     }
-    
+
     /* IMeta */
     public IPersistentMap meta() {
-        if(_meta==null) {
-            _meta = new PersistentArrayMap( new Object[] {
-            makeKeyword("stream"), getSourceStreamId(), 
-            makeKeyword("component"), getSourceComponent(), 
-            makeKeyword("task"), getSourceTask()});
+        if (_meta == null) {
+            _meta =
+                    new PersistentArrayMap(new Object[] { makeKeyword("stream"), getSourceStreamId(), makeKeyword("component"), getSourceComponent(),
+                            makeKeyword("task"), getSourceTask() });
         }
         return _meta;
     }
 
     private PersistentArrayMap toMap() {
-        Object array[] = new Object[values.size()*2];
+        Object array[] = new Object[values.size() * 2];
         List<String> fields = getFields().toList();
-        for(int i=0; i < values.size(); i++) {
-            array[i*2] = fields.get(i);
-            array[(i*2)+1] = values.get(i);
+        for (int i = 0; i < values.size(); i++) {
+            array[i * 2] = fields.get(i);
+            array[(i * 2) + 1] = values.get(i);
         }
         return new PersistentArrayMap(array);
     }
 
     public IPersistentMap getMap() {
-        if(_map==null) {
+        if (_map == null) {
             setMap(toMap());
         }
         return _map;
-    }    
-    
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/tuple/TupleImplExt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/tuple/TupleImplExt.java b/jstorm-core/src/main/java/backtype/storm/tuple/TupleImplExt.java
index 2e966a0..4017769 100755
--- a/jstorm-core/src/main/java/backtype/storm/tuple/TupleImplExt.java
+++ b/jstorm-core/src/main/java/backtype/storm/tuple/TupleImplExt.java
@@ -22,25 +22,36 @@ import java.util.List;
 import backtype.storm.task.GeneralTopologyContext;
 
 public class TupleImplExt extends TupleImpl implements TupleExt {
-    
+
     protected int targetTaskId;
-    
+    protected long creationTimeStamp = System.currentTimeMillis();
+
     public TupleImplExt(GeneralTopologyContext context, List<Object> values, int taskId, String streamId) {
         super(context, values, taskId, streamId);
     }
-    
+
     public TupleImplExt(GeneralTopologyContext context, List<Object> values, int taskId, String streamId, MessageId id) {
         super(context, values, taskId, streamId, id);
     }
-    
+
     @Override
     public int getTargetTaskId() {
         return targetTaskId;
     }
-    
+
     @Override
     public void setTargetTaskId(int targetTaskId) {
         this.targetTaskId = targetTaskId;
     }
-    
+
+	@Override
+	public long getCreationTimeStamp() {
+		return creationTimeStamp;
+	}
+
+	@Override
+	public void setCreationTimeStamp(long timeStamp) {
+		this.creationTimeStamp = timeStamp;
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/tuple/Values.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/tuple/Values.java b/jstorm-core/src/main/java/backtype/storm/tuple/Values.java
index 41bbc71..c25363b 100755
--- a/jstorm-core/src/main/java/backtype/storm/tuple/Values.java
+++ b/jstorm-core/src/main/java/backtype/storm/tuple/Values.java
@@ -20,17 +20,16 @@ package backtype.storm.tuple;
 import java.util.ArrayList;
 
 /**
- * A convenience class for making tuple values using new Values("field1", 2, 3)
- * syntax.
+ * A convenience class for making tuple values using new Values("field1", 2, 3) syntax.
  */
-public class Values extends ArrayList<Object>{
+public class Values extends ArrayList<Object> {
     public Values() {
-        
+
     }
-    
+
     public Values(Object... vals) {
         super(vals.length);
-        for(Object o: vals) {
+        for (Object o : vals) {
             add(o);
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/BufferFileInputStream.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/BufferFileInputStream.java b/jstorm-core/src/main/java/backtype/storm/utils/BufferFileInputStream.java
index 1311d6d..d9fa692 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/BufferFileInputStream.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/BufferFileInputStream.java
@@ -22,7 +22,6 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Arrays;
 
-
 public class BufferFileInputStream {
     byte[] buffer;
     FileInputStream stream;
@@ -33,15 +32,15 @@ public class BufferFileInputStream {
     }
 
     public BufferFileInputStream(String file) throws FileNotFoundException {
-        this(file, 15*1024);
+        this(file, 15 * 1024);
     }
 
     public byte[] read() throws IOException {
         int length = stream.read(buffer);
-        if(length==-1) {
+        if (length == -1) {
             close();
             return new byte[0];
-        } else if(length==buffer.length) {
+        } else if (length == buffer.length) {
             return buffer;
         } else {
             return Arrays.copyOf(buffer, length);