You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/11/05 21:40:57 UTC

[18/60] [abbrv] [partial] storm git commit: Release 2.0.4-SNAPSHOT

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/state/map/MapReducerAggStateUpdater.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/state/map/MapReducerAggStateUpdater.java b/jstorm-client/src/main/java/storm/trident/state/map/MapReducerAggStateUpdater.java
deleted file mode 100644
index f7c227b..0000000
--- a/jstorm-client/src/main/java/storm/trident/state/map/MapReducerAggStateUpdater.java
+++ /dev/null
@@ -1,75 +0,0 @@
-package storm.trident.state.map;
-
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import storm.trident.operation.ReducerAggregator;
-import storm.trident.operation.TridentCollector;
-import storm.trident.operation.TridentOperationContext;
-import storm.trident.state.ReducerValueUpdater;
-import storm.trident.state.StateUpdater;
-import storm.trident.state.ValueUpdater;
-import storm.trident.tuple.ComboList;
-import storm.trident.tuple.TridentTuple;
-import storm.trident.tuple.TridentTupleView.ProjectionFactory;
-
-public class MapReducerAggStateUpdater implements StateUpdater<MapState> {
-    ReducerAggregator _agg;
-    Fields _groupFields;
-    Fields _inputFields;
-    ProjectionFactory _groupFactory;
-    ProjectionFactory _inputFactory;
-    ComboList.Factory _factory;
-    
-    
-    public MapReducerAggStateUpdater(ReducerAggregator agg, Fields groupFields, Fields inputFields) {
-        _agg = agg;
-        _groupFields = groupFields;
-        _inputFields = inputFields;
-        _factory = new ComboList.Factory(groupFields.size(), 1);
-    }
-    
-
-    @Override
-    public void updateState(MapState map, List<TridentTuple> tuples, TridentCollector collector) {
-        Map<List<Object>, List<TridentTuple>> grouped = new HashMap();
-        
-        //List<List<Object>> groups = new ArrayList<List<Object>>(tuples.size());
-        //List<Object> values = new ArrayList<Object>(tuples.size());
-        for(TridentTuple t: tuples) {
-            List<Object> group = _groupFactory.create(t);
-            List<TridentTuple> groupTuples = grouped.get(group);
-            if(groupTuples==null) {
-                groupTuples = new ArrayList();
-                grouped.put(group, groupTuples);
-            }
-            groupTuples.add(_inputFactory.create(t));
-        }
-        List<List<Object>> uniqueGroups = new ArrayList(grouped.keySet());
-        List<ValueUpdater> updaters = new ArrayList(uniqueGroups.size());
-        for(List<Object> group: uniqueGroups) {
-            updaters.add(new ReducerValueUpdater(_agg, grouped.get(group)));
-        }
-        List<Object> results = map.multiUpdate(uniqueGroups, updaters);
-
-        for(int i=0; i<uniqueGroups.size(); i++) {
-            List<Object> group = uniqueGroups.get(i);
-            Object result = results.get(i);
-            collector.emit(_factory.create(new List[] {group, new Values(result) }));
-        }
-    }
-
-    @Override
-    public void prepare(Map conf, TridentOperationContext context) {
-        _groupFactory = context.makeProjectionFactory(_groupFields);
-        _inputFactory = context.makeProjectionFactory(_inputFields);
-    }
-
-    @Override
-    public void cleanup() {
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/state/map/MapState.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/state/map/MapState.java b/jstorm-client/src/main/java/storm/trident/state/map/MapState.java
deleted file mode 100644
index 78901d9..0000000
--- a/jstorm-client/src/main/java/storm/trident/state/map/MapState.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package storm.trident.state.map;
-
-import java.util.List;
-import storm.trident.state.ValueUpdater;
-
-public interface MapState<T> extends ReadOnlyMapState<T> {
-    List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters);
-    void multiPut(List<List<Object>> keys, List<T> vals);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/state/map/MicroBatchIBackingMap.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/state/map/MicroBatchIBackingMap.java b/jstorm-client/src/main/java/storm/trident/state/map/MicroBatchIBackingMap.java
deleted file mode 100644
index 2f356b1..0000000
--- a/jstorm-client/src/main/java/storm/trident/state/map/MicroBatchIBackingMap.java
+++ /dev/null
@@ -1,68 +0,0 @@
-package storm.trident.state.map;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-public class MicroBatchIBackingMap<T> implements IBackingMap<T> {
-    IBackingMap<T> _delegate;
-    Options _options;
-
-
-    public static class Options implements Serializable {
-        public int maxMultiGetBatchSize = 0; // 0 means delegate batch size = trident batch size.
-        public int maxMultiPutBatchSize = 0;
-    }
-
-    public MicroBatchIBackingMap(final Options options, final IBackingMap<T> delegate) {
-        _options = options;
-        _delegate = delegate;
-        assert options.maxMultiPutBatchSize >= 0;
-        assert options.maxMultiGetBatchSize >= 0;
-    }
-
-    @Override
-    public void multiPut(final List<List<Object>> keys, final List<T> values) {
-        int thisBatchSize;
-        if(_options.maxMultiPutBatchSize == 0) { thisBatchSize = keys.size(); }
-        else { thisBatchSize = _options.maxMultiPutBatchSize; }
-
-        LinkedList<List<Object>> keysTodo = new LinkedList<List<Object>>(keys);
-        LinkedList<T> valuesTodo = new LinkedList<T>(values);
-
-        while(!keysTodo.isEmpty()) {
-            List<List<Object>> keysBatch = new ArrayList<List<Object>>(thisBatchSize);
-            List<T> valuesBatch = new ArrayList<T>(thisBatchSize);
-            for(int i=0; i<thisBatchSize && !keysTodo.isEmpty(); i++) {
-                keysBatch.add(keysTodo.removeFirst());
-                valuesBatch.add(valuesTodo.removeFirst());
-            }
-
-            _delegate.multiPut(keysBatch, valuesBatch);
-        }
-    }
-
-    @Override
-    public List<T> multiGet(final List<List<Object>> keys) {
-        int thisBatchSize;
-        if(_options.maxMultiGetBatchSize == 0) { thisBatchSize = keys.size(); }
-        else { thisBatchSize = _options.maxMultiGetBatchSize; }
-
-        LinkedList<List<Object>> keysTodo = new LinkedList<List<Object>>(keys);
-
-        List<T> ret = new ArrayList<T>(keys.size());
-
-        while(!keysTodo.isEmpty()) {
-            List<List<Object>> keysBatch = new ArrayList<List<Object>>(thisBatchSize);
-            for(int i=0; i<thisBatchSize && !keysTodo.isEmpty(); i++) {
-                keysBatch.add(keysTodo.removeFirst());
-            }
-
-            List<T> retSubset = _delegate.multiGet(keysBatch);
-            ret.addAll(retSubset);
-        }
-
-        return ret;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/state/map/NonTransactionalMap.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/state/map/NonTransactionalMap.java b/jstorm-client/src/main/java/storm/trident/state/map/NonTransactionalMap.java
deleted file mode 100644
index 3a140b5..0000000
--- a/jstorm-client/src/main/java/storm/trident/state/map/NonTransactionalMap.java
+++ /dev/null
@@ -1,50 +0,0 @@
-package storm.trident.state.map;
-
-import storm.trident.state.ValueUpdater;
-
-import java.util.ArrayList;
-import java.util.List;
-
-
-public class NonTransactionalMap<T> implements MapState<T> {
-    public static <T> MapState<T> build(IBackingMap<T> backing) {
-        return new NonTransactionalMap<T>(backing);
-    }
-    
-    IBackingMap<T> _backing;
-    
-    protected NonTransactionalMap(IBackingMap<T> backing) {
-        _backing = backing;
-    }
-    
-    @Override
-    public List<T> multiGet(List<List<Object>> keys) {
-        return _backing.multiGet(keys);
-    }
-
-    @Override
-    public List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters) {
-        List<T> curr = _backing.multiGet(keys);
-        List<T> ret = new ArrayList<T>(curr.size());
-        for(int i=0; i<curr.size(); i++) {
-            T currVal = curr.get(i);
-            ValueUpdater<T> updater = updaters.get(i);
-            ret.add(updater.update(currVal));
-        }
-        _backing.multiPut(keys, ret);
-        return ret;
-    }
-
-    @Override
-    public void multiPut(List<List<Object>> keys, List<T> vals) {
-        _backing.multiPut(keys, vals);
-    }
-
-    @Override
-    public void beginCommit(Long txid) {
-    }
-
-    @Override
-    public void commit(Long txid) {
-    }  
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/state/map/OpaqueMap.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/state/map/OpaqueMap.java b/jstorm-client/src/main/java/storm/trident/state/map/OpaqueMap.java
deleted file mode 100644
index f646d66..0000000
--- a/jstorm-client/src/main/java/storm/trident/state/map/OpaqueMap.java
+++ /dev/null
@@ -1,107 +0,0 @@
-package storm.trident.state.map;
-
-import storm.trident.state.OpaqueValue;
-import storm.trident.state.ValueUpdater;
-
-import java.util.ArrayList;
-import java.util.List;
-
-
-public class OpaqueMap<T> implements MapState<T> {
-    public static <T> MapState<T> build(IBackingMap<OpaqueValue> backing) {
-        return new OpaqueMap<T>(backing);
-    }
-    
-    CachedBatchReadsMap<OpaqueValue> _backing;
-    Long _currTx;
-    
-    protected OpaqueMap(IBackingMap<OpaqueValue> backing) {
-        _backing = new CachedBatchReadsMap(backing);
-    }
-    
-    @Override
-    public List<T> multiGet(List<List<Object>> keys) {
-        List<CachedBatchReadsMap.RetVal<OpaqueValue>> curr = _backing.multiGet(keys);
-        List<T> ret = new ArrayList<T>(curr.size());
-        for(CachedBatchReadsMap.RetVal<OpaqueValue> retval: curr) {
-            OpaqueValue val = retval.val;
-            if(val!=null) {
-                if(retval.cached) {
-                    ret.add((T) val.getCurr());
-                } else {
-                    ret.add((T) val.get(_currTx));
-                }
-            } else {
-                ret.add(null);
-            }
-        }
-        return ret;
-    }
-
-    @Override
-    public List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters) {
-        List<CachedBatchReadsMap.RetVal<OpaqueValue>> curr = _backing.multiGet(keys);
-        List<OpaqueValue> newVals = new ArrayList<OpaqueValue>(curr.size());
-        List<T> ret = new ArrayList<T>();
-        for(int i=0; i<curr.size(); i++) {
-            CachedBatchReadsMap.RetVal<OpaqueValue> retval = curr.get(i);
-            OpaqueValue<T> val = retval.val;
-            ValueUpdater<T> updater = updaters.get(i);
-            T prev;
-            if(val==null) {
-                prev = null;
-            } else {
-                if(retval.cached) {
-                    prev = val.getCurr();
-                } else {
-                    prev = val.get(_currTx);
-                }
-            }
-            T newVal = updater.update(prev);
-            ret.add(newVal);
-            OpaqueValue<T> newOpaqueVal;
-            if(val==null) {
-                newOpaqueVal = new OpaqueValue<T>(_currTx, newVal);
-            } else {
-                newOpaqueVal = val.update(_currTx, newVal);
-            }
-            newVals.add(newOpaqueVal);
-        }
-        _backing.multiPut(keys, newVals);
-        return ret;
-    }
-
-    @Override
-    public void multiPut(List<List<Object>> keys, List<T> vals) {
-        List<ValueUpdater> updaters = new ArrayList<ValueUpdater>(vals.size());
-        for(T val: vals) {
-            updaters.add(new ReplaceUpdater<T>(val));
-        }
-        multiUpdate(keys, updaters);
-    }
-
-    @Override
-    public void beginCommit(Long txid) {
-        _currTx = txid;
-        _backing.reset();
-    }
-
-    @Override
-    public void commit(Long txid) {
-        _currTx = null;
-        _backing.reset();
-    }
-    
-    static class ReplaceUpdater<T> implements ValueUpdater<T> {
-        T _t;
-        
-        public ReplaceUpdater(T t) {
-            _t = t;
-        }
-        
-        @Override
-        public T update(Object stored) {
-            return _t;
-        }        
-    }    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/state/map/ReadOnlyMapState.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/state/map/ReadOnlyMapState.java b/jstorm-client/src/main/java/storm/trident/state/map/ReadOnlyMapState.java
deleted file mode 100644
index 5a519c4..0000000
--- a/jstorm-client/src/main/java/storm/trident/state/map/ReadOnlyMapState.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package storm.trident.state.map;
-
-import java.util.List;
-import storm.trident.state.State;
-
-public interface ReadOnlyMapState<T> extends State {
-    // certain states might only accept one-tuple keys - those should just throw an error 
-    List<T> multiGet(List<List<Object>> keys);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/state/map/RemovableMapState.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/state/map/RemovableMapState.java b/jstorm-client/src/main/java/storm/trident/state/map/RemovableMapState.java
deleted file mode 100644
index cf34f05..0000000
--- a/jstorm-client/src/main/java/storm/trident/state/map/RemovableMapState.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package storm.trident.state.map;
-
-import java.util.List;
-import storm.trident.state.State;
-
-public interface RemovableMapState<T> extends State {
-    void multiRemove(List<List<Object>> keys);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/state/map/SnapshottableMap.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/state/map/SnapshottableMap.java b/jstorm-client/src/main/java/storm/trident/state/map/SnapshottableMap.java
deleted file mode 100644
index f42a5c9..0000000
--- a/jstorm-client/src/main/java/storm/trident/state/map/SnapshottableMap.java
+++ /dev/null
@@ -1,59 +0,0 @@
-package storm.trident.state.map;
-
-import java.util.Arrays;
-import java.util.List;
-import storm.trident.state.ValueUpdater;
-import storm.trident.state.snapshot.Snapshottable;
-
-
-public class SnapshottableMap<T> implements MapState<T>, Snapshottable<T> {
-    MapState<T> _delegate;
-    List<List<Object>> _keys;
-    
-    public SnapshottableMap(MapState<T> delegate, List<Object> snapshotKey) {
-        _delegate = delegate;
-        _keys = Arrays.asList(snapshotKey);
-    }
-    
-    @Override
-    public List<T> multiGet(List<List<Object>> keys) {
-        return _delegate.multiGet(keys);
-    }
-
-    @Override
-    public List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters) {
-        return _delegate.multiUpdate(keys, updaters);
-    }
-
-    @Override
-    public void multiPut(List<List<Object>> keys, List<T> vals) {
-        _delegate.multiPut(keys, vals);
-    }
-
-    @Override
-    public void beginCommit(Long txid) {
-        _delegate.beginCommit(txid);
-    }
-
-    @Override
-    public void commit(Long txid) {
-        _delegate.commit(txid);
-    }
-
-    @Override
-    public T get() {
-        return multiGet(_keys).get(0);
-    }
-
-    @Override
-    public T update(ValueUpdater updater) {
-        List<ValueUpdater> updaters = Arrays.asList(updater);
-        return multiUpdate(_keys, updaters).get(0);
-    }
-
-    @Override
-    public void set(T o) {
-        multiPut(_keys, Arrays.asList(o));
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/state/map/TransactionalMap.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/state/map/TransactionalMap.java b/jstorm-client/src/main/java/storm/trident/state/map/TransactionalMap.java
deleted file mode 100644
index 1f44910..0000000
--- a/jstorm-client/src/main/java/storm/trident/state/map/TransactionalMap.java
+++ /dev/null
@@ -1,92 +0,0 @@
-package storm.trident.state.map;
-
-import storm.trident.state.TransactionalValue;
-import storm.trident.state.ValueUpdater;
-
-import java.util.ArrayList;
-import java.util.List;
-
-
-public class TransactionalMap<T> implements MapState<T> {
-    public static <T> MapState<T> build(IBackingMap<TransactionalValue> backing) {
-        return new TransactionalMap<T>(backing);
-    }
-
-    CachedBatchReadsMap<TransactionalValue> _backing;
-    Long _currTx;
-    
-    protected TransactionalMap(IBackingMap<TransactionalValue> backing) {
-        _backing = new CachedBatchReadsMap(backing);
-    }
-    
-    @Override
-    public List<T> multiGet(List<List<Object>> keys) {
-        List<CachedBatchReadsMap.RetVal<TransactionalValue>> vals = _backing.multiGet(keys);
-        List<T> ret = new ArrayList<T>(vals.size());
-        for(CachedBatchReadsMap.RetVal<TransactionalValue> retval: vals) {
-            TransactionalValue v = retval.val;
-            if(v!=null) {
-                ret.add((T) v.getVal());
-            } else {
-                ret.add(null);
-            }
-        }
-        return ret;
-    }
-
-    @Override
-    public List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters) {
-        List<CachedBatchReadsMap.RetVal<TransactionalValue>> curr = _backing.multiGet(keys);
-        List<TransactionalValue> newVals = new ArrayList<TransactionalValue>(curr.size());
-        List<List<Object>> newKeys = new ArrayList();
-        List<T> ret = new ArrayList<T>();
-        for(int i=0; i<curr.size(); i++) {
-            CachedBatchReadsMap.RetVal<TransactionalValue> retval = curr.get(i);
-            TransactionalValue<T> val = retval.val;
-            ValueUpdater<T> updater = updaters.get(i);
-            TransactionalValue<T> newVal;
-            boolean changed = false;
-            if(val==null) {
-                newVal = new TransactionalValue<T>(_currTx, updater.update(null));
-                changed = true;
-            } else {
-                if(_currTx!=null && _currTx.equals(val.getTxid()) && !retval.cached) {
-                    newVal = val;
-                } else {
-                    newVal = new TransactionalValue<T>(_currTx, updater.update(val.getVal()));
-                    changed = true;
-                }
-            }
-            ret.add(newVal.getVal());
-            if(changed) {
-                newVals.add(newVal);
-                newKeys.add(keys.get(i));
-            }
-        }
-        if(!newKeys.isEmpty()) {
-            _backing.multiPut(newKeys, newVals);
-        }
-        return ret;
-    }
-
-    @Override
-    public void multiPut(List<List<Object>> keys, List<T> vals) {
-        List<TransactionalValue> newVals = new ArrayList<TransactionalValue>(vals.size());
-        for(T val: vals) {
-            newVals.add(new TransactionalValue<T>(_currTx, val));
-        }
-        _backing.multiPut(keys, newVals);
-    }
-
-    @Override
-    public void beginCommit(Long txid) {
-        _currTx = txid;
-        _backing.reset();
-    }
-
-    @Override
-    public void commit(Long txid) {
-        _currTx = null;
-        _backing.reset();
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/state/snapshot/ReadOnlySnapshottable.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/state/snapshot/ReadOnlySnapshottable.java b/jstorm-client/src/main/java/storm/trident/state/snapshot/ReadOnlySnapshottable.java
deleted file mode 100644
index 2064a98..0000000
--- a/jstorm-client/src/main/java/storm/trident/state/snapshot/ReadOnlySnapshottable.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package storm.trident.state.snapshot;
-
-import storm.trident.state.State;
-
-public interface ReadOnlySnapshottable<T> extends State {
-    T get();    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/state/snapshot/Snapshottable.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/state/snapshot/Snapshottable.java b/jstorm-client/src/main/java/storm/trident/state/snapshot/Snapshottable.java
deleted file mode 100644
index f216485..0000000
--- a/jstorm-client/src/main/java/storm/trident/state/snapshot/Snapshottable.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package storm.trident.state.snapshot;
-
-import storm.trident.state.ValueUpdater;
-
-
-// used by Stream#persistentAggregate
-public interface Snapshottable<T> extends ReadOnlySnapshottable<T> {
-    T update(ValueUpdater updater);
-    void set(T o);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/testing/CountAsAggregator.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/testing/CountAsAggregator.java b/jstorm-client/src/main/java/storm/trident/testing/CountAsAggregator.java
deleted file mode 100644
index 52f482f..0000000
--- a/jstorm-client/src/main/java/storm/trident/testing/CountAsAggregator.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package storm.trident.testing;
-
-import backtype.storm.tuple.Values;
-import storm.trident.operation.BaseAggregator;
-import storm.trident.operation.TridentCollector;
-import storm.trident.tuple.TridentTuple;
-
-
-public class CountAsAggregator extends BaseAggregator<CountAsAggregator.State> {
-
-    static class State {
-        long count = 0;
-    }
-    
-    @Override
-    public State init(Object batchId, TridentCollector collector) {
-        return new State();
-    }
-
-    @Override
-    public void aggregate(State state, TridentTuple tuple, TridentCollector collector) {
-        state.count++;
-    }
-
-    @Override
-    public void complete(State state, TridentCollector collector) {
-        collector.emit(new Values(state.count));
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/testing/FeederBatchSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/testing/FeederBatchSpout.java b/jstorm-client/src/main/java/storm/trident/testing/FeederBatchSpout.java
deleted file mode 100644
index 5571153..0000000
--- a/jstorm-client/src/main/java/storm/trident/testing/FeederBatchSpout.java
+++ /dev/null
@@ -1,168 +0,0 @@
-package storm.trident.testing;
-
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.RegisteredGlobalState;
-import backtype.storm.utils.Utils;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Semaphore;
-import storm.trident.operation.TridentCollector;
-import storm.trident.spout.ITridentSpout;
-import storm.trident.topology.TransactionAttempt;
-import storm.trident.topology.TridentTopologyBuilder;
-
-public class FeederBatchSpout implements ITridentSpout, IFeeder {
-
-    String _id;
-    String _semaphoreId;
-    Fields _outFields;
-    boolean _waitToEmit = true;
-    
-
-    public FeederBatchSpout(List<String> fields) {
-        _outFields = new Fields(fields);
-        _id = RegisteredGlobalState.registerState(new CopyOnWriteArrayList());
-        _semaphoreId = RegisteredGlobalState.registerState(new CopyOnWriteArrayList());
-    }
-    
-    public void setWaitToEmit(boolean trueIfWait) {
-        _waitToEmit = trueIfWait;
-    }
-    
-    public void feed(Object tuples) {
-        Semaphore sem = new Semaphore(0);
-        ((List)RegisteredGlobalState.getState(_semaphoreId)).add(sem);
-        ((List)RegisteredGlobalState.getState(_id)).add(tuples);
-        try {
-            sem.acquire();
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
-        }
-    }
-    
-
-    public class FeederCoordinator implements ITridentSpout.BatchCoordinator<Map<Integer, List<List<Object>>>> {
-
-        int _numPartitions;
-        int _emittedIndex = 0;
-        Map<Long, Integer> txIndices = new HashMap();
-        
-        public FeederCoordinator(int numPartitions) {
-            _numPartitions = numPartitions;
-        }
-        
-        @Override
-        public Map<Integer, List<List<Object>>> initializeTransaction(long txid, Map<Integer, List<List<Object>>> prevMetadata, Map<Integer, List<List<Object>>> currMetadata) {
-            if(currMetadata!=null) return currMetadata;
-            List allBatches = (List) RegisteredGlobalState.getState(_id);
-            if(allBatches.size()>_emittedIndex) {
-                Object batchInfo = allBatches.get(_emittedIndex);                
-                txIndices.put(txid, _emittedIndex);                
-                _emittedIndex += 1;
-                if(batchInfo instanceof Map) {
-                    return (Map) batchInfo;
-                } else {
-                    List batchList = (List) batchInfo;
-                    Map<Integer, List<List<Object>>> partitions = new HashMap();
-                    for(int i=0; i<_numPartitions; i++) {
-                        partitions.put(i, new ArrayList());
-                    }
-                    for(int i=0; i<batchList.size(); i++) {
-                        int partition = i % _numPartitions;
-                        partitions.get(partition).add((List)batchList.get(i));
-                    }
-                    return partitions;
-                }
-            } else {
-                return new HashMap();
-            }
-        }
-
-        @Override
-        public void close() {
-        }
-
-        @Override
-        public void success(long txid) {
-            Integer index = txIndices.get(txid);
-            if(index != null) {
-                Semaphore sem = (Semaphore) ((List)RegisteredGlobalState.getState(_semaphoreId)).get(index);
-                sem.release();
-            }
-        }
-
-        int _masterEmitted = 0;
-        
-        @Override
-        public boolean isReady(long txid) {
-            if(!_waitToEmit) return true;
-            List allBatches = (List) RegisteredGlobalState.getState(_id);
-            if(allBatches.size() > _masterEmitted) {
-                _masterEmitted++;
-                return true;
-            } else {
-                Utils.sleep(2);
-                return false;
-            }
-        }
-    }
-    
-    public class FeederEmitter implements ITridentSpout.Emitter<Map<Integer, List<List<Object>>>> {
-
-        int _index;
-        
-        public FeederEmitter(int index) {
-            _index = index;
-        }
-        
-        @Override
-        public void emitBatch(TransactionAttempt tx, Map<Integer, List<List<Object>>> coordinatorMeta, TridentCollector collector) {
-            List<List<Object>> tuples = coordinatorMeta.get(_index);
-            if(tuples!=null) {
-                for(List<Object> t: tuples) {
-                    collector.emit(t);
-                }
-            }
-        }
-
-        @Override
-        public void success(TransactionAttempt tx) {
-        }
-
-        @Override
-        public void close() {
-        }        
-    }
-    
-    
-    @Override
-    public Map getComponentConfiguration() {
-        return null;
-    }
-
-    @Override
-    public Fields getOutputFields() {
-        return _outFields;
-    }
-
-    @Override
-    public BatchCoordinator getCoordinator(String txStateId, Map conf, TopologyContext context) {
-        int numTasks = context.getComponentTasks(
-                            TridentTopologyBuilder.spoutIdFromCoordinatorId(
-                                context.getThisComponentId()))
-                                    .size();
-        return new FeederCoordinator(numTasks);
-    }
-
-    @Override
-    public Emitter getEmitter(String txStateId, Map conf, TopologyContext context) {
-        return new FeederEmitter(context.getThisTaskIndex());
-    }
-    
-    
-    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/testing/FeederCommitterBatchSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/testing/FeederCommitterBatchSpout.java b/jstorm-client/src/main/java/storm/trident/testing/FeederCommitterBatchSpout.java
deleted file mode 100644
index d105c0c..0000000
--- a/jstorm-client/src/main/java/storm/trident/testing/FeederCommitterBatchSpout.java
+++ /dev/null
@@ -1,79 +0,0 @@
-package storm.trident.testing;
-
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Fields;
-import java.util.List;
-import java.util.Map;
-import storm.trident.operation.TridentCollector;
-import storm.trident.spout.ICommitterTridentSpout;
-import storm.trident.spout.ITridentSpout;
-import storm.trident.topology.TransactionAttempt;
-
-
-public class FeederCommitterBatchSpout implements ICommitterTridentSpout, IFeeder {
-
-    FeederBatchSpout _spout;
-    
-    public FeederCommitterBatchSpout(List<String> fields) {
-        _spout = new FeederBatchSpout(fields);
-    }
-    
-    public void setWaitToEmit(boolean trueIfWait) {
-        _spout.setWaitToEmit(trueIfWait);
-    }
-    
-    static class CommitterEmitter implements ICommitterTridentSpout.Emitter {
-        ITridentSpout.Emitter _emitter;
-        
-        
-        public CommitterEmitter(ITridentSpout.Emitter e) {
-            _emitter = e;
-        }
-        
-        @Override
-        public void commit(TransactionAttempt attempt) {
-        }
-
-        @Override
-        public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, TridentCollector collector) {
-            _emitter.emitBatch(tx, coordinatorMeta, collector);
-        }
-
-        @Override
-        public void success(TransactionAttempt tx) {
-            _emitter.success(tx);
-        }
-
-        @Override
-        public void close() {
-            _emitter.close();
-        }
-        
-    }
-    
-    @Override
-    public Emitter getEmitter(String txStateId, Map conf, TopologyContext context) {
-        return new CommitterEmitter(_spout.getEmitter(txStateId, conf, context));
-    }
-
-    @Override
-    public BatchCoordinator getCoordinator(String txStateId, Map conf, TopologyContext context) {
-        return _spout.getCoordinator(txStateId, conf, context);
-    }
-
-    @Override
-    public Fields getOutputFields() {
-        return _spout.getOutputFields();
-    }
-
-    @Override
-    public Map getComponentConfiguration() {
-        return _spout.getComponentConfiguration();
-    }
-
-    @Override
-    public void feed(Object tuples) {
-        _spout.feed(tuples);
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/testing/FixedBatchSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/testing/FixedBatchSpout.java b/jstorm-client/src/main/java/storm/trident/testing/FixedBatchSpout.java
deleted file mode 100644
index 6e32c1a..0000000
--- a/jstorm-client/src/main/java/storm/trident/testing/FixedBatchSpout.java
+++ /dev/null
@@ -1,80 +0,0 @@
-package storm.trident.testing;
-
-import backtype.storm.Config;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Fields;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.HashMap;
-
-import storm.trident.operation.TridentCollector;
-import storm.trident.spout.IBatchSpout;
-
-
-/**
- * Batch spout that serves a fixed, in-memory array of tuples in slices of at
- * most {@code maxBatchSize}. Every batch that has been built is remembered under
- * its batch id until {@link #ack(long)} is called, so asking again for the same
- * un-acked id emits the same tuples.
- */
-public class FixedBatchSpout implements IBatchSpout {
-
-    Fields fields;
-    List<Object>[] outputs;
-    int maxBatchSize;
-    /** Batches built so far and not yet acked, keyed by batch id. */
-    HashMap<Long, List<List<Object>>> batches = new HashMap<Long, List<List<Object>>>();
-    /** Position in {@code outputs} of the next tuple to hand out. */
-    int index = 0;
-    /** When true, the cursor rewinds to 0 once the array has been exhausted. */
-    boolean cycle = false;
-
-    /**
-     * @param fields       value returned by {@link #getOutputFields()}
-     * @param maxBatchSize upper bound on the number of tuples per batch
-     * @param outputs      tuples to serve, in order
-     */
-    public FixedBatchSpout(Fields fields, int maxBatchSize, List<Object>... outputs) {
-        this.maxBatchSize = maxBatchSize;
-        this.outputs = outputs;
-        this.fields = fields;
-    }
-
-    /** Turns rewinding of the cursor on or off. */
-    public void setCycle(boolean cycle) {
-        this.cycle = cycle;
-    }
-
-    /** Resets the cursor to the first tuple. */
-    @Override
-    public void open(Map conf, TopologyContext context) {
-        this.index = 0;
-    }
-
-    /**
-     * Emits the batch stored for {@code batchId}, building and storing it first
-     * when this id has not been seen (or has been acked).
-     */
-    @Override
-    public void emitBatch(long batchId, TridentCollector collector) {
-        List<List<Object>> tuples = batches.get(batchId);
-        if (tuples == null) {
-            tuples = sliceNextBatch();
-            batches.put(batchId, tuples);
-        }
-        for (List<Object> tuple : tuples) {
-            collector.emit(tuple);
-        }
-    }
-
-    /** Takes up to {@code maxBatchSize} tuples starting at the cursor and advances it. */
-    private List<List<Object>> sliceNextBatch() {
-        List<List<Object>> slice = new ArrayList<List<Object>>();
-        if (index >= outputs.length && cycle) {
-            index = 0;
-        }
-        int taken = 0;
-        while (index < outputs.length && taken < maxBatchSize) {
-            slice.add(outputs[index]);
-            index++;
-            taken++;
-        }
-        return slice;
-    }
-
-    /** Forgets the batch stored for {@code batchId}. */
-    @Override
-    public void ack(long batchId) {
-        batches.remove(batchId);
-    }
-
-    /** Nothing to release. */
-    @Override
-    public void close() {
-    }
-
-    /** Returns a configuration whose max task parallelism is set to 1. */
-    @Override
-    public Map getComponentConfiguration() {
-        Config singleTask = new Config();
-        singleTask.setMaxTaskParallelism(1);
-        return singleTask;
-    }
-
-    @Override
-    public Fields getOutputFields() {
-        return this.fields;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/testing/IFeeder.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/testing/IFeeder.java b/jstorm-client/src/main/java/storm/trident/testing/IFeeder.java
deleted file mode 100644
index eaf02bb..0000000
--- a/jstorm-client/src/main/java/storm/trident/testing/IFeeder.java
+++ /dev/null
@@ -1,6 +0,0 @@
-package storm.trident.testing;
-
-
-/**
- * Implemented by testing components that accept tuples pushed in from outside.
- */
-public interface IFeeder {
-    /**
-     * Supplies tuples to the implementation.
-     *
-     * @param tuples declared as {@code Object}; the concrete shape is defined by
-     *               the implementation (not visible here)
-     */
-    void feed(Object tuples);        
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/testing/LRUMemoryMapState.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/testing/LRUMemoryMapState.java b/jstorm-client/src/main/java/storm/trident/testing/LRUMemoryMapState.java
deleted file mode 100644
index 51c8ffb..0000000
--- a/jstorm-client/src/main/java/storm/trident/testing/LRUMemoryMapState.java
+++ /dev/null
@@ -1,135 +0,0 @@
-package storm.trident.testing;
-
-import backtype.storm.task.IMetricsContext;
-import storm.trident.state.ITupleCollection;
-import backtype.storm.tuple.Values;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import storm.trident.state.OpaqueValue;
-import storm.trident.state.State;
-import storm.trident.state.StateFactory;
-import storm.trident.state.ValueUpdater;
-import storm.trident.state.map.*;
-import storm.trident.state.snapshot.Snapshottable;
-import storm.trident.util.LRUMap;
-
-/**
- * Map state kept in memory on top of an {@link LRUMap} of a fixed cache size.
- * All state operations are forwarded to a {@link SnapshottableMap} built over an
- * opaque view of the backing map; the backing storage itself lives in a static
- * registry keyed by id, so two instances created with the same id share data.
- */
-public class LRUMemoryMapState<T> implements Snapshottable<T>, ITupleCollection, MapState<T> {
-
-    /** Registry of backing maps, one per id, shared by all instances. */
-    static ConcurrentHashMap<String, Map<List<Object>, Object>> _dbs = new ConcurrentHashMap<String, Map<List<Object>, Object>>();
-
-    LRUMemoryMapStateBacking<OpaqueValue> _backing;
-    SnapshottableMap<T> _delegate;
-
-    /**
-     * @param cacheSize size handed to the {@link LRUMap} created for a new id
-     * @param id        key of the backing map in the shared registry
-     */
-    public LRUMemoryMapState(int cacheSize, String id) {
-        _backing = new LRUMemoryMapStateBacking(cacheSize, id);
-        MapState opaqueView = OpaqueMap.build(_backing);
-        _delegate = new SnapshottableMap(opaqueView, new Values("$MEMORY-MAP-STATE-GLOBAL$"));
-    }
-
-    // ---- Snapshottable: forwarded to the delegate ----
-
-    public T get() {
-        return _delegate.get();
-    }
-
-    public void set(T o) {
-        _delegate.set(o);
-    }
-
-    public T update(ValueUpdater updater) {
-        return _delegate.update(updater);
-    }
-
-    // ---- State: forwarded to the delegate ----
-
-    public void beginCommit(Long txid) {
-        _delegate.beginCommit(txid);
-    }
-
-    public void commit(Long txid) {
-        _delegate.commit(txid);
-    }
-
-    // ---- MapState: forwarded to the delegate ----
-
-    public List<T> multiGet(List<List<Object>> keys) {
-        return _delegate.multiGet(keys);
-    }
-
-    public void multiPut(List<List<Object>> keys, List<T> vals) {
-        _delegate.multiPut(keys, vals);
-    }
-
-    public List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters) {
-        return _delegate.multiUpdate(keys, updaters);
-    }
-
-    /** Iterates the backing map directly; see {@link LRUMemoryMapStateBacking#getTuples()}. */
-    public Iterator<List<Object>> getTuples() {
-        return _backing.getTuples();
-    }
-
-    /**
-     * Creates states whose id is a random UUID chosen at construction time
-     * followed by the partition index.
-     */
-    public static class Factory implements StateFactory {
-
-        String _id;
-        int _maxSize;
-
-        public Factory(int maxSize) {
-            _maxSize = maxSize;
-            _id = UUID.randomUUID().toString();
-        }
-
-        @Override
-        public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
-            String stateId = _id + partitionIndex;
-            return new LRUMemoryMapState(_maxSize, stateId);
-        }
-    }
-
-    /**
-     * Backing map that reads and writes the registry entry for one id.
-     */
-    static class LRUMemoryMapStateBacking<T> implements IBackingMap<T>, ITupleCollection {
-
-        Map<List<Object>, T> db;
-        Long currTx;
-
-        /** Empties the shared registry. */
-        public static void clearAll() {
-            _dbs.clear();
-        }
-
-        public LRUMemoryMapStateBacking(int cacheSize, String id) {
-            Map<List<Object>, Object> candidate = new LRUMap<List<Object>, Object>(cacheSize);
-            // only registered when the id is new; an existing map is kept
-            _dbs.putIfAbsent(id, candidate);
-            this.db = (Map<List<Object>, T>) _dbs.get(id);
-        }
-
-        /** Returns one value per key, in key order; a missing key yields {@code null}. */
-        @Override
-        public List<T> multiGet(List<List<Object>> keys) {
-            List<T> found = new ArrayList();
-            for (int i = 0; i < keys.size(); i++) {
-                found.add(db.get(keys.get(i)));
-            }
-            return found;
-        }
-
-        /** Stores {@code vals.get(i)} under {@code keys.get(i)} for every key index. */
-        @Override
-        public void multiPut(List<List<Object>> keys, List<T> vals) {
-            int count = keys.size();
-            for (int i = 0; i < count; i++) {
-                db.put(keys.get(i), vals.get(i));
-            }
-        }
-
-        /**
-         * Returns an iterator producing, for each entry, the key elements
-         * followed by the current value of the stored {@link OpaqueValue}.
-         */
-        @Override
-        public Iterator<List<Object>> getTuples() {
-            return new Iterator<List<Object>>() {
-
-                private Iterator<Map.Entry<List<Object>, T>> entries = db.entrySet().iterator();
-
-                public boolean hasNext() {
-                    return entries.hasNext();
-                }
-
-                public List<Object> next() {
-                    Map.Entry<List<Object>, T> entry = entries.next();
-                    List<Object> tuple = new ArrayList<Object>(entry.getKey());
-                    OpaqueValue stored = (OpaqueValue) entry.getValue();
-                    tuple.add(stored.getCurr());
-                    return tuple;
-                }
-
-                public void remove() {
-                    throw new UnsupportedOperationException("Not supported yet.");
-                }
-            };
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/testing/MemoryBackingMap.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/testing/MemoryBackingMap.java b/jstorm-client/src/main/java/storm/trident/testing/MemoryBackingMap.java
deleted file mode 100644
index e222ba6..0000000
--- a/jstorm-client/src/main/java/storm/trident/testing/MemoryBackingMap.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package storm.trident.testing;
-
-import storm.trident.state.map.IBackingMap;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * {@link IBackingMap} backed by a single {@link HashMap} held in this instance.
- */
-public class MemoryBackingMap implements IBackingMap<Object> {
-    Map _vals = new HashMap();
-
-    /** Returns one value per key, in key order; a missing key yields {@code null}. */
-    @Override
-    public List<Object> multiGet(List<List<Object>> keys) {
-        List found = new ArrayList();
-        for(int i=0; i<keys.size(); i++) {
-            found.add(_vals.get(keys.get(i)));
-        }
-        return found;
-    }
-
-    /** Stores {@code vals.get(i)} under {@code keys.get(i)} for every key index. */
-    @Override
-    public void multiPut(List<List<Object>> keys, List<Object> vals) {
-        int count = keys.size();
-        for(int i=0; i<count; i++) {
-            _vals.put(keys.get(i), vals.get(i));
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/testing/MemoryMapState.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/testing/MemoryMapState.java b/jstorm-client/src/main/java/storm/trident/testing/MemoryMapState.java
deleted file mode 100644
index 9512d63..0000000
--- a/jstorm-client/src/main/java/storm/trident/testing/MemoryMapState.java
+++ /dev/null
@@ -1,157 +0,0 @@
-package storm.trident.testing;
-
-import backtype.storm.task.IMetricsContext;
-import storm.trident.state.ITupleCollection;
-import backtype.storm.tuple.Values;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import storm.trident.state.OpaqueValue;
-import storm.trident.state.State;
-import storm.trident.state.StateFactory;
-import storm.trident.state.ValueUpdater;
-import storm.trident.state.map.*;
-import storm.trident.state.snapshot.Snapshottable;
-
-/**
- * Map state kept in memory in a {@link HashMap}. State operations are forwarded
- * to a {@link SnapshottableMap} built over an opaque view of the backing map; the
- * backing storage lives in a static registry keyed by id, so two instances
- * created with the same id share data. Removal is done in two steps: keys are
- * first overwritten with {@code null} and only dropped from the backing map at
- * the start of a later commit.
- */
-public class MemoryMapState<T> implements Snapshottable<T>, ITupleCollection, MapState<T>, RemovableMapState<T> {
-
-    /** Registry of backing maps, one per id, shared by all instances. */
-    static ConcurrentHashMap<String, Map<List<Object>, Object>> _dbs = new ConcurrentHashMap<String, Map<List<Object>, Object>>();
-
-    MemoryMapStateBacking<OpaqueValue> _backing;
-    SnapshottableMap<T> _delegate;
-    /** Keys nulled out since the last {@link #beginCommit(Long)}. */
-    List<List<Object>> _removed = new ArrayList();
-    /** Transaction id seen by the most recent {@link #beginCommit(Long)}. */
-    Long _currTx = null;
-
-
-    /**
-     * @param id key of the backing map in the shared registry
-     */
-    public MemoryMapState(String id) {
-        _backing = new MemoryMapStateBacking(id);
-        MapState opaqueView = OpaqueMap.build(_backing);
-        _delegate = new SnapshottableMap(opaqueView, new Values("$MEMORY-MAP-STATE-GLOBAL$"));
-    }
-
-    // ---- Snapshottable: forwarded to the delegate ----
-
-    public T get() {
-        return _delegate.get();
-    }
-
-    public void set(T o) {
-        _delegate.set(o);
-    }
-
-    public T update(ValueUpdater updater) {
-        return _delegate.update(updater);
-    }
-
-    /**
-     * Starts a commit on the delegate, then physically removes the pending keys
-     * unless {@code txid} is non-null and equal to the previous transaction id.
-     * The pending list is reset in either case.
-     */
-    public void beginCommit(Long txid) {
-        _delegate.beginCommit(txid);
-        boolean sameTransaction = txid != null && txid.equals(_currTx);
-        if (!sameTransaction) {
-            _backing.multiRemove(_removed);
-        }
-        _currTx = txid;
-        _removed = new ArrayList();
-    }
-
-    public void commit(Long txid) {
-        _delegate.commit(txid);
-    }
-
-    // ---- MapState: forwarded to the delegate ----
-
-    public List<T> multiGet(List<List<Object>> keys) {
-        return _delegate.multiGet(keys);
-    }
-
-    public void multiPut(List<List<Object>> keys, List<T> vals) {
-        _delegate.multiPut(keys, vals);
-    }
-
-    public List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters) {
-        return _delegate.multiUpdate(keys, updaters);
-    }
-
-    /** Iterates the backing map directly; see {@link MemoryMapStateBacking#getTuples()}. */
-    public Iterator<List<Object>> getTuples() {
-        return _backing.getTuples();
-    }
-
-    /**
-     * Writes {@code null} for every key now and records the keys so that the
-     * next {@link #beginCommit(Long)} can drop them from the backing map, at
-     * which point both the current and the previous value are known to be null.
-     */
-    @Override
-    public void multiRemove(List<List<Object>> keys) {
-        List nulls = new ArrayList(Collections.nCopies(keys.size(), null));
-        multiPut(keys, nulls);
-        _removed.addAll(keys);
-    }
-
-    /**
-     * Creates states whose id is a random UUID chosen at construction time
-     * followed by the partition index.
-     */
-    public static class Factory implements StateFactory {
-
-        String _id;
-
-        public Factory() {
-            _id = UUID.randomUUID().toString();
-        }
-
-        @Override
-        public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
-            String stateId = _id + partitionIndex;
-            return new MemoryMapState(stateId);
-        }
-    }
-
-    /**
-     * Backing map that reads and writes the registry entry for one id.
-     */
-    static class MemoryMapStateBacking<T> implements IBackingMap<T>, ITupleCollection {
-
-        Map<List<Object>, T> db;
-        Long currTx;
-
-        /** Empties the shared registry. */
-        public static void clearAll() {
-            _dbs.clear();
-        }
-
-        public MemoryMapStateBacking(String id) {
-            // only registered when the id is new; an existing map is kept
-            _dbs.putIfAbsent(id, new HashMap());
-            this.db = (Map<List<Object>, T>) _dbs.get(id);
-        }
-
-        /** Drops every given key from the backing map. */
-        public void multiRemove(List<List<Object>> keys) {
-            for (int i = 0; i < keys.size(); i++) {
-                db.remove(keys.get(i));
-            }
-        }
-
-        /** Returns one value per key, in key order; a missing key yields {@code null}. */
-        @Override
-        public List<T> multiGet(List<List<Object>> keys) {
-            List<T> found = new ArrayList();
-            for (int i = 0; i < keys.size(); i++) {
-                found.add(db.get(keys.get(i)));
-            }
-            return found;
-        }
-
-        /** Stores {@code vals.get(i)} under {@code keys.get(i)} for every key index. */
-        @Override
-        public void multiPut(List<List<Object>> keys, List<T> vals) {
-            int count = keys.size();
-            for (int i = 0; i < count; i++) {
-                db.put(keys.get(i), vals.get(i));
-            }
-        }
-
-        /**
-         * Returns an iterator producing, for each entry, the key elements
-         * followed by the current value of the stored {@link OpaqueValue}.
-         */
-        @Override
-        public Iterator<List<Object>> getTuples() {
-            return new Iterator<List<Object>>() {
-
-                private Iterator<Map.Entry<List<Object>, T>> entries = db.entrySet().iterator();
-
-                public boolean hasNext() {
-                    return entries.hasNext();
-                }
-
-                public List<Object> next() {
-                    Map.Entry<List<Object>, T> entry = entries.next();
-                    List<Object> tuple = new ArrayList<Object>(entry.getKey());
-                    OpaqueValue stored = (OpaqueValue) entry.getValue();
-                    tuple.add(stored.getCurr());
-                    return tuple;
-                }
-
-                public void remove() {
-                    throw new UnsupportedOperationException("Not supported yet.");
-                }
-            };
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/testing/Split.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/testing/Split.java b/jstorm-client/src/main/java/storm/trident/testing/Split.java
deleted file mode 100644
index 65cdb8b..0000000
--- a/jstorm-client/src/main/java/storm/trident/testing/Split.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package storm.trident.testing;
-
-import backtype.storm.tuple.Values;
-import storm.trident.operation.BaseFunction;
-import storm.trident.operation.TridentCollector;
-import storm.trident.tuple.TridentTuple;
-
-/**
- * Function that breaks the string in field 0 on single spaces and emits each
- * non-empty piece as a one-value tuple.
- */
-public class Split extends BaseFunction {
-
-    @Override
-    public void execute(TridentTuple tuple, TridentCollector collector) {
-        String[] pieces = tuple.getString(0).split(" ");
-        for(int i = 0; i < pieces.length; i++) {
-            String word = pieces[i];
-            if(word.isEmpty()) {
-                // consecutive spaces produce empty pieces; those are dropped
-                continue;
-            }
-            collector.emit(new Values(word));
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/testing/StringLength.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/testing/StringLength.java b/jstorm-client/src/main/java/storm/trident/testing/StringLength.java
deleted file mode 100644
index f99a5c7..0000000
--- a/jstorm-client/src/main/java/storm/trident/testing/StringLength.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package storm.trident.testing;
-
-import backtype.storm.tuple.Values;
-import storm.trident.operation.BaseFunction;
-import storm.trident.operation.TridentCollector;
-import storm.trident.tuple.TridentTuple;
-
-/**
- * Function that emits the length of the string in field 0 as a one-value tuple.
- */
-public class StringLength extends BaseFunction {
-
-    @Override
-    public void execute(TridentTuple tuple, TridentCollector collector) {
-        String text = tuple.getString(0);
-        int length = text.length();
-        collector.emit(new Values(length));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/testing/TrueFilter.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/testing/TrueFilter.java b/jstorm-client/src/main/java/storm/trident/testing/TrueFilter.java
deleted file mode 100644
index 6912063..0000000
--- a/jstorm-client/src/main/java/storm/trident/testing/TrueFilter.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package storm.trident.testing;
-
-import storm.trident.operation.BaseFilter;
-import storm.trident.tuple.TridentTuple;
-
-/**
- * Filter that keeps every tuple.
- */
-public class TrueFilter extends BaseFilter {
-
-    /**
-     * @param tuple not inspected
-     * @return always {@code true}
-     */
-    @Override
-    public boolean isKeep(TridentTuple tuple) {
-        return true;
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/testing/TuplifyArgs.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/testing/TuplifyArgs.java b/jstorm-client/src/main/java/storm/trident/testing/TuplifyArgs.java
deleted file mode 100644
index 764e51e..0000000
--- a/jstorm-client/src/main/java/storm/trident/testing/TuplifyArgs.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package storm.trident.testing;
-
-import java.util.List;
-
-import storm.trident.operation.BaseFunction;
-import storm.trident.operation.TridentCollector;
-import storm.trident.tuple.TridentTuple;
-import backtype.storm.utils.Utils;
-
-/**
- * Function that parses the string in field 0 as JSON, treats the result as a
- * list of tuples and emits each of them in order.
- */
-public class TuplifyArgs extends BaseFunction {
-
-    @Override
-    public void execute(TridentTuple input, TridentCollector collector) {
-        Object parsed = Utils.from_json(input.getString(0));
-        List<List<Object>> tuples = (List) parsed;
-        for(int i = 0; i < tuples.size(); i++) {
-            collector.emit(tuples.get(i));
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/topology/BatchInfo.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/topology/BatchInfo.java b/jstorm-client/src/main/java/storm/trident/topology/BatchInfo.java
deleted file mode 100644
index a3e3076..0000000
--- a/jstorm-client/src/main/java/storm/trident/topology/BatchInfo.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package storm.trident.topology;
-
-import storm.trident.spout.IBatchID;
-
-
-/**
- * Mutable holder for the three values describing a batch: its id, the batch
- * group it was created for, and an arbitrary state object.
- */
-public class BatchInfo {
-    /** Batch id given to the constructor. */
-    public IBatchID batchId;
-    /** State object given to the constructor. */
-    public Object state;
-    /** Batch group given to the constructor. */
-    public String batchGroup;
-
-    /**
-     * Stores the three arguments in the public fields of the same name.
-     */
-    public BatchInfo(String batchGroup, IBatchID batchId, Object state) {
-        this.batchId = batchId;
-        this.state = state;
-        this.batchGroup = batchGroup;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/topology/ITridentBatchBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/topology/ITridentBatchBolt.java b/jstorm-client/src/main/java/storm/trident/topology/ITridentBatchBolt.java
deleted file mode 100644
index b6f60ce..0000000
--- a/jstorm-client/src/main/java/storm/trident/topology/ITridentBatchBolt.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package storm.trident.topology;
-
-import backtype.storm.coordination.BatchOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IComponent;
-import backtype.storm.tuple.Tuple;
-import java.util.Map;
-
-/**
- * Component interface for a bolt whose callbacks are organised around batches
- * described by {@link BatchInfo}.
- * NOTE(review): the call order of these methods is defined by the executor that
- * drives the bolt, which is not visible here; confirm before relying on it.
- */
-public interface ITridentBatchBolt extends IComponent {
-    /** Receives the configuration, topology context and output collector. */
-    void prepare(Map conf, TopologyContext context, BatchOutputCollector collector);
-    /** Handles one tuple together with the batch information it is associated with. */
-    void execute(BatchInfo batchInfo, Tuple tuple);
-    /** Signals that the batch described by {@code batchInfo} is finished. */
-    void finishBatch(BatchInfo batchInfo);
-    /** Returns a state object for the given group and batch id (presumably stored in {@link BatchInfo#state}; verify against the caller). */
-    Object initBatchState(String batchGroup, Object batchId);
-    /** Releases whatever the bolt holds. */
-    void cleanup();    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/topology/MasterBatchCoordinator.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/topology/MasterBatchCoordinator.java b/jstorm-client/src/main/java/storm/trident/topology/MasterBatchCoordinator.java
deleted file mode 100644
index 201696e..0000000
--- a/jstorm-client/src/main/java/storm/trident/topology/MasterBatchCoordinator.java
+++ /dev/null
@@ -1,317 +0,0 @@
-package storm.trident.topology;
-
-import backtype.storm.Config;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.WindowedTimeThrottler;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.log4j.Logger;
-import storm.trident.spout.ITridentSpout;
-import storm.trident.spout.ICommitterTridentSpout;
-import storm.trident.topology.state.TransactionalState;
-
-/**
- * Spout that drives the transaction life cycle for a set of Trident spouts.
- * It emits {@link TransactionAttempt} values on three streams ("$batch",
- * "$commit", "$success"), tracks the status of every active transaction in
- * {@code _activeTx}, and stores the current transaction id and the attempt ids
- * through one {@link TransactionalState} per managed spout id.
- */
-public class MasterBatchCoordinator extends BaseRichSpout { 
-    public static final Logger LOG = Logger.getLogger(MasterBatchCoordinator.class);
-    
-    // transaction id used when no stored id is found (see getStoredCurrTransaction)
-    public static final long INIT_TXID = 1L;
-    
-    
-    // ids of the three output streams declared in declareOutputFields
-    public static final String BATCH_STREAM_ID = "$batch";
-    public static final String COMMIT_STREAM_ID = "$commit";
-    public static final String SUCCESS_STREAM_ID = "$success";
-
-    // keys under which the current txid / attempt map are stored in TransactionalState
-    private static final String CURRENT_TX = "currtx";
-    private static final String CURRENT_ATTEMPTS = "currattempts";
-    
-    // the three entry points that funnel into sync()
-    private static enum Operation {
-        ACK,
-        FAIL,
-        NEXTTUPLE
-    }
-
-    // one state per managed spout id, filled in open()
-    private List<TransactionalState> _states = new ArrayList();
-    
-    // status of every transaction currently in flight, ordered by txid
-    TreeMap<Long, TransactionStatus> _activeTx = new TreeMap<Long, TransactionStatus>();
-    // last attempt id handed out per txid
-    TreeMap<Long, Integer> _attemptIds;
-    
-    private SpoutOutputCollector _collector;
-    // lowest transaction that has not yet been reported as successful
-    Long _currTransaction;
-    int _maxTransactionActive;
-    
-    List<ITridentSpout.BatchCoordinator> _coordinators = new ArrayList();
-    
-    
-    List<String> _managedSpoutIds;
-    List<ITridentSpout> _spouts;
-    WindowedTimeThrottler _throttler;
-    
-    // toggled by activate()/deactivate(); no new batches are started while false
-    boolean _active = true;
-    
-    // not referenced anywhere else in this class
-    AtomicBoolean failedOccur = new AtomicBoolean(false);
-    
-    /**
-     * @param spoutIds ids of the managed spouts; must not be empty
-     * @param spouts   the spouts themselves; open() pairs them with spoutIds by index
-     * @throws IllegalArgumentException if spoutIds is empty
-     */
-    public MasterBatchCoordinator(List<String> spoutIds, List<ITridentSpout> spouts) {
-        if(spoutIds.isEmpty()) {
-            throw new IllegalArgumentException("Must manage at least one spout");
-        }
-        _managedSpoutIds = spoutIds;
-        _spouts = spouts;
-    }
-
-    public List<String> getManagedSpoutIds(){
-        return _managedSpoutIds;
-    }
-
-    @Override
-    public void activate() {
-        _active = true;
-    }
-
-    @Override
-    public void deactivate() {
-        _active = false;
-    }
-        
-    /**
-     * Builds the throttler, opens one coordinator state per spout id, restores
-     * the stored transaction id and attempt ids, and asks every spout for its
-     * batch coordinator.
-     */
-    @Override
-    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-        _throttler = new WindowedTimeThrottler((Number)conf.get(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS), 1);
-        for(String spoutId: _managedSpoutIds) {
-            _states.add(TransactionalState.newCoordinatorState(conf, spoutId));
-        }
-        _currTransaction = getStoredCurrTransaction();
-
-        _collector = collector;
-        // max spout pending bounds the number of active transactions; 1 when unset
-        Number active = (Number) conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING);
-        if(active==null) {
-            _maxTransactionActive = 1;
-        } else {
-            _maxTransactionActive = active.intValue();
-        }
-        _attemptIds = getStoredCurrAttempts(_currTransaction, _maxTransactionActive);
-
-        
-        for(int i=0; i<_spouts.size(); i++) {
-            String txId = _managedSpoutIds.get(i);
-            _coordinators.add(_spouts.get(i).getCoordinator(txId, conf, context));
-        }
-    }
-
-    /** Closes every coordinator state opened in open(). */
-    @Override
-    public void close() {
-        for(TransactionalState state: _states) {
-            state.close();
-        }
-    }
-
-    @Override
-    public void nextTuple() {
-        sync(Operation.NEXTTUPLE, null);
-    }
-
-    /** @param msgId the TransactionAttempt that was used as message id on emit */
-    @Override
-    public void ack(Object msgId) {
-        sync(Operation.ACK, (TransactionAttempt) msgId);
-    }
-
-    /** @param msgId the TransactionAttempt that was used as message id on emit */
-    @Override
-    public void fail(Object msgId) {
-        sync(Operation.FAIL, (TransactionAttempt) msgId);
-    }
-    
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        // in partitioned example, in case an emitter task receives a later transaction than it's emitted so far,
-        // when it sees the earlier txid it should know to emit nothing
-        declarer.declareStream(BATCH_STREAM_ID, new Fields("tx"));
-        declarer.declareStream(COMMIT_STREAM_ID, new Fields("tx"));
-        declarer.declareStream(SUCCESS_STREAM_ID, new Fields("tx"));
-    }
-    
-    /**
-     * Single synchronized entry point for ack, fail and nextTuple.
-     *
-     * @param op      which callback triggered the call
-     * @param attempt the acked/failed attempt; null for NEXTTUPLE
-     */
-    synchronized private void sync(Operation op, TransactionAttempt attempt) {
-        TransactionStatus status;
-        long txid;
-        
-        switch (op) {
-            case FAIL:
-                // Remove the failed one and the items whose id is higher than the failed one.
-                // Then those ones will be retried when nextTuple.
-                txid = attempt.getTransactionId();
-                // NOTE(review): the entry is removed before the attempt is compared, so a
-                // failure of an older attempt also drops the current entry for that txid - confirm intended
-                status = _activeTx.remove(txid);
-                if(status!=null && status.attempt.equals(attempt)) {
-                    _activeTx.tailMap(txid).clear();
-                }
-                break;
-                
-            case ACK:
-                // an ack only advances the status when it belongs to the attempt currently tracked
-                txid = attempt.getTransactionId();
-                status = _activeTx.get(txid);
-                if(status!=null && attempt.equals(status.attempt)) {
-                    if(status.status==AttemptStatus.PROCESSING ) {
-                        status.status = AttemptStatus.PROCESSED;
-                    } else if(status.status==AttemptStatus.COMMITTING) {
-                        status.status = AttemptStatus.COMMITTED;
-                    }
-                }
-                break;
-        
-            case NEXTTUPLE:
-                // note that sometimes the tuples active may be less than max_spout_pending, e.g.
-                // max_spout_pending = 3
-                // tx 1, 2, 3 active, tx 2 is acked. there won't be a commit for tx 2 (because tx 1 isn't committed yet),
-                // and there won't be a batch for tx 4 because there's max_spout_pending tx active
-                status = _activeTx.get(_currTransaction);
-                if (status!=null) {
-                    if(status.status == AttemptStatus.PROCESSED) {
-                        // processing finished: start the commit phase for the current transaction
-                        status.status = AttemptStatus.COMMITTING;
-                        _collector.emit(COMMIT_STREAM_ID, new Values(status.attempt), status.attempt);
-                    } else if (status.status == AttemptStatus.COMMITTED) {
-                        // commit finished: announce success (emitted without a message id),
-                        // move on to the next txid and store it in every state
-                        _activeTx.remove(status.attempt.getTransactionId());
-                        _attemptIds.remove(status.attempt.getTransactionId());
-                        _collector.emit(SUCCESS_STREAM_ID, new Values(status.attempt));
-                        _currTransaction = nextTransactionId(status.attempt.getTransactionId());
-                        for(TransactionalState state: _states) {
-                            state.setData(CURRENT_TX, _currTransaction);
-                        }
-                    }
-                }
-
-                if(_active) {
-                    if(_activeTx.size() < _maxTransactionActive) {
-                        // scan up to _maxTransactionActive txids starting at the current one and
-                        // start at most one new batch per call (note the break after the emit)
-                        Long curr = _currTransaction;
-                        for(int i=0; i<_maxTransactionActive; i++) {
-                            if(batchDelay()) {
-                                break;
-                            }
-                            
-                            if(isReady(curr)) {
-                                if(!_activeTx.containsKey(curr)) {
-                                    // by using a monotonically increasing attempt id, downstream tasks
-                                    // can be memory efficient by clearing out state for old attempts
-                                    // as soon as they see a higher attempt id for a transaction
-                                    Integer attemptId = _attemptIds.get(curr);
-                                    if(attemptId==null) {
-                                       attemptId = 0;
-                                    } else {
-                                       attemptId++;
-                                    }
-                                    _attemptIds.put(curr, attemptId);
-                                    for(TransactionalState state: _states) {
-                                        state.setData(CURRENT_ATTEMPTS, _attemptIds);
-                                    }
-                        
-                                    TransactionAttempt currAttempt = new TransactionAttempt(curr, attemptId);
-                                    _activeTx.put(curr, new TransactionStatus(currAttempt));
-                                    _collector.emit(BATCH_STREAM_ID, new Values(currAttempt), currAttempt);                   
-                                    _throttler.markEvent();
-                                    break;
-                                } 
-                            }
-                            curr = nextTransactionId(curr);
-                        }
-                    } else {
-                        // Do nothing
-                    }
-                }
-                break;
-            
-            default:
-                LOG.warn("Unknow Operation code=" + op);
-                break;
-        }
-    }
-    
-    /** Returns true as soon as any one coordinator reports the txid as ready. */
-    private boolean isReady(long txid) {
-        //TODO: make this strategy configurable?... right now it goes if anyone is ready
-        for(ITridentSpout.BatchCoordinator coord: _coordinators) {
-            if(coord.isReady(txid)) return true;
-        }
-        return false;
-    }
-    
-    /** Returns true while the throttler reports that batch emission is throttled. */
-    private boolean batchDelay() {
-        return _throttler.isThrottled();
-    }
-
-    /** Limits the component to one task and registers TransactionAttempt for serialization. */
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        Config ret = new Config();
-        ret.setMaxTaskParallelism(1);
-        ret.registerSerialization(TransactionAttempt.class);
-        return ret;
-    }
-    
-    // status values assigned in sync(): PROCESSING -> PROCESSED -> COMMITTING -> COMMITTED
-    private static enum AttemptStatus {
-        PROCESSING,
-        PROCESSED,
-        COMMITTING,
-        COMMITTED
-    }
-    
-    /** Pairs an attempt with its status; a new instance starts in PROCESSING. */
-    private static class TransactionStatus {
-        TransactionAttempt attempt;
-        AttemptStatus status;
-        
-        public TransactionStatus(TransactionAttempt attempt) {
-            this.attempt = attempt;
-            this.status = AttemptStatus.PROCESSING;
-        }
-
-        @Override
-        public String toString() {
-            return attempt.toString() + " <" + status.toString() + ">";
-        }        
-    }
-    
-    
-    /** Transaction ids are consecutive: the successor of id is id + 1. */
-    private Long nextTransactionId(Long id) {
-        return id + 1;
-    }  
-    
-    /** Returns the largest txid stored in any state, or INIT_TXID when none is larger. */
-    private Long getStoredCurrTransaction() {
-        Long ret = INIT_TXID;
-        for(TransactionalState state: _states) {
-            Long curr = (Long) state.getData(CURRENT_TX);
-            if(curr!=null && curr.compareTo(ret) > 0) {
-                ret = curr;
-            }
-        }
-        return ret;
-    }
-    
-    /**
-     * Merges the attempt maps stored in all states, keeping the highest attempt
-     * id per txid, then trims the result to a window starting at currTransaction.
-     *
-     * @param currTransaction lowest txid to keep
-     * @param maxBatches      used to compute the upper bound of the window
-     */
-    private TreeMap<Long, Integer> getStoredCurrAttempts(long currTransaction, int maxBatches) {
-        TreeMap<Long, Integer> ret = new TreeMap<Long, Integer>();
-        for(TransactionalState state: _states) {
-            Map<Object, Number> attempts = (Map) state.getData(CURRENT_ATTEMPTS);
-            if(attempts==null) attempts = new HashMap();
-            for(Entry<Object, Number> e: attempts.entrySet()) {
-                // this is because json doesn't allow numbers as keys...
-                // TODO: replace json with a better form of encoding
-                Number txidObj;
-                if(e.getKey() instanceof String) {
-                    txidObj = Long.parseLong((String) e.getKey());
-                } else {
-                    txidObj = (Number) e.getKey();
-                }
-                long txid = ((Number) txidObj).longValue();
-                int attemptId = ((Number) e.getValue()).intValue();
-                Integer curr = ret.get(txid);
-                if(curr==null || attemptId > curr) {
-                    ret.put(txid, attemptId);
-                }                
-            }
-        }
-        // headMap is exclusive: everything strictly below currTransaction is dropped
-        ret.headMap(currTransaction).clear();
-        // NOTE(review): tailMap is inclusive, so the entry for currTransaction + maxBatches - 1
-        // is dropped too, leaving maxBatches - 1 txids - confirm this bound is intended
-        ret.tailMap(currTransaction + maxBatches - 1).clear();
-        return ret;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/topology/TransactionAttempt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/topology/TransactionAttempt.java b/jstorm-client/src/main/java/storm/trident/topology/TransactionAttempt.java
deleted file mode 100644
index b2ea328..0000000
--- a/jstorm-client/src/main/java/storm/trident/topology/TransactionAttempt.java
+++ /dev/null
@@ -1,49 +0,0 @@
-package storm.trident.topology;
-
-import storm.trident.spout.IBatchID;
-
-
-/**
- * Batch id made of a transaction id and the id of one attempt at that transaction.
- * {@link #getId()} exposes only the transaction id, so every attempt of the same
- * transaction reports the same id.
- */
-public class TransactionAttempt implements IBatchID {
-    Long _txid;
-    int _attemptId;
-
-    // for kryo compatibility; leaves _txid null and _attemptId 0
-    public TransactionAttempt() {
-    }
-
-    /**
-     * @param txid      transaction id
-     * @param attemptId id of this attempt at the transaction
-     */
-    public TransactionAttempt(Long txid, int attemptId) {
-        _txid = txid;
-        _attemptId = attemptId;
-    }
-
-    /** @return the transaction id, or null if it was never set */
-    public Long getTransactionId() {
-        return _txid;
-    }
-
-    /** @return the transaction id (same value as {@link #getTransactionId()}) */
-    public Object getId() {
-        return _txid;
-    }
-
-    /** @return the attempt id */
-    public int getAttemptId() {
-        return _attemptId;
-    }
-
-    /**
-     * Hashes on the transaction id only, so all attempts of one transaction share a
-     * hash code. Returns 0 when the transaction id is null instead of throwing.
-     */
-    @Override
-    public int hashCode() {
-        return _txid == null ? 0 : _txid.hashCode();
-    }
-
-    /**
-     * Two attempts are equal when both the transaction id and the attempt id match.
-     * A null transaction id only equals another null transaction id.
-     */
-    @Override
-    public boolean equals(Object o) {
-        if(!(o instanceof TransactionAttempt)) return false;
-        TransactionAttempt other = (TransactionAttempt) o;
-        if(_attemptId != other._attemptId) return false;
-        return _txid == null ? other._txid == null : _txid.equals(other._txid);
-    }
-
-    /** @return {@code <txid>:<attemptId>} */
-    @Override
-    public String toString() {
-        return "" + _txid + ":" + _attemptId;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/topology/TridentBoltExecutor.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/topology/TridentBoltExecutor.java b/jstorm-client/src/main/java/storm/trident/topology/TridentBoltExecutor.java
deleted file mode 100644
index 71807bb..0000000
--- a/jstorm-client/src/main/java/storm/trident/topology/TridentBoltExecutor.java
+++ /dev/null
@@ -1,430 +0,0 @@
-package storm.trident.topology;
-
-import backtype.storm.Config;
-import backtype.storm.Constants;
-import backtype.storm.coordination.BatchOutputCollector;
-import backtype.storm.coordination.BatchOutputCollectorImpl;
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.generated.Grouping;
-import backtype.storm.task.IOutputCollector;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.FailedException;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.ReportedFailedException;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.RotatingMap;
-import backtype.storm.utils.Utils;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.lang.builder.ToStringBuilder;
-
-import storm.trident.spout.IBatchID;
-
-public class TridentBoltExecutor implements IRichBolt {
-    // Prefix of every stream id returned by COORD_STREAM; getTupleType matches on it.
-    public static String COORD_STREAM_PREFIX = "$coord-";
-    
-    // Returns COORD_STREAM_PREFIX followed by the given batch string.
-    public static String COORD_STREAM(String batch) {
-        return COORD_STREAM_PREFIX + batch;
-    }
-    
-    /**
-     * Flag attached to an upstream component in a {@link CoordSpec}. In
-     * {@code prepare} a component whose type equals {@link #single()} adds one
-     * expected task report; any other type adds one per task of that component.
-     */
-    public static class CoordType implements Serializable {
-        public boolean singleCount;
-
-        protected CoordType(boolean singleCount) {
-            this.singleCount = singleCount;
-        }
-
-        /** @return a type with {@code singleCount == true} */
-        public static CoordType single() {
-            return new CoordType(true);
-        }
-
-        /** @return a type with {@code singleCount == false} */
-        public static CoordType all() {
-            return new CoordType(false);
-        }
-
-        @Override
-        public int hashCode() {
-            // same value as the usual "31 * 1 + fieldHash" expansion
-            return 31 + (singleCount ? 1231 : 1237);
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            if (this == obj) {
-                return true;
-            }
-            if (obj == null || getClass() != obj.getClass()) {
-                return false;
-            }
-            return singleCount == ((CoordType) obj).singleCount;
-        }
-
-        @Override
-        public String toString() {
-            return "<Single: " + singleCount + ">";
-        }
-    }
-    
-    /**
-     * Coordination settings for one batch group; instances are the values of the
-     * {@code coordinationSpecs} map given to the executor's constructor.
-     */
-    public static class CoordSpec implements Serializable {
-        // copied into CoordCondition.commitStream by prepare(); null by default
-        public GlobalStreamId commitStream = null;
-        // component id -> CoordType; prepare() turns these into the expected report count
-        public Map<String, CoordType> coords = new HashMap<String, CoordType>();
-        
-        public CoordSpec() {
-        }
-    }
-    
-    /**
-     * Per-batch-group completion condition, built in {@code prepare} from a
-     * {@link CoordSpec}.
-     */
-    public static class CoordCondition implements Serializable {
-        // stream whose tuples are classified as COMMIT; null when the group has none
-        public GlobalStreamId commitStream;
-        // number of COORD tuples that must arrive before the batch can be checked for completion
-        public int expectedTaskReports;
-        // tasks that receive this bolt's own coordination tuple when a batch finishes
-        Set<Integer> targetTasks;
-
-        // Reflection-based dump of all fields.
-        @Override
-        public String toString() {
-            return ToStringBuilder.reflectionToString(this);
-        }        
-    }
-    
-    // source stream -> batch group; streams missing from this map are executed without batch tracking
-    Map<GlobalStreamId, String> _batchGroupIds;
-    // batch group -> coordination spec supplied at construction
-    Map<String, CoordSpec> _coordSpecs;
-    // batch group -> condition derived from _coordSpecs in prepare()
-    Map<String, CoordCondition> _coordConditions;
-    // the wrapped bolt that does the actual batch processing
-    ITridentBatchBolt _bolt;
-    // set in prepare() from context.maxTopologyMessageTimeout() * 1000
-    long _messageTimeoutMs;
-    // System.currentTimeMillis() of the last _batches rotation
-    long _lastRotate;
-    
-    // batch id (IBatchID.getId()) -> TrackedBatch; rotated on tick tuples
-    RotatingMap _batches;
-    
-    // map from batchgroupid to coordspec
-    /**
-     * Stores the arguments; all other state is set up in {@code prepare}.
-     *
-     * @param bolt              bolt to delegate to
-     * @param batchGroupIds     source stream to batch group
-     * @param coordinationSpecs batch group to coordination spec
-     */
-    public TridentBoltExecutor(ITridentBatchBolt bolt, Map<GlobalStreamId, String> batchGroupIds, Map<String, CoordSpec> coordinationSpecs) {
-        _batchGroupIds = batchGroupIds;
-        _coordSpecs = coordinationSpecs;
-        _bolt = bolt;        
-    }
-    
-    /**
-     * Mutable bookkeeping for one batch attempt currently held in {@code _batches}.
-     */
-    public static class TrackedBatch {
-        // attempt id of the batch id this entry was created for
-        int attemptId;
-        BatchInfo info;
-        CoordCondition condition;
-        // number of COORD tuples received so far
-        int reportedTasks = 0;
-        // sum of the counts carried by the received COORD tuples
-        int expectedTupleCount = 0;
-        // number of REGULAR tuples received so far
-        int receivedTuples = 0;
-        // target task -> number of tuples emitted to it while this batch was current
-        Map<Integer, Integer> taskEmittedTuples = new HashMap();
-        boolean failed = false;
-        // starts true when the condition has no commit stream, otherwise set on the first COMMIT tuple
-        boolean receivedCommit;
-        // tuple whose ack/fail is held back until the batch finishes or fails
-        Tuple delayedAck = null;
-        
-        public TrackedBatch(BatchInfo info, CoordCondition condition, int attemptId) {
-            this.info = info;
-            this.condition = condition;
-            this.attemptId = attemptId;
-            receivedCommit = condition.commitStream == null;
-        }
-
-        // Reflection-based dump of all fields.
-        @Override
-        public String toString() {
-            return ToStringBuilder.reflectionToString(this);
-        }        
-    }
-    
-    /**
-     * Output collector that forwards emits to a delegate and, while a batch is set
-     * via {@link #setCurrBatch}, counts how many tuples went to each target task in
-     * that batch's {@code taskEmittedTuples}. {@code ack} and {@code fail} are not
-     * supported and always throw.
-     */
-    public class CoordinatedOutputCollector implements IOutputCollector {
-        IOutputCollector _delegate;
-
-        TrackedBatch _currBatch = null;
-
-        public CoordinatedOutputCollector(IOutputCollector delegate) {
-            _delegate = delegate;
-        }
-
-        /** Sets the batch that subsequent emits are counted against; null disables counting. */
-        public void setCurrBatch(TrackedBatch batch) {
-            _currBatch = batch;
-        }
-
-        /** Emits through the delegate, then counts one tuple for every task it reports. */
-        public List<Integer> emit(String stream, Collection<Tuple> anchors, List<Object> tuple) {
-            List<Integer> targets = _delegate.emit(stream, anchors, tuple);
-            updateTaskCounts(targets);
-            return targets;
-        }
-
-        /** Counts one tuple for {@code task}, then emits directly through the delegate. */
-        public void emitDirect(int task, String stream, Collection<Tuple> anchors, List<Object> tuple) {
-            updateTaskCounts(Arrays.asList(task));
-            _delegate.emitDirect(task, stream, anchors, tuple);
-        }
-
-        public void ack(Tuple tuple) {
-            throw new IllegalStateException("Method should never be called");
-        }
-
-        public void fail(Tuple tuple) {
-            throw new IllegalStateException("Method should never be called");
-        }
-
-        public void reportError(Throwable error) {
-            _delegate.reportError(error);
-        }
-
-        /** Adds one to the current batch's emitted count for every task in {@code tasks}. */
-        private void updateTaskCounts(List<Integer> tasks) {
-            if(_currBatch==null) {
-                return;
-            }
-            Map<Integer, Integer> counts = _currBatch.taskEmittedTuples;
-            for(Integer task: tasks) {
-                counts.put(task, Utils.get(counts, task, 0) + 1);
-            }
-        }
-    }
-    
-    OutputCollector _collector;
-    CoordinatedOutputCollector _coordCollector;
-    BatchOutputCollector _coordOutputCollector;
-    TopologyContext _context;
-
-    // Executor-data key under which the computed coordination conditions are cached.
-    private static final String COORD_CONDITIONS_KEY = "__coordConditions";
-
-    /**
-     * Initialises timing and batch-tracking state, wraps {@code collector} in a
-     * {@link CoordinatedOutputCollector}, obtains the per-batch-group
-     * {@link CoordCondition}s and finally prepares the wrapped bolt.
-     *
-     * <p>The conditions are looked up in the executor data first and only computed
-     * (and stored back under the same key) when absent. For each batch group the
-     * expected report count is 1 per {@code single} component plus the task count of
-     * every other component; the target tasks are all tasks of the components
-     * subscribed to this bolt's coordination stream for that group.
-     *
-     * @param conf      topology configuration, passed through to the wrapped bolt
-     * @param context   topology context
-     * @param collector collector used for acking/failing and as the emit delegate
-     */
-    @Override
-    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
-        // presumably the timeout is in seconds, hence the conversion to milliseconds
-        _messageTimeoutMs = context.maxTopologyMessageTimeout() * 1000L;
-        _lastRotate = System.currentTimeMillis();
-        _batches = new RotatingMap(2);
-        _context = context;
-        _collector = collector;
-        _coordCollector = new CoordinatedOutputCollector(collector);
-        _coordOutputCollector = new BatchOutputCollectorImpl(new OutputCollector(_coordCollector));
-
-        _coordConditions = (Map) context.getExecutorData(COORD_CONDITIONS_KEY);
-        if(_coordConditions==null) {
-            _coordConditions = new HashMap();
-            for(String batchGroup: _coordSpecs.keySet()) {
-                CoordSpec spec = _coordSpecs.get(batchGroup);
-                CoordCondition cond = new CoordCondition();
-                cond.commitStream = spec.commitStream;
-                cond.expectedTaskReports = 0;
-                for(String comp: spec.coords.keySet()) {
-                    CoordType ct = spec.coords.get(comp);
-                    if(ct.equals(CoordType.single())) {
-                        cond.expectedTaskReports+=1;
-                    } else {
-                        cond.expectedTaskReports+=context.getComponentTasks(comp).size();
-                    }
-                }
-                cond.targetTasks = new HashSet<Integer>();
-                for(String component: Utils.get(context.getThisTargets(),
-                                        COORD_STREAM(batchGroup),
-                                        new HashMap<String, Grouping>()).keySet()) {
-                    cond.targetTasks.addAll(context.getComponentTasks(component));
-                }
-                _coordConditions.put(batchGroup, cond);
-            }
-            // Must be the key read above: writing to "_coordConditions" (one underscore)
-            // meant the cached value was never found again.
-            context.setExecutorData(COORD_CONDITIONS_KEY, _coordConditions);
-        }
-        _bolt.prepare(conf, context, _coordOutputCollector);
-    }
-    
-    /**
-     * Marks {@code tracked} as failed and fails its delayed tuple, if one is held.
-     * A {@link ReportedFailedException} is additionally reported through the collector.
-     *
-     * @param tracked batch to fail
-     * @param e       cause, may be null
-     */
-    private void failBatch(TrackedBatch tracked, FailedException e) {
-        // instanceof is false for null, so this also covers the "no cause" case
-        if(e instanceof ReportedFailedException) {
-            _collector.reportError(e);
-        }
-        tracked.failed = true;
-        Tuple pending = tracked.delayedAck;
-        if(pending!=null) {
-            _collector.fail(pending);
-            tracked.delayedAck = null;
-        }
-    }
-
-    /** Fails {@code tracked} without a cause. */
-    private void failBatch(TrackedBatch tracked) {
-        failBatch(tracked, null);
-    }
-
-    /**
-     * Finishes the batch in the wrapped bolt, sends every target task a coordination
-     * tuple (batch id, number of tuples emitted to that task) anchored on
-     * {@code finishTuple}, and acks the delayed tuple if one is held. A
-     * {@link FailedException} thrown along the way fails the batch instead. The batch
-     * is removed from {@code _batches} in either case.
-     *
-     * @return true if no {@link FailedException} was thrown
-     */
-    private boolean finishBatch(TrackedBatch tracked, Tuple finishTuple) {
-        boolean finished;
-        try {
-            _bolt.finishBatch(tracked.info);
-            String coordStream = COORD_STREAM(tracked.info.batchGroup);
-            for(Integer task: tracked.condition.targetTasks) {
-                Integer emittedToTask = Utils.get(tracked.taskEmittedTuples, task, 0);
-                _collector.emitDirect(task, coordStream, finishTuple, new Values(tracked.info.batchId, emittedToTask));
-            }
-            Tuple pending = tracked.delayedAck;
-            if(pending!=null) {
-                _collector.ack(pending);
-                tracked.delayedAck = null;
-            }
-            finished = true;
-        } catch(FailedException e) {
-            failBatch(tracked, e);
-            finished = false;
-        }
-        _batches.remove(tracked.info.batchId.getId());
-        return finished;
-    }
-    
-    /**
-     * Called after a COMMIT or COORD tuple has been recorded on {@code tracked};
-     * decides whether the batch can be finished and how {@code tuple} is acked.
-     *
-     * @param tracked batch the tuple belongs to
-     * @param tuple   the COMMIT or COORD tuple just received
-     * @param type    classification of {@code tuple}
-     */
-    private void checkFinish(TrackedBatch tracked, Tuple tuple, TupleType type) {
-        // A batch that already failed fails every further coordination tuple as well.
-        if(tracked.failed) {
-            failBatch(tracked);
-            _collector.fail(tuple);
-            return;
-        }
-        CoordCondition cond = tracked.condition;
-        // Hold this tuple back as the batch's delayed ack if none is held yet and it is
-        // either the COMMIT tuple (group with a commit stream) or any tuple (group without one).
-        boolean delayed = tracked.delayedAck==null &&
-                              (cond.commitStream!=null && type==TupleType.COMMIT
-                               || cond.commitStream==null);
-        if(delayed) {
-            tracked.delayedAck = tuple;
-        }
-        boolean failed = false;
-        // All expected reports (and the commit, if required) are in: the batch is complete
-        // only if the number of received tuples matches what upstream tasks said they sent.
-        if(tracked.receivedCommit && tracked.reportedTasks == cond.expectedTaskReports) {
-            if(tracked.receivedTuples == tracked.expectedTupleCount) {
-                // return value is not inspected here
-                finishBatch(tracked, tuple);                
-            } else {
-                //TODO: add logging that not all tuples were received
-                failBatch(tracked);
-                _collector.fail(tuple);
-                failed = true;
-            }
-        }
-        
-        // Delayed tuples are acked/failed later through tracked.delayedAck.
-        if(!delayed && !failed) {
-            _collector.ack(tuple);
-        }
-        
-    }
-    
-    /**
-     * Handles one incoming tuple: tick tuples rotate the batch map, tuples from
-     * streams without a batch group go straight to the wrapped bolt, and all other
-     * tuples are attributed to a tracked batch and processed according to their
-     * {@link TupleType}.
-     */
-    @Override
-    public void execute(Tuple tuple) {
-        // Tick tuples only drive expiry: rotate _batches once more than
-        // _messageTimeoutMs has passed since the last rotation.
-        if(tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)) {
-            long now = System.currentTimeMillis();
-            if(now - _lastRotate > _messageTimeoutMs) {
-                _batches.rotate();
-                _lastRotate = now;
-            }
-            return;
-        }
-        String batchGroup = _batchGroupIds.get(tuple.getSourceGlobalStreamid());
-        if(batchGroup==null) {
-            // this is so we can do things like have simple DRPC that doesn't need to use batch processing
-            _coordCollector.setCurrBatch(null);
-            _bolt.execute(null, tuple);
-            _collector.ack(tuple);
-            return;
-        }
-        // the batch id is the first value of the tuple
-        IBatchID id = (IBatchID) tuple.getValue(0);
-        //get transaction id
-        //if it already exists and attempt id is greater than the attempt there
-        
-        
-        TrackedBatch tracked = (TrackedBatch) _batches.get(id.getId());
-//        if(_batches.size() > 10 && _context.getThisTaskIndex() == 0) {
-//            System.out.println("Received in " + _context.getThisComponentId() + " " + _context.getThisTaskIndex()
-//                    + " (" + _batches.size() + ")" +
-//                    "\ntuple: " + tuple +
-//                    "\nwith tracked " + tracked +
-//                    "\nwith id " + id + 
-//                    "\nwith group " + batchGroup
-//                    + "\n");
-//            
-//        }
-        //System.out.println("Num tracked: " + _batches.size() + " " + _context.getThisComponentId() + " " + _context.getThisTaskIndex());
-        
-        // this code here ensures that only one attempt is ever tracked for a batch, so when
-        // failures happen you don't get an explosion in memory usage in the tasks
-        if(tracked!=null) {
-            if(id.getAttemptId() > tracked.attemptId) {
-                // newer attempt: drop the old tracking state and start over below
-                _batches.remove(id.getId());
-                tracked = null;
-            } else if(id.getAttemptId() < tracked.attemptId) {
-                // no reason to try to execute a previous attempt than we've already seen
-                // (the tuple is neither acked nor failed on this path)
-                return;
-            }
-        }
-        
-        if(tracked==null) {
-            tracked = new TrackedBatch(new BatchInfo(batchGroup, id, _bolt.initBatchState(batchGroup, id)), _coordConditions.get(batchGroup), id.getAttemptId());
-            _batches.put(id.getId(), tracked);
-        }
-        // emits made by the wrapped bolt from here on are counted against this batch
-        _coordCollector.setCurrBatch(tracked);
-        
-        //System.out.println("TRACKED: " + tracked + " " + tuple);
-        
-        TupleType t = getTupleType(tuple, tracked);
-        if(t==TupleType.COMMIT) {
-            tracked.receivedCommit = true;
-            checkFinish(tracked, tuple, t);
-        } else if(t==TupleType.COORD) {
-            // second value of a coordination tuple is the number of tuples the sender emitted to this task
-            int count = tuple.getInteger(1);
-            tracked.reportedTasks++;
-            tracked.expectedTupleCount+=count;
-            checkFinish(tracked, tuple, t);
-        } else {
-            tracked.receivedTuples++;
-            boolean success = true;
-            try {
-                _bolt.execute(tracked.info, tuple);
-                // with no reports to wait for, the batch is finished right after its tuple
-                if(tracked.condition.expectedTaskReports==0) {
-                    success = finishBatch(tracked, tuple);
-                }
-            } catch(FailedException e) {
-                // marks the batch failed; success stays true, so the tuple itself is still acked below
-                failBatch(tracked, e);
-            }
-            if(success) {
-                _collector.ack(tuple);                   
-            } else {
-                _collector.fail(tuple);
-            }
-        }
-        _coordCollector.setCurrBatch(null);
-    }
-
-    // Delegates cleanup to the wrapped bolt; no executor state is released here.
-    @Override
-    public void cleanup() {
-        _bolt.cleanup();
-    }
-
-    /**
-     * Declares the wrapped bolt's outputs, then one direct stream per batch group
-     * with the fields {@code id} and {@code count}, named by {@code COORD_STREAM}.
-     */
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        _bolt.declareOutputFields(declarer);
-        for(String batchGroup: _coordSpecs.keySet()) {
-            String coordStream = COORD_STREAM(batchGroup);
-            declarer.declareStream(coordStream, true, new Fields("id", "count"));
-        }
-    }
-
-    /**
-     * Returns the wrapped bolt's component configuration (or a new map when it has
-     * none) with the tick tuple frequency set to 5. The map returned by the wrapped
-     * bolt is modified in place.
-     */
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        Map<String, Object> conf = _bolt.getComponentConfiguration();
-        if(conf==null) {
-            conf = new HashMap();
-        }
-        // TODO: Need to be able to set the tick tuple time to the message timeout, ideally without parameterization
-        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5);
-        return conf;
-    }
-    
-    /**
-     * Classifies {@code tuple} for {@code batch}: COMMIT when it comes from the
-     * batch's commit stream, COORD when the batch expects task reports and the source
-     * stream id starts with {@code COORD_STREAM_PREFIX}, REGULAR otherwise.
-     */
-    private TupleType getTupleType(Tuple tuple, TrackedBatch batch) {
-        CoordCondition cond = batch.condition;
-
-        boolean fromCommitStream = cond.commitStream!=null
-                && tuple.getSourceGlobalStreamid().equals(cond.commitStream);
-        if(fromCommitStream) {
-            return TupleType.COMMIT;
-        }
-
-        boolean fromCoordStream = cond.expectedTaskReports > 0
-                && tuple.getSourceStreamId().startsWith(COORD_STREAM_PREFIX);
-        if(fromCoordStream) {
-            return TupleType.COORD;
-        }
-
-        return TupleType.REGULAR;
-    }
-    
-    // Classification assigned to each batch tuple by getTupleType.
-    static enum TupleType {
-        // any tuple that is neither COMMIT nor COORD
-        REGULAR,
-        // tuple received on the batch group's commit stream
-        COMMIT,
-        // tuple received on a stream whose id starts with COORD_STREAM_PREFIX
-        COORD
-    }    
-}