You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/11/05 21:40:56 UTC

[17/60] [abbrv] [partial] storm git commit: Release 2.0.4-SNAPSHOT

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/topology/TridentTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/topology/TridentTopologyBuilder.java b/jstorm-client/src/main/java/storm/trident/topology/TridentTopologyBuilder.java
deleted file mode 100644
index 1e75e00..0000000
--- a/jstorm-client/src/main/java/storm/trident/topology/TridentTopologyBuilder.java
+++ /dev/null
@@ -1,751 +0,0 @@
-package storm.trident.topology;
-
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.generated.Grouping;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.grouping.CustomStreamGrouping;
-import backtype.storm.topology.BaseConfigurationDeclarer;
-import backtype.storm.topology.BoltDeclarer;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.InputDeclarer;
-import backtype.storm.topology.SpoutDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.commons.lang.builder.ToStringBuilder;
-import org.apache.commons.lang.builder.ToStringStyle;
-import storm.trident.spout.BatchSpoutExecutor;
-import storm.trident.spout.IBatchSpout;
-import storm.trident.spout.ICommitterTridentSpout;
-import storm.trident.spout.ITridentSpout;
-import storm.trident.spout.RichSpoutBatchTriggerer;
-import storm.trident.spout.TridentSpoutCoordinator;
-import storm.trident.spout.TridentSpoutExecutor;
-import storm.trident.topology.TridentBoltExecutor.CoordSpec;
-import storm.trident.topology.TridentBoltExecutor.CoordType;
-
-// based on transactional topologies
-public class TridentTopologyBuilder {
-    Map<GlobalStreamId, String> _batchIds = new HashMap();
-    Map<String, TransactionalSpoutComponent> _spouts = new HashMap();
-    Map<String, SpoutComponent> _batchPerTupleSpouts = new HashMap();
-    Map<String, Component> _bolts = new HashMap();
-        
-    
-    public SpoutDeclarer setBatchPerTupleSpout(String id, String streamName, IRichSpout spout, Integer parallelism, String batchGroup) {
-        Map<String, String> batchGroups = new HashMap();
-        batchGroups.put(streamName, batchGroup);
-        markBatchGroups(id, batchGroups);
-        SpoutComponent c = new SpoutComponent(spout, streamName, parallelism, batchGroup);
-        _batchPerTupleSpouts.put(id, c);
-        return new SpoutDeclarerImpl(c);
-    }
-    
-    public SpoutDeclarer setSpout(String id, String streamName, String txStateId, IBatchSpout spout, Integer parallelism, String batchGroup) {
-        return setSpout(id, streamName, txStateId, new BatchSpoutExecutor(spout), parallelism, batchGroup);
-    }
-    
-    public SpoutDeclarer setSpout(String id, String streamName, String txStateId, ITridentSpout spout, Integer parallelism, String batchGroup) {
-        Map<String, String> batchGroups = new HashMap();
-        batchGroups.put(streamName, batchGroup);
-        markBatchGroups(id, batchGroups);
-
-        TransactionalSpoutComponent c = new TransactionalSpoutComponent(spout, streamName, parallelism, txStateId, batchGroup);
-        _spouts.put(id, c);
-        return new SpoutDeclarerImpl(c);
-    }
-    
-    // map from stream name to batch id
-    public BoltDeclarer setBolt(String id, ITridentBatchBolt bolt, Integer parallelism, Set<String> committerBatches, Map<String, String> batchGroups) {
-        markBatchGroups(id, batchGroups);
-        Component c = new Component(bolt, parallelism, committerBatches);
-        _bolts.put(id, c);
-        return new BoltDeclarerImpl(c);
-        
-    }
-    
-    String masterCoordinator(String batchGroup) {
-        return "$mastercoord-" + batchGroup;
-    }
-    
-    static final String SPOUT_COORD_PREFIX = "$spoutcoord-";
-    
-    public static String spoutCoordinator(String spoutId) {
-        return SPOUT_COORD_PREFIX + spoutId;
-    }
-    
-    public static String spoutIdFromCoordinatorId(String coordId) {
-        return coordId.substring(SPOUT_COORD_PREFIX.length());
-    }
-    
-    Map<GlobalStreamId, String> fleshOutStreamBatchIds(boolean includeCommitStream) {
-        Map<GlobalStreamId, String> ret = new HashMap<GlobalStreamId, String>(_batchIds);
-        Set<String> allBatches = new HashSet(_batchIds.values());
-        for(String b: allBatches) {
-            ret.put(new GlobalStreamId(masterCoordinator(b), MasterBatchCoordinator.BATCH_STREAM_ID), b);
-            if(includeCommitStream) {
-                ret.put(new GlobalStreamId(masterCoordinator(b), MasterBatchCoordinator.COMMIT_STREAM_ID), b);
-            }
-            // DO NOT include the success stream as part of the batch. it should not trigger coordination tuples,
-            // and is just a metadata tuple to assist in cleanup, should not trigger batch tracking
-        }
-        
-        for(String id: _spouts.keySet()) {
-            TransactionalSpoutComponent c = _spouts.get(id);
-            if(c.batchGroupId!=null) {
-                ret.put(new GlobalStreamId(spoutCoordinator(id), MasterBatchCoordinator.BATCH_STREAM_ID), c.batchGroupId);
-            }
-        }
-
-        //this takes care of setting up coord streams for spouts and bolts
-        for(GlobalStreamId s: _batchIds.keySet()) {
-            String b = _batchIds.get(s);
-            ret.put(new GlobalStreamId(s.get_componentId(), TridentBoltExecutor.COORD_STREAM(b)), b);
-        }
-        
-        return ret;
-    }
-    
-    public StormTopology buildTopology() {        
-        TopologyBuilder builder = new TopologyBuilder();
-        Map<GlobalStreamId, String> batchIdsForSpouts = fleshOutStreamBatchIds(false);
-        Map<GlobalStreamId, String> batchIdsForBolts = fleshOutStreamBatchIds(true);
-
-        Map<String, List<String>> batchesToCommitIds = new HashMap<String, List<String>>();
-        Map<String, List<ITridentSpout>> batchesToSpouts = new HashMap<String, List<ITridentSpout>>();
-        
-        for(String id: _spouts.keySet()) {
-            TransactionalSpoutComponent c = _spouts.get(id);
-            if(c.spout instanceof IRichSpout) {
-                
-                //TODO: wrap this to set the stream name
-                builder.setSpout(id, (IRichSpout) c.spout, c.parallelism);
-            } else {
-                String batchGroup = c.batchGroupId;
-                if(!batchesToCommitIds.containsKey(batchGroup)) {
-                    batchesToCommitIds.put(batchGroup, new ArrayList<String>());
-                }
-                batchesToCommitIds.get(batchGroup).add(c.commitStateId);
-
-                if(!batchesToSpouts.containsKey(batchGroup)) {
-                    batchesToSpouts.put(batchGroup, new ArrayList<ITridentSpout>());
-                }
-                batchesToSpouts.get(batchGroup).add((ITridentSpout) c.spout);
-                
-                
-                BoltDeclarer scd =
-                      builder.setBolt(spoutCoordinator(id), new TridentSpoutCoordinator(c.commitStateId, (ITridentSpout) c.spout))
-                        .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.BATCH_STREAM_ID)
-                        .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.SUCCESS_STREAM_ID)
-                        .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.COMMIT_STREAM_ID);
-                
-                for(Map m: c.componentConfs) {
-                    scd.addConfigurations(m);
-                }
-                
-                Map<String, TridentBoltExecutor.CoordSpec> specs = new HashMap();
-                specs.put(c.batchGroupId, new CoordSpec());
-                BoltDeclarer bd = builder.setBolt(id,
-                        new TridentBoltExecutor(
-                          new TridentSpoutExecutor(
-                            c.commitStateId,
-                            c.streamName,
-                            ((ITridentSpout) c.spout)),
-                            batchIdsForSpouts,
-                            specs),
-                        c.parallelism);
-                bd.allGrouping(spoutCoordinator(id), MasterBatchCoordinator.BATCH_STREAM_ID);
-                bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.SUCCESS_STREAM_ID);
-                if(c.spout instanceof ICommitterTridentSpout) {
-                    bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.COMMIT_STREAM_ID);
-                }
-                for(Map m: c.componentConfs) {
-                    bd.addConfigurations(m);
-                }
-            }
-        }
-        
-        for(String id: _batchPerTupleSpouts.keySet()) {
-            SpoutComponent c = _batchPerTupleSpouts.get(id);
-            SpoutDeclarer d = builder.setSpout(id, new RichSpoutBatchTriggerer((IRichSpout) c.spout, c.streamName, c.batchGroupId), c.parallelism);
-            
-            for(Map conf: c.componentConfs) {
-                d.addConfigurations(conf);
-            }
-        }
-                
-        for(String id: _bolts.keySet()) {
-            Component c = _bolts.get(id);
-            
-            Map<String, CoordSpec> specs = new HashMap();
-            
-            for(GlobalStreamId s: getBoltSubscriptionStreams(id)) {
-                String batch = batchIdsForBolts.get(s);
-                if(!specs.containsKey(batch)) specs.put(batch, new CoordSpec());
-                CoordSpec spec = specs.get(batch);
-                CoordType ct;
-                if(_batchPerTupleSpouts.containsKey(s.get_componentId())) {
-                    ct = CoordType.single();
-                } else {
-                    ct = CoordType.all();
-                }
-                spec.coords.put(s.get_componentId(), ct);
-            }
-            
-            for(String b: c.committerBatches) {
-                specs.get(b).commitStream = new GlobalStreamId(masterCoordinator(b), MasterBatchCoordinator.COMMIT_STREAM_ID);
-            }
-            
-            BoltDeclarer d = builder.setBolt(id, new TridentBoltExecutor(c.bolt, batchIdsForBolts, specs), c.parallelism);
-            for(Map conf: c.componentConfs) {
-                d.addConfigurations(conf);
-            }
-            
-            for(InputDeclaration inputDecl: c.declarations) {
-               inputDecl.declare(d);
-            }
-            
-            Map<String, Set<String>> batchToComponents = getBoltBatchToComponentSubscriptions(id);
-            for(String b: batchToComponents.keySet()) {
-                for(String comp: batchToComponents.get(b)) {
-                    d.directGrouping(comp, TridentBoltExecutor.COORD_STREAM(b));
-                }
-            }
-            
-            for(String b: c.committerBatches) {
-                d.allGrouping(masterCoordinator(b), MasterBatchCoordinator.COMMIT_STREAM_ID);
-            }
-        }
-        
-        for(String batch: batchesToCommitIds.keySet()) {
-            List<String> commitIds = batchesToCommitIds.get(batch);
-            boolean batchCommit = false;
-            builder.setSpout(masterCoordinator(batch), new MasterBatchCoordinator(commitIds, batchesToSpouts.get(batch)));
-        }
-
-        return builder.createTopology();
-    }
-    
-    private void markBatchGroups(String component, Map<String, String> batchGroups) {
-        for(String stream: batchGroups.keySet()) {
-            _batchIds.put(new GlobalStreamId(component, stream), batchGroups.get(stream));
-        }
-    }
-    
-    
-    private static class SpoutComponent {
-        public Object spout;
-        public Integer parallelism;
-        public List<Map> componentConfs = new ArrayList<Map>();
-        String batchGroupId;
-        String streamName;
-        
-        public SpoutComponent(Object spout, String streamName, Integer parallelism, String batchGroupId) {
-            this.spout = spout;
-            this.streamName = streamName;
-            this.parallelism = parallelism;
-            this.batchGroupId = batchGroupId;
-        }
-
-        @Override
-        public String toString() {
-            return ToStringBuilder.reflectionToString(this);
-        }
-    }
-    
-    private static class TransactionalSpoutComponent extends SpoutComponent {
-        public String commitStateId; 
-        
-        public TransactionalSpoutComponent(Object spout, String streamName, Integer parallelism, String commitStateId, String batchGroupId) {
-            super(spout, streamName, parallelism, batchGroupId);
-            this.commitStateId = commitStateId;
-        }
-
-        @Override
-        public String toString() {
-            return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
-        }        
-    }    
-    
-    private static class Component {
-        public ITridentBatchBolt bolt;
-        public Integer parallelism;
-        public List<InputDeclaration> declarations = new ArrayList<InputDeclaration>();
-        public List<Map> componentConfs = new ArrayList<Map>();
-        public Set<String> committerBatches;
-        
-        public Component(ITridentBatchBolt bolt, Integer parallelism,Set<String> committerBatches) {
-            this.bolt = bolt;
-            this.parallelism = parallelism;
-            this.committerBatches = committerBatches;
-        }
-
-        @Override
-        public String toString() {
-            return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
-        }        
-    }
-    
-    Map<String, Set<String>> getBoltBatchToComponentSubscriptions(String id) {
-        Map<String, Set<String>> ret = new HashMap();
-        for(GlobalStreamId s: getBoltSubscriptionStreams(id)) {
-            String b = _batchIds.get(s);
-            if(!ret.containsKey(b)) ret.put(b, new HashSet());
-            ret.get(b).add(s.get_componentId());
-        }
-        return ret;
-    }
-    
-    List<GlobalStreamId> getBoltSubscriptionStreams(String id) {
-        List<GlobalStreamId> ret = new ArrayList();
-        Component c = _bolts.get(id);
-        for(InputDeclaration d: c.declarations) {
-            ret.add(new GlobalStreamId(d.getComponent(), d.getStream()));
-        }
-        return ret;
-    }
-    
-    private static interface InputDeclaration {
-        void declare(InputDeclarer declarer);
-        String getComponent();
-        String getStream();
-    }
-    
-    private class SpoutDeclarerImpl extends BaseConfigurationDeclarer<SpoutDeclarer> implements SpoutDeclarer {
-        SpoutComponent _component;
-        
-        public SpoutDeclarerImpl(SpoutComponent component) {
-            _component = component;
-        }
-        
-        @Override
-        public SpoutDeclarer addConfigurations(Map conf) {
-            _component.componentConfs.add(conf);
-            return this;
-        }        
-    }
-    
-    private class BoltDeclarerImpl extends BaseConfigurationDeclarer<BoltDeclarer> implements BoltDeclarer {
-        Component _component;
-        
-        public BoltDeclarerImpl(Component component) {
-            _component = component;
-        }
-        
-        @Override
-        public BoltDeclarer fieldsGrouping(final String component, final Fields fields) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.fieldsGrouping(component, fields);
-                }
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }
-
-                @Override
-                public String getStream() {
-                    return null;
-                }
-            });
-            return this;
-        }
-
-        @Override
-        public BoltDeclarer fieldsGrouping(final String component, final String streamId, final Fields fields) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.fieldsGrouping(component, streamId, fields);
-                }                
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }
-
-                @Override
-                public String getStream() {
-                    return streamId;
-                }
-            });
-            return this;
-        }
-
-        @Override
-        public BoltDeclarer globalGrouping(final String component) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.globalGrouping(component);
-                }                
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }
-                
-                @Override
-                public String getStream() {
-                    return null;
-                }
-            });
-            return this;
-        }
-
-        @Override
-        public BoltDeclarer globalGrouping(final String component, final String streamId) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.globalGrouping(component, streamId);
-                }                
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }                
-
-                @Override
-                public String getStream() {
-                    return streamId;
-                }
-            });
-            return this;
-        }
-
-        @Override
-        public BoltDeclarer shuffleGrouping(final String component) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.shuffleGrouping(component);
-                }                
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }                
-
-                @Override
-                public String getStream() {
-                    return null;
-                }
-            });
-            return this;
-        }
-
-        @Override
-        public BoltDeclarer shuffleGrouping(final String component, final String streamId) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.shuffleGrouping(component, streamId);
-                }                
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }                
-
-                @Override
-                public String getStream() {
-                    return streamId;
-                }
-            });
-            return this;
-        }
-
-        @Override
-        public BoltDeclarer localOrShuffleGrouping(final String component) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.localOrShuffleGrouping(component);
-                }                
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }                
-
-                @Override
-                public String getStream() {
-                    return null;
-                }
-            });
-            return this;
-        }
-
-        @Override
-        public BoltDeclarer localOrShuffleGrouping(final String component, final String streamId) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.localOrShuffleGrouping(component, streamId);
-                }                
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }                
-
-                @Override
-                public String getStream() {
-                    return streamId;
-                }
-            });
-            return this;
-        }
-        
-        @Override
-        public BoltDeclarer localFirstGrouping(final String component) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.localFirstGrouping(component);
-                }                
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }                
-
-                @Override
-                public String getStream() {
-                    return null;
-                }
-            });
-            return this;
-        }
-
-        @Override
-        public BoltDeclarer localFirstGrouping(final String component, final String streamId) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.localFirstGrouping(component, streamId);
-                }                
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }                
-
-                @Override
-                public String getStream() {
-                    return streamId;
-                }
-            });
-            return this;
-        }
-        
-        @Override
-        public BoltDeclarer noneGrouping(final String component) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.noneGrouping(component);
-                }                
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }                
-
-                @Override
-                public String getStream() {
-                    return null;
-                }
-            });
-            return this;
-        }
-
-        @Override
-        public BoltDeclarer noneGrouping(final String component, final String streamId) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.noneGrouping(component, streamId);
-                }                
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }                
-
-                @Override
-                public String getStream() {
-                    return streamId;
-                }
-            });
-            return this;
-        }
-
-        @Override
-        public BoltDeclarer allGrouping(final String component) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.allGrouping(component);
-                }                
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }                
-
-                @Override
-                public String getStream() {
-                    return null;
-                }
-            });
-            return this;
-        }
-
-        @Override
-        public BoltDeclarer allGrouping(final String component, final String streamId) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.allGrouping(component, streamId);
-                }                
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }                
-
-                @Override
-                public String getStream() {
-                    return streamId;
-                }
-            });
-            return this;
-        }
-
-        @Override
-        public BoltDeclarer directGrouping(final String component) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.directGrouping(component);
-                }                
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }                
-
-                @Override
-                public String getStream() {
-                    return null;
-                }
-            });
-            return this;
-        }
-
-        @Override
-        public BoltDeclarer directGrouping(final String component, final String streamId) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.directGrouping(component, streamId);
-                }                
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }                
-
-                @Override
-                public String getStream() {
-                    return streamId;
-                }
-            });
-            return this;
-        }
-        
-        @Override
-        public BoltDeclarer customGrouping(final String component, final CustomStreamGrouping grouping) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.customGrouping(component, grouping);
-                }                
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }                
-
-                @Override
-                public String getStream() {
-                    return null;
-                }
-            });
-            return this;        
-        }
-
-        @Override
-        public BoltDeclarer customGrouping(final String component, final String streamId, final CustomStreamGrouping grouping) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.customGrouping(component, streamId, grouping);
-                }                
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }                
-
-                @Override
-                public String getStream() {
-                    return streamId;
-                }
-            });
-            return this;
-        }
-
-        @Override
-        public BoltDeclarer grouping(final GlobalStreamId stream, final Grouping grouping) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.grouping(stream, grouping);
-                }                
-
-                @Override
-                public String getComponent() {
-                    return stream.get_componentId();
-                }                
-
-                @Override
-                public String getStream() {
-                    return stream.get_streamId();
-                }
-            });
-            return this;
-        }
-        
-        private void addDeclaration(InputDeclaration declaration) {
-            _component.declarations.add(declaration);
-        }
-
-        @Override
-        public BoltDeclarer addConfigurations(Map conf) {
-            _component.componentConfs.add(conf);
-            return this;
-        }
-
-		
-    }    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/topology/state/RotatingTransactionalState.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/topology/state/RotatingTransactionalState.java b/jstorm-client/src/main/java/storm/trident/topology/state/RotatingTransactionalState.java
deleted file mode 100644
index 9f22cc7..0000000
--- a/jstorm-client/src/main/java/storm/trident/topology/state/RotatingTransactionalState.java
+++ /dev/null
@@ -1,130 +0,0 @@
-package storm.trident.topology.state;
-
-import backtype.storm.utils.Utils;
-import org.apache.zookeeper.KeeperException;
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
-public class RotatingTransactionalState {
-    public static interface StateInitializer {
-        Object init(long txid, Object lastState);
-    }    
-
-    private TransactionalState _state;
-    private String _subdir;
-    
-    private TreeMap<Long, Object> _curr = new TreeMap<Long, Object>();
-    
-    public RotatingTransactionalState(TransactionalState state, String subdir) {
-        _state = state;
-        _subdir = subdir;
-        state.mkdir(subdir);
-        sync();
-    }
-
-
-    public Object getLastState() {
-        if(_curr.isEmpty()) return null;
-        else return _curr.lastEntry().getValue();
-    }
-    
-    public void overrideState(long txid, Object state) {
-        _state.setData(txPath(txid), state);
-        _curr.put(txid, state);
-    }
-
-    public void removeState(long txid) {
-        if(_curr.containsKey(txid)) {
-            _curr.remove(txid);
-            _state.delete(txPath(txid));
-        }
-    }
-    
-    public Object getState(long txid) {
-        return _curr.get(txid);
-    }
-    
-    public Object getState(long txid, StateInitializer init) {
-        if(!_curr.containsKey(txid)) {
-            SortedMap<Long, Object> prevMap = _curr.headMap(txid);
-            SortedMap<Long, Object> afterMap = _curr.tailMap(txid);            
-            
-            Long prev = null;
-            if(!prevMap.isEmpty()) prev = prevMap.lastKey();            
-            
-            Object data;
-            if(afterMap.isEmpty()) {
-                Object prevData;
-                if(prev!=null) {
-                    prevData = _curr.get(prev);
-                } else {
-                    prevData = null;
-                }
-                data = init.init(txid, prevData);
-            } else {
-                data = null;
-            }
-            _curr.put(txid, data);
-            _state.setData(txPath(txid), data);
-        }
-        return _curr.get(txid);
-    }
-    
-    public Object getPreviousState(long txid) {
-        SortedMap<Long, Object> prevMap = _curr.headMap(txid);
-        if(prevMap.isEmpty()) return null;
-        else return prevMap.get(prevMap.lastKey());
-    }
-    
-    public boolean hasCache(long txid) {
-        return _curr.containsKey(txid);
-    }
-       
-    /**
-     * Returns null if it was created, the value otherwise.
-     */
-    public Object getStateOrCreate(long txid, StateInitializer init) {
-        if(_curr.containsKey(txid)) {
-            return _curr.get(txid);
-        } else {
-            getState(txid, init);
-            return null;
-        }
-    }
-    
-    public void cleanupBefore(long txid) {
-        SortedMap<Long, Object> toDelete = _curr.headMap(txid);
-        for(long tx: new HashSet<Long>(toDelete.keySet())) {
-            _curr.remove(tx);
-            try {
-                _state.delete(txPath(tx));
-            } catch(RuntimeException e) {
-                // Ignore NoNodeExists exceptions because when sync() it may populate _curr with stale data since
-                // zookeeper reads are eventually consistent.
-                if(!Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) {
-                    throw e;
-                }
-            }
-        }
-    }
-    
-    private void sync() {
-        List<String> txids = _state.list(_subdir);
-        for(String txid_s: txids) {
-            Object data = _state.getData(txPath(txid_s));
-            _curr.put(Long.parseLong(txid_s), data);
-        }
-    }
-    
-    private String txPath(long tx) {
-        return txPath("" + tx);
-    }
-
-    private String txPath(String tx) {
-        return _subdir + "/" + tx;
-    }    
-    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/topology/state/TransactionalState.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/topology/state/TransactionalState.java b/jstorm-client/src/main/java/storm/trident/topology/state/TransactionalState.java
deleted file mode 100644
index 44d4282..0000000
--- a/jstorm-client/src/main/java/storm/trident/topology/state/TransactionalState.java
+++ /dev/null
@@ -1,119 +0,0 @@
-package storm.trident.topology.state;
-
-
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-
-import backtype.storm.Config;
-import backtype.storm.utils.Utils;
-
-public class TransactionalState {
-    CuratorFramework _curator;
-    
-    public static TransactionalState newUserState(Map conf, String id) {
-        return new TransactionalState(conf, id, "user");
-    }
-    
-    public static TransactionalState newCoordinatorState(Map conf, String id) {
-        return new TransactionalState(conf, id, "coordinator");        
-    }
-    
-    protected TransactionalState(Map conf, String id, String subroot) {
-        try {
-            conf = new HashMap(conf);
-            String rootDir = conf.get(Config.TRANSACTIONAL_ZOOKEEPER_ROOT) + "/" + id + "/" + subroot;
-            List<String> servers = (List<String>) getWithBackup(conf, Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Config.STORM_ZOOKEEPER_SERVERS);
-            Object port = getWithBackup(conf, Config.TRANSACTIONAL_ZOOKEEPER_PORT, Config.STORM_ZOOKEEPER_PORT);
-            CuratorFramework initter = Utils.newCuratorStarted(conf, servers, port);
-            try {
-                initter.create().creatingParentsIfNeeded().forPath(rootDir);
-            } catch(KeeperException.NodeExistsException e)  {
-                
-            }
-            
-            initter.close();
-                                    
-            _curator = Utils.newCuratorStarted(conf, servers, port, rootDir);
-        } catch (Exception e) {
-           throw new RuntimeException(e);
-        }
-    }
-    
-    public void setData(String path, Object obj) {
-        path = "/" + path;
-        byte[] ser;
-        try {
-            ser = Utils.to_json(obj).getBytes("UTF-8");
-        } catch (UnsupportedEncodingException e) {
-            throw new RuntimeException(e);
-        }
-        try {
-            if(_curator.checkExists().forPath(path)!=null) {
-                _curator.setData().forPath(path, ser);
-            } else {
-                _curator.create()
-                        .creatingParentsIfNeeded()
-                        .withMode(CreateMode.PERSISTENT)
-                        .forPath(path, ser);
-            }
-        } catch(Exception e) {
-            throw new RuntimeException(e);
-        }        
-    }
-    
-    public void delete(String path) {
-        path = "/" + path;
-        try {
-            _curator.delete().forPath(path);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-    
-    public List<String> list(String path) {
-        path = "/" + path;
-        try {
-            if(_curator.checkExists().forPath(path)==null) {
-                return new ArrayList<String>();
-            } else {
-                return _curator.getChildren().forPath(path);
-            }
-        } catch(Exception e) {
-            throw new RuntimeException(e);
-        }   
-    }
-    
-    public void mkdir(String path) {
-        setData(path, 7);
-    }
-    
-    public Object getData(String path) {
-        path = "/" + path;
-        try {
-            if(_curator.checkExists().forPath(path)!=null) {
-                return Utils.from_json(new String(_curator.getData().forPath(path), "UTF-8"));
-            } else {
-                return null;
-            }
-        } catch(Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-    
-    public void close() {
-        _curator.close();
-    }
-    
-    private Object getWithBackup(Map amap, Object primary, Object backup) {
-        Object ret = amap.get(primary);
-        if(ret==null) return amap.get(backup);
-        return ret;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/tuple/ComboList.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/tuple/ComboList.java b/jstorm-client/src/main/java/storm/trident/tuple/ComboList.java
deleted file mode 100644
index 0221579..0000000
--- a/jstorm-client/src/main/java/storm/trident/tuple/ComboList.java
+++ /dev/null
@@ -1,75 +0,0 @@
-package storm.trident.tuple;
-
-import java.io.Serializable;
-import java.util.AbstractList;
-import java.util.List;
-import org.apache.commons.lang.builder.ToStringBuilder;
-
-
-public class ComboList extends AbstractList<Object> {    
-    public static class Factory implements Serializable {
-        Pointer[] index;
-        int[] sizes;
-        
-        public Factory(int... sizes) {
-            this.sizes = sizes;
-            int total = 0;
-            for(int size: sizes) {
-                total+=size;
-            }
-            index = new Pointer[total];
-            int i=0;
-            int j=0;
-            for(int size: sizes) {
-                for(int z=0; z<size; z++) {
-                    index[j] = new Pointer(i, z);
-                    j++;
-                }
-                i++;
-            }
-        }
-        
-        public ComboList create(List[] delegates) {
-            if(delegates.length!=sizes.length) {
-                throw new RuntimeException("Expected " + sizes.length + " lists, but instead got " + delegates.length + " lists");
-            }
-            for(int i=0; i<delegates.length; i++) {
-                List l = delegates[i];
-                if(l==null || l.size() != sizes[i]) {
-                    throw new RuntimeException("Got unexpected delegates to ComboList: " + ToStringBuilder.reflectionToString(delegates));
-                }
-            }
-            return new ComboList(delegates, index);
-        }
-    }
-    
-    private static class Pointer implements Serializable {
-        int listIndex;
-        int subIndex;
-        
-        public Pointer(int listIndex, int subIndex) {
-            this.listIndex = listIndex;
-            this.subIndex = subIndex;
-        }
-        
-    }
-    
-    Pointer[] _index;
-    List[] _delegates;
-    
-    public ComboList(List[] delegates, Pointer[] index) {
-        _index = index;
-        _delegates = delegates;
-    }
-            
-    @Override
-    public Object get(int i) {
-        Pointer ptr = _index[i];
-        return _delegates[ptr.listIndex].get(ptr.subIndex);
-    }
-
-    @Override
-    public int size() {
-        return _index.length;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/tuple/ConsList.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/tuple/ConsList.java b/jstorm-client/src/main/java/storm/trident/tuple/ConsList.java
deleted file mode 100644
index 72fd3d3..0000000
--- a/jstorm-client/src/main/java/storm/trident/tuple/ConsList.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package storm.trident.tuple;
-
-import java.util.AbstractList;
-import java.util.List;
-
-public class ConsList extends AbstractList<Object> {
-    List<Object> _elems;
-    Object _first;
-    
-    public ConsList(Object o, List<Object> elems) {
-        _elems = elems;
-        _first = o;
-    }
-
-    @Override
-    public Object get(int i) {
-        if(i==0) return _first;
-        else {
-            return _elems.get(i - 1);
-        }
-    }
-
-    @Override
-    public int size() {
-        return _elems.size() + 1;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/tuple/TridentTuple.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/tuple/TridentTuple.java b/jstorm-client/src/main/java/storm/trident/tuple/TridentTuple.java
deleted file mode 100644
index 9159ce7..0000000
--- a/jstorm-client/src/main/java/storm/trident/tuple/TridentTuple.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package storm.trident.tuple;
-
-import backtype.storm.tuple.ITuple;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-public interface TridentTuple extends ITuple, List<Object> {
-
-    public static interface Factory extends Serializable {
-        Map<String, ValuePointer> getFieldIndex();
-        List<String> getOutputFields();
-        int numDelegates();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/tuple/TridentTupleView.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/tuple/TridentTupleView.java b/jstorm-client/src/main/java/storm/trident/tuple/TridentTupleView.java
deleted file mode 100644
index 17f3e3f..0000000
--- a/jstorm-client/src/main/java/storm/trident/tuple/TridentTupleView.java
+++ /dev/null
@@ -1,342 +0,0 @@
-package storm.trident.tuple;
-
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import clojure.lang.IPersistentVector;
-import clojure.lang.PersistentVector;
-import clojure.lang.RT;
-import java.util.AbstractList;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Arrays;
-
-//extends abstractlist so that it can be emitted directly as Storm tuples
-public class TridentTupleView extends AbstractList<Object> implements TridentTuple {
-    ValuePointer[] _index;
-    Map<String, ValuePointer> _fieldIndex;
-    IPersistentVector _delegates;
-
-    public static class ProjectionFactory implements Factory {
-        Map<String, ValuePointer> _fieldIndex;
-        ValuePointer[] _index;
-        Factory _parent;
-
-        public ProjectionFactory(Factory parent, Fields projectFields) {
-            _parent = parent;
-            if(projectFields==null) projectFields = new Fields();
-            Map<String, ValuePointer> parentFieldIndex = parent.getFieldIndex();
-            _fieldIndex = new HashMap<String, ValuePointer>();
-            for(String f: projectFields) {
-                _fieldIndex.put(f, parentFieldIndex.get(f));
-            }            
-            _index = ValuePointer.buildIndex(projectFields, _fieldIndex);
-        }
-        
-        public TridentTuple create(TridentTuple parent) {
-            if(_index.length==0) return EMPTY_TUPLE;
-            else return new TridentTupleView(((TridentTupleView)parent)._delegates, _index, _fieldIndex);
-        }
-
-        @Override
-        public Map<String, ValuePointer> getFieldIndex() {
-            return _fieldIndex;
-        }
-
-        @Override
-        public int numDelegates() {
-            return _parent.numDelegates();
-        }
-
-        @Override
-        public List<String> getOutputFields() {
-            return indexToFieldsList(_index);
-        }
-    }
-    
-    public static class FreshOutputFactory  implements Factory {
-        Map<String, ValuePointer> _fieldIndex;
-        ValuePointer[] _index;
-
-        public FreshOutputFactory(Fields selfFields) {
-            _fieldIndex = new HashMap<String, ValuePointer>();
-            for(int i=0; i<selfFields.size(); i++) {
-                String field = selfFields.get(i);
-                _fieldIndex.put(field, new ValuePointer(0, i, field));
-            }
-            _index = ValuePointer.buildIndex(selfFields, _fieldIndex);
-        }
-        
-        public TridentTuple create(List<Object> selfVals) {
-            return new TridentTupleView(PersistentVector.EMPTY.cons(selfVals), _index, _fieldIndex);
-        }
-
-        @Override
-        public Map<String, ValuePointer> getFieldIndex() {
-            return _fieldIndex;
-        }
-
-        @Override
-        public int numDelegates() {
-            return 1;
-        }
-        
-        @Override
-        public List<String> getOutputFields() {
-            return indexToFieldsList(_index);
-        }        
-    }
-    
-    public static class OperationOutputFactory implements Factory {
-        Map<String, ValuePointer> _fieldIndex;
-        ValuePointer[] _index;
-        Factory _parent;
-
-        public OperationOutputFactory(Factory parent, Fields selfFields) {
-            _parent = parent;
-            _fieldIndex = new HashMap(parent.getFieldIndex());
-            int myIndex = parent.numDelegates();
-            for(int i=0; i<selfFields.size(); i++) {
-                String field = selfFields.get(i);
-                _fieldIndex.put(field, new ValuePointer(myIndex, i, field));
-            }
-            List<String> myOrder = new ArrayList<String>(parent.getOutputFields());
-            
-            Set<String> parentFieldsSet = new HashSet<String>(myOrder);
-            for(String f: selfFields) {
-                if(parentFieldsSet.contains(f)) {
-                    throw new IllegalArgumentException(
-                            "Additive operations cannot add fields with same name as already exists. "
-                            + "Tried adding " + selfFields + " to " + parent.getOutputFields());
-                }
-                myOrder.add(f);
-            }
-            
-            _index = ValuePointer.buildIndex(new Fields(myOrder), _fieldIndex);
-        }
-        
-        public TridentTuple create(TridentTupleView parent, List<Object> selfVals) {
-            IPersistentVector curr = parent._delegates;
-            curr = (IPersistentVector) RT.conj(curr, selfVals);
-            return new TridentTupleView(curr, _index, _fieldIndex);
-        }
-
-        @Override
-        public Map<String, ValuePointer> getFieldIndex() {
-            return _fieldIndex;
-        }
-
-        @Override
-        public int numDelegates() {
-            return _parent.numDelegates() + 1;
-        }
-
-        @Override
-        public List<String> getOutputFields() {
-            return indexToFieldsList(_index);
-        }
-    }
-    
-    public static class RootFactory implements Factory {
-        ValuePointer[] index;
-        Map<String, ValuePointer> fieldIndex;
-        
-        public RootFactory(Fields inputFields) {
-            index = new ValuePointer[inputFields.size()];
-            int i=0;
-            for(String f: inputFields) {
-                index[i] = new ValuePointer(0, i, f);
-                i++;
-            }
-            fieldIndex = ValuePointer.buildFieldIndex(index);
-        }
-        
-        public TridentTuple create(Tuple parent) {            
-            return new TridentTupleView(PersistentVector.EMPTY.cons(parent.getValues()), index, fieldIndex);
-        }
-
-        @Override
-        public Map<String, ValuePointer> getFieldIndex() {
-            return fieldIndex;
-        }
-
-        @Override
-        public int numDelegates() {
-            return 1;
-        }
-        
-        @Override
-        public List<String> getOutputFields() {
-            return indexToFieldsList(this.index);
-        }
-    }
-    
-    private static List<String> indexToFieldsList(ValuePointer[] index) {
-        List<String> ret = new ArrayList<String>();
-        for(ValuePointer p: index) {
-            ret.add(p.field);
-        }
-        return ret;
-    }
-    
-    public static TridentTupleView EMPTY_TUPLE = new TridentTupleView(null, new ValuePointer[0], new HashMap());
-
-    // index and fieldIndex are precomputed, delegates built up over many operations using persistent data structures
-    public TridentTupleView(IPersistentVector delegates, ValuePointer[] index, Map<String, ValuePointer> fieldIndex) {
-        _delegates = delegates;
-        _index = index;
-        _fieldIndex = fieldIndex;
-    }
-
-    public static TridentTuple createFreshTuple(Fields fields, List<Object> values) {
-        FreshOutputFactory factory = new FreshOutputFactory(fields);
-        return factory.create(values);
-    }
-
-    public static TridentTuple createFreshTuple(Fields fields, Object... values) {
-        FreshOutputFactory factory = new FreshOutputFactory(fields);
-        return factory.create(Arrays.asList(values));
-    }
-
-    @Override
-    public List<Object> getValues() {
-        return this;
-    }    
-
-    @Override
-    public int size() {
-        return _index.length;
-    }
-
-    @Override
-    public boolean contains(String field) {
-        return getFields().contains(field);
-    }
-
-    @Override
-    public Fields getFields() {
-        return new Fields(indexToFieldsList(_index));
-    }
-
-    @Override
-    public int fieldIndex(String field) {
-        return getFields().fieldIndex(field);
-    }
-
-    @Override
-    public List<Object> select(Fields selector) {
-        return getFields().select(selector, getValues());
-    }
-
-    @Override
-    public Object get(int i) {
-        return getValue(i);
-    }    
-    
-    @Override
-    public Object getValue(int i) {
-        return getValueByPointer(_index[i]);
-    }
-
-    @Override
-    public String getString(int i) {
-        return (String) getValue(i);
-    }
-
-    @Override
-    public Integer getInteger(int i) {
-        return (Integer) getValue(i);
-    }
-
-    @Override
-    public Long getLong(int i) {
-        return (Long) getValue(i);
-    }
-
-    @Override
-    public Boolean getBoolean(int i) {
-        return (Boolean) getValue(i);
-    }
-
-    @Override
-    public Short getShort(int i) {
-        return (Short) getValue(i);
-    }
-
-    @Override
-    public Byte getByte(int i) {
-        return (Byte) getValue(i);
-    }
-
-    @Override
-    public Double getDouble(int i) {
-        return (Double) getValue(i);
-    }
-
-    @Override
-    public Float getFloat(int i) {
-        return (Float) getValue(i);
-    }
-
-    @Override
-    public byte[] getBinary(int i) {
-        return (byte[]) getValue(i);
-    }
-
-    @Override
-    public Object getValueByField(String field) {
-        return getValueByPointer(_fieldIndex.get(field));
-    }
-
-    @Override
-    public String getStringByField(String field) {
-        return (String) getValueByField(field);
-    }
-
-    @Override
-    public Integer getIntegerByField(String field) {
-        return (Integer) getValueByField(field);
-    }
-
-    @Override
-    public Long getLongByField(String field) {
-        return (Long) getValueByField(field);
-    }
-
-    @Override
-    public Boolean getBooleanByField(String field) {
-        return (Boolean) getValueByField(field);
-    }
-
-    @Override
-    public Short getShortByField(String field) {
-        return (Short) getValueByField(field);
-    }
-
-    @Override
-    public Byte getByteByField(String field) {
-        return (Byte) getValueByField(field);
-    }
-
-    @Override
-    public Double getDoubleByField(String field) {
-        return (Double) getValueByField(field);
-    }
-
-    @Override
-    public Float getFloatByField(String field) {
-        return (Float) getValueByField(field);
-    }
-
-    @Override
-    public byte[] getBinaryByField(String field) {
-        return (byte[]) getValueByField(field);
-    }
-
-    private Object getValueByPointer(ValuePointer ptr) {
-        return ((List<Object>)_delegates.nth(ptr.delegateIndex)).get(ptr.index);     
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/tuple/ValuePointer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/tuple/ValuePointer.java b/jstorm-client/src/main/java/storm/trident/tuple/ValuePointer.java
deleted file mode 100644
index 401261e..0000000
--- a/jstorm-client/src/main/java/storm/trident/tuple/ValuePointer.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package storm.trident.tuple;
-
-import backtype.storm.tuple.Fields;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.commons.lang.builder.ToStringBuilder;
-
-public class ValuePointer {
-    public static Map<String, ValuePointer> buildFieldIndex(ValuePointer[] pointers) {
-        Map<String, ValuePointer> ret = new HashMap<String, ValuePointer>();
-        for(ValuePointer ptr: pointers) {
-            ret.put(ptr.field, ptr);
-        }
-        return ret;        
-    }
-
-    public static ValuePointer[] buildIndex(Fields fieldsOrder, Map<String, ValuePointer> pointers) {
-        if(fieldsOrder.size()!=pointers.size()) {
-            throw new IllegalArgumentException("Fields order must be same length as pointers map");
-        }
-        ValuePointer[] ret = new ValuePointer[pointers.size()];
-        for(int i=0; i<fieldsOrder.size(); i++) {
-            ret[i] = pointers.get(fieldsOrder.get(i));
-        }
-        return ret;
-    }    
-    
-    public int delegateIndex;
-    protected int index;
-    protected String field;
-    
-    public ValuePointer(int delegateIndex, int index, String field) {
-        this.delegateIndex = delegateIndex;
-        this.index = index;
-        this.field = field;
-    }
-
-    @Override
-    public String toString() {
-        return ToStringBuilder.reflectionToString(this);
-    }    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/util/ErrorEdgeFactory.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/util/ErrorEdgeFactory.java b/jstorm-client/src/main/java/storm/trident/util/ErrorEdgeFactory.java
deleted file mode 100644
index 02cff2a..0000000
--- a/jstorm-client/src/main/java/storm/trident/util/ErrorEdgeFactory.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package storm.trident.util;
-
-import java.io.Serializable;
-import org.jgrapht.EdgeFactory;
-
-public class ErrorEdgeFactory implements EdgeFactory, Serializable {
-    @Override
-    public Object createEdge(Object v, Object v1) {
-        throw new RuntimeException("Edges should be made explicitly");
-    }        
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/util/IndexedEdge.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/util/IndexedEdge.java b/jstorm-client/src/main/java/storm/trident/util/IndexedEdge.java
deleted file mode 100644
index 6201978..0000000
--- a/jstorm-client/src/main/java/storm/trident/util/IndexedEdge.java
+++ /dev/null
@@ -1,55 +0,0 @@
-package storm.trident.util;
-
-import java.io.Serializable;
-
-public class IndexedEdge<T> implements Comparable, Serializable {
-	public T source;
-	public T target;
-	public int index;
-
-	public IndexedEdge(T source, T target, int index) {
-		this.source = source;
-		this.target = target;
-		this.index = index;
-	}
-
-	@Override
-	public int hashCode() {
-		final int prime = 31;
-		int result = 1;
-		result = prime * result + index;
-		result = prime * result + ((source == null) ? 0 : source.hashCode());
-		result = prime * result + ((target == null) ? 0 : target.hashCode());
-		return result;
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (this == obj)
-			return true;
-		if (obj == null)
-			return false;
-		if (getClass() != obj.getClass())
-			return false;
-		IndexedEdge other = (IndexedEdge) obj;
-		if (index != other.index)
-			return false;
-		if (source == null) {
-			if (other.source != null)
-				return false;
-		} else if (!source.equals(other.source))
-			return false;
-		if (target == null) {
-			if (other.target != null)
-				return false;
-		} else if (!target.equals(other.target))
-			return false;
-		return true;
-	}
-
-	@Override
-	public int compareTo(Object t) {
-		IndexedEdge other = (IndexedEdge) t;
-		return index - other.index;
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/util/LRUMap.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/util/LRUMap.java b/jstorm-client/src/main/java/storm/trident/util/LRUMap.java
deleted file mode 100644
index 8d1a9a3..0000000
--- a/jstorm-client/src/main/java/storm/trident/util/LRUMap.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package storm.trident.util;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-public class LRUMap<A, B> extends LinkedHashMap<A, B> {
-    private int _maxSize;
-
-    public LRUMap(int maxSize) {
-        super(maxSize + 1, 1.0f, true);
-        _maxSize = maxSize;
-    }
-    
-    @Override
-    protected boolean removeEldestEntry(final Map.Entry<A, B> eldest) {
-        return size() > _maxSize;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/util/TridentUtils.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/util/TridentUtils.java b/jstorm-client/src/main/java/storm/trident/util/TridentUtils.java
deleted file mode 100644
index 0059721..0000000
--- a/jstorm-client/src/main/java/storm/trident/util/TridentUtils.java
+++ /dev/null
@@ -1,125 +0,0 @@
-package storm.trident.util;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.thrift7.TBase;
-import org.apache.thrift7.TDeserializer;
-import org.apache.thrift7.TException;
-import org.apache.thrift7.TSerializer;
-import org.jgrapht.DirectedGraph;
-
-import backtype.storm.generated.StreamInfo;
-import backtype.storm.topology.IComponent;
-import backtype.storm.topology.OutputFieldsGetter;
-import backtype.storm.tuple.Fields;
-
-public class TridentUtils {
-    public static Fields fieldsUnion(Fields... fields) {
-        Set<String> ret = new HashSet<String>();
-        for(Fields f: fields) {
-            if(f!=null) ret.addAll(f.toList());
-        }
-        return new Fields(new ArrayList<String>(ret));
-    }
-    
-    public static Fields fieldsConcat(Fields... fields) {
-        List<String> ret = new ArrayList<String>();
-        for(Fields f: fields) {
-            if(f!=null) ret.addAll(f.toList());
-        }
-        return new Fields(ret);
-    }
-    
-    public static Fields fieldsSubtract(Fields all, Fields minus) {
-        Set<String> removeSet = new HashSet<String>(minus.toList());
-        List<String> toKeep = new ArrayList<String>();
-        for(String s: all.toList()) {
-            if(!removeSet.contains(s)) {
-                toKeep.add(s);
-            }
-        }
-        return new Fields(toKeep);
-    }
-    
-    public static Fields getSingleOutputStreamFields(IComponent component) {
-        OutputFieldsGetter getter = new OutputFieldsGetter();
-        component.declareOutputFields(getter);
-        Map<String, StreamInfo> declaration = getter.getFieldsDeclaration();
-        if(declaration.size()!=1) {
-            throw new RuntimeException("Trident only supports components that emit a single stream");
-        }
-        StreamInfo si = declaration.values().iterator().next();
-        if(si.is_direct()) {
-            throw new RuntimeException("Trident does not support direct streams");
-        }
-        return new Fields(si.get_output_fields());        
-    }
-    
-    /**
-     * Assumes edge contains an index
-     */
-    public static <T> List<T> getParents(DirectedGraph g, T n) {
-        List<IndexedEdge> incoming = new ArrayList(g.incomingEdgesOf(n));
-        Collections.sort(incoming);
-        List<T> ret = new ArrayList();
-        for(IndexedEdge e: incoming) {
-            ret.add((T)e.source);
-        }        
-        return ret;
-    }
-
-    public static <T> List<T> getChildren(DirectedGraph g, T n) {
-        List<IndexedEdge> outgoing = new ArrayList(g.outgoingEdgesOf(n));
-        List<T> ret = new ArrayList();
-        for(IndexedEdge e: outgoing) {
-            ret.add((T)e.target);
-        }        
-        return ret;
-    }
-    
-    
-    public static <T> T getParent(DirectedGraph g, T n) {
-        List<T> parents = getParents(g, n);
-        if(parents.size()!=1) {
-            throw new RuntimeException("Expected a single parent");
-        }
-        return parents.get(0);
-    }
-    
-    private static ThreadLocal<TSerializer> threadSer = new ThreadLocal<TSerializer>();
-    private static ThreadLocal<TDeserializer> threadDes = new ThreadLocal<TDeserializer>();
-    
-    public static byte[] thriftSerialize(TBase t) {
-        try {
-            TSerializer ser = threadSer.get();
-            if (ser == null) {
-                ser = new TSerializer();
-                threadSer.set(ser);
-            } 
-            return ser.serialize(t);
-        } catch (TException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public static <T> T thriftDeserialize(Class c, byte[] b) {
-        try {
-            T ret = (T) c.newInstance();
-            TDeserializer des = threadDes.get();
-            if (des == null) {
-                des = new TDeserializer();
-                threadDes.set(des);
-            }
-            des.deserialize((TBase) ret, b);
-            return ret;
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-        
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/py/__init__.py
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/py/__init__.py b/jstorm-client/src/main/py/__init__.py
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/py/storm/DistributedRPC-remote
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/py/storm/DistributedRPC-remote b/jstorm-client/src/main/py/storm/DistributedRPC-remote
deleted file mode 100644
index 9b7ebd8..0000000
--- a/jstorm-client/src/main/py/storm/DistributedRPC-remote
+++ /dev/null
@@ -1,85 +0,0 @@
-#!/usr/bin/env python
-#
-# Autogenerated by Thrift Compiler (0.7.0)
-#
-# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
-#
-
-import sys
-import pprint
-from urlparse import urlparse
-from thrift.transport import TTransport
-from thrift.transport import TSocket
-from thrift.transport import THttpClient
-from thrift.protocol import TBinaryProtocol
-
-import DistributedRPC
-from ttypes import *
-
-if len(sys.argv) <= 1 or sys.argv[1] == '--help':
-  print ''
-  print 'Usage: ' + sys.argv[0] + ' [-h host:port] [-u url] [-f[ramed]] function [arg1 [arg2...]]'
-  print ''
-  print 'Functions:'
-  print '  string execute(string functionName, string funcArgs)'
-  print ''
-  sys.exit(0)
-
-pp = pprint.PrettyPrinter(indent = 2)
-host = 'localhost'
-port = 9090
-uri = ''
-framed = False
-http = False
-argi = 1
-
-if sys.argv[argi] == '-h':
-  parts = sys.argv[argi+1].split(':')
-  host = parts[0]
-  port = int(parts[1])
-  argi += 2
-
-if sys.argv[argi] == '-u':
-  url = urlparse(sys.argv[argi+1])
-  parts = url[1].split(':')
-  host = parts[0]
-  if len(parts) > 1:
-    port = int(parts[1])
-  else:
-    port = 80
-  uri = url[2]
-  if url[4]:
-    uri += '?%s' % url[4]
-  http = True
-  argi += 2
-
-if sys.argv[argi] == '-f' or sys.argv[argi] == '-framed':
-  framed = True
-  argi += 1
-
-cmd = sys.argv[argi]
-args = sys.argv[argi+1:]
-
-if http:
-  transport = THttpClient.THttpClient(host, port, uri)
-else:
-  socket = TSocket.TSocket(host, port)
-  if framed:
-    transport = TTransport.TFramedTransport(socket)
-  else:
-    transport = TTransport.TBufferedTransport(socket)
-protocol = TBinaryProtocol.TBinaryProtocol(transport)
-client = DistributedRPC.Client(protocol)
-transport.open()
-
-if cmd == 'execute':
-  if len(args) != 2:
-    print 'execute requires 2 args'
-    sys.exit(1)
-  pp.pprint(client.execute(args[0],args[1],))
-
-else:
-  print 'Unrecognized method %s' % cmd
-  sys.exit(1)
-
-transport.close()

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/py/storm/DistributedRPC.py
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/py/storm/DistributedRPC.py b/jstorm-client/src/main/py/storm/DistributedRPC.py
deleted file mode 100644
index a7e6ef9..0000000
--- a/jstorm-client/src/main/py/storm/DistributedRPC.py
+++ /dev/null
@@ -1,256 +0,0 @@
-#
-# Autogenerated by Thrift Compiler (0.7.0)
-#
-# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
-#
-
-from thrift.Thrift import *
-from ttypes import *
-from thrift.Thrift import TProcessor
-from thrift.transport import TTransport
-from thrift.protocol import TBinaryProtocol, TProtocol
-try:
-  from thrift.protocol import fastbinary
-except:
-  fastbinary = None
-
-
-class Iface:
-  def execute(self, functionName, funcArgs):
-    """
-    Parameters:
-     - functionName
-     - funcArgs
-    """
-    pass
-
-
-class Client(Iface):
-  def __init__(self, iprot, oprot=None):
-    self._iprot = self._oprot = iprot
-    if oprot is not None:
-      self._oprot = oprot
-    self._seqid = 0
-
-  def execute(self, functionName, funcArgs):
-    """
-    Parameters:
-     - functionName
-     - funcArgs
-    """
-    self.send_execute(functionName, funcArgs)
-    return self.recv_execute()
-
-  def send_execute(self, functionName, funcArgs):
-    self._oprot.writeMessageBegin('execute', TMessageType.CALL, self._seqid)
-    args = execute_args()
-    args.functionName = functionName
-    args.funcArgs = funcArgs
-    args.write(self._oprot)
-    self._oprot.writeMessageEnd()
-    self._oprot.trans.flush()
-
-  def recv_execute(self, ):
-    (fname, mtype, rseqid) = self._iprot.readMessageBegin()
-    if mtype == TMessageType.EXCEPTION:
-      x = TApplicationException()
-      x.read(self._iprot)
-      self._iprot.readMessageEnd()
-      raise x
-    result = execute_result()
-    result.read(self._iprot)
-    self._iprot.readMessageEnd()
-    if result.success is not None:
-      return result.success
-    if result.e is not None:
-      raise result.e
-    raise TApplicationException(TApplicationException.MISSING_RESULT, "execute failed: unknown result");
-
-
-class Processor(Iface, TProcessor):
-  def __init__(self, handler):
-    self._handler = handler
-    self._processMap = {}
-    self._processMap["execute"] = Processor.process_execute
-
-  def process(self, iprot, oprot):
-    (name, type, seqid) = iprot.readMessageBegin()
-    if name not in self._processMap:
-      iprot.skip(TType.STRUCT)
-      iprot.readMessageEnd()
-      x = TApplicationException(TApplicationException.UNKNOWN_METHOD, 'Unknown function %s' % (name))
-      oprot.writeMessageBegin(name, TMessageType.EXCEPTION, seqid)
-      x.write(oprot)
-      oprot.writeMessageEnd()
-      oprot.trans.flush()
-      return
-    else:
-      self._processMap[name](self, seqid, iprot, oprot)
-    return True
-
-  def process_execute(self, seqid, iprot, oprot):
-    args = execute_args()
-    args.read(iprot)
-    iprot.readMessageEnd()
-    result = execute_result()
-    try:
-      result.success = self._handler.execute(args.functionName, args.funcArgs)
-    except DRPCExecutionException, e:
-      result.e = e
-    oprot.writeMessageBegin("execute", TMessageType.REPLY, seqid)
-    result.write(oprot)
-    oprot.writeMessageEnd()
-    oprot.trans.flush()
-
-
-# HELPER FUNCTIONS AND STRUCTURES
-
-class execute_args:
-  """
-  Argument struct written by Client.send_execute and read by
-  Processor.process_execute.
-
-  Attributes:
-   - functionName
-   - funcArgs
-  """
-
-  # One tuple per field id (index 0 is unused): id, wire type, attribute name,
-  # followed by two entries that are None for both fields here.
-  thrift_spec = (
-    None, # 0
-    (1, TType.STRING, 'functionName', None, None, ), # 1
-    (2, TType.STRING, 'funcArgs', None, None, ), # 2
-  )
-
-  def __hash__(self):
-    """Return the sum of the hashes of both fields."""
-    return 0 + hash(self.functionName) + hash(self.funcArgs)
-
-  def __init__(self, functionName=None, funcArgs=None,):
-    """Store both fields; each defaults to None."""
-    self.functionName = functionName
-    self.funcArgs = funcArgs
-
-  def read(self, iprot):
-    """
-    Populate this struct from the protocol iprot.
-
-    Fields with an unknown id or an unexpected wire type are skipped.
-    """
-    # Fast path: delegate to the optional fastbinary module when the protocol
-    # is exactly TBinaryProtocolAccelerated over a CReadableTransport.
-    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
-      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
-      return
-    iprot.readStructBegin()
-    # Read fields one at a time until the STOP marker.
-    while True:
-      (fname, ftype, fid) = iprot.readFieldBegin()
-      if ftype == TType.STOP:
-        break
-      if fid == 1:
-        if ftype == TType.STRING:
-          # Strings are decoded from UTF-8 after being read.
-          self.functionName = iprot.readString().decode('utf-8')
-        else:
-          iprot.skip(ftype)
-      elif fid == 2:
-        if ftype == TType.STRING:
-          self.funcArgs = iprot.readString().decode('utf-8')
-        else:
-          iprot.skip(ftype)
-      else:
-        iprot.skip(ftype)
-      iprot.readFieldEnd()
-    iprot.readStructEnd()
-
-  def write(self, oprot):
-    """
-    Serialize this struct to the protocol oprot.
-
-    Fields whose value is None are not written.
-    """
-    # Fast path: let fastbinary encode the whole struct in one call.
-    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
-      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
-      return
-    oprot.writeStructBegin('execute_args')
-    if self.functionName is not None:
-      oprot.writeFieldBegin('functionName', TType.STRING, 1)
-      # Strings are encoded to UTF-8 before being written.
-      oprot.writeString(self.functionName.encode('utf-8'))
-      oprot.writeFieldEnd()
-    if self.funcArgs is not None:
-      oprot.writeFieldBegin('funcArgs', TType.STRING, 2)
-      oprot.writeString(self.funcArgs.encode('utf-8'))
-      oprot.writeFieldEnd()
-    oprot.writeFieldStop()
-    oprot.writeStructEnd()
-
-  def validate(self):
-    """Perform no checks; always returns None."""
-    return
-
-
-  def __repr__(self):
-    """Return 'ClassName(attr=value, ...)' built from the instance __dict__."""
-    L = ['%s=%r' % (key, value)
-      for key, value in self.__dict__.iteritems()]
-    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
-
-  def __eq__(self, other):
-    """Equal when other is an instance of this class with an equal __dict__."""
-    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
-
-  def __ne__(self, other):
-    """Negation of __eq__."""
-    return not (self == other)
-
-class execute_result:
-  """
-  Result struct written by Processor.process_execute and read by
-  Client.recv_execute.
-
-  Attributes:
-   - success
-   - e
-  """
-
-  # One tuple per field id: id, wire type, attribute name, then for the struct
-  # field a (class, spec) pair describing the nested type, then None.
-  thrift_spec = (
-    (0, TType.STRING, 'success', None, None, ), # 0
-    (1, TType.STRUCT, 'e', (DRPCExecutionException, DRPCExecutionException.thrift_spec), None, ), # 1
-  )
-
-  def __hash__(self):
-    """Return the sum of the hashes of both fields."""
-    return 0 + hash(self.success) + hash(self.e)
-
-  def __init__(self, success=None, e=None,):
-    """Store both fields; each defaults to None."""
-    self.success = success
-    self.e = e
-
-  def read(self, iprot):
-    """
-    Populate this struct from the protocol iprot.
-
-    Fields with an unknown id or an unexpected wire type are skipped.
-    """
-    # Fast path: delegate to the optional fastbinary module when the protocol
-    # is exactly TBinaryProtocolAccelerated over a CReadableTransport.
-    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
-      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
-      return
-    iprot.readStructBegin()
-    # Read fields one at a time until the STOP marker.
-    while True:
-      (fname, ftype, fid) = iprot.readFieldBegin()
-      if ftype == TType.STOP:
-        break
-      if fid == 0:
-        if ftype == TType.STRING:
-          # Strings are decoded from UTF-8 after being read.
-          self.success = iprot.readString().decode('utf-8')
-        else:
-          iprot.skip(ftype)
-      elif fid == 1:
-        if ftype == TType.STRUCT:
-          # The exception field is read into a fresh DRPCExecutionException.
-          self.e = DRPCExecutionException()
-          self.e.read(iprot)
-        else:
-          iprot.skip(ftype)
-      else:
-        iprot.skip(ftype)
-      iprot.readFieldEnd()
-    iprot.readStructEnd()
-
-  def write(self, oprot):
-    """
-    Serialize this struct to the protocol oprot.
-
-    Fields whose value is None are not written.
-    """
-    # Fast path: let fastbinary encode the whole struct in one call.
-    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
-      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
-      return
-    oprot.writeStructBegin('execute_result')
-    if self.success is not None:
-      oprot.writeFieldBegin('success', TType.STRING, 0)
-      # Strings are encoded to UTF-8 before being written.
-      oprot.writeString(self.success.encode('utf-8'))
-      oprot.writeFieldEnd()
-    if self.e is not None:
-      oprot.writeFieldBegin('e', TType.STRUCT, 1)
-      # The nested exception struct serializes itself.
-      self.e.write(oprot)
-      oprot.writeFieldEnd()
-    oprot.writeFieldStop()
-    oprot.writeStructEnd()
-
-  def validate(self):
-    """Perform no checks; always returns None."""
-    return
-
-
-  def __repr__(self):
-    """Return 'ClassName(attr=value, ...)' built from the instance __dict__."""
-    L = ['%s=%r' % (key, value)
-      for key, value in self.__dict__.iteritems()]
-    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
-
-  def __eq__(self, other):
-    """Equal when other is an instance of this class with an equal __dict__."""
-    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
-
-  def __ne__(self, other):
-    """Negation of __eq__."""
-    return not (self == other)

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/py/storm/DistributedRPCInvocations-remote
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/py/storm/DistributedRPCInvocations-remote b/jstorm-client/src/main/py/storm/DistributedRPCInvocations-remote
deleted file mode 100644
index 5235dfe..0000000
--- a/jstorm-client/src/main/py/storm/DistributedRPCInvocations-remote
+++ /dev/null
@@ -1,99 +0,0 @@
-#!/usr/bin/env python
-#
-# Autogenerated by Thrift Compiler (0.7.0)
-#
-# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
-#
-
-import sys
-import pprint
-from urlparse import urlparse
-from thrift.transport import TTransport
-from thrift.transport import TSocket
-from thrift.transport import THttpClient
-from thrift.protocol import TBinaryProtocol
-
-import DistributedRPCInvocations
-from ttypes import *
-
-if len(sys.argv) <= 1 or sys.argv[1] == '--help':
-  print ''
-  print 'Usage: ' + sys.argv[0] + ' [-h host:port] [-u url] [-f[ramed]] function [arg1 [arg2...]]'
-  print ''
-  print 'Functions:'
-  print '  void result(string id, string result)'
-  print '  DRPCRequest fetchRequest(string functionName)'
-  print '  void failRequest(string id)'
-  print ''
-  sys.exit(0)
-
-pp = pprint.PrettyPrinter(indent = 2)
-host = 'localhost'
-port = 9090
-uri = ''
-framed = False
-http = False
-argi = 1
-
-if sys.argv[argi] == '-h':
-  parts = sys.argv[argi+1].split(':')
-  host = parts[0]
-  port = int(parts[1])
-  argi += 2
-
-if sys.argv[argi] == '-u':
-  url = urlparse(sys.argv[argi+1])
-  parts = url[1].split(':')
-  host = parts[0]
-  if len(parts) > 1:
-    port = int(parts[1])
-  else:
-    port = 80
-  uri = url[2]
-  if url[4]:
-    uri += '?%s' % url[4]
-  http = True
-  argi += 2
-
-if sys.argv[argi] == '-f' or sys.argv[argi] == '-framed':
-  framed = True
-  argi += 1
-
-cmd = sys.argv[argi]
-args = sys.argv[argi+1:]
-
-if http:
-  transport = THttpClient.THttpClient(host, port, uri)
-else:
-  socket = TSocket.TSocket(host, port)
-  if framed:
-    transport = TTransport.TFramedTransport(socket)
-  else:
-    transport = TTransport.TBufferedTransport(socket)
-protocol = TBinaryProtocol.TBinaryProtocol(transport)
-client = DistributedRPCInvocations.Client(protocol)
-transport.open()
-
-if cmd == 'result':
-  if len(args) != 2:
-    print 'result requires 2 args'
-    sys.exit(1)
-  pp.pprint(client.result(args[0],args[1],))
-
-elif cmd == 'fetchRequest':
-  if len(args) != 1:
-    print 'fetchRequest requires 1 args'
-    sys.exit(1)
-  pp.pprint(client.fetchRequest(args[0],))
-
-elif cmd == 'failRequest':
-  if len(args) != 1:
-    print 'failRequest requires 1 args'
-    sys.exit(1)
-  pp.pprint(client.failRequest(args[0],))
-
-else:
-  print 'Unrecognized method %s' % cmd
-  sys.exit(1)
-
-transport.close()