You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/11/05 21:40:57 UTC
[18/60] [abbrv] [partial] storm git commit: Release 2.0.4-SNAPSHOT
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/state/map/MapReducerAggStateUpdater.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/state/map/MapReducerAggStateUpdater.java b/jstorm-client/src/main/java/storm/trident/state/map/MapReducerAggStateUpdater.java
deleted file mode 100644
index f7c227b..0000000
--- a/jstorm-client/src/main/java/storm/trident/state/map/MapReducerAggStateUpdater.java
+++ /dev/null
@@ -1,75 +0,0 @@
-package storm.trident.state.map;
-
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import storm.trident.operation.ReducerAggregator;
-import storm.trident.operation.TridentCollector;
-import storm.trident.operation.TridentOperationContext;
-import storm.trident.state.ReducerValueUpdater;
-import storm.trident.state.StateUpdater;
-import storm.trident.state.ValueUpdater;
-import storm.trident.tuple.ComboList;
-import storm.trident.tuple.TridentTuple;
-import storm.trident.tuple.TridentTupleView.ProjectionFactory;
-
-public class MapReducerAggStateUpdater implements StateUpdater<MapState> {
- ReducerAggregator _agg;
- Fields _groupFields;
- Fields _inputFields;
- ProjectionFactory _groupFactory;
- ProjectionFactory _inputFactory;
- ComboList.Factory _factory;
-
-
- public MapReducerAggStateUpdater(ReducerAggregator agg, Fields groupFields, Fields inputFields) {
- _agg = agg;
- _groupFields = groupFields;
- _inputFields = inputFields;
- _factory = new ComboList.Factory(groupFields.size(), 1);
- }
-
-
- @Override
- public void updateState(MapState map, List<TridentTuple> tuples, TridentCollector collector) {
- Map<List<Object>, List<TridentTuple>> grouped = new HashMap();
-
- //List<List<Object>> groups = new ArrayList<List<Object>>(tuples.size());
- //List<Object> values = new ArrayList<Object>(tuples.size());
- for(TridentTuple t: tuples) {
- List<Object> group = _groupFactory.create(t);
- List<TridentTuple> groupTuples = grouped.get(group);
- if(groupTuples==null) {
- groupTuples = new ArrayList();
- grouped.put(group, groupTuples);
- }
- groupTuples.add(_inputFactory.create(t));
- }
- List<List<Object>> uniqueGroups = new ArrayList(grouped.keySet());
- List<ValueUpdater> updaters = new ArrayList(uniqueGroups.size());
- for(List<Object> group: uniqueGroups) {
- updaters.add(new ReducerValueUpdater(_agg, grouped.get(group)));
- }
- List<Object> results = map.multiUpdate(uniqueGroups, updaters);
-
- for(int i=0; i<uniqueGroups.size(); i++) {
- List<Object> group = uniqueGroups.get(i);
- Object result = results.get(i);
- collector.emit(_factory.create(new List[] {group, new Values(result) }));
- }
- }
-
- @Override
- public void prepare(Map conf, TridentOperationContext context) {
- _groupFactory = context.makeProjectionFactory(_groupFields);
- _inputFactory = context.makeProjectionFactory(_inputFields);
- }
-
- @Override
- public void cleanup() {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/state/map/MapState.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/state/map/MapState.java b/jstorm-client/src/main/java/storm/trident/state/map/MapState.java
deleted file mode 100644
index 78901d9..0000000
--- a/jstorm-client/src/main/java/storm/trident/state/map/MapState.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package storm.trident.state.map;
-
-import java.util.List;
-import storm.trident.state.ValueUpdater;
-
/**
 * A read/write map-oriented Trident state: adds batched updates and puts to
 * the batched reads inherited from ReadOnlyMapState.
 */
public interface MapState<T> extends ReadOnlyMapState<T> {
    // Applies updaters.get(i) to the value stored under keys.get(i) and
    // returns the resulting new values, in the same order as keys.
    List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters);
    // Overwrites the values stored under the given keys; keys and vals are
    // parallel lists of equal length.
    void multiPut(List<List<Object>> keys, List<T> vals);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/state/map/MicroBatchIBackingMap.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/state/map/MicroBatchIBackingMap.java b/jstorm-client/src/main/java/storm/trident/state/map/MicroBatchIBackingMap.java
deleted file mode 100644
index 2f356b1..0000000
--- a/jstorm-client/src/main/java/storm/trident/state/map/MicroBatchIBackingMap.java
+++ /dev/null
@@ -1,68 +0,0 @@
-package storm.trident.state.map;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-public class MicroBatchIBackingMap<T> implements IBackingMap<T> {
- IBackingMap<T> _delegate;
- Options _options;
-
-
- public static class Options implements Serializable {
- public int maxMultiGetBatchSize = 0; // 0 means delegate batch size = trident batch size.
- public int maxMultiPutBatchSize = 0;
- }
-
- public MicroBatchIBackingMap(final Options options, final IBackingMap<T> delegate) {
- _options = options;
- _delegate = delegate;
- assert options.maxMultiPutBatchSize >= 0;
- assert options.maxMultiGetBatchSize >= 0;
- }
-
- @Override
- public void multiPut(final List<List<Object>> keys, final List<T> values) {
- int thisBatchSize;
- if(_options.maxMultiPutBatchSize == 0) { thisBatchSize = keys.size(); }
- else { thisBatchSize = _options.maxMultiPutBatchSize; }
-
- LinkedList<List<Object>> keysTodo = new LinkedList<List<Object>>(keys);
- LinkedList<T> valuesTodo = new LinkedList<T>(values);
-
- while(!keysTodo.isEmpty()) {
- List<List<Object>> keysBatch = new ArrayList<List<Object>>(thisBatchSize);
- List<T> valuesBatch = new ArrayList<T>(thisBatchSize);
- for(int i=0; i<thisBatchSize && !keysTodo.isEmpty(); i++) {
- keysBatch.add(keysTodo.removeFirst());
- valuesBatch.add(valuesTodo.removeFirst());
- }
-
- _delegate.multiPut(keysBatch, valuesBatch);
- }
- }
-
- @Override
- public List<T> multiGet(final List<List<Object>> keys) {
- int thisBatchSize;
- if(_options.maxMultiGetBatchSize == 0) { thisBatchSize = keys.size(); }
- else { thisBatchSize = _options.maxMultiGetBatchSize; }
-
- LinkedList<List<Object>> keysTodo = new LinkedList<List<Object>>(keys);
-
- List<T> ret = new ArrayList<T>(keys.size());
-
- while(!keysTodo.isEmpty()) {
- List<List<Object>> keysBatch = new ArrayList<List<Object>>(thisBatchSize);
- for(int i=0; i<thisBatchSize && !keysTodo.isEmpty(); i++) {
- keysBatch.add(keysTodo.removeFirst());
- }
-
- List<T> retSubset = _delegate.multiGet(keysBatch);
- ret.addAll(retSubset);
- }
-
- return ret;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/state/map/NonTransactionalMap.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/state/map/NonTransactionalMap.java b/jstorm-client/src/main/java/storm/trident/state/map/NonTransactionalMap.java
deleted file mode 100644
index 3a140b5..0000000
--- a/jstorm-client/src/main/java/storm/trident/state/map/NonTransactionalMap.java
+++ /dev/null
@@ -1,50 +0,0 @@
-package storm.trident.state.map;
-
-import storm.trident.state.ValueUpdater;
-
-import java.util.ArrayList;
-import java.util.List;
-
-
-public class NonTransactionalMap<T> implements MapState<T> {
- public static <T> MapState<T> build(IBackingMap<T> backing) {
- return new NonTransactionalMap<T>(backing);
- }
-
- IBackingMap<T> _backing;
-
- protected NonTransactionalMap(IBackingMap<T> backing) {
- _backing = backing;
- }
-
- @Override
- public List<T> multiGet(List<List<Object>> keys) {
- return _backing.multiGet(keys);
- }
-
- @Override
- public List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters) {
- List<T> curr = _backing.multiGet(keys);
- List<T> ret = new ArrayList<T>(curr.size());
- for(int i=0; i<curr.size(); i++) {
- T currVal = curr.get(i);
- ValueUpdater<T> updater = updaters.get(i);
- ret.add(updater.update(currVal));
- }
- _backing.multiPut(keys, ret);
- return ret;
- }
-
- @Override
- public void multiPut(List<List<Object>> keys, List<T> vals) {
- _backing.multiPut(keys, vals);
- }
-
- @Override
- public void beginCommit(Long txid) {
- }
-
- @Override
- public void commit(Long txid) {
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/state/map/OpaqueMap.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/state/map/OpaqueMap.java b/jstorm-client/src/main/java/storm/trident/state/map/OpaqueMap.java
deleted file mode 100644
index f646d66..0000000
--- a/jstorm-client/src/main/java/storm/trident/state/map/OpaqueMap.java
+++ /dev/null
@@ -1,107 +0,0 @@
-package storm.trident.state.map;
-
-import storm.trident.state.OpaqueValue;
-import storm.trident.state.ValueUpdater;
-
-import java.util.ArrayList;
-import java.util.List;
-
-
/**
 * MapState providing opaque-transactional semantics over a backing map of
 * OpaqueValue entries. Each stored OpaqueValue keeps the current value, the
 * previous value, and the txid that wrote it, so a replayed batch (even one
 * whose contents changed) can roll back to the previous value safely.
 */
public class OpaqueMap<T> implements MapState<T> {
    public static <T> MapState<T> build(IBackingMap<OpaqueValue> backing) {
        return new OpaqueMap<T>(backing);
    }

    // Wrapped in CachedBatchReadsMap so a key written earlier in the same batch
    // is read back from the in-batch cache rather than the store.
    CachedBatchReadsMap<OpaqueValue> _backing;
    // txid of the batch currently being processed; set in beginCommit(),
    // cleared in commit().
    Long _currTx;

    protected OpaqueMap(IBackingMap<OpaqueValue> backing) {
        _backing = new CachedBatchReadsMap(backing);
    }

    @Override
    public List<T> multiGet(List<List<Object>> keys) {
        List<CachedBatchReadsMap.RetVal<OpaqueValue>> curr = _backing.multiGet(keys);
        List<T> ret = new ArrayList<T>(curr.size());
        for(CachedBatchReadsMap.RetVal<OpaqueValue> retval: curr) {
            OpaqueValue val = retval.val;
            if(val!=null) {
                if(retval.cached) {
                    // Written earlier in this batch: the current value is authoritative.
                    ret.add((T) val.getCurr());
                } else {
                    // Untouched this batch: OpaqueValue picks curr vs prev based
                    // on whether _currTx matches the txid that wrote it (replay).
                    ret.add((T) val.get(_currTx));
                }
            } else {
                ret.add(null);
            }
        }
        return ret;
    }

    @Override
    public List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters) {
        List<CachedBatchReadsMap.RetVal<OpaqueValue>> curr = _backing.multiGet(keys);
        List<OpaqueValue> newVals = new ArrayList<OpaqueValue>(curr.size());
        List<T> ret = new ArrayList<T>();
        for(int i=0; i<curr.size(); i++) {
            CachedBatchReadsMap.RetVal<OpaqueValue> retval = curr.get(i);
            OpaqueValue<T> val = retval.val;
            ValueUpdater<T> updater = updaters.get(i);
            // Resolve the value the updater should see (same rules as multiGet).
            T prev;
            if(val==null) {
                prev = null;
            } else {
                if(retval.cached) {
                    prev = val.getCurr();
                } else {
                    prev = val.get(_currTx);
                }
            }
            T newVal = updater.update(prev);
            ret.add(newVal);
            // Wrap the result: a fresh OpaqueValue for new keys, otherwise
            // OpaqueValue.update keeps the resolved previous value for rollback.
            OpaqueValue<T> newOpaqueVal;
            if(val==null) {
                newOpaqueVal = new OpaqueValue<T>(_currTx, newVal);
            } else {
                newOpaqueVal = val.update(_currTx, newVal);
            }
            newVals.add(newOpaqueVal);
        }
        _backing.multiPut(keys, newVals);
        return ret;
    }

    @Override
    public void multiPut(List<List<Object>> keys, List<T> vals) {
        // Implemented as an update that ignores the stored value, so the
        // opaque prev/curr bookkeeping stays consistent.
        List<ValueUpdater> updaters = new ArrayList<ValueUpdater>(vals.size());
        for(T val: vals) {
            updaters.add(new ReplaceUpdater<T>(val));
        }
        multiUpdate(keys, updaters);
    }

    @Override
    public void beginCommit(Long txid) {
        _currTx = txid;
        // Drop the previous batch's read cache.
        _backing.reset();
    }

    @Override
    public void commit(Long txid) {
        _currTx = null;
        _backing.reset();
    }

    // Updater that discards the stored value and substitutes a fixed one;
    // used to express multiPut in terms of multiUpdate.
    static class ReplaceUpdater<T> implements ValueUpdater<T> {
        T _t;

        public ReplaceUpdater(T t) {
            _t = t;
        }

        @Override
        public T update(Object stored) {
            return _t;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/state/map/ReadOnlyMapState.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/state/map/ReadOnlyMapState.java b/jstorm-client/src/main/java/storm/trident/state/map/ReadOnlyMapState.java
deleted file mode 100644
index 5a519c4..0000000
--- a/jstorm-client/src/main/java/storm/trident/state/map/ReadOnlyMapState.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package storm.trident.state.map;
-
-import java.util.List;
-import storm.trident.state.State;
-
/**
 * Read-only view of a map-oriented Trident state: batched lookups by
 * compound key.
 */
public interface ReadOnlyMapState<T> extends State {
    // Returns the value for each key, in the same order as keys. Certain
    // states might only accept one-tuple keys — those should just throw an
    // error for longer keys.
    List<T> multiGet(List<List<Object>> keys);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/state/map/RemovableMapState.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/state/map/RemovableMapState.java b/jstorm-client/src/main/java/storm/trident/state/map/RemovableMapState.java
deleted file mode 100644
index cf34f05..0000000
--- a/jstorm-client/src/main/java/storm/trident/state/map/RemovableMapState.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package storm.trident.state.map;
-
-import java.util.List;
-import storm.trident.state.State;
-
/**
 * Optional capability interface for map states that support deleting
 * entries in bulk.
 */
public interface RemovableMapState<T> extends State {
    // Removes the entries stored under the given compound keys.
    void multiRemove(List<List<Object>> keys);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/state/map/SnapshottableMap.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/state/map/SnapshottableMap.java b/jstorm-client/src/main/java/storm/trident/state/map/SnapshottableMap.java
deleted file mode 100644
index f42a5c9..0000000
--- a/jstorm-client/src/main/java/storm/trident/state/map/SnapshottableMap.java
+++ /dev/null
@@ -1,59 +0,0 @@
-package storm.trident.state.map;
-
-import java.util.Arrays;
-import java.util.List;
-import storm.trident.state.ValueUpdater;
-import storm.trident.state.snapshot.Snapshottable;
-
-
-public class SnapshottableMap<T> implements MapState<T>, Snapshottable<T> {
- MapState<T> _delegate;
- List<List<Object>> _keys;
-
- public SnapshottableMap(MapState<T> delegate, List<Object> snapshotKey) {
- _delegate = delegate;
- _keys = Arrays.asList(snapshotKey);
- }
-
- @Override
- public List<T> multiGet(List<List<Object>> keys) {
- return _delegate.multiGet(keys);
- }
-
- @Override
- public List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters) {
- return _delegate.multiUpdate(keys, updaters);
- }
-
- @Override
- public void multiPut(List<List<Object>> keys, List<T> vals) {
- _delegate.multiPut(keys, vals);
- }
-
- @Override
- public void beginCommit(Long txid) {
- _delegate.beginCommit(txid);
- }
-
- @Override
- public void commit(Long txid) {
- _delegate.commit(txid);
- }
-
- @Override
- public T get() {
- return multiGet(_keys).get(0);
- }
-
- @Override
- public T update(ValueUpdater updater) {
- List<ValueUpdater> updaters = Arrays.asList(updater);
- return multiUpdate(_keys, updaters).get(0);
- }
-
- @Override
- public void set(T o) {
- multiPut(_keys, Arrays.asList(o));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/state/map/TransactionalMap.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/state/map/TransactionalMap.java b/jstorm-client/src/main/java/storm/trident/state/map/TransactionalMap.java
deleted file mode 100644
index 1f44910..0000000
--- a/jstorm-client/src/main/java/storm/trident/state/map/TransactionalMap.java
+++ /dev/null
@@ -1,92 +0,0 @@
-package storm.trident.state.map;
-
-import storm.trident.state.TransactionalValue;
-import storm.trident.state.ValueUpdater;
-
-import java.util.ArrayList;
-import java.util.List;
-
-
/**
 * MapState providing transactional semantics over a backing map of
 * TransactionalValue entries. Each stored value records the txid that wrote
 * it; on a batch replay (same txid, value not produced by an in-batch read
 * cache hit) the stored value is reused instead of re-running the updater,
 * making updates idempotent as long as batch contents are stable.
 */
public class TransactionalMap<T> implements MapState<T> {
    public static <T> MapState<T> build(IBackingMap<TransactionalValue> backing) {
        return new TransactionalMap<T>(backing);
    }

    // Wrapped in CachedBatchReadsMap so keys written earlier in the same batch
    // are read back from the in-batch cache.
    CachedBatchReadsMap<TransactionalValue> _backing;
    // txid of the batch currently being processed; set in beginCommit(),
    // cleared in commit().
    Long _currTx;

    protected TransactionalMap(IBackingMap<TransactionalValue> backing) {
        _backing = new CachedBatchReadsMap(backing);
    }

    @Override
    public List<T> multiGet(List<List<Object>> keys) {
        List<CachedBatchReadsMap.RetVal<TransactionalValue>> vals = _backing.multiGet(keys);
        List<T> ret = new ArrayList<T>(vals.size());
        for(CachedBatchReadsMap.RetVal<TransactionalValue> retval: vals) {
            TransactionalValue v = retval.val;
            if(v!=null) {
                ret.add((T) v.getVal());
            } else {
                ret.add(null);
            }
        }
        return ret;
    }

    @Override
    public List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters) {
        List<CachedBatchReadsMap.RetVal<TransactionalValue>> curr = _backing.multiGet(keys);
        List<TransactionalValue> newVals = new ArrayList<TransactionalValue>(curr.size());
        // Only keys whose value actually changed are written back; newKeys and
        // newVals stay aligned.
        List<List<Object>> newKeys = new ArrayList();
        List<T> ret = new ArrayList<T>();
        for(int i=0; i<curr.size(); i++) {
            CachedBatchReadsMap.RetVal<TransactionalValue> retval = curr.get(i);
            TransactionalValue<T> val = retval.val;
            ValueUpdater<T> updater = updaters.get(i);
            TransactionalValue<T> newVal;
            boolean changed = false;
            if(val==null) {
                // First write for this key: run the updater against null.
                newVal = new TransactionalValue<T>(_currTx, updater.update(null));
                changed = true;
            } else {
                if(_currTx!=null && _currTx.equals(val.getTxid()) && !retval.cached) {
                    // Replay of the batch that already wrote this value: keep it
                    // as-is so the update is not applied twice.
                    newVal = val;
                } else {
                    newVal = new TransactionalValue<T>(_currTx, updater.update(val.getVal()));
                    changed = true;
                }
            }
            ret.add(newVal.getVal());
            if(changed) {
                newVals.add(newVal);
                newKeys.add(keys.get(i));
            }
        }
        if(!newKeys.isEmpty()) {
            _backing.multiPut(newKeys, newVals);
        }
        return ret;
    }

    @Override
    public void multiPut(List<List<Object>> keys, List<T> vals) {
        // Overwrites unconditionally, tagging each value with the current txid.
        List<TransactionalValue> newVals = new ArrayList<TransactionalValue>(vals.size());
        for(T val: vals) {
            newVals.add(new TransactionalValue<T>(_currTx, val));
        }
        _backing.multiPut(keys, newVals);
    }

    @Override
    public void beginCommit(Long txid) {
        _currTx = txid;
        // Drop the previous batch's read cache.
        _backing.reset();
    }

    @Override
    public void commit(Long txid) {
        _currTx = null;
        _backing.reset();
    }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/state/snapshot/ReadOnlySnapshottable.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/state/snapshot/ReadOnlySnapshottable.java b/jstorm-client/src/main/java/storm/trident/state/snapshot/ReadOnlySnapshottable.java
deleted file mode 100644
index 2064a98..0000000
--- a/jstorm-client/src/main/java/storm/trident/state/snapshot/ReadOnlySnapshottable.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package storm.trident.state.snapshot;
-
-import storm.trident.state.State;
-
/**
 * A state holding a single readable value (a "snapshot") rather than a map
 * of keys.
 */
public interface ReadOnlySnapshottable<T> extends State {
    // Returns the current snapshot value.
    T get();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/state/snapshot/Snapshottable.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/state/snapshot/Snapshottable.java b/jstorm-client/src/main/java/storm/trident/state/snapshot/Snapshottable.java
deleted file mode 100644
index f216485..0000000
--- a/jstorm-client/src/main/java/storm/trident/state/snapshot/Snapshottable.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package storm.trident.state.snapshot;
-
-import storm.trident.state.ValueUpdater;
-
-
/**
 * A single-value state that can be read, replaced, or updated in place.
 * Used by Stream#persistentAggregate.
 */
public interface Snapshottable<T> extends ReadOnlySnapshottable<T> {
    // Applies the updater to the current value, stores and returns the result.
    T update(ValueUpdater updater);
    // Replaces the snapshot value outright.
    void set(T o);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/testing/CountAsAggregator.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/testing/CountAsAggregator.java b/jstorm-client/src/main/java/storm/trident/testing/CountAsAggregator.java
deleted file mode 100644
index 52f482f..0000000
--- a/jstorm-client/src/main/java/storm/trident/testing/CountAsAggregator.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package storm.trident.testing;
-
-import backtype.storm.tuple.Values;
-import storm.trident.operation.BaseAggregator;
-import storm.trident.operation.TridentCollector;
-import storm.trident.tuple.TridentTuple;
-
-
-public class CountAsAggregator extends BaseAggregator<CountAsAggregator.State> {
-
- static class State {
- long count = 0;
- }
-
- @Override
- public State init(Object batchId, TridentCollector collector) {
- return new State();
- }
-
- @Override
- public void aggregate(State state, TridentTuple tuple, TridentCollector collector) {
- state.count++;
- }
-
- @Override
- public void complete(State state, TridentCollector collector) {
- collector.emit(new Values(state.count));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/testing/FeederBatchSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/testing/FeederBatchSpout.java b/jstorm-client/src/main/java/storm/trident/testing/FeederBatchSpout.java
deleted file mode 100644
index 5571153..0000000
--- a/jstorm-client/src/main/java/storm/trident/testing/FeederBatchSpout.java
+++ /dev/null
@@ -1,168 +0,0 @@
-package storm.trident.testing;
-
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.RegisteredGlobalState;
-import backtype.storm.utils.Utils;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Semaphore;
-import storm.trident.operation.TridentCollector;
-import storm.trident.spout.ITridentSpout;
-import storm.trident.topology.TransactionAttempt;
-import storm.trident.topology.TridentTopologyBuilder;
-
/**
 * Test spout that lets a driver thread push batches of tuples into a running
 * Trident topology via feed(). Batches and their completion semaphores are
 * shared with the in-process workers through RegisteredGlobalState; feed()
 * blocks until the batch it added has been successfully processed.
 */
public class FeederBatchSpout implements ITridentSpout, IFeeder {

    // RegisteredGlobalState id of the list of fed batches.
    String _id;
    // RegisteredGlobalState id of the parallel list of semaphores, one per fed
    // batch, released when the corresponding transaction succeeds.
    String _semaphoreId;
    Fields _outFields;
    // When true, isReady() holds back transactions until a batch is available.
    boolean _waitToEmit = true;


    public FeederBatchSpout(List<String> fields) {
        _outFields = new Fields(fields);
        _id = RegisteredGlobalState.registerState(new CopyOnWriteArrayList());
        _semaphoreId = RegisteredGlobalState.registerState(new CopyOnWriteArrayList());
    }

    public void setWaitToEmit(boolean trueIfWait) {
        _waitToEmit = trueIfWait;
    }

    public void feed(Object tuples) {
        // Register a semaphore first, then the batch, and block until the
        // coordinator reports success for this batch. NOTE(review): the two
        // lists are kept aligned by feeding order — assumes single-threaded
        // feeding; confirm callers never feed concurrently.
        Semaphore sem = new Semaphore(0);
        ((List)RegisteredGlobalState.getState(_semaphoreId)).add(sem);
        ((List)RegisteredGlobalState.getState(_id)).add(tuples);
        try {
            sem.acquire();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }


    public class FeederCoordinator implements ITridentSpout.BatchCoordinator<Map<Integer, List<List<Object>>>> {

        int _numPartitions;
        // Index of the next fed batch to hand out.
        int _emittedIndex = 0;
        // txid -> index of the batch that transaction consumed; used by
        // success() to release the matching semaphore.
        Map<Long, Integer> txIndices = new HashMap();

        public FeederCoordinator(int numPartitions) {
            _numPartitions = numPartitions;
        }

        @Override
        public Map<Integer, List<List<Object>>> initializeTransaction(long txid, Map<Integer, List<List<Object>>> prevMetadata, Map<Integer, List<List<Object>>> currMetadata) {
            // On replay, reuse the metadata already computed for this txid.
            if(currMetadata!=null) return currMetadata;
            List allBatches = (List) RegisteredGlobalState.getState(_id);
            if(allBatches.size()>_emittedIndex) {
                Object batchInfo = allBatches.get(_emittedIndex);
                txIndices.put(txid, _emittedIndex);
                _emittedIndex += 1;
                if(batchInfo instanceof Map) {
                    // Caller pre-partitioned the batch: partition -> tuples.
                    return (Map) batchInfo;
                } else {
                    // Plain list of tuples: round-robin them across partitions.
                    List batchList = (List) batchInfo;
                    Map<Integer, List<List<Object>>> partitions = new HashMap();
                    for(int i=0; i<_numPartitions; i++) {
                        partitions.put(i, new ArrayList());
                    }
                    for(int i=0; i<batchList.size(); i++) {
                        int partition = i % _numPartitions;
                        partitions.get(partition).add((List)batchList.get(i));
                    }
                    return partitions;
                }
            } else {
                // No batch fed yet: emit an empty batch.
                return new HashMap();
            }
        }

        @Override
        public void close() {
        }

        @Override
        public void success(long txid) {
            // Unblock the feed() call that supplied this transaction's batch.
            Integer index = txIndices.get(txid);
            if(index != null) {
                Semaphore sem = (Semaphore) ((List)RegisteredGlobalState.getState(_semaphoreId)).get(index);
                sem.release();
            }
        }

        // Number of batches isReady() has already approved.
        int _masterEmitted = 0;

        @Override
        public boolean isReady(long txid) {
            if(!_waitToEmit) return true;
            List allBatches = (List) RegisteredGlobalState.getState(_id);
            if(allBatches.size() > _masterEmitted) {
                _masterEmitted++;
                return true;
            } else {
                // Back off briefly to avoid a hot spin while waiting for feed().
                Utils.sleep(2);
                return false;
            }
        }
    }

    public class FeederEmitter implements ITridentSpout.Emitter<Map<Integer, List<List<Object>>>> {

        // Partition index this emitter is responsible for.
        int _index;

        public FeederEmitter(int index) {
            _index = index;
        }

        @Override
        public void emitBatch(TransactionAttempt tx, Map<Integer, List<List<Object>>> coordinatorMeta, TridentCollector collector) {
            // Emit only the tuples the coordinator assigned to this partition.
            List<List<Object>> tuples = coordinatorMeta.get(_index);
            if(tuples!=null) {
                for(List<Object> t: tuples) {
                    collector.emit(t);
                }
            }
        }

        @Override
        public void success(TransactionAttempt tx) {
        }

        @Override
        public void close() {
        }
    }


    @Override
    public Map getComponentConfiguration() {
        return null;
    }

    @Override
    public Fields getOutputFields() {
        return _outFields;
    }

    @Override
    public BatchCoordinator getCoordinator(String txStateId, Map conf, TopologyContext context) {
        // Partition count = task count of the spout component this coordinator
        // fronts (derived from the coordinator's own component id).
        int numTasks = context.getComponentTasks(
                TridentTopologyBuilder.spoutIdFromCoordinatorId(
                        context.getThisComponentId()))
                .size();
        return new FeederCoordinator(numTasks);
    }

    @Override
    public Emitter getEmitter(String txStateId, Map conf, TopologyContext context) {
        return new FeederEmitter(context.getThisTaskIndex());
    }



}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/testing/FeederCommitterBatchSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/testing/FeederCommitterBatchSpout.java b/jstorm-client/src/main/java/storm/trident/testing/FeederCommitterBatchSpout.java
deleted file mode 100644
index d105c0c..0000000
--- a/jstorm-client/src/main/java/storm/trident/testing/FeederCommitterBatchSpout.java
+++ /dev/null
@@ -1,79 +0,0 @@
-package storm.trident.testing;
-
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Fields;
-import java.util.List;
-import java.util.Map;
-import storm.trident.operation.TridentCollector;
-import storm.trident.spout.ICommitterTridentSpout;
-import storm.trident.spout.ITridentSpout;
-import storm.trident.topology.TransactionAttempt;
-
-
-public class FeederCommitterBatchSpout implements ICommitterTridentSpout, IFeeder {
-
- FeederBatchSpout _spout;
-
- public FeederCommitterBatchSpout(List<String> fields) {
- _spout = new FeederBatchSpout(fields);
- }
-
- public void setWaitToEmit(boolean trueIfWait) {
- _spout.setWaitToEmit(trueIfWait);
- }
-
- static class CommitterEmitter implements ICommitterTridentSpout.Emitter {
- ITridentSpout.Emitter _emitter;
-
-
- public CommitterEmitter(ITridentSpout.Emitter e) {
- _emitter = e;
- }
-
- @Override
- public void commit(TransactionAttempt attempt) {
- }
-
- @Override
- public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, TridentCollector collector) {
- _emitter.emitBatch(tx, coordinatorMeta, collector);
- }
-
- @Override
- public void success(TransactionAttempt tx) {
- _emitter.success(tx);
- }
-
- @Override
- public void close() {
- _emitter.close();
- }
-
- }
-
- @Override
- public Emitter getEmitter(String txStateId, Map conf, TopologyContext context) {
- return new CommitterEmitter(_spout.getEmitter(txStateId, conf, context));
- }
-
- @Override
- public BatchCoordinator getCoordinator(String txStateId, Map conf, TopologyContext context) {
- return _spout.getCoordinator(txStateId, conf, context);
- }
-
- @Override
- public Fields getOutputFields() {
- return _spout.getOutputFields();
- }
-
- @Override
- public Map getComponentConfiguration() {
- return _spout.getComponentConfiguration();
- }
-
- @Override
- public void feed(Object tuples) {
- _spout.feed(tuples);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/testing/FixedBatchSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/testing/FixedBatchSpout.java b/jstorm-client/src/main/java/storm/trident/testing/FixedBatchSpout.java
deleted file mode 100644
index 6e32c1a..0000000
--- a/jstorm-client/src/main/java/storm/trident/testing/FixedBatchSpout.java
+++ /dev/null
@@ -1,80 +0,0 @@
-package storm.trident.testing;
-
-import backtype.storm.Config;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Fields;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.HashMap;
-
-import storm.trident.operation.TridentCollector;
-import storm.trident.spout.IBatchSpout;
-
-
-public class FixedBatchSpout implements IBatchSpout {
-
- Fields fields;
- List<Object>[] outputs;
- int maxBatchSize;
- HashMap<Long, List<List<Object>>> batches = new HashMap<Long, List<List<Object>>>();
-
- public FixedBatchSpout(Fields fields, int maxBatchSize, List<Object>... outputs) {
- this.fields = fields;
- this.outputs = outputs;
- this.maxBatchSize = maxBatchSize;
- }
-
- int index = 0;
- boolean cycle = false;
-
- public void setCycle(boolean cycle) {
- this.cycle = cycle;
- }
-
- @Override
- public void open(Map conf, TopologyContext context) {
- index = 0;
- }
-
- @Override
- public void emitBatch(long batchId, TridentCollector collector) {
- List<List<Object>> batch = this.batches.get(batchId);
- if(batch == null){
- batch = new ArrayList<List<Object>>();
- if(index>=outputs.length && cycle) {
- index = 0;
- }
- for(int i=0; index < outputs.length && i < maxBatchSize; index++, i++) {
- batch.add(outputs[index]);
- }
- this.batches.put(batchId, batch);
- }
- for(List<Object> list : batch){
- collector.emit(list);
- }
- }
-
- @Override
- public void ack(long batchId) {
- this.batches.remove(batchId);
- }
-
- @Override
- public void close() {
- }
-
- @Override
- public Map getComponentConfiguration() {
- Config conf = new Config();
- conf.setMaxTaskParallelism(1);
- return conf;
- }
-
- @Override
- public Fields getOutputFields() {
- return fields;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/testing/IFeeder.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/testing/IFeeder.java b/jstorm-client/src/main/java/storm/trident/testing/IFeeder.java
deleted file mode 100644
index eaf02bb..0000000
--- a/jstorm-client/src/main/java/storm/trident/testing/IFeeder.java
+++ /dev/null
@@ -1,6 +0,0 @@
-package storm.trident.testing;
-
-
-public interface IFeeder {
- void feed(Object tuples);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/testing/LRUMemoryMapState.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/testing/LRUMemoryMapState.java b/jstorm-client/src/main/java/storm/trident/testing/LRUMemoryMapState.java
deleted file mode 100644
index 51c8ffb..0000000
--- a/jstorm-client/src/main/java/storm/trident/testing/LRUMemoryMapState.java
+++ /dev/null
@@ -1,135 +0,0 @@
-package storm.trident.testing;
-
-import backtype.storm.task.IMetricsContext;
-import storm.trident.state.ITupleCollection;
-import backtype.storm.tuple.Values;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import storm.trident.state.OpaqueValue;
-import storm.trident.state.State;
-import storm.trident.state.StateFactory;
-import storm.trident.state.ValueUpdater;
-import storm.trident.state.map.*;
-import storm.trident.state.snapshot.Snapshottable;
-import storm.trident.util.LRUMap;
-
-public class LRUMemoryMapState<T> implements Snapshottable<T>, ITupleCollection, MapState<T> {
-
- LRUMemoryMapStateBacking<OpaqueValue> _backing;
- SnapshottableMap<T> _delegate;
-
- public LRUMemoryMapState(int cacheSize, String id) {
- _backing = new LRUMemoryMapStateBacking(cacheSize, id);
- _delegate = new SnapshottableMap(OpaqueMap.build(_backing), new Values("$MEMORY-MAP-STATE-GLOBAL$"));
- }
-
- public T update(ValueUpdater updater) {
- return _delegate.update(updater);
- }
-
- public void set(T o) {
- _delegate.set(o);
- }
-
- public T get() {
- return _delegate.get();
- }
-
- public void beginCommit(Long txid) {
- _delegate.beginCommit(txid);
- }
-
- public void commit(Long txid) {
- _delegate.commit(txid);
- }
-
- public Iterator<List<Object>> getTuples() {
- return _backing.getTuples();
- }
-
- public List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters) {
- return _delegate.multiUpdate(keys, updaters);
- }
-
- public void multiPut(List<List<Object>> keys, List<T> vals) {
- _delegate.multiPut(keys, vals);
- }
-
- public List<T> multiGet(List<List<Object>> keys) {
- return _delegate.multiGet(keys);
- }
-
- public static class Factory implements StateFactory {
-
- String _id;
- int _maxSize;
-
- public Factory(int maxSize) {
- _id = UUID.randomUUID().toString();
- _maxSize = maxSize;
- }
-
- @Override
- public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
- return new LRUMemoryMapState(_maxSize, _id + partitionIndex);
- }
- }
-
- static ConcurrentHashMap<String, Map<List<Object>, Object>> _dbs = new ConcurrentHashMap<String, Map<List<Object>, Object>>();
- static class LRUMemoryMapStateBacking<T> implements IBackingMap<T>, ITupleCollection {
-
- public static void clearAll() {
- _dbs.clear();
- }
- Map<List<Object>, T> db;
- Long currTx;
-
- public LRUMemoryMapStateBacking(int cacheSize, String id) {
- _dbs.putIfAbsent(id, new LRUMap<List<Object>, Object>(cacheSize));
- this.db = (Map<List<Object>, T>) _dbs.get(id);
- }
-
- @Override
- public List<T> multiGet(List<List<Object>> keys) {
- List<T> ret = new ArrayList();
- for (List<Object> key : keys) {
- ret.add(db.get(key));
- }
- return ret;
- }
-
- @Override
- public void multiPut(List<List<Object>> keys, List<T> vals) {
- for (int i = 0; i < keys.size(); i++) {
- List<Object> key = keys.get(i);
- T val = vals.get(i);
- db.put(key, val);
- }
- }
-
- @Override
- public Iterator<List<Object>> getTuples() {
- return new Iterator<List<Object>>() {
-
- private Iterator<Map.Entry<List<Object>, T>> it = db.entrySet().iterator();
-
- public boolean hasNext() {
- return it.hasNext();
- }
-
- public List<Object> next() {
- Map.Entry<List<Object>, T> e = it.next();
- List<Object> ret = new ArrayList<Object>();
- ret.addAll(e.getKey());
- ret.add(((OpaqueValue)e.getValue()).getCurr());
- return ret;
- }
-
- public void remove() {
- throw new UnsupportedOperationException("Not supported yet.");
- }
- };
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/testing/MemoryBackingMap.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/testing/MemoryBackingMap.java b/jstorm-client/src/main/java/storm/trident/testing/MemoryBackingMap.java
deleted file mode 100644
index e222ba6..0000000
--- a/jstorm-client/src/main/java/storm/trident/testing/MemoryBackingMap.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package storm.trident.testing;
-
-import storm.trident.state.map.IBackingMap;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class MemoryBackingMap implements IBackingMap<Object> {
- Map _vals = new HashMap();
-
- @Override
- public List<Object> multiGet(List<List<Object>> keys) {
- List ret = new ArrayList();
- for(List key: keys) {
- ret.add(_vals.get(key));
- }
- return ret;
- }
-
- @Override
- public void multiPut(List<List<Object>> keys, List<Object> vals) {
- for(int i=0; i<keys.size(); i++) {
- List key = keys.get(i);
- Object val = vals.get(i);
- _vals.put(key, val);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/testing/MemoryMapState.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/testing/MemoryMapState.java b/jstorm-client/src/main/java/storm/trident/testing/MemoryMapState.java
deleted file mode 100644
index 9512d63..0000000
--- a/jstorm-client/src/main/java/storm/trident/testing/MemoryMapState.java
+++ /dev/null
@@ -1,157 +0,0 @@
-package storm.trident.testing;
-
-import backtype.storm.task.IMetricsContext;
-import storm.trident.state.ITupleCollection;
-import backtype.storm.tuple.Values;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import storm.trident.state.OpaqueValue;
-import storm.trident.state.State;
-import storm.trident.state.StateFactory;
-import storm.trident.state.ValueUpdater;
-import storm.trident.state.map.*;
-import storm.trident.state.snapshot.Snapshottable;
-
-public class MemoryMapState<T> implements Snapshottable<T>, ITupleCollection, MapState<T>, RemovableMapState<T> {
-
- MemoryMapStateBacking<OpaqueValue> _backing;
- SnapshottableMap<T> _delegate;
- List<List<Object>> _removed = new ArrayList();
- Long _currTx = null;
-
-
- public MemoryMapState(String id) {
- _backing = new MemoryMapStateBacking(id);
- _delegate = new SnapshottableMap(OpaqueMap.build(_backing), new Values("$MEMORY-MAP-STATE-GLOBAL$"));
- }
-
- public T update(ValueUpdater updater) {
- return _delegate.update(updater);
- }
-
- public void set(T o) {
- _delegate.set(o);
- }
-
- public T get() {
- return _delegate.get();
- }
-
- public void beginCommit(Long txid) {
- _delegate.beginCommit(txid);
- if(txid==null || !txid.equals(_currTx)) {
- _backing.multiRemove(_removed);
- }
- _removed = new ArrayList();
- _currTx = txid;
- }
-
- public void commit(Long txid) {
- _delegate.commit(txid);
- }
-
- public Iterator<List<Object>> getTuples() {
- return _backing.getTuples();
- }
-
- public List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters) {
- return _delegate.multiUpdate(keys, updaters);
- }
-
- public void multiPut(List<List<Object>> keys, List<T> vals) {
- _delegate.multiPut(keys, vals);
- }
-
- public List<T> multiGet(List<List<Object>> keys) {
- return _delegate.multiGet(keys);
- }
-
- @Override
- public void multiRemove(List<List<Object>> keys) {
- List nulls = new ArrayList();
- for(int i=0; i<keys.size(); i++) {
- nulls.add(null);
- }
- // first just set the keys to null, then flag to remove them at beginning of next commit when we know the current and last value are both null
- multiPut(keys, nulls);
- _removed.addAll(keys);
- }
-
- public static class Factory implements StateFactory {
-
- String _id;
-
- public Factory() {
- _id = UUID.randomUUID().toString();
- }
-
- @Override
- public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
- return new MemoryMapState(_id + partitionIndex);
- }
- }
-
- static ConcurrentHashMap<String, Map<List<Object>, Object>> _dbs = new ConcurrentHashMap<String, Map<List<Object>, Object>>();
- static class MemoryMapStateBacking<T> implements IBackingMap<T>, ITupleCollection {
-
- public static void clearAll() {
- _dbs.clear();
- }
- Map<List<Object>, T> db;
- Long currTx;
-
- public MemoryMapStateBacking(String id) {
- _dbs.putIfAbsent(id, new HashMap());
- this.db = (Map<List<Object>, T>) _dbs.get(id);
- }
-
- public void multiRemove(List<List<Object>> keys) {
- for(List<Object> key: keys) {
- db.remove(key);
- }
- }
-
- @Override
- public List<T> multiGet(List<List<Object>> keys) {
- List<T> ret = new ArrayList();
- for (List<Object> key : keys) {
- ret.add(db.get(key));
- }
- return ret;
- }
-
- @Override
- public void multiPut(List<List<Object>> keys, List<T> vals) {
- for (int i = 0; i < keys.size(); i++) {
- List<Object> key = keys.get(i);
- T val = vals.get(i);
- db.put(key, val);
- }
- }
-
- @Override
- public Iterator<List<Object>> getTuples() {
- return new Iterator<List<Object>>() {
-
- private Iterator<Map.Entry<List<Object>, T>> it = db.entrySet().iterator();
-
- public boolean hasNext() {
- return it.hasNext();
- }
-
- public List<Object> next() {
- Map.Entry<List<Object>, T> e = it.next();
- List<Object> ret = new ArrayList<Object>();
- ret.addAll(e.getKey());
- ret.add(((OpaqueValue)e.getValue()).getCurr());
- return ret;
- }
-
- public void remove() {
- throw new UnsupportedOperationException("Not supported yet.");
- }
- };
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/testing/Split.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/testing/Split.java b/jstorm-client/src/main/java/storm/trident/testing/Split.java
deleted file mode 100644
index 65cdb8b..0000000
--- a/jstorm-client/src/main/java/storm/trident/testing/Split.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package storm.trident.testing;
-
-import backtype.storm.tuple.Values;
-import storm.trident.operation.BaseFunction;
-import storm.trident.operation.TridentCollector;
-import storm.trident.tuple.TridentTuple;
-
-public class Split extends BaseFunction {
-
- @Override
- public void execute(TridentTuple tuple, TridentCollector collector) {
- for(String word: tuple.getString(0).split(" ")) {
- if(word.length() > 0) {
- collector.emit(new Values(word));
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/testing/StringLength.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/testing/StringLength.java b/jstorm-client/src/main/java/storm/trident/testing/StringLength.java
deleted file mode 100644
index f99a5c7..0000000
--- a/jstorm-client/src/main/java/storm/trident/testing/StringLength.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package storm.trident.testing;
-
-import backtype.storm.tuple.Values;
-import storm.trident.operation.BaseFunction;
-import storm.trident.operation.TridentCollector;
-import storm.trident.tuple.TridentTuple;
-
-public class StringLength extends BaseFunction {
-
- @Override
- public void execute(TridentTuple tuple, TridentCollector collector) {
- collector.emit(new Values(tuple.getString(0).length()));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/testing/TrueFilter.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/testing/TrueFilter.java b/jstorm-client/src/main/java/storm/trident/testing/TrueFilter.java
deleted file mode 100644
index 6912063..0000000
--- a/jstorm-client/src/main/java/storm/trident/testing/TrueFilter.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package storm.trident.testing;
-
-import storm.trident.operation.BaseFilter;
-import storm.trident.tuple.TridentTuple;
-
-public class TrueFilter extends BaseFilter {
-
- @Override
- public boolean isKeep(TridentTuple tuple) {
- return true;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/testing/TuplifyArgs.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/testing/TuplifyArgs.java b/jstorm-client/src/main/java/storm/trident/testing/TuplifyArgs.java
deleted file mode 100644
index 764e51e..0000000
--- a/jstorm-client/src/main/java/storm/trident/testing/TuplifyArgs.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package storm.trident.testing;
-
-import java.util.List;
-
-import storm.trident.operation.BaseFunction;
-import storm.trident.operation.TridentCollector;
-import storm.trident.tuple.TridentTuple;
-import backtype.storm.utils.Utils;
-
-public class TuplifyArgs extends BaseFunction {
-
- @Override
- public void execute(TridentTuple input, TridentCollector collector) {
- String args = input.getString(0);
- List<List<Object>> tuples = (List) Utils.from_json(args);
- for(List<Object> tuple: tuples) {
- collector.emit(tuple);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/topology/BatchInfo.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/topology/BatchInfo.java b/jstorm-client/src/main/java/storm/trident/topology/BatchInfo.java
deleted file mode 100644
index a3e3076..0000000
--- a/jstorm-client/src/main/java/storm/trident/topology/BatchInfo.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package storm.trident.topology;
-
-import storm.trident.spout.IBatchID;
-
-
-public class BatchInfo {
- public IBatchID batchId;
- public Object state;
- public String batchGroup;
-
- public BatchInfo(String batchGroup, IBatchID batchId, Object state) {
- this.batchGroup = batchGroup;
- this.batchId = batchId;
- this.state = state;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/topology/ITridentBatchBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/topology/ITridentBatchBolt.java b/jstorm-client/src/main/java/storm/trident/topology/ITridentBatchBolt.java
deleted file mode 100644
index b6f60ce..0000000
--- a/jstorm-client/src/main/java/storm/trident/topology/ITridentBatchBolt.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package storm.trident.topology;
-
-import backtype.storm.coordination.BatchOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IComponent;
-import backtype.storm.tuple.Tuple;
-import java.util.Map;
-
-public interface ITridentBatchBolt extends IComponent {
- void prepare(Map conf, TopologyContext context, BatchOutputCollector collector);
- void execute(BatchInfo batchInfo, Tuple tuple);
- void finishBatch(BatchInfo batchInfo);
- Object initBatchState(String batchGroup, Object batchId);
- void cleanup();
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/topology/MasterBatchCoordinator.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/topology/MasterBatchCoordinator.java b/jstorm-client/src/main/java/storm/trident/topology/MasterBatchCoordinator.java
deleted file mode 100644
index 201696e..0000000
--- a/jstorm-client/src/main/java/storm/trident/topology/MasterBatchCoordinator.java
+++ /dev/null
@@ -1,317 +0,0 @@
-package storm.trident.topology;
-
-import backtype.storm.Config;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.WindowedTimeThrottler;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.log4j.Logger;
-import storm.trident.spout.ITridentSpout;
-import storm.trident.spout.ICommitterTridentSpout;
-import storm.trident.topology.state.TransactionalState;
-
-public class MasterBatchCoordinator extends BaseRichSpout {
- public static final Logger LOG = Logger.getLogger(MasterBatchCoordinator.class);
-
- public static final long INIT_TXID = 1L;
-
-
- public static final String BATCH_STREAM_ID = "$batch";
- public static final String COMMIT_STREAM_ID = "$commit";
- public static final String SUCCESS_STREAM_ID = "$success";
-
- private static final String CURRENT_TX = "currtx";
- private static final String CURRENT_ATTEMPTS = "currattempts";
-
- private static enum Operation {
- ACK,
- FAIL,
- NEXTTUPLE
- }
-
- private List<TransactionalState> _states = new ArrayList();
-
- TreeMap<Long, TransactionStatus> _activeTx = new TreeMap<Long, TransactionStatus>();
- TreeMap<Long, Integer> _attemptIds;
-
- private SpoutOutputCollector _collector;
- Long _currTransaction;
- int _maxTransactionActive;
-
- List<ITridentSpout.BatchCoordinator> _coordinators = new ArrayList();
-
-
- List<String> _managedSpoutIds;
- List<ITridentSpout> _spouts;
- WindowedTimeThrottler _throttler;
-
- boolean _active = true;
-
- AtomicBoolean failedOccur = new AtomicBoolean(false);
-
- public MasterBatchCoordinator(List<String> spoutIds, List<ITridentSpout> spouts) {
- if(spoutIds.isEmpty()) {
- throw new IllegalArgumentException("Must manage at least one spout");
- }
- _managedSpoutIds = spoutIds;
- _spouts = spouts;
- }
-
- public List<String> getManagedSpoutIds(){
- return _managedSpoutIds;
- }
-
- @Override
- public void activate() {
- _active = true;
- }
-
- @Override
- public void deactivate() {
- _active = false;
- }
-
- @Override
- public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
- _throttler = new WindowedTimeThrottler((Number)conf.get(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS), 1);
- for(String spoutId: _managedSpoutIds) {
- _states.add(TransactionalState.newCoordinatorState(conf, spoutId));
- }
- _currTransaction = getStoredCurrTransaction();
-
- _collector = collector;
- Number active = (Number) conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING);
- if(active==null) {
- _maxTransactionActive = 1;
- } else {
- _maxTransactionActive = active.intValue();
- }
- _attemptIds = getStoredCurrAttempts(_currTransaction, _maxTransactionActive);
-
-
- for(int i=0; i<_spouts.size(); i++) {
- String txId = _managedSpoutIds.get(i);
- _coordinators.add(_spouts.get(i).getCoordinator(txId, conf, context));
- }
- }
-
- @Override
- public void close() {
- for(TransactionalState state: _states) {
- state.close();
- }
- }
-
- @Override
- public void nextTuple() {
- sync(Operation.NEXTTUPLE, null);
- }
-
- @Override
- public void ack(Object msgId) {
- sync(Operation.ACK, (TransactionAttempt) msgId);
- }
-
- @Override
- public void fail(Object msgId) {
- sync(Operation.FAIL, (TransactionAttempt) msgId);
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- // in partitioned example, in case an emitter task receives a later transaction than it's emitted so far,
- // when it sees the earlier txid it should know to emit nothing
- declarer.declareStream(BATCH_STREAM_ID, new Fields("tx"));
- declarer.declareStream(COMMIT_STREAM_ID, new Fields("tx"));
- declarer.declareStream(SUCCESS_STREAM_ID, new Fields("tx"));
- }
-
- synchronized private void sync(Operation op, TransactionAttempt attempt) {
- TransactionStatus status;
- long txid;
-
- switch (op) {
- case FAIL:
- // Remove the failed one and the items whose id is higher than the failed one.
- // Then those ones will be retried when nextTuple.
- txid = attempt.getTransactionId();
- status = _activeTx.remove(txid);
- if(status!=null && status.attempt.equals(attempt)) {
- _activeTx.tailMap(txid).clear();
- }
- break;
-
- case ACK:
- txid = attempt.getTransactionId();
- status = _activeTx.get(txid);
- if(status!=null && attempt.equals(status.attempt)) {
- if(status.status==AttemptStatus.PROCESSING ) {
- status.status = AttemptStatus.PROCESSED;
- } else if(status.status==AttemptStatus.COMMITTING) {
- status.status = AttemptStatus.COMMITTED;
- }
- }
- break;
-
- case NEXTTUPLE:
- // note that sometimes the tuples active may be less than max_spout_pending, e.g.
- // max_spout_pending = 3
- // tx 1, 2, 3 active, tx 2 is acked. there won't be a commit for tx 2 (because tx 1 isn't committed yet),
- // and there won't be a batch for tx 4 because there's max_spout_pending tx active
- status = _activeTx.get(_currTransaction);
- if (status!=null) {
- if(status.status == AttemptStatus.PROCESSED) {
- status.status = AttemptStatus.COMMITTING;
- _collector.emit(COMMIT_STREAM_ID, new Values(status.attempt), status.attempt);
- } else if (status.status == AttemptStatus.COMMITTED) {
- _activeTx.remove(status.attempt.getTransactionId());
- _attemptIds.remove(status.attempt.getTransactionId());
- _collector.emit(SUCCESS_STREAM_ID, new Values(status.attempt));
- _currTransaction = nextTransactionId(status.attempt.getTransactionId());
- for(TransactionalState state: _states) {
- state.setData(CURRENT_TX, _currTransaction);
- }
- }
- }
-
- if(_active) {
- if(_activeTx.size() < _maxTransactionActive) {
- Long curr = _currTransaction;
- for(int i=0; i<_maxTransactionActive; i++) {
- if(batchDelay()) {
- break;
- }
-
- if(isReady(curr)) {
- if(!_activeTx.containsKey(curr)) {
- // by using a monotonically increasing attempt id, downstream tasks
- // can be memory efficient by clearing out state for old attempts
- // as soon as they see a higher attempt id for a transaction
- Integer attemptId = _attemptIds.get(curr);
- if(attemptId==null) {
- attemptId = 0;
- } else {
- attemptId++;
- }
- _attemptIds.put(curr, attemptId);
- for(TransactionalState state: _states) {
- state.setData(CURRENT_ATTEMPTS, _attemptIds);
- }
-
- TransactionAttempt currAttempt = new TransactionAttempt(curr, attemptId);
- _activeTx.put(curr, new TransactionStatus(currAttempt));
- _collector.emit(BATCH_STREAM_ID, new Values(currAttempt), currAttempt);
- _throttler.markEvent();
- break;
- }
- }
- curr = nextTransactionId(curr);
- }
- } else {
- // Do nothing
- }
- }
- break;
-
- default:
- LOG.warn("Unknow Operation code=" + op);
- break;
- }
- }
-
- private boolean isReady(long txid) {
- //TODO: make this strategy configurable?... right now it goes if anyone is ready
- for(ITridentSpout.BatchCoordinator coord: _coordinators) {
- if(coord.isReady(txid)) return true;
- }
- return false;
- }
-
- private boolean batchDelay() {
- return _throttler.isThrottled();
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- Config ret = new Config();
- ret.setMaxTaskParallelism(1);
- ret.registerSerialization(TransactionAttempt.class);
- return ret;
- }
-
- private static enum AttemptStatus {
- PROCESSING,
- PROCESSED,
- COMMITTING,
- COMMITTED
- }
-
- private static class TransactionStatus {
- TransactionAttempt attempt;
- AttemptStatus status;
-
- public TransactionStatus(TransactionAttempt attempt) {
- this.attempt = attempt;
- this.status = AttemptStatus.PROCESSING;
- }
-
- @Override
- public String toString() {
- return attempt.toString() + " <" + status.toString() + ">";
- }
- }
-
-
- private Long nextTransactionId(Long id) {
- return id + 1;
- }
-
- private Long getStoredCurrTransaction() {
- Long ret = INIT_TXID;
- for(TransactionalState state: _states) {
- Long curr = (Long) state.getData(CURRENT_TX);
- if(curr!=null && curr.compareTo(ret) > 0) {
- ret = curr;
- }
- }
- return ret;
- }
-
- private TreeMap<Long, Integer> getStoredCurrAttempts(long currTransaction, int maxBatches) {
- TreeMap<Long, Integer> ret = new TreeMap<Long, Integer>();
- for(TransactionalState state: _states) {
- Map<Object, Number> attempts = (Map) state.getData(CURRENT_ATTEMPTS);
- if(attempts==null) attempts = new HashMap();
- for(Entry<Object, Number> e: attempts.entrySet()) {
- // this is because json doesn't allow numbers as keys...
- // TODO: replace json with a better form of encoding
- Number txidObj;
- if(e.getKey() instanceof String) {
- txidObj = Long.parseLong((String) e.getKey());
- } else {
- txidObj = (Number) e.getKey();
- }
- long txid = ((Number) txidObj).longValue();
- int attemptId = ((Number) e.getValue()).intValue();
- Integer curr = ret.get(txid);
- if(curr==null || attemptId > curr) {
- ret.put(txid, attemptId);
- }
- }
- }
- ret.headMap(currTransaction).clear();
- ret.tailMap(currTransaction + maxBatches - 1).clear();
- return ret;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/topology/TransactionAttempt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/topology/TransactionAttempt.java b/jstorm-client/src/main/java/storm/trident/topology/TransactionAttempt.java
deleted file mode 100644
index b2ea328..0000000
--- a/jstorm-client/src/main/java/storm/trident/topology/TransactionAttempt.java
+++ /dev/null
@@ -1,49 +0,0 @@
-package storm.trident.topology;
-
-import storm.trident.spout.IBatchID;
-
-
-public class TransactionAttempt implements IBatchID {
- Long _txid;
- int _attemptId;
-
-
- // for kryo compatibility
- public TransactionAttempt() {
-
- }
-
- public TransactionAttempt(Long txid, int attemptId) {
- _txid = txid;
- _attemptId = attemptId;
- }
-
- public Long getTransactionId() {
- return _txid;
- }
-
- public Object getId() {
- return _txid;
- }
-
- public int getAttemptId() {
- return _attemptId;
- }
-
- @Override
- public int hashCode() {
- return _txid.hashCode();
- }
-
- @Override
- public boolean equals(Object o) {
- if(!(o instanceof TransactionAttempt)) return false;
- TransactionAttempt other = (TransactionAttempt) o;
- return _txid.equals(other._txid) && _attemptId == other._attemptId;
- }
-
- @Override
- public String toString() {
- return "" + _txid + ":" + _attemptId;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/topology/TridentBoltExecutor.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/topology/TridentBoltExecutor.java b/jstorm-client/src/main/java/storm/trident/topology/TridentBoltExecutor.java
deleted file mode 100644
index 71807bb..0000000
--- a/jstorm-client/src/main/java/storm/trident/topology/TridentBoltExecutor.java
+++ /dev/null
@@ -1,430 +0,0 @@
-package storm.trident.topology;
-
-import backtype.storm.Config;
-import backtype.storm.Constants;
-import backtype.storm.coordination.BatchOutputCollector;
-import backtype.storm.coordination.BatchOutputCollectorImpl;
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.generated.Grouping;
-import backtype.storm.task.IOutputCollector;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.FailedException;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.ReportedFailedException;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.RotatingMap;
-import backtype.storm.utils.Utils;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.lang.builder.ToStringBuilder;
-
-import storm.trident.spout.IBatchID;
-
-public class TridentBoltExecutor implements IRichBolt {
- public static String COORD_STREAM_PREFIX = "$coord-";
-
- public static String COORD_STREAM(String batch) {
- return COORD_STREAM_PREFIX + batch;
- }
-
- public static class CoordType implements Serializable {
- public boolean singleCount;
-
- protected CoordType(boolean singleCount) {
- this.singleCount = singleCount;
- }
-
- public static CoordType single() {
- return new CoordType(true);
- }
-
- public static CoordType all() {
- return new CoordType(false);
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + (singleCount ? 1231 : 1237);
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- CoordType other = (CoordType) obj;
- if (singleCount != other.singleCount)
- return false;
- return true;
- }
-
- @Override
- public String toString() {
- return "<Single: " + singleCount + ">";
- }
-
-
- }
-
- public static class CoordSpec implements Serializable {
- public GlobalStreamId commitStream = null;
- public Map<String, CoordType> coords = new HashMap<String, CoordType>();
-
- public CoordSpec() {
- }
- }
-
- public static class CoordCondition implements Serializable {
- public GlobalStreamId commitStream;
- public int expectedTaskReports;
- Set<Integer> targetTasks;
-
- @Override
- public String toString() {
- return ToStringBuilder.reflectionToString(this);
- }
- }
-
- Map<GlobalStreamId, String> _batchGroupIds;
- Map<String, CoordSpec> _coordSpecs;
- Map<String, CoordCondition> _coordConditions;
- ITridentBatchBolt _bolt;
- long _messageTimeoutMs;
- long _lastRotate;
-
- RotatingMap _batches;
-
- // map from batchgroupid to coordspec
- public TridentBoltExecutor(ITridentBatchBolt bolt, Map<GlobalStreamId, String> batchGroupIds, Map<String, CoordSpec> coordinationSpecs) {
- _batchGroupIds = batchGroupIds;
- _coordSpecs = coordinationSpecs;
- _bolt = bolt;
- }
-
- public static class TrackedBatch {
- int attemptId;
- BatchInfo info;
- CoordCondition condition;
- int reportedTasks = 0;
- int expectedTupleCount = 0;
- int receivedTuples = 0;
- Map<Integer, Integer> taskEmittedTuples = new HashMap();
- boolean failed = false;
- boolean receivedCommit;
- Tuple delayedAck = null;
-
- public TrackedBatch(BatchInfo info, CoordCondition condition, int attemptId) {
- this.info = info;
- this.condition = condition;
- this.attemptId = attemptId;
- receivedCommit = condition.commitStream == null;
- }
-
- @Override
- public String toString() {
- return ToStringBuilder.reflectionToString(this);
- }
- }
-
- public class CoordinatedOutputCollector implements IOutputCollector {
- IOutputCollector _delegate;
-
- TrackedBatch _currBatch = null;;
-
- public void setCurrBatch(TrackedBatch batch) {
- _currBatch = batch;
- }
-
- public CoordinatedOutputCollector(IOutputCollector delegate) {
- _delegate = delegate;
- }
-
- public List<Integer> emit(String stream, Collection<Tuple> anchors, List<Object> tuple) {
- List<Integer> tasks = _delegate.emit(stream, anchors, tuple);
- updateTaskCounts(tasks);
- return tasks;
- }
-
- public void emitDirect(int task, String stream, Collection<Tuple> anchors, List<Object> tuple) {
- updateTaskCounts(Arrays.asList(task));
- _delegate.emitDirect(task, stream, anchors, tuple);
- }
-
- public void ack(Tuple tuple) {
- throw new IllegalStateException("Method should never be called");
- }
-
- public void fail(Tuple tuple) {
- throw new IllegalStateException("Method should never be called");
- }
-
- public void reportError(Throwable error) {
- _delegate.reportError(error);
- }
-
-
- private void updateTaskCounts(List<Integer> tasks) {
- if(_currBatch!=null) {
- Map<Integer, Integer> taskEmittedTuples = _currBatch.taskEmittedTuples;
- for(Integer task: tasks) {
- int newCount = Utils.get(taskEmittedTuples, task, 0) + 1;
- taskEmittedTuples.put(task, newCount);
- }
- }
- }
- }
-
- OutputCollector _collector;
- CoordinatedOutputCollector _coordCollector;
- BatchOutputCollector _coordOutputCollector;
- TopologyContext _context;
-
- @Override
- public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
- _messageTimeoutMs = context.maxTopologyMessageTimeout() * 1000L;
- _lastRotate = System.currentTimeMillis();
- _batches = new RotatingMap(2);
- _context = context;
- _collector = collector;
- _coordCollector = new CoordinatedOutputCollector(collector);
- _coordOutputCollector = new BatchOutputCollectorImpl(new OutputCollector(_coordCollector));
-
- _coordConditions = (Map) context.getExecutorData("__coordConditions");
- if(_coordConditions==null) {
- _coordConditions = new HashMap();
- for(String batchGroup: _coordSpecs.keySet()) {
- CoordSpec spec = _coordSpecs.get(batchGroup);
- CoordCondition cond = new CoordCondition();
- cond.commitStream = spec.commitStream;
- cond.expectedTaskReports = 0;
- for(String comp: spec.coords.keySet()) {
- CoordType ct = spec.coords.get(comp);
- if(ct.equals(CoordType.single())) {
- cond.expectedTaskReports+=1;
- } else {
- cond.expectedTaskReports+=context.getComponentTasks(comp).size();
- }
- }
- cond.targetTasks = new HashSet<Integer>();
- for(String component: Utils.get(context.getThisTargets(),
- COORD_STREAM(batchGroup),
- new HashMap<String, Grouping>()).keySet()) {
- cond.targetTasks.addAll(context.getComponentTasks(component));
- }
- _coordConditions.put(batchGroup, cond);
- }
- context.setExecutorData("_coordConditions", _coordConditions);
- }
- _bolt.prepare(conf, context, _coordOutputCollector);
- }
-
- private void failBatch(TrackedBatch tracked, FailedException e) {
- if(e!=null && e instanceof ReportedFailedException) {
- _collector.reportError(e);
- }
- tracked.failed = true;
- if(tracked.delayedAck!=null) {
- _collector.fail(tracked.delayedAck);
- tracked.delayedAck = null;
- }
- }
-
- private void failBatch(TrackedBatch tracked) {
- failBatch(tracked, null);
- }
-
- private boolean finishBatch(TrackedBatch tracked, Tuple finishTuple) {
- boolean success = true;
- try {
- _bolt.finishBatch(tracked.info);
- String stream = COORD_STREAM(tracked.info.batchGroup);
- for(Integer task: tracked.condition.targetTasks) {
- _collector.emitDirect(task, stream, finishTuple, new Values(tracked.info.batchId, Utils.get(tracked.taskEmittedTuples, task, 0)));
- }
- if(tracked.delayedAck!=null) {
- _collector.ack(tracked.delayedAck);
- tracked.delayedAck = null;
- }
- } catch(FailedException e) {
- failBatch(tracked, e);
- success = false;
- }
- _batches.remove(tracked.info.batchId.getId());
- return success;
- }
-
- private void checkFinish(TrackedBatch tracked, Tuple tuple, TupleType type) {
- if(tracked.failed) {
- failBatch(tracked);
- _collector.fail(tuple);
- return;
- }
- CoordCondition cond = tracked.condition;
- boolean delayed = tracked.delayedAck==null &&
- (cond.commitStream!=null && type==TupleType.COMMIT
- || cond.commitStream==null);
- if(delayed) {
- tracked.delayedAck = tuple;
- }
- boolean failed = false;
- if(tracked.receivedCommit && tracked.reportedTasks == cond.expectedTaskReports) {
- if(tracked.receivedTuples == tracked.expectedTupleCount) {
- finishBatch(tracked, tuple);
- } else {
- //TODO: add logging that not all tuples were received
- failBatch(tracked);
- _collector.fail(tuple);
- failed = true;
- }
- }
-
- if(!delayed && !failed) {
- _collector.ack(tuple);
- }
-
- }
-
- @Override
- public void execute(Tuple tuple) {
- if(tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)) {
- long now = System.currentTimeMillis();
- if(now - _lastRotate > _messageTimeoutMs) {
- _batches.rotate();
- _lastRotate = now;
- }
- return;
- }
- String batchGroup = _batchGroupIds.get(tuple.getSourceGlobalStreamid());
- if(batchGroup==null) {
- // this is so we can do things like have simple DRPC that doesn't need to use batch processing
- _coordCollector.setCurrBatch(null);
- _bolt.execute(null, tuple);
- _collector.ack(tuple);
- return;
- }
- IBatchID id = (IBatchID) tuple.getValue(0);
- //get transaction id
- //if it already exissts and attempt id is greater than the attempt there
-
-
- TrackedBatch tracked = (TrackedBatch) _batches.get(id.getId());
-// if(_batches.size() > 10 && _context.getThisTaskIndex() == 0) {
-// System.out.println("Received in " + _context.getThisComponentId() + " " + _context.getThisTaskIndex()
-// + " (" + _batches.size() + ")" +
-// "\ntuple: " + tuple +
-// "\nwith tracked " + tracked +
-// "\nwith id " + id +
-// "\nwith group " + batchGroup
-// + "\n");
-//
-// }
- //System.out.println("Num tracked: " + _batches.size() + " " + _context.getThisComponentId() + " " + _context.getThisTaskIndex());
-
- // this code here ensures that only one attempt is ever tracked for a batch, so when
- // failures happen you don't get an explosion in memory usage in the tasks
- if(tracked!=null) {
- if(id.getAttemptId() > tracked.attemptId) {
- _batches.remove(id.getId());
- tracked = null;
- } else if(id.getAttemptId() < tracked.attemptId) {
- // no reason to try to execute a previous attempt than we've already seen
- return;
- }
- }
-
- if(tracked==null) {
- tracked = new TrackedBatch(new BatchInfo(batchGroup, id, _bolt.initBatchState(batchGroup, id)), _coordConditions.get(batchGroup), id.getAttemptId());
- _batches.put(id.getId(), tracked);
- }
- _coordCollector.setCurrBatch(tracked);
-
- //System.out.println("TRACKED: " + tracked + " " + tuple);
-
- TupleType t = getTupleType(tuple, tracked);
- if(t==TupleType.COMMIT) {
- tracked.receivedCommit = true;
- checkFinish(tracked, tuple, t);
- } else if(t==TupleType.COORD) {
- int count = tuple.getInteger(1);
- tracked.reportedTasks++;
- tracked.expectedTupleCount+=count;
- checkFinish(tracked, tuple, t);
- } else {
- tracked.receivedTuples++;
- boolean success = true;
- try {
- _bolt.execute(tracked.info, tuple);
- if(tracked.condition.expectedTaskReports==0) {
- success = finishBatch(tracked, tuple);
- }
- } catch(FailedException e) {
- failBatch(tracked, e);
- }
- if(success) {
- _collector.ack(tuple);
- } else {
- _collector.fail(tuple);
- }
- }
- _coordCollector.setCurrBatch(null);
- }
-
- @Override
- public void cleanup() {
- _bolt.cleanup();
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- _bolt.declareOutputFields(declarer);
- for(String batchGroup: _coordSpecs.keySet()) {
- declarer.declareStream(COORD_STREAM(batchGroup), true, new Fields("id", "count"));
- }
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- Map<String, Object> ret = _bolt.getComponentConfiguration();
- if(ret==null) ret = new HashMap();
- ret.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5);
- // TODO: Need to be able to set the tick tuple time to the message timeout, ideally without parameterization
- return ret;
- }
-
- private TupleType getTupleType(Tuple tuple, TrackedBatch batch) {
- CoordCondition cond = batch.condition;
- if(cond.commitStream!=null
- && tuple.getSourceGlobalStreamid().equals(cond.commitStream)) {
- return TupleType.COMMIT;
- } else if(cond.expectedTaskReports > 0
- && tuple.getSourceStreamId().startsWith(COORD_STREAM_PREFIX)) {
- return TupleType.COORD;
- } else {
- return TupleType.REGULAR;
- }
- }
-
- static enum TupleType {
- REGULAR,
- COMMIT,
- COORD
- }
-}