You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/09/02 00:20:06 UTC
[1/3] storm git commit: General cleanup of the generics.
Repository: storm
Updated Branches:
refs/heads/master c559f421a -> 3ff465c5c
General cleanup of the generics.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9e4c3df1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9e4c3df1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9e4c3df1
Branch: refs/heads/master
Commit: 9e4c3df17ffbc737210e606d3d8a9cdae8f86634
Parents: 9e80903
Author: ddebree <dd...@gmail.com>
Authored: Thu Aug 13 15:43:17 2015 +0200
Committer: ddebree <dd...@gmail.com>
Committed: Thu Aug 13 15:43:17 2015 +0200
----------------------------------------------------------------------
.../jvm/storm/trident/spout/ITridentSpout.java | 4 +--
.../OpaquePartitionedTridentSpoutExecutor.java | 20 +++++++--------
.../spout/PartitionedTridentSpoutExecutor.java | 26 ++++++++++----------
.../trident/spout/RichSpoutBatchExecutor.java | 4 +--
.../trident/spout/RichSpoutBatchTriggerer.java | 14 +++++------
.../trident/spout/TridentSpoutCoordinator.java | 6 ++---
.../trident/spout/TridentSpoutExecutor.java | 10 ++++----
.../trident/topology/TridentBoltExecutor.java | 15 ++++++-----
8 files changed, 49 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/9e4c3df1/storm-core/src/jvm/storm/trident/spout/ITridentSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/spout/ITridentSpout.java b/storm-core/src/jvm/storm/trident/spout/ITridentSpout.java
index 2637b54..bfef745 100644
--- a/storm-core/src/jvm/storm/trident/spout/ITridentSpout.java
+++ b/storm-core/src/jvm/storm/trident/spout/ITridentSpout.java
@@ -26,7 +26,7 @@ import storm.trident.operation.TridentCollector;
public interface ITridentSpout<T> extends Serializable {
- public interface BatchCoordinator<X> {
+ interface BatchCoordinator<X> {
/**
* Create metadata for this particular transaction id which has never
* been emitted before. The metadata should contain whatever is necessary
@@ -55,7 +55,7 @@ public interface ITridentSpout<T> extends Serializable {
void close();
}
- public interface Emitter<X> {
+ interface Emitter<X> {
/**
* Emit a batch for the specified transaction attempt and metadata for the transaction. The metadata
* was created by the Coordinator in the initializeTranaction method. This method must always emit
http://git-wip-us.apache.org/repos/asf/storm/blob/9e4c3df1/storm-core/src/jvm/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java b/storm-core/src/jvm/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
index d1b3fe8..9cd98ee 100644
--- a/storm-core/src/jvm/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
+++ b/storm-core/src/jvm/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
@@ -35,7 +35,7 @@ import storm.trident.topology.TransactionAttempt;
public class OpaquePartitionedTridentSpoutExecutor implements ICommitterTridentSpout<Object> {
- IOpaquePartitionedTridentSpout _spout;
+ IOpaquePartitionedTridentSpout<Object, ISpoutPartition, Object> _spout;
public class Coordinator implements ITridentSpout.BatchCoordinator<Object> {
IOpaquePartitionedTridentSpout.Coordinator _coordinator;
@@ -75,10 +75,10 @@ public class OpaquePartitionedTridentSpoutExecutor implements ICommitterTridentS
}
public class Emitter implements ICommitterTridentSpout.Emitter {
- IOpaquePartitionedTridentSpout.Emitter _emitter;
+ IOpaquePartitionedTridentSpout.Emitter<Object, ISpoutPartition, Object> _emitter;
TransactionalState _state;
- TreeMap<Long, Map<String, Object>> _cachedMetas = new TreeMap<Long, Map<String, Object>>();
- Map<String, EmitterPartitionState> _partitionStates = new HashMap<String, EmitterPartitionState>();
+ TreeMap<Long, Map<String, Object>> _cachedMetas = new TreeMap<>();
+ Map<String, EmitterPartitionState> _partitionStates = new HashMap<>();
int _index;
int _numTasks;
@@ -97,7 +97,7 @@ public class OpaquePartitionedTridentSpoutExecutor implements ICommitterTridentS
if(_savedCoordinatorMeta==null || !_savedCoordinatorMeta.equals(coordinatorMeta)) {
List<ISpoutPartition> partitions = _emitter.getOrderedPartitions(coordinatorMeta);
_partitionStates.clear();
- List<ISpoutPartition> myPartitions = new ArrayList();
+ List<ISpoutPartition> myPartitions = new ArrayList<>();
for(int i=_index; i < partitions.size(); i+=_numTasks) {
ISpoutPartition p = partitions.get(i);
String id = p.getId();
@@ -108,7 +108,7 @@ public class OpaquePartitionedTridentSpoutExecutor implements ICommitterTridentS
_savedCoordinatorMeta = coordinatorMeta;
_changedMeta = true;
}
- Map<String, Object> metas = new HashMap<String, Object>();
+ Map<String, Object> metas = new HashMap<>();
_cachedMetas.put(tx.getTransactionId(), metas);
Entry<Long, Map<String, Object>> entry = _cachedMetas.lowerEntry(tx.getTransactionId());
@@ -116,7 +116,7 @@ public class OpaquePartitionedTridentSpoutExecutor implements ICommitterTridentS
if(entry!=null) {
prevCached = entry.getValue();
} else {
- prevCached = new HashMap<String, Object>();
+ prevCached = new HashMap<>();
}
for(String id: _partitionStates.keySet()) {
@@ -147,8 +147,8 @@ public class OpaquePartitionedTridentSpoutExecutor implements ICommitterTridentS
// another attempt of the batch to commit, the batch phase must have succeeded in between.
// hence, all tasks for the prior commit must have finished committing (whether successfully or not)
if(_changedMeta && _index==0) {
- Set<String> validIds = new HashSet<String>();
- for(ISpoutPartition p: (List<ISpoutPartition>) _emitter.getOrderedPartitions(_savedCoordinatorMeta)) {
+ Set<String> validIds = new HashSet<>();
+ for(ISpoutPartition p: _emitter.getOrderedPartitions(_savedCoordinatorMeta)) {
validIds.add(p.getId());
}
for(String existingPartition: _state.list("")) {
@@ -174,7 +174,7 @@ public class OpaquePartitionedTridentSpoutExecutor implements ICommitterTridentS
}
}
- public OpaquePartitionedTridentSpoutExecutor(IOpaquePartitionedTridentSpout spout) {
+ public OpaquePartitionedTridentSpoutExecutor(IOpaquePartitionedTridentSpout<Object, ISpoutPartition, Object> spout) {
_spout = spout;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/9e4c3df1/storm-core/src/jvm/storm/trident/spout/PartitionedTridentSpoutExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/spout/PartitionedTridentSpoutExecutor.java b/storm-core/src/jvm/storm/trident/spout/PartitionedTridentSpoutExecutor.java
index f96efca..a11afda 100644
--- a/storm-core/src/jvm/storm/trident/spout/PartitionedTridentSpoutExecutor.java
+++ b/storm-core/src/jvm/storm/trident/spout/PartitionedTridentSpoutExecutor.java
@@ -30,25 +30,25 @@ import storm.trident.topology.state.TransactionalState;
public class PartitionedTridentSpoutExecutor implements ITridentSpout<Integer> {
- IPartitionedTridentSpout _spout;
+ IPartitionedTridentSpout<Integer, ISpoutPartition, Object> _spout;
- public PartitionedTridentSpoutExecutor(IPartitionedTridentSpout spout) {
+ public PartitionedTridentSpoutExecutor(IPartitionedTridentSpout<Integer, ISpoutPartition, Object> spout) {
_spout = spout;
}
- public IPartitionedTridentSpout getPartitionedSpout() {
+ public IPartitionedTridentSpout<Integer, ISpoutPartition, Object> getPartitionedSpout() {
return _spout;
}
- class Coordinator implements ITridentSpout.BatchCoordinator<Object> {
- private IPartitionedTridentSpout.Coordinator _coordinator;
+ class Coordinator implements ITridentSpout.BatchCoordinator<Integer> {
+ private IPartitionedTridentSpout.Coordinator<Integer> _coordinator;
public Coordinator(Map conf, TopologyContext context) {
_coordinator = _spout.getCoordinator(conf, context);
}
@Override
- public Object initializeTransaction(long txid, Object prevMetadata, Object currMetadata) {
+ public Integer initializeTransaction(long txid, Integer prevMetadata, Integer currMetadata) {
if(currMetadata!=null) {
return currMetadata;
} else {
@@ -82,10 +82,10 @@ public class PartitionedTridentSpoutExecutor implements ITridentSpout<Integer> {
}
}
- class Emitter implements ITridentSpout.Emitter<Object> {
- private IPartitionedTridentSpout.Emitter _emitter;
+ class Emitter implements ITridentSpout.Emitter<Integer> {
+ private IPartitionedTridentSpout.Emitter<Integer, ISpoutPartition, Object> _emitter;
private TransactionalState _state;
- private Map<String, EmitterPartitionState> _partitionStates = new HashMap<String, EmitterPartitionState>();
+ private Map<String, EmitterPartitionState> _partitionStates = new HashMap<>();
private int _index;
private int _numTasks;
@@ -100,12 +100,12 @@ public class PartitionedTridentSpoutExecutor implements ITridentSpout<Integer> {
@Override
- public void emitBatch(final TransactionAttempt tx, final Object coordinatorMeta,
+ public void emitBatch(final TransactionAttempt tx, final Integer coordinatorMeta,
final TridentCollector collector) {
if(_savedCoordinatorMeta == null || !_savedCoordinatorMeta.equals(coordinatorMeta)) {
List<ISpoutPartition> partitions = _emitter.getOrderedPartitions(coordinatorMeta);
_partitionStates.clear();
- List<ISpoutPartition> myPartitions = new ArrayList();
+ List<ISpoutPartition> myPartitions = new ArrayList<>();
for(int i=_index; i < partitions.size(); i+=_numTasks) {
ISpoutPartition p = partitions.get(i);
String id = p.getId();
@@ -150,12 +150,12 @@ public class PartitionedTridentSpoutExecutor implements ITridentSpout<Integer> {
}
@Override
- public ITridentSpout.BatchCoordinator getCoordinator(String txStateId, Map conf, TopologyContext context) {
+ public ITridentSpout.BatchCoordinator<Integer> getCoordinator(String txStateId, Map conf, TopologyContext context) {
return new Coordinator(conf, context);
}
@Override
- public ITridentSpout.Emitter getEmitter(String txStateId, Map conf, TopologyContext context) {
+ public ITridentSpout.Emitter<Integer> getEmitter(String txStateId, Map conf, TopologyContext context) {
return new Emitter(txStateId, conf, context);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/9e4c3df1/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java
index ab9fd4b..8e414c7 100644
--- a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java
+++ b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java
@@ -78,7 +78,7 @@ public class RichSpoutBatchExecutor implements ITridentSpout {
if(batchSize==null) batchSize = 1000;
_maxBatchSize = batchSize.intValue();
_collector = new CaptureCollector();
- idsMap = new RotatingMap(3);
+ idsMap = new RotatingMap<>(3);
rotateTime = 1000L * ((Number)conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
}
@@ -174,7 +174,7 @@ public class RichSpoutBatchExecutor implements ITridentSpout {
public long pendingCount;
public void reset(TridentCollector c) {
_collector = c;
- ids = new ArrayList<Object>();
+ ids = new ArrayList<>();
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/9e4c3df1/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java
index 0380728..1e554d8 100644
--- a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java
+++ b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java
@@ -56,7 +56,7 @@ public class RichSpoutBatchTriggerer implements IRichSpout {
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_delegate.open(conf, context, new SpoutOutputCollector(new StreamOverrideCollector(collector)));
- _outputTasks = new ArrayList<Integer>();
+ _outputTasks = new ArrayList<>();
for(String component: Utils.get(context.getThisTargets(),
_coordStream,
new HashMap<String, Grouping>()).keySet()) {
@@ -119,20 +119,20 @@ public class RichSpoutBatchTriggerer implements IRichSpout {
@Override
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> conf = _delegate.getComponentConfiguration();
- if(conf==null) conf = new HashMap();
- else conf = new HashMap(conf);
+ if(conf==null) conf = new HashMap<>();
+ else conf = new HashMap<>(conf);
Config.registerSerialization(conf, RichSpoutBatchId.class, RichSpoutBatchIdSerializer.class);
return conf;
}
static class FinishCondition {
- Set<Long> vals = new HashSet<Long>();
+ Set<Long> vals = new HashSet<>();
Object msgId;
}
- Map<Long, Long> _msgIdToBatchId = new HashMap();
+ Map<Long, Long> _msgIdToBatchId = new HashMap<>();
- Map<Long, FinishCondition> _finishConditions = new HashMap();
+ Map<Long, FinishCondition> _finishConditions = new HashMap<>();
class StreamOverrideCollector implements ISpoutOutputCollector {
@@ -149,7 +149,7 @@ public class RichSpoutBatchTriggerer implements IRichSpout {
FinishCondition finish = new FinishCondition();
finish.msgId = msgId;
List<Integer> tasks = _collector.emit(_stream, new ConsList(batchId, values));
- Set<Integer> outTasksSet = new HashSet<Integer>(tasks);
+ Set<Integer> outTasksSet = new HashSet<>(tasks);
for(Integer t: _outputTasks) {
int count = 0;
if(outTasksSet.contains(t)) {
http://git-wip-us.apache.org/repos/asf/storm/blob/9e4c3df1/storm-core/src/jvm/storm/trident/spout/TridentSpoutCoordinator.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/spout/TridentSpoutCoordinator.java b/storm-core/src/jvm/storm/trident/spout/TridentSpoutCoordinator.java
index a936e19..d9acb5f 100644
--- a/storm-core/src/jvm/storm/trident/spout/TridentSpoutCoordinator.java
+++ b/storm-core/src/jvm/storm/trident/spout/TridentSpoutCoordinator.java
@@ -38,14 +38,14 @@ public class TridentSpoutCoordinator implements IBasicBolt {
public static final Logger LOG = LoggerFactory.getLogger(TridentSpoutCoordinator.class);
private static final String META_DIR = "meta";
- ITridentSpout _spout;
- ITridentSpout.BatchCoordinator _coord;
+ ITridentSpout<Object> _spout;
+ ITridentSpout.BatchCoordinator<Object> _coord;
RotatingTransactionalState _state;
TransactionalState _underlyingState;
String _id;
- public TridentSpoutCoordinator(String id, ITridentSpout spout) {
+ public TridentSpoutCoordinator(String id, ITridentSpout<Object> spout) {
_spout = spout;
_id = id;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/9e4c3df1/storm-core/src/jvm/storm/trident/spout/TridentSpoutExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/spout/TridentSpoutExecutor.java b/storm-core/src/jvm/storm/trident/spout/TridentSpoutExecutor.java
index 22b304a..6431426 100644
--- a/storm-core/src/jvm/storm/trident/spout/TridentSpoutExecutor.java
+++ b/storm-core/src/jvm/storm/trident/spout/TridentSpoutExecutor.java
@@ -42,14 +42,14 @@ public class TridentSpoutExecutor implements ITridentBatchBolt {
public static Logger LOG = LoggerFactory.getLogger(TridentSpoutExecutor.class);
AddIdCollector _collector;
- ITridentSpout _spout;
- ITridentSpout.Emitter _emitter;
+ ITridentSpout<Object> _spout;
+ ITridentSpout.Emitter<Object> _emitter;
String _streamName;
String _txStateId;
- TreeMap<Long, TransactionAttempt> _activeBatches = new TreeMap<Long, TransactionAttempt>();
+ TreeMap<Long, TransactionAttempt> _activeBatches = new TreeMap<>();
- public TridentSpoutExecutor(String txStateId, String streamName, ITridentSpout spout) {
+ public TridentSpoutExecutor(String txStateId, String streamName, ITridentSpout<Object> spout) {
_txStateId = txStateId;
_spout = spout;
_streamName = streamName;
@@ -91,7 +91,7 @@ public class TridentSpoutExecutor implements ITridentBatchBolt {
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
- List<String> fields = new ArrayList(_spout.getOutputFields().toList());
+ List<String> fields = new ArrayList<>(_spout.getOutputFields().toList());
fields.add(0, ID_FIELD);
declarer.declareStream(_streamName, new Fields(fields));
}
http://git-wip-us.apache.org/repos/asf/storm/blob/9e4c3df1/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
index a23e555..5266e1c 100644
--- a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
+++ b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
@@ -18,7 +18,6 @@
package storm.trident.topology;
import backtype.storm.Config;
-import backtype.storm.Constants;
import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.coordination.BatchOutputCollectorImpl;
import backtype.storm.generated.GlobalStreamId;
@@ -106,7 +105,7 @@ public class TridentBoltExecutor implements IRichBolt {
long _messageTimeoutMs;
long _lastRotate;
- RotatingMap _batches;
+ RotatingMap<Object, TrackedBatch> _batches;
// map from batchgroupid to coordspec
public TridentBoltExecutor(ITridentBatchBolt bolt, Map<GlobalStreamId, String> batchGroupIds, Map<String, CoordSpec> coordinationSpecs) {
@@ -122,7 +121,7 @@ public class TridentBoltExecutor implements IRichBolt {
int reportedTasks = 0;
int expectedTupleCount = 0;
int receivedTuples = 0;
- Map<Integer, Integer> taskEmittedTuples = new HashMap();
+ Map<Integer, Integer> taskEmittedTuples = new HashMap<>();
boolean failed = false;
boolean receivedCommit;
Tuple delayedAck = null;
@@ -143,7 +142,7 @@ public class TridentBoltExecutor implements IRichBolt {
public class CoordinatedOutputCollector implements IOutputCollector {
IOutputCollector _delegate;
- TrackedBatch _currBatch = null;;
+ TrackedBatch _currBatch = null;
public void setCurrBatch(TrackedBatch batch) {
_currBatch = batch;
@@ -197,7 +196,7 @@ public class TridentBoltExecutor implements IRichBolt {
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_messageTimeoutMs = context.maxTopologyMessageTimeout() * 1000L;
_lastRotate = System.currentTimeMillis();
- _batches = new RotatingMap(2);
+ _batches = new RotatingMap<>(2);
_context = context;
_collector = collector;
_coordCollector = new CoordinatedOutputCollector(collector);
@@ -205,7 +204,7 @@ public class TridentBoltExecutor implements IRichBolt {
_coordConditions = (Map) context.getExecutorData("__coordConditions");
if(_coordConditions==null) {
- _coordConditions = new HashMap();
+ _coordConditions = new HashMap<>();
for(String batchGroup: _coordSpecs.keySet()) {
CoordSpec spec = _coordSpecs.get(batchGroup);
CoordCondition cond = new CoordCondition();
@@ -219,7 +218,7 @@ public class TridentBoltExecutor implements IRichBolt {
cond.expectedTaskReports+=context.getComponentTasks(comp).size();
}
}
- cond.targetTasks = new HashSet<Integer>();
+ cond.targetTasks = new HashSet<>();
for(String component: Utils.get(context.getThisTargets(),
COORD_STREAM(batchGroup),
new HashMap<String, Grouping>()).keySet()) {
@@ -399,7 +398,7 @@ public class TridentBoltExecutor implements IRichBolt {
@Override
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> ret = _bolt.getComponentConfiguration();
- if(ret==null) ret = new HashMap();
+ if(ret==null) ret = new HashMap<>();
ret.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5);
// TODO: Need to be able to set the tick tuple time to the message timeout, ideally without parameterization
return ret;
[2/3] storm git commit: Merge branch 'generics_cleanup' of
https://github.com/ddebree/storm into STORM-991
Posted by ka...@apache.org.
Merge branch 'generics_cleanup' of https://github.com/ddebree/storm into STORM-991
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f86b3cd5
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f86b3cd5
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f86b3cd5
Branch: refs/heads/master
Commit: f86b3cd5827ebb7ae27b07a419bd01cd3d6b80e3
Parents: c559f42 9e4c3df
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Sep 2 07:10:25 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Sep 2 07:10:25 2015 +0900
----------------------------------------------------------------------
.../jvm/storm/trident/spout/ITridentSpout.java | 4 +--
.../OpaquePartitionedTridentSpoutExecutor.java | 20 +++++++--------
.../spout/PartitionedTridentSpoutExecutor.java | 26 ++++++++++----------
.../trident/spout/RichSpoutBatchExecutor.java | 4 +--
.../trident/spout/RichSpoutBatchTriggerer.java | 14 +++++------
.../trident/spout/TridentSpoutCoordinator.java | 6 ++---
.../trident/spout/TridentSpoutExecutor.java | 10 ++++----
.../trident/topology/TridentBoltExecutor.java | 15 ++++++-----
8 files changed, 49 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/f86b3cd5/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/f86b3cd5/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
index 2019ea1,5266e1c..fa68ae2
--- a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
+++ b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
@@@ -140,10 -139,10 +139,10 @@@ public class TridentBoltExecutor implem
}
}
- public class CoordinatedOutputCollector implements IOutputCollector {
+ private static class CoordinatedOutputCollector implements IOutputCollector {
IOutputCollector _delegate;
- TrackedBatch _currBatch = null;;
+ TrackedBatch _currBatch = null;
public void setCurrBatch(TrackedBatch batch) {
_currBatch = batch;
[3/3] storm git commit: add STORM-991 to CHANGELOG.md
Posted by ka...@apache.org.
add STORM-991 to CHANGELOG.md
* also add Dean de Bree to contributor list
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3ff465c5
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3ff465c5
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3ff465c5
Branch: refs/heads/master
Commit: 3ff465c5c0720f6c9701b2b7b807a94c92533817
Parents: f86b3cd
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Sep 2 07:19:29 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Sep 2 07:19:29 2015 +0900
----------------------------------------------------------------------
CHANGELOG.md | 1 +
README.markdown | 1 +
2 files changed, 2 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/3ff465c5/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4456564..e384de7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 0.11.0
+ * STORM-991: General cleanup of the generics (storm.trident.spout package)
* STORM-1000: Use static member classes when permitted
* STORM-1003: In cluster.clj replace task-id with component-id in the declaration
* STORM-1013: [storm-elasticsearch] Expose TransportClient configuration Map to EsConfig
http://git-wip-us.apache.org/repos/asf/storm/blob/3ff465c5/README.markdown
----------------------------------------------------------------------
diff --git a/README.markdown b/README.markdown
index 6521f4a..de02346 100644
--- a/README.markdown
+++ b/README.markdown
@@ -222,6 +222,7 @@ under the License.
* Sanket Reddy ([@redsanket](https://github.com/redsanket))
* Drew Robb ([@drewrobb](https://github.com/drewrobb))
* Frantz Mazoyer ([@fmazoyer](https://github.com/fmazoyer))
+* Dean de Bree ([@ddebree](https://github.com/ddebree))
## Acknowledgements