You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/11/05 21:41:00 UTC
[21/60] [abbrv] [partial] storm git commit: Release 2.0.4-SNAPSHOT
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/TridentTopology.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/TridentTopology.java b/jstorm-client/src/main/java/storm/trident/TridentTopology.java
deleted file mode 100644
index 7b4b00d..0000000
--- a/jstorm-client/src/main/java/storm/trident/TridentTopology.java
+++ /dev/null
@@ -1,796 +0,0 @@
-package storm.trident;
-
-import backtype.storm.Config;
-import backtype.storm.ILocalDRPC;
-import backtype.storm.drpc.DRPCSpout;
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.generated.Grouping;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.grouping.CustomStreamGrouping;
-import backtype.storm.topology.BoltDeclarer;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import org.jgrapht.DirectedGraph;
-import org.jgrapht.UndirectedGraph;
-import org.jgrapht.alg.ConnectivityInspector;
-import org.jgrapht.graph.DefaultDirectedGraph;
-import org.jgrapht.graph.Pseudograph;
-import storm.trident.drpc.ReturnResultsReducer;
-import storm.trident.fluent.GroupedStream;
-import storm.trident.fluent.IAggregatableStream;
-import storm.trident.fluent.UniqueIdGen;
-import storm.trident.graph.GraphGrouper;
-import storm.trident.graph.Group;
-import storm.trident.operation.GroupedMultiReducer;
-import storm.trident.operation.MultiReducer;
-import storm.trident.operation.impl.FilterExecutor;
-import storm.trident.operation.impl.GroupedMultiReducerExecutor;
-import storm.trident.operation.impl.IdentityMultiReducer;
-import storm.trident.operation.impl.JoinerMultiReducer;
-import storm.trident.operation.impl.TrueFilter;
-import storm.trident.partition.IdentityGrouping;
-import storm.trident.planner.Node;
-import storm.trident.planner.NodeStateInfo;
-import storm.trident.planner.PartitionNode;
-import storm.trident.planner.ProcessorNode;
-import storm.trident.planner.SpoutNode;
-import storm.trident.planner.SubtopologyBolt;
-import storm.trident.planner.processor.EachProcessor;
-import storm.trident.planner.processor.MultiReducerProcessor;
-import storm.trident.spout.BatchSpoutExecutor;
-import storm.trident.spout.IBatchSpout;
-import storm.trident.spout.IOpaquePartitionedTridentSpout;
-import storm.trident.spout.IPartitionedTridentSpout;
-import storm.trident.spout.ITridentSpout;
-import storm.trident.spout.OpaquePartitionedTridentSpoutExecutor;
-import storm.trident.spout.PartitionedTridentSpoutExecutor;
-import storm.trident.spout.RichSpoutBatchExecutor;
-import storm.trident.state.StateFactory;
-import storm.trident.state.StateSpec;
-import storm.trident.topology.TridentTopologyBuilder;
-import storm.trident.util.ErrorEdgeFactory;
-import storm.trident.util.IndexedEdge;
-import storm.trident.util.TridentUtils;
-
-
-// graph with 3 kinds of nodes:
-// operation, partition, or spout
-// all operations have finishBatch and can optionally be committers
-public class TridentTopology {
-
- //TODO: add a method for drpc stream, needs to know how to automatically do returnresults, etc
- // is it too expensive to do a batch per drpc request?
-
- DefaultDirectedGraph<Node, IndexedEdge> _graph; // planner graph: nodes connected by indexed edges
- Map<String, List<Node>> _colocate = new HashMap(); // state id -> nodes registered with that state's info (filled by registerNode)
- UniqueIdGen _gen; // hands out the unique stream ids and state ids used by this topology
-
- /**
-  * Creates an empty topology backed by a fresh graph and a fresh id generator.
-  */
- public TridentTopology() {
-     this._gen = new UniqueIdGen();
-     this._graph = new DefaultDirectedGraph(new ErrorEdgeFactory());
- }
-
- /**
-  * Wraps existing planning state. The arguments are stored as given, not copied.
-  */
- private TridentTopology(DefaultDirectedGraph<Node, IndexedEdge> graph, Map<String, List<Node>> colocate, UniqueIdGen gen) {
-     this._gen = gen;
-     this._colocate = colocate;
-     this._graph = graph;
- }
-
-
- // automatically turn it into a batch spout, should take parameters as to how much to batch
-// public Stream newStream(IRichSpout spout) {
-// Node n = new SpoutNode(getUniqueStreamId(), TridentUtils.getSingleOutputStreamFields(spout), null, spout, SpoutNode.SpoutType.BATCH);
-// return addNode(n);
-// }
-
- /**
-  * Starts a stream from a regular rich spout by wrapping it in a RichSpoutBatchExecutor.
-  */
- public Stream newStream(String txId, IRichSpout spout) {
-     RichSpoutBatchExecutor batched = new RichSpoutBatchExecutor(spout);
-     return newStream(txId, batched);
- }
-
- /**
-  * Starts a stream from a batch spout; the spout is registered as a BATCH spout node.
-  */
- public Stream newStream(String txId, IBatchSpout spout) {
-     String streamId = getUniqueStreamId();
-     return addNode(new SpoutNode(streamId, spout.getOutputFields(), txId, spout, SpoutNode.SpoutType.BATCH));
- }
-
- /**
-  * Starts a stream from a Trident spout; the spout is registered as a BATCH spout node.
-  */
- public Stream newStream(String txId, ITridentSpout spout) {
-     String streamId = getUniqueStreamId();
-     return addNode(new SpoutNode(streamId, spout.getOutputFields(), txId, spout, SpoutNode.SpoutType.BATCH));
- }
-
- /**
-  * Starts a stream from a partitioned spout by wrapping it in a PartitionedTridentSpoutExecutor.
-  */
- public Stream newStream(String txId, IPartitionedTridentSpout spout) {
-     PartitionedTridentSpoutExecutor executor = new PartitionedTridentSpoutExecutor(spout);
-     return newStream(txId, executor);
- }
-
- /**
-  * Starts a stream from an opaque partitioned spout by wrapping it in an
-  * OpaquePartitionedTridentSpoutExecutor.
-  */
- public Stream newStream(String txId, IOpaquePartitionedTridentSpout spout) {
-     OpaquePartitionedTridentSpoutExecutor executor = new OpaquePartitionedTridentSpoutExecutor(spout);
-     return newStream(txId, executor);
- }
-
- /**
-  * Starts a DRPC stream for the given function name.
-  */
- public Stream newDRPCStream(String function) {
-     DRPCSpout spout = new DRPCSpout(function);
-     return newDRPCStream(spout);
- }
-
- /**
-  * Starts a DRPC stream for the given function name; when {@code server} is non-null the
-  * spout is constructed with it, otherwise the single-argument DRPCSpout is used.
-  */
- public Stream newDRPCStream(String function, ILocalDRPC server) {
-     DRPCSpout spout = (server != null)
-             ? new DRPCSpout(function, server)
-             : new DRPCSpout(function);
-     return newDRPCStream(spout);
- }
-
- /**
-  * Registers the DRPC spout as a DRPC spout node (no txId) and returns a stream projected
-  * down to the "args" field.
-  */
- private Stream newDRPCStream(DRPCSpout spout) {
-     // TODO: consider adding a shuffle grouping after the spout to avoid so much routing of the args/return-info all over the place
-     // (at least until its possible to just pack bolt logic into the spout itself)
-
-     String streamId = getUniqueStreamId();
-     Node drpcNode = new SpoutNode(streamId, TridentUtils.getSingleOutputStreamFields(spout), null, spout, SpoutNode.SpoutType.DRPC);
-     Stream spoutStream = addNode(drpcNode);
-     // later on, this will be joined back with return-info and all the results
-     Fields argsOnly = new Fields("args");
-     return spoutStream.project(argsOnly);
- }
-
- /**
-  * Creates a static state from a factory by wrapping the factory in a StateSpec.
-  */
- public TridentState newStaticState(StateFactory factory) {
-     StateSpec spec = new StateSpec(factory);
-     return newStaticState(spec);
- }
-
- /**
-  * Creates a source-less node carrying the state info for {@code spec}, registers it and
-  * returns the TridentState handle for it.
-  */
- public TridentState newStaticState(StateSpec spec) {
-     String stateId = getUniqueStateId();
-     String streamId = getUniqueStreamId();
-     Node stateNode = new Node(streamId, null, new Fields());
-     stateNode.stateInfo = new NodeStateInfo(stateId, spec);
-     registerNode(stateNode);
-     return new TridentState(this, stateNode);
- }
-
- /** Two-stream convenience form; input fields default to each stream's output fields. */
- public Stream multiReduce(Stream s1, Stream s2, MultiReducer function, Fields outputFields) {
-     List<Stream> pair = Arrays.asList(s1, s2);
-     return multiReduce(pair, function, outputFields);
- }
-
- /** Two-stream convenience form with explicit input fields per stream. */
- public Stream multiReduce(Fields inputFields1, Stream s1, Fields inputFields2, Stream s2, MultiReducer function, Fields outputFields) {
-     List<Fields> inputs = Arrays.asList(inputFields1, inputFields2);
-     List<Stream> pair = Arrays.asList(s1, s2);
-     return multiReduce(inputs, pair, function, outputFields);
- }
-
- /** Two-grouped-stream convenience form; input fields default to each stream's output fields. */
- public Stream multiReduce(GroupedStream s1, GroupedStream s2, GroupedMultiReducer function, Fields outputFields) {
-     List<GroupedStream> pair = Arrays.asList(s1, s2);
-     return multiReduce(pair, function, outputFields);
- }
-
- /** Two-grouped-stream convenience form with explicit input fields per stream. */
- public Stream multiReduce(Fields inputFields1, GroupedStream s1, Fields inputFields2, GroupedStream s2, GroupedMultiReducer function, Fields outputFields) {
-     List<Fields> inputs = Arrays.asList(inputFields1, inputFields2);
-     List<GroupedStream> pair = Arrays.asList(s1, s2);
-     return multiReduce(inputs, pair, function, outputFields);
- }
-
- /** List form; every stream contributes all of its output fields as input. */
- public Stream multiReduce(List<Stream> streams, MultiReducer function, Fields outputFields) {
-     List<Fields> allFields = getAllOutputFields(streams);
-     return multiReduce(allFields, streams, function, outputFields);
- }
-
- /** Grouped list form; every stream contributes all of its output fields as input. */
- public Stream multiReduce(List<GroupedStream> streams, GroupedMultiReducer function, Fields outputFields) {
-     List<Fields> allFields = getAllOutputFields(streams);
-     return multiReduce(allFields, streams, function, outputFields);
- }
-
- /**
-  * Adds a processor node running {@code function} over the given streams. The node's name is
-  * the "-"-joined names of the source streams that have one.
-  */
- public Stream multiReduce(List<Fields> inputFields, List<Stream> streams, MultiReducer function, Fields outputFields) {
-     List<String> streamNames = new ArrayList<String>();
-     for (Stream source : streams) {
-         if (source._name == null) {
-             continue;
-         }
-         streamNames.add(source._name);
-     }
-     String streamId = getUniqueStreamId();
-     String nodeName = Utils.join(streamNames, "-");
-     MultiReducerProcessor processor = new MultiReducerProcessor(inputFields, function);
-     Node reducerNode = new ProcessorNode(streamId, nodeName, outputFields, outputFields, processor);
-     return addSourcedNode(streams, reducerNode);
- }
-
- public Stream multiReduce(List<Fields> inputFields, List<GroupedStream> groupedStreams, GroupedMultiReducer function, Fields outputFields) {
- List<Fields> fullInputFields = new ArrayList<Fields>();
- List<Stream> streams = new ArrayList<Stream>();
- List<Fields> fullGroupFields = new ArrayList<Fields>();
- for(int i=0; i<groupedStreams.size(); i++) {
- GroupedStream gs = groupedStreams.get(i);
- Fields groupFields = gs.getGroupFields();
- fullGroupFields.add(groupFields);
- streams.add(gs.toStream().partitionBy(groupFields));
- fullInputFields.add(TridentUtils.fieldsUnion(groupFields, inputFields.get(i)));
-
- }
- return multiReduce(fullInputFields, streams, new GroupedMultiReducerExecutor(function, fullGroupFields, inputFields), outputFields);
- }
-
- public Stream merge(Fields outputFields, Stream... streams) {
- return merge(outputFields, Arrays.asList(streams));
- }
-
- public Stream merge(Fields outputFields, List<Stream> streams) {
- return multiReduce(streams, new IdentityMultiReducer(), outputFields);
- }
-
- public Stream merge(Stream... streams) {
- return merge(Arrays.asList(streams));
- }
-
- public Stream merge(List<Stream> streams) {
- return merge(streams.get(0).getOutputFields(), streams);
- }
-
- public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields) {
- return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields);
- }
-
- public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields) {
- return join(streams, joinFields, outFields, JoinType.INNER);
- }
-
- public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, JoinType type) {
- return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields, type);
- }
-
- public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, JoinType type) {
- return join(streams, joinFields, outFields, repeat(streams.size(), type));
- }
-
- public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, List<JoinType> mixed) {
- return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields, mixed);
-
- }
-
- public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, List<JoinType> mixed) {
- return multiReduce(strippedInputFields(streams, joinFields),
- groupedStreams(streams, joinFields),
- new JoinerMultiReducer(mixed, joinFields.get(0).size(), strippedInputFields(streams, joinFields)),
- outFields);
- }
-
- public StormTopology build() { // Plans the node graph into spouts and bolts and returns the resulting StormTopology.
- DefaultDirectedGraph<Node, IndexedEdge> graph = (DefaultDirectedGraph) _graph.clone(); // all vertex/edge changes below go to this clone
-
- // for each component containing a DRPC spout, completeDRPC appends a ReturnResultsReducer step
- completeDRPC(graph, _colocate, _gen);
-
- List<SpoutNode> spoutNodes = new ArrayList<SpoutNode>();
-
- // can be regular nodes (static state) or processor nodes
- Set<Node> boltNodes = new HashSet<Node>();
- for(Node n: graph.vertexSet()) {
- if(n instanceof SpoutNode) {
- spoutNodes.add((SpoutNode) n);
- } else if(!(n instanceof PartitionNode)) {
- boltNodes.add(n);
- }
- }
-
- // initial groups: one per colocated node list, then one per remaining bolt node
- Set<Group> initialGroups = new HashSet<Group>();
- for(List<Node> colocate: _colocate.values()) {
- Group g = new Group(graph, colocate);
- boltNodes.removeAll(colocate);
- initialGroups.add(g);
- }
- for(Node n: boltNodes) {
- initialGroups.add(new Group(graph, n));
- }
-
- // merging itself is delegated to GraphGrouper
- GraphGrouper grouper = new GraphGrouper(graph, initialGroups);
- grouper.mergeFully();
- Collection<Group> mergedGroups = grouper.getAllGroups();
-
-
- // an edge with no PartitionNode endpoint whose ends are in different groups (or whose source has no group) is split by an identity PartitionNode
- // add identity partitions between groups
- for(IndexedEdge<Node> e: new HashSet<IndexedEdge>(graph.edgeSet())) { // iterate over a copy: the loop body edits the graph
- if(!(e.source instanceof PartitionNode) && !(e.target instanceof PartitionNode)) {
- Group g1 = grouper.nodeGroup(e.source);
- Group g2 = grouper.nodeGroup(e.target);
- // g1 being null means the source is a spout node
- if(g1==null && !(e.source instanceof SpoutNode))
- throw new RuntimeException("Planner exception: Null source group must indicate a spout node at this phase of planning");
- if(g1==null || !g1.equals(g2)) {
- graph.removeEdge(e);
- PartitionNode pNode = makeIdentityPartition(e.source);
- graph.addVertex(pNode);
- graph.addEdge(e.source, pNode, new IndexedEdge(e.source, pNode, 0));
- graph.addEdge(pNode, e.target, new IndexedEdge(pNode, e.target, e.index)); // keeps the original edge index on the target side
- }
- }
- }
- // if one group subscribes to the same stream with same partitioning multiple times,
- // merge those together (otherwise can end up with many output streams created for that partitioning
- // if need to split into multiple output streams because of same input having different
- // partitioning to the group)
-
- // this is because can't currently merge splitting logic into a spout
- // not the most kosher algorithm here, since the grouper indexes are being trounced via the adding of nodes to random groups, but it
- // works out
- List<Node> forNewGroups = new ArrayList<Node>();
- for(Group g: mergedGroups) {
- for(PartitionNode n: extraPartitionInputs(g)) { // inputs whose grouping differs from the first input on the same stream id
- Node idNode = makeIdentityNode(n.allOutputFields); // identity processor node with a fresh stream id
- Node newPartitionNode = new PartitionNode(idNode.streamId, n.name, idNode.allOutputFields, n.thriftGrouping); // same name and grouping as n, on idNode's stream
- Node parentNode = TridentUtils.getParent(graph, n);
- Set<IndexedEdge> outgoing = graph.outgoingEdgesOf(n);
- graph.removeVertex(n);
-
- graph.addVertex(idNode);
- graph.addVertex(newPartitionNode);
- addEdge(graph, parentNode, idNode, 0);
- addEdge(graph, idNode, newPartitionNode, 0);
- for(IndexedEdge e: outgoing) {
- addEdge(graph, newPartitionNode, e.target, e.index);
- }
- Group parentGroup = grouper.nodeGroup(parentNode);
- if(parentGroup==null) {
- forNewGroups.add(idNode); // no parent group: idNode gets its own group below
- } else {
- parentGroup.nodes.add(idNode);
- }
- }
- }
- // TODO: in the future, want a way to include this logic in the spout itself,
- // or make it unnecessary by having storm include metadata about which grouping a tuple
- // came from
-
- for(Node n: forNewGroups) {
- grouper.addGroup(new Group(graph, n));
- }
-
- // add in spouts as groups so we can get parallelisms
- for(Node n: spoutNodes) {
- grouper.addGroup(new Group(graph, n));
- }
-
- grouper.reindex();
- mergedGroups = grouper.getAllGroups();
-
- // batch groups: every node in the i-th connected set reported by ConnectivityInspector is labelled "bg" + i
- Map<Node, String> batchGroupMap = new HashMap();
- List<Set<Node>> connectedComponents = new ConnectivityInspector<Node, IndexedEdge>(graph).connectedSets();
- for(int i=0; i<connectedComponents.size(); i++) {
- String groupId = "bg" + i;
- for(Node n: connectedComponents.get(i)) {
- batchGroupMap.put(n, groupId);
- }
- }
-
-// System.out.println("GRAPH:");
-// System.out.println(graph);
-
- Map<Group, Integer> parallelisms = getGroupParallelisms(graph, grouper, mergedGroups);
-
- TridentTopologyBuilder builder = new TridentTopologyBuilder();
-
- Map<Node, String> spoutIds = genSpoutIds(spoutNodes);
- Map<Group, String> boltIds = genBoltIds(mergedGroups);
- // declare spouts: DRPC spouts as batch-per-tuple spouts, all others as ITridentSpout (IBatchSpout is wrapped in BatchSpoutExecutor)
- for(SpoutNode sn: spoutNodes) {
- Integer parallelism = parallelisms.get(grouper.nodeGroup(sn)); // each spout was added to the grouper as its own group above
- if(sn.type == SpoutNode.SpoutType.DRPC) {
- builder.setBatchPerTupleSpout(spoutIds.get(sn), sn.streamId,
- (IRichSpout) sn.spout, parallelism, batchGroupMap.get(sn));
- } else {
- ITridentSpout s;
- if(sn.spout instanceof IBatchSpout) {
- s = new BatchSpoutExecutor((IBatchSpout)sn.spout);
- } else if(sn.spout instanceof ITridentSpout) {
- s = (ITridentSpout) sn.spout;
- } else {
- throw new RuntimeException("Regular rich spouts not supported yet... try wrapping in a RichSpoutBatchExecutor");
- // TODO: handle regular rich spout without batches (need lots of updates to support this throughout)
- }
- builder.setSpout(spoutIds.get(sn), sn.streamId, sn.txId, s, parallelism, batchGroupMap.get(sn));
- }
- }
- // declare one SubtopologyBolt per non-spout group and subscribe it to its external partition inputs
- for(Group g: mergedGroups) {
- if(!isSpoutGroup(g)) {
- Integer p = parallelisms.get(g);
- Map<String, String> streamToGroup = getOutputStreamBatchGroups(g, batchGroupMap);
- BoltDeclarer d = builder.setBolt(boltIds.get(g), new SubtopologyBolt(graph, g.nodes, batchGroupMap), p,
- committerBatches(g, batchGroupMap), streamToGroup);
- Collection<PartitionNode> inputs = uniquedSubscriptions(externalGroupInputs(g)); // one input per stream id; throws on conflicting groupings
- for(PartitionNode n: inputs) {
- Node parent = TridentUtils.getParent(graph, n);
- String componentId; // id of the spout or bolt that emits stream n.streamId
- if(parent instanceof SpoutNode) {
- componentId = spoutIds.get(parent);
- } else {
- componentId = boltIds.get(grouper.nodeGroup(parent));
- }
- d.grouping(new GlobalStreamId(componentId, n.streamId), n.thriftGrouping);
- }
- }
- }
-
- return builder.buildTopology();
- }
-
- /**
-  * Validates every connected component, then, for each component that contains a DRPC spout,
-  * adds a ReturnResultsReducer fed by the spout's "return-info" field (stream 0) and by the
-  * most recently created node of the component (stream 1).
-  */
- private static void completeDRPC(DefaultDirectedGraph<Node, IndexedEdge> graph, Map<String, List<Node>> colocate, UniqueIdGen gen) {
-     List<Set<Node>> components = new ConnectivityInspector<Node, IndexedEdge>(graph).connectedSets();
-
-     for (Set<Node> component : components) {
-         checkValidJoins(component);
-     }
-
-     TridentTopology helper = new TridentTopology(graph, colocate, gen);
-     for (Set<Node> component : components) {
-         SpoutNode drpcNode = getDRPCSpoutNode(component);
-         if (drpcNode == null) {
-             continue;
-         }
-         Stream lastStream = new Stream(helper, null, getLastAddedNode(component));
-         Stream drpcStream = new Stream(helper, null, drpcNode);
-         Stream returnInfo = drpcStream.project(new Fields("return-info")).batchGlobal();
-         Stream results = lastStream.batchGlobal();
-         helper.multiReduce(returnInfo, results, new ReturnResultsReducer(), new Fields());
-     }
- }
-
- /**
-  * Returns the node with the highest creationIndex, or null for an empty collection.
-  */
- private static Node getLastAddedNode(Collection<Node> g) {
-     Node latest = null;
-     for (Node candidate : g) {
-         boolean isNewer = latest == null || candidate.creationIndex > latest.creationIndex;
-         if (isNewer) {
-             latest = candidate;
-         }
-     }
-     return latest;
- }
-
- /**
-  * Returns the first DRPC spout node found in {@code g}, or null if it's not a drpc group.
-  */
- private static SpoutNode getDRPCSpoutNode(Collection<Node> g) {
-     for (Node candidate : g) {
-         if (!(candidate instanceof SpoutNode)) {
-             continue;
-         }
-         SpoutNode spoutNode = (SpoutNode) candidate;
-         if (spoutNode.type == SpoutNode.SpoutType.DRPC) {
-             return spoutNode;
-         }
-     }
-     return null;
- }
-
- /**
-  * Rejects a component that contains both a BATCH spout and a DRPC spout.
-  *
-  * @throws RuntimeException if both kinds of spout are present in {@code g}
-  */
- private static void checkValidJoins(Collection<Node> g) {
-     boolean sawBatchSpout = false;
-     boolean sawDrpcSpout = false;
-     for (Node candidate : g) {
-         if (!(candidate instanceof SpoutNode)) {
-             continue;
-         }
-         SpoutNode.SpoutType type = ((SpoutNode) candidate).type;
-         sawBatchSpout |= (type == SpoutNode.SpoutType.BATCH);
-         sawDrpcSpout |= (type == SpoutNode.SpoutType.DRPC);
-     }
-     if (sawBatchSpout && sawDrpcSpout) {
-         throw new RuntimeException("Cannot join DRPC stream with streams originating from other spouts");
-     }
- }
-
- /**
-  * True when the group holds exactly one node and that node is a SpoutNode.
-  */
- private static boolean isSpoutGroup(Group g) {
-     if (g.nodes.size() != 1) {
-         return false;
-     }
-     return g.nodes.iterator().next() instanceof SpoutNode;
- }
-
- /**
-  * Collapses the subscriptions to one PartitionNode per stream id (the last one seen wins).
-  *
-  * @throws RuntimeException if two subscriptions to the same stream use different groupings
-  */
- private static Collection<PartitionNode> uniquedSubscriptions(Set<PartitionNode> subscriptions) {
-     Map<String, PartitionNode> byStreamId = new HashMap<String, PartitionNode>();
-     for (PartitionNode subscription : subscriptions) {
-         PartitionNode existing = byStreamId.get(subscription.streamId);
-         boolean conflicting = existing != null
-                 && !existing.thriftGrouping.equals(subscription.thriftGrouping);
-         if (conflicting) {
-             throw new RuntimeException("Multiple subscriptions to the same stream with different groupings. Should be impossible since that is explicitly guarded against.");
-         }
-         byStreamId.put(subscription.streamId, subscription);
-     }
-     return byStreamId.values();
- }
-
- /**
-  * Assigns the ids "spout0", "spout1", ... to the spout nodes in iteration order.
-  */
- private static Map<Node, String> genSpoutIds(Collection<SpoutNode> spoutNodes) {
-     Map<Node, String> ids = new HashMap<Node, String>();
-     int next = 0;
-     for (SpoutNode spoutNode : spoutNodes) {
-         ids.put(spoutNode, "spout" + next);
-         next++;
-     }
-     return ids;
- }
-
- /**
-  * Assigns a bolt id to every non-spout group, in iteration order.
-  *
-  * The id is "b-" followed by a running counter and, when the group has a non-empty name
-  * (see getGroupName), "-" plus that name. Spout groups are skipped and do not consume a
-  * counter value.
-  *
-  * @param groups all groups of the planned topology
-  * @return map from each non-spout group to its bolt id
-  */
- private static Map<Group, String> genBoltIds(Collection<Group> groups) {
-     Map<Group, String> ret = new HashMap<Group, String>();
-     int ctr = 0;
-     for (Group g : groups) {
-         if (isSpoutGroup(g)) {
-             continue;
-         }
-         List<String> name = new ArrayList<String>();
-         name.add("b");
-         name.add("" + ctr);
-         // Compute the group name once and reuse it instead of rebuilding it for the add.
-         String groupName = getGroupName(g);
-         if (groupName != null && !groupName.isEmpty()) {
-             name.add(groupName);
-         }
-         ret.put(g, Utils.join(name, "-"));
-         ctr++;
-     }
-     return ret;
- }
-
- /**
-  * Builds a group name from its nodes' names: named nodes are ordered by creationIndex,
-  * consecutive duplicates are dropped, and the rest are joined with "-".
-  */
- private static String getGroupName(Group g) {
-     TreeMap<Integer, String> namesByCreation = new TreeMap<Integer, String>();
-     for (Node node : g.nodes) {
-         if (node.name == null) {
-             continue;
-         }
-         namesByCreation.put(node.creationIndex, node.name);
-     }
-     List<String> distinctRuns = new ArrayList<String>();
-     String previous = null;
-     for (String current : namesByCreation.values()) {
-         if (current.equals(previous)) {
-             continue;
-         }
-         previous = current;
-         distinctRuns.add(current);
-     }
-     return Utils.join(distinctRuns, "-");
- }
-
- /**
-  * Maps the stream id of every outgoing partition node of the group to that node's batch group.
-  */
- private static Map<String, String> getOutputStreamBatchGroups(Group g, Map<Node, String> batchGroupMap) {
-     Map<String, String> streamToBatchGroup = new HashMap<String, String>();
-     for (PartitionNode output : externalGroupOutputs(g)) {
-         streamToBatchGroup.put(output.streamId, batchGroupMap.get(output));
-     }
-     return streamToBatchGroup;
- }
-
- /**
-  * Collects the batch groups of all processor nodes in the group that are flagged as committers.
-  */
- private static Set<String> committerBatches(Group g, Map<Node, String> batchGroupMap) {
-     Set<String> batches = new HashSet<String>();
-     for (Node node : g.nodes) {
-         if (!(node instanceof ProcessorNode)) {
-             continue;
-         }
-         if (((ProcessorNode) node).committer) {
-             batches.add(batchGroupMap.get(node));
-         }
-     }
-     return batches;
- }
-
- /**
-  * Computes one parallelism per group. Groups linked through an identity partition share a
-  * parallelism: the largest parallelism hint among their nodes (at least 1), capped by the
-  * spouts' max parallelism if any, and overridden by a fixed parallelism if one is required.
-  *
-  * @throws RuntimeException if the fixed parallelism exceeds the max parallelism
-  */
- private static Map<Group, Integer> getGroupParallelisms(DirectedGraph<Node, IndexedEdge> graph, GraphGrouper grouper, Collection<Group> groups) {
-     UndirectedGraph<Group, Object> equivs = new Pseudograph<Group, Object>(Object.class);
-     for (Group g : groups) {
-         equivs.addVertex(g);
-     }
-     for (Group g : groups) {
-         for (PartitionNode input : externalGroupInputs(g)) {
-             if (!isIdentityPartition(input)) {
-                 continue;
-             }
-             Node parent = TridentUtils.getParent(graph, input);
-             Group parentGroup = grouper.nodeGroup(parent);
-             if (parentGroup == null || parentGroup.equals(g)) {
-                 continue;
-             }
-             equivs.addEdge(parentGroup, g);
-         }
-     }
-
-     Map<Group, Integer> parallelisms = new HashMap<Group, Integer>();
-     List<Set<Group>> equivGroups = new ConnectivityInspector<Group, Object>(equivs).connectedSets();
-     for (Set<Group> equivGroup : equivGroups) {
-         Integer fixedP = getFixedParallelism(equivGroup);
-         Integer maxP = getMaxParallelism(equivGroup);
-         if (fixedP != null && maxP != null && maxP < fixedP) {
-             throw new RuntimeException("Parallelism is fixed to " + fixedP + " but max parallelism is less than that: " + maxP);
-         }
-
-         int hinted = 1;
-         for (Group g : equivGroup) {
-             for (Node node : g.nodes) {
-                 if (node.parallelismHint != null) {
-                     hinted = Math.max(hinted, node.parallelismHint);
-                 }
-             }
-         }
-
-         Integer chosen;
-         if (fixedP != null) {
-             chosen = fixedP;
-         } else if (maxP != null) {
-             chosen = Math.min(maxP, hinted);
-         } else {
-             chosen = hinted;
-         }
-         for (Group g : equivGroup) {
-             parallelisms.put(g, chosen);
-         }
-     }
-     return parallelisms;
- }
-
- /**
-  * Returns the smallest Config.TOPOLOGY_MAX_TASK_PARALLELISM declared by the spout groups in
-  * {@code groups}, or null when no spout declares one.
-  */
- private static Integer getMaxParallelism(Set<Group> groups) {
-     Integer smallest = null;
-     for (Group g : groups) {
-         if (!isSpoutGroup(g)) {
-             continue;
-         }
-         SpoutNode spoutNode = (SpoutNode) g.nodes.iterator().next();
-         Map conf = getSpoutComponentConfig(spoutNode.spout);
-         // A missing component configuration is treated as "no limit declared".
-         Number maxP = (conf == null) ? null : (Number) conf.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM);
-         if (maxP == null) {
-             continue;
-         }
-         int limit = maxP.intValue();
-         smallest = (smallest == null) ? limit : Math.min(smallest, limit);
-     }
-     return smallest;
- }
-
- /**
-  * Returns the component configuration of the spout, dispatching on IRichSpout, then
-  * IBatchSpout; anything else is cast to ITridentSpout.
-  */
- private static Map getSpoutComponentConfig(Object spout) {
-     if (spout instanceof IRichSpout) {
-         IRichSpout rich = (IRichSpout) spout;
-         return rich.getComponentConfiguration();
-     }
-     if (spout instanceof IBatchSpout) {
-         IBatchSpout batch = (IBatchSpout) spout;
-         return batch.getComponentConfiguration();
-     }
-     return ((ITridentSpout) spout).getComponentConfiguration();
- }
-
- /**
-  * Returns the partition count required by the state specs of the nodes in {@code groups},
-  * or null when no node requires one.
-  *
-  * @throws RuntimeException if two nodes require different partition counts
-  */
- private static Integer getFixedParallelism(Set<Group> groups) {
-     Integer fixed = null;
-     for (Group g : groups) {
-         for (Node node : g.nodes) {
-             if (node.stateInfo == null) {
-                 continue;
-             }
-             if (node.stateInfo.spec.requiredNumPartitions == null) {
-                 continue;
-             }
-             int required = node.stateInfo.spec.requiredNumPartitions;
-             if (fixed != null && fixed != required) {
-                 throw new RuntimeException("Cannot have one group have fixed parallelism of two different values");
-             }
-             fixed = required;
-         }
-     }
-     return fixed;
- }
-
- /**
-  * True when the partition node's grouping is a custom serialized grouping that deserializes
-  * to an IdentityGrouping.
-  */
- private static boolean isIdentityPartition(PartitionNode n) {
-     Grouping grouping = n.thriftGrouping;
-     if (!grouping.is_set_custom_serialized()) {
-         return false;
-     }
-     CustomStreamGrouping custom = (CustomStreamGrouping) Utils.deserialize(grouping.get_custom_serialized());
-     return custom instanceof IdentityGrouping;
- }
-
- /**
-  * Adds an IndexedEdge with the given index from {@code source} to {@code target}.
-  */
- private static void addEdge(DirectedGraph g, Object source, Object target, int index) {
-     IndexedEdge edge = new IndexedEdge(source, target, index);
-     g.addEdge(source, target, edge);
- }
-
- /**
-  * Creates an unnamed processor node on a fresh stream id that runs a TrueFilter over no
-  * input fields and adds no fields of its own.
-  */
- private Node makeIdentityNode(Fields allOutputFields) {
-     String streamId = getUniqueStreamId();
-     Fields noNewFields = new Fields();
-     EachProcessor passThrough = new EachProcessor(new Fields(), new FilterExecutor(new TrueFilter()));
-     return new ProcessorNode(streamId, null, allOutputFields, noNewFields, passThrough);
- }
-
- /**
-  * Returns the partition inputs of the group whose grouping differs from the first input seen
-  * on the same stream id.
-  */
- private static List<PartitionNode> extraPartitionInputs(Group g) {
-     Map<String, List<PartitionNode>> byStreamId = new HashMap<String, List<PartitionNode>>();
-     for (PartitionNode input : externalGroupInputs(g)) {
-         List<PartitionNode> sameStream = byStreamId.get(input.streamId);
-         if (sameStream == null) {
-             sameStream = new ArrayList<PartitionNode>();
-             byStreamId.put(input.streamId, sameStream);
-         }
-         sameStream.add(input);
-     }
-
-     List<PartitionNode> extras = new ArrayList<PartitionNode>();
-     for (List<PartitionNode> sameStream : byStreamId.values()) {
-         PartitionNode anchor = sameStream.get(0);
-         for (PartitionNode candidate : sameStream.subList(1, sameStream.size())) {
-             if (!candidate.thriftGrouping.equals(anchor.thriftGrouping)) {
-                 extras.add(candidate);
-             }
-         }
-     }
-     return extras;
- }
-
- /**
-  * Returns the incoming nodes of the group that are partition nodes.
-  */
- private static Set<PartitionNode> externalGroupInputs(Group g) {
-     Set<PartitionNode> partitions = new HashSet<PartitionNode>();
-     for (Node incoming : g.incomingNodes()) {
-         if (!(incoming instanceof PartitionNode)) {
-             continue;
-         }
-         partitions.add((PartitionNode) incoming);
-     }
-     return partitions;
- }
-
- /**
-  * Returns the outgoing nodes of the group that are partition nodes.
-  */
- private static Set<PartitionNode> externalGroupOutputs(Group g) {
-     Set<PartitionNode> partitions = new HashSet<PartitionNode>();
-     for (Node outgoing : g.outgoingNodes()) {
-         if (!(outgoing instanceof PartitionNode)) {
-             continue;
-         }
-         partitions.add((PartitionNode) outgoing);
-     }
-     return partitions;
- }
-
- /**
-  * Creates a partition node that copies the stream id, name and output fields of {@code basis}
-  * and uses a serialized IdentityGrouping as its grouping.
-  */
- private static PartitionNode makeIdentityPartition(Node basis) {
-     String streamId = basis.streamId;
-     String name = basis.name;
-     return new PartitionNode(streamId, name, basis.allOutputFields,
-             Grouping.custom_serialized(Utils.serialize(new IdentityGrouping())));
- }
-
-
- /** Returns the next unique stream id from the id generator. */
- protected String getUniqueStreamId() {
-     String nextStreamId = _gen.getUniqueStreamId();
-     return nextStreamId;
- }
-
- /** Returns the next unique state id from the id generator. */
- protected String getUniqueStateId() {
-     String nextStateId = _gen.getUniqueStateId();
-     return nextStateId;
- }
-
- /**
-  * Adds the node to the graph and, when it carries state info, records it in the colocation
-  * list kept for that state id.
-  */
- protected void registerNode(Node n) {
-     _graph.addVertex(n);
-     if (n.stateInfo == null) {
-         return;
-     }
-     String stateId = n.stateInfo.id;
-     List<Node> colocated = _colocate.get(stateId);
-     if (colocated == null) {
-         colocated = new ArrayList<Node>();
-         _colocate.put(stateId, colocated);
-     }
-     colocated.add(n);
- }
-
- /** Registers the node and returns a stream over it, named after the node. */
- protected Stream addNode(Node n) {
-     registerNode(n);
-     Stream created = new Stream(this, n.name, n);
-     return created;
- }
-
- /**
-  * Registers the node and connects every source stream's node to it; the edge index is the
-  * position of the source in {@code sources}.
-  */
- protected void registerSourcedNode(List<Stream> sources, Node newNode) {
-     registerNode(newNode);
-     for (int position = 0; position < sources.size(); position++) {
-         Stream source = sources.get(position);
-         _graph.addEdge(source._node, newNode, new IndexedEdge(source._node, newNode, position));
-     }
- }
-
- /** Registers the node with the given sources and returns a stream over it. */
- protected Stream addSourcedNode(List<Stream> sources, Node newNode) {
-     registerSourcedNode(sources, newNode);
-     Stream created = new Stream(this, newNode.name, newNode);
-     return created;
- }
-
- /** Registers the node with the given sources and returns a state handle for it. */
- protected TridentState addSourcedStateNode(List<Stream> sources, Node newNode) {
-     registerSourcedNode(sources, newNode);
-     TridentState created = new TridentState(this, newNode);
-     return created;
- }
-
- /** Single-source form of {@link #addSourcedNode(List, Node)}. */
- protected Stream addSourcedNode(Stream source, Node newNode) {
-     List<Stream> onlySource = Arrays.asList(source);
-     return addSourcedNode(onlySource, newNode);
- }
-
- /** Single-source form of {@link #addSourcedStateNode(List, Node)}. */
- protected TridentState addSourcedStateNode(Stream source, Node newNode) {
-     List<Stream> onlySource = Arrays.asList(source);
-     return addSourcedStateNode(onlySource, newNode);
- }
-
- /**
-  * Returns the output fields of every element, each of which is cast to IAggregatableStream.
-  */
- private static List<Fields> getAllOutputFields(List streams) {
-     List<Fields> allFields = new ArrayList<Fields>();
-     for (Object element : streams) {
-         IAggregatableStream stream = (IAggregatableStream) element;
-         allFields.add(stream.getOutputFields());
-     }
-     return allFields;
- }
-
-
- /**
-  * Groups the i-th stream by the i-th entry of {@code joinFields}.
-  */
- private static List<GroupedStream> groupedStreams(List<Stream> streams, List<Fields> joinFields) {
-     List<GroupedStream> grouped = new ArrayList<GroupedStream>();
-     int position = 0;
-     for (Stream stream : streams) {
-         grouped.add(stream.groupBy(joinFields.get(position)));
-         position++;
-     }
-     return grouped;
- }
-
- /**
-  * For the i-th stream, returns its output fields minus the i-th entry of {@code joinFields}.
-  */
- private static List<Fields> strippedInputFields(List<Stream> streams, List<Fields> joinFields) {
-     List<Fields> stripped = new ArrayList<Fields>();
-     int position = 0;
-     for (Stream stream : streams) {
-         stripped.add(TridentUtils.fieldsSubtract(stream.getOutputFields(), joinFields.get(position)));
-         position++;
-     }
-     return stripped;
- }
-
- /**
-  * Returns a new mutable list holding {@code type} {@code n} times (empty when n is not positive).
-  */
- private static List<JoinType> repeat(int n, JoinType type) {
-     List<JoinType> copies = new ArrayList<JoinType>();
-     for (int remaining = n; remaining > 0; remaining--) {
-         copies.add(type);
-     }
-     return copies;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/drpc/ReturnResultsReducer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/drpc/ReturnResultsReducer.java b/jstorm-client/src/main/java/storm/trident/drpc/ReturnResultsReducer.java
deleted file mode 100644
index e89719e..0000000
--- a/jstorm-client/src/main/java/storm/trident/drpc/ReturnResultsReducer.java
+++ /dev/null
@@ -1,96 +0,0 @@
-package storm.trident.drpc;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang.builder.ToStringBuilder;
-import org.apache.thrift7.TException;
-
-import storm.trident.drpc.ReturnResultsReducer.ReturnResultsState;
-import storm.trident.operation.MultiReducer;
-import storm.trident.operation.TridentCollector;
-import storm.trident.operation.TridentMultiReducerContext;
-import storm.trident.tuple.TridentTuple;
-import backtype.storm.Config;
-import backtype.storm.drpc.DRPCInvocationsClient;
-import backtype.storm.generated.DistributedRPCInvocations;
-import backtype.storm.utils.ServiceRegistry;
-import backtype.storm.utils.Utils;
-
-
-public class ReturnResultsReducer implements MultiReducer<ReturnResultsState> {
- public static class ReturnResultsState {
- List<TridentTuple> results = new ArrayList<TridentTuple>();
- String returnInfo;
-
- @Override
- public String toString() {
- return ToStringBuilder.reflectionToString(this);
- }
- }
- boolean local;
-
- Map<List, DRPCInvocationsClient> _clients = new HashMap<List, DRPCInvocationsClient>();
-
-
- @Override
- public void prepare(Map conf, TridentMultiReducerContext context) {
- local = conf.get(Config.STORM_CLUSTER_MODE).equals("local");
- }
-
- @Override
- public ReturnResultsState init(TridentCollector collector) {
- return new ReturnResultsState();
- }
-
- @Override
- public void execute(ReturnResultsState state, int streamIndex, TridentTuple input, TridentCollector collector) {
- if(streamIndex==0) {
- state.returnInfo = input.getString(0);
- } else {
- state.results.add(input);
- }
- }
-
- @Override
- public void complete(ReturnResultsState state, TridentCollector collector) {
- // only one of the multireducers will receive the tuples
- if(state.returnInfo!=null) {
- String result = Utils.to_json(state.results);
- Map retMap = (Map) Utils.from_json(state.returnInfo);
- final String host = (String) retMap.get("host");
- final int port = Utils.getInt(retMap.get("port"));
- String id = (String) retMap.get("id");
- DistributedRPCInvocations.Iface client;
- if(local) {
- client = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(host);
- } else {
- List server = new ArrayList() {{
- add(host);
- add(port);
- }};
-
- if(!_clients.containsKey(server)) {
- _clients.put(server, new DRPCInvocationsClient(host, port));
- }
- client = _clients.get(server);
- }
-
- try {
- client.result(id, result);
- } catch(TException e) {
- collector.reportError(e);
- }
- }
- }
-
- @Override
- public void cleanup() {
- for(DRPCInvocationsClient c: _clients.values()) {
- c.close();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/fluent/ChainedAggregatorDeclarer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/fluent/ChainedAggregatorDeclarer.java b/jstorm-client/src/main/java/storm/trident/fluent/ChainedAggregatorDeclarer.java
deleted file mode 100644
index de8fe9c..0000000
--- a/jstorm-client/src/main/java/storm/trident/fluent/ChainedAggregatorDeclarer.java
+++ /dev/null
@@ -1,166 +0,0 @@
-package storm.trident.fluent;
-
-import backtype.storm.tuple.Fields;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import storm.trident.Stream;
-import storm.trident.operation.Aggregator;
-import storm.trident.operation.CombinerAggregator;
-import storm.trident.operation.ReducerAggregator;
-import storm.trident.operation.impl.ChainedAggregatorImpl;
-import storm.trident.operation.impl.CombinerAggregatorCombineImpl;
-import storm.trident.operation.impl.CombinerAggregatorInitImpl;
-import storm.trident.operation.impl.ReducerAggregatorImpl;
-import storm.trident.operation.impl.SingleEmitAggregator;
-import storm.trident.operation.impl.SingleEmitAggregator.BatchToPartition;
-import storm.trident.tuple.ComboList;
-
-
-public class ChainedAggregatorDeclarer implements ChainedFullAggregatorDeclarer, ChainedPartitionAggregatorDeclarer {
- public static interface AggregationPartition {
- Stream partition(Stream input);
- }
-
- private static enum AggType {
- PARTITION,
- FULL,
- FULL_COMBINE
- }
-
- // inputFields can be equal to outFields, but multiple aggregators cannot have intersection outFields
- private static class AggSpec {
- Fields inFields;
- Aggregator agg;
- Fields outFields;
-
- public AggSpec(Fields inFields, Aggregator agg, Fields outFields) {
- this.inFields = inFields;
- this.agg = agg;
- this.outFields = outFields;
- }
- }
-
- List<AggSpec> _aggs = new ArrayList<AggSpec>();
- IAggregatableStream _stream;
- AggType _type = null;
- GlobalAggregationScheme _globalScheme;
-
- public ChainedAggregatorDeclarer(IAggregatableStream stream, GlobalAggregationScheme globalScheme) {
- _stream = stream;
- _globalScheme = globalScheme;
- }
-
- public Stream chainEnd() {
- Fields[] inputFields = new Fields[_aggs.size()];
- Aggregator[] aggs = new Aggregator[_aggs.size()];
- int[] outSizes = new int[_aggs.size()];
- List<String> allOutFields = new ArrayList<String>();
- Set<String> allInFields = new HashSet<String>();
- for(int i=0; i<_aggs.size(); i++) {
- AggSpec spec = _aggs.get(i);
- Fields infields = spec.inFields;
- if(infields==null) infields = new Fields();
- Fields outfields = spec.outFields;
- if(outfields==null) outfields = new Fields();
-
- inputFields[i] = infields;
- aggs[i] = spec.agg;
- outSizes[i] = outfields.size();
- allOutFields.addAll(outfields.toList());
- allInFields.addAll(infields.toList());
- }
- if(new HashSet(allOutFields).size() != allOutFields.size()) {
- throw new IllegalArgumentException("Output fields for chained aggregators must be distinct: " + allOutFields.toString());
- }
-
- Fields inFields = new Fields(new ArrayList<String>(allInFields));
- Fields outFields = new Fields(allOutFields);
- Aggregator combined = new ChainedAggregatorImpl(aggs, inputFields, new ComboList.Factory(outSizes));
-
- if(_type!=AggType.FULL) {
- _stream = _stream.partitionAggregate(inFields, combined, outFields);
- }
- if(_type!=AggType.PARTITION) {
- _stream = _globalScheme.aggPartition(_stream);
- BatchToPartition singleEmit = _globalScheme.singleEmitPartitioner();
- Aggregator toAgg = combined;
- if(singleEmit!=null) {
- toAgg = new SingleEmitAggregator(combined, singleEmit);
- }
- // this assumes that inFields and outFields are the same for combineragg
- // assumption also made above
- _stream = _stream.partitionAggregate(inFields, toAgg, outFields);
- }
- return _stream.toStream();
- }
-
- public ChainedPartitionAggregatorDeclarer partitionAggregate(Aggregator agg, Fields functionFields) {
- return partitionAggregate(null, agg, functionFields);
- }
-
- public ChainedPartitionAggregatorDeclarer partitionAggregate(Fields inputFields, Aggregator agg, Fields functionFields) {
- _type = AggType.PARTITION;
- _aggs.add(new AggSpec(inputFields, agg, functionFields));
- return this;
- }
-
- public ChainedPartitionAggregatorDeclarer partitionAggregate(CombinerAggregator agg, Fields functionFields) {
- return partitionAggregate(null, agg, functionFields);
- }
-
- public ChainedPartitionAggregatorDeclarer partitionAggregate(Fields inputFields, CombinerAggregator agg, Fields functionFields) {
- initCombiner(inputFields, agg, functionFields);
- return partitionAggregate(functionFields, new CombinerAggregatorCombineImpl(agg), functionFields);
- }
-
- public ChainedPartitionAggregatorDeclarer partitionAggregate(ReducerAggregator agg, Fields functionFields) {
- return partitionAggregate(null, agg, functionFields);
- }
-
- public ChainedPartitionAggregatorDeclarer partitionAggregate(Fields inputFields, ReducerAggregator agg, Fields functionFields) {
- return partitionAggregate(inputFields, new ReducerAggregatorImpl(agg), functionFields);
- }
-
- public ChainedFullAggregatorDeclarer aggregate(Aggregator agg, Fields functionFields) {
- return aggregate(null, agg, functionFields);
- }
-
- public ChainedFullAggregatorDeclarer aggregate(Fields inputFields, Aggregator agg, Fields functionFields) {
- return aggregate(inputFields, agg, functionFields, false);
- }
-
- private ChainedFullAggregatorDeclarer aggregate(Fields inputFields, Aggregator agg, Fields functionFields, boolean isCombiner) {
- if(isCombiner) {
- if(_type == null) {
- _type = AggType.FULL_COMBINE;
- }
- } else {
- _type = AggType.FULL;
- }
- _aggs.add(new AggSpec(inputFields, agg, functionFields));
- return this;
- }
-
- public ChainedFullAggregatorDeclarer aggregate(CombinerAggregator agg, Fields functionFields) {
- return aggregate(null, agg, functionFields);
- }
-
- public ChainedFullAggregatorDeclarer aggregate(Fields inputFields, CombinerAggregator agg, Fields functionFields) {
- initCombiner(inputFields, agg, functionFields);
- return aggregate(functionFields, new CombinerAggregatorCombineImpl(agg), functionFields, true);
- }
-
- public ChainedFullAggregatorDeclarer aggregate(ReducerAggregator agg, Fields functionFields) {
- return aggregate(null, agg, functionFields);
- }
-
- public ChainedFullAggregatorDeclarer aggregate(Fields inputFields, ReducerAggregator agg, Fields functionFields) {
- return aggregate(inputFields, new ReducerAggregatorImpl(agg), functionFields);
- }
-
- private void initCombiner(Fields inputFields, CombinerAggregator agg, Fields functionFields) {
- _stream = _stream.each(inputFields, new CombinerAggregatorInitImpl(agg), functionFields);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/fluent/ChainedFullAggregatorDeclarer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/fluent/ChainedFullAggregatorDeclarer.java b/jstorm-client/src/main/java/storm/trident/fluent/ChainedFullAggregatorDeclarer.java
deleted file mode 100644
index 84436a6..0000000
--- a/jstorm-client/src/main/java/storm/trident/fluent/ChainedFullAggregatorDeclarer.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package storm.trident.fluent;
-
-import backtype.storm.tuple.Fields;
-import storm.trident.operation.Aggregator;
-import storm.trident.operation.CombinerAggregator;
-import storm.trident.operation.ReducerAggregator;
-
-public interface ChainedFullAggregatorDeclarer extends IChainedAggregatorDeclarer {
- ChainedFullAggregatorDeclarer aggregate(Aggregator agg, Fields functionFields);
- ChainedFullAggregatorDeclarer aggregate(Fields inputFields, Aggregator agg, Fields functionFields);
- ChainedFullAggregatorDeclarer aggregate(CombinerAggregator agg, Fields functionFields);
- ChainedFullAggregatorDeclarer aggregate(Fields inputFields, CombinerAggregator agg, Fields functionFields);
- ChainedFullAggregatorDeclarer aggregate(ReducerAggregator agg, Fields functionFields);
- ChainedFullAggregatorDeclarer aggregate(Fields inputFields, ReducerAggregator agg, Fields functionFields);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/fluent/ChainedPartitionAggregatorDeclarer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/fluent/ChainedPartitionAggregatorDeclarer.java b/jstorm-client/src/main/java/storm/trident/fluent/ChainedPartitionAggregatorDeclarer.java
deleted file mode 100644
index 00e2c5a..0000000
--- a/jstorm-client/src/main/java/storm/trident/fluent/ChainedPartitionAggregatorDeclarer.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package storm.trident.fluent;
-
-import backtype.storm.tuple.Fields;
-import storm.trident.operation.Aggregator;
-import storm.trident.operation.CombinerAggregator;
-import storm.trident.operation.ReducerAggregator;
-
-public interface ChainedPartitionAggregatorDeclarer extends IChainedAggregatorDeclarer {
- ChainedPartitionAggregatorDeclarer partitionAggregate(Aggregator agg, Fields functionFields);
- ChainedPartitionAggregatorDeclarer partitionAggregate(Fields inputFields, Aggregator agg, Fields functionFields);
- ChainedPartitionAggregatorDeclarer partitionAggregate(CombinerAggregator agg, Fields functionFields);
- ChainedPartitionAggregatorDeclarer partitionAggregate(Fields inputFields, CombinerAggregator agg, Fields functionFields);
- ChainedPartitionAggregatorDeclarer partitionAggregate(ReducerAggregator agg, Fields functionFields);
- ChainedPartitionAggregatorDeclarer partitionAggregate(Fields inputFields, ReducerAggregator agg, Fields functionFields);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/fluent/GlobalAggregationScheme.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/fluent/GlobalAggregationScheme.java b/jstorm-client/src/main/java/storm/trident/fluent/GlobalAggregationScheme.java
deleted file mode 100644
index 96f15e9..0000000
--- a/jstorm-client/src/main/java/storm/trident/fluent/GlobalAggregationScheme.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package storm.trident.fluent;
-
-import storm.trident.operation.impl.SingleEmitAggregator.BatchToPartition;
-
-
-public interface GlobalAggregationScheme<S extends IAggregatableStream> {
- IAggregatableStream aggPartition(S stream); // how to partition for second stage of aggregation
- BatchToPartition singleEmitPartitioner(); // return null if it's not single emit
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/fluent/GroupedStream.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/fluent/GroupedStream.java b/jstorm-client/src/main/java/storm/trident/fluent/GroupedStream.java
deleted file mode 100644
index ad1e121..0000000
--- a/jstorm-client/src/main/java/storm/trident/fluent/GroupedStream.java
+++ /dev/null
@@ -1,157 +0,0 @@
-package storm.trident.fluent;
-
-import backtype.storm.tuple.Fields;
-import storm.trident.Stream;
-import storm.trident.TridentState;
-import storm.trident.operation.Aggregator;
-import storm.trident.operation.CombinerAggregator;
-import storm.trident.operation.Function;
-import storm.trident.operation.ReducerAggregator;
-import storm.trident.operation.impl.GroupedAggregator;
-import storm.trident.operation.impl.SingleEmitAggregator.BatchToPartition;
-import storm.trident.state.QueryFunction;
-import storm.trident.state.StateFactory;
-import storm.trident.state.StateSpec;
-import storm.trident.state.map.MapCombinerAggStateUpdater;
-import storm.trident.state.map.MapReducerAggStateUpdater;
-import storm.trident.util.TridentUtils;
-
-
-public class GroupedStream implements IAggregatableStream, GlobalAggregationScheme<GroupedStream> {
- Fields _groupFields;
- Stream _stream;
-
- public GroupedStream(Stream stream, Fields groupFields) {
- _groupFields = groupFields;
- _stream = stream;
- }
-
- public GroupedStream name(String name) {
- return new GroupedStream(_stream.name(name), _groupFields);
- }
-
- public ChainedAggregatorDeclarer chainedAgg() {
- return new ChainedAggregatorDeclarer(this, this);
- }
-
- public Stream aggregate(Aggregator agg, Fields functionFields) {
- return aggregate(null, agg, functionFields);
- }
-
- public Stream aggregate(Fields inputFields, Aggregator agg, Fields functionFields) {
- return new ChainedAggregatorDeclarer(this, this)
- .aggregate(inputFields, agg, functionFields)
- .chainEnd();
- }
-
- public Stream aggregate(CombinerAggregator agg, Fields functionFields) {
- return aggregate(null, agg, functionFields);
- }
-
- public Stream aggregate(Fields inputFields, CombinerAggregator agg, Fields functionFields) {
- return new ChainedAggregatorDeclarer(this, this)
- .aggregate(inputFields, agg, functionFields)
- .chainEnd();
- }
-
- public Stream aggregate(ReducerAggregator agg, Fields functionFields) {
- return aggregate(null, agg, functionFields);
- }
-
- public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functionFields) {
- return new ChainedAggregatorDeclarer(this, this)
- .aggregate(inputFields, agg, functionFields)
- .chainEnd();
- }
-
- public TridentState persistentAggregate(StateFactory stateFactory, CombinerAggregator agg, Fields functionFields) {
- return persistentAggregate(new StateSpec(stateFactory), agg, functionFields);
- }
-
- public TridentState persistentAggregate(StateSpec spec, CombinerAggregator agg, Fields functionFields) {
- return persistentAggregate(spec, null, agg, functionFields);
- }
-
- public TridentState persistentAggregate(StateFactory stateFactory, Fields inputFields, CombinerAggregator agg, Fields functionFields) {
- return persistentAggregate(new StateSpec(stateFactory), inputFields, agg, functionFields);
- }
-
- public TridentState persistentAggregate(StateSpec spec, Fields inputFields, CombinerAggregator agg, Fields functionFields) {
- return aggregate(inputFields, agg, functionFields)
- .partitionPersist(spec,
- TridentUtils.fieldsUnion(_groupFields, functionFields),
- new MapCombinerAggStateUpdater(agg, _groupFields, functionFields),
- TridentUtils.fieldsConcat(_groupFields, functionFields));
- }
-
- public TridentState persistentAggregate(StateFactory stateFactory, Fields inputFields, ReducerAggregator agg, Fields functionFields) {
- return persistentAggregate(new StateSpec(stateFactory), inputFields, agg, functionFields);
- }
-
- public TridentState persistentAggregate(StateSpec spec, Fields inputFields, ReducerAggregator agg, Fields functionFields) {
- return _stream.partitionBy(_groupFields)
- .partitionPersist(spec,
- TridentUtils.fieldsUnion(_groupFields, inputFields),
- new MapReducerAggStateUpdater(agg, _groupFields, inputFields),
- TridentUtils.fieldsConcat(_groupFields, functionFields));
- }
-
- public Stream stateQuery(TridentState state, Fields inputFields, QueryFunction function, Fields functionFields) {
- return _stream.partitionBy(_groupFields)
- .stateQuery(state,
- inputFields,
- function,
- functionFields);
- }
-
- public TridentState persistentAggregate(StateFactory stateFactory, ReducerAggregator agg, Fields functionFields) {
- return persistentAggregate(new StateSpec(stateFactory), agg, functionFields);
- }
-
- public TridentState persistentAggregate(StateSpec spec, ReducerAggregator agg, Fields functionFields) {
- return persistentAggregate(spec, null, agg, functionFields);
- }
-
- public Stream stateQuery(TridentState state, QueryFunction function, Fields functionFields) {
- return stateQuery(state, null, function, functionFields);
- }
-
- @Override
- public IAggregatableStream each(Fields inputFields, Function function, Fields functionFields) {
- Stream s = _stream.each(inputFields, function, functionFields);
- return new GroupedStream(s, _groupFields);
- }
-
- @Override
- public IAggregatableStream partitionAggregate(Fields inputFields, Aggregator agg, Fields functionFields) {
- Aggregator groupedAgg = new GroupedAggregator(agg, _groupFields, inputFields, functionFields.size());
- Fields allInFields = TridentUtils.fieldsUnion(_groupFields, inputFields);
- Fields allOutFields = TridentUtils.fieldsConcat(_groupFields, functionFields);
- Stream s = _stream.partitionAggregate(allInFields, groupedAgg, allOutFields);
- return new GroupedStream(s, _groupFields);
- }
-
- @Override
- public IAggregatableStream aggPartition(GroupedStream s) {
- return new GroupedStream(s._stream.partitionBy(_groupFields), _groupFields);
- }
-
- @Override
- public Stream toStream() {
- return _stream;
- }
-
- @Override
- public Fields getOutputFields() {
- return _stream.getOutputFields();
- }
-
- public Fields getGroupFields() {
- return _groupFields;
- }
-
- @Override
- public BatchToPartition singleEmitPartitioner() {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/fluent/IAggregatableStream.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/fluent/IAggregatableStream.java b/jstorm-client/src/main/java/storm/trident/fluent/IAggregatableStream.java
deleted file mode 100644
index e10852e..0000000
--- a/jstorm-client/src/main/java/storm/trident/fluent/IAggregatableStream.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package storm.trident.fluent;
-
-import backtype.storm.tuple.Fields;
-import storm.trident.Stream;
-import storm.trident.operation.Aggregator;
-import storm.trident.operation.Function;
-import storm.trident.operation.impl.SingleEmitAggregator.BatchToPartition;
-
-public interface IAggregatableStream {
- IAggregatableStream each(Fields inputFields, Function function, Fields functionFields);
- IAggregatableStream partitionAggregate(Fields inputFields, Aggregator agg, Fields functionFields);
- Stream toStream();
- Fields getOutputFields();
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/fluent/IChainedAggregatorDeclarer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/fluent/IChainedAggregatorDeclarer.java b/jstorm-client/src/main/java/storm/trident/fluent/IChainedAggregatorDeclarer.java
deleted file mode 100644
index a42dfbe..0000000
--- a/jstorm-client/src/main/java/storm/trident/fluent/IChainedAggregatorDeclarer.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package storm.trident.fluent;
-
-import storm.trident.Stream;
-
-public interface IChainedAggregatorDeclarer {
- Stream chainEnd();
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/fluent/UniqueIdGen.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/fluent/UniqueIdGen.java b/jstorm-client/src/main/java/storm/trident/fluent/UniqueIdGen.java
deleted file mode 100644
index 64ad621..0000000
--- a/jstorm-client/src/main/java/storm/trident/fluent/UniqueIdGen.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package storm.trident.fluent;
-
-public class UniqueIdGen {
- int _streamCounter = 0;
-
- public String getUniqueStreamId() {
- _streamCounter++;
- return "s" + _streamCounter;
- }
-
- int _stateCounter = 0;
-
- public String getUniqueStateId() {
- _stateCounter++;
- return "state" + _stateCounter;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/graph/GraphGrouper.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/graph/GraphGrouper.java b/jstorm-client/src/main/java/storm/trident/graph/GraphGrouper.java
deleted file mode 100644
index b107269..0000000
--- a/jstorm-client/src/main/java/storm/trident/graph/GraphGrouper.java
+++ /dev/null
@@ -1,106 +0,0 @@
-package storm.trident.graph;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import org.jgrapht.DirectedGraph;
-import storm.trident.planner.Node;
-import storm.trident.util.IndexedEdge;
-
-
-public class GraphGrouper {
-
- DirectedGraph<Node, IndexedEdge> graph;
- Set<Group> currGroups;
- Map<Node, Group> groupIndex = new HashMap();
-
- public GraphGrouper(DirectedGraph<Node, IndexedEdge> graph, Collection<Group> initialGroups) {
- this.graph = graph;
- this.currGroups = new HashSet(initialGroups);
- reindex();
- }
-
- public Collection<Group> getAllGroups() {
- return currGroups;
- }
-
- public void addGroup(Group g) {
- currGroups.add(g);
- }
-
- public void reindex() {
- groupIndex.clear();
- for(Group g: currGroups) {
- for(Node n: g.nodes) {
- groupIndex.put(n, g);
- }
- }
- }
-
- public void mergeFully() {
- boolean somethingHappened = true;
- while(somethingHappened) {
- somethingHappened = false;
- for(Group g: currGroups) {
- Collection<Group> outgoingGroups = outgoingGroups(g);
- if(outgoingGroups.size()==1) {
- Group out = outgoingGroups.iterator().next();
- if(out!=null) {
- merge(g, out);
- somethingHappened = true;
- break;
- }
- }
-
- Collection<Group> incomingGroups = incomingGroups(g);
- if(incomingGroups.size()==1) {
- Group in = incomingGroups.iterator().next();
- if(in!=null) {
- merge(g, in);
- somethingHappened = true;
- break;
- }
- }
- }
- }
- }
-
- private void merge(Group g1, Group g2) {
- Group newGroup = new Group(g1, g2);
- currGroups.remove(g1);
- currGroups.remove(g2);
- currGroups.add(newGroup);
- for(Node n: newGroup.nodes) {
- groupIndex.put(n, newGroup);
- }
- }
-
- public Collection<Group> outgoingGroups(Group g) {
- Set<Group> ret = new HashSet();
- for(Node n: g.outgoingNodes()) {
- Group other = nodeGroup(n);
- if(other==null || !other.equals(g)) {
- ret.add(other);
- }
- }
- return ret;
- }
-
- public Collection<Group> incomingGroups(Group g) {
- Set<Group> ret = new HashSet();
- for(Node n: g.incomingNodes()) {
- Group other = nodeGroup(n);
- if(other==null || !other.equals(g)) {
- ret.add(other);
- }
- }
- return ret;
- }
-
- public Group nodeGroup(Node n) {
- return groupIndex.get(n);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/graph/Group.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/graph/Group.java b/jstorm-client/src/main/java/storm/trident/graph/Group.java
deleted file mode 100644
index c329ad6..0000000
--- a/jstorm-client/src/main/java/storm/trident/graph/Group.java
+++ /dev/null
@@ -1,89 +0,0 @@
-package storm.trident.graph;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-
-import org.jgrapht.DirectedGraph;
-
-import storm.trident.planner.Node;
-import storm.trident.util.IndexedEdge;
-import storm.trident.util.TridentUtils;
-
-
-public class Group {
- public Set<Node> nodes = new HashSet<Node>();
- private DirectedGraph<Node, IndexedEdge> graph;
- private String id;
-
- public Group(DirectedGraph graph, List<Node> nodes) {
- init(graph);
- this.nodes.addAll(nodes);
- this.graph = graph;
- }
-
- public Group(DirectedGraph graph, Node n) {
- this(graph, Arrays.asList(n));
- }
-
- public Group(Group g1, Group g2) {
- init(g1.graph);
- nodes.addAll(g1.nodes);
- nodes.addAll(g2.nodes);
- }
-
- private void init(DirectedGraph graph) {
- this.graph = graph;
- this.id = UUID.randomUUID().toString();
- }
-
- public Set<Node> outgoingNodes() {
- Set<Node> ret = new HashSet<Node>();
- for(Node n: nodes) {
- ret.addAll(TridentUtils.getChildren(graph, n));
- }
- return ret;
- }
-
- public Set<Node> incomingNodes() {
- Set<Node> ret = new HashSet<Node>();
- for(Node n: nodes) {
- ret.addAll(TridentUtils.getParents(graph, n));
- }
- return ret;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((id == null) ? 0 : id.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- Group other = (Group) obj;
- if (id == null) {
- if (other.id != null)
- return false;
- } else if (!id.equals(other.id))
- return false;
- return true;
- }
-
-
-
- @Override
- public String toString() {
- return nodes.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/Aggregator.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/Aggregator.java b/jstorm-client/src/main/java/storm/trident/operation/Aggregator.java
deleted file mode 100644
index 5181703..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/Aggregator.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package storm.trident.operation;
-
-import storm.trident.tuple.TridentTuple;
-
-public interface Aggregator<T> extends Operation {
- T init(Object batchId, TridentCollector collector);
- void aggregate(T val, TridentTuple tuple, TridentCollector collector);
- void complete(T val, TridentCollector collector);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/Assembly.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/Assembly.java b/jstorm-client/src/main/java/storm/trident/operation/Assembly.java
deleted file mode 100644
index 17aaca2..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/Assembly.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package storm.trident.operation;
-
-import storm.trident.Stream;
-
-
-public interface Assembly {
- Stream apply(Stream input);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/BaseAggregator.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/BaseAggregator.java b/jstorm-client/src/main/java/storm/trident/operation/BaseAggregator.java
deleted file mode 100644
index c97b84f..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/BaseAggregator.java
+++ /dev/null
@@ -1,6 +0,0 @@
-package storm.trident.operation;
-
-
-public abstract class BaseAggregator<T> extends BaseOperation implements Aggregator<T> {
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/BaseFilter.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/BaseFilter.java b/jstorm-client/src/main/java/storm/trident/operation/BaseFilter.java
deleted file mode 100644
index d629d0d..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/BaseFilter.java
+++ /dev/null
@@ -1,6 +0,0 @@
-package storm.trident.operation;
-
-
-public abstract class BaseFilter extends BaseOperation implements Filter {
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/BaseFunction.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/BaseFunction.java b/jstorm-client/src/main/java/storm/trident/operation/BaseFunction.java
deleted file mode 100644
index 8ff6b05..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/BaseFunction.java
+++ /dev/null
@@ -1,6 +0,0 @@
-package storm.trident.operation;
-
-
-public abstract class BaseFunction extends BaseOperation implements Function {
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/BaseMultiReducer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/BaseMultiReducer.java b/jstorm-client/src/main/java/storm/trident/operation/BaseMultiReducer.java
deleted file mode 100644
index 328205d..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/BaseMultiReducer.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package storm.trident.operation;
-
-import java.util.Map;
-
-public abstract class BaseMultiReducer<T> implements MultiReducer<T> {
-
- @Override
- public void prepare(Map conf, TridentMultiReducerContext context) {
- }
-
-
- @Override
- public void cleanup() {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/BaseOperation.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/BaseOperation.java b/jstorm-client/src/main/java/storm/trident/operation/BaseOperation.java
deleted file mode 100644
index df6166d..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/BaseOperation.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package storm.trident.operation;
-
-import java.util.Map;
-
-public class BaseOperation implements Operation {
-
- @Override
- public void prepare(Map conf, TridentOperationContext context) {
- }
-
- @Override
- public void cleanup() {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/CombinerAggregator.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/CombinerAggregator.java b/jstorm-client/src/main/java/storm/trident/operation/CombinerAggregator.java
deleted file mode 100644
index 03933c9..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/CombinerAggregator.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package storm.trident.operation;
-
-import java.io.Serializable;
-import storm.trident.tuple.TridentTuple;
-
-// doesn't manipulate tuples (lists of stuff) so that things like aggregating into
-// cassandra is cleaner (don't need lists everywhere, just store the single value there)
-public interface CombinerAggregator<T> extends Serializable {
- T init(TridentTuple tuple);
- T combine(T val1, T val2);
- T zero();
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/EachOperation.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/EachOperation.java b/jstorm-client/src/main/java/storm/trident/operation/EachOperation.java
deleted file mode 100644
index b56fe96..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/EachOperation.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package storm.trident.operation;
-
-public interface EachOperation extends Operation {
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/Filter.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/Filter.java b/jstorm-client/src/main/java/storm/trident/operation/Filter.java
deleted file mode 100644
index ea7cbb6..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/Filter.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package storm.trident.operation;
-
-import storm.trident.tuple.TridentTuple;
-
-
-public interface Filter extends EachOperation {
- boolean isKeep(TridentTuple tuple);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/Function.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/Function.java b/jstorm-client/src/main/java/storm/trident/operation/Function.java
deleted file mode 100644
index b58a29d..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/Function.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package storm.trident.operation;
-
-import storm.trident.tuple.TridentTuple;
-
-/**
- * An {@link EachOperation} that is handed a tuple together with a collector.
- */
-public interface Function extends EachOperation {
- /**
-  * Processes one tuple.
-  *
-  * @param tuple     the input tuple
-  * @param collector sink offering {@code emit} and {@code reportError}; presumably the
-  *                  only way for the function to produce output - TODO confirm
-  */
- void execute(TridentTuple tuple, TridentCollector collector);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/GroupedMultiReducer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/GroupedMultiReducer.java b/jstorm-client/src/main/java/storm/trident/operation/GroupedMultiReducer.java
deleted file mode 100644
index 9223cf7..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/GroupedMultiReducer.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package storm.trident.operation;
-
-import java.io.Serializable;
-import java.util.Map;
-import storm.trident.tuple.TridentTuple;
-
-
-/**
- * Reducer over several input streams whose per-call methods also receive a {@code group} tuple.
- *
- * @param <T> type returned by {@link #init} and accepted as {@code state} by
- *            {@link #execute} and {@link #complete}
- */
-public interface GroupedMultiReducer<T> extends Serializable {
- /** Receives the configuration map and the reducer context; presumably called before any other method - TODO confirm. */
- void prepare(Map conf, TridentMultiReducerContext context);
- /** Creates the state object for the given group. */
- T init(TridentCollector collector, TridentTuple group);
- /** Handles one input tuple; {@code streamIndex} identifies which input stream it came from. */
- void execute(T state, int streamIndex, TridentTuple group, TridentTuple input, TridentCollector collector);
- /** Finishes processing for the given state and group; presumably invoked once all input has been seen - TODO confirm. */
- void complete(T state, TridentTuple group, TridentCollector collector);
- /** Counterpart of {@link #prepare}; presumably releases whatever it acquired - TODO confirm. */
- void cleanup();
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/MultiReducer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/MultiReducer.java b/jstorm-client/src/main/java/storm/trident/operation/MultiReducer.java
deleted file mode 100644
index 520f4b9..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/MultiReducer.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package storm.trident.operation;
-
-import java.io.Serializable;
-import java.util.Map;
-import storm.trident.tuple.TridentTuple;
-
-
-/**
- * Reducer over several input streams.
- *
- * @param <T> type returned by {@link #init} and accepted as {@code state} by
- *            {@link #execute} and {@link #complete}
- */
-public interface MultiReducer<T> extends Serializable {
- /** Receives the configuration map and the reducer context; presumably called before any other method - TODO confirm. */
- void prepare(Map conf, TridentMultiReducerContext context);
- /** Creates the state object. */
- T init(TridentCollector collector);
- /** Handles one input tuple; {@code streamIndex} identifies which input stream it came from. */
- void execute(T state, int streamIndex, TridentTuple input, TridentCollector collector);
- /** Finishes processing for the given state; presumably invoked once all input has been seen - TODO confirm. */
- void complete(T state, TridentCollector collector);
- /** Counterpart of {@link #prepare}; presumably releases whatever it acquired - TODO confirm. */
- void cleanup();
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/Operation.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/Operation.java b/jstorm-client/src/main/java/storm/trident/operation/Operation.java
deleted file mode 100644
index f67281e..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/Operation.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package storm.trident.operation;
-
-import java.io.Serializable;
-import java.util.Map;
-
-/**
- * Serializable base interface declaring the {@code prepare}/{@code cleanup} pair
- * inherited by {@code EachOperation} and its sub-interfaces.
- */
-public interface Operation extends Serializable {
- /** Receives the configuration map and the operation context; presumably called before the operation is used - TODO confirm. */
- void prepare(Map conf, TridentOperationContext context);
- /** Counterpart of {@link #prepare}; presumably releases whatever it acquired - TODO confirm. */
- void cleanup();
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/ReducerAggregator.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/ReducerAggregator.java b/jstorm-client/src/main/java/storm/trident/operation/ReducerAggregator.java
deleted file mode 100644
index 3b4efca..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/ReducerAggregator.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package storm.trident.operation;
-
-import java.io.Serializable;
-import storm.trident.tuple.TridentTuple;
-
-/**
- * Aggregator that folds tuples into a running value of type {@code T}.
- */
-public interface ReducerAggregator<T> extends Serializable {
- /** Supplies the starting value. */
- T init();
- /** Combines the current value with one tuple and returns the resulting value. */
- T reduce(T curr, TridentTuple tuple);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/TridentCollector.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/TridentCollector.java b/jstorm-client/src/main/java/storm/trident/operation/TridentCollector.java
deleted file mode 100644
index b1a74d1..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/TridentCollector.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package storm.trident.operation;
-
-import java.util.List;
-
-
-/**
- * Sink handed to operations for producing output and reporting failures.
- */
-public interface TridentCollector {
- /** Emits a list of values; presumably one output tuple per call - TODO confirm. */
- void emit(List<Object> values);
- /** Reports a throwable raised while processing. */
- void reportError(Throwable t);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/TridentMultiReducerContext.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/TridentMultiReducerContext.java b/jstorm-client/src/main/java/storm/trident/operation/TridentMultiReducerContext.java
deleted file mode 100644
index fe0ff04..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/TridentMultiReducerContext.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package storm.trident.operation;
-
-import backtype.storm.tuple.Fields;
-import java.util.List;
-import storm.trident.tuple.TridentTuple;
-import storm.trident.tuple.TridentTupleView.ProjectionFactory;
-
-
-public class TridentMultiReducerContext {
- List<TridentTuple.Factory> _factories;
-
- public TridentMultiReducerContext(List<TridentTuple.Factory> factories) {
- _factories = factories;
- }
-
- public ProjectionFactory makeProjectionFactory(int streamIndex, Fields fields) {
- return new ProjectionFactory(_factories.get(streamIndex), fields);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/TridentOperationContext.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/TridentOperationContext.java b/jstorm-client/src/main/java/storm/trident/operation/TridentOperationContext.java
deleted file mode 100644
index 3693125..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/TridentOperationContext.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package storm.trident.operation;
-
-import backtype.storm.metric.api.CombinedMetric;
-import backtype.storm.metric.api.ICombiner;
-import backtype.storm.metric.api.IMetric;
-import backtype.storm.metric.api.IReducer;
-import backtype.storm.metric.api.ReducedMetric;
-import backtype.storm.task.IMetricsContext;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Fields;
-import storm.trident.tuple.TridentTuple;
-import storm.trident.tuple.TridentTupleView.ProjectionFactory;
-
-public class TridentOperationContext implements IMetricsContext{
- TridentTuple.Factory _factory;
- TopologyContext _topoContext;
-
- public TridentOperationContext(TopologyContext topoContext, TridentTuple.Factory factory) {
- _factory = factory;
- _topoContext = topoContext;
- }
-
- public TridentOperationContext(TridentOperationContext parent, TridentTuple.Factory factory) {
- this(parent._topoContext, factory);
- }
-
- public ProjectionFactory makeProjectionFactory(Fields fields) {
- return new ProjectionFactory(_factory, fields);
- }
-
- public int numPartitions() {
- return _topoContext.getComponentTasks(_topoContext.getThisComponentId()).size();
- }
-
- public int getPartitionIndex() {
- return _topoContext.getThisTaskIndex();
- }
-
- public <T extends IMetric> T registerMetric(String name, T metric, int timeBucketSizeInSecs) {
- return _topoContext.registerMetric(name, metric, timeBucketSizeInSecs);
- }
- public ReducedMetric registerMetric(String name, IReducer reducer, int timeBucketSizeInSecs) {
- return _topoContext.registerMetric(name, new ReducedMetric(reducer), timeBucketSizeInSecs);
- }
- public CombinedMetric registerMetric(String name, ICombiner combiner, int timeBucketSizeInSecs) {
- return _topoContext.registerMetric(name, new CombinedMetric(combiner), timeBucketSizeInSecs);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/builtin/Count.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/builtin/Count.java b/jstorm-client/src/main/java/storm/trident/operation/builtin/Count.java
deleted file mode 100644
index e40177e..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/builtin/Count.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package storm.trident.operation.builtin;
-
-import storm.trident.operation.CombinerAggregator;
-import storm.trident.tuple.TridentTuple;
-
-
-public class Count implements CombinerAggregator<Long> {
-
- @Override
- public Long init(TridentTuple tuple) {
- return 1L;
- }
-
- @Override
- public Long combine(Long val1, Long val2) {
- return val1 + val2;
- }
-
- @Override
- public Long zero() {
- return 0L;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/builtin/Debug.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/builtin/Debug.java b/jstorm-client/src/main/java/storm/trident/operation/builtin/Debug.java
deleted file mode 100644
index 34e905c..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/builtin/Debug.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package storm.trident.operation.builtin;
-
-import storm.trident.operation.BaseFilter;
-import storm.trident.tuple.TridentTuple;
-
-public class Debug extends BaseFilter {
- private final String name;
-
- public Debug() {
- name = "DEBUG: ";
- }
-
- public Debug(String name) {
- this.name = "DEBUG(" + name + "): ";
- }
-
- @Override
- public boolean isKeep(TridentTuple tuple) {
- System.out.println(name + tuple.toString());
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/builtin/Equals.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/builtin/Equals.java b/jstorm-client/src/main/java/storm/trident/operation/builtin/Equals.java
deleted file mode 100644
index c53cfdd..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/builtin/Equals.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package storm.trident.operation.builtin;
-
-import storm.trident.operation.BaseFilter;
-import storm.trident.tuple.TridentTuple;
-
-
-public class Equals extends BaseFilter {
-
- @Override
- public boolean isKeep(TridentTuple tuple) {
- for(int i=0; i<tuple.size()-1; i++) {
- Object o1 = tuple.getValue(i);
- Object o2 = tuple.getValue(i+1);
- if (o1 == null) {
- if (o2 != null) {
- return false;
- }
- }else if (o1.equals(o2) == false){
- return false;
- }
-
- }
- return true;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/builtin/FilterNull.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/builtin/FilterNull.java b/jstorm-client/src/main/java/storm/trident/operation/builtin/FilterNull.java
deleted file mode 100644
index bed2f1e..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/builtin/FilterNull.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package storm.trident.operation.builtin;
-
-import storm.trident.operation.BaseFilter;
-import storm.trident.tuple.TridentTuple;
-
-public class FilterNull extends BaseFilter {
- @Override
- public boolean isKeep(TridentTuple tuple) {
- for(Object o: tuple) {
- if(o==null) return false;
- }
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/operation/builtin/FirstN.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/operation/builtin/FirstN.java b/jstorm-client/src/main/java/storm/trident/operation/builtin/FirstN.java
deleted file mode 100644
index 412badd..0000000
--- a/jstorm-client/src/main/java/storm/trident/operation/builtin/FirstN.java
+++ /dev/null
@@ -1,108 +0,0 @@
-package storm.trident.operation.builtin;
-
-import backtype.storm.tuple.Fields;
-import java.util.Comparator;
-import java.util.PriorityQueue;
-import storm.trident.Stream;
-import storm.trident.operation.Aggregator;
-import storm.trident.operation.Assembly;
-import storm.trident.operation.BaseAggregator;
-import storm.trident.operation.TridentCollector;
-import storm.trident.tuple.TridentTuple;
-
-
-/**
- * Assembly that selects up to {@code n} tuples from a stream, optionally ordered by one field.
- *
- * <p>The chosen aggregator is applied per partition, the partial results are routed
- * through {@code global()}, and the same aggregator is applied once more to the result.
- */
-public class FirstN implements Assembly {
-
-    Aggregator _agg;
-
-    /**
-     * Selects the first {@code n} tuples in ascending order of {@code sortField}.
-     *
-     * @param n         maximum number of tuples to emit
-     * @param sortField field to order by, or {@code null} to take tuples in arrival order
-     */
-    public FirstN(int n, String sortField) {
-        this(n, sortField, false);
-    }
-
-    /**
-     * @param n         maximum number of tuples to emit
-     * @param sortField field to order by, or {@code null} to take tuples in arrival order
-     * @param reverse   {@code true} to invert the ordering of {@code sortField}; ignored when
-     *                  {@code sortField} is {@code null}
-     */
-    public FirstN(int n, String sortField, boolean reverse) {
-        if (sortField != null) {
-            _agg = new FirstNSortedAgg(n, sortField, reverse);
-        } else {
-            _agg = new FirstNAgg(n);
-        }
-    }
-
-    /** Applies the aggregator per partition, then globally, keeping the input's output fields. */
-    @Override
-    public Stream apply(Stream input) {
-        Fields outputFields = input.getOutputFields();
-        return input.partitionAggregate(outputFields, _agg, outputFields)
-                    .global()
-                    .partitionAggregate(outputFields, _agg, outputFields);
-    }
-
-    /** Unsorted variant: passes through the first {@code n} tuples it sees and ignores the rest. */
-    public static class FirstNAgg extends BaseAggregator<FirstNAgg.State> {
-        int _n;
-
-        public FirstNAgg(int n) {
-            _n = n;
-        }
-
-        /** Per-batch counter of tuples already emitted. */
-        static class State {
-            int emitted = 0;
-        }
-
-        @Override
-        public State init(Object batchId, TridentCollector collector) {
-            return new State();
-        }
-
-        /** Emits the tuple immediately while fewer than {@code n} have been emitted. */
-        @Override
-        public void aggregate(State val, TridentTuple tuple, TridentCollector collector) {
-            if (val.emitted < _n) {
-                collector.emit(tuple);
-                val.emitted++;
-            }
-        }
-
-        /** Nothing to do: tuples are emitted as they arrive. */
-        @Override
-        public void complete(State val, TridentCollector collector) {
-        }
-
-    }
-
-    /**
-     * Sorted variant: queues every tuple ordered by {@code sortField} and emits the
-     * first {@code n} of them on completion.
-     */
-    public static class FirstNSortedAgg extends BaseAggregator<PriorityQueue> {
-
-        int _n;
-        String _sortField;
-        boolean _reverse;
-
-        public FirstNSortedAgg(int n, String sortField, boolean reverse) {
-            _n = n;
-            _sortField = sortField;
-            _reverse = reverse;
-        }
-
-        /** Creates the queue that orders tuples by the sort field (inverted when reverse is set). */
-        @Override
-        public PriorityQueue init(Object batchId, TridentCollector collector) {
-            // PriorityQueue rejects an initial capacity below 1 with IllegalArgumentException.
-            // The capacity is only a sizing hint, so clamp it; n <= 0 then simply emits
-            // nothing, matching FirstNAgg.
-            int capacity = Math.max(1, _n);
-            return new PriorityQueue(capacity, new Comparator<TridentTuple>() {
-                @Override
-                public int compare(TridentTuple t1, TridentTuple t2) {
-                    Comparable c1 = (Comparable) t1.getValueByField(_sortField);
-                    Comparable c2 = (Comparable) t2.getValueByField(_sortField);
-                    int ret = c1.compareTo(c2);
-                    // Flip by sign instead of multiplying by -1: negating Integer.MIN_VALUE
-                    // overflows back to itself and would leave the order un-reversed.
-                    return _reverse ? -Integer.signum(ret) : ret;
-                }
-            });
-        }
-
-        /** Queues the tuple; nothing is emitted until {@link #complete}. */
-        @Override
-        public void aggregate(PriorityQueue state, TridentTuple tuple, TridentCollector collector) {
-            state.add(tuple);
-        }
-
-        /** Emits at most {@code n} tuples, taken from the head of the queue in order. */
-        @Override
-        public void complete(PriorityQueue val, TridentCollector collector) {
-            int total = val.size();
-            for (int i = 0; i < _n && i < total; i++) {
-                TridentTuple t = (TridentTuple) val.remove();
-                collector.emit(t);
-            }
-        }
-    }
-}