You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/11/05 21:40:59 UTC

[20/60] [abbrv] [partial] storm git commit: Release 2.0.4-SNAPSHOT

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/builtin/MapGet.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/builtin/MapGet.java b/jstorm-client/src/main/java/storm/trident/operation/builtin/MapGet.java
deleted file mode 100644
index 17e12ee..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/builtin/MapGet.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package storm.trident.operation.builtin;
-
-import backtype.storm.tuple.Values;
-import java.util.List;
-import storm.trident.operation.TridentCollector;
-import storm.trident.state.BaseQueryFunction;
-import storm.trident.state.map.ReadOnlyMapState;
-import storm.trident.tuple.TridentTuple;
-
-
-public class MapGet extends BaseQueryFunction<ReadOnlyMapState, Object> {
-    @Override
-    public List<Object> batchRetrieve(ReadOnlyMapState map, List<TridentTuple> keys) {
-        return map.multiGet((List) keys);
-    }    
-    
-    @Override
-    public void execute(TridentTuple tuple, Object result, TridentCollector collector) {
-        collector.emit(new Values(result));
-    }    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/builtin/Negate.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/builtin/Negate.java b/jstorm-client/src/main/java/storm/trident/operation/builtin/Negate.java
deleted file mode 100644
index 7a48477..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/builtin/Negate.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package storm.trident.operation.builtin;
-
-import java.util.Map;
-import storm.trident.operation.Filter;
-import storm.trident.operation.TridentOperationContext;
-import storm.trident.tuple.TridentTuple;
-
-public class Negate implements Filter {
-    
-    Filter _delegate;
-    
-    public Negate(Filter delegate) {
-        _delegate = delegate;
-    }
-
-    @Override
-    public boolean isKeep(TridentTuple tuple) {
-        return !_delegate.isKeep(tuple);
-    }
-
-    @Override
-    public void prepare(Map conf, TridentOperationContext context) {
-        _delegate.prepare(conf, context);
-    }
-
-    @Override
-    public void cleanup() {
-        _delegate.cleanup();
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/builtin/SnapshotGet.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/builtin/SnapshotGet.java b/jstorm-client/src/main/java/storm/trident/operation/builtin/SnapshotGet.java
deleted file mode 100644
index fbc3286..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/builtin/SnapshotGet.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package storm.trident.operation.builtin;
-
-import backtype.storm.tuple.Values;
-import java.util.ArrayList;
-import java.util.List;
-import storm.trident.operation.TridentCollector;
-import storm.trident.state.BaseQueryFunction;
-import storm.trident.state.snapshot.ReadOnlySnapshottable;
-import storm.trident.tuple.TridentTuple;
-
-public class SnapshotGet extends BaseQueryFunction<ReadOnlySnapshottable, Object> {
-
-    @Override
-    public List<Object> batchRetrieve(ReadOnlySnapshottable state, List<TridentTuple> args) {
-        List<Object> ret = new ArrayList<Object>(args.size());
-        Object snapshot = state.get();
-        for(int i=0; i<args.size(); i++) {
-            ret.add(snapshot);
-        }
-        return ret;
-    }
-
-    @Override
-    public void execute(TridentTuple tuple, Object result, TridentCollector collector) {
-        collector.emit(new Values(result));
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/builtin/Sum.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/builtin/Sum.java b/jstorm-client/src/main/java/storm/trident/operation/builtin/Sum.java
deleted file mode 100644
index d67ac66..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/builtin/Sum.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package storm.trident.operation.builtin;
-
-import clojure.lang.Numbers;
-import storm.trident.operation.CombinerAggregator;
-import storm.trident.tuple.TridentTuple;
-
-
-public class Sum implements CombinerAggregator<Number> {
-
-    @Override
-    public Number init(TridentTuple tuple) {
-        return (Number) tuple.getValue(0);
-    }
-
-    @Override
-    public Number combine(Number val1, Number val2) {
-        return Numbers.add(val1, val2);
-    }
-
-    @Override
-    public Number zero() {
-        return 0;
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/builtin/TupleCollectionGet.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/builtin/TupleCollectionGet.java b/jstorm-client/src/main/java/storm/trident/operation/builtin/TupleCollectionGet.java
deleted file mode 100644
index 6302e02..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/builtin/TupleCollectionGet.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package storm.trident.operation.builtin;
-
-import storm.trident.state.ITupleCollection;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import storm.trident.operation.TridentCollector;
-import storm.trident.state.BaseQueryFunction;
-import storm.trident.state.State;
-import storm.trident.tuple.TridentTuple;
-
-public class TupleCollectionGet extends BaseQueryFunction<State, Iterator<List<Object>>> {
-
-    @Override
-    public List<Iterator<List<Object>>> batchRetrieve(State state, List<TridentTuple> args) {
-        List<Iterator<List<Object>>> ret = new ArrayList(args.size());
-        for(int i=0; i<args.size(); i++) {
-            ret.add(((ITupleCollection)state).getTuples());
-        }
-        return ret;
-    }
-
-    @Override
-    public void execute(TridentTuple tuple, Iterator<List<Object>> tuplesIterator, TridentCollector collector) {
-        while(tuplesIterator.hasNext()) {
-            collector.emit(tuplesIterator.next());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/impl/CaptureCollector.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/impl/CaptureCollector.java b/jstorm-client/src/main/java/storm/trident/operation/impl/CaptureCollector.java
deleted file mode 100644
index 9fe4419..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/impl/CaptureCollector.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package storm.trident.operation.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-import storm.trident.operation.TridentCollector;
-
-public class CaptureCollector implements TridentCollector {
-    public List<List<Object>> captured = new ArrayList();
-    
-    TridentCollector _coll;
-    
-    public void setCollector(TridentCollector coll) {
-        _coll = coll;
-    }
-    
-    @Override
-    public void emit(List<Object> values) {
-        this.captured.add(values);
-    }
-
-    @Override
-    public void reportError(Throwable t) {
-        _coll.reportError(t);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/impl/ChainedAggregatorImpl.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/impl/ChainedAggregatorImpl.java b/jstorm-client/src/main/java/storm/trident/operation/impl/ChainedAggregatorImpl.java
deleted file mode 100644
index f8bd001..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/impl/ChainedAggregatorImpl.java
+++ /dev/null
@@ -1,96 +0,0 @@
-package storm.trident.operation.impl;
-
-import backtype.storm.tuple.Fields;
-import java.util.List;
-import java.util.Map;
-import storm.trident.operation.Aggregator;
-import storm.trident.operation.TridentCollector;
-import storm.trident.operation.TridentOperationContext;
-import storm.trident.tuple.ComboList;
-import storm.trident.tuple.TridentTuple;
-import storm.trident.tuple.TridentTupleView;
-import storm.trident.tuple.TridentTupleView.ProjectionFactory;
-
-/**
- * Runs several aggregators over the same batch. Each aggregator sees the
- * projection of the input tuple onto its own input fields. With more than one
- * aggregator, their outputs are captured and {@link #complete} emits the
- * cross-join of everything they emitted; with exactly one, it emits directly
- * to the real collector.
- */
-public class ChainedAggregatorImpl implements Aggregator<ChainedResult> {
-    Aggregator[] _aggs;
-    ProjectionFactory[] _inputFactories;
-    ComboList.Factory _fact;
-    Fields[] _inputFields;
-    
-    /**
-     * @param aggs        aggregators to chain
-     * @param inputFields input fields for each aggregator, index-aligned with {@code aggs}
-     * @param fact        factory used to combine one emitted tuple per aggregator into an output tuple
-     * @throws IllegalArgumentException if {@code aggs} and {@code inputFields} differ in length
-     */
-    public ChainedAggregatorImpl(Aggregator[] aggs, Fields[] inputFields, ComboList.Factory fact) {
-        _aggs = aggs;
-        _inputFields = inputFields;
-        _fact = fact;
-        if(_aggs.length!=_inputFields.length) {
-            throw new IllegalArgumentException("Require input fields for each aggregator");
-        }
-    }
-    
-    /**
-     * Builds one projection factory per aggregator and prepares each
-     * aggregator with a context derived from its factory.
-     */
-    @Override
-    public void prepare(Map conf, TridentOperationContext context) {
-        _inputFactories = new ProjectionFactory[_inputFields.length];
-        for(int i=0; i<_inputFields.length; i++) {
-            _inputFactories[i] = context.makeProjectionFactory(_inputFields[i]);
-            _aggs[i].prepare(conf, new TridentOperationContext(context, _inputFactories[i]));
-        }
-    }
-    
-    /**
-     * Creates the per-batch state and initialises every aggregator against
-     * its own collector from that state.
-     */
-    @Override
-    public ChainedResult init(Object batchId, TridentCollector collector) {
-        ChainedResult initted = new ChainedResult(collector, _aggs.length);
-        for(int i=0; i<_aggs.length; i++) {
-            initted.objs[i] = _aggs[i].init(batchId, initted.collectors[i]);
-        }
-        return initted;
-    }
-    
-    /**
-     * Feeds each aggregator the projection of {@code tuple} onto its input fields.
-     */
-    @Override
-    public void aggregate(ChainedResult val, TridentTuple tuple, TridentCollector collector) {
-        val.setFollowThroughCollector(collector);
-        for(int i=0; i<_aggs.length; i++) {
-            TridentTuple projected = _inputFactories[i].create((TridentTupleView) tuple);
-            _aggs[i].aggregate(val.objs[i], projected, val.collectors[i]);
-        }
-    }
-    
-    /**
-     * Completes every aggregator, then (for two or more aggregators) emits the
-     * cross-join of all tuples they emitted. If any aggregator emitted
-     * nothing, the cross-join is empty and nothing is emitted.
-     */
-    @Override
-    public void complete(ChainedResult val, TridentCollector collector) {
-        val.setFollowThroughCollector(collector);
-        for(int i=0; i<_aggs.length; i++) {
-            _aggs[i].complete(val.objs[i], val.collectors[i]);
-        }
-        if(_aggs.length <= 1) {
-            // a lone aggregator emitted directly to the real collector
-            return;
-        }
-        // An empty side makes the cross-join empty; without this guard the
-        // loop below would call captured.get(0) on an empty list.
-        for(TridentCollector c: val.collectors) {
-            if(((CaptureCollector) c).captured.isEmpty()) {
-                return;
-            }
-        }
-        // one cursor per aggregator; int arrays start zero-filled
-        int[] indices = new int[val.collectors.length];
-        boolean keepGoing = true;
-        //emit cross-join of all emitted tuples
-        while(keepGoing) {
-            List[] combined = new List[_aggs.length];
-            for(int i=0; i< _aggs.length; i++) {
-                CaptureCollector capturer = (CaptureCollector) val.collectors[i];
-                combined[i] = capturer.captured.get(indices[i]);
-            }
-            collector.emit(_fact.create(combined));
-            keepGoing = increment(val.collectors, indices, indices.length - 1);
-        }
-    }
-    
-    /**
-     * Advances {@code indices} like an odometer, starting at position
-     * {@code j} and carrying towards position 0. Each position wraps at the
-     * number of tuples captured by the collector at that position.
-     *
-     * @return false if can't increment anymore (every position wrapped)
-     */
-    private boolean increment(TridentCollector[] lengths, int[] indices, int j) {
-        for(int pos=j; pos>=0; pos--) {
-            indices[pos]++;
-            CaptureCollector capturer = (CaptureCollector) lengths[pos];
-            if(indices[pos] < capturer.captured.size()) {
-                return true;
-            }
-            indices[pos] = 0; // wrapped: carry into the next position to the left
-        }
-        return false;
-    }
-    
-    /** Cleans up every chained aggregator. */
-    @Override
-    public void cleanup() {
-       for(Aggregator a: _aggs) {
-           a.cleanup();
-       } 
-    } 
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/impl/ChainedResult.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/impl/ChainedResult.java b/jstorm-client/src/main/java/storm/trident/operation/impl/ChainedResult.java
deleted file mode 100644
index a35df3a..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/impl/ChainedResult.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package storm.trident.operation.impl;
-
-import org.apache.commons.lang.builder.ToStringBuilder;
-import storm.trident.operation.TridentCollector;
-
-
-//for ChainedAggregator
-public class ChainedResult {
-    Object[] objs;
-    TridentCollector[] collectors;
-    
-    public ChainedResult(TridentCollector collector, int size) {
-        objs = new Object[size];
-        collectors = new TridentCollector[size];
-        for(int i=0; i<size; i++) {
-            if(size==1) {
-                collectors[i] = collector;
-            } else {
-                collectors[i] = new CaptureCollector();                
-            }
-        }
-    }
-    
-    public void setFollowThroughCollector(TridentCollector collector) {
-        if(collectors.length>1) {
-            for(TridentCollector c: collectors) {
-                ((CaptureCollector) c).setCollector(collector);
-            }
-        }
-    }
-
-    @Override
-    public String toString() {
-        return ToStringBuilder.reflectionToString(objs);
-    }    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/impl/CombinerAggStateUpdater.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/impl/CombinerAggStateUpdater.java b/jstorm-client/src/main/java/storm/trident/operation/impl/CombinerAggStateUpdater.java
deleted file mode 100644
index 97a9b9d..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/impl/CombinerAggStateUpdater.java
+++ /dev/null
@@ -1,39 +0,0 @@
-package storm.trident.operation.impl;
-
-import backtype.storm.tuple.Values;
-import java.util.List;
-import java.util.Map;
-import storm.trident.operation.CombinerAggregator;
-import storm.trident.operation.TridentCollector;
-import storm.trident.operation.TridentOperationContext;
-import storm.trident.state.CombinerValueUpdater;
-import storm.trident.state.StateUpdater;
-import storm.trident.state.snapshot.Snapshottable;
-import storm.trident.tuple.TridentTuple;
-
-public class CombinerAggStateUpdater implements StateUpdater<Snapshottable> {
-    CombinerAggregator _agg;
-    
-    public CombinerAggStateUpdater(CombinerAggregator agg) {
-        _agg = agg;
-    }
-    
-
-    @Override
-    public void updateState(Snapshottable state, List<TridentTuple> tuples, TridentCollector collector) {
-        if(tuples.size()!=1) {
-            throw new IllegalArgumentException("Combiner state updater should receive a single tuple. Received: " + tuples.toString());
-        }
-        Object newVal = state.update(new CombinerValueUpdater(_agg, tuples.get(0).getValue(0)));
-        collector.emit(new Values(newVal));
-    }
-
-    @Override
-    public void prepare(Map conf, TridentOperationContext context) {        
-    }
-
-    @Override
-    public void cleanup() {
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/impl/CombinerAggregatorCombineImpl.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/impl/CombinerAggregatorCombineImpl.java b/jstorm-client/src/main/java/storm/trident/operation/impl/CombinerAggregatorCombineImpl.java
deleted file mode 100644
index d9d00e5..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/impl/CombinerAggregatorCombineImpl.java
+++ /dev/null
@@ -1,44 +0,0 @@
-package storm.trident.operation.impl;
-
-import backtype.storm.tuple.Values;
-import java.util.Map;
-import storm.trident.operation.Aggregator;
-import storm.trident.operation.CombinerAggregator;
-import storm.trident.operation.TridentCollector;
-import storm.trident.operation.TridentOperationContext;
-import storm.trident.tuple.TridentTuple;
-
-/**
- * Aggregator that merges the first field of each tuple into a running
- * {@link Result} using a {@link CombinerAggregator}.
- */
-public class CombinerAggregatorCombineImpl implements Aggregator<Result> {
-    CombinerAggregator _agg;
-
-    /**
-     * @param agg combiner supplying the zero value and the combine step
-     */
-    public CombinerAggregatorCombineImpl(CombinerAggregator agg) {
-        this._agg = agg;
-    }
-
-    /** Nothing to prepare. */
-    public void prepare(Map conf, TridentOperationContext context) {
-    }
-
-    /**
-     * @return a fresh result seeded with the combiner's zero value
-     */
-    public Result init(Object batchId, TridentCollector collector) {
-        Result seeded = new Result();
-        seeded.obj = _agg.zero();
-        return seeded;
-    }
-
-    /**
-     * Merges the first field of {@code tuple} into {@code val}. When the
-     * running value is {@code null} the incoming value replaces it instead of
-     * being passed through {@code combine}.
-     */
-    public void aggregate(Result val, TridentTuple tuple, TridentCollector collector) {
-        Object incoming = tuple.getValue(0);
-        if (val.obj != null) {
-            val.obj = _agg.combine(val.obj, incoming);
-            return;
-        }
-        val.obj = incoming;
-    }
-
-    /** Emits the accumulated value as a single-field tuple. */
-    public void complete(Result val, TridentCollector collector) {
-        collector.emit(new Values(val.obj));
-    }
-
-    /** Nothing to clean up. */
-    public void cleanup() {
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/impl/CombinerAggregatorInitImpl.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/impl/CombinerAggregatorInitImpl.java b/jstorm-client/src/main/java/storm/trident/operation/impl/CombinerAggregatorInitImpl.java
deleted file mode 100644
index 9020094..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/impl/CombinerAggregatorInitImpl.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package storm.trident.operation.impl;
-
-import backtype.storm.tuple.Values;
-import java.util.Map;
-import storm.trident.operation.CombinerAggregator;
-import storm.trident.operation.Function;
-import storm.trident.operation.TridentCollector;
-import storm.trident.operation.TridentOperationContext;
-import storm.trident.tuple.TridentTuple;
-
-public class CombinerAggregatorInitImpl implements Function {
-
-    CombinerAggregator _agg;
-    
-    public CombinerAggregatorInitImpl(CombinerAggregator agg) {
-        _agg = agg;
-    }
-    
-    @Override
-    public void execute(TridentTuple tuple, TridentCollector collector) {
-        collector.emit(new Values(_agg.init(tuple)));
-    }
-
-    @Override
-    public void prepare(Map conf, TridentOperationContext context) {
-    }
-
-    @Override
-    public void cleanup() {
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/impl/FilterExecutor.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/impl/FilterExecutor.java b/jstorm-client/src/main/java/storm/trident/operation/impl/FilterExecutor.java
deleted file mode 100644
index 2b96834..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/impl/FilterExecutor.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package storm.trident.operation.impl;
-
-import java.util.Map;
-import storm.trident.operation.Filter;
-import storm.trident.operation.Function;
-import storm.trident.operation.TridentCollector;
-import storm.trident.operation.TridentOperationContext;
-import storm.trident.tuple.TridentTuple;
-
-// works by emitting null to the collector. since the planner knows this is an ADD node with
-// no new output fields, it just passes the tuple forward
-public class FilterExecutor implements Function {
-    Filter _filter;
-
-    public FilterExecutor(Filter filter) {
-        _filter = filter;
-    }
-    
-    @Override
-    public void execute(TridentTuple tuple, TridentCollector collector) {
-        if(_filter.isKeep(tuple)) {
-            collector.emit(null);
-        }
-    }
-
-    @Override
-    public void prepare(Map conf, TridentOperationContext context) {
-        _filter.prepare(conf, context);
-    }
-
-    @Override
-    public void cleanup() {
-        _filter.cleanup();
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/impl/GlobalBatchToPartition.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/impl/GlobalBatchToPartition.java b/jstorm-client/src/main/java/storm/trident/operation/impl/GlobalBatchToPartition.java
deleted file mode 100644
index 3bf52b3..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/impl/GlobalBatchToPartition.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package storm.trident.operation.impl;
-
-
-/**
- * Batch-to-partition mapping that sends every batch to partition 0.
- */
-public class GlobalBatchToPartition implements SingleEmitAggregator.BatchToPartition {
-
-    /**
-     * @return always 0, regardless of {@code batchId} and {@code numPartitions}
-     */
-    @Override
-    public int partitionIndex(Object batchId, int numPartitions) {
-        // TODO: take away knowledge of storm's internals here
-        final int onlyPartition = 0;
-        return onlyPartition;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/impl/GroupCollector.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/impl/GroupCollector.java b/jstorm-client/src/main/java/storm/trident/operation/impl/GroupCollector.java
deleted file mode 100644
index b997217..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/impl/GroupCollector.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package storm.trident.operation.impl;
-
-import java.util.List;
-import storm.trident.operation.TridentCollector;
-import storm.trident.tuple.ComboList;
-
-/**
- * Collector that prefixes every emitted tuple with the current group before
- * handing it to the wrapped collector.
- */
-public class GroupCollector implements TridentCollector {
-    /** Group values combined with each emitted tuple; assigned by the owner. */
-    public List<Object> currGroup;
-
-    ComboList.Factory _factory;
-    TridentCollector _collector;
-
-    /**
-     * @param collector collector that receives the combined tuples and error reports
-     * @param factory   factory that combines the group with the emitted values
-     */
-    public GroupCollector(TridentCollector collector, ComboList.Factory factory) {
-        this._factory = factory;
-        this._collector = collector;
-    }
-
-    /**
-     * Emits the combination of {@link #currGroup} followed by {@code values}.
-     */
-    @Override
-    public void emit(List<Object> values) {
-        List[] parts = new List[] { currGroup, values };
-        _collector.emit(_factory.create(parts));
-    }
-
-    /** Forwards the error to the wrapped collector. */
-    @Override
-    public void reportError(Throwable t) {
-        _collector.reportError(t);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/impl/GroupedAggregator.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/impl/GroupedAggregator.java b/jstorm-client/src/main/java/storm/trident/operation/impl/GroupedAggregator.java
deleted file mode 100644
index d78de70..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/impl/GroupedAggregator.java
+++ /dev/null
@@ -1,79 +0,0 @@
-package storm.trident.operation.impl;
-
-import backtype.storm.tuple.Fields;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import storm.trident.operation.Aggregator;
-import storm.trident.operation.TridentCollector;
-import storm.trident.operation.TridentOperationContext;
-import storm.trident.tuple.ComboList;
-import storm.trident.tuple.TridentTuple;
-import storm.trident.tuple.TridentTupleView;
-import storm.trident.tuple.TridentTupleView.ProjectionFactory;
-
-/**
- * Runs a wrapped aggregator separately for each distinct group in a batch.
- *
- * The per-batch state is an {@code Object[]} with three slots:
- * {@code [0]} the {@link GroupCollector} that prefixes output with the group,
- * {@code [1]} a map from group to the wrapped aggregator's state for that group,
- * {@code [2]} the batch id passed to {@link #init}.
- */
-public class GroupedAggregator implements Aggregator<Object[]> {
-    ProjectionFactory _groupFactory;
-    ProjectionFactory _inputFactory;
-    Aggregator _agg;
-    ComboList.Factory _fact;
-    Fields _inFields;
-    Fields _groupFields;
-    
-    /**
-     * @param agg     aggregator to run per group
-     * @param group   fields that identify a group
-     * @param input   fields projected out of each tuple for the wrapped aggregator
-     * @param outSize size of the second part of each combined output tuple
-     */
-    public GroupedAggregator(Aggregator agg, Fields group, Fields input, int outSize) {
-        _groupFields = group;
-        _inFields = input;
-        _agg = agg;
-        int[] sizes = new int[2];
-        sizes[0] = _groupFields.size();
-        sizes[1] = outSize;
-        _fact = new ComboList.Factory(sizes);
-    }
-    
-    /**
-     * Builds the group and input projection factories and prepares the
-     * wrapped aggregator with a context derived from the input factory.
-     */
-    @Override
-    public void prepare(Map conf, TridentOperationContext context) {
-        _inputFactory = context.makeProjectionFactory(_inFields);
-        _groupFactory = context.makeProjectionFactory(_groupFields);
-        _agg.prepare(conf, new TridentOperationContext(context, _inputFactory));
-    }
-
-    /**
-     * @return fresh per-batch state: group collector, empty group map, batch id
-     */
-    @Override
-    public Object[] init(Object batchId, TridentCollector collector) {
-        return new Object[] {new GroupCollector(collector, _fact), new HashMap<List, Object>(), batchId};
-    }
-
-    /**
-     * Routes {@code tuple} to the wrapped aggregator's state for its group,
-     * creating that state on first sight of the group.
-     */
-    @Override
-    public void aggregate(Object[] arr, TridentTuple tuple, TridentCollector collector) {
-        GroupCollector groupColl = (GroupCollector) arr[0];
-        Map<List, Object> val = (Map) arr[1];
-        TridentTuple group = _groupFactory.create((TridentTupleView) tuple);
-        TridentTuple input = _inputFactory.create((TridentTupleView) tuple);
-        // Select the group before init() so that anything the wrapped
-        // aggregator emits while initialising is tagged with this group
-        // rather than the previous one (or null on the first tuple).
-        groupColl.currGroup = group;
-        Object curr;
-        // containsKey, not a null check: a wrapped aggregator may legitimately use null state
-        if(!val.containsKey(group)) {
-            curr = _agg.init(arr[2], groupColl);
-            val.put((List) group, curr);
-        } else {
-            curr = val.get(group);
-        }
-        _agg.aggregate(curr, input, groupColl);
-        
-    }
-
-    /**
-     * Completes the wrapped aggregator once per group, with the group
-     * collector pointed at that group.
-     */
-    @Override
-    public void complete(Object[] arr, TridentCollector collector) {
-        Map<List, Object> val = (Map) arr[1];        
-        GroupCollector groupColl = (GroupCollector) arr[0];
-        for(Entry<List, Object> e: val.entrySet()) {
-            groupColl.currGroup = e.getKey();
-            _agg.complete(e.getValue(), groupColl);
-        }
-    }
-
-    /** Cleans up the wrapped aggregator. */
-    @Override
-    public void cleanup() {
-        _agg.cleanup();
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/impl/GroupedMultiReducerExecutor.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/impl/GroupedMultiReducerExecutor.java b/jstorm-client/src/main/java/storm/trident/operation/impl/GroupedMultiReducerExecutor.java
deleted file mode 100644
index 2615962..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/impl/GroupedMultiReducerExecutor.java
+++ /dev/null
@@ -1,78 +0,0 @@
-package storm.trident.operation.impl;
-
-import backtype.storm.tuple.Fields;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import storm.trident.operation.GroupedMultiReducer;
-import storm.trident.operation.MultiReducer;
-import storm.trident.operation.TridentCollector;
-import storm.trident.operation.TridentMultiReducerContext;
-import storm.trident.tuple.TridentTuple;
-import storm.trident.tuple.TridentTupleView.ProjectionFactory;
-
-
-public class GroupedMultiReducerExecutor implements MultiReducer<Map<TridentTuple, Object>> {
-    GroupedMultiReducer _reducer;
-    List<Fields> _groupFields;
-    List<Fields> _inputFields;
-    List<ProjectionFactory> _groupFactories = new ArrayList<ProjectionFactory>();
-    List<ProjectionFactory> _inputFactories = new ArrayList<ProjectionFactory>();
-    
-    public GroupedMultiReducerExecutor(GroupedMultiReducer reducer, List<Fields> groupFields, List<Fields> inputFields) {
-        if(inputFields.size()!=groupFields.size()) {
-            throw new IllegalArgumentException("Multireducer groupFields and inputFields must be the same size");
-        }
-        _groupFields = groupFields;
-        _inputFields = inputFields;
-        _reducer = reducer;
-    }
-    
-    @Override
-    public void prepare(Map conf, TridentMultiReducerContext context) {
-        for(int i=0; i<_groupFields.size(); i++) {
-            _groupFactories.add(context.makeProjectionFactory(i, _groupFields.get(i)));
-            _inputFactories.add(context.makeProjectionFactory(i, _inputFields.get(i)));
-        }
-        _reducer.prepare(conf, new TridentMultiReducerContext((List) _inputFactories));
-    }
-
-    @Override
-    public Map<TridentTuple, Object> init(TridentCollector collector) {
-        return new HashMap();
-    }
-
-    @Override
-    public void execute(Map<TridentTuple, Object> state, int streamIndex, TridentTuple full, TridentCollector collector) {
-        ProjectionFactory groupFactory = _groupFactories.get(streamIndex);
-        ProjectionFactory inputFactory = _inputFactories.get(streamIndex);
-        
-        TridentTuple group = groupFactory.create(full);
-        TridentTuple input = inputFactory.create(full);
-        
-        Object curr;
-        if(!state.containsKey(group)) {
-            curr = _reducer.init(collector, group);
-            state.put(group, curr);
-        } else {
-            curr = state.get(group);
-        }
-        _reducer.execute(curr, streamIndex, group, input, collector);
-    }
-
-    @Override
-    public void complete(Map<TridentTuple, Object> state, TridentCollector collector) {
-        for(Map.Entry e: state.entrySet()) {
-            TridentTuple group = (TridentTuple) e.getKey();
-            Object val = e.getValue();
-            _reducer.complete(val, group, collector);
-        }
-    }
-
-    @Override
-    public void cleanup() {
-        _reducer.cleanup();
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/impl/IdentityMultiReducer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/impl/IdentityMultiReducer.java b/jstorm-client/src/main/java/storm/trident/operation/impl/IdentityMultiReducer.java
deleted file mode 100644
index f482ec4..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/impl/IdentityMultiReducer.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package storm.trident.operation.impl;
-
-import java.util.Map;
-import storm.trident.operation.MultiReducer;
-import storm.trident.operation.TridentCollector;
-import storm.trident.operation.TridentMultiReducerContext;
-import storm.trident.tuple.TridentTuple;
-
-
-/**
- * Stateless multi-reducer that re-emits every input tuple unchanged,
- * whichever stream it arrived on.
- */
-public class IdentityMultiReducer implements MultiReducer {
-
-    /** Nothing to prepare. */
-    @Override
-    public void prepare(Map conf, TridentMultiReducerContext context) {
-    }
-
-    /**
-     * @return {@code null}; this reducer keeps no per-batch state
-     */
-    @Override
-    public Object init(TridentCollector collector) {
-        final Object noState = null;
-        return noState;
-    }
-
-    /**
-     * Emits {@code input} as is; {@code state} and {@code streamIndex} are ignored.
-     */
-    @Override
-    public void execute(Object state, int streamIndex, TridentTuple input, TridentCollector collector) {
-        collector.emit(input);
-    }
-
-    /** Nothing to emit at the end of a batch. */
-    @Override
-    public void complete(Object state, TridentCollector collector) {
-    }
-
-    /** Nothing to clean up. */
-    @Override
-    public void cleanup() {
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/impl/IndexHashBatchToPartition.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/impl/IndexHashBatchToPartition.java b/jstorm-client/src/main/java/storm/trident/operation/impl/IndexHashBatchToPartition.java
deleted file mode 100644
index 779c4b8..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/impl/IndexHashBatchToPartition.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package storm.trident.operation.impl;
-
-import storm.trident.partition.IndexHashGrouping;
-
-public class IndexHashBatchToPartition implements SingleEmitAggregator.BatchToPartition {
-
-    @Override
-    public int partitionIndex(Object batchId, int numPartitions) {
-        return IndexHashGrouping.objectToIndex(batchId, numPartitions);
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/impl/JoinerMultiReducer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/impl/JoinerMultiReducer.java b/jstorm-client/src/main/java/storm/trident/operation/impl/JoinerMultiReducer.java
deleted file mode 100644
index 963751e..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/impl/JoinerMultiReducer.java
+++ /dev/null
@@ -1,142 +0,0 @@
-package storm.trident.operation.impl;
-
-import backtype.storm.tuple.Fields;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import storm.trident.JoinType;
-import storm.trident.operation.GroupedMultiReducer;
-import storm.trident.operation.TridentCollector;
-import storm.trident.operation.TridentMultiReducerContext;
-import storm.trident.operation.impl.JoinerMultiReducer.JoinState;
-import storm.trident.tuple.ComboList;
-import storm.trident.tuple.TridentTuple;
-
-public class JoinerMultiReducer implements GroupedMultiReducer<JoinState> {
-
-    List<JoinType> _types;
-    List<Fields> _sideFields;
-    int _numGroupFields;
-    ComboList.Factory _factory;
-
-    
-    public JoinerMultiReducer(List<JoinType> types, int numGroupFields, List<Fields> sides) {
-        _types = types;
-        _sideFields = sides;
-        _numGroupFields = numGroupFields;
-    }
-    
-    @Override
-    public void prepare(Map conf, TridentMultiReducerContext context) {
-        int[] sizes = new int[_sideFields.size() + 1];
-        sizes[0] = _numGroupFields;
-        for(int i=0; i<_sideFields.size(); i++) {
-            sizes[i+1] = _sideFields.get(i).size();
-        }
-        _factory = new ComboList.Factory(sizes);
-    }
-
-    @Override
-    public JoinState init(TridentCollector collector, TridentTuple group) {
-        return new JoinState(_types.size(), group);
-    }
-
-    @Override
-    public void execute(JoinState state, int streamIndex, TridentTuple group, TridentTuple input, TridentCollector collector) {
-        //TODO: do the inner join incrementally, emitting the cross join with this tuple, against all other sides
-        //TODO: only do cross join if at least one tuple in each side
-        List<List> side = state.sides[streamIndex];
-        if(side.isEmpty()) {
-            state.numSidesReceived++;
-        }
-                
-        side.add(input);
-        if(state.numSidesReceived == state.sides.length) {
-            emitCrossJoin(state, collector, streamIndex, input);
-        }        
-    }
-
-    @Override
-    public void complete(JoinState state, TridentTuple group, TridentCollector collector) {
-        List<List>[] sides = state.sides;
-        boolean wasEmpty = state.numSidesReceived < sides.length;
-        for(int i=0; i<sides.length; i++) {
-            if(sides[i].isEmpty() && _types.get(i) == JoinType.OUTER) {
-                state.numSidesReceived++;
-                sides[i].add(makeNullList(_sideFields.get(i).size()));
-            }
-        }
-        if(wasEmpty && state.numSidesReceived == sides.length) {
-            emitCrossJoin(state, collector, -1, null);
-        }
-    }
-
-    @Override
-    public void cleanup() {
-    }
-    
-    private List<Object> makeNullList(int size) {
-        List<Object> ret = new ArrayList(size);
-        for(int i=0; i<size; i++) {
-            ret.add(null);
-        }
-        return ret;
-    }
-    
-    private void emitCrossJoin(JoinState state, TridentCollector collector, int overrideIndex, TridentTuple overrideTuple) {
-        List<List>[] sides = state.sides;
-        int[] indices = state.indices;
-        for(int i=0; i<indices.length; i++) {
-            indices[i] = 0;
-        }
-        
-        boolean keepGoing = true;
-        //emit cross-join of all emitted tuples
-        while(keepGoing) {
-            List[] combined = new List[sides.length+1];
-            combined[0] = state.group;
-            for(int i=0; i<sides.length; i++) {
-                if(i==overrideIndex) {
-                    combined[i+1] = overrideTuple;
-                } else {
-                    combined[i+1] = sides[i].get(indices[i]);                
-                }
-            }
-            collector.emit(_factory.create(combined));
-            keepGoing = increment(sides, indices, indices.length - 1, overrideIndex);
-        }
-    }
-    
-    
-    //return false if can't increment anymore
-    //TODO: DRY this code up with what's in ChainedAggregatorImpl
-    private boolean increment(List[] lengths, int[] indices, int j, int overrideIndex) {
-        if(j==-1) return false;
-        if(j==overrideIndex) {
-            return increment(lengths, indices, j-1, overrideIndex);
-        }
-        indices[j]++;
-        if(indices[j] >= lengths[j].size()) {
-            indices[j] = 0;
-            return increment(lengths, indices, j-1, overrideIndex);
-        }
-        return true;
-    }
-    
-    public static class JoinState {
-        List<List>[] sides;
-        int numSidesReceived = 0;
-        int[] indices;
-        TridentTuple group;
-        
-        public JoinState(int numSides, TridentTuple group) {
-            sides = new List[numSides];
-            indices = new int[numSides];
-            this.group = group;
-            for(int i=0; i<numSides; i++) {
-                sides[i] = new ArrayList<List>();
-            }            
-        }
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/impl/ReducerAggStateUpdater.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/impl/ReducerAggStateUpdater.java b/jstorm-client/src/main/java/storm/trident/operation/impl/ReducerAggStateUpdater.java
deleted file mode 100644
index 647d30f..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/impl/ReducerAggStateUpdater.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package storm.trident.operation.impl;
-
-import backtype.storm.tuple.Values;
-import java.util.List;
-import java.util.Map;
-import storm.trident.operation.ReducerAggregator;
-import storm.trident.operation.TridentCollector;
-import storm.trident.operation.TridentOperationContext;
-import storm.trident.state.ReducerValueUpdater;
-import storm.trident.state.StateUpdater;
-import storm.trident.state.snapshot.Snapshottable;
-import storm.trident.tuple.TridentTuple;
-
-public class ReducerAggStateUpdater implements StateUpdater<Snapshottable> {
-    ReducerAggregator _agg;
-    
-    public ReducerAggStateUpdater(ReducerAggregator agg) {
-        _agg = agg;
-    }
-    
-
-    @Override
-    public void updateState(Snapshottable state, List<TridentTuple> tuples, TridentCollector collector) {
-        Object newVal = state.update(new ReducerValueUpdater(_agg, tuples));
-        collector.emit(new Values(newVal));
-    }
-
-    @Override
-    public void prepare(Map conf, TridentOperationContext context) {        
-    }
-
-    @Override
-    public void cleanup() {
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/impl/ReducerAggregatorImpl.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/impl/ReducerAggregatorImpl.java b/jstorm-client/src/main/java/storm/trident/operation/impl/ReducerAggregatorImpl.java
deleted file mode 100644
index c047762..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/impl/ReducerAggregatorImpl.java
+++ /dev/null
@@ -1,39 +0,0 @@
-package storm.trident.operation.impl;
-
-import backtype.storm.tuple.Values;
-import java.util.Map;
-import storm.trident.operation.Aggregator;
-import storm.trident.operation.ReducerAggregator;
-import storm.trident.operation.TridentCollector;
-import storm.trident.operation.TridentOperationContext;
-import storm.trident.tuple.TridentTuple;
-
-public class ReducerAggregatorImpl implements Aggregator<Result> {
-    ReducerAggregator _agg;
-    
-    public ReducerAggregatorImpl(ReducerAggregator agg) {
-        _agg = agg;
-    }
-    
-    public void prepare(Map conf, TridentOperationContext context) {
-        
-    }
-    
-    public Result init(Object batchId, TridentCollector collector) {
-        Result ret = new Result();
-        ret.obj = _agg.init();
-        return ret;
-    }
-    
-    public void aggregate(Result val, TridentTuple tuple, TridentCollector collector) {
-        val.obj = _agg.reduce(val.obj, tuple);
-    }
-    
-    public void complete(Result val, TridentCollector collector) {
-        collector.emit(new Values(val.obj));        
-    }
-    
-    public void cleanup() {
-        
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/impl/Result.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/impl/Result.java b/jstorm-client/src/main/java/storm/trident/operation/impl/Result.java
deleted file mode 100644
index 3748a7a..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/impl/Result.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package storm.trident.operation.impl;
-
-public class Result {
-    public Object obj;
-
-    @Override
-    public String toString() {
-        return "" + obj;
-    }    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/impl/SingleEmitAggregator.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/impl/SingleEmitAggregator.java b/jstorm-client/src/main/java/storm/trident/operation/impl/SingleEmitAggregator.java
deleted file mode 100644
index 4be7c45..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/impl/SingleEmitAggregator.java
+++ /dev/null
@@ -1,78 +0,0 @@
-package storm.trident.operation.impl;
-
-import java.io.Serializable;
-import java.util.Map;
-import storm.trident.operation.Aggregator;
-import storm.trident.operation.TridentCollector;
-import storm.trident.operation.TridentOperationContext;
-import storm.trident.operation.impl.SingleEmitAggregator.SingleEmitState;
-import storm.trident.tuple.TridentTuple;
-
-
-public class SingleEmitAggregator implements Aggregator<SingleEmitState> {
-    public static interface BatchToPartition extends Serializable {
-        int partitionIndex(Object batchId, int numPartitions);
-    }
-    
-    static class SingleEmitState {
-        boolean received = false;
-        Object state;
-        Object batchId;
-        
-        public SingleEmitState(Object batchId) {
-            this.batchId = batchId;
-        }
-    }
-    
-    Aggregator _agg;
-    BatchToPartition _batchToPartition;
-    
-    public SingleEmitAggregator(Aggregator agg, BatchToPartition batchToPartition) {
-        _agg = agg;
-        _batchToPartition = batchToPartition;
-    }
-    
-    
-    @Override
-    public SingleEmitState init(Object batchId, TridentCollector collector) {
-        return new SingleEmitState(batchId);
-    }
-
-    @Override
-    public void aggregate(SingleEmitState val, TridentTuple tuple, TridentCollector collector) {
-        if(!val.received) {
-            val.state = _agg.init(val.batchId, collector);
-            val.received = true;
-        }
-        _agg.aggregate(val.state, tuple, collector);
-    }
-
-    @Override
-    public void complete(SingleEmitState val, TridentCollector collector) {
-        if(!val.received) {
-            if(this.myPartitionIndex == _batchToPartition.partitionIndex(val.batchId, this.totalPartitions)) {
-                val.state = _agg.init(val.batchId, collector);
-                _agg.complete(val.state, collector);
-            }
-        } else {
-            _agg.complete(val.state, collector);
-        }
-    }
-
-    int myPartitionIndex;
-    int totalPartitions;
-    
-    @Override
-    public void prepare(Map conf, TridentOperationContext context) {
-        _agg.prepare(conf, context);
-        this.myPartitionIndex = context.getPartitionIndex();
-        this.totalPartitions = context.numPartitions();
-    }
-
-    @Override
-    public void cleanup() {
-        _agg.cleanup();
-    }
-    
-    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/impl/TrueFilter.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/impl/TrueFilter.java b/jstorm-client/src/main/java/storm/trident/operation/impl/TrueFilter.java
deleted file mode 100644
index 6e9d15c..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/impl/TrueFilter.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package storm.trident.operation.impl;
-
-import java.util.Map;
-import storm.trident.operation.Filter;
-import storm.trident.operation.TridentOperationContext;
-import storm.trident.tuple.TridentTuple;
-
-public class TrueFilter implements Filter {
-
-    @Override
-    public boolean isKeep(TridentTuple tuple) {
-        return true;
-    }
-
-    @Override
-    public void prepare(Map conf, TridentOperationContext context) {
-    }
-
-    @Override
-    public void cleanup() {
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/partition/GlobalGrouping.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/partition/GlobalGrouping.java b/jstorm-client/src/main/java/storm/trident/partition/GlobalGrouping.java
deleted file mode 100644
index 0270bf4..0000000
--- a/jstorm-client/src/main/java/storm/trident/partition/GlobalGrouping.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package storm.trident.partition;
-
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.grouping.CustomStreamGrouping;
-import backtype.storm.task.WorkerTopologyContext;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-public class GlobalGrouping implements CustomStreamGrouping {
-
-    List<Integer> target;
-    
-    
-    @Override
-    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targets) {
-        List<Integer> sorted = new ArrayList<Integer>(targets);
-        Collections.sort(sorted);
-        target = Arrays.asList(sorted.get(0));
-    }
-
-    @Override
-    public List<Integer> chooseTasks(int i, List<Object> list) {
-        return target;
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/partition/IdentityGrouping.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/partition/IdentityGrouping.java b/jstorm-client/src/main/java/storm/trident/partition/IdentityGrouping.java
deleted file mode 100644
index ccb9d6e..0000000
--- a/jstorm-client/src/main/java/storm/trident/partition/IdentityGrouping.java
+++ /dev/null
@@ -1,44 +0,0 @@
-package storm.trident.partition;
-
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.grouping.CustomStreamGrouping;
-import backtype.storm.task.WorkerTopologyContext;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-
-public class IdentityGrouping implements CustomStreamGrouping {
-
-    List<Integer> ret = new ArrayList<Integer>();
-    Map<Integer, List<Integer>> _precomputed = new HashMap();
-    
-    @Override
-    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> tasks) {
-        List<Integer> sourceTasks = new ArrayList<Integer>(context.getComponentTasks(stream.get_componentId()));
-        Collections.sort(sourceTasks);
-        if(sourceTasks.size()!=tasks.size()) {
-            throw new RuntimeException("Can only do an identity grouping when source and target have same number of tasks");
-        }
-        tasks = new ArrayList<Integer>(tasks);
-        Collections.sort(tasks);
-        for(int i=0; i<sourceTasks.size(); i++) {
-            int s = sourceTasks.get(i);
-            int t = tasks.get(i);
-            _precomputed.put(s, Arrays.asList(t));
-        }
-    }
-
-    @Override
-    public List<Integer> chooseTasks(int task, List<Object> values) {
-        List<Integer> ret = _precomputed.get(task);
-        if(ret==null) {
-            throw new RuntimeException("Tuple emitted by task that's not part of this component. Should be impossible");
-        }
-        return ret;
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/partition/IndexHashGrouping.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/partition/IndexHashGrouping.java b/jstorm-client/src/main/java/storm/trident/partition/IndexHashGrouping.java
deleted file mode 100644
index 69c36ac..0000000
--- a/jstorm-client/src/main/java/storm/trident/partition/IndexHashGrouping.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package storm.trident.partition;
-
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.grouping.CustomStreamGrouping;
-import backtype.storm.task.WorkerTopologyContext;
-import java.util.Arrays;
-import java.util.List;
-
-public class IndexHashGrouping implements CustomStreamGrouping {
-    public static int objectToIndex(Object val, int numPartitions) {
-        if(val==null) return 0;
-        else {
-            return Math.abs(val.hashCode() % numPartitions);
-        }
-    }
-    
-    int _index;
-    List<Integer> _targets;
-    
-    public IndexHashGrouping(int index) {
-        _index = index;
-    }
-    
-    
-    @Override
-    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
-        _targets = targetTasks;
-    }
-
-    @Override
-    public List<Integer> chooseTasks(int fromTask, List<Object> values) {
-        int i = objectToIndex(values.get(_index), _targets.size());
-        return Arrays.asList(_targets.get(i));
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/planner/BridgeReceiver.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/planner/BridgeReceiver.java b/jstorm-client/src/main/java/storm/trident/planner/BridgeReceiver.java
deleted file mode 100644
index b596d54..0000000
--- a/jstorm-client/src/main/java/storm/trident/planner/BridgeReceiver.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package storm.trident.planner;
-
-import backtype.storm.coordination.BatchOutputCollector;
-import storm.trident.tuple.ConsList;
-import storm.trident.tuple.TridentTuple;
-
-
-public class BridgeReceiver implements TupleReceiver {
-
-    BatchOutputCollector _collector;
-    
-    public BridgeReceiver(BatchOutputCollector collector) {
-        _collector = collector;
-    }
-    
-    @Override
-    public void execute(ProcessorContext context, String streamId, TridentTuple tuple) {
-        _collector.emit(streamId, new ConsList(context.batchId, tuple));
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/planner/Node.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/planner/Node.java b/jstorm-client/src/main/java/storm/trident/planner/Node.java
deleted file mode 100644
index 1a0e29d..0000000
--- a/jstorm-client/src/main/java/storm/trident/planner/Node.java
+++ /dev/null
@@ -1,63 +0,0 @@
-package storm.trident.planner;
-
-import backtype.storm.tuple.Fields;
-
-import java.io.Serializable;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.lang.builder.ToStringBuilder;
-import org.apache.commons.lang.builder.ToStringStyle;
-
-public class Node implements Serializable {
-	private static AtomicInteger INDEX = new AtomicInteger(0);
-
-	private String nodeId;
-
-	public String name = null;
-	public Fields allOutputFields;
-	public String streamId;
-	public Integer parallelismHint = null;
-	public NodeStateInfo stateInfo = null;
-	public int creationIndex;
-
-	public Node(String streamId, String name, Fields allOutputFields) {
-		this.nodeId = UUID.randomUUID().toString();
-		this.allOutputFields = allOutputFields;
-		this.streamId = streamId;
-		this.name = name;
-		this.creationIndex = INDEX.incrementAndGet();
-	}
-
-	@Override
-	public int hashCode() {
-		final int prime = 31;
-		int result = 1;
-		result = prime * result + ((nodeId == null) ? 0 : nodeId.hashCode());
-		return result;
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (this == obj)
-			return true;
-		if (obj == null)
-			return false;
-		if (getClass() != obj.getClass())
-			return false;
-		Node other = (Node) obj;
-		if (nodeId == null) {
-			if (other.nodeId != null)
-				return false;
-		} else if (!nodeId.equals(other.nodeId))
-			return false;
-		return true;
-	}
-
-	@Override
-	public String toString() {
-		return ToStringBuilder.reflectionToString(this,
-				ToStringStyle.MULTI_LINE_STYLE);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/planner/NodeStateInfo.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/planner/NodeStateInfo.java b/jstorm-client/src/main/java/storm/trident/planner/NodeStateInfo.java
deleted file mode 100644
index a045eef..0000000
--- a/jstorm-client/src/main/java/storm/trident/planner/NodeStateInfo.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package storm.trident.planner;
-
-import java.io.Serializable;
-import storm.trident.state.StateSpec;
-
-public class NodeStateInfo implements Serializable {
-    public String id;
-    public StateSpec spec;
-    
-    public NodeStateInfo(String id, StateSpec spec) {
-        this.id = id;
-        this.spec = spec;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/planner/PartitionNode.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/planner/PartitionNode.java b/jstorm-client/src/main/java/storm/trident/planner/PartitionNode.java
deleted file mode 100644
index fdde133..0000000
--- a/jstorm-client/src/main/java/storm/trident/planner/PartitionNode.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package storm.trident.planner;
-
-import backtype.storm.generated.Grouping;
-import backtype.storm.tuple.Fields;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-import java.util.List;
-import storm.trident.util.TridentUtils;
-
-
-public class PartitionNode extends Node {
-    public transient Grouping thriftGrouping;
-    
-    //has the streamid/outputFields of the node it's doing the partitioning on
-    public PartitionNode(String streamId, String name, Fields allOutputFields, Grouping grouping) {
-        super(streamId, name, allOutputFields);
-        this.thriftGrouping = grouping;
-    }
-    
-    private void writeObject(ObjectOutputStream oos) throws IOException {
-        oos.defaultWriteObject();
-        byte[] ser = TridentUtils.thriftSerialize(thriftGrouping);
-        oos.writeInt(ser.length);
-        oos.write(ser);
-    }
-
-    private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
-        ois.defaultReadObject();
-        byte[] ser = new byte[ois.readInt()];
-        ois.readFully(ser);
-        this.thriftGrouping = TridentUtils.thriftDeserialize(Grouping.class, ser);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/planner/ProcessorContext.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/planner/ProcessorContext.java b/jstorm-client/src/main/java/storm/trident/planner/ProcessorContext.java
deleted file mode 100644
index dc8bb6a..0000000
--- a/jstorm-client/src/main/java/storm/trident/planner/ProcessorContext.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package storm.trident.planner;
-
-
-public class ProcessorContext {
-    public Object batchId;
-    public Object[] state;
-    
-    public ProcessorContext(Object batchId, Object[] state) {
-        this.batchId = batchId;
-        this.state = state;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/planner/ProcessorNode.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/planner/ProcessorNode.java b/jstorm-client/src/main/java/storm/trident/planner/ProcessorNode.java
deleted file mode 100644
index c0e09aa..0000000
--- a/jstorm-client/src/main/java/storm/trident/planner/ProcessorNode.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package storm.trident.planner;
-
-import backtype.storm.tuple.Fields;
-
-public class ProcessorNode extends Node {
-    
-    public boolean committer; // for partitionpersist
-    public TridentProcessor processor;
-    public Fields selfOutFields;
-    
-    public ProcessorNode(String streamId, String name, Fields allOutputFields, Fields selfOutFields, TridentProcessor processor) {
-        super(streamId, name, allOutputFields);
-        this.processor = processor;
-        this.selfOutFields = selfOutFields;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/planner/SpoutNode.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/planner/SpoutNode.java b/jstorm-client/src/main/java/storm/trident/planner/SpoutNode.java
deleted file mode 100644
index 1432c43..0000000
--- a/jstorm-client/src/main/java/storm/trident/planner/SpoutNode.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package storm.trident.planner;
-
-import backtype.storm.tuple.Fields;
-
-
-public class SpoutNode extends Node {
-    public static enum SpoutType {
-        DRPC,
-        BATCH
-    }
-    
-    public Object spout;
-    public String txId; //where state is stored in zookeeper (only for batch spout types)
-    public SpoutType type;
-    
-    public SpoutNode(String streamId, Fields allOutputFields, String txid, Object spout, SpoutType type) {
-        super(streamId, null, allOutputFields);
-        this.txId = txid;
-        this.spout = spout;
-        this.type = type;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/planner/SubtopologyBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/planner/SubtopologyBolt.java b/jstorm-client/src/main/java/storm/trident/planner/SubtopologyBolt.java
deleted file mode 100644
index 596c15d..0000000
--- a/jstorm-client/src/main/java/storm/trident/planner/SubtopologyBolt.java
+++ /dev/null
@@ -1,201 +0,0 @@
-package storm.trident.planner;
-
-import backtype.storm.coordination.BatchOutputCollector;
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.utils.Utils;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.jgrapht.DirectedGraph;
-import org.jgrapht.graph.DirectedSubgraph;
-import org.jgrapht.traverse.TopologicalOrderIterator;
-import storm.trident.planner.processor.TridentContext;
-import storm.trident.state.State;
-import storm.trident.topology.BatchInfo;
-import storm.trident.topology.ITridentBatchBolt;
-import storm.trident.tuple.TridentTuple;
-import storm.trident.tuple.TridentTuple.Factory;
-import storm.trident.tuple.TridentTupleView.ProjectionFactory;
-import storm.trident.tuple.TridentTupleView.RootFactory;
-import storm.trident.util.TridentUtils;
-
-// TODO: parameterizing it like this with everything might be a high deserialization cost if there's lots of tasks?
-// TODO: memory problems?
-// TODO: can avoid these problems by adding a boltfactory abstraction, so that boltfactory is deserialized once
-//   bolt factory -> returns coordinatedbolt per task, but deserializes the batch bolt one time and caches
-public class SubtopologyBolt implements ITridentBatchBolt { // batch bolt that runs the processors of a set of planner nodes
-    DirectedGraph _graph; // the graph passed to the constructor; prepare() builds a subgraph of it over _nodes
-    Set<Node> _nodes; // the nodes this bolt is responsible for
-    Map<String, InitialReceiver> _roots = new HashMap(); // source stream id -> entry point for tuples arriving from outside _nodes
-    Map<Node, Factory> _outputFactories = new HashMap(); // filled in prepare() as each processor node is prepared
-    Map<String, List<TridentProcessor>> _myTopologicallyOrdered = new HashMap(); // batch group -> processors, in topological visit order
-    Map<Node, String> _batchGroups; // node -> batch group name
-    
-    //given processornodes and static state nodes
-    public SubtopologyBolt(DirectedGraph graph, Set<Node> nodes, Map<Node, String> batchGroups) {
-        _nodes = nodes;
-        _graph = graph;
-        _batchGroups = batchGroups;
-    }
-
-    @Override
-    public void prepare(Map conf, TopologyContext context, BatchOutputCollector batchCollector) {
-        int thisComponentNumTasks = context.getComponentTasks(context.getThisComponentId()).size();
-        for(Node n: _nodes) { // first pass: create a State for every node that carries stateInfo
-            if(n.stateInfo!=null) {
-                State s = n.stateInfo.spec.stateFactory.makeState(conf, context, context.getThisTaskIndex(), thisComponentNumTasks);
-                context.setTaskData(n.stateInfo.id, s); // published as task data keyed by the state id
-            }
-        }
-        DirectedSubgraph<Node, Object> subgraph = new DirectedSubgraph(_graph, _nodes, null); // _graph restricted to this bolt's nodes
-        TopologicalOrderIterator it = new TopologicalOrderIterator<Node, Object>(subgraph); // second pass walks the nodes parents-first
-        int stateIndex = 0; // advanced once per visited node, whether or not it is a ProcessorNode
-        while(it.hasNext()) {
-            Node n = (Node) it.next();
-            if(n instanceof ProcessorNode) {
-                ProcessorNode pn = (ProcessorNode) n;
-                String batchGroup = _batchGroups.get(n);
-                if(!_myTopologicallyOrdered.containsKey(batchGroup)) {
-                    _myTopologicallyOrdered.put(batchGroup, new ArrayList());
-                }
-                _myTopologicallyOrdered.get(batchGroup).add(pn.processor);
-                List<String> parentStreams = new ArrayList(); // parallel lists: stream id and tuple factory of each parent
-                List<Factory> parentFactories = new ArrayList();
-                for(Node p: TridentUtils.getParents(_graph, n)) { // parents are looked up in the full graph, not the subgraph
-                    parentStreams.add(p.streamId);
-                    if(_nodes.contains(p)) {
-                        parentFactories.add(_outputFactories.get(p)); // parent inside this bolt: reuse the factory recorded when it was prepared
-                    } else {
-                        if(!_roots.containsKey(p.streamId)) { // parent outside this bolt: one InitialReceiver per source stream
-                            _roots.put(p.streamId, new InitialReceiver(p.streamId, getSourceOutputFields(context, p.streamId)));
-                        } 
-                        _roots.get(p.streamId).addReceiver(pn.processor);
-                        parentFactories.add(_roots.get(p.streamId).getOutputFactory());
-                    }
-                }
-                List<TupleReceiver> targets = new ArrayList();
-                boolean outgoingNode = false; // becomes true when at least one child lies outside _nodes
-                for(Node cn: TridentUtils.getChildren(_graph, n)) {
-                    if(_nodes.contains(cn)) {
-                        targets.add(((ProcessorNode) cn).processor); // cast: a child inside this bolt is expected to be a ProcessorNode
-                    } else {
-                        outgoingNode = true;
-                    }
-                }
-                if(outgoingNode) {
-                    targets.add(new BridgeReceiver(batchCollector)); // a single bridge receiver, however many outside children there are
-                }
-                
-                TridentContext triContext = new TridentContext(
-                        pn.selfOutFields,
-                        parentFactories,
-                        parentStreams,
-                        targets,
-                        pn.streamId,
-                        stateIndex,
-                        batchCollector
-                        );
-                pn.processor.prepare(conf, context, triContext);
-                _outputFactories.put(n, pn.processor.getOutputFactory()); // recorded after prepare so that children can look it up
-            }   
-            stateIndex++;
-        }        
-        // TODO: get prepared one time into executor data... need to avoid the ser/deser
-        // for each task (probably need storm to support boltfactory)
-    }
-
-    private Fields getSourceOutputFields(TopologyContext context, String sourceStream) {
-        for(GlobalStreamId g: context.getThisSources().keySet()) { // first source whose stream id matches wins
-            if(g.get_streamId().equals(sourceStream)) {
-                return context.getComponentOutputFields(g);
-            }
-        }
-        throw new RuntimeException("Could not find fields for source stream " + sourceStream);
-    }
-    
-    @Override
-    public void execute(BatchInfo batchInfo, Tuple tuple) {
-        String sourceStream = tuple.getSourceStreamId();
-        InitialReceiver ir = _roots.get(sourceStream); // roots are registered per source stream in prepare()
-        if(ir==null) {
-            throw new RuntimeException("Received unexpected tuple " + tuple.toString());
-        }
-        ir.receive((ProcessorContext) batchInfo.state, tuple); // batchInfo.state is cast to ProcessorContext, the type initBatchState returns
-    }
-
-    @Override
-    public void finishBatch(BatchInfo batchInfo) {
-        for(TridentProcessor p: _myTopologicallyOrdered.get(batchInfo.batchGroup)) { // only the processors of this batch's group
-            p.finishBatch((ProcessorContext) batchInfo.state);
-        }
-    }
-
-    @Override
-    public Object initBatchState(String batchGroup, Object batchId) {
-        ProcessorContext ret = new ProcessorContext(batchId, new Object[_nodes.size()]); // one state slot per node in _nodes
-        for(TridentProcessor p: _myTopologicallyOrdered.get(batchGroup)) {
-            p.startBatch(ret);
-        }
-        return ret;
-    }
-
-    @Override
-    public void cleanup() {
-        for(String bg: _myTopologicallyOrdered.keySet()) { // every processor of every batch group
-            for(TridentProcessor p: _myTopologicallyOrdered.get(bg)) {
-                p.cleanup();
-            }   
-        }
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        for(Node n: _nodes) { // one stream per node: "$batchId" followed by the node's allOutputFields
-            declarer.declareStream(n.streamId, TridentUtils.fieldsConcat(new Fields("$batchId"), n.allOutputFields));
-        }        
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        return null; // no configuration map is supplied by this bolt
-    }
-    
-    
-    protected class InitialReceiver { // turns incoming storm Tuples of one stream into TridentTuples and fans them out
-        List<TridentProcessor> _receivers = new ArrayList();
-        RootFactory _factory; // built over all fields of the source stream
-        ProjectionFactory _project; // view of _factory without the first field
-        String _stream;
-        
-        public InitialReceiver(String stream, Fields allFields) {
-            // TODO: don't want to project for non-batch bolts...???
-            // how to distinguish "batch" streams from non-batch streams?
-            _stream = stream;
-            _factory = new RootFactory(allFields);
-            List<String> projected = new ArrayList(allFields.toList());
-            projected.remove(0); // drops the first field -- presumably the "$batchId" column; confirm against the upstream declarer
-            _project = new ProjectionFactory(_factory, new Fields(projected));
-        }
-        
-        public void receive(ProcessorContext context, Tuple tuple) {
-            TridentTuple t = _project.create(_factory.create(tuple)); // root tuple first, then the projection on top of it
-            for(TridentProcessor r: _receivers) {
-                r.execute(context, _stream, t); // each receiver gets the same projected tuple
-            }            
-        }
-        
-        public void addReceiver(TridentProcessor p) {
-            _receivers.add(p);
-        }
-        
-        public Factory getOutputFactory() {
-            return _project; // downstream processors see the projected layout
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/planner/TridentProcessor.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/planner/TridentProcessor.java b/jstorm-client/src/main/java/storm/trident/planner/TridentProcessor.java
deleted file mode 100644
index 866d058..0000000
--- a/jstorm-client/src/main/java/storm/trident/planner/TridentProcessor.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package storm.trident.planner;
-
-import backtype.storm.task.TopologyContext;
-import java.io.Serializable;
-import java.util.Map;
-import storm.trident.planner.processor.TridentContext;
-import storm.trident.tuple.TridentTuple.Factory;
-
-public interface TridentProcessor extends Serializable, TupleReceiver {
-    
-    // It is imperative that implementations do not emit any tuples from prepare: the output factory
-    // cannot be obtained until preparation is done, so receivers are not yet ready to receive tuples.
-    // Tuples can't be emitted from here anyway, since it is not within a batch context (which is only
-    // startBatch, execute, and finishBatch).
-    void prepare(Map conf, TopologyContext context, TridentContext tridentContext);
-    void cleanup();
-    
-    void startBatch(ProcessorContext processorContext);
-    
-    void finishBatch(ProcessorContext processorContext);
-    
-    Factory getOutputFactory();
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/planner/TupleReceiver.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/planner/TupleReceiver.java b/jstorm-client/src/main/java/storm/trident/planner/TupleReceiver.java
deleted file mode 100644
index a2fc148..0000000
--- a/jstorm-client/src/main/java/storm/trident/planner/TupleReceiver.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package storm.trident.planner;
-
-import storm.trident.tuple.TridentTuple;
-
-
-public interface TupleReceiver {
-    // streamId indicates which stream the tuple came from
-    void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple);
-    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/planner/processor/AggregateProcessor.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/planner/processor/AggregateProcessor.java b/jstorm-client/src/main/java/storm/trident/planner/processor/AggregateProcessor.java
deleted file mode 100644
index ce62790..0000000
--- a/jstorm-client/src/main/java/storm/trident/planner/processor/AggregateProcessor.java
+++ /dev/null
@@ -1,67 +0,0 @@
-package storm.trident.planner.processor;
-
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Fields;
-import java.util.List;
-import java.util.Map;
-import storm.trident.operation.Aggregator;
-import storm.trident.operation.TridentOperationContext;
-import storm.trident.planner.ProcessorContext;
-import storm.trident.planner.TridentProcessor;
-import storm.trident.tuple.TridentTuple;
-import storm.trident.tuple.TridentTuple.Factory;
-import storm.trident.tuple.TridentTupleView.ProjectionFactory;
-
-
-public class AggregateProcessor implements TridentProcessor { // drives an Aggregator through init / aggregate / complete per batch
-    Aggregator _agg;
-    TridentContext _context;
-    FreshCollector _collector; // created in prepare()
-    Fields _inputFields; // fields projected out of each incoming tuple before it reaches the aggregator
-    ProjectionFactory _projection;
-
-    public AggregateProcessor(Fields inputFields, Aggregator agg) {
-        _agg = agg;
-        _inputFields = inputFields;
-    }
-    
-    @Override
-    public void prepare(Map conf, TopologyContext context, TridentContext tridentContext) {
-        List<Factory> parents = tridentContext.getParentTupleFactories();
-        if(parents.size()!=1) { // exactly one parent factory is required
-            throw new RuntimeException("Aggregate operation can only have one parent");
-        }
-        _context = tridentContext;
-        _collector = new FreshCollector(tridentContext);
-        _projection = new ProjectionFactory(parents.get(0), _inputFields);
-        _agg.prepare(conf, new TridentOperationContext(context, _projection));
-    }
-
-    @Override
-    public void cleanup() {
-        _agg.cleanup();
-    }
-
-    @Override
-    public void startBatch(ProcessorContext processorContext) {
-        _collector.setContext(processorContext); // the collector is pointed at the current batch before every aggregator call
-        processorContext.state[_context.getStateIndex()] = _agg.init(processorContext.batchId, _collector); // init's result is kept in this processor's state slot
-    }    
-
-    @Override
-    public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
-        _collector.setContext(processorContext);
-        _agg.aggregate(processorContext.state[_context.getStateIndex()], _projection.create(tuple), _collector); // streamId is not used here
-    }
-    
-    @Override
-    public void finishBatch(ProcessorContext processorContext) {
-        _collector.setContext(processorContext);
-        _agg.complete(processorContext.state[_context.getStateIndex()], _collector);
-    }
- 
-    @Override
-    public Factory getOutputFactory() {
-        return _collector.getOutputFactory(); // _collector is null until prepare() has run
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/planner/processor/AppendCollector.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/planner/processor/AppendCollector.java b/jstorm-client/src/main/java/storm/trident/planner/processor/AppendCollector.java
deleted file mode 100644
index 92932cb..0000000
--- a/jstorm-client/src/main/java/storm/trident/planner/processor/AppendCollector.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package storm.trident.planner.processor;
-
-import java.util.List;
-import storm.trident.operation.TridentCollector;
-import storm.trident.planner.ProcessorContext;
-import storm.trident.planner.TupleReceiver;
-import storm.trident.tuple.TridentTuple;
-import storm.trident.tuple.TridentTuple.Factory;
-import storm.trident.tuple.TridentTupleView;
-import storm.trident.tuple.TridentTupleView.OperationOutputFactory;
-
-
-public class AppendCollector implements TridentCollector { // collector whose emitted tuples are built from the current input tuple plus new values
-    OperationOutputFactory _factory;
-    TridentContext _triContext;
-    TridentTuple tuple; // input tuple set by the latest setContext call
-    ProcessorContext context; // batch context set by the latest setContext call
-    
-    public AppendCollector(TridentContext context) {
-        _triContext = context;
-        _factory = new OperationOutputFactory(context.getParentTupleFactories().get(0), context.getSelfOutputFields()); // only the first parent factory is used
-    }
-                
-    public void setContext(ProcessorContext pc, TridentTuple t) { // must be called before emit so it knows the batch and input tuple
-        this.context = pc;
-        this.tuple = t;
-    }
-
-    @Override
-    public void emit(List<Object> values) {
-        TridentTuple toEmit = _factory.create((TridentTupleView) tuple, values); // the stored input tuple is cast to TridentTupleView
-        for(TupleReceiver r: _triContext.getReceivers()) { // handed directly to every downstream receiver
-            r.execute(context, _triContext.getOutStreamId(), toEmit);
-        }
-    }
-
-    @Override
-    public void reportError(Throwable t) {
-        _triContext.getDelegateCollector().reportError(t); // errors go to the delegate collector held by the TridentContext
-    } 
-    
-    public Factory getOutputFactory() {
-        return _factory;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/planner/processor/EachProcessor.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/planner/processor/EachProcessor.java b/jstorm-client/src/main/java/storm/trident/planner/processor/EachProcessor.java
deleted file mode 100644
index 7b217de..0000000
--- a/jstorm-client/src/main/java/storm/trident/planner/processor/EachProcessor.java
+++ /dev/null
@@ -1,63 +0,0 @@
-package storm.trident.planner.processor;
-
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Fields;
-import java.util.List;
-import java.util.Map;
-import storm.trident.operation.Function;
-import storm.trident.operation.TridentOperationContext;
-import storm.trident.planner.ProcessorContext;
-import storm.trident.planner.TridentProcessor;
-import storm.trident.tuple.TridentTuple;
-import storm.trident.tuple.TridentTuple.Factory;
-import storm.trident.tuple.TridentTupleView.ProjectionFactory;
-
-
-public class EachProcessor implements TridentProcessor { // applies a Function to every incoming tuple
-    Function _function;
-    TridentContext _context;
-    AppendCollector _collector; // created in prepare()
-    Fields _inputFields; // fields projected out of each tuple and handed to the function
-    ProjectionFactory _projection;
-    
-    public EachProcessor(Fields inputFields, Function function) {
-        _function = function;
-        _inputFields = inputFields;
-    }
-    
-    @Override
-    public void prepare(Map conf, TopologyContext context, TridentContext tridentContext) {
-        List<Factory> parents = tridentContext.getParentTupleFactories();
-        if(parents.size()!=1) { // exactly one parent factory is required
-            throw new RuntimeException("Each operation can only have one parent");
-        }
-        _context = tridentContext;
-        _collector = new AppendCollector(tridentContext);
-        _projection = new ProjectionFactory(parents.get(0), _inputFields);
-        _function.prepare(conf, new TridentOperationContext(context, _projection));
-    }
-
-    @Override
-    public void cleanup() {
-        _function.cleanup();
-    }    
-
-    @Override
-    public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
-        _collector.setContext(processorContext, tuple); // the collector receives the full tuple, the function only the projection
-        _function.execute(_projection.create(tuple), _collector);
-    }
-
-    @Override
-    public void startBatch(ProcessorContext processorContext) { // nothing to do at batch start
-    }
-
-    @Override
-    public void finishBatch(ProcessorContext processorContext) { // nothing to do at batch end
-    }
-
-    @Override
-    public Factory getOutputFactory() {
-        return _collector.getOutputFactory(); // _collector is null until prepare() has run
-    }    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/planner/processor/FreshCollector.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/planner/processor/FreshCollector.java b/jstorm-client/src/main/java/storm/trident/planner/processor/FreshCollector.java
deleted file mode 100644
index 1fb3aa6..0000000
--- a/jstorm-client/src/main/java/storm/trident/planner/processor/FreshCollector.java
+++ /dev/null
@@ -1,42 +0,0 @@
-package storm.trident.planner.processor;
-
-import java.util.List;
-import storm.trident.operation.TridentCollector;
-import storm.trident.planner.ProcessorContext;
-import storm.trident.planner.TupleReceiver;
-import storm.trident.tuple.TridentTuple;
-import storm.trident.tuple.TridentTuple.Factory;
-import storm.trident.tuple.TridentTupleView.FreshOutputFactory;
-
-
-public class FreshCollector implements TridentCollector { // collector whose emitted tuples are built from the emitted values alone
-    FreshOutputFactory _factory;
-    TridentContext _triContext;
-    ProcessorContext context; // batch context set by the latest setContext call
-    
-    public FreshCollector(TridentContext context) {
-        _triContext = context;
-        _factory = new FreshOutputFactory(context.getSelfOutputFields()); // built from the self output fields only, no parent factory
-    }
-                
-    public void setContext(ProcessorContext pc) { // must be called before emit so it knows the batch
-        this.context = pc;
-    }
-
-    @Override
-    public void emit(List<Object> values) {
-        TridentTuple toEmit = _factory.create(values);
-        for(TupleReceiver r: _triContext.getReceivers()) { // handed directly to every downstream receiver
-            r.execute(context, _triContext.getOutStreamId(), toEmit);
-        }            
-    }
-
-    @Override
-    public void reportError(Throwable t) {
-        _triContext.getDelegateCollector().reportError(t); // errors go to the delegate collector held by the TridentContext
-    } 
-
-    public Factory getOutputFactory() {
-        return _factory;
-    }    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/planner/processor/MultiReducerProcessor.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/planner/processor/MultiReducerProcessor.java b/jstorm-client/src/main/java/storm/trident/planner/processor/MultiReducerProcessor.java
deleted file mode 100644
index 1998e1a..0000000
--- a/jstorm-client/src/main/java/storm/trident/planner/processor/MultiReducerProcessor.java
+++ /dev/null
@@ -1,76 +0,0 @@
-package storm.trident.planner.processor;
-
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Fields;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import storm.trident.operation.MultiReducer;
-import storm.trident.operation.TridentMultiReducerContext;
-import storm.trident.planner.ProcessorContext;
-import storm.trident.planner.TridentProcessor;
-import storm.trident.tuple.TridentTuple;
-import storm.trident.tuple.TridentTuple.Factory;
-import storm.trident.tuple.TridentTupleView.ProjectionFactory;
-
-
-public class MultiReducerProcessor implements TridentProcessor { // feeds tuples from several parent streams into one MultiReducer
-    MultiReducer _reducer;
-    TridentContext _context;
-    Map<String, Integer> _streamToIndex; // parent stream id -> position in the parent stream list
-    List<Fields> _projectFields; // one Fields entry per input, matched to parents by position
-    ProjectionFactory[] _projectionFactories;
-    FreshCollector _collector; // created in prepare()
-
-    public MultiReducerProcessor(List<Fields> inputFields, MultiReducer reducer) {
-        _reducer = reducer;
-        _projectFields = inputFields;
-    }
-    
-    @Override
-    public void prepare(Map conf, TopologyContext context, TridentContext tridentContext) {
-        List<Factory> parents = tridentContext.getParentTupleFactories(); // no parent-count check here, unlike the single-parent processors
-        _context = tridentContext;
-        _streamToIndex = new HashMap<String, Integer>();
-        List<String> parentStreams = tridentContext.getParentStreams();
-        for(int i=0; i<parentStreams.size(); i++) {
-            _streamToIndex.put(parentStreams.get(i), i);
-        }
-        _projectionFactories = new ProjectionFactory[_projectFields.size()];
-        for(int i=0; i<_projectFields.size(); i++) {
-            _projectionFactories[i] = new ProjectionFactory(parents.get(i), _projectFields.get(i)); // parents.get(i) throws if there are fewer parents than input Fields
-        }
-        _collector = new FreshCollector(tridentContext);
-        _reducer.prepare(conf, new TridentMultiReducerContext((List) Arrays.asList(_projectionFactories)));
-    }
-
-    @Override
-    public void cleanup() {
-        _reducer.cleanup();
-    }
-
-    @Override
-    public void startBatch(ProcessorContext processorContext) {
-        _collector.setContext(processorContext);
-        processorContext.state[_context.getStateIndex()] = _reducer.init(_collector); // init's result is kept in this processor's state slot
-    }    
-
-    @Override
-    public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
-        _collector.setContext(processorContext);
-        int i = _streamToIndex.get(streamId); // unboxes the lookup: a stream id that was not a parent stream yields a NullPointerException
-        _reducer.execute(processorContext.state[_context.getStateIndex()], i, _projectionFactories[i].create(tuple), _collector);
-    }
-    
-    @Override
-    public void finishBatch(ProcessorContext processorContext) {
-        _collector.setContext(processorContext);
-        _reducer.complete(processorContext.state[_context.getStateIndex()], _collector);
-    }
- 
-    @Override
-    public Factory getOutputFactory() {
-        return _collector.getOutputFactory(); // _collector is null until prepare() has run
-    } 
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/planner/processor/PartitionPersistProcessor.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/planner/processor/PartitionPersistProcessor.java b/jstorm-client/src/main/java/storm/trident/planner/processor/PartitionPersistProcessor.java
deleted file mode 100644
index 5ab2357..0000000
--- a/jstorm-client/src/main/java/storm/trident/planner/processor/PartitionPersistProcessor.java
+++ /dev/null
@@ -1,90 +0,0 @@
-package storm.trident.planner.processor;
-
-import backtype.storm.task.TopologyContext;
-import storm.trident.topology.TransactionAttempt;
-import backtype.storm.tuple.Fields;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import storm.trident.operation.TridentOperationContext;
-import storm.trident.planner.ProcessorContext;
-import storm.trident.planner.TridentProcessor;
-import storm.trident.state.State;
-import storm.trident.state.StateUpdater;
-import storm.trident.tuple.TridentTuple;
-import storm.trident.tuple.TridentTuple.Factory;
-import storm.trident.tuple.TridentTupleView.ProjectionFactory;
-
-
-public class PartitionPersistProcessor implements TridentProcessor { // buffers a batch's tuples and applies them to a State when the batch finishes
-    StateUpdater _updater;
-    State _state; // looked up from the task data in prepare()
-    String _stateId; // key under which the State is looked up
-    TridentContext _context;
-    Fields _inputFields; // fields projected out of each tuple before buffering
-    ProjectionFactory _projection;
-    FreshCollector _collector; // created in prepare()
-
-    public PartitionPersistProcessor(String stateId, Fields inputFields, StateUpdater updater) {
-        _updater = updater;
-        _stateId = stateId;
-        _inputFields = inputFields;
-    }
-    
-    @Override
-    public void prepare(Map conf, TopologyContext context, TridentContext tridentContext) {
-        List<Factory> parents = tridentContext.getParentTupleFactories();
-        if(parents.size()!=1) { // exactly one parent factory is required
-            throw new RuntimeException("Partition persist operation can only have one parent");
-        }
-        _context = tridentContext;
-        _state = (State) context.getTaskData(_stateId); // expects the State to have been stored as task data already
-        _projection = new ProjectionFactory(parents.get(0), _inputFields);
-        _collector = new FreshCollector(tridentContext);
-        _updater.prepare(conf, new TridentOperationContext(context, _projection));
-    }
-
-    @Override
-    public void cleanup() {
-        _updater.cleanup();
-    }
-
-    @Override
-    public void startBatch(ProcessorContext processorContext) {
-        processorContext.state[_context.getStateIndex()] = new ArrayList<TridentTuple>();        
-    }
-    
-    @Override
-    public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
-        ((List) processorContext.state[_context.getStateIndex()]).add(_projection.create(tuple)); // only buffers; nothing is emitted or written here
-    }
-
-    @Override
-    public void finishBatch(ProcessorContext processorContext) {
-        _collector.setContext(processorContext);
-        Object batchId = processorContext.batchId;
-        // since this processor type is a committer, this occurs in the commit phase
-        List<TridentTuple> buffer = (List) processorContext.state[_context.getStateIndex()];
-        
-        // don't update unless there are tuples
-        // this helps out with things like global partition persist, where multiple tasks may still
-        // exist for this processor. Only want the global one to do anything
-        // this is also a helpful optimization that state implementations don't need to manually do
-        if(buffer.size() > 0) {
-            Long txid = null; // stays null when batchId is not a TransactionAttempt
-            // this is to support things like persisting off of drpc stream, which is inherently unreliable
-            // and won't have a tx attempt
-            if(batchId instanceof TransactionAttempt) {
-                txid = ((TransactionAttempt) batchId).getTransactionId();
-            }
-            _state.beginCommit(txid); // the update is bracketed by beginCommit / commit with the same txid
-            _updater.updateState(_state, buffer, _collector);
-            _state.commit(txid);
-        }
-    }    
-
-    @Override
-    public Factory getOutputFactory() {
-        return _collector.getOutputFactory(); // _collector is null until prepare() has run
-    } 
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/planner/processor/ProjectedProcessor.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/planner/processor/ProjectedProcessor.java b/jstorm-client/src/main/java/storm/trident/planner/processor/ProjectedProcessor.java
deleted file mode 100644
index c6d34e5..0000000
--- a/jstorm-client/src/main/java/storm/trident/planner/processor/ProjectedProcessor.java
+++ /dev/null
@@ -1,56 +0,0 @@
-package storm.trident.planner.processor;
-
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Fields;
-import java.util.Map;
-import storm.trident.planner.ProcessorContext;
-import storm.trident.planner.TridentProcessor;
-import storm.trident.planner.TupleReceiver;
-import storm.trident.tuple.TridentTuple;
-import storm.trident.tuple.TridentTuple.Factory;
-import storm.trident.tuple.TridentTupleView.ProjectionFactory;
-
-
-public class ProjectedProcessor implements TridentProcessor { // forwards each tuple downstream as a projection onto _projectFields
-    Fields _projectFields;
-    ProjectionFactory _factory; // created in prepare()
-    TridentContext _context;
-    
-    public ProjectedProcessor(Fields projectFields) {
-        _projectFields = projectFields;
-    }
-    
-    @Override
-    public void prepare(Map conf, TopologyContext context, TridentContext tridentContext) {
-        if(tridentContext.getParentTupleFactories().size()!=1) { // exactly one parent factory is required
-            throw new RuntimeException("Projection processor can only have one parent");
-        }
-        _context = tridentContext;
-        _factory = new ProjectionFactory(tridentContext.getParentTupleFactories().get(0), _projectFields);
-    }
-
-    @Override
-    public void cleanup() { // nothing to release
-    }
-
-    @Override
-    public void startBatch(ProcessorContext processorContext) { // nothing to do at batch start
-    }
-
-    @Override
-    public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
-        TridentTuple toEmit = _factory.create(tuple);
-        for(TupleReceiver r: _context.getReceivers()) { // sent straight to the receivers, without going through a collector
-            r.execute(processorContext, _context.getOutStreamId(), toEmit);
-        }
-    }
-
-    @Override
-    public void finishBatch(ProcessorContext processorContext) { // nothing to do at batch end
-    }
-
-    @Override
-    public Factory getOutputFactory() {
-        return _factory; // null until prepare() has run
-    }
-}