You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/11/05 21:40:59 UTC
[20/60] [abbrv] [partial] storm git commit: Release 2.0.4-SNAPSHOT
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/builtin/MapGet.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/builtin/MapGet.java b/jstorm-client/src/main/java/storm/trident/operation/builtin/MapGet.java
deleted file mode 100644
index 17e12ee..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/builtin/MapGet.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package storm.trident.operation.builtin;
-
-import backtype.storm.tuple.Values;
-import java.util.List;
-import storm.trident.operation.TridentCollector;
-import storm.trident.state.BaseQueryFunction;
-import storm.trident.state.map.ReadOnlyMapState;
-import storm.trident.tuple.TridentTuple;
-
-
-public class MapGet extends BaseQueryFunction<ReadOnlyMapState, Object> {
- @Override
- public List<Object> batchRetrieve(ReadOnlyMapState map, List<TridentTuple> keys) {
- return map.multiGet((List) keys);
- }
-
- @Override
- public void execute(TridentTuple tuple, Object result, TridentCollector collector) {
- collector.emit(new Values(result));
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/builtin/Negate.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/builtin/Negate.java b/jstorm-client/src/main/java/storm/trident/operation/builtin/Negate.java
deleted file mode 100644
index 7a48477..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/builtin/Negate.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package storm.trident.operation.builtin;
-
-import java.util.Map;
-import storm.trident.operation.Filter;
-import storm.trident.operation.TridentOperationContext;
-import storm.trident.tuple.TridentTuple;
-
-public class Negate implements Filter {
-
- Filter _delegate;
-
- public Negate(Filter delegate) {
- _delegate = delegate;
- }
-
- @Override
- public boolean isKeep(TridentTuple tuple) {
- return !_delegate.isKeep(tuple);
- }
-
- @Override
- public void prepare(Map conf, TridentOperationContext context) {
- _delegate.prepare(conf, context);
- }
-
- @Override
- public void cleanup() {
- _delegate.cleanup();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/builtin/SnapshotGet.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/builtin/SnapshotGet.java b/jstorm-client/src/main/java/storm/trident/operation/builtin/SnapshotGet.java
deleted file mode 100644
index fbc3286..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/builtin/SnapshotGet.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package storm.trident.operation.builtin;
-
-import backtype.storm.tuple.Values;
-import java.util.ArrayList;
-import java.util.List;
-import storm.trident.operation.TridentCollector;
-import storm.trident.state.BaseQueryFunction;
-import storm.trident.state.snapshot.ReadOnlySnapshottable;
-import storm.trident.tuple.TridentTuple;
-
-public class SnapshotGet extends BaseQueryFunction<ReadOnlySnapshottable, Object> {
-
- @Override
- public List<Object> batchRetrieve(ReadOnlySnapshottable state, List<TridentTuple> args) {
- List<Object> ret = new ArrayList<Object>(args.size());
- Object snapshot = state.get();
- for(int i=0; i<args.size(); i++) {
- ret.add(snapshot);
- }
- return ret;
- }
-
- @Override
- public void execute(TridentTuple tuple, Object result, TridentCollector collector) {
- collector.emit(new Values(result));
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/builtin/Sum.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/builtin/Sum.java b/jstorm-client/src/main/java/storm/trident/operation/builtin/Sum.java
deleted file mode 100644
index d67ac66..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/builtin/Sum.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package storm.trident.operation.builtin;
-
-import clojure.lang.Numbers;
-import storm.trident.operation.CombinerAggregator;
-import storm.trident.tuple.TridentTuple;
-
-
-public class Sum implements CombinerAggregator<Number> {
-
- @Override
- public Number init(TridentTuple tuple) {
- return (Number) tuple.getValue(0);
- }
-
- @Override
- public Number combine(Number val1, Number val2) {
- return Numbers.add(val1, val2);
- }
-
- @Override
- public Number zero() {
- return 0;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/builtin/TupleCollectionGet.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/builtin/TupleCollectionGet.java b/jstorm-client/src/main/java/storm/trident/operation/builtin/TupleCollectionGet.java
deleted file mode 100644
index 6302e02..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/builtin/TupleCollectionGet.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package storm.trident.operation.builtin;
-
-import storm.trident.state.ITupleCollection;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import storm.trident.operation.TridentCollector;
-import storm.trident.state.BaseQueryFunction;
-import storm.trident.state.State;
-import storm.trident.tuple.TridentTuple;
-
-public class TupleCollectionGet extends BaseQueryFunction<State, Iterator<List<Object>>> {
-
- @Override
- public List<Iterator<List<Object>>> batchRetrieve(State state, List<TridentTuple> args) {
- List<Iterator<List<Object>>> ret = new ArrayList(args.size());
- for(int i=0; i<args.size(); i++) {
- ret.add(((ITupleCollection)state).getTuples());
- }
- return ret;
- }
-
- @Override
- public void execute(TridentTuple tuple, Iterator<List<Object>> tuplesIterator, TridentCollector collector) {
- while(tuplesIterator.hasNext()) {
- collector.emit(tuplesIterator.next());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/impl/CaptureCollector.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/impl/CaptureCollector.java b/jstorm-client/src/main/java/storm/trident/operation/impl/CaptureCollector.java
deleted file mode 100644
index 9fe4419..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/impl/CaptureCollector.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package storm.trident.operation.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-import storm.trident.operation.TridentCollector;
-
-public class CaptureCollector implements TridentCollector {
- public List<List<Object>> captured = new ArrayList();
-
- TridentCollector _coll;
-
- public void setCollector(TridentCollector coll) {
- _coll = coll;
- }
-
- @Override
- public void emit(List<Object> values) {
- this.captured.add(values);
- }
-
- @Override
- public void reportError(Throwable t) {
- _coll.reportError(t);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/impl/ChainedAggregatorImpl.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/impl/ChainedAggregatorImpl.java b/jstorm-client/src/main/java/storm/trident/operation/impl/ChainedAggregatorImpl.java
deleted file mode 100644
index f8bd001..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/impl/ChainedAggregatorImpl.java
+++ /dev/null
@@ -1,96 +0,0 @@
-package storm.trident.operation.impl;
-
-import backtype.storm.tuple.Fields;
-import java.util.List;
-import java.util.Map;
-import storm.trident.operation.Aggregator;
-import storm.trident.operation.TridentCollector;
-import storm.trident.operation.TridentOperationContext;
-import storm.trident.tuple.ComboList;
-import storm.trident.tuple.TridentTuple;
-import storm.trident.tuple.TridentTupleView;
-import storm.trident.tuple.TridentTupleView.ProjectionFactory;
-
-public class ChainedAggregatorImpl implements Aggregator<ChainedResult> {
- Aggregator[] _aggs;
- ProjectionFactory[] _inputFactories;
- ComboList.Factory _fact;
- Fields[] _inputFields;
-
-
-
- public ChainedAggregatorImpl(Aggregator[] aggs, Fields[] inputFields, ComboList.Factory fact) {
- _aggs = aggs;
- _inputFields = inputFields;
- _fact = fact;
- if(_aggs.length!=_inputFields.length) {
- throw new IllegalArgumentException("Require input fields for each aggregator");
- }
- }
-
- public void prepare(Map conf, TridentOperationContext context) {
- _inputFactories = new ProjectionFactory[_inputFields.length];
- for(int i=0; i<_inputFields.length; i++) {
- _inputFactories[i] = context.makeProjectionFactory(_inputFields[i]);
- _aggs[i].prepare(conf, new TridentOperationContext(context, _inputFactories[i]));
- }
- }
-
- public ChainedResult init(Object batchId, TridentCollector collector) {
- ChainedResult initted = new ChainedResult(collector, _aggs.length);
- for(int i=0; i<_aggs.length; i++) {
- initted.objs[i] = _aggs[i].init(batchId, initted.collectors[i]);
- }
- return initted;
- }
-
- public void aggregate(ChainedResult val, TridentTuple tuple, TridentCollector collector) {
- val.setFollowThroughCollector(collector);
- for(int i=0; i<_aggs.length; i++) {
- TridentTuple projected = _inputFactories[i].create((TridentTupleView) tuple);
- _aggs[i].aggregate(val.objs[i], projected, val.collectors[i]);
- }
- }
-
- public void complete(ChainedResult val, TridentCollector collector) {
- val.setFollowThroughCollector(collector);
- for(int i=0; i<_aggs.length; i++) {
- _aggs[i].complete(val.objs[i], val.collectors[i]);
- }
- if(_aggs.length > 1) { // otherwise, tuples were emitted directly
- int[] indices = new int[val.collectors.length];
- for(int i=0; i<indices.length; i++) {
- indices[i] = 0;
- }
- boolean keepGoing = true;
- //emit cross-join of all emitted tuples
- while(keepGoing) {
- List[] combined = new List[_aggs.length];
- for(int i=0; i< _aggs.length; i++) {
- CaptureCollector capturer = (CaptureCollector) val.collectors[i];
- combined[i] = capturer.captured.get(indices[i]);
- }
- collector.emit(_fact.create(combined));
- keepGoing = increment(val.collectors, indices, indices.length - 1);
- }
- }
- }
-
- //return false if can't increment anymore
- private boolean increment(TridentCollector[] lengths, int[] indices, int j) {
- if(j==-1) return false;
- indices[j]++;
- CaptureCollector capturer = (CaptureCollector) lengths[j];
- if(indices[j] >= capturer.captured.size()) {
- indices[j] = 0;
- return increment(lengths, indices, j-1);
- }
- return true;
- }
-
- public void cleanup() {
- for(Aggregator a: _aggs) {
- a.cleanup();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/impl/ChainedResult.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/impl/ChainedResult.java b/jstorm-client/src/main/java/storm/trident/operation/impl/ChainedResult.java
deleted file mode 100644
index a35df3a..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/impl/ChainedResult.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package storm.trident.operation.impl;
-
-import org.apache.commons.lang.builder.ToStringBuilder;
-import storm.trident.operation.TridentCollector;
-
-
-//for ChainedAggregator
-public class ChainedResult {
- Object[] objs;
- TridentCollector[] collectors;
-
- public ChainedResult(TridentCollector collector, int size) {
- objs = new Object[size];
- collectors = new TridentCollector[size];
- for(int i=0; i<size; i++) {
- if(size==1) {
- collectors[i] = collector;
- } else {
- collectors[i] = new CaptureCollector();
- }
- }
- }
-
- public void setFollowThroughCollector(TridentCollector collector) {
- if(collectors.length>1) {
- for(TridentCollector c: collectors) {
- ((CaptureCollector) c).setCollector(collector);
- }
- }
- }
-
- @Override
- public String toString() {
- return ToStringBuilder.reflectionToString(objs);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/impl/CombinerAggStateUpdater.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/impl/CombinerAggStateUpdater.java b/jstorm-client/src/main/java/storm/trident/operation/impl/CombinerAggStateUpdater.java
deleted file mode 100644
index 97a9b9d..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/impl/CombinerAggStateUpdater.java
+++ /dev/null
@@ -1,39 +0,0 @@
-package storm.trident.operation.impl;
-
-import backtype.storm.tuple.Values;
-import java.util.List;
-import java.util.Map;
-import storm.trident.operation.CombinerAggregator;
-import storm.trident.operation.TridentCollector;
-import storm.trident.operation.TridentOperationContext;
-import storm.trident.state.CombinerValueUpdater;
-import storm.trident.state.StateUpdater;
-import storm.trident.state.snapshot.Snapshottable;
-import storm.trident.tuple.TridentTuple;
-
-public class CombinerAggStateUpdater implements StateUpdater<Snapshottable> {
- CombinerAggregator _agg;
-
- public CombinerAggStateUpdater(CombinerAggregator agg) {
- _agg = agg;
- }
-
-
- @Override
- public void updateState(Snapshottable state, List<TridentTuple> tuples, TridentCollector collector) {
- if(tuples.size()!=1) {
- throw new IllegalArgumentException("Combiner state updater should receive a single tuple. Received: " + tuples.toString());
- }
- Object newVal = state.update(new CombinerValueUpdater(_agg, tuples.get(0).getValue(0)));
- collector.emit(new Values(newVal));
- }
-
- @Override
- public void prepare(Map conf, TridentOperationContext context) {
- }
-
- @Override
- public void cleanup() {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/impl/CombinerAggregatorCombineImpl.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/impl/CombinerAggregatorCombineImpl.java b/jstorm-client/src/main/java/storm/trident/operation/impl/CombinerAggregatorCombineImpl.java
deleted file mode 100644
index d9d00e5..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/impl/CombinerAggregatorCombineImpl.java
+++ /dev/null
@@ -1,44 +0,0 @@
-package storm.trident.operation.impl;
-
-import backtype.storm.tuple.Values;
-import java.util.Map;
-import storm.trident.operation.Aggregator;
-import storm.trident.operation.CombinerAggregator;
-import storm.trident.operation.TridentCollector;
-import storm.trident.operation.TridentOperationContext;
-import storm.trident.tuple.TridentTuple;
-
-public class CombinerAggregatorCombineImpl implements Aggregator<Result> {
- CombinerAggregator _agg;
-
- public CombinerAggregatorCombineImpl(CombinerAggregator agg) {
- _agg = agg;
- }
-
- public void prepare(Map conf, TridentOperationContext context) {
-
- }
-
- public Result init(Object batchId, TridentCollector collector) {
- Result ret = new Result();
- ret.obj = _agg.zero();
- return ret;
- }
-
- public void aggregate(Result val, TridentTuple tuple, TridentCollector collector) {
- Object v = tuple.getValue(0);
- if(val.obj==null) {
- val.obj = v;
- } else {
- val.obj = _agg.combine(val.obj, v);
- }
- }
-
- public void complete(Result val, TridentCollector collector) {
- collector.emit(new Values(val.obj));
- }
-
- public void cleanup() {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/impl/CombinerAggregatorInitImpl.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/impl/CombinerAggregatorInitImpl.java b/jstorm-client/src/main/java/storm/trident/operation/impl/CombinerAggregatorInitImpl.java
deleted file mode 100644
index 9020094..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/impl/CombinerAggregatorInitImpl.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package storm.trident.operation.impl;
-
-import backtype.storm.tuple.Values;
-import java.util.Map;
-import storm.trident.operation.CombinerAggregator;
-import storm.trident.operation.Function;
-import storm.trident.operation.TridentCollector;
-import storm.trident.operation.TridentOperationContext;
-import storm.trident.tuple.TridentTuple;
-
-public class CombinerAggregatorInitImpl implements Function {
-
- CombinerAggregator _agg;
-
- public CombinerAggregatorInitImpl(CombinerAggregator agg) {
- _agg = agg;
- }
-
- @Override
- public void execute(TridentTuple tuple, TridentCollector collector) {
- collector.emit(new Values(_agg.init(tuple)));
- }
-
- @Override
- public void prepare(Map conf, TridentOperationContext context) {
- }
-
- @Override
- public void cleanup() {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/impl/FilterExecutor.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/impl/FilterExecutor.java b/jstorm-client/src/main/java/storm/trident/operation/impl/FilterExecutor.java
deleted file mode 100644
index 2b96834..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/impl/FilterExecutor.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package storm.trident.operation.impl;
-
-import java.util.Map;
-import storm.trident.operation.Filter;
-import storm.trident.operation.Function;
-import storm.trident.operation.TridentCollector;
-import storm.trident.operation.TridentOperationContext;
-import storm.trident.tuple.TridentTuple;
-
-// works by emitting null to the collector. since the planner knows this is an ADD node with
-// no new output fields, it just passes the tuple forward
-public class FilterExecutor implements Function {
- Filter _filter;
-
- public FilterExecutor(Filter filter) {
- _filter = filter;
- }
-
- @Override
- public void execute(TridentTuple tuple, TridentCollector collector) {
- if(_filter.isKeep(tuple)) {
- collector.emit(null);
- }
- }
-
- @Override
- public void prepare(Map conf, TridentOperationContext context) {
- _filter.prepare(conf, context);
- }
-
- @Override
- public void cleanup() {
- _filter.cleanup();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/impl/GlobalBatchToPartition.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/impl/GlobalBatchToPartition.java b/jstorm-client/src/main/java/storm/trident/operation/impl/GlobalBatchToPartition.java
deleted file mode 100644
index 3bf52b3..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/impl/GlobalBatchToPartition.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package storm.trident.operation.impl;
-
-
-public class GlobalBatchToPartition implements SingleEmitAggregator.BatchToPartition {
-
- @Override
- public int partitionIndex(Object batchId, int numPartitions) {
- // TODO: take away knowledge of storm's internals here
- return 0;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/impl/GroupCollector.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/impl/GroupCollector.java b/jstorm-client/src/main/java/storm/trident/operation/impl/GroupCollector.java
deleted file mode 100644
index b997217..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/impl/GroupCollector.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package storm.trident.operation.impl;
-
-import java.util.List;
-import storm.trident.operation.TridentCollector;
-import storm.trident.tuple.ComboList;
-
-public class GroupCollector implements TridentCollector {
- public List<Object> currGroup;
-
- ComboList.Factory _factory;
- TridentCollector _collector;
-
- public GroupCollector(TridentCollector collector, ComboList.Factory factory) {
- _factory = factory;
- _collector = collector;
- }
-
- @Override
- public void emit(List<Object> values) {
- List[] delegates = new List[2];
- delegates[0] = currGroup;
- delegates[1] = values;
- _collector.emit(_factory.create(delegates));
- }
-
- @Override
- public void reportError(Throwable t) {
- _collector.reportError(t);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/impl/GroupedAggregator.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/impl/GroupedAggregator.java b/jstorm-client/src/main/java/storm/trident/operation/impl/GroupedAggregator.java
deleted file mode 100644
index d78de70..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/impl/GroupedAggregator.java
+++ /dev/null
@@ -1,79 +0,0 @@
-package storm.trident.operation.impl;
-
-import backtype.storm.tuple.Fields;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import storm.trident.operation.Aggregator;
-import storm.trident.operation.TridentCollector;
-import storm.trident.operation.TridentOperationContext;
-import storm.trident.tuple.ComboList;
-import storm.trident.tuple.TridentTuple;
-import storm.trident.tuple.TridentTupleView;
-import storm.trident.tuple.TridentTupleView.ProjectionFactory;
-
-public class GroupedAggregator implements Aggregator<Object[]> {
- ProjectionFactory _groupFactory;
- ProjectionFactory _inputFactory;
- Aggregator _agg;
- ComboList.Factory _fact;
- Fields _inFields;
- Fields _groupFields;
-
- public GroupedAggregator(Aggregator agg, Fields group, Fields input, int outSize) {
- _groupFields = group;
- _inFields = input;
- _agg = agg;
- int[] sizes = new int[2];
- sizes[0] = _groupFields.size();
- sizes[1] = outSize;
- _fact = new ComboList.Factory(sizes);
- }
-
- @Override
- public void prepare(Map conf, TridentOperationContext context) {
- _inputFactory = context.makeProjectionFactory(_inFields);
- _groupFactory = context.makeProjectionFactory(_groupFields);
- _agg.prepare(conf, new TridentOperationContext(context, _inputFactory));
- }
-
- @Override
- public Object[] init(Object batchId, TridentCollector collector) {
- return new Object[] {new GroupCollector(collector, _fact), new HashMap(), batchId};
- }
-
- @Override
- public void aggregate(Object[] arr, TridentTuple tuple, TridentCollector collector) {
- GroupCollector groupColl = (GroupCollector) arr[0];
- Map<List, Object> val = (Map) arr[1];
- TridentTuple group = _groupFactory.create((TridentTupleView) tuple);
- TridentTuple input = _inputFactory.create((TridentTupleView) tuple);
- Object curr;
- if(!val.containsKey(group)) {
- curr = _agg.init(arr[2], groupColl);
- val.put((List) group, curr);
- } else {
- curr = val.get(group);
- }
- groupColl.currGroup = group;
- _agg.aggregate(curr, input, groupColl);
-
- }
-
- @Override
- public void complete(Object[] arr, TridentCollector collector) {
- Map<List, Object> val = (Map) arr[1];
- GroupCollector groupColl = (GroupCollector) arr[0];
- for(Entry<List, Object> e: val.entrySet()) {
- groupColl.currGroup = e.getKey();
- _agg.complete(e.getValue(), groupColl);
- }
- }
-
- @Override
- public void cleanup() {
- _agg.cleanup();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/impl/GroupedMultiReducerExecutor.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/impl/GroupedMultiReducerExecutor.java b/jstorm-client/src/main/java/storm/trident/operation/impl/GroupedMultiReducerExecutor.java
deleted file mode 100644
index 2615962..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/impl/GroupedMultiReducerExecutor.java
+++ /dev/null
@@ -1,78 +0,0 @@
-package storm.trident.operation.impl;
-
-import backtype.storm.tuple.Fields;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import storm.trident.operation.GroupedMultiReducer;
-import storm.trident.operation.MultiReducer;
-import storm.trident.operation.TridentCollector;
-import storm.trident.operation.TridentMultiReducerContext;
-import storm.trident.tuple.TridentTuple;
-import storm.trident.tuple.TridentTupleView.ProjectionFactory;
-
-
-public class GroupedMultiReducerExecutor implements MultiReducer<Map<TridentTuple, Object>> {
- GroupedMultiReducer _reducer;
- List<Fields> _groupFields;
- List<Fields> _inputFields;
- List<ProjectionFactory> _groupFactories = new ArrayList<ProjectionFactory>();
- List<ProjectionFactory> _inputFactories = new ArrayList<ProjectionFactory>();
-
- public GroupedMultiReducerExecutor(GroupedMultiReducer reducer, List<Fields> groupFields, List<Fields> inputFields) {
- if(inputFields.size()!=groupFields.size()) {
- throw new IllegalArgumentException("Multireducer groupFields and inputFields must be the same size");
- }
- _groupFields = groupFields;
- _inputFields = inputFields;
- _reducer = reducer;
- }
-
- @Override
- public void prepare(Map conf, TridentMultiReducerContext context) {
- for(int i=0; i<_groupFields.size(); i++) {
- _groupFactories.add(context.makeProjectionFactory(i, _groupFields.get(i)));
- _inputFactories.add(context.makeProjectionFactory(i, _inputFields.get(i)));
- }
- _reducer.prepare(conf, new TridentMultiReducerContext((List) _inputFactories));
- }
-
- @Override
- public Map<TridentTuple, Object> init(TridentCollector collector) {
- return new HashMap();
- }
-
- @Override
- public void execute(Map<TridentTuple, Object> state, int streamIndex, TridentTuple full, TridentCollector collector) {
- ProjectionFactory groupFactory = _groupFactories.get(streamIndex);
- ProjectionFactory inputFactory = _inputFactories.get(streamIndex);
-
- TridentTuple group = groupFactory.create(full);
- TridentTuple input = inputFactory.create(full);
-
- Object curr;
- if(!state.containsKey(group)) {
- curr = _reducer.init(collector, group);
- state.put(group, curr);
- } else {
- curr = state.get(group);
- }
- _reducer.execute(curr, streamIndex, group, input, collector);
- }
-
- @Override
- public void complete(Map<TridentTuple, Object> state, TridentCollector collector) {
- for(Map.Entry e: state.entrySet()) {
- TridentTuple group = (TridentTuple) e.getKey();
- Object val = e.getValue();
- _reducer.complete(val, group, collector);
- }
- }
-
- @Override
- public void cleanup() {
- _reducer.cleanup();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/impl/IdentityMultiReducer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/impl/IdentityMultiReducer.java b/jstorm-client/src/main/java/storm/trident/operation/impl/IdentityMultiReducer.java
deleted file mode 100644
index f482ec4..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/impl/IdentityMultiReducer.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package storm.trident.operation.impl;
-
-import java.util.Map;
-import storm.trident.operation.MultiReducer;
-import storm.trident.operation.TridentCollector;
-import storm.trident.operation.TridentMultiReducerContext;
-import storm.trident.tuple.TridentTuple;
-
-
-public class IdentityMultiReducer implements MultiReducer {
-
- @Override
- public void prepare(Map conf, TridentMultiReducerContext context) {
- }
-
- @Override
- public Object init(TridentCollector collector) {
- return null;
- }
-
- @Override
- public void execute(Object state, int streamIndex, TridentTuple input, TridentCollector collector) {
- collector.emit(input);
- }
-
- @Override
- public void complete(Object state, TridentCollector collector) {
- }
-
- @Override
- public void cleanup() {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/impl/IndexHashBatchToPartition.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/impl/IndexHashBatchToPartition.java b/jstorm-client/src/main/java/storm/trident/operation/impl/IndexHashBatchToPartition.java
deleted file mode 100644
index 779c4b8..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/impl/IndexHashBatchToPartition.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package storm.trident.operation.impl;
-
-import storm.trident.partition.IndexHashGrouping;
-
-public class IndexHashBatchToPartition implements SingleEmitAggregator.BatchToPartition {
-
- @Override
- public int partitionIndex(Object batchId, int numPartitions) {
- return IndexHashGrouping.objectToIndex(batchId, numPartitions);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/impl/JoinerMultiReducer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/impl/JoinerMultiReducer.java b/jstorm-client/src/main/java/storm/trident/operation/impl/JoinerMultiReducer.java
deleted file mode 100644
index 963751e..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/impl/JoinerMultiReducer.java
+++ /dev/null
@@ -1,142 +0,0 @@
-package storm.trident.operation.impl;
-
-import backtype.storm.tuple.Fields;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import storm.trident.JoinType;
-import storm.trident.operation.GroupedMultiReducer;
-import storm.trident.operation.TridentCollector;
-import storm.trident.operation.TridentMultiReducerContext;
-import storm.trident.operation.impl.JoinerMultiReducer.JoinState;
-import storm.trident.tuple.ComboList;
-import storm.trident.tuple.TridentTuple;
-
-public class JoinerMultiReducer implements GroupedMultiReducer<JoinState> {
-
- List<JoinType> _types;
- List<Fields> _sideFields;
- int _numGroupFields;
- ComboList.Factory _factory;
-
-
- public JoinerMultiReducer(List<JoinType> types, int numGroupFields, List<Fields> sides) {
- _types = types;
- _sideFields = sides;
- _numGroupFields = numGroupFields;
- }
-
- @Override
- public void prepare(Map conf, TridentMultiReducerContext context) {
- int[] sizes = new int[_sideFields.size() + 1];
- sizes[0] = _numGroupFields;
- for(int i=0; i<_sideFields.size(); i++) {
- sizes[i+1] = _sideFields.get(i).size();
- }
- _factory = new ComboList.Factory(sizes);
- }
-
- @Override
- public JoinState init(TridentCollector collector, TridentTuple group) {
- return new JoinState(_types.size(), group);
- }
-
- @Override
- public void execute(JoinState state, int streamIndex, TridentTuple group, TridentTuple input, TridentCollector collector) {
- //TODO: do the inner join incrementally, emitting the cross join with this tuple, against all other sides
- //TODO: only do cross join if at least one tuple in each side
- List<List> side = state.sides[streamIndex];
- if(side.isEmpty()) {
- state.numSidesReceived++;
- }
-
- side.add(input);
- if(state.numSidesReceived == state.sides.length) {
- emitCrossJoin(state, collector, streamIndex, input);
- }
- }
-
- @Override
- public void complete(JoinState state, TridentTuple group, TridentCollector collector) {
- List<List>[] sides = state.sides;
- boolean wasEmpty = state.numSidesReceived < sides.length;
- for(int i=0; i<sides.length; i++) {
- if(sides[i].isEmpty() && _types.get(i) == JoinType.OUTER) {
- state.numSidesReceived++;
- sides[i].add(makeNullList(_sideFields.get(i).size()));
- }
- }
- if(wasEmpty && state.numSidesReceived == sides.length) {
- emitCrossJoin(state, collector, -1, null);
- }
- }
-
- @Override
- public void cleanup() {
- }
-
- private List<Object> makeNullList(int size) {
- List<Object> ret = new ArrayList(size);
- for(int i=0; i<size; i++) {
- ret.add(null);
- }
- return ret;
- }
-
- private void emitCrossJoin(JoinState state, TridentCollector collector, int overrideIndex, TridentTuple overrideTuple) {
- List<List>[] sides = state.sides;
- int[] indices = state.indices;
- for(int i=0; i<indices.length; i++) {
- indices[i] = 0;
- }
-
- boolean keepGoing = true;
- //emit cross-join of all emitted tuples
- while(keepGoing) {
- List[] combined = new List[sides.length+1];
- combined[0] = state.group;
- for(int i=0; i<sides.length; i++) {
- if(i==overrideIndex) {
- combined[i+1] = overrideTuple;
- } else {
- combined[i+1] = sides[i].get(indices[i]);
- }
- }
- collector.emit(_factory.create(combined));
- keepGoing = increment(sides, indices, indices.length - 1, overrideIndex);
- }
- }
-
-
- //return false if can't increment anymore
- //TODO: DRY this code up with what's in ChainedAggregatorImpl
- private boolean increment(List[] lengths, int[] indices, int j, int overrideIndex) {
- if(j==-1) return false;
- if(j==overrideIndex) {
- return increment(lengths, indices, j-1, overrideIndex);
- }
- indices[j]++;
- if(indices[j] >= lengths[j].size()) {
- indices[j] = 0;
- return increment(lengths, indices, j-1, overrideIndex);
- }
- return true;
- }
-
- public static class JoinState {
- List<List>[] sides;
- int numSidesReceived = 0;
- int[] indices;
- TridentTuple group;
-
- public JoinState(int numSides, TridentTuple group) {
- sides = new List[numSides];
- indices = new int[numSides];
- this.group = group;
- for(int i=0; i<numSides; i++) {
- sides[i] = new ArrayList<List>();
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/impl/ReducerAggStateUpdater.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/impl/ReducerAggStateUpdater.java b/jstorm-client/src/main/java/storm/trident/operation/impl/ReducerAggStateUpdater.java
deleted file mode 100644
index 647d30f..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/impl/ReducerAggStateUpdater.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package storm.trident.operation.impl;
-
-import backtype.storm.tuple.Values;
-import java.util.List;
-import java.util.Map;
-import storm.trident.operation.ReducerAggregator;
-import storm.trident.operation.TridentCollector;
-import storm.trident.operation.TridentOperationContext;
-import storm.trident.state.ReducerValueUpdater;
-import storm.trident.state.StateUpdater;
-import storm.trident.state.snapshot.Snapshottable;
-import storm.trident.tuple.TridentTuple;
-
-public class ReducerAggStateUpdater implements StateUpdater<Snapshottable> {
- ReducerAggregator _agg;
-
- public ReducerAggStateUpdater(ReducerAggregator agg) {
- _agg = agg;
- }
-
-
- @Override
- public void updateState(Snapshottable state, List<TridentTuple> tuples, TridentCollector collector) {
- Object newVal = state.update(new ReducerValueUpdater(_agg, tuples));
- collector.emit(new Values(newVal));
- }
-
- @Override
- public void prepare(Map conf, TridentOperationContext context) {
- }
-
- @Override
- public void cleanup() {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/impl/ReducerAggregatorImpl.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/impl/ReducerAggregatorImpl.java b/jstorm-client/src/main/java/storm/trident/operation/impl/ReducerAggregatorImpl.java
deleted file mode 100644
index c047762..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/impl/ReducerAggregatorImpl.java
+++ /dev/null
@@ -1,39 +0,0 @@
-package storm.trident.operation.impl;
-
-import backtype.storm.tuple.Values;
-import java.util.Map;
-import storm.trident.operation.Aggregator;
-import storm.trident.operation.ReducerAggregator;
-import storm.trident.operation.TridentCollector;
-import storm.trident.operation.TridentOperationContext;
-import storm.trident.tuple.TridentTuple;
-
-public class ReducerAggregatorImpl implements Aggregator<Result> {
- ReducerAggregator _agg;
-
- public ReducerAggregatorImpl(ReducerAggregator agg) {
- _agg = agg;
- }
-
- public void prepare(Map conf, TridentOperationContext context) {
-
- }
-
- public Result init(Object batchId, TridentCollector collector) {
- Result ret = new Result();
- ret.obj = _agg.init();
- return ret;
- }
-
- public void aggregate(Result val, TridentTuple tuple, TridentCollector collector) {
- val.obj = _agg.reduce(val.obj, tuple);
- }
-
- public void complete(Result val, TridentCollector collector) {
- collector.emit(new Values(val.obj));
- }
-
- public void cleanup() {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/impl/Result.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/impl/Result.java b/jstorm-client/src/main/java/storm/trident/operation/impl/Result.java
deleted file mode 100644
index 3748a7a..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/impl/Result.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package storm.trident.operation.impl;
-
-public class Result {
- public Object obj;
-
- @Override
- public String toString() {
- return "" + obj;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/impl/SingleEmitAggregator.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/impl/SingleEmitAggregator.java b/jstorm-client/src/main/java/storm/trident/operation/impl/SingleEmitAggregator.java
deleted file mode 100644
index 4be7c45..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/impl/SingleEmitAggregator.java
+++ /dev/null
@@ -1,78 +0,0 @@
-package storm.trident.operation.impl;
-
-import java.io.Serializable;
-import java.util.Map;
-import storm.trident.operation.Aggregator;
-import storm.trident.operation.TridentCollector;
-import storm.trident.operation.TridentOperationContext;
-import storm.trident.operation.impl.SingleEmitAggregator.SingleEmitState;
-import storm.trident.tuple.TridentTuple;
-
-
-public class SingleEmitAggregator implements Aggregator<SingleEmitState> {
- public static interface BatchToPartition extends Serializable {
- int partitionIndex(Object batchId, int numPartitions);
- }
-
- static class SingleEmitState {
- boolean received = false;
- Object state;
- Object batchId;
-
- public SingleEmitState(Object batchId) {
- this.batchId = batchId;
- }
- }
-
- Aggregator _agg;
- BatchToPartition _batchToPartition;
-
- public SingleEmitAggregator(Aggregator agg, BatchToPartition batchToPartition) {
- _agg = agg;
- _batchToPartition = batchToPartition;
- }
-
-
- @Override
- public SingleEmitState init(Object batchId, TridentCollector collector) {
- return new SingleEmitState(batchId);
- }
-
- @Override
- public void aggregate(SingleEmitState val, TridentTuple tuple, TridentCollector collector) {
- if(!val.received) {
- val.state = _agg.init(val.batchId, collector);
- val.received = true;
- }
- _agg.aggregate(val.state, tuple, collector);
- }
-
- @Override
- public void complete(SingleEmitState val, TridentCollector collector) {
- if(!val.received) {
- if(this.myPartitionIndex == _batchToPartition.partitionIndex(val.batchId, this.totalPartitions)) {
- val.state = _agg.init(val.batchId, collector);
- _agg.complete(val.state, collector);
- }
- } else {
- _agg.complete(val.state, collector);
- }
- }
-
- int myPartitionIndex;
- int totalPartitions;
-
- @Override
- public void prepare(Map conf, TridentOperationContext context) {
- _agg.prepare(conf, context);
- this.myPartitionIndex = context.getPartitionIndex();
- this.totalPartitions = context.numPartitions();
- }
-
- @Override
- public void cleanup() {
- _agg.cleanup();
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/impl/TrueFilter.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/impl/TrueFilter.java b/jstorm-client/src/main/java/storm/trident/operation/impl/TrueFilter.java
deleted file mode 100644
index 6e9d15c..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/impl/TrueFilter.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package storm.trident.operation.impl;
-
-import java.util.Map;
-import storm.trident.operation.Filter;
-import storm.trident.operation.TridentOperationContext;
-import storm.trident.tuple.TridentTuple;
-
-public class TrueFilter implements Filter {
-
- @Override
- public boolean isKeep(TridentTuple tuple) {
- return true;
- }
-
- @Override
- public void prepare(Map conf, TridentOperationContext context) {
- }
-
- @Override
- public void cleanup() {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/partition/GlobalGrouping.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/partition/GlobalGrouping.java b/jstorm-client/src/main/java/storm/trident/partition/GlobalGrouping.java
deleted file mode 100644
index 0270bf4..0000000
--- a/jstorm-client/src/main/java/storm/trident/partition/GlobalGrouping.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package storm.trident.partition;
-
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.grouping.CustomStreamGrouping;
-import backtype.storm.task.WorkerTopologyContext;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-public class GlobalGrouping implements CustomStreamGrouping {
-
- List<Integer> target;
-
-
- @Override
- public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targets) {
- List<Integer> sorted = new ArrayList<Integer>(targets);
- Collections.sort(sorted);
- target = Arrays.asList(sorted.get(0));
- }
-
- @Override
- public List<Integer> chooseTasks(int i, List<Object> list) {
- return target;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/partition/IdentityGrouping.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/partition/IdentityGrouping.java b/jstorm-client/src/main/java/storm/trident/partition/IdentityGrouping.java
deleted file mode 100644
index ccb9d6e..0000000
--- a/jstorm-client/src/main/java/storm/trident/partition/IdentityGrouping.java
+++ /dev/null
@@ -1,44 +0,0 @@
-package storm.trident.partition;
-
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.grouping.CustomStreamGrouping;
-import backtype.storm.task.WorkerTopologyContext;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-
-public class IdentityGrouping implements CustomStreamGrouping {
-
- List<Integer> ret = new ArrayList<Integer>();
- Map<Integer, List<Integer>> _precomputed = new HashMap();
-
- @Override
- public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> tasks) {
- List<Integer> sourceTasks = new ArrayList<Integer>(context.getComponentTasks(stream.get_componentId()));
- Collections.sort(sourceTasks);
- if(sourceTasks.size()!=tasks.size()) {
- throw new RuntimeException("Can only do an identity grouping when source and target have same number of tasks");
- }
- tasks = new ArrayList<Integer>(tasks);
- Collections.sort(tasks);
- for(int i=0; i<sourceTasks.size(); i++) {
- int s = sourceTasks.get(i);
- int t = tasks.get(i);
- _precomputed.put(s, Arrays.asList(t));
- }
- }
-
- @Override
- public List<Integer> chooseTasks(int task, List<Object> values) {
- List<Integer> ret = _precomputed.get(task);
- if(ret==null) {
- throw new RuntimeException("Tuple emitted by task that's not part of this component. Should be impossible");
- }
- return ret;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/partition/IndexHashGrouping.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/partition/IndexHashGrouping.java b/jstorm-client/src/main/java/storm/trident/partition/IndexHashGrouping.java
deleted file mode 100644
index 69c36ac..0000000
--- a/jstorm-client/src/main/java/storm/trident/partition/IndexHashGrouping.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package storm.trident.partition;
-
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.grouping.CustomStreamGrouping;
-import backtype.storm.task.WorkerTopologyContext;
-import java.util.Arrays;
-import java.util.List;
-
-public class IndexHashGrouping implements CustomStreamGrouping {
- public static int objectToIndex(Object val, int numPartitions) {
- if(val==null) return 0;
- else {
- return Math.abs(val.hashCode() % numPartitions);
- }
- }
-
- int _index;
- List<Integer> _targets;
-
- public IndexHashGrouping(int index) {
- _index = index;
- }
-
-
- @Override
- public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
- _targets = targetTasks;
- }
-
- @Override
- public List<Integer> chooseTasks(int fromTask, List<Object> values) {
- int i = objectToIndex(values.get(_index), _targets.size());
- return Arrays.asList(_targets.get(i));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/planner/BridgeReceiver.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/planner/BridgeReceiver.java b/jstorm-client/src/main/java/storm/trident/planner/BridgeReceiver.java
deleted file mode 100644
index b596d54..0000000
--- a/jstorm-client/src/main/java/storm/trident/planner/BridgeReceiver.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package storm.trident.planner;
-
-import backtype.storm.coordination.BatchOutputCollector;
-import storm.trident.tuple.ConsList;
-import storm.trident.tuple.TridentTuple;
-
-
-public class BridgeReceiver implements TupleReceiver {
-
- BatchOutputCollector _collector;
-
- public BridgeReceiver(BatchOutputCollector collector) {
- _collector = collector;
- }
-
- @Override
- public void execute(ProcessorContext context, String streamId, TridentTuple tuple) {
- _collector.emit(streamId, new ConsList(context.batchId, tuple));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/planner/Node.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/planner/Node.java b/jstorm-client/src/main/java/storm/trident/planner/Node.java
deleted file mode 100644
index 1a0e29d..0000000
--- a/jstorm-client/src/main/java/storm/trident/planner/Node.java
+++ /dev/null
@@ -1,63 +0,0 @@
-package storm.trident.planner;
-
-import backtype.storm.tuple.Fields;
-
-import java.io.Serializable;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.lang.builder.ToStringBuilder;
-import org.apache.commons.lang.builder.ToStringStyle;
-
-public class Node implements Serializable {
- private static AtomicInteger INDEX = new AtomicInteger(0);
-
- private String nodeId;
-
- public String name = null;
- public Fields allOutputFields;
- public String streamId;
- public Integer parallelismHint = null;
- public NodeStateInfo stateInfo = null;
- public int creationIndex;
-
- public Node(String streamId, String name, Fields allOutputFields) {
- this.nodeId = UUID.randomUUID().toString();
- this.allOutputFields = allOutputFields;
- this.streamId = streamId;
- this.name = name;
- this.creationIndex = INDEX.incrementAndGet();
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((nodeId == null) ? 0 : nodeId.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- Node other = (Node) obj;
- if (nodeId == null) {
- if (other.nodeId != null)
- return false;
- } else if (!nodeId.equals(other.nodeId))
- return false;
- return true;
- }
-
- @Override
- public String toString() {
- return ToStringBuilder.reflectionToString(this,
- ToStringStyle.MULTI_LINE_STYLE);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/planner/NodeStateInfo.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/planner/NodeStateInfo.java b/jstorm-client/src/main/java/storm/trident/planner/NodeStateInfo.java
deleted file mode 100644
index a045eef..0000000
--- a/jstorm-client/src/main/java/storm/trident/planner/NodeStateInfo.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package storm.trident.planner;
-
-import java.io.Serializable;
-import storm.trident.state.StateSpec;
-
-public class NodeStateInfo implements Serializable {
- public String id;
- public StateSpec spec;
-
- public NodeStateInfo(String id, StateSpec spec) {
- this.id = id;
- this.spec = spec;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/planner/PartitionNode.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/planner/PartitionNode.java b/jstorm-client/src/main/java/storm/trident/planner/PartitionNode.java
deleted file mode 100644
index fdde133..0000000
--- a/jstorm-client/src/main/java/storm/trident/planner/PartitionNode.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package storm.trident.planner;
-
-import backtype.storm.generated.Grouping;
-import backtype.storm.tuple.Fields;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-import java.util.List;
-import storm.trident.util.TridentUtils;
-
-
-public class PartitionNode extends Node {
- public transient Grouping thriftGrouping;
-
- //has the streamid/outputFields of the node it's doing the partitioning on
- public PartitionNode(String streamId, String name, Fields allOutputFields, Grouping grouping) {
- super(streamId, name, allOutputFields);
- this.thriftGrouping = grouping;
- }
-
- private void writeObject(ObjectOutputStream oos) throws IOException {
- oos.defaultWriteObject();
- byte[] ser = TridentUtils.thriftSerialize(thriftGrouping);
- oos.writeInt(ser.length);
- oos.write(ser);
- }
-
- private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
- ois.defaultReadObject();
- byte[] ser = new byte[ois.readInt()];
- ois.readFully(ser);
- this.thriftGrouping = TridentUtils.thriftDeserialize(Grouping.class, ser);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/planner/ProcessorContext.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/planner/ProcessorContext.java b/jstorm-client/src/main/java/storm/trident/planner/ProcessorContext.java
deleted file mode 100644
index dc8bb6a..0000000
--- a/jstorm-client/src/main/java/storm/trident/planner/ProcessorContext.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package storm.trident.planner;
-
-
-public class ProcessorContext {
- public Object batchId;
- public Object[] state;
-
- public ProcessorContext(Object batchId, Object[] state) {
- this.batchId = batchId;
- this.state = state;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/planner/ProcessorNode.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/planner/ProcessorNode.java b/jstorm-client/src/main/java/storm/trident/planner/ProcessorNode.java
deleted file mode 100644
index c0e09aa..0000000
--- a/jstorm-client/src/main/java/storm/trident/planner/ProcessorNode.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package storm.trident.planner;
-
-import backtype.storm.tuple.Fields;
-
-public class ProcessorNode extends Node {
-
- public boolean committer; // for partitionpersist
- public TridentProcessor processor;
- public Fields selfOutFields;
-
- public ProcessorNode(String streamId, String name, Fields allOutputFields, Fields selfOutFields, TridentProcessor processor) {
- super(streamId, name, allOutputFields);
- this.processor = processor;
- this.selfOutFields = selfOutFields;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/planner/SpoutNode.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/planner/SpoutNode.java b/jstorm-client/src/main/java/storm/trident/planner/SpoutNode.java
deleted file mode 100644
index 1432c43..0000000
--- a/jstorm-client/src/main/java/storm/trident/planner/SpoutNode.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package storm.trident.planner;
-
-import backtype.storm.tuple.Fields;
-
-
-public class SpoutNode extends Node {
- public static enum SpoutType {
- DRPC,
- BATCH
- }
-
- public Object spout;
- public String txId; //where state is stored in zookeeper (only for batch spout types)
- public SpoutType type;
-
- public SpoutNode(String streamId, Fields allOutputFields, String txid, Object spout, SpoutType type) {
- super(streamId, null, allOutputFields);
- this.txId = txid;
- this.spout = spout;
- this.type = type;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/planner/SubtopologyBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/planner/SubtopologyBolt.java b/jstorm-client/src/main/java/storm/trident/planner/SubtopologyBolt.java
deleted file mode 100644
index 596c15d..0000000
--- a/jstorm-client/src/main/java/storm/trident/planner/SubtopologyBolt.java
+++ /dev/null
@@ -1,201 +0,0 @@
-package storm.trident.planner;
-
-import backtype.storm.coordination.BatchOutputCollector;
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.utils.Utils;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.jgrapht.DirectedGraph;
-import org.jgrapht.graph.DirectedSubgraph;
-import org.jgrapht.traverse.TopologicalOrderIterator;
-import storm.trident.planner.processor.TridentContext;
-import storm.trident.state.State;
-import storm.trident.topology.BatchInfo;
-import storm.trident.topology.ITridentBatchBolt;
-import storm.trident.tuple.TridentTuple;
-import storm.trident.tuple.TridentTuple.Factory;
-import storm.trident.tuple.TridentTupleView.ProjectionFactory;
-import storm.trident.tuple.TridentTupleView.RootFactory;
-import storm.trident.util.TridentUtils;
-
-// TODO: parameterizing it like this with everything might be a high deserialization cost if there's lots of tasks?
-// TODO: memory problems?
-// TODO: can avoid these problems by adding a boltfactory abstraction, so that boltfactory is deserialized once
-// bolt factory -> returns coordinatedbolt per task, but deserializes the batch bolt one time and caches
-public class SubtopologyBolt implements ITridentBatchBolt {
- DirectedGraph _graph;
- Set<Node> _nodes;
- Map<String, InitialReceiver> _roots = new HashMap();
- Map<Node, Factory> _outputFactories = new HashMap();
- Map<String, List<TridentProcessor>> _myTopologicallyOrdered = new HashMap();
- Map<Node, String> _batchGroups;
-
- //given processornodes and static state nodes
- public SubtopologyBolt(DirectedGraph graph, Set<Node> nodes, Map<Node, String> batchGroups) {
- _nodes = nodes;
- _graph = graph;
- _batchGroups = batchGroups;
- }
-
- @Override
- public void prepare(Map conf, TopologyContext context, BatchOutputCollector batchCollector) {
- int thisComponentNumTasks = context.getComponentTasks(context.getThisComponentId()).size();
- for(Node n: _nodes) {
- if(n.stateInfo!=null) {
- State s = n.stateInfo.spec.stateFactory.makeState(conf, context, context.getThisTaskIndex(), thisComponentNumTasks);
- context.setTaskData(n.stateInfo.id, s);
- }
- }
- DirectedSubgraph<Node, Object> subgraph = new DirectedSubgraph(_graph, _nodes, null);
- TopologicalOrderIterator it = new TopologicalOrderIterator<Node, Object>(subgraph);
- int stateIndex = 0;
- while(it.hasNext()) {
- Node n = (Node) it.next();
- if(n instanceof ProcessorNode) {
- ProcessorNode pn = (ProcessorNode) n;
- String batchGroup = _batchGroups.get(n);
- if(!_myTopologicallyOrdered.containsKey(batchGroup)) {
- _myTopologicallyOrdered.put(batchGroup, new ArrayList());
- }
- _myTopologicallyOrdered.get(batchGroup).add(pn.processor);
- List<String> parentStreams = new ArrayList();
- List<Factory> parentFactories = new ArrayList();
- for(Node p: TridentUtils.getParents(_graph, n)) {
- parentStreams.add(p.streamId);
- if(_nodes.contains(p)) {
- parentFactories.add(_outputFactories.get(p));
- } else {
- if(!_roots.containsKey(p.streamId)) {
- _roots.put(p.streamId, new InitialReceiver(p.streamId, getSourceOutputFields(context, p.streamId)));
- }
- _roots.get(p.streamId).addReceiver(pn.processor);
- parentFactories.add(_roots.get(p.streamId).getOutputFactory());
- }
- }
- List<TupleReceiver> targets = new ArrayList();
- boolean outgoingNode = false;
- for(Node cn: TridentUtils.getChildren(_graph, n)) {
- if(_nodes.contains(cn)) {
- targets.add(((ProcessorNode) cn).processor);
- } else {
- outgoingNode = true;
- }
- }
- if(outgoingNode) {
- targets.add(new BridgeReceiver(batchCollector));
- }
-
- TridentContext triContext = new TridentContext(
- pn.selfOutFields,
- parentFactories,
- parentStreams,
- targets,
- pn.streamId,
- stateIndex,
- batchCollector
- );
- pn.processor.prepare(conf, context, triContext);
- _outputFactories.put(n, pn.processor.getOutputFactory());
- }
- stateIndex++;
- }
- // TODO: get prepared one time into executor data... need to avoid the ser/deser
- // for each task (probably need storm to support boltfactory)
- }
-
- private Fields getSourceOutputFields(TopologyContext context, String sourceStream) {
- for(GlobalStreamId g: context.getThisSources().keySet()) {
- if(g.get_streamId().equals(sourceStream)) {
- return context.getComponentOutputFields(g);
- }
- }
- throw new RuntimeException("Could not find fields for source stream " + sourceStream);
- }
-
- @Override
- public void execute(BatchInfo batchInfo, Tuple tuple) {
- String sourceStream = tuple.getSourceStreamId();
- InitialReceiver ir = _roots.get(sourceStream);
- if(ir==null) {
- throw new RuntimeException("Received unexpected tuple " + tuple.toString());
- }
- ir.receive((ProcessorContext) batchInfo.state, tuple);
- }
-
- @Override
- public void finishBatch(BatchInfo batchInfo) {
- for(TridentProcessor p: _myTopologicallyOrdered.get(batchInfo.batchGroup)) {
- p.finishBatch((ProcessorContext) batchInfo.state);
- }
- }
-
- @Override
- public Object initBatchState(String batchGroup, Object batchId) {
- ProcessorContext ret = new ProcessorContext(batchId, new Object[_nodes.size()]);
- for(TridentProcessor p: _myTopologicallyOrdered.get(batchGroup)) {
- p.startBatch(ret);
- }
- return ret;
- }
-
- @Override
- public void cleanup() {
- for(String bg: _myTopologicallyOrdered.keySet()) {
- for(TridentProcessor p: _myTopologicallyOrdered.get(bg)) {
- p.cleanup();
- }
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- for(Node n: _nodes) {
- declarer.declareStream(n.streamId, TridentUtils.fieldsConcat(new Fields("$batchId"), n.allOutputFields));
- }
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
-
-
- protected class InitialReceiver {
- List<TridentProcessor> _receivers = new ArrayList();
- RootFactory _factory;
- ProjectionFactory _project;
- String _stream;
-
- public InitialReceiver(String stream, Fields allFields) {
- // TODO: don't want to project for non-batch bolts...???
- // how to distinguish "batch" streams from non-batch streams?
- _stream = stream;
- _factory = new RootFactory(allFields);
- List<String> projected = new ArrayList(allFields.toList());
- projected.remove(0);
- _project = new ProjectionFactory(_factory, new Fields(projected));
- }
-
- public void receive(ProcessorContext context, Tuple tuple) {
- TridentTuple t = _project.create(_factory.create(tuple));
- for(TridentProcessor r: _receivers) {
- r.execute(context, _stream, t);
- }
- }
-
- public void addReceiver(TridentProcessor p) {
- _receivers.add(p);
- }
-
- public Factory getOutputFactory() {
- return _project;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/planner/TridentProcessor.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/planner/TridentProcessor.java b/jstorm-client/src/main/java/storm/trident/planner/TridentProcessor.java
deleted file mode 100644
index 866d058..0000000
--- a/jstorm-client/src/main/java/storm/trident/planner/TridentProcessor.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package storm.trident.planner;
-
-import backtype.storm.task.TopologyContext;
-import java.io.Serializable;
-import java.util.Map;
-import storm.trident.planner.processor.TridentContext;
-import storm.trident.tuple.TridentTuple.Factory;
-
-public interface TridentProcessor extends Serializable, TupleReceiver {
-
-    // A node in a Trident processing pipeline. Lifecycle: prepare once,
-    // then repeated startBatch / execute (inherited from TupleReceiver) /
-    // finishBatch cycles, and finally cleanup.
-
-    // Imperative that implementations don't emit any tuples from prepare(): the output
-    // factory cannot be obtained until preparation is done, so receivers won't be ready
-    // to receive tuples yet. Tuples can't be emitted from here anyway, since prepare()
-    // is not within a batch context (which is only startBatch, execute, and finishBatch).
-    void prepare(Map conf, TopologyContext context, TridentContext tridentContext);
-    void cleanup();
-
-    // Called at the start of each batch, before any execute() calls for that batch.
-    void startBatch(ProcessorContext processorContext);
-
-    // Called once all tuples of the batch have been delivered via execute().
-    void finishBatch(ProcessorContext processorContext);
-
-    // Factory describing the tuples this processor emits to its receivers.
-    Factory getOutputFactory();
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/planner/TupleReceiver.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/planner/TupleReceiver.java b/jstorm-client/src/main/java/storm/trident/planner/TupleReceiver.java
deleted file mode 100644
index a2fc148..0000000
--- a/jstorm-client/src/main/java/storm/trident/planner/TupleReceiver.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package storm.trident.planner;
-
-import storm.trident.tuple.TridentTuple;
-
-
-public interface TupleReceiver {
-    // Receives one tuple; streamId indicates where the tuple came from.
-    void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple);
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/planner/processor/AggregateProcessor.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/planner/processor/AggregateProcessor.java b/jstorm-client/src/main/java/storm/trident/planner/processor/AggregateProcessor.java
deleted file mode 100644
index ce62790..0000000
--- a/jstorm-client/src/main/java/storm/trident/planner/processor/AggregateProcessor.java
+++ /dev/null
@@ -1,67 +0,0 @@
-package storm.trident.planner.processor;
-
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Fields;
-import java.util.List;
-import java.util.Map;
-import storm.trident.operation.Aggregator;
-import storm.trident.operation.TridentOperationContext;
-import storm.trident.planner.ProcessorContext;
-import storm.trident.planner.TridentProcessor;
-import storm.trident.tuple.TridentTuple;
-import storm.trident.tuple.TridentTuple.Factory;
-import storm.trident.tuple.TridentTupleView.ProjectionFactory;
-
-
-public class AggregateProcessor implements TridentProcessor {
-    // Runs an Aggregator over a batch: init() at batch start, aggregate() per tuple,
-    // complete() at batch end. The aggregator's per-batch state is stashed in
-    // processorContext.state at this processor's state index.
-    Aggregator _agg;
-    TridentContext _context;
-    FreshCollector _collector;     // emits freshly-built tuples (input fields are not carried through)
-    Fields _inputFields;
-    ProjectionFactory _projection; // projects parent tuples down to _inputFields
-
-    public AggregateProcessor(Fields inputFields, Aggregator agg) {
-        _agg = agg;
-        _inputFields = inputFields;
-    }
-
-    @Override
-    public void prepare(Map conf, TopologyContext context, TridentContext tridentContext) {
-        // Aggregation is defined over a single upstream tuple factory.
-        List<Factory> parents = tridentContext.getParentTupleFactories();
-        if(parents.size()!=1) {
-            throw new RuntimeException("Aggregate operation can only have one parent");
-        }
-        _context = tridentContext;
-        _collector = new FreshCollector(tridentContext);
-        _projection = new ProjectionFactory(parents.get(0), _inputFields);
-        _agg.prepare(conf, new TridentOperationContext(context, _projection));
-    }
-
-    @Override
-    public void cleanup() {
-        _agg.cleanup();
-    }
-
-    @Override
-    public void startBatch(ProcessorContext processorContext) {
-        // Create the aggregator's state for this batch and remember it in the context.
-        _collector.setContext(processorContext);
-        processorContext.state[_context.getStateIndex()] = _agg.init(processorContext.batchId, _collector);
-    }
-
-    @Override
-    public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
-        // Fold the projected input tuple into this batch's aggregation state.
-        _collector.setContext(processorContext);
-        _agg.aggregate(processorContext.state[_context.getStateIndex()], _projection.create(tuple), _collector);
-    }
-
-    @Override
-    public void finishBatch(ProcessorContext processorContext) {
-        // Let the aggregator emit its result(s) for the completed batch.
-        _collector.setContext(processorContext);
-        _agg.complete(processorContext.state[_context.getStateIndex()], _collector);
-    }
-
-    @Override
-    public Factory getOutputFactory() {
-        return _collector.getOutputFactory();
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/planner/processor/AppendCollector.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/planner/processor/AppendCollector.java b/jstorm-client/src/main/java/storm/trident/planner/processor/AppendCollector.java
deleted file mode 100644
index 92932cb..0000000
--- a/jstorm-client/src/main/java/storm/trident/planner/processor/AppendCollector.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package storm.trident.planner.processor;
-
-import java.util.List;
-import storm.trident.operation.TridentCollector;
-import storm.trident.planner.ProcessorContext;
-import storm.trident.planner.TupleReceiver;
-import storm.trident.tuple.TridentTuple;
-import storm.trident.tuple.TridentTuple.Factory;
-import storm.trident.tuple.TridentTupleView;
-import storm.trident.tuple.TridentTupleView.OperationOutputFactory;
-
-
-public class AppendCollector implements TridentCollector {
-    // Collector that appends emitted values onto the current input tuple
-    // (via OperationOutputFactory) and forwards the combined tuple to all
-    // downstream receivers. Used by operations like "each" that extend tuples.
-    OperationOutputFactory _factory;
-    TridentContext _triContext;
-    TridentTuple tuple;       // the input tuple the next emit() will be appended to
-    ProcessorContext context; // per-batch context forwarded to receivers
-
-    public AppendCollector(TridentContext context) {
-        _triContext = context;
-        _factory = new OperationOutputFactory(context.getParentTupleFactories().get(0), context.getSelfOutputFields());
-    }
-
-    // Must be called before each emit() so emissions attach to the right tuple/batch.
-    public void setContext(ProcessorContext pc, TridentTuple t) {
-        this.context = pc;
-        this.tuple = t;
-    }
-
-    @Override
-    public void emit(List<Object> values) {
-        // Combine the current input tuple with the emitted values and fan out.
-        TridentTuple toEmit = _factory.create((TridentTupleView) tuple, values);
-        for(TupleReceiver r: _triContext.getReceivers()) {
-            r.execute(context, _triContext.getOutStreamId(), toEmit);
-        }
-    }
-
-    @Override
-    public void reportError(Throwable t) {
-        // Delegate error reporting to the underlying storm output collector.
-        _triContext.getDelegateCollector().reportError(t);
-    }
-
-    public Factory getOutputFactory() {
-        return _factory;
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/planner/processor/EachProcessor.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/planner/processor/EachProcessor.java b/jstorm-client/src/main/java/storm/trident/planner/processor/EachProcessor.java
deleted file mode 100644
index 7b217de..0000000
--- a/jstorm-client/src/main/java/storm/trident/planner/processor/EachProcessor.java
+++ /dev/null
@@ -1,63 +0,0 @@
-package storm.trident.planner.processor;
-
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Fields;
-import java.util.List;
-import java.util.Map;
-import storm.trident.operation.Function;
-import storm.trident.operation.TridentOperationContext;
-import storm.trident.planner.ProcessorContext;
-import storm.trident.planner.TridentProcessor;
-import storm.trident.tuple.TridentTuple;
-import storm.trident.tuple.TridentTuple.Factory;
-import storm.trident.tuple.TridentTupleView.ProjectionFactory;
-
-
-public class EachProcessor implements TridentProcessor {
-    // Applies a Function to every tuple of the stream; emitted values are
-    // appended to the input tuple via AppendCollector. Stateless per batch,
-    // so startBatch/finishBatch are no-ops.
-    Function _function;
-    TridentContext _context;
-    AppendCollector _collector;
-    Fields _inputFields;
-    ProjectionFactory _projection; // projects parent tuples down to _inputFields
-
-    public EachProcessor(Fields inputFields, Function function) {
-        _function = function;
-        _inputFields = inputFields;
-    }
-
-    @Override
-    public void prepare(Map conf, TopologyContext context, TridentContext tridentContext) {
-        // "each" is defined over a single upstream tuple factory.
-        List<Factory> parents = tridentContext.getParentTupleFactories();
-        if(parents.size()!=1) {
-            throw new RuntimeException("Each operation can only have one parent");
-        }
-        _context = tridentContext;
-        _collector = new AppendCollector(tridentContext);
-        _projection = new ProjectionFactory(parents.get(0), _inputFields);
-        _function.prepare(conf, new TridentOperationContext(context, _projection));
-    }
-
-    @Override
-    public void cleanup() {
-        _function.cleanup();
-    }
-
-    @Override
-    public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
-        // Point the collector at the current tuple so emits append to it.
-        _collector.setContext(processorContext, tuple);
-        _function.execute(_projection.create(tuple), _collector);
-    }
-
-    @Override
-    public void startBatch(ProcessorContext processorContext) {
-    }
-
-    @Override
-    public void finishBatch(ProcessorContext processorContext) {
-    }
-
-    @Override
-    public Factory getOutputFactory() {
-        return _collector.getOutputFactory();
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/planner/processor/FreshCollector.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/planner/processor/FreshCollector.java b/jstorm-client/src/main/java/storm/trident/planner/processor/FreshCollector.java
deleted file mode 100644
index 1fb3aa6..0000000
--- a/jstorm-client/src/main/java/storm/trident/planner/processor/FreshCollector.java
+++ /dev/null
@@ -1,42 +0,0 @@
-package storm.trident.planner.processor;
-
-import java.util.List;
-import storm.trident.operation.TridentCollector;
-import storm.trident.planner.ProcessorContext;
-import storm.trident.planner.TupleReceiver;
-import storm.trident.tuple.TridentTuple;
-import storm.trident.tuple.TridentTuple.Factory;
-import storm.trident.tuple.TridentTupleView.FreshOutputFactory;
-
-
-public class FreshCollector implements TridentCollector {
-    // Collector that builds brand-new tuples from the emitted values alone
-    // (no input fields carried over) and forwards them to all downstream
-    // receivers. Used by aggregations and state updates.
-    FreshOutputFactory _factory;
-    TridentContext _triContext;
-    ProcessorContext context; // per-batch context forwarded to receivers
-
-    public FreshCollector(TridentContext context) {
-        _triContext = context;
-        _factory = new FreshOutputFactory(context.getSelfOutputFields());
-    }
-
-    // Must be called before emit() so emissions are attributed to the right batch.
-    public void setContext(ProcessorContext pc) {
-        this.context = pc;
-    }
-
-    @Override
-    public void emit(List<Object> values) {
-        // Construct a fresh tuple from the values and fan it out.
-        TridentTuple toEmit = _factory.create(values);
-        for(TupleReceiver r: _triContext.getReceivers()) {
-            r.execute(context, _triContext.getOutStreamId(), toEmit);
-        }
-    }
-
-    @Override
-    public void reportError(Throwable t) {
-        // Delegate error reporting to the underlying storm output collector.
-        _triContext.getDelegateCollector().reportError(t);
-    }
-
-    public Factory getOutputFactory() {
-        return _factory;
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/planner/processor/MultiReducerProcessor.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/planner/processor/MultiReducerProcessor.java b/jstorm-client/src/main/java/storm/trident/planner/processor/MultiReducerProcessor.java
deleted file mode 100644
index 1998e1a..0000000
--- a/jstorm-client/src/main/java/storm/trident/planner/processor/MultiReducerProcessor.java
+++ /dev/null
@@ -1,76 +0,0 @@
-package storm.trident.planner.processor;
-
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Fields;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import storm.trident.operation.MultiReducer;
-import storm.trident.operation.TridentMultiReducerContext;
-import storm.trident.planner.ProcessorContext;
-import storm.trident.planner.TridentProcessor;
-import storm.trident.tuple.TridentTuple;
-import storm.trident.tuple.TridentTuple.Factory;
-import storm.trident.tuple.TridentTupleView.ProjectionFactory;
-
-
-public class MultiReducerProcessor implements TridentProcessor {
-    // Runs a MultiReducer over tuples arriving from multiple parent streams:
-    // init() at batch start, execute() per tuple with the index of the stream
-    // it arrived on, complete() at batch end. Per-batch reducer state lives in
-    // processorContext.state at this processor's state index.
-    MultiReducer _reducer;
-    TridentContext _context;
-    Map<String, Integer> _streamToIndex;        // maps a parent stream id to its positional index
-    List<Fields> _projectFields;                // per-parent input field selections
-    ProjectionFactory[] _projectionFactories;   // one projection per parent, aligned with _projectFields
-    FreshCollector _collector;
-
-    public MultiReducerProcessor(List<Fields> inputFields, MultiReducer reducer) {
-        _reducer = reducer;
-        _projectFields = inputFields;
-    }
-
-    @Override
-    public void prepare(Map conf, TopologyContext context, TridentContext tridentContext) {
-        List<Factory> parents = tridentContext.getParentTupleFactories();
-        _context = tridentContext;
-        // Build the stream-id -> index mapping used to dispatch in execute().
-        _streamToIndex = new HashMap<String, Integer>();
-        List<String> parentStreams = tridentContext.getParentStreams();
-        for(int i=0; i<parentStreams.size(); i++) {
-            _streamToIndex.put(parentStreams.get(i), i);
-        }
-        // One projection per parent stream, selecting that stream's input fields.
-        _projectionFactories = new ProjectionFactory[_projectFields.size()];
-        for(int i=0; i<_projectFields.size(); i++) {
-            _projectionFactories[i] = new ProjectionFactory(parents.get(i), _projectFields.get(i));
-        }
-        _collector = new FreshCollector(tridentContext);
-        _reducer.prepare(conf, new TridentMultiReducerContext((List) Arrays.asList(_projectionFactories)));
-    }
-
-    @Override
-    public void cleanup() {
-        _reducer.cleanup();
-    }
-
-    @Override
-    public void startBatch(ProcessorContext processorContext) {
-        // Create the reducer's state for this batch and remember it in the context.
-        _collector.setContext(processorContext);
-        processorContext.state[_context.getStateIndex()] = _reducer.init(_collector);
-    }
-
-    @Override
-    public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
-        // Dispatch the tuple to the reducer along with the index of its source stream.
-        _collector.setContext(processorContext);
-        int i = _streamToIndex.get(streamId);
-        _reducer.execute(processorContext.state[_context.getStateIndex()], i, _projectionFactories[i].create(tuple), _collector);
-    }
-
-    @Override
-    public void finishBatch(ProcessorContext processorContext) {
-        // Let the reducer emit its result(s) for the completed batch.
-        _collector.setContext(processorContext);
-        _reducer.complete(processorContext.state[_context.getStateIndex()], _collector);
-    }
-
-    @Override
-    public Factory getOutputFactory() {
-        return _collector.getOutputFactory();
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/planner/processor/PartitionPersistProcessor.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/planner/processor/PartitionPersistProcessor.java b/jstorm-client/src/main/java/storm/trident/planner/processor/PartitionPersistProcessor.java
deleted file mode 100644
index 5ab2357..0000000
--- a/jstorm-client/src/main/java/storm/trident/planner/processor/PartitionPersistProcessor.java
+++ /dev/null
@@ -1,90 +0,0 @@
-package storm.trident.planner.processor;
-
-import backtype.storm.task.TopologyContext;
-import storm.trident.topology.TransactionAttempt;
-import backtype.storm.tuple.Fields;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import storm.trident.operation.TridentOperationContext;
-import storm.trident.planner.ProcessorContext;
-import storm.trident.planner.TridentProcessor;
-import storm.trident.state.State;
-import storm.trident.state.StateUpdater;
-import storm.trident.tuple.TridentTuple;
-import storm.trident.tuple.TridentTuple.Factory;
-import storm.trident.tuple.TridentTupleView.ProjectionFactory;
-
-
-public class PartitionPersistProcessor implements TridentProcessor {
-    // Buffers all tuples of a batch and, at finishBatch, applies a StateUpdater
-    // against the State registered under _stateId, wrapped in a
-    // beginCommit/commit pair keyed by the batch's transaction id (if any).
-    StateUpdater _updater;
-    State _state;          // resolved from task data in prepare()
-    String _stateId;       // key under which the State was registered in the topology context
-    TridentContext _context;
-    Fields _inputFields;
-    ProjectionFactory _projection; // projects parent tuples down to _inputFields
-    FreshCollector _collector;
-
-    public PartitionPersistProcessor(String stateId, Fields inputFields, StateUpdater updater) {
-        _updater = updater;
-        _stateId = stateId;
-        _inputFields = inputFields;
-    }
-
-    @Override
-    public void prepare(Map conf, TopologyContext context, TridentContext tridentContext) {
-        // Persisting is defined over a single upstream tuple factory.
-        List<Factory> parents = tridentContext.getParentTupleFactories();
-        if(parents.size()!=1) {
-            throw new RuntimeException("Partition persist operation can only have one parent");
-        }
-        _context = tridentContext;
-        _state = (State) context.getTaskData(_stateId);
-        _projection = new ProjectionFactory(parents.get(0), _inputFields);
-        _collector = new FreshCollector(tridentContext);
-        _updater.prepare(conf, new TridentOperationContext(context, _projection));
-    }
-
-    @Override
-    public void cleanup() {
-        _updater.cleanup();
-    }
-
-    @Override
-    public void startBatch(ProcessorContext processorContext) {
-        // Fresh buffer of tuples for this batch, stored in the processor context.
-        processorContext.state[_context.getStateIndex()] = new ArrayList<TridentTuple>();
-    }
-
-    @Override
-    public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
-        // Just accumulate; the state update itself happens in finishBatch.
-        ((List) processorContext.state[_context.getStateIndex()]).add(_projection.create(tuple));
-    }
-
-    @Override
-    public void finishBatch(ProcessorContext processorContext) {
-        _collector.setContext(processorContext);
-        Object batchId = processorContext.batchId;
-        // since this processor type is a committer, this occurs in the commit phase
-        List<TridentTuple> buffer = (List) processorContext.state[_context.getStateIndex()];
-
-        // don't update unless there are tuples
-        // this helps out with things like global partition persist, where multiple tasks may still
-        // exist for this processor. Only want the global one to do anything
-        // this is also a helpful optimization that state implementations don't need to manually do
-        if(buffer.size() > 0) {
-            Long txid = null;
-            // this is to support things like persisting off of drpc stream, which is inherently unreliable
-            // and won't have a tx attempt
-            if(batchId instanceof TransactionAttempt) {
-                txid = ((TransactionAttempt) batchId).getTransactionId();
-            }
-            _state.beginCommit(txid);
-            _updater.updateState(_state, buffer, _collector);
-            _state.commit(txid);
-        }
-    }
-
-    @Override
-    public Factory getOutputFactory() {
-        return _collector.getOutputFactory();
-    }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/planner/processor/ProjectedProcessor.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/planner/processor/ProjectedProcessor.java b/jstorm-client/src/main/java/storm/trident/planner/processor/ProjectedProcessor.java
deleted file mode 100644
index c6d34e5..0000000
--- a/jstorm-client/src/main/java/storm/trident/planner/processor/ProjectedProcessor.java
+++ /dev/null
@@ -1,56 +0,0 @@
-package storm.trident.planner.processor;
-
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Fields;
-import java.util.Map;
-import storm.trident.planner.ProcessorContext;
-import storm.trident.planner.TridentProcessor;
-import storm.trident.planner.TupleReceiver;
-import storm.trident.tuple.TridentTuple;
-import storm.trident.tuple.TridentTuple.Factory;
-import storm.trident.tuple.TridentTupleView.ProjectionFactory;
-
-
-public class ProjectedProcessor implements TridentProcessor {
-    // Narrows each incoming tuple down to _projectFields and forwards the
-    // projected view directly to all receivers. Stateless, so startBatch and
-    // finishBatch are no-ops and no collector is needed.
-    Fields _projectFields;
-    ProjectionFactory _factory; // creates the projected view of each input tuple
-    TridentContext _context;
-
-    public ProjectedProcessor(Fields projectFields) {
-        _projectFields = projectFields;
-    }
-
-    @Override
-    public void prepare(Map conf, TopologyContext context, TridentContext tridentContext) {
-        // Projection is defined over a single upstream tuple factory.
-        if(tridentContext.getParentTupleFactories().size()!=1) {
-            throw new RuntimeException("Projection processor can only have one parent");
-        }
-        _context = tridentContext;
-        _factory = new ProjectionFactory(tridentContext.getParentTupleFactories().get(0), _projectFields);
-    }
-
-    @Override
-    public void cleanup() {
-    }
-
-    @Override
-    public void startBatch(ProcessorContext processorContext) {
-    }
-
-    @Override
-    public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
-        // Project and fan out on this processor's output stream.
-        TridentTuple toEmit = _factory.create(tuple);
-        for(TupleReceiver r: _context.getReceivers()) {
-            r.execute(processorContext, _context.getOutStreamId(), toEmit);
-        }
-    }
-
-    @Override
-    public void finishBatch(ProcessorContext processorContext) {
-    }
-
-    @Override
-    public Factory getOutputFactory() {
-        return _factory;
-    }
-}