Posted to commits@storm.apache.org by pt...@apache.org on 2015/11/05 21:41:00 UTC

[21/60] [abbrv] [partial] storm git commit: Release 2.0.4-SNAPSHOT

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/TridentTopology.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/TridentTopology.java b/jstorm-client/src/main/java/storm/trident/TridentTopology.java
deleted file mode 100644
index 7b4b00d..0000000
--- a/jstorm-client/src/main/java/storm/trident/TridentTopology.java
+++ /dev/null
@@ -1,796 +0,0 @@
-package storm.trident;
-
-import backtype.storm.Config;
-import backtype.storm.ILocalDRPC;
-import backtype.storm.drpc.DRPCSpout;
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.generated.Grouping;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.grouping.CustomStreamGrouping;
-import backtype.storm.topology.BoltDeclarer;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import org.jgrapht.DirectedGraph;
-import org.jgrapht.UndirectedGraph;
-import org.jgrapht.alg.ConnectivityInspector;
-import org.jgrapht.graph.DefaultDirectedGraph;
-import org.jgrapht.graph.Pseudograph;
-import storm.trident.drpc.ReturnResultsReducer;
-import storm.trident.fluent.GroupedStream;
-import storm.trident.fluent.IAggregatableStream;
-import storm.trident.fluent.UniqueIdGen;
-import storm.trident.graph.GraphGrouper;
-import storm.trident.graph.Group;
-import storm.trident.operation.GroupedMultiReducer;
-import storm.trident.operation.MultiReducer;
-import storm.trident.operation.impl.FilterExecutor;
-import storm.trident.operation.impl.GroupedMultiReducerExecutor;
-import storm.trident.operation.impl.IdentityMultiReducer;
-import storm.trident.operation.impl.JoinerMultiReducer;
-import storm.trident.operation.impl.TrueFilter;
-import storm.trident.partition.IdentityGrouping;
-import storm.trident.planner.Node;
-import storm.trident.planner.NodeStateInfo;
-import storm.trident.planner.PartitionNode;
-import storm.trident.planner.ProcessorNode;
-import storm.trident.planner.SpoutNode;
-import storm.trident.planner.SubtopologyBolt;
-import storm.trident.planner.processor.EachProcessor;
-import storm.trident.planner.processor.MultiReducerProcessor;
-import storm.trident.spout.BatchSpoutExecutor;
-import storm.trident.spout.IBatchSpout;
-import storm.trident.spout.IOpaquePartitionedTridentSpout;
-import storm.trident.spout.IPartitionedTridentSpout;
-import storm.trident.spout.ITridentSpout;
-import storm.trident.spout.OpaquePartitionedTridentSpoutExecutor;
-import storm.trident.spout.PartitionedTridentSpoutExecutor;
-import storm.trident.spout.RichSpoutBatchExecutor;
-import storm.trident.state.StateFactory;
-import storm.trident.state.StateSpec;
-import storm.trident.topology.TridentTopologyBuilder;
-import storm.trident.util.ErrorEdgeFactory;
-import storm.trident.util.IndexedEdge;
-import storm.trident.util.TridentUtils;
-
-
-// graph with 3 kinds of nodes:
-// operation, partition, or spout
-// all operations have finishBatch and can optionally be committers
-public class TridentTopology {
-    
-    // TODO: add a method for a DRPC stream; needs to know how to automatically wire up ReturnResults, etc.
-    // is it too expensive to do a batch per DRPC request?
-    
-    DefaultDirectedGraph<Node, IndexedEdge> _graph;
-    Map<String, List<Node>> _colocate = new HashMap();
-    UniqueIdGen _gen;
-    
-    public TridentTopology() {
-        _graph = new DefaultDirectedGraph(new ErrorEdgeFactory());
-        _gen = new UniqueIdGen();
-    }
-    
-    private TridentTopology(DefaultDirectedGraph<Node, IndexedEdge> graph, Map<String, List<Node>> colocate, UniqueIdGen gen) {
-        _graph = graph;
-        _colocate = colocate;
-        _gen = gen;
-    }
-    
-    
-    // automatically turn it into a batch spout; should take parameters for how much to batch
-//    public Stream newStream(IRichSpout spout) {
-//        Node n = new SpoutNode(getUniqueStreamId(), TridentUtils.getSingleOutputStreamFields(spout), null, spout, SpoutNode.SpoutType.BATCH);
-//        return addNode(n);
-//    }
-    
-    public Stream newStream(String txId, IRichSpout spout) {
-        return newStream(txId, new RichSpoutBatchExecutor(spout));
-    }
-    
-    public Stream newStream(String txId, IBatchSpout spout) {
-        Node n = new SpoutNode(getUniqueStreamId(), spout.getOutputFields(), txId, spout, SpoutNode.SpoutType.BATCH);
-        return addNode(n);
-    }
-    
-    public Stream newStream(String txId, ITridentSpout spout) {
-        Node n = new SpoutNode(getUniqueStreamId(), spout.getOutputFields(), txId, spout, SpoutNode.SpoutType.BATCH);
-        return addNode(n);
-    }
-    
-    public Stream newStream(String txId, IPartitionedTridentSpout spout) {
-        return newStream(txId, new PartitionedTridentSpoutExecutor(spout));
-    }
-    
-    public Stream newStream(String txId, IOpaquePartitionedTridentSpout spout) {
-        return newStream(txId, new OpaquePartitionedTridentSpoutExecutor(spout));
-    }
-    
-    public Stream newDRPCStream(String function) {
-        return newDRPCStream(new DRPCSpout(function));
-    }
-
-    public Stream newDRPCStream(String function, ILocalDRPC server) {
-        DRPCSpout spout;
-        if(server==null) {
-            spout = new DRPCSpout(function);
-        } else {
-            spout = new DRPCSpout(function, server);
-        }
-        return newDRPCStream(spout);
-    }
-    
-    private Stream newDRPCStream(DRPCSpout spout) {
-        // TODO: consider adding a shuffle grouping after the spout to avoid routing the args/return-info all over the place
-        // (at least until it's possible to just pack bolt logic into the spout itself)
-
-        Node n = new SpoutNode(getUniqueStreamId(), TridentUtils.getSingleOutputStreamFields(spout), null, spout, SpoutNode.SpoutType.DRPC);
-        Stream nextStream = addNode(n);
-        // later on, this will be joined back with return-info and all the results
-        return nextStream.project(new Fields("args"));
-    }
-    
-    public TridentState newStaticState(StateFactory factory) {
-        return newStaticState(new StateSpec(factory));
-    }
-    
-    public TridentState newStaticState(StateSpec spec) {
-        String stateId = getUniqueStateId();
-        Node n = new Node(getUniqueStreamId(), null, new Fields());
-        n.stateInfo = new NodeStateInfo(stateId, spec);
-        registerNode(n);
-        return new TridentState(this, n);
-    }
-    
-    public Stream multiReduce(Stream s1, Stream s2, MultiReducer function, Fields outputFields) {
-        return multiReduce(Arrays.asList(s1, s2), function, outputFields);        
-    }
-
-    public Stream multiReduce(Fields inputFields1, Stream s1, Fields inputFields2, Stream s2, MultiReducer function, Fields outputFields) {
-        return multiReduce(Arrays.asList(inputFields1, inputFields2), Arrays.asList(s1, s2), function, outputFields);        
-    }    
-    
-    public Stream multiReduce(GroupedStream s1, GroupedStream s2, GroupedMultiReducer function, Fields outputFields) {
-        return multiReduce(Arrays.asList(s1, s2), function, outputFields);        
-    }
-    
-    public Stream multiReduce(Fields inputFields1, GroupedStream s1, Fields inputFields2, GroupedStream s2, GroupedMultiReducer function, Fields outputFields) {
-        return multiReduce(Arrays.asList(inputFields1, inputFields2), Arrays.asList(s1, s2), function, outputFields);        
-    } 
-    
-    public Stream multiReduce(List<Stream> streams, MultiReducer function, Fields outputFields) {
-        return multiReduce(getAllOutputFields(streams), streams, function, outputFields);
-    }
-        
-    public Stream multiReduce(List<GroupedStream> streams, GroupedMultiReducer function, Fields outputFields) {
-        return multiReduce(getAllOutputFields(streams), streams, function, outputFields);        
-    }    
-    
-    public Stream multiReduce(List<Fields> inputFields, List<Stream> streams, MultiReducer function, Fields outputFields) {
-        List<String> names = new ArrayList<String>();
-        for(Stream s: streams) {
-            if(s._name!=null) {
-                names.add(s._name);
-            }
-        }
-        Node n = new ProcessorNode(getUniqueStreamId(), Utils.join(names, "-"), outputFields, outputFields, new MultiReducerProcessor(inputFields, function));
-        return addSourcedNode(streams, n);
-    }
-    
-    public Stream multiReduce(List<Fields> inputFields, List<GroupedStream> groupedStreams, GroupedMultiReducer function, Fields outputFields) {
-        List<Fields> fullInputFields = new ArrayList<Fields>();
-        List<Stream> streams = new ArrayList<Stream>();
-        List<Fields> fullGroupFields = new ArrayList<Fields>();
-        for(int i=0; i<groupedStreams.size(); i++) {
-            GroupedStream gs = groupedStreams.get(i);
-            Fields groupFields = gs.getGroupFields();
-            fullGroupFields.add(groupFields);
-            streams.add(gs.toStream().partitionBy(groupFields));
-            fullInputFields.add(TridentUtils.fieldsUnion(groupFields, inputFields.get(i)));
-            
-        }
-        return multiReduce(fullInputFields, streams, new GroupedMultiReducerExecutor(function, fullGroupFields, inputFields), outputFields);
-    }
-    
-    public Stream merge(Fields outputFields, Stream... streams) {
-        return merge(outputFields, Arrays.asList(streams));
-    }
-    
-    public Stream merge(Fields outputFields, List<Stream> streams) {
-        return multiReduce(streams, new IdentityMultiReducer(), outputFields);
-    }
-    
-    public Stream merge(Stream... streams) {
-        return merge(Arrays.asList(streams));
-    }
-    
-    public Stream merge(List<Stream> streams) {
-        return merge(streams.get(0).getOutputFields(), streams);
-    } 
-    
-    public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields) {
-        return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields);        
-    }
-    
-    public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields) {
-        return join(streams, joinFields, outFields, JoinType.INNER);        
-    }
-
-    public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, JoinType type) {
-        return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields, type);        
-    }
-    
-    public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, JoinType type) {
-        return join(streams, joinFields, outFields, repeat(streams.size(), type));        
-    }
-
-    public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, List<JoinType> mixed) {
-        return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields, mixed);        
-        
-    }
-    
-    public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, List<JoinType> mixed) {
-        return multiReduce(strippedInputFields(streams, joinFields),
-              groupedStreams(streams, joinFields),
-              new JoinerMultiReducer(mixed, joinFields.get(0).size(), strippedInputFields(streams, joinFields)),
-              outFields);
-    }    
-        
-    public StormTopology build() {
-        DefaultDirectedGraph<Node, IndexedEdge> graph = (DefaultDirectedGraph) _graph.clone();
-        
-        
-        completeDRPC(graph, _colocate, _gen);
-        
-        List<SpoutNode> spoutNodes = new ArrayList<SpoutNode>();
-        
-        // can be regular nodes (static state) or processor nodes
-        Set<Node> boltNodes = new HashSet<Node>();
-        for(Node n: graph.vertexSet()) {
-            if(n instanceof SpoutNode) {
-                spoutNodes.add((SpoutNode) n);
-            } else if(!(n instanceof PartitionNode)) {
-                boltNodes.add(n);
-            }
-        }
-        
-        
-        Set<Group> initialGroups = new HashSet<Group>();
-        for(List<Node> colocate: _colocate.values()) {
-            Group g = new Group(graph, colocate);
-            boltNodes.removeAll(colocate);
-            initialGroups.add(g);
-        }
-        for(Node n: boltNodes) {
-            initialGroups.add(new Group(graph, n));
-        }
-        
-        
-        GraphGrouper grouper = new GraphGrouper(graph, initialGroups);
-        grouper.mergeFully();
-        Collection<Group> mergedGroups = grouper.getAllGroups();
-        
-        
-        
-        // add identity partitions between groups
-        for(IndexedEdge<Node> e: new HashSet<IndexedEdge>(graph.edgeSet())) {
-            if(!(e.source instanceof PartitionNode) && !(e.target instanceof PartitionNode)) {                
-                Group g1 = grouper.nodeGroup(e.source);
-                Group g2 = grouper.nodeGroup(e.target);
-                // g1 being null means the source is a spout node
-                if(g1==null && !(e.source instanceof SpoutNode))
-                    throw new RuntimeException("Planner exception: Null source group must indicate a spout node at this phase of planning");
-                if(g1==null || !g1.equals(g2)) {
-                    graph.removeEdge(e);
-                    PartitionNode pNode = makeIdentityPartition(e.source);
-                    graph.addVertex(pNode);
-                    graph.addEdge(e.source, pNode, new IndexedEdge(e.source, pNode, 0));
-                    graph.addEdge(pNode, e.target, new IndexedEdge(pNode, e.target, e.index));                    
-                }
-            }
-        }
-        // if one group subscribes to the same stream with the same partitioning multiple times,
-        // merge those subscriptions together (otherwise many output streams can end up being
-        // created for that partitioning, when the stream must be split into multiple output
-        // streams because the same input has different partitionings into the group)
-        
-        // this is needed because splitting logic can't currently be merged into a spout
-        // the algorithm here isn't the cleanest, since the grouper's indexes are invalidated by
-        // adding nodes to arbitrary groups, but it works out
-        List<Node> forNewGroups = new ArrayList<Node>();
-        for(Group g: mergedGroups) {
-            for(PartitionNode n: extraPartitionInputs(g)) {
-                Node idNode = makeIdentityNode(n.allOutputFields);
-                Node newPartitionNode = new PartitionNode(idNode.streamId, n.name, idNode.allOutputFields, n.thriftGrouping);
-                Node parentNode = TridentUtils.getParent(graph, n);
-                Set<IndexedEdge> outgoing = graph.outgoingEdgesOf(n);
-                graph.removeVertex(n);
-                
-                graph.addVertex(idNode);
-                graph.addVertex(newPartitionNode);
-                addEdge(graph, parentNode, idNode, 0);
-                addEdge(graph, idNode, newPartitionNode, 0);
-                for(IndexedEdge e: outgoing) {
-                    addEdge(graph, newPartitionNode, e.target, e.index);
-                }
-                Group parentGroup = grouper.nodeGroup(parentNode);
-                if(parentGroup==null) {
-                    forNewGroups.add(idNode);
-                } else {
-                    parentGroup.nodes.add(idNode);
-                }
-            }
-        }
-        // TODO: in the future, want a way to include this logic in the spout itself,
-        // or make it unnecessary by having storm include metadata about which grouping a tuple
-        // came from
-        
-        for(Node n: forNewGroups) {
-            grouper.addGroup(new Group(graph, n));
-        }
-        
-        // add in spouts as groups so we can get parallelisms
-        for(Node n: spoutNodes) {
-            grouper.addGroup(new Group(graph, n));
-        }
-        
-        grouper.reindex();
-        mergedGroups = grouper.getAllGroups();
-                
-        
-        Map<Node, String> batchGroupMap = new HashMap();
-        List<Set<Node>> connectedComponents = new ConnectivityInspector<Node, IndexedEdge>(graph).connectedSets();
-        for(int i=0; i<connectedComponents.size(); i++) {
-            String groupId = "bg" + i;
-            for(Node n: connectedComponents.get(i)) {
-                batchGroupMap.put(n, groupId);
-            }
-        }
-        
-//        System.out.println("GRAPH:");
-//        System.out.println(graph);
-        
-        Map<Group, Integer> parallelisms = getGroupParallelisms(graph, grouper, mergedGroups);
-        
-        TridentTopologyBuilder builder = new TridentTopologyBuilder();
-        
-        Map<Node, String> spoutIds = genSpoutIds(spoutNodes);
-        Map<Group, String> boltIds = genBoltIds(mergedGroups);
-        
-        for(SpoutNode sn: spoutNodes) {
-            Integer parallelism = parallelisms.get(grouper.nodeGroup(sn));
-            if(sn.type == SpoutNode.SpoutType.DRPC) {
-                builder.setBatchPerTupleSpout(spoutIds.get(sn), sn.streamId,
-                        (IRichSpout) sn.spout, parallelism, batchGroupMap.get(sn));
-            } else {
-                ITridentSpout s;
-                if(sn.spout instanceof IBatchSpout) {
-                    s = new BatchSpoutExecutor((IBatchSpout)sn.spout);
-                } else if(sn.spout instanceof ITridentSpout) {
-                    s = (ITridentSpout) sn.spout;
-                } else {
-                    throw new RuntimeException("Regular rich spouts not supported yet... try wrapping in a RichSpoutBatchExecutor");
-                    // TODO: handle regular rich spout without batches (need lots of updates to support this throughout)
-                }
-                builder.setSpout(spoutIds.get(sn), sn.streamId, sn.txId, s, parallelism, batchGroupMap.get(sn));
-            }
-        }
-        
-        for(Group g: mergedGroups) {
-            if(!isSpoutGroup(g)) {
-                Integer p = parallelisms.get(g);
-                Map<String, String> streamToGroup = getOutputStreamBatchGroups(g, batchGroupMap);
-                BoltDeclarer d = builder.setBolt(boltIds.get(g), new SubtopologyBolt(graph, g.nodes, batchGroupMap), p,
-                        committerBatches(g, batchGroupMap), streamToGroup);
-                Collection<PartitionNode> inputs = uniquedSubscriptions(externalGroupInputs(g));
-                for(PartitionNode n: inputs) {
-                    Node parent = TridentUtils.getParent(graph, n);
-                    String componentId;
-                    if(parent instanceof SpoutNode) {
-                        componentId = spoutIds.get(parent);
-                    } else {
-                        componentId = boltIds.get(grouper.nodeGroup(parent));
-                    }
-                    d.grouping(new GlobalStreamId(componentId, n.streamId), n.thriftGrouping);
-                } 
-            }
-        }
-        
-        return builder.buildTopology();
-    }
-    
-    private static void completeDRPC(DefaultDirectedGraph<Node, IndexedEdge> graph, Map<String, List<Node>> colocate, UniqueIdGen gen) {
-        List<Set<Node>> connectedComponents = new ConnectivityInspector<Node, IndexedEdge>(graph).connectedSets();
-        
-        for(Set<Node> g: connectedComponents) {
-            checkValidJoins(g);
-        }
-        
-        TridentTopology helper = new TridentTopology(graph, colocate, gen);
-        for(Set<Node> g: connectedComponents) {
-            SpoutNode drpcNode = getDRPCSpoutNode(g);
-            if(drpcNode!=null) {
-                Stream lastStream = new Stream(helper, null, getLastAddedNode(g));
-                Stream s = new Stream(helper, null, drpcNode);
-                helper.multiReduce(
-                        s.project(new Fields("return-info"))
-                         .batchGlobal(),
-                        lastStream.batchGlobal(),
-                        new ReturnResultsReducer(),
-                        new Fields());
-            }
-        }                
-    }
-    
-    private static Node getLastAddedNode(Collection<Node> g) {
-        Node ret = null;
-        for(Node n: g) {
-            if(ret==null || n.creationIndex > ret.creationIndex) {
-                ret = n;
-            }
-        }
-        return ret;
-    }
-    
-    // returns null if it's not a DRPC group
-    private static SpoutNode getDRPCSpoutNode(Collection<Node> g) {
-        for(Node n: g) {
-            if(n instanceof SpoutNode) {
-                SpoutNode.SpoutType type = ((SpoutNode) n).type;
-                if(type==SpoutNode.SpoutType.DRPC) {
-                    return (SpoutNode) n;
-                }
-            }
-        }
-        return null;
-    }
-    
-    private static void checkValidJoins(Collection<Node> g) {
-        boolean hasDRPCSpout = false;
-        boolean hasBatchSpout = false;
-        for(Node n: g) {
-            if(n instanceof SpoutNode) {
-                SpoutNode.SpoutType type = ((SpoutNode) n).type;
-                if(type==SpoutNode.SpoutType.BATCH) {
-                    hasBatchSpout = true;
-                } else if(type==SpoutNode.SpoutType.DRPC) {
-                    hasDRPCSpout = true;
-                }
-            }
-        }
-        if(hasBatchSpout && hasDRPCSpout) {
-            throw new RuntimeException("Cannot join DRPC stream with streams originating from other spouts");
-        }
-    }
-    
-    private static boolean isSpoutGroup(Group g) {
-        return g.nodes.size() == 1 && g.nodes.iterator().next() instanceof SpoutNode;
-    }
-    
-    private static Collection<PartitionNode> uniquedSubscriptions(Set<PartitionNode> subscriptions) {
-        Map<String, PartitionNode> ret = new HashMap();
-        for(PartitionNode n: subscriptions) {
-            PartitionNode curr = ret.get(n.streamId);
-            if(curr!=null && !curr.thriftGrouping.equals(n.thriftGrouping)) {
-                throw new RuntimeException("Multiple subscriptions to the same stream with different groupings. Should be impossible since that is explicitly guarded against.");
-            }
-            ret.put(n.streamId, n);
-        }
-        return ret.values();
-    }
-    
-    private static Map<Node, String> genSpoutIds(Collection<SpoutNode> spoutNodes) {
-        Map<Node, String> ret = new HashMap();
-        int ctr = 0;
-        for(SpoutNode n: spoutNodes) {
-            ret.put(n, "spout" + ctr);
-            ctr++;
-        }
-        return ret;
-    }
-
-    private static Map<Group, String> genBoltIds(Collection<Group> groups) {
-        Map<Group, String> ret = new HashMap();
-        int ctr = 0;
-        for(Group g: groups) {
-            if(!isSpoutGroup(g)) {
-                List<String> name = new ArrayList();
-                name.add("b");
-                name.add("" + ctr);
-                String groupName = getGroupName(g);
-                if(groupName!=null && !groupName.isEmpty()) {
-                    name.add(getGroupName(g));                
-                }
-                ret.put(g, Utils.join(name, "-"));
-                ctr++;
-            }
-        }
-        return ret;
-    }
-    
-    private static String getGroupName(Group g) {
-        TreeMap<Integer, String> sortedNames = new TreeMap();
-        for(Node n: g.nodes) {
-            if(n.name!=null) {
-                sortedNames.put(n.creationIndex, n.name);
-            }
-        }
-        List<String> names = new ArrayList<String>();
-        String prevName = null;
-        for(String n: sortedNames.values()) {
-            if(prevName==null || !n.equals(prevName)) {
-                prevName = n;
-                names.add(n);
-            }
-        }
-        return Utils.join(names, "-");
-    }
-    
-    private static Map<String, String> getOutputStreamBatchGroups(Group g, Map<Node, String> batchGroupMap) {
-        Map<String, String> ret = new HashMap();
-        Set<PartitionNode> externalGroupOutputs = externalGroupOutputs(g);
-        for(PartitionNode n: externalGroupOutputs) {
-            ret.put(n.streamId, batchGroupMap.get(n));
-        }        
-        return ret;
-    }
-    
-    private static Set<String> committerBatches(Group g, Map<Node, String> batchGroupMap) {
-        Set<String> ret = new HashSet();
-        for(Node n: g.nodes) {
-           if(n instanceof ProcessorNode) {
-               if(((ProcessorNode) n).committer) {
-                   ret.add(batchGroupMap.get(n));
-               }
-           } 
-        }
-        return ret;
-    }
-    
-    private static Map<Group, Integer> getGroupParallelisms(DirectedGraph<Node, IndexedEdge> graph, GraphGrouper grouper, Collection<Group> groups) {
-        UndirectedGraph<Group, Object> equivs = new Pseudograph<Group, Object>(Object.class);
-        for(Group g: groups) {
-            equivs.addVertex(g);
-        }
-        for(Group g: groups) {
-            for(PartitionNode n: externalGroupInputs(g)) {
-                if(isIdentityPartition(n)) {
-                    Node parent = TridentUtils.getParent(graph, n);
-                    Group parentGroup = grouper.nodeGroup(parent);
-                    if(parentGroup!=null && !parentGroup.equals(g)) {
-                        equivs.addEdge(parentGroup, g);
-                    }
-                }
-            }            
-        }
-        
-        Map<Group, Integer> ret = new HashMap();
-        List<Set<Group>> equivGroups = new ConnectivityInspector<Group, Object>(equivs).connectedSets();
-        for(Set<Group> equivGroup: equivGroups) {
-            Integer fixedP = getFixedParallelism(equivGroup);
-            Integer maxP = getMaxParallelism(equivGroup);
-            if(fixedP!=null && maxP!=null && maxP < fixedP) {
-                throw new RuntimeException("Parallelism is fixed to " + fixedP + " but max parallelism is less than that: " + maxP);
-            }
-            
-            
-            Integer p = 1;
-            for(Group g: equivGroup) {
-                for(Node n: g.nodes) {
-                    if(n.parallelismHint!=null) {
-                        p = Math.max(p, n.parallelismHint);
-                    }
-                }
-            }
-            if(maxP!=null) p = Math.min(maxP, p);
-            
-            if(fixedP!=null) p = fixedP;
-            for(Group g: equivGroup) {
-                ret.put(g, p);
-            }
-        }
-        return ret;
-    }
-    
-    private static Integer getMaxParallelism(Set<Group> groups) {
-        Integer ret = null;
-        for(Group g: groups) {
-            if(isSpoutGroup(g)) {
-                SpoutNode n = (SpoutNode) g.nodes.iterator().next();
-                Map conf = getSpoutComponentConfig(n.spout);
-                if(conf==null) conf = new HashMap();
-                Number maxP = (Number) conf.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM);
-                if(maxP!=null) {
-                    if(ret==null) ret = maxP.intValue();
-                    else ret = Math.min(ret, maxP.intValue());
-                }
-            }
-        }
-        return ret;
-    }
-    
-    private static Map getSpoutComponentConfig(Object spout) {
-        if(spout instanceof IRichSpout) {
-            return ((IRichSpout) spout).getComponentConfiguration();
-        } else if (spout instanceof IBatchSpout) {
-            return ((IBatchSpout) spout).getComponentConfiguration();
-        } else {
-            return ((ITridentSpout) spout).getComponentConfiguration();
-        }
-    }
-    
-    private static Integer getFixedParallelism(Set<Group> groups) {
-        Integer ret = null;
-        for(Group g: groups) {
-            for(Node n: g.nodes) {
-                if(n.stateInfo != null && n.stateInfo.spec.requiredNumPartitions!=null) {
-                    int reqPartitions = n.stateInfo.spec.requiredNumPartitions;
-                    if(ret!=null && ret!=reqPartitions) {
-                        throw new RuntimeException("Cannot have one group have fixed parallelism of two different values");
-                    }
-                    ret = reqPartitions;
-                }
-            }
-        }
-        return ret;
-    }
-    
-    private static boolean isIdentityPartition(PartitionNode n) {
-        Grouping g = n.thriftGrouping;
-        if(g.is_set_custom_serialized()) {
-            CustomStreamGrouping csg = (CustomStreamGrouping) Utils.deserialize(g.get_custom_serialized());
-            return csg instanceof IdentityGrouping;
-        }
-        return false;
-    }
-    
-    private static void addEdge(DirectedGraph g, Object source, Object target, int index) {
-        g.addEdge(source, target, new IndexedEdge(source, target, index));
-    }
-    
-    private Node makeIdentityNode(Fields allOutputFields) {
-        return new ProcessorNode(getUniqueStreamId(), null, allOutputFields, new Fields(),
-                new EachProcessor(new Fields(), new FilterExecutor(new TrueFilter())));
-    }
-    
-    private static List<PartitionNode> extraPartitionInputs(Group g) {
-        List<PartitionNode> ret = new ArrayList();
-        Set<PartitionNode> inputs = externalGroupInputs(g);
-        Map<String, List<PartitionNode>> grouped = new HashMap();
-        for(PartitionNode n: inputs) {
-            if(!grouped.containsKey(n.streamId)) {
-                grouped.put(n.streamId, new ArrayList());
-            }
-            grouped.get(n.streamId).add(n);
-        }
-        for(List<PartitionNode> group: grouped.values()) {
-            PartitionNode anchor = group.get(0);
-            for(int i=1; i<group.size(); i++) {
-                PartitionNode n = group.get(i);
-                if(!n.thriftGrouping.equals(anchor.thriftGrouping)) {
-                    ret.add(n);
-                }
-            }
-        }
-        return ret;
-    }
-    
-    private static Set<PartitionNode> externalGroupInputs(Group g) {
-        Set<PartitionNode> ret = new HashSet();
-        for(Node n: g.incomingNodes()) {
-            if(n instanceof PartitionNode) {
-                ret.add((PartitionNode) n);
-            }
-        }
-        return ret;
-    }
-    
-    private static Set<PartitionNode> externalGroupOutputs(Group g) {
-        Set<PartitionNode> ret = new HashSet();
-        for(Node n: g.outgoingNodes()) {
-            if(n instanceof PartitionNode) {
-                ret.add((PartitionNode) n);
-            }
-        }
-        return ret;
-    }    
-    
-    private static PartitionNode makeIdentityPartition(Node basis) {
-        return new PartitionNode(basis.streamId, basis.name, basis.allOutputFields,
-            Grouping.custom_serialized(Utils.serialize(new IdentityGrouping())));
-    }
-    
-    
-    protected String getUniqueStreamId() {
-        return _gen.getUniqueStreamId();
-    }
-
-    protected String getUniqueStateId() {
-        return _gen.getUniqueStateId();
-    }
-    
-    protected void registerNode(Node n) {
-        _graph.addVertex(n);
-        if(n.stateInfo!=null) {
-            String id = n.stateInfo.id;
-            if(!_colocate.containsKey(id)) {
-                _colocate.put(id, new ArrayList());
-            }
-            _colocate.get(id).add(n);
-        }
-    }
-    
-    protected Stream addNode(Node n) {
-        registerNode(n);
-        return new Stream(this, n.name, n);
-    }
-
-    protected void registerSourcedNode(List<Stream> sources, Node newNode) {
-        registerNode(newNode);
-        int streamIndex = 0;
-        for(Stream s: sources) {
-            _graph.addEdge(s._node, newNode, new IndexedEdge(s._node, newNode, streamIndex));
-            streamIndex++;
-        }        
-    }
-    
-    protected Stream addSourcedNode(List<Stream> sources, Node newNode) {
-        registerSourcedNode(sources, newNode);
-        return new Stream(this, newNode.name, newNode);
-    }
-    
-    protected TridentState addSourcedStateNode(List<Stream> sources, Node newNode) {
-        registerSourcedNode(sources, newNode);
-        return new TridentState(this, newNode);
-    }    
-    
-    protected Stream addSourcedNode(Stream source, Node newNode) {
-        return addSourcedNode(Arrays.asList(source), newNode);
-    }
-
-    protected TridentState addSourcedStateNode(Stream source, Node newNode) {
-        return addSourcedStateNode(Arrays.asList(source), newNode);
-    }       
-    
-    private static List<Fields> getAllOutputFields(List streams) {
-        List<Fields> ret = new ArrayList<Fields>();
-        for(Object o: streams) {
-            ret.add(((IAggregatableStream) o).getOutputFields());
-        }
-        return ret;
-    }
-    
-    
-    private static List<GroupedStream> groupedStreams(List<Stream> streams, List<Fields> joinFields) {
-        List<GroupedStream> ret = new ArrayList<GroupedStream>();
-        for(int i=0; i<streams.size(); i++) {
-            ret.add(streams.get(i).groupBy(joinFields.get(i)));
-        }
-        return ret;
-    }
-    
-    private static List<Fields> strippedInputFields(List<Stream> streams, List<Fields> joinFields) {
-        List<Fields> ret = new ArrayList<Fields>();
-        for(int i=0; i<streams.size(); i++) {
-            ret.add(TridentUtils.fieldsSubtract(streams.get(i).getOutputFields(), joinFields.get(i)));
-        }
-        return ret;
-    }
-    
-    private static List<JoinType> repeat(int n, JoinType type) {
-        List<JoinType> ret = new ArrayList<JoinType>();
-        for(int i=0; i<n; i++) {
-            ret.add(type);
-        }
-        return ret;
-    }
-}
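
For reference, a minimal sketch of driving the API above, assuming Trident's
FixedBatchSpout test spout (storm.trident.testing) and a hypothetical Split
function; only newStream, each, and build are taken from the code in this
commit:

    import backtype.storm.generated.StormTopology;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    import storm.trident.TridentTopology;
    import storm.trident.operation.BaseFunction;
    import storm.trident.operation.TridentCollector;
    import storm.trident.testing.FixedBatchSpout;
    import storm.trident.tuple.TridentTuple;

    public class TridentTopologyExample {
        // hypothetical function that splits a sentence into words
        public static class Split extends BaseFunction {
            @Override
            public void execute(TridentTuple tuple, TridentCollector collector) {
                for (String word : tuple.getString(0).split(" ")) {
                    collector.emit(new Values(word));
                }
            }
        }

        public static StormTopology buildExample() {
            TridentTopology topology = new TridentTopology();

            // an IBatchSpout; the txId "spout1" keys the transactional metadata
            FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
                    new Values("the cow jumped over the moon"),
                    new Values("four score and seven years ago"));

            topology.newStream("spout1", spout)
                    .each(new Fields("sentence"), new Split(), new Fields("word"));

            // build() clones the node graph, completes any DRPC return paths,
            // merges nodes into bolt groups, and computes group parallelisms
            return topology.build();
        }
    }

Note that, per checkValidJoins above, a stream fed by a DRPC spout cannot be
joined with streams originating from batch spouts.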

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/drpc/ReturnResultsReducer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/drpc/ReturnResultsReducer.java b/jstorm-client/src/main/java/storm/trident/drpc/ReturnResultsReducer.java
deleted file mode 100644
index e89719e..0000000
--- a/jstorm-client/src/main/java/storm/trident/drpc/ReturnResultsReducer.java
+++ /dev/null
@@ -1,96 +0,0 @@
-package storm.trident.drpc;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang.builder.ToStringBuilder;
-import org.apache.thrift7.TException;
-
-import storm.trident.drpc.ReturnResultsReducer.ReturnResultsState;
-import storm.trident.operation.MultiReducer;
-import storm.trident.operation.TridentCollector;
-import storm.trident.operation.TridentMultiReducerContext;
-import storm.trident.tuple.TridentTuple;
-import backtype.storm.Config;
-import backtype.storm.drpc.DRPCInvocationsClient;
-import backtype.storm.generated.DistributedRPCInvocations;
-import backtype.storm.utils.ServiceRegistry;
-import backtype.storm.utils.Utils;
-
-
-public class ReturnResultsReducer implements MultiReducer<ReturnResultsState> {
-    public static class ReturnResultsState {
-        List<TridentTuple> results = new ArrayList<TridentTuple>();
-        String returnInfo;
-
-        @Override
-        public String toString() {
-            return ToStringBuilder.reflectionToString(this);
-        }
-    }
-    boolean local;
-
-    Map<List, DRPCInvocationsClient> _clients = new HashMap<List, DRPCInvocationsClient>();
-    
-    
-    @Override
-    public void prepare(Map conf, TridentMultiReducerContext context) {
-        local = conf.get(Config.STORM_CLUSTER_MODE).equals("local");
-    }
-
-    @Override
-    public ReturnResultsState init(TridentCollector collector) {
-        return new ReturnResultsState();
-    }
-
-    @Override
-    public void execute(ReturnResultsState state, int streamIndex, TridentTuple input, TridentCollector collector) {
-        if(streamIndex==0) {
-            state.returnInfo = input.getString(0);
-        } else {
-            state.results.add(input);
-        }
-    }
-
-    @Override
-    public void complete(ReturnResultsState state, TridentCollector collector) {
-        // only one of the multireducers will receive the tuples
-        if(state.returnInfo!=null) {
-            String result = Utils.to_json(state.results);
-            Map retMap = (Map) Utils.from_json(state.returnInfo);
-            final String host = (String) retMap.get("host");
-            final int port = Utils.getInt(retMap.get("port"));
-            String id = (String) retMap.get("id");
-            DistributedRPCInvocations.Iface client;
-            if(local) {
-                client = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(host);
-            } else {
-                List server = new ArrayList() {{
-                    add(host);
-                    add(port);
-                }};
-
-                if(!_clients.containsKey(server)) {
-                    _clients.put(server, new DRPCInvocationsClient(host, port));
-                }
-                client = _clients.get(server);
-            }
-
-            try {
-                client.result(id, result);
-            } catch(TException e) {
-                collector.reportError(e);
-            }
-        }
-    }
-
-    @Override
-    public void cleanup() {
-        for(DRPCInvocationsClient c: _clients.values()) {
-            c.close();
-        }
-    }
-    
-}
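
For context, the return-info string consumed in complete() is a JSON map
produced upstream by the DRPC spout. A sketch of its shape, with the key names
taken from the retMap lookups above and the values purely illustrative:

    import java.util.HashMap;
    import java.util.Map;
    import backtype.storm.utils.Utils;

    Map<String, Object> returnInfo = new HashMap<String, Object>();
    returnInfo.put("host", "drpc-host.example.com"); // DRPC invocations host
    returnInfo.put("port", 3772);                    // DRPC invocations port (illustrative)
    returnInfo.put("id", "request-1234");            // per-request id
    String encoded = Utils.to_json(returnInfo);      // what input.getString(0) yields

In local mode, the host string doubles as the ServiceRegistry key used to look
up the in-process DRPC service, which is why no port is needed on that path.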

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/fluent/ChainedAggregatorDeclarer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/fluent/ChainedAggregatorDeclarer.java b/jstorm-client/src/main/java/storm/trident/fluent/ChainedAggregatorDeclarer.java
deleted file mode 100644
index de8fe9c..0000000
--- a/jstorm-client/src/main/java/storm/trident/fluent/ChainedAggregatorDeclarer.java
+++ /dev/null
@@ -1,166 +0,0 @@
-package storm.trident.fluent;
-
-import backtype.storm.tuple.Fields;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import storm.trident.Stream;
-import storm.trident.operation.Aggregator;
-import storm.trident.operation.CombinerAggregator;
-import storm.trident.operation.ReducerAggregator;
-import storm.trident.operation.impl.ChainedAggregatorImpl;
-import storm.trident.operation.impl.CombinerAggregatorCombineImpl;
-import storm.trident.operation.impl.CombinerAggregatorInitImpl;
-import storm.trident.operation.impl.ReducerAggregatorImpl;
-import storm.trident.operation.impl.SingleEmitAggregator;
-import storm.trident.operation.impl.SingleEmitAggregator.BatchToPartition;
-import storm.trident.tuple.ComboList;
-
-
-public class ChainedAggregatorDeclarer implements ChainedFullAggregatorDeclarer, ChainedPartitionAggregatorDeclarer {    
-    public static interface AggregationPartition {
-        Stream partition(Stream input);
-    }
-    
-    private static enum AggType {
-        PARTITION,
-        FULL,
-        FULL_COMBINE
-    }
-    
-    // inputFields can be equal to outFields, but multiple aggregators cannot have overlapping outFields
-    private static class AggSpec {
-        Fields inFields;
-        Aggregator agg;
-        Fields outFields;
-        
-        public AggSpec(Fields inFields, Aggregator agg, Fields outFields) {
-            this.inFields = inFields;
-            this.agg = agg;
-            this.outFields = outFields;
-        }
-    }
-    
-    List<AggSpec> _aggs = new ArrayList<AggSpec>();
-    IAggregatableStream _stream;
-    AggType _type = null;
-    GlobalAggregationScheme _globalScheme;
-    
-    public ChainedAggregatorDeclarer(IAggregatableStream stream, GlobalAggregationScheme globalScheme) {
-        _stream = stream;
-        _globalScheme = globalScheme;
-    }
-    
-    public Stream chainEnd() {
-        Fields[] inputFields = new Fields[_aggs.size()];
-        Aggregator[] aggs = new Aggregator[_aggs.size()];
-        int[] outSizes = new int[_aggs.size()];
-        List<String> allOutFields = new ArrayList<String>();
-        Set<String> allInFields = new HashSet<String>();
-        for(int i=0; i<_aggs.size(); i++) {
-            AggSpec spec = _aggs.get(i);
-            Fields infields = spec.inFields;
-            if(infields==null) infields = new Fields();
-            Fields outfields = spec.outFields;
-            if(outfields==null) outfields = new Fields();
-
-            inputFields[i] = infields;
-            aggs[i] = spec.agg;
-            outSizes[i] = outfields.size();  
-            allOutFields.addAll(outfields.toList());
-            allInFields.addAll(infields.toList());
-        }
-        if(new HashSet(allOutFields).size() != allOutFields.size()) {
-            throw new IllegalArgumentException("Output fields for chained aggregators must be distinct: " + allOutFields.toString());
-        }
-        
-        Fields inFields = new Fields(new ArrayList<String>(allInFields));
-        Fields outFields = new Fields(allOutFields);
-        Aggregator combined = new ChainedAggregatorImpl(aggs, inputFields, new ComboList.Factory(outSizes));
-        
-        if(_type!=AggType.FULL) {
-            _stream = _stream.partitionAggregate(inFields, combined, outFields);
-        }
-        if(_type!=AggType.PARTITION) {
-            _stream = _globalScheme.aggPartition(_stream);
-            BatchToPartition singleEmit = _globalScheme.singleEmitPartitioner();
-            Aggregator toAgg = combined;
-            if(singleEmit!=null) {
-                toAgg = new SingleEmitAggregator(combined, singleEmit);
-            }
-            // this assumes that inFields and outFields are the same for a CombinerAggregator
-            // (the same assumption is made above)
-            _stream = _stream.partitionAggregate(inFields, toAgg, outFields);
-        }
-        return _stream.toStream();
-    }
-
-    public ChainedPartitionAggregatorDeclarer partitionAggregate(Aggregator agg, Fields functionFields) {
-        return partitionAggregate(null, agg, functionFields);
-    }
-
-    public ChainedPartitionAggregatorDeclarer partitionAggregate(Fields inputFields, Aggregator agg, Fields functionFields) {
-        _type = AggType.PARTITION;
-        _aggs.add(new AggSpec(inputFields, agg, functionFields));
-        return this;
-    }
-
-    public ChainedPartitionAggregatorDeclarer partitionAggregate(CombinerAggregator agg, Fields functionFields) {
-        return partitionAggregate(null, agg, functionFields);
-    }
-
-    public ChainedPartitionAggregatorDeclarer partitionAggregate(Fields inputFields, CombinerAggregator agg, Fields functionFields) {
-        initCombiner(inputFields, agg, functionFields);
-        return partitionAggregate(functionFields, new CombinerAggregatorCombineImpl(agg), functionFields);
-    }  
-    
-    public ChainedPartitionAggregatorDeclarer partitionAggregate(ReducerAggregator agg, Fields functionFields) {
-        return partitionAggregate(null, agg, functionFields);
-    }
-
-    public ChainedPartitionAggregatorDeclarer partitionAggregate(Fields inputFields, ReducerAggregator agg, Fields functionFields) {
-        return partitionAggregate(inputFields, new ReducerAggregatorImpl(agg), functionFields);
-    }  
-    
-    public ChainedFullAggregatorDeclarer aggregate(Aggregator agg, Fields functionFields) {
-        return aggregate(null, agg, functionFields);
-    }
-    
-    public ChainedFullAggregatorDeclarer aggregate(Fields inputFields, Aggregator agg, Fields functionFields) {
-        return aggregate(inputFields, agg, functionFields, false);
-    }
-    
-    private ChainedFullAggregatorDeclarer aggregate(Fields inputFields, Aggregator agg, Fields functionFields, boolean isCombiner) {
-        if(isCombiner) {
-            if(_type == null) {
-                _type = AggType.FULL_COMBINE;            
-            }
-        } else {
-            _type = AggType.FULL;
-        }
-        _aggs.add(new AggSpec(inputFields, agg, functionFields));
-        return this;
-    }
-
-    public ChainedFullAggregatorDeclarer aggregate(CombinerAggregator agg, Fields functionFields) {
-        return aggregate(null, agg, functionFields);
-    }
-
-    public ChainedFullAggregatorDeclarer aggregate(Fields inputFields, CombinerAggregator agg, Fields functionFields) {
-        initCombiner(inputFields, agg, functionFields);
-        return aggregate(functionFields, new CombinerAggregatorCombineImpl(agg), functionFields, true);
-    }
-
-    public ChainedFullAggregatorDeclarer aggregate(ReducerAggregator agg, Fields functionFields) {
-        return aggregate(null, agg, functionFields);
-    }
-
-    public ChainedFullAggregatorDeclarer aggregate(Fields inputFields, ReducerAggregator agg, Fields functionFields) {
-        return aggregate(inputFields, new ReducerAggregatorImpl(agg), functionFields);
-    }
-    
-    private void initCombiner(Fields inputFields, CombinerAggregator agg, Fields functionFields) {
-        _stream = _stream.each(inputFields, new CombinerAggregatorInitImpl(agg), functionFields);        
-    }
-}
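
A minimal sketch of the chaining contract above, assuming Trident's built-in
Count and Sum combiner aggregators and a hypothetical "amount" input field;
chainedAgg() here is the GroupedStream entry point shown later in this commit:

    import backtype.storm.tuple.Fields;
    import storm.trident.Stream;
    import storm.trident.fluent.GroupedStream;
    import storm.trident.operation.builtin.Count;
    import storm.trident.operation.builtin.Sum;

    static Stream countAndSum(GroupedStream grouped) {
        return grouped.chainedAgg()
                .aggregate(new Count(), new Fields("count"))
                .aggregate(new Fields("amount"), new Sum(), new Fields("total"))
                .chainEnd(); // "count" and "total" must be distinct, per chainEnd()
    }

chainEnd() collapses the chain into a single ChainedAggregatorImpl, so the
chained aggregators execute together rather than as separate topology stages.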

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/fluent/ChainedFullAggregatorDeclarer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/fluent/ChainedFullAggregatorDeclarer.java b/jstorm-client/src/main/java/storm/trident/fluent/ChainedFullAggregatorDeclarer.java
deleted file mode 100644
index 84436a6..0000000
--- a/jstorm-client/src/main/java/storm/trident/fluent/ChainedFullAggregatorDeclarer.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package storm.trident.fluent;
-
-import backtype.storm.tuple.Fields;
-import storm.trident.operation.Aggregator;
-import storm.trident.operation.CombinerAggregator;
-import storm.trident.operation.ReducerAggregator;
-
-public interface ChainedFullAggregatorDeclarer extends IChainedAggregatorDeclarer {
-    ChainedFullAggregatorDeclarer aggregate(Aggregator agg, Fields functionFields);
-    ChainedFullAggregatorDeclarer aggregate(Fields inputFields, Aggregator agg, Fields functionFields);
-    ChainedFullAggregatorDeclarer aggregate(CombinerAggregator agg, Fields functionFields);
-    ChainedFullAggregatorDeclarer aggregate(Fields inputFields, CombinerAggregator agg, Fields functionFields);
-    ChainedFullAggregatorDeclarer aggregate(ReducerAggregator agg, Fields functionFields);
-    ChainedFullAggregatorDeclarer aggregate(Fields inputFields, ReducerAggregator agg, Fields functionFields);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/fluent/ChainedPartitionAggregatorDeclarer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/fluent/ChainedPartitionAggregatorDeclarer.java b/jstorm-client/src/main/java/storm/trident/fluent/ChainedPartitionAggregatorDeclarer.java
deleted file mode 100644
index 00e2c5a..0000000
--- a/jstorm-client/src/main/java/storm/trident/fluent/ChainedPartitionAggregatorDeclarer.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package storm.trident.fluent;
-
-import backtype.storm.tuple.Fields;
-import storm.trident.operation.Aggregator;
-import storm.trident.operation.CombinerAggregator;
-import storm.trident.operation.ReducerAggregator;
-
-public interface ChainedPartitionAggregatorDeclarer extends IChainedAggregatorDeclarer {
-    ChainedPartitionAggregatorDeclarer partitionAggregate(Aggregator agg, Fields functionFields);
-    ChainedPartitionAggregatorDeclarer partitionAggregate(Fields inputFields, Aggregator agg, Fields functionFields);
-    ChainedPartitionAggregatorDeclarer partitionAggregate(CombinerAggregator agg, Fields functionFields);
-    ChainedPartitionAggregatorDeclarer partitionAggregate(Fields inputFields, CombinerAggregator agg, Fields functionFields);
-    ChainedPartitionAggregatorDeclarer partitionAggregate(ReducerAggregator agg, Fields functionFields);
-    ChainedPartitionAggregatorDeclarer partitionAggregate(Fields inputFields, ReducerAggregator agg, Fields functionFields); 
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/fluent/GlobalAggregationScheme.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/fluent/GlobalAggregationScheme.java b/jstorm-client/src/main/java/storm/trident/fluent/GlobalAggregationScheme.java
deleted file mode 100644
index 96f15e9..0000000
--- a/jstorm-client/src/main/java/storm/trident/fluent/GlobalAggregationScheme.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package storm.trident.fluent;
-
-import storm.trident.operation.impl.SingleEmitAggregator.BatchToPartition;
-
-
-public interface GlobalAggregationScheme<S extends IAggregatableStream> {
-    IAggregatableStream aggPartition(S stream); // how to partition for the second stage of aggregation
-    BatchToPartition singleEmitPartitioner(); // returns null if it's not single-emit
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/fluent/GroupedStream.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/fluent/GroupedStream.java b/jstorm-client/src/main/java/storm/trident/fluent/GroupedStream.java
deleted file mode 100644
index ad1e121..0000000
--- a/jstorm-client/src/main/java/storm/trident/fluent/GroupedStream.java
+++ /dev/null
@@ -1,157 +0,0 @@
-package storm.trident.fluent;
-
-import backtype.storm.tuple.Fields;
-import storm.trident.Stream;
-import storm.trident.TridentState;
-import storm.trident.operation.Aggregator;
-import storm.trident.operation.CombinerAggregator;
-import storm.trident.operation.Function;
-import storm.trident.operation.ReducerAggregator;
-import storm.trident.operation.impl.GroupedAggregator;
-import storm.trident.operation.impl.SingleEmitAggregator.BatchToPartition;
-import storm.trident.state.QueryFunction;
-import storm.trident.state.StateFactory;
-import storm.trident.state.StateSpec;
-import storm.trident.state.map.MapCombinerAggStateUpdater;
-import storm.trident.state.map.MapReducerAggStateUpdater;
-import storm.trident.util.TridentUtils;
-
-
-public class GroupedStream implements IAggregatableStream, GlobalAggregationScheme<GroupedStream> {
-    Fields _groupFields;
-    Stream _stream;
-    
-    public GroupedStream(Stream stream, Fields groupFields) {
-        _groupFields = groupFields;
-        _stream = stream;
-    }
-    
-    public GroupedStream name(String name) {
-        return new GroupedStream(_stream.name(name), _groupFields);
-    }
-    
-    public ChainedAggregatorDeclarer chainedAgg() {
-        return new ChainedAggregatorDeclarer(this, this);
-    }
-    
-    public Stream aggregate(Aggregator agg, Fields functionFields) {
-        return aggregate(null, agg, functionFields);
-    }
-    
-    public Stream aggregate(Fields inputFields, Aggregator agg, Fields functionFields) {
-        return new ChainedAggregatorDeclarer(this, this)
-                .aggregate(inputFields, agg, functionFields)
-                .chainEnd();
-    }
-
-    public Stream aggregate(CombinerAggregator agg, Fields functionFields) {
-        return aggregate(null, agg, functionFields);
-    }
-
-    public Stream aggregate(Fields inputFields, CombinerAggregator agg, Fields functionFields) {
-        return new ChainedAggregatorDeclarer(this, this)
-                .aggregate(inputFields, agg, functionFields)
-                .chainEnd();
-    }
-
-    public Stream aggregate(ReducerAggregator agg, Fields functionFields) {
-        return aggregate(null, agg, functionFields);
-    }
-
-    public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functionFields) {
-        return new ChainedAggregatorDeclarer(this, this)
-                .aggregate(inputFields, agg, functionFields)
-                .chainEnd();
-    }
-
-    public TridentState persistentAggregate(StateFactory stateFactory, CombinerAggregator agg, Fields functionFields) {
-        return persistentAggregate(new StateSpec(stateFactory), agg, functionFields);
-    }
-
-    public TridentState persistentAggregate(StateSpec spec, CombinerAggregator agg, Fields functionFields) {
-        return persistentAggregate(spec, null, agg, functionFields);
-    }
-
-    public TridentState persistentAggregate(StateFactory stateFactory, Fields inputFields, CombinerAggregator agg, Fields functionFields) {
-        return persistentAggregate(new StateSpec(stateFactory), inputFields, agg, functionFields);
-    }
-
-    public TridentState persistentAggregate(StateSpec spec, Fields inputFields, CombinerAggregator agg, Fields functionFields) {
-        return aggregate(inputFields, agg, functionFields)
-                .partitionPersist(spec,
-                        TridentUtils.fieldsUnion(_groupFields, functionFields),
-                        new MapCombinerAggStateUpdater(agg, _groupFields, functionFields),
-                        TridentUtils.fieldsConcat(_groupFields, functionFields)); 
-    }
-
-    public TridentState persistentAggregate(StateFactory stateFactory, Fields inputFields, ReducerAggregator agg, Fields functionFields) {
-        return persistentAggregate(new StateSpec(stateFactory), inputFields, agg, functionFields);
-    }
-
-    public TridentState persistentAggregate(StateSpec spec, Fields inputFields, ReducerAggregator agg, Fields functionFields) {
-        return _stream.partitionBy(_groupFields)
-                .partitionPersist(spec,
-                    TridentUtils.fieldsUnion(_groupFields, inputFields),
-                    new MapReducerAggStateUpdater(agg, _groupFields, inputFields),
-                    TridentUtils.fieldsConcat(_groupFields, functionFields));
-    }
-
-    public Stream stateQuery(TridentState state, Fields inputFields, QueryFunction function, Fields functionFields) {
-        return _stream.partitionBy(_groupFields)
-                      .stateQuery(state,
-                         inputFields,
-                         function,
-                         functionFields);
-    }    
-
-    public TridentState persistentAggregate(StateFactory stateFactory, ReducerAggregator agg, Fields functionFields) {
-        return persistentAggregate(new StateSpec(stateFactory), agg, functionFields);
-    }
-    
-    public TridentState persistentAggregate(StateSpec spec, ReducerAggregator agg, Fields functionFields) {
-        return persistentAggregate(spec, null, agg, functionFields);
-    }    
-    
-    public Stream stateQuery(TridentState state, QueryFunction function, Fields functionFields) {
-        return stateQuery(state, null, function, functionFields);
-    }
-    
-    @Override
-    public IAggregatableStream each(Fields inputFields, Function function, Fields functionFields) {
-        Stream s = _stream.each(inputFields, function, functionFields);
-        return new GroupedStream(s, _groupFields);
-    }
-
-    @Override
-    public IAggregatableStream partitionAggregate(Fields inputFields, Aggregator agg, Fields functionFields) {
-        Aggregator groupedAgg = new GroupedAggregator(agg, _groupFields, inputFields, functionFields.size());
-        Fields allInFields = TridentUtils.fieldsUnion(_groupFields, inputFields);
-        Fields allOutFields = TridentUtils.fieldsConcat(_groupFields, functionFields);
-        Stream s = _stream.partitionAggregate(allInFields, groupedAgg, allOutFields);
-        return new GroupedStream(s, _groupFields);
-    }
-
-    @Override
-    public IAggregatableStream aggPartition(GroupedStream s) {
-        return new GroupedStream(s._stream.partitionBy(_groupFields), _groupFields);
-    }
-
-    @Override
-    public Stream toStream() {
-        return _stream;
-    }
-
-    @Override
-    public Fields getOutputFields() {
-        return _stream.getOutputFields();
-    }    
-    
-    public Fields getGroupFields() {
-        return _groupFields;
-    }
-
-    @Override
-    public BatchToPartition singleEmitPartitioner() {
-        return null;
-    }
-}
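
Note on the API above: the grouped persistentAggregate and stateQuery methods are what power Trident's canonical streaming word count. A minimal sketch, assuming Storm's built-in Count aggregator and the MemoryMapState testing state, with the spout left as a parameter (all other names here are hypothetical):

    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    import storm.trident.TridentState;
    import storm.trident.TridentTopology;
    import storm.trident.operation.BaseFunction;
    import storm.trident.operation.TridentCollector;
    import storm.trident.operation.builtin.Count;
    import storm.trident.spout.IBatchSpout;
    import storm.trident.testing.MemoryMapState;
    import storm.trident.tuple.TridentTuple;

    public class WordCountSketch {
        // splits the "sentence" field into one tuple per word
        public static class Split extends BaseFunction {
            @Override
            public void execute(TridentTuple tuple, TridentCollector collector) {
                for (String word : tuple.getString(0).split(" ")) {
                    collector.emit(new Values(word));
                }
            }
        }

        public static TridentState buildState(TridentTopology topology,
                                              IBatchSpout sentenceSpout) {
            return topology.newStream("sentences", sentenceSpout)
                    .each(new Fields("sentence"), new Split(), new Fields("word"))
                    .groupBy(new Fields("word"))
                    // persistentAggregate on a GroupedStream keys the state by
                    // the group fields; stateQuery on the returned TridentState
                    // re-partitions queries the same way (see stateQuery above)
                    .persistentAggregate(new MemoryMapState.Factory(),
                            new Count(), new Fields("count"));
        }
    }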

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/fluent/IAggregatableStream.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/fluent/IAggregatableStream.java b/jstorm-client/src/main/java/storm/trident/fluent/IAggregatableStream.java
deleted file mode 100644
index e10852e..0000000
--- a/jstorm-client/src/main/java/storm/trident/fluent/IAggregatableStream.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package storm.trident.fluent;
-
-import backtype.storm.tuple.Fields;
-import storm.trident.Stream;
-import storm.trident.operation.Aggregator;
-import storm.trident.operation.Function;
-import storm.trident.operation.impl.SingleEmitAggregator.BatchToPartition;
-
-public interface IAggregatableStream {
-    IAggregatableStream each(Fields inputFields, Function function, Fields functionFields);
-    IAggregatableStream partitionAggregate(Fields inputFields, Aggregator agg, Fields functionFields);
-    Stream toStream();
-    Fields getOutputFields();
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/fluent/IChainedAggregatorDeclarer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/fluent/IChainedAggregatorDeclarer.java b/jstorm-client/src/main/java/storm/trident/fluent/IChainedAggregatorDeclarer.java
deleted file mode 100644
index a42dfbe..0000000
--- a/jstorm-client/src/main/java/storm/trident/fluent/IChainedAggregatorDeclarer.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package storm.trident.fluent;
-
-import storm.trident.Stream;
-
-public interface IChainedAggregatorDeclarer {
-    Stream chainEnd();
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/fluent/UniqueIdGen.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/fluent/UniqueIdGen.java b/jstorm-client/src/main/java/storm/trident/fluent/UniqueIdGen.java
deleted file mode 100644
index 64ad621..0000000
--- a/jstorm-client/src/main/java/storm/trident/fluent/UniqueIdGen.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package storm.trident.fluent;
-
-public class UniqueIdGen {
-    int _streamCounter = 0;
-    
-    public String getUniqueStreamId() {
-        _streamCounter++;
-        return "s" + _streamCounter;
-    }
-
-    int _stateCounter = 0;
-    
-    public String getUniqueStateId() {
-        _stateCounter++;
-        return "state" + _stateCounter;
-    }    
-}
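
TridentTopology uses this generator (see its import list) when naming the streams and states it creates; the two counters advance independently:

    UniqueIdGen gen = new UniqueIdGen();
    gen.getUniqueStreamId();  // "s1"
    gen.getUniqueStreamId();  // "s2"
    gen.getUniqueStateId();   // "state1" -- independent of the stream counter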

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/graph/GraphGrouper.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/graph/GraphGrouper.java b/jstorm-client/src/main/java/storm/trident/graph/GraphGrouper.java
deleted file mode 100644
index b107269..0000000
--- a/jstorm-client/src/main/java/storm/trident/graph/GraphGrouper.java
+++ /dev/null
@@ -1,106 +0,0 @@
-package storm.trident.graph;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import org.jgrapht.DirectedGraph;
-import storm.trident.planner.Node;
-import storm.trident.util.IndexedEdge;
-
-
-public class GraphGrouper {
-    
-    DirectedGraph<Node, IndexedEdge> graph;
-    Set<Group> currGroups;
-    Map<Node, Group> groupIndex = new HashMap<Node, Group>();
-    
-    public GraphGrouper(DirectedGraph<Node, IndexedEdge> graph, Collection<Group> initialGroups) {
-        this.graph = graph;
-        this.currGroups = new HashSet<Group>(initialGroups);
-        reindex();      
-    }
-    
-    public Collection<Group> getAllGroups() {
-        return currGroups;
-    }
-    
-    public void addGroup(Group g) {
-        currGroups.add(g);
-    }
-    
-    public void reindex() {
-        groupIndex.clear();
-        for(Group g: currGroups) {
-            for(Node n: g.nodes) {
-                groupIndex.put(n, g);
-            }
-        }  
-    }
-    
-    public void mergeFully() {
-        boolean somethingHappened = true;
-        while(somethingHappened) {
-            somethingHappened = false;
-            for(Group g: currGroups) {
-                Collection<Group> outgoingGroups = outgoingGroups(g);
-                if(outgoingGroups.size()==1) {
-                    Group out = outgoingGroups.iterator().next();
-                    if(out!=null) {
-                        merge(g, out);
-                        somethingHappened = true;
-                        break;
-                    }
-                }
-                
-                Collection<Group> incomingGroups = incomingGroups(g);
-                if(incomingGroups.size()==1) {
-                    Group in = incomingGroups.iterator().next();
-                    if(in!=null) {
-                        merge(g, in);
-                        somethingHappened = true;
-                        break;
-                    }
-                }                
-            }
-        }
-    }
-    
-    private void merge(Group g1, Group g2) {
-        Group newGroup = new Group(g1, g2);
-        currGroups.remove(g1);
-        currGroups.remove(g2);
-        currGroups.add(newGroup);
-        for(Node n: newGroup.nodes) {
-            groupIndex.put(n, newGroup);
-        }
-    }
-    
-    public Collection<Group> outgoingGroups(Group g) {
-        Set<Group> ret = new HashSet<Group>();
-        for(Node n: g.outgoingNodes()) {
-            Group other = nodeGroup(n);
-            if(other==null || !other.equals(g)) {
-                ret.add(other);                
-            }
-        }
-        return ret;
-    }
-    
-    public Collection<Group> incomingGroups(Group g) {
-        Set<Group> ret = new HashSet<Group>();
-        for(Node n: g.incomingNodes()) {
-            Group other = nodeGroup(n);
-            if(other==null || !other.equals(g)) {
-                ret.add(other);                
-            }
-        }
-        return ret;        
-    } 
-    
-    public Group nodeGroup(Node n) {
-        return groupIndex.get(n);
-    }
-    
-}
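
mergeFully above is the step that packs adjacent operations into a single bolt: any group with exactly one downstream (or upstream) neighboring group is merged with it, and the scan restarts until no merge applies. A standalone toy sketch of the downstream half of that fixpoint, with string node ids standing in for planner Nodes (every name below is hypothetical):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class MergeFixpointSketch {
        // finds the group containing node n (analogous to nodeGroup above)
        static Set<String> groupOf(List<Set<String>> groups, String n) {
            for (Set<String> g : groups) {
                if (g.contains(n)) return g;
            }
            return null;
        }

        public static void main(String[] args) {
            // a three-node chain: spout -> each -> agg
            Map<String, List<String>> succ = new HashMap<String, List<String>>();
            succ.put("spout", Arrays.asList("each"));
            succ.put("each", Arrays.asList("agg"));
            succ.put("agg", Collections.<String>emptyList());

            // start with one singleton group per node
            List<Set<String>> groups = new ArrayList<Set<String>>();
            for (String n : succ.keySet()) {
                groups.add(new HashSet<String>(Arrays.asList(n)));
            }

            boolean changed = true;
            while (changed) {              // loop to a fixpoint, like mergeFully
                changed = false;
                for (Set<String> g : groups) {
                    // collect the distinct downstream groups of g
                    Set<Set<String>> out = new HashSet<Set<String>>();
                    for (String n : g) {
                        for (String child : succ.get(n)) {
                            Set<String> other = groupOf(groups, child);
                            if (other != g) out.add(other);
                        }
                    }
                    if (out.size() == 1) { // exactly one: safe to merge
                        Set<String> other = out.iterator().next();
                        g.addAll(other);
                        groups.remove(other);
                        changed = true;
                        break;             // restart the scan after each merge
                    }
                }
            }
            // prints the single merged group, e.g. [[agg, each, spout]] in some order
            System.out.println(groups);
        }
    }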

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/graph/Group.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/graph/Group.java b/jstorm-client/src/main/java/storm/trident/graph/Group.java
deleted file mode 100644
index c329ad6..0000000
--- a/jstorm-client/src/main/java/storm/trident/graph/Group.java
+++ /dev/null
@@ -1,89 +0,0 @@
-package storm.trident.graph;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-
-import org.jgrapht.DirectedGraph;
-
-import storm.trident.planner.Node;
-import storm.trident.util.IndexedEdge;
-import storm.trident.util.TridentUtils;
-
-
-public class Group {
-    public Set<Node> nodes = new HashSet<Node>();
-    private DirectedGraph<Node, IndexedEdge> graph;
-    private String id;
-    
-    public Group(DirectedGraph graph, List<Node> nodes) {
-        init(graph);
-        this.nodes.addAll(nodes);
-        this.graph = graph;
-    }
-    
-    public Group(DirectedGraph graph, Node n) {
-        this(graph, Arrays.asList(n));
-    }
-    
-    public Group(Group g1, Group g2) {
-        init(g1.graph);
-        nodes.addAll(g1.nodes);
-        nodes.addAll(g2.nodes);
-    }
-    
-    private void init(DirectedGraph graph) {
-        this.graph = graph;
-        this.id = UUID.randomUUID().toString();
-    }
-    
-    public Set<Node> outgoingNodes() {
-        Set<Node> ret = new HashSet<Node>();
-        for(Node n: nodes) {
-            ret.addAll(TridentUtils.getChildren(graph, n));
-        }
-        return ret;
-    }
-    
-    public Set<Node> incomingNodes() {
-        Set<Node> ret = new HashSet<Node>();
-        for(Node n: nodes) {
-            ret.addAll(TridentUtils.getParents(graph, n));
-        }        
-        return ret;        
-    }
-
-    @Override
-    public int hashCode() {
-        final int prime = 31;
-        int result = 1;
-        result = prime * result + ((id == null) ? 0 : id.hashCode());
-        return result;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj)
-            return true;
-        if (obj == null)
-            return false;
-        if (getClass() != obj.getClass())
-            return false;
-        Group other = (Group) obj;
-        if (id == null) {
-            if (other.id != null)
-                return false;
-        } else if (!id.equals(other.id))
-            return false;
-        return true;
-    }
-    
-    
-
-    @Override
-    public String toString() {
-        return nodes.toString();
-    }    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/Aggregator.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/Aggregator.java b/jstorm-client/src/main/java/storm/trident/operation/Aggregator.java
deleted file mode 100644
index 5181703..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/Aggregator.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package storm.trident.operation;
-
-import storm.trident.tuple.TridentTuple;
-
-public interface Aggregator<T> extends Operation {
-    T init(Object batchId, TridentCollector collector);
-    void aggregate(T val, TridentTuple tuple, TridentCollector collector);
-    void complete(T val, TridentCollector collector);
-}
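
Compared to the CombinerAggregator and ReducerAggregator variants below, an Aggregator sees the batch id, keeps explicit per-batch state, and may emit any number of tuples at any point. A minimal per-partition batch counter as a sketch (only Values is assumed beyond the interfaces in this diff):

    import backtype.storm.tuple.Values;
    import storm.trident.operation.BaseAggregator;
    import storm.trident.operation.TridentCollector;
    import storm.trident.tuple.TridentTuple;

    public class CountAggregator extends BaseAggregator<CountAggregator.State> {
        static class State {
            long count = 0;
        }

        @Override
        public State init(Object batchId, TridentCollector collector) {
            return new State();            // fresh state for every batch
        }

        @Override
        public void aggregate(State state, TridentTuple tuple, TridentCollector collector) {
            state.count++;                 // one increment per tuple in the batch
        }

        @Override
        public void complete(State state, TridentCollector collector) {
            collector.emit(new Values(state.count));  // single output per partition
        }
    }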

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/Assembly.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/Assembly.java b/jstorm-client/src/main/java/storm/trident/operation/Assembly.java
deleted file mode 100644
index 17aaca2..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/Assembly.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package storm.trident.operation;
-
-import storm.trident.Stream;
-
-
-public interface Assembly {
-    Stream apply(Stream input);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/BaseAggregator.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/BaseAggregator.java b/jstorm-client/src/main/java/storm/trident/operation/BaseAggregator.java
deleted file mode 100644
index c97b84f..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/BaseAggregator.java
+++ /dev/null
@@ -1,6 +0,0 @@
-package storm.trident.operation;
-
-
-public abstract class BaseAggregator<T> extends BaseOperation implements Aggregator<T> {
-    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/BaseFilter.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/BaseFilter.java b/jstorm-client/src/main/java/storm/trident/operation/BaseFilter.java
deleted file mode 100644
index d629d0d..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/BaseFilter.java
+++ /dev/null
@@ -1,6 +0,0 @@
-package storm.trident.operation;
-
-
-public abstract class BaseFilter extends BaseOperation implements Filter {
-    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/BaseFunction.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/BaseFunction.java b/jstorm-client/src/main/java/storm/trident/operation/BaseFunction.java
deleted file mode 100644
index 8ff6b05..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/BaseFunction.java
+++ /dev/null
@@ -1,6 +0,0 @@
-package storm.trident.operation;
-
-
-public abstract class BaseFunction extends BaseOperation implements Function {
-    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/BaseMultiReducer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/BaseMultiReducer.java b/jstorm-client/src/main/java/storm/trident/operation/BaseMultiReducer.java
deleted file mode 100644
index 328205d..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/BaseMultiReducer.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package storm.trident.operation;
-
-import java.util.Map;
-
-public abstract class BaseMultiReducer<T> implements MultiReducer<T> {
-
-    @Override
-    public void prepare(Map conf, TridentMultiReducerContext context) {
-    }
-
-
-    @Override
-    public void cleanup() {
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/BaseOperation.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/BaseOperation.java b/jstorm-client/src/main/java/storm/trident/operation/BaseOperation.java
deleted file mode 100644
index df6166d..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/BaseOperation.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package storm.trident.operation;
-
-import java.util.Map;
-
-public class BaseOperation implements Operation {
-
-    @Override
-    public void prepare(Map conf, TridentOperationContext context) {
-    }
-
-    @Override
-    public void cleanup() {
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/CombinerAggregator.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/CombinerAggregator.java b/jstorm-client/src/main/java/storm/trident/operation/CombinerAggregator.java
deleted file mode 100644
index 03933c9..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/CombinerAggregator.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package storm.trident.operation;
-
-import java.io.Serializable;
-import storm.trident.tuple.TridentTuple;
-
-// doesn't manipulate tuples (lists of values) so that things like aggregating into
-// Cassandra are cleaner (no need for lists everywhere, just store the single value there)
-public interface CombinerAggregator<T> extends Serializable {
-    T init(TridentTuple tuple);
-    T combine(T val1, T val2);
-    T zero();
-}
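
A minimal sum in this style, assuming the first input field is numeric; combine must be associative with zero as its identity, which is what lets Trident combine partial aggregates before the final merge:

    import storm.trident.operation.CombinerAggregator;
    import storm.trident.tuple.TridentTuple;

    public class LongSum implements CombinerAggregator<Long> {
        @Override
        public Long init(TridentTuple tuple) {
            // lift one tuple into the aggregate domain
            return ((Number) tuple.getValue(0)).longValue();
        }

        @Override
        public Long combine(Long val1, Long val2) {
            return val1 + val2;        // associative, so partials can merge
        }

        @Override
        public Long zero() {
            return 0L;                 // identity for combine
        }
    }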

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/EachOperation.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/EachOperation.java b/jstorm-client/src/main/java/storm/trident/operation/EachOperation.java
deleted file mode 100644
index b56fe96..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/EachOperation.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package storm.trident.operation;
-
-public interface EachOperation extends Operation {
-   
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/Filter.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/Filter.java b/jstorm-client/src/main/java/storm/trident/operation/Filter.java
deleted file mode 100644
index ea7cbb6..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/Filter.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package storm.trident.operation;
-
-import storm.trident.tuple.TridentTuple;
-
-
-public interface Filter extends EachOperation {
-    boolean isKeep(TridentTuple tuple);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/Function.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/Function.java b/jstorm-client/src/main/java/storm/trident/operation/Function.java
deleted file mode 100644
index b58a29d..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/Function.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package storm.trident.operation;
-
-import storm.trident.tuple.TridentTuple;
-
-public interface Function extends EachOperation {
-    void execute(TridentTuple tuple, TridentCollector collector);
-}
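
A Function's emitted fields are appended to the input tuple rather than replacing it, and emitting nothing drops the tuple. A small sketch that upper-cases its first input field (field wiring is supplied by Stream.each at topology time):

    import backtype.storm.tuple.Values;
    import storm.trident.operation.BaseFunction;
    import storm.trident.operation.TridentCollector;
    import storm.trident.tuple.TridentTuple;

    public class UpperCase extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            // emits one new field, appended after the input tuple's fields
            collector.emit(new Values(tuple.getString(0).toUpperCase()));
        }
    }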

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/GroupedMultiReducer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/GroupedMultiReducer.java b/jstorm-client/src/main/java/storm/trident/operation/GroupedMultiReducer.java
deleted file mode 100644
index 9223cf7..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/GroupedMultiReducer.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package storm.trident.operation;
-
-import java.io.Serializable;
-import java.util.Map;
-import storm.trident.tuple.TridentTuple;
-
-
-public interface GroupedMultiReducer<T> extends Serializable {
-    void prepare(Map conf, TridentMultiReducerContext context);
-    T init(TridentCollector collector, TridentTuple group);
-    void execute(T state, int streamIndex, TridentTuple group, TridentTuple input, TridentCollector collector);
-    void complete(T state, TridentTuple group, TridentCollector collector);
-    void cleanup();
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/MultiReducer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/MultiReducer.java b/jstorm-client/src/main/java/storm/trident/operation/MultiReducer.java
deleted file mode 100644
index 520f4b9..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/MultiReducer.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package storm.trident.operation;
-
-import java.io.Serializable;
-import java.util.Map;
-import storm.trident.tuple.TridentTuple;
-
-
-public interface MultiReducer<T> extends Serializable {
-    void prepare(Map conf, TridentMultiReducerContext context);
-    T init(TridentCollector collector);
-    void execute(T state, int streamIndex, TridentTuple input, TridentCollector collector);
-    void complete(T state, TridentCollector collector);
-    void cleanup();
-}
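
A MultiReducer consumes several streams at once; streamIndex identifies which stream each tuple came from. A stateless pass-through union in the spirit of the IdentityMultiReducer that TridentTopology wires up for merges, assuming the inputs share an output schema:

    import storm.trident.operation.BaseMultiReducer;
    import storm.trident.operation.TridentCollector;
    import storm.trident.tuple.TridentTuple;

    public class UnionReducer extends BaseMultiReducer<Void> {
        @Override
        public Void init(TridentCollector collector) {
            return null;               // stateless: nothing to carry per batch
        }

        @Override
        public void execute(Void state, int streamIndex, TridentTuple input,
                            TridentCollector collector) {
            collector.emit(input);     // a TridentTuple is usable as a List<Object>
        }

        @Override
        public void complete(Void state, TridentCollector collector) {
            // nothing buffered, nothing to flush
        }
    }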

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/Operation.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/Operation.java b/jstorm-client/src/main/java/storm/trident/operation/Operation.java
deleted file mode 100644
index f67281e..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/Operation.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package storm.trident.operation;
-
-import java.io.Serializable;
-import java.util.Map;
-
-public interface Operation extends Serializable {
-    void prepare(Map conf, TridentOperationContext context);
-    void cleanup();
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/ReducerAggregator.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/ReducerAggregator.java b/jstorm-client/src/main/java/storm/trident/operation/ReducerAggregator.java
deleted file mode 100644
index 3b4efca..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/ReducerAggregator.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package storm.trident.operation;
-
-import java.io.Serializable;
-import storm.trident.tuple.TridentTuple;
-
-public interface ReducerAggregator<T> extends Serializable {
-    T init();
-    T reduce(T curr, TridentTuple tuple);
-}
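
ReducerAggregator folds tuples sequentially from an initial value, so unlike CombinerAggregator there is no pre-combining of partials; a counting reducer as a minimal sketch:

    import storm.trident.operation.ReducerAggregator;
    import storm.trident.tuple.TridentTuple;

    public class CountReducer implements ReducerAggregator<Long> {
        @Override
        public Long init() {
            return 0L;                 // starting value for each reduction
        }

        @Override
        public Long reduce(Long curr, TridentTuple tuple) {
            return curr + 1;           // fold one tuple into the running count
        }
    }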

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/TridentCollector.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/TridentCollector.java b/jstorm-client/src/main/java/storm/trident/operation/TridentCollector.java
deleted file mode 100644
index b1a74d1..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/TridentCollector.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package storm.trident.operation;
-
-import java.util.List;
-
-
-public interface TridentCollector {
-    void emit(List<Object> values);
-    void reportError(Throwable t);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/TridentMultiReducerContext.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/TridentMultiReducerContext.java b/jstorm-client/src/main/java/storm/trident/operation/TridentMultiReducerContext.java
deleted file mode 100644
index fe0ff04..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/TridentMultiReducerContext.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package storm.trident.operation;
-
-import backtype.storm.tuple.Fields;
-import java.util.List;
-import storm.trident.tuple.TridentTuple;
-import storm.trident.tuple.TridentTupleView.ProjectionFactory;
-
-
-public class TridentMultiReducerContext {
-    List<TridentTuple.Factory> _factories;
-    
-    public TridentMultiReducerContext(List<TridentTuple.Factory> factories) {
-        _factories = factories;        
-    }
-    
-    public ProjectionFactory makeProjectionFactory(int streamIndex, Fields fields) {
-        return new ProjectionFactory(_factories.get(streamIndex), fields);
-    }    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/TridentOperationContext.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/TridentOperationContext.java b/jstorm-client/src/main/java/storm/trident/operation/TridentOperationContext.java
deleted file mode 100644
index 3693125..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/TridentOperationContext.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package storm.trident.operation;
-
-import backtype.storm.metric.api.CombinedMetric;
-import backtype.storm.metric.api.ICombiner;
-import backtype.storm.metric.api.IMetric;
-import backtype.storm.metric.api.IReducer;
-import backtype.storm.metric.api.ReducedMetric;
-import backtype.storm.task.IMetricsContext;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Fields;
-import storm.trident.tuple.TridentTuple;
-import storm.trident.tuple.TridentTupleView.ProjectionFactory;
-
-public class TridentOperationContext implements IMetricsContext {
-    TridentTuple.Factory _factory;
-    TopologyContext _topoContext;
-    
-    public TridentOperationContext(TopologyContext topoContext, TridentTuple.Factory factory) {
-        _factory = factory;
-        _topoContext = topoContext;
-    }
-    
-    public TridentOperationContext(TridentOperationContext parent, TridentTuple.Factory factory) {
-        this(parent._topoContext, factory);
-    }    
-    
-    public ProjectionFactory makeProjectionFactory(Fields fields) {
-        return new ProjectionFactory(_factory, fields);
-    }
-    
-    public int numPartitions() {
-        return _topoContext.getComponentTasks(_topoContext.getThisComponentId()).size();
-    }
-    
-    public int getPartitionIndex() {
-        return _topoContext.getThisTaskIndex();
-    }
-
-    public <T extends IMetric> T registerMetric(String name, T metric, int timeBucketSizeInSecs) {
-        return _topoContext.registerMetric(name, metric, timeBucketSizeInSecs);
-    }
-    public ReducedMetric registerMetric(String name, IReducer reducer, int timeBucketSizeInSecs) {
-        return _topoContext.registerMetric(name, new ReducedMetric(reducer), timeBucketSizeInSecs);
-    }
-    public CombinedMetric registerMetric(String name, ICombiner combiner, int timeBucketSizeInSecs) {
-        return _topoContext.registerMetric(name, new CombinedMetric(combiner), timeBucketSizeInSecs);
-    }
-}
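
Operations receive this context through prepare, typically to find their partition index or register metrics. A hypothetical function that counts its executions with Storm's CountMetric:

    import java.util.Map;
    import backtype.storm.metric.api.CountMetric;
    import backtype.storm.tuple.Values;
    import storm.trident.operation.BaseFunction;
    import storm.trident.operation.TridentCollector;
    import storm.trident.operation.TridentOperationContext;
    import storm.trident.tuple.TridentTuple;

    public class MeteredIdentity extends BaseFunction {
        private transient CountMetric _executed;

        @Override
        public void prepare(Map conf, TridentOperationContext context) {
            // one sample bucket per 60s; the metric name is arbitrary
            _executed = context.registerMetric("executed", new CountMetric(), 60);
        }

        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            _executed.incr();
            collector.emit(new Values(tuple.getValue(0)));
        }
    }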

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/builtin/Count.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/builtin/Count.java b/jstorm-client/src/main/java/storm/trident/operation/builtin/Count.java
deleted file mode 100644
index e40177e..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/builtin/Count.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package storm.trident.operation.builtin;
-
-import storm.trident.operation.CombinerAggregator;
-import storm.trident.tuple.TridentTuple;
-
-
-public class Count implements CombinerAggregator<Long> {
-
-    @Override
-    public Long init(TridentTuple tuple) {
-        return 1L;
-    }
-
-    @Override
-    public Long combine(Long val1, Long val2) {
-        return val1 + val2;
-    }
-
-    @Override
-    public Long zero() {
-        return 0L;
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/builtin/Debug.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/builtin/Debug.java b/jstorm-client/src/main/java/storm/trident/operation/builtin/Debug.java
deleted file mode 100644
index 34e905c..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/builtin/Debug.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package storm.trident.operation.builtin;
-
-import storm.trident.operation.BaseFilter;
-import storm.trident.tuple.TridentTuple;
-
-public class Debug extends BaseFilter {
-    private final String name;
-
-    public Debug() {
-        name = "DEBUG: ";
-    }
-
-    public Debug(String name) {
-        this.name = "DEBUG(" + name + "): ";
-    }
-
-    @Override
-    public boolean isKeep(TridentTuple tuple) {
-        System.out.println(name + tuple.toString());
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/builtin/Equals.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/builtin/Equals.java b/jstorm-client/src/main/java/storm/trident/operation/builtin/Equals.java
deleted file mode 100644
index c53cfdd..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/builtin/Equals.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package storm.trident.operation.builtin;
-
-import storm.trident.operation.BaseFilter;
-import storm.trident.tuple.TridentTuple;
-
-
-public class Equals extends BaseFilter {
-
-    @Override
-    public boolean isKeep(TridentTuple tuple) {
-        for(int i=0; i<tuple.size()-1; i++) {
-            Object o1 = tuple.getValue(i);
-            Object o2 = tuple.getValue(i+1);
-            if (o1 == null) {
-                if (o2 != null) {
-                    return false;
-                }
-            } else if (!o1.equals(o2)) {
-                return false;
-            }
-
-        }
-        return true;
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/builtin/FilterNull.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/builtin/FilterNull.java b/jstorm-client/src/main/java/storm/trident/operation/builtin/FilterNull.java
deleted file mode 100644
index bed2f1e..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/builtin/FilterNull.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package storm.trident.operation.builtin;
-
-import storm.trident.operation.BaseFilter;
-import storm.trident.tuple.TridentTuple;
-
-public class FilterNull extends BaseFilter {
-    @Override
-    public boolean isKeep(TridentTuple tuple) {
-        for(Object o: tuple) {
-            if(o==null) return false;
-        }
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/builtin/FirstN.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/builtin/FirstN.java b/jstorm-client/src/main/java/storm/trident/operation/builtin/FirstN.java
deleted file mode 100644
index 412badd..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/builtin/FirstN.java
+++ /dev/null
@@ -1,108 +0,0 @@
-package storm.trident.operation.builtin;
-
-import backtype.storm.tuple.Fields;
-import java.util.Comparator;
-import java.util.PriorityQueue;
-import storm.trident.Stream;
-import storm.trident.operation.Aggregator;
-import storm.trident.operation.Assembly;
-import storm.trident.operation.BaseAggregator;
-import storm.trident.operation.TridentCollector;
-import storm.trident.tuple.TridentTuple;
-
-
-public class FirstN implements Assembly {
-
-    Aggregator _agg;
-    
-    public FirstN(int n, String sortField) {
-        this(n, sortField, false);
-    }
-    
-    public FirstN(int n, String sortField, boolean reverse) {
-        if(sortField!=null) {
-            _agg = new FirstNSortedAgg(n, sortField, reverse);
-        } else {
-            _agg = new FirstNAgg(n);
-        }
-    }
-    
-    @Override
-    public Stream apply(Stream input) {
-        Fields outputFields = input.getOutputFields();
-        return input.partitionAggregate(outputFields, _agg, outputFields)
-                    .global()
-                    .partitionAggregate(outputFields, _agg, outputFields);             
-    }
-    
-    public static class FirstNAgg extends BaseAggregator<FirstNAgg.State> {
-        int _n;
-        
-        public FirstNAgg(int n) {
-            _n = n;
-        }
-        
-        static class State {
-            int emitted = 0;
-        }
-        
-        @Override
-        public State init(Object batchId, TridentCollector collector) {
-            return new State();
-        }
-
-        @Override
-        public void aggregate(State val, TridentTuple tuple, TridentCollector collector) {
-            if(val.emitted < _n) {
-                collector.emit(tuple);
-                val.emitted++;
-            }
-        }
-
-        @Override
-        public void complete(State val, TridentCollector collector) {
-        }
-        
-    }
-    
-    public static class FirstNSortedAgg extends BaseAggregator<PriorityQueue> {
-
-        int _n;
-        String _sortField;
-        boolean _reverse;
-        
-        public FirstNSortedAgg(int n, String sortField, boolean reverse) {
-            _n = n;
-            _sortField = sortField;
-            _reverse = reverse;
-        }
-
-        @Override
-        public PriorityQueue init(Object batchId, TridentCollector collector) {
-            return new PriorityQueue(_n, new Comparator<TridentTuple>() {
-                @Override
-                public int compare(TridentTuple t1, TridentTuple t2) {
-                    Comparable c1 = (Comparable) t1.getValueByField(_sortField);
-                    Comparable c2 = (Comparable) t2.getValueByField(_sortField);
-                    int ret = c1.compareTo(c2);
-                    if(_reverse) ret *= -1;
-                    return ret;
-                }                
-            });
-        }
-
-        @Override
-        public void aggregate(PriorityQueue state, TridentTuple tuple, TridentCollector collector) {
-            state.add(tuple);
-        }
-
-        @Override
-        public void complete(PriorityQueue val, TridentCollector collector) {
-            int total = val.size();
-            for(int i=0; i<_n && i < total; i++) {
-                TridentTuple t = (TridentTuple) val.remove();
-                collector.emit(t);
-            }
-        }
-    }    
-}
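
FirstN trims each partition first and only then ships the survivors to a single partition for the final cut, which bounds the data crossing the network. A fragment showing hypothetical usage, assuming stream is an existing storm.trident.Stream whose tuples carry a numeric "count" field (reverse=true pops the largest values first):

    // top 10 tuples by "count", highest first
    Stream top10 = new FirstN(10, "count", true).apply(stream);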