You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/09/02 00:20:06 UTC

[1/3] storm git commit: General cleanup of the generics.

Repository: storm
Updated Branches:
  refs/heads/master c559f421a -> 3ff465c5c


General cleanup of the generics.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9e4c3df1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9e4c3df1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9e4c3df1

Branch: refs/heads/master
Commit: 9e4c3df17ffbc737210e606d3d8a9cdae8f86634
Parents: 9e80903
Author: ddebree <dd...@gmail.com>
Authored: Thu Aug 13 15:43:17 2015 +0200
Committer: ddebree <dd...@gmail.com>
Committed: Thu Aug 13 15:43:17 2015 +0200

----------------------------------------------------------------------
 .../jvm/storm/trident/spout/ITridentSpout.java  |  4 +--
 .../OpaquePartitionedTridentSpoutExecutor.java  | 20 +++++++--------
 .../spout/PartitionedTridentSpoutExecutor.java  | 26 ++++++++++----------
 .../trident/spout/RichSpoutBatchExecutor.java   |  4 +--
 .../trident/spout/RichSpoutBatchTriggerer.java  | 14 +++++------
 .../trident/spout/TridentSpoutCoordinator.java  |  6 ++---
 .../trident/spout/TridentSpoutExecutor.java     | 10 ++++----
 .../trident/topology/TridentBoltExecutor.java   | 15 ++++++-----
 8 files changed, 49 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/9e4c3df1/storm-core/src/jvm/storm/trident/spout/ITridentSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/spout/ITridentSpout.java b/storm-core/src/jvm/storm/trident/spout/ITridentSpout.java
index 2637b54..bfef745 100644
--- a/storm-core/src/jvm/storm/trident/spout/ITridentSpout.java
+++ b/storm-core/src/jvm/storm/trident/spout/ITridentSpout.java
@@ -26,7 +26,7 @@ import storm.trident.operation.TridentCollector;
 
 
 public interface ITridentSpout<T> extends Serializable {
-    public interface BatchCoordinator<X> {
+    interface BatchCoordinator<X> {
         /**
          * Create metadata for this particular transaction id which has never
          * been emitted before. The metadata should contain whatever is necessary
@@ -55,7 +55,7 @@ public interface ITridentSpout<T> extends Serializable {
         void close();
     }
     
-    public interface Emitter<X> {
+    interface Emitter<X> {
         /**
          * Emit a batch for the specified transaction attempt and metadata for the transaction. The metadata
          * was created by the Coordinator in the initializeTranaction method. This method must always emit

http://git-wip-us.apache.org/repos/asf/storm/blob/9e4c3df1/storm-core/src/jvm/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java b/storm-core/src/jvm/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
index d1b3fe8..9cd98ee 100644
--- a/storm-core/src/jvm/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
+++ b/storm-core/src/jvm/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
@@ -35,7 +35,7 @@ import storm.trident.topology.TransactionAttempt;
 
 
 public class OpaquePartitionedTridentSpoutExecutor implements ICommitterTridentSpout<Object> {
-    IOpaquePartitionedTridentSpout _spout;
+    IOpaquePartitionedTridentSpout<Object, ISpoutPartition, Object> _spout;
     
     public class Coordinator implements ITridentSpout.BatchCoordinator<Object> {
         IOpaquePartitionedTridentSpout.Coordinator _coordinator;
@@ -75,10 +75,10 @@ public class OpaquePartitionedTridentSpoutExecutor implements ICommitterTridentS
     }
     
     public class Emitter implements ICommitterTridentSpout.Emitter {        
-        IOpaquePartitionedTridentSpout.Emitter _emitter;
+        IOpaquePartitionedTridentSpout.Emitter<Object, ISpoutPartition, Object> _emitter;
         TransactionalState _state;
-        TreeMap<Long, Map<String, Object>> _cachedMetas = new TreeMap<Long, Map<String, Object>>();
-        Map<String, EmitterPartitionState> _partitionStates = new HashMap<String, EmitterPartitionState>();
+        TreeMap<Long, Map<String, Object>> _cachedMetas = new TreeMap<>();
+        Map<String, EmitterPartitionState> _partitionStates = new HashMap<>();
         int _index;
         int _numTasks;
         
@@ -97,7 +97,7 @@ public class OpaquePartitionedTridentSpoutExecutor implements ICommitterTridentS
             if(_savedCoordinatorMeta==null || !_savedCoordinatorMeta.equals(coordinatorMeta)) {
                 List<ISpoutPartition> partitions = _emitter.getOrderedPartitions(coordinatorMeta);
                 _partitionStates.clear();
-                List<ISpoutPartition> myPartitions = new ArrayList();
+                List<ISpoutPartition> myPartitions = new ArrayList<>();
                 for(int i=_index; i < partitions.size(); i+=_numTasks) {
                     ISpoutPartition p = partitions.get(i);
                     String id = p.getId();
@@ -108,7 +108,7 @@ public class OpaquePartitionedTridentSpoutExecutor implements ICommitterTridentS
                 _savedCoordinatorMeta = coordinatorMeta;
                 _changedMeta = true;
             }
-            Map<String, Object> metas = new HashMap<String, Object>();
+            Map<String, Object> metas = new HashMap<>();
             _cachedMetas.put(tx.getTransactionId(), metas);
 
             Entry<Long, Map<String, Object>> entry = _cachedMetas.lowerEntry(tx.getTransactionId());
@@ -116,7 +116,7 @@ public class OpaquePartitionedTridentSpoutExecutor implements ICommitterTridentS
             if(entry!=null) {
                 prevCached = entry.getValue();
             } else {
-                prevCached = new HashMap<String, Object>();
+                prevCached = new HashMap<>();
             }
             
             for(String id: _partitionStates.keySet()) {
@@ -147,8 +147,8 @@ public class OpaquePartitionedTridentSpoutExecutor implements ICommitterTridentS
             // another attempt of the batch to commit, the batch phase must have succeeded in between.
             // hence, all tasks for the prior commit must have finished committing (whether successfully or not)
             if(_changedMeta && _index==0) {
-                Set<String> validIds = new HashSet<String>();
-                for(ISpoutPartition p: (List<ISpoutPartition>) _emitter.getOrderedPartitions(_savedCoordinatorMeta)) {
+                Set<String> validIds = new HashSet<>();
+                for(ISpoutPartition p: _emitter.getOrderedPartitions(_savedCoordinatorMeta)) {
                     validIds.add(p.getId());
                 }
                 for(String existingPartition: _state.list("")) {
@@ -174,7 +174,7 @@ public class OpaquePartitionedTridentSpoutExecutor implements ICommitterTridentS
         }
     } 
     
-    public OpaquePartitionedTridentSpoutExecutor(IOpaquePartitionedTridentSpout spout) {
+    public OpaquePartitionedTridentSpoutExecutor(IOpaquePartitionedTridentSpout<Object, ISpoutPartition, Object> spout) {
         _spout = spout;
     }
     

http://git-wip-us.apache.org/repos/asf/storm/blob/9e4c3df1/storm-core/src/jvm/storm/trident/spout/PartitionedTridentSpoutExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/spout/PartitionedTridentSpoutExecutor.java b/storm-core/src/jvm/storm/trident/spout/PartitionedTridentSpoutExecutor.java
index f96efca..a11afda 100644
--- a/storm-core/src/jvm/storm/trident/spout/PartitionedTridentSpoutExecutor.java
+++ b/storm-core/src/jvm/storm/trident/spout/PartitionedTridentSpoutExecutor.java
@@ -30,25 +30,25 @@ import storm.trident.topology.state.TransactionalState;
 
 
 public class PartitionedTridentSpoutExecutor implements ITridentSpout<Integer> {
-    IPartitionedTridentSpout _spout;
+    IPartitionedTridentSpout<Integer, ISpoutPartition, Object> _spout;
     
-    public PartitionedTridentSpoutExecutor(IPartitionedTridentSpout spout) {
+    public PartitionedTridentSpoutExecutor(IPartitionedTridentSpout<Integer, ISpoutPartition, Object> spout) {
         _spout = spout;
     }
     
-    public IPartitionedTridentSpout getPartitionedSpout() {
+    public IPartitionedTridentSpout<Integer, ISpoutPartition, Object> getPartitionedSpout() {
         return _spout;
     }
     
-    class Coordinator implements ITridentSpout.BatchCoordinator<Object> {
-        private IPartitionedTridentSpout.Coordinator _coordinator;
+    class Coordinator implements ITridentSpout.BatchCoordinator<Integer> {
+        private IPartitionedTridentSpout.Coordinator<Integer> _coordinator;
         
         public Coordinator(Map conf, TopologyContext context) {
             _coordinator = _spout.getCoordinator(conf, context);
         }
         
         @Override
-        public Object initializeTransaction(long txid, Object prevMetadata, Object currMetadata) {
+        public Integer initializeTransaction(long txid, Integer prevMetadata, Integer currMetadata) {
             if(currMetadata!=null) {
                 return currMetadata;
             } else {
@@ -82,10 +82,10 @@ public class PartitionedTridentSpoutExecutor implements ITridentSpout<Integer> {
         }
     }
     
-    class Emitter implements ITridentSpout.Emitter<Object> {
-        private IPartitionedTridentSpout.Emitter _emitter;
+    class Emitter implements ITridentSpout.Emitter<Integer> {
+        private IPartitionedTridentSpout.Emitter<Integer, ISpoutPartition, Object> _emitter;
         private TransactionalState _state;
-        private Map<String, EmitterPartitionState> _partitionStates = new HashMap<String, EmitterPartitionState>();
+        private Map<String, EmitterPartitionState> _partitionStates = new HashMap<>();
         private int _index;
         private int _numTasks;
         
@@ -100,12 +100,12 @@ public class PartitionedTridentSpoutExecutor implements ITridentSpout<Integer> {
 
         
         @Override
-        public void emitBatch(final TransactionAttempt tx, final Object coordinatorMeta,
+        public void emitBatch(final TransactionAttempt tx, final Integer coordinatorMeta,
                 final TridentCollector collector) {
             if(_savedCoordinatorMeta == null || !_savedCoordinatorMeta.equals(coordinatorMeta)) {
                 List<ISpoutPartition> partitions = _emitter.getOrderedPartitions(coordinatorMeta);
                 _partitionStates.clear();
-                List<ISpoutPartition> myPartitions = new ArrayList();
+                List<ISpoutPartition> myPartitions = new ArrayList<>();
                 for(int i=_index; i < partitions.size(); i+=_numTasks) {
                     ISpoutPartition p = partitions.get(i);
                     String id = p.getId();
@@ -150,12 +150,12 @@ public class PartitionedTridentSpoutExecutor implements ITridentSpout<Integer> {
     }    
 
     @Override
-    public ITridentSpout.BatchCoordinator getCoordinator(String txStateId, Map conf, TopologyContext context) {
+    public ITridentSpout.BatchCoordinator<Integer> getCoordinator(String txStateId, Map conf, TopologyContext context) {
         return new Coordinator(conf, context);
     }
 
     @Override
-    public ITridentSpout.Emitter getEmitter(String txStateId, Map conf, TopologyContext context) {
+    public ITridentSpout.Emitter<Integer> getEmitter(String txStateId, Map conf, TopologyContext context) {
         return new Emitter(txStateId, conf, context);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/9e4c3df1/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java
index ab9fd4b..8e414c7 100644
--- a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java
+++ b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java
@@ -78,7 +78,7 @@ public class RichSpoutBatchExecutor implements ITridentSpout {
             if(batchSize==null) batchSize = 1000;
             _maxBatchSize = batchSize.intValue();
             _collector = new CaptureCollector();
-            idsMap = new RotatingMap(3);
+            idsMap = new RotatingMap<>(3);
             rotateTime = 1000L * ((Number)conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
         }
         
@@ -174,7 +174,7 @@ public class RichSpoutBatchExecutor implements ITridentSpout {
         public long pendingCount;
         public void reset(TridentCollector c) {
             _collector = c;
-            ids = new ArrayList<Object>();
+            ids = new ArrayList<>();
         }
         
         @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/9e4c3df1/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java
index 0380728..1e554d8 100644
--- a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java
+++ b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java
@@ -56,7 +56,7 @@ public class RichSpoutBatchTriggerer implements IRichSpout {
     @Override
     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
         _delegate.open(conf, context, new SpoutOutputCollector(new StreamOverrideCollector(collector)));
-        _outputTasks = new ArrayList<Integer>();
+        _outputTasks = new ArrayList<>();
         for(String component: Utils.get(context.getThisTargets(),
                                         _coordStream,
                                         new HashMap<String, Grouping>()).keySet()) {
@@ -119,20 +119,20 @@ public class RichSpoutBatchTriggerer implements IRichSpout {
     @Override
     public Map<String, Object> getComponentConfiguration() {
         Map<String, Object> conf = _delegate.getComponentConfiguration();
-        if(conf==null) conf = new HashMap();
-        else conf = new HashMap(conf);
+        if(conf==null) conf = new HashMap<>();
+        else conf = new HashMap<>(conf);
         Config.registerSerialization(conf, RichSpoutBatchId.class, RichSpoutBatchIdSerializer.class);
         return conf;
     }
     
     static class FinishCondition {
-        Set<Long> vals = new HashSet<Long>();
+        Set<Long> vals = new HashSet<>();
         Object msgId;
     }
     
-    Map<Long, Long> _msgIdToBatchId = new HashMap();
+    Map<Long, Long> _msgIdToBatchId = new HashMap<>();
     
-    Map<Long, FinishCondition> _finishConditions = new HashMap();
+    Map<Long, FinishCondition> _finishConditions = new HashMap<>();
     
     class StreamOverrideCollector implements ISpoutOutputCollector {
         
@@ -149,7 +149,7 @@ public class RichSpoutBatchTriggerer implements IRichSpout {
             FinishCondition finish = new FinishCondition();
             finish.msgId = msgId;
             List<Integer> tasks = _collector.emit(_stream, new ConsList(batchId, values));
-            Set<Integer> outTasksSet = new HashSet<Integer>(tasks);
+            Set<Integer> outTasksSet = new HashSet<>(tasks);
             for(Integer t: _outputTasks) {
                 int count = 0;
                 if(outTasksSet.contains(t)) {

http://git-wip-us.apache.org/repos/asf/storm/blob/9e4c3df1/storm-core/src/jvm/storm/trident/spout/TridentSpoutCoordinator.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/spout/TridentSpoutCoordinator.java b/storm-core/src/jvm/storm/trident/spout/TridentSpoutCoordinator.java
index a936e19..d9acb5f 100644
--- a/storm-core/src/jvm/storm/trident/spout/TridentSpoutCoordinator.java
+++ b/storm-core/src/jvm/storm/trident/spout/TridentSpoutCoordinator.java
@@ -38,14 +38,14 @@ public class TridentSpoutCoordinator implements IBasicBolt {
     public static final Logger LOG = LoggerFactory.getLogger(TridentSpoutCoordinator.class);
     private static final String META_DIR = "meta";
 
-    ITridentSpout _spout;
-    ITridentSpout.BatchCoordinator _coord;
+    ITridentSpout<Object> _spout;
+    ITridentSpout.BatchCoordinator<Object> _coord;
     RotatingTransactionalState _state;
     TransactionalState _underlyingState;
     String _id;
 
     
-    public TridentSpoutCoordinator(String id, ITridentSpout spout) {
+    public TridentSpoutCoordinator(String id, ITridentSpout<Object> spout) {
         _spout = spout;
         _id = id;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/9e4c3df1/storm-core/src/jvm/storm/trident/spout/TridentSpoutExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/spout/TridentSpoutExecutor.java b/storm-core/src/jvm/storm/trident/spout/TridentSpoutExecutor.java
index 22b304a..6431426 100644
--- a/storm-core/src/jvm/storm/trident/spout/TridentSpoutExecutor.java
+++ b/storm-core/src/jvm/storm/trident/spout/TridentSpoutExecutor.java
@@ -42,14 +42,14 @@ public class TridentSpoutExecutor implements ITridentBatchBolt {
     public static Logger LOG = LoggerFactory.getLogger(TridentSpoutExecutor.class);    
 
     AddIdCollector _collector;
-    ITridentSpout _spout;
-    ITridentSpout.Emitter _emitter;
+    ITridentSpout<Object> _spout;
+    ITridentSpout.Emitter<Object> _emitter;
     String _streamName;
     String _txStateId;
     
-    TreeMap<Long, TransactionAttempt> _activeBatches = new TreeMap<Long, TransactionAttempt>();
+    TreeMap<Long, TransactionAttempt> _activeBatches = new TreeMap<>();
 
-    public TridentSpoutExecutor(String txStateId, String streamName, ITridentSpout spout) {
+    public TridentSpoutExecutor(String txStateId, String streamName, ITridentSpout<Object> spout) {
         _txStateId = txStateId;
         _spout = spout;
         _streamName = streamName;
@@ -91,7 +91,7 @@ public class TridentSpoutExecutor implements ITridentBatchBolt {
 
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        List<String> fields = new ArrayList(_spout.getOutputFields().toList());
+        List<String> fields = new ArrayList<>(_spout.getOutputFields().toList());
         fields.add(0, ID_FIELD);
         declarer.declareStream(_streamName, new Fields(fields));
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/9e4c3df1/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
index a23e555..5266e1c 100644
--- a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
+++ b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
@@ -18,7 +18,6 @@
 package storm.trident.topology;
 
 import backtype.storm.Config;
-import backtype.storm.Constants;
 import backtype.storm.coordination.BatchOutputCollector;
 import backtype.storm.coordination.BatchOutputCollectorImpl;
 import backtype.storm.generated.GlobalStreamId;
@@ -106,7 +105,7 @@ public class TridentBoltExecutor implements IRichBolt {
     long _messageTimeoutMs;
     long _lastRotate;
     
-    RotatingMap _batches;
+    RotatingMap<Object, TrackedBatch> _batches;
     
     // map from batchgroupid to coordspec
     public TridentBoltExecutor(ITridentBatchBolt bolt, Map<GlobalStreamId, String> batchGroupIds, Map<String, CoordSpec> coordinationSpecs) {
@@ -122,7 +121,7 @@ public class TridentBoltExecutor implements IRichBolt {
         int reportedTasks = 0;
         int expectedTupleCount = 0;
         int receivedTuples = 0;
-        Map<Integer, Integer> taskEmittedTuples = new HashMap();
+        Map<Integer, Integer> taskEmittedTuples = new HashMap<>();
         boolean failed = false;
         boolean receivedCommit;
         Tuple delayedAck = null;
@@ -143,7 +142,7 @@ public class TridentBoltExecutor implements IRichBolt {
     public class CoordinatedOutputCollector implements IOutputCollector {
         IOutputCollector _delegate;
         
-        TrackedBatch _currBatch = null;;
+        TrackedBatch _currBatch = null;
         
         public void setCurrBatch(TrackedBatch batch) {
             _currBatch = batch;
@@ -197,7 +196,7 @@ public class TridentBoltExecutor implements IRichBolt {
     public void prepare(Map conf, TopologyContext context, OutputCollector collector) {        
         _messageTimeoutMs = context.maxTopologyMessageTimeout() * 1000L;
         _lastRotate = System.currentTimeMillis();
-        _batches = new RotatingMap(2);
+        _batches = new RotatingMap<>(2);
         _context = context;
         _collector = collector;
         _coordCollector = new CoordinatedOutputCollector(collector);
@@ -205,7 +204,7 @@ public class TridentBoltExecutor implements IRichBolt {
                 
         _coordConditions = (Map) context.getExecutorData("__coordConditions");
         if(_coordConditions==null) {
-            _coordConditions = new HashMap();
+            _coordConditions = new HashMap<>();
             for(String batchGroup: _coordSpecs.keySet()) {
                 CoordSpec spec = _coordSpecs.get(batchGroup);
                 CoordCondition cond = new CoordCondition();
@@ -219,7 +218,7 @@ public class TridentBoltExecutor implements IRichBolt {
                         cond.expectedTaskReports+=context.getComponentTasks(comp).size();
                     }
                 }
-                cond.targetTasks = new HashSet<Integer>();
+                cond.targetTasks = new HashSet<>();
                 for(String component: Utils.get(context.getThisTargets(),
                                         COORD_STREAM(batchGroup),
                                         new HashMap<String, Grouping>()).keySet()) {
@@ -399,7 +398,7 @@ public class TridentBoltExecutor implements IRichBolt {
     @Override
     public Map<String, Object> getComponentConfiguration() {
         Map<String, Object> ret = _bolt.getComponentConfiguration();
-        if(ret==null) ret = new HashMap();
+        if(ret==null) ret = new HashMap<>();
         ret.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5);
         // TODO: Need to be able to set the tick tuple time to the message timeout, ideally without parameterization
         return ret;


[2/3] storm git commit: Merge branch 'generics_cleanup' of https://github.com/ddebree/storm into STORM-991

Posted by ka...@apache.org.
Merge branch 'generics_cleanup' of https://github.com/ddebree/storm into STORM-991


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f86b3cd5
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f86b3cd5
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f86b3cd5

Branch: refs/heads/master
Commit: f86b3cd5827ebb7ae27b07a419bd01cd3d6b80e3
Parents: c559f42 9e4c3df
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Sep 2 07:10:25 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Sep 2 07:10:25 2015 +0900

----------------------------------------------------------------------
 .../jvm/storm/trident/spout/ITridentSpout.java  |  4 +--
 .../OpaquePartitionedTridentSpoutExecutor.java  | 20 +++++++--------
 .../spout/PartitionedTridentSpoutExecutor.java  | 26 ++++++++++----------
 .../trident/spout/RichSpoutBatchExecutor.java   |  4 +--
 .../trident/spout/RichSpoutBatchTriggerer.java  | 14 +++++------
 .../trident/spout/TridentSpoutCoordinator.java  |  6 ++---
 .../trident/spout/TridentSpoutExecutor.java     | 10 ++++----
 .../trident/topology/TridentBoltExecutor.java   | 15 ++++++-----
 8 files changed, 49 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f86b3cd5/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/f86b3cd5/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
index 2019ea1,5266e1c..fa68ae2
--- a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
+++ b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
@@@ -140,10 -139,10 +139,10 @@@ public class TridentBoltExecutor implem
          }        
      }
      
 -    public class CoordinatedOutputCollector implements IOutputCollector {
 +    private static class CoordinatedOutputCollector implements IOutputCollector {
          IOutputCollector _delegate;
          
-         TrackedBatch _currBatch = null;;
+         TrackedBatch _currBatch = null;
          
          public void setCurrBatch(TrackedBatch batch) {
              _currBatch = batch;


[3/3] storm git commit: add STORM-991 to CHANGELOG.md

Posted by ka...@apache.org.
add STORM-991 to CHANGELOG.md

* also add Dean de Bree to contributor list


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3ff465c5
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3ff465c5
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3ff465c5

Branch: refs/heads/master
Commit: 3ff465c5c0720f6c9701b2b7b807a94c92533817
Parents: f86b3cd
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Sep 2 07:19:29 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Sep 2 07:19:29 2015 +0900

----------------------------------------------------------------------
 CHANGELOG.md    | 1 +
 README.markdown | 1 +
 2 files changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3ff465c5/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4456564..e384de7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.11.0
+ * STORM-991: General cleanup of the generics (storm.trident.spout package)
  * STORM-1000: Use static member classes when permitted 
  * STORM-1003: In cluster.clj replace task-id with component-id in the declaration
  * STORM-1013: [storm-elasticsearch] Expose TransportClient configuration Map to EsConfig

http://git-wip-us.apache.org/repos/asf/storm/blob/3ff465c5/README.markdown
----------------------------------------------------------------------
diff --git a/README.markdown b/README.markdown
index 6521f4a..de02346 100644
--- a/README.markdown
+++ b/README.markdown
@@ -222,6 +222,7 @@ under the License.
 * Sanket Reddy ([@redsanket](https://github.com/redsanket))
 * Drew Robb ([@drewrobb](https://github.com/drewrobb))
 * Frantz Mazoyer ([@fmazoyer](https://github.com/fmazoyer))
+* Dean de Bree ([@ddebree](https://github.com/ddebree))
 
 ## Acknowledgements