You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/02/07 01:28:16 UTC
[03/12] storm git commit: [STORM-1961] Stream api for storm core use
cases
http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/StatefulProcessorBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/StatefulProcessorBolt.java b/storm-core/src/jvm/org/apache/storm/streams/StatefulProcessorBolt.java
new file mode 100644
index 0000000..c123658
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/StatefulProcessorBolt.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams;
+
+import com.google.common.collect.Multimap;
+import org.apache.storm.state.KeyValueState;
+import org.apache.storm.streams.processors.StatefulProcessor;
+import org.apache.storm.streams.processors.UpdateStateByKeyProcessor;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseStatefulBolt;
+import org.apache.storm.tuple.Tuple;
+import org.jgrapht.DirectedGraph;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Stream bolt that executes stateful operations like update state and state query.
+ */
+class StatefulProcessorBolt<K, V> extends BaseStatefulBolt<KeyValueState<K, V>> implements StreamBolt {
+ private final ProcessorBoltDelegate delegate;
+ // can be UpdateStateByKey or StateQuery processors
+ private final Set<StatefulProcessor<K, V>> statefulProcessors;
+
+ StatefulProcessorBolt(String boltId, DirectedGraph<Node, Edge> graph, List<ProcessorNode> nodes) {
+ delegate = new ProcessorBoltDelegate(boltId, graph, nodes);
+ statefulProcessors = getStatefulProcessors(nodes);
+ }
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ delegate.prepare(stormConf, context, collector);
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ delegate.processAndAck(input);
+ }
+
+ @Override
+ public void initState(KeyValueState<K, V> state) {
+ for (StatefulProcessor<K, V> statefulProcessor : statefulProcessors) {
+ statefulProcessor.initState(state);
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ delegate.declareOutputFields(declarer);
+ }
+
+ @Override
+ public void setTimestampField(String fieldName) {
+ delegate.setTimestampField(fieldName);
+ }
+
+ @Override
+ public String getId() {
+ return delegate.getId();
+ }
+
+ void setStreamToInitialProcessors(Multimap<String, ProcessorNode> streamToInitialProcessors) {
+ delegate.setStreamToInitialProcessors(streamToInitialProcessors);
+ }
+
+ public List<ProcessorNode> getNodes() {
+ return delegate.getNodes();
+ }
+
+ void addStreamToInitialProcessors(Multimap<String, ProcessorNode> streamToInitialProcessors) {
+ delegate.addStreamToInitialProcessors(streamToInitialProcessors);
+ }
+
+ void addNodes(List<ProcessorNode> nodes) {
+ delegate.addNodes(nodes);
+ statefulProcessors.addAll(getStatefulProcessors(nodes));
+ }
+
+ @SuppressWarnings("unchecked")
+ private Set<StatefulProcessor<K, V>> getStatefulProcessors(List<ProcessorNode> nodes) {
+ Set<StatefulProcessor<K, V>> statefulProcessors = new HashSet<>();
+ int updateStateByKeyCount = 0;
+ for (ProcessorNode node : nodes) {
+ if (node.getProcessor() instanceof StatefulProcessor) {
+ statefulProcessors.add((StatefulProcessor<K, V>) node.getProcessor());
+ if (node.getProcessor() instanceof UpdateStateByKeyProcessor) {
+ if (++updateStateByKeyCount > 1) {
+ throw new IllegalArgumentException("Cannot have more than one updateStateByKey processor " +
+ "in a StatefulProcessorBolt");
+ }
+ }
+
+ }
+ }
+ return statefulProcessors;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/Stream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/Stream.java b/storm-core/src/jvm/org/apache/storm/streams/Stream.java
new file mode 100644
index 0000000..e50e7a2
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/Stream.java
@@ -0,0 +1,393 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams;
+
+import org.apache.storm.streams.operations.Aggregator;
+import org.apache.storm.streams.operations.Consumer;
+import org.apache.storm.streams.operations.FlatMapFunction;
+import org.apache.storm.streams.operations.Function;
+import org.apache.storm.streams.operations.IdentityFunction;
+import org.apache.storm.streams.operations.PairFlatMapFunction;
+import org.apache.storm.streams.operations.PairFunction;
+import org.apache.storm.streams.operations.Predicate;
+import org.apache.storm.streams.operations.PrintConsumer;
+import org.apache.storm.streams.operations.Reducer;
+import org.apache.storm.streams.processors.AggregateProcessor;
+import org.apache.storm.streams.processors.BranchProcessor;
+import org.apache.storm.streams.processors.FilterProcessor;
+import org.apache.storm.streams.processors.FlatMapProcessor;
+import org.apache.storm.streams.processors.ForEachProcessor;
+import org.apache.storm.streams.processors.MapProcessor;
+import org.apache.storm.streams.processors.PeekProcessor;
+import org.apache.storm.streams.processors.Processor;
+import org.apache.storm.streams.processors.ReduceProcessor;
+import org.apache.storm.streams.processors.StateQueryProcessor;
+import org.apache.storm.streams.windowing.Window;
+import org.apache.storm.topology.IBasicBolt;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Represents a stream of values.
+ *
+ * @param <T> the type of the value
+ */
+public class Stream<T> {
+ protected static final Fields KEY = new Fields("key");
+ protected static final Fields VALUE = new Fields("value");
+ protected static final Fields KEY_VALUE = new Fields("key", "value");
+ // the stream builder
+ protected final StreamBuilder streamBuilder;
+ // the current node
+ protected final Node node;
+ // the stream id from node's output stream(s) that this stream represents
+ protected final String stream;
+
+ Stream(StreamBuilder streamBuilder, Node node) {
+ this(streamBuilder, node, node.getOutputStreams().iterator().next());
+ }
+
+ private Stream(StreamBuilder streamBuilder, Node node, String stream) {
+ this.streamBuilder = streamBuilder;
+ this.node = node;
+ this.stream = stream;
+ }
+
+ /**
+ * Returns a stream consisting of the elements of this stream that match the given predicate.
+ *
+ * @param predicate the predicate to apply to each element to determine if it should be included
+ * @return the new stream
+ */
+ public Stream<T> filter(Predicate<? super T> predicate) {
+ return new Stream<>(streamBuilder, addProcessorNode(new FilterProcessor<>(predicate), VALUE));
+ }
+
+ /**
+ * Returns a stream consisting of the result of applying the given mapping function to the values of this stream.
+ *
+ * @param function a mapping function to be applied to each value in this stream.
+ * @return the new stream
+ */
+ public <R> Stream<R> map(Function<? super T, ? extends R> function) {
+ return new Stream<>(streamBuilder, addProcessorNode(new MapProcessor<>(function), VALUE));
+ }
+
+ /**
+ * Returns a stream of key-value pairs by applying a {@link PairFunction} on each value of this stream.
+ *
+ * @param function the mapping function to be applied to each value in this stream
+ * @param <K> the key type
+ * @param <V> the value type
+ * @return the new stream of key-value pairs
+ */
+ public <K, V> PairStream<K, V> mapToPair(PairFunction<T, K, V> function) {
+ return new PairStream<>(streamBuilder, addProcessorNode(new MapProcessor<>(function), KEY_VALUE));
+ }
+
+ /**
+ * Returns a stream consisting of the results of replacing each value of this stream with the contents
+ * produced by applying the provided mapping function to each value. This has the effect of applying
+ * a one-to-many transformation to the values of the stream, and then flattening the resulting elements
+ * into a new stream.
+ *
+ * @param function a mapping function to be applied to each value in this stream which produces new values.
+ * @return the new stream
+ */
+ public <R> Stream<R> flatMap(FlatMapFunction<T, R> function) {
+ return new Stream<>(streamBuilder, addProcessorNode(new FlatMapProcessor<>(function), VALUE));
+ }
+
+ /**
+ * Returns a stream consisting of the results of replacing each value of this stream with the key-value pairs
+ * produced by applying the provided mapping function to each value.
+ *
+ * @param function the mapping function to be applied to each value in this stream which produces new key-value pairs.
+ * @param <K> the key type
+ * @param <V> the value type
+ * @return the new stream of key-value pairs
+ * @see #flatMap(FlatMapFunction)
+ * @see #mapToPair(PairFunction)
+ */
+ public <K, V> PairStream<K, V> flatMapToPair(PairFlatMapFunction<T, K, V> function) {
+ return new PairStream<>(streamBuilder, addProcessorNode(new FlatMapProcessor<>(function), KEY_VALUE));
+ }
+
+ /**
+ * Returns a new stream consisting of the elements that fall within the window as specified by the window parameter.
+ * The {@link Window} specification could be used to specify sliding or tumbling windows based on
+ * time duration or event count. For example,
+ * <pre>
+ * // time duration based sliding window
+ * stream.window(SlidingWindows.of(Duration.minutes(10), Duration.minutes(1)));
+ *
+ * // count based sliding window
+ * stream.window(SlidingWindows.of(Count.of(10), Count.of(2)));
+ *
+ * // time duration based tumbling window
+ * stream.window(TumblingWindows.of(Duration.seconds(10)));
+ * </pre>
+ *
+ * @see org.apache.storm.streams.windowing.SlidingWindows
+ * @see org.apache.storm.streams.windowing.TumblingWindows
+ * @param window the window configuration
+ * @return the new stream
+ */
+ public Stream<T> window(Window<?, ?> window) {
+ return new Stream<>(streamBuilder, addNode(new WindowNode(window, stream, node.getOutputFields())));
+ }
+
+ /**
+ * Performs an action for each element of this stream.
+ *
+ * @param action an action to perform on the elements
+ */
+ public void forEach(Consumer<? super T> action) {
+ addProcessorNode(new ForEachProcessor<>(action), new Fields());
+ }
+
+ /**
+ * Returns a stream consisting of the elements of this stream, additionally performing the provided action on
+ * each element as they are consumed from the resulting stream.
+ *
+ * @param action the action to perform on the element as they are consumed from the stream
+ * @return the new stream
+ */
+ public Stream<T> peek(Consumer<? super T> action) {
+ return new Stream<>(streamBuilder, addProcessorNode(new PeekProcessor<>(action), node.getOutputFields()));
+ }
+
+ /**
+ * Aggregates the values in this stream using the aggregator. This does a global aggregation, i.e. the elements
+ * across all the partitions are forwarded to a single task for computing the aggregate.
+ * <p>
+ * If the stream is windowed, the aggregate result is emitted after each window activation and represents the
+ * aggregate of elements that fall within that window.
+ * If the stream is not windowed, the aggregate result is emitted as each new element in the stream is processed.
+ * </p>
+ *
+ * @param aggregator the aggregator
+ * @param <R> the result type
+ * @return the new stream
+ */
+ public <R> Stream<R> aggregate(Aggregator<? super T, ? extends R> aggregator) {
+ return new Stream<>(streamBuilder, global().addProcessorNode(new AggregateProcessor<>(aggregator), VALUE));
+ }
+
+ /**
+ * Performs a reduction on the elements of this stream, by repeatedly applying the reducer.
+ * <p>
+ * If the stream is windowed, the result is emitted after each window activation and represents the
+ * reduction of elements that fall within that window.
+ * If the stream is not windowed, the result is emitted as each new element in the stream is processed.
+ * </p>
+ *
+ * @param reducer the reducer
+ * @return the new stream
+ */
+ public Stream<T> reduce(Reducer<T> reducer) {
+ return new Stream<>(streamBuilder, global().addProcessorNode(new ReduceProcessor<>(reducer), VALUE));
+ }
+
+ /**
+ * Returns a new stream with the given value of parallelism. Further operations on this stream
+ * would execute at this level of parallelism.
+ *
+ * @param parallelism the parallelism value
+ * @return the new stream
+ */
+ public Stream<T> repartition(int parallelism) {
+ if (parallelism < 1) {
+ throw new IllegalArgumentException("Parallelism should be >= 1");
+ }
+ Node partitionNode = addNode(node, new PartitionNode(stream, node.getOutputFields()), parallelism);
+ return new Stream<>(streamBuilder, partitionNode);
+ }
+
+ /**
+ * Returns an array of streams by splitting this stream into multiple branches based on the given
+ * predicates. The predicates are applied in the given order to the values of this stream and the result
+ * is forwarded to the corresponding (index based) result stream based on the (index of) predicate that matches.
+ * If none of the predicates match a value, that value is dropped.
+ *
+ * @param predicates the predicates
+ * @return an array of result streams (branches) corresponding to the given predicates
+ */
+ @SuppressWarnings("unchecked")
+ public Stream<T>[] branch(Predicate<T>... predicates) {
+ List<Stream<T>> childStreams = new ArrayList<>();
+ if (predicates.length > 0) {
+ BranchProcessor<T> branchProcessor = new BranchProcessor<>();
+ Node branchNode = addProcessorNode(branchProcessor, VALUE);
+ for (Predicate<T> predicate : predicates) {
+ // create a child node (identity) per branch
+ ProcessorNode child = makeProcessorNode(new MapProcessor<>(new IdentityFunction<>()), node.getOutputFields());
+ String branchStream = child.getOutputStreams().iterator().next() + "-branch";
+ // branchStream is the parent stream that connects branch processor to this child
+ branchNode.addOutputStream(branchStream);
+ addNode(branchNode, child, branchStream);
+ childStreams.add(new Stream<>(streamBuilder, child));
+ branchProcessor.addPredicate(predicate, branchStream);
+ }
+ }
+ return childStreams.toArray((Stream<T>[]) new Stream[childStreams.size()]);
+ }
+
+ /**
+ * Print the values in this stream.
+ */
+ public void print() {
+ forEach(new PrintConsumer<T>());
+ }
+
+ /**
+ * Sends the elements of this stream to a bolt. This could be used to plug in existing bolts as
+ * sinks in the stream, e.g. a {@code RedisStoreBolt}. The bolt would have a parallelism of 1.
+ * <p>
+ * <b>Note:</b> This would provide guarantees only based on what the bolt provides.
+ * </p>
+ *
+ * @param bolt the bolt
+ */
+ public void to(IRichBolt bolt) {
+ to(bolt, 1);
+ }
+
+ /**
+ * Sends the elements of this stream to a bolt. This could be used to plug in existing bolts as
+ * sinks in the stream, e.g. a {@code RedisStoreBolt}.
+ * <p>
+ * <b>Note:</b> This would provide guarantees only based on what the bolt provides.
+ * </p>
+ *
+ * @param bolt the bolt
+ * @param parallelism the parallelism of the bolt
+ */
+ public void to(IRichBolt bolt, int parallelism) {
+ addSinkNode(new SinkNode(bolt), parallelism);
+ }
+
+ /**
+ * Sends the elements of this stream to a bolt. This could be used to plug in existing bolts as
+ * sinks in the stream, e.g. a {@code RedisStoreBolt}. The bolt would have a parallelism of 1.
+ * <p>
+ * <b>Note:</b> This would provide guarantees only based on what the bolt provides.
+ * </p>
+ *
+ * @param bolt the bolt
+ */
+ public void to(IBasicBolt bolt) {
+ to(bolt, 1);
+ }
+
+ /**
+ * Sends the elements of this stream to a bolt. This could be used to plug in existing bolts as
+ * sinks in the stream, e.g. a {@code RedisStoreBolt}.
+ * <p>
+ * <b>Note:</b> This would provide guarantees only based on what the bolt provides.
+ * </p>
+ *
+ * @param bolt the bolt
+ * @param parallelism the parallelism of the bolt
+ */
+ public void to(IBasicBolt bolt, int parallelism) {
+ addSinkNode(new SinkNode(bolt), parallelism);
+ }
+
+ /**
+ * Queries the given stream state with the values in this stream as the keys.
+ *
+ * @param streamState the stream state
+ * @param <V> the value type
+ * @return the result stream
+ */
+ public <V> PairStream<T, V> stateQuery(StreamState<T, V> streamState) {
+ // need all grouping for state query since the state is local
+ Node node = all().addProcessorNode(new StateQueryProcessor<>(streamState), KEY_VALUE);
+ // add 'updateState' node as parent so that state query gets processed after update state
+ addNode(streamState.getUpdateStateNode(), node, node.getParallelism());
+ return new PairStream<>(streamBuilder, node);
+ }
+
+ Node getNode() {
+ return node;
+ }
+
+ Node addNode(Node parent, Node child, int parallelism) {
+ return streamBuilder.addNode(parent, child, parallelism);
+ }
+
+ Node addNode(Node child) {
+ return addNode(this.node, child);
+ }
+
+ Node addProcessorNode(Processor<?> processor, Fields outputFields) {
+ return addNode(makeProcessorNode(processor, outputFields));
+ }
+
+ String getStream() {
+ return stream;
+ }
+
+ private Node addNode(Node parent, Node child) {
+ return streamBuilder.addNode(parent, child);
+ }
+
+ private Node addNode(Node parent, Node child, String parentStreamId) {
+ return streamBuilder.addNode(parent, child, parentStreamId);
+ }
+
+ private Node addNode(Node child, int parallelism, String parentStreamId) {
+ return streamBuilder.addNode(this.node, child, parallelism, parentStreamId);
+ }
+
+ private ProcessorNode makeProcessorNode(Processor<?> processor, Fields outputFields) {
+ return new ProcessorNode(processor, UniqueIdGen.getInstance().getUniqueStreamId(), outputFields);
+ }
+
+ private void addSinkNode(SinkNode sinkNode, int parallelism) {
+ String boltId = UniqueIdGen.getInstance().getUniqueBoltId();
+ sinkNode.setComponentId(boltId);
+ sinkNode.setParallelism(parallelism);
+ if (node instanceof SpoutNode) {
+ addNode(sinkNode, parallelism, Utils.DEFAULT_STREAM_ID);
+ } else {
+ /*
+ * add a stream__sink stream to the current node (parent) for emitting
+ * just the values (no punctuation) to the bolt.
+ */
+ String sinkStream = StreamUtil.getSinkStream(stream);
+ node.addOutputStream(sinkStream);
+ addNode(sinkNode, parallelism, sinkStream);
+ }
+ }
+
+ private Stream<T> global() {
+ Node partitionNode = addNode(new PartitionNode(stream, node.getOutputFields(), GroupingInfo.global()));
+ return new Stream<>(streamBuilder, partitionNode);
+ }
+
+ private Stream<T> all() {
+ Node partitionNode = addNode(new PartitionNode(stream, node.getOutputFields(), GroupingInfo.all()));
+ return new Stream<>(streamBuilder, partitionNode);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/StreamBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/StreamBolt.java b/storm-core/src/jvm/org/apache/storm/streams/StreamBolt.java
new file mode 100644
index 0000000..6170ea8
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/StreamBolt.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams;
+
+/**
+ * Interface for bolts that execute the streaming operations via the
+ * processors.
+ */
+interface StreamBolt {
+ /**
+ * The bolt component id. This is an auto-generated value like (bolt-N).
+ *
+ * @return the bolt id.
+ */
+ String getId();
+
+ /**
+ * Set the timestamp field name for event time based processing.
+ *
+ * @param fieldName the timestamp field name
+ */
+ void setTimestampField(String fieldName);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/StreamBuilder.java b/storm-core/src/jvm/org/apache/storm/streams/StreamBuilder.java
new file mode 100644
index 0000000..e19a0c6
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/StreamBuilder.java
@@ -0,0 +1,591 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Table;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.streams.operations.IdentityFunction;
+import org.apache.storm.streams.operations.mappers.PairValueMapper;
+import org.apache.storm.streams.operations.mappers.TupleValueMapper;
+import org.apache.storm.streams.processors.JoinProcessor;
+import org.apache.storm.streams.processors.MapProcessor;
+import org.apache.storm.streams.processors.StateQueryProcessor;
+import org.apache.storm.streams.processors.StatefulProcessor;
+import org.apache.storm.streams.windowing.Window;
+import org.apache.storm.topology.BoltDeclarer;
+import org.apache.storm.topology.IBasicBolt;
+import org.apache.storm.topology.IComponent;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Tuple;
+import org.jgrapht.graph.DefaultDirectedGraph;
+import org.jgrapht.traverse.TopologicalOrderIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+/**
+ * A builder for constructing a {@link StormTopology} via storm streams api (DSL)
+ */
+public class StreamBuilder {
+ private static final Logger LOG = LoggerFactory.getLogger(StreamBuilder.class);
+ private final DefaultDirectedGraph<Node, Edge> graph;
+ private final Table<Node, String, GroupingInfo> nodeGroupingInfo = HashBasedTable.create();
+ private final Map<Node, WindowNode> windowInfo = new HashMap<>();
+ private final List<ProcessorNode> curGroup = new ArrayList<>();
+ private int statefulProcessorCount = 0;
+ private final Map<StreamBolt, BoltDeclarer> streamBolts = new HashMap<>();
+ private String timestampFieldName = null;
+
+ /**
+ * Creates a new {@link StreamBuilder}
+ */
+ public StreamBuilder() {
+ graph = new DefaultDirectedGraph<>(new StreamsEdgeFactory());
+ }
+
+ /**
+ * Creates a new {@link Stream} of tuples from the given {@link IRichSpout}
+ *
+ * @param spout the spout
+ * @return the new stream
+ */
+ public Stream<Tuple> newStream(IRichSpout spout) {
+ return newStream(spout, 1);
+ }
+
+ /**
+ * Creates a new {@link Stream} of tuples from the given {@link IRichSpout} with the given
+ * parallelism.
+ *
+ * @param spout the spout
+ * @param parallelism the parallelism of the stream
+ * @return the new stream
+ */
+ public Stream<Tuple> newStream(IRichSpout spout, int parallelism) {
+ SpoutNode spoutNode = new SpoutNode(spout);
+ String spoutId = UniqueIdGen.getInstance().getUniqueSpoutId();
+ spoutNode.setComponentId(spoutId);
+ spoutNode.setParallelism(parallelism);
+ graph.addVertex(spoutNode);
+ return new Stream<>(this, spoutNode);
+ }
+
+ /**
+ * Creates a new {@link Stream} of values from the given {@link IRichSpout} by extracting field(s)
+ * from tuples via the supplied {@link TupleValueMapper}.
+ *
+ * @param spout the spout
+ * @param valueMapper the value mapper
+ * @param <T> the type of values in the resultant stream
+ * @return the new stream
+ */
+ public <T> Stream<T> newStream(IRichSpout spout, TupleValueMapper<T> valueMapper) {
+ return newStream(spout).map(valueMapper);
+ }
+
+
+ /**
+ * Creates a new {@link Stream} of values from the given {@link IRichSpout} by extracting field(s)
+ * from tuples via the supplied {@link TupleValueMapper} with the given parallelism.
+ *
+ * @param spout the spout
+ * @param valueMapper the value mapper
+ * @param parallelism the parallelism of the stream
+ * @param <T> the type of values in the resultant stream
+ * @return the new stream
+ */
+ public <T> Stream<T> newStream(IRichSpout spout, TupleValueMapper<T> valueMapper, int parallelism) {
+ return newStream(spout, parallelism).map(valueMapper);
+ }
+
+ /**
+ * Creates a new {@link PairStream} of key-value pairs from the given {@link IRichSpout} by extracting key and
+ * value from tuples via the supplied {@link PairValueMapper}.
+ *
+ * @param spout the spout
+ * @param pairValueMapper the pair value mapper
+ * @param <K> the key type
+ * @param <V> the value type
+ * @return the new stream of key-value pairs
+ */
+ public <K, V> PairStream<K, V> newStream(IRichSpout spout, PairValueMapper<K, V> pairValueMapper) {
+ return newStream(spout).mapToPair(pairValueMapper);
+ }
+
+ /**
+ * Creates a new {@link PairStream} of key-value pairs from the given {@link IRichSpout} by extracting key and
+ * value from tuples via the supplied {@link PairValueMapper} and with the given value of parallelism.
+ *
+ * @param spout the spout
+ * @param pairValueMapper the pair value mapper
+ * @param parallelism the parallelism of the stream
+ * @param <K> the key type
+ * @param <V> the value type
+ * @return the new stream of key-value pairs
+ */
+ public <K, V> PairStream<K, V> newStream(IRichSpout spout, PairValueMapper<K, V> pairValueMapper, int parallelism) {
+ return newStream(spout, parallelism).mapToPair(pairValueMapper);
+ }
+
+
+ /**
+ * Builds a new {@link StormTopology} for the computation expressed
+ * via the stream api.
+ *
+ * @return the storm topology
+ */
+ public StormTopology build() {
+ nodeGroupingInfo.clear();
+ windowInfo.clear();
+ curGroup.clear();
+ TopologicalOrderIterator<Node, Edge> iterator = new TopologicalOrderIterator<>(graph, queue());
+ TopologyBuilder topologyBuilder = new TopologyBuilder();
+ while (iterator.hasNext()) {
+ Node node = iterator.next();
+ if (node instanceof SpoutNode) {
+ addSpout(topologyBuilder, (SpoutNode) node);
+ } else if (node instanceof ProcessorNode) {
+ handleProcessorNode((ProcessorNode) node, topologyBuilder);
+ } else if (node instanceof PartitionNode) {
+ updateNodeGroupingInfo((PartitionNode) node);
+ processCurGroup(topologyBuilder);
+ } else if (node instanceof WindowNode) {
+ updateWindowInfo((WindowNode) node);
+ processCurGroup(topologyBuilder);
+ } else if (node instanceof SinkNode) {
+ processCurGroup(topologyBuilder);
+ addSink(topologyBuilder, (SinkNode) node);
+ }
+ }
+ processCurGroup(topologyBuilder);
+ mayBeAddTsField();
+ return topologyBuilder.createTopology();
+ }
+
+ Node addNode(Node parent, Node child) {
+ return addNode(parent, child, parent.getParallelism(), parent.getOutputStreams().iterator().next());
+ }
+
+ Node addNode(Node parent, Node child, int parallelism) {
+ return addNode(parent, child, parallelism, parent.getOutputStreams().iterator().next());
+ }
+
+ Node addNode(Node parent, Node child, String parentStreamId) {
+ return addNode(parent, child, parent.getParallelism(), parentStreamId);
+ }
+
+ Node addNode(Node parent, Node child, int parallelism, String parentStreamId) {
+ graph.addVertex(child);
+ graph.addEdge(parent, child);
+ child.setParallelism(parallelism);
+ if (parent instanceof WindowNode || parent instanceof PartitionNode) {
+ child.addParentStream(parentNode(parent), parentStreamId);
+ } else {
+ child.addParentStream(parent, parentStreamId);
+ }
+ return child;
+ }
+
+ private PriorityQueue<Node> queue() {
+ // min-heap
+ return new PriorityQueue<>(new Comparator<Node>() {
+ @Override
+ public int compare(Node n1, Node n2) {
+ return getPriority(n1.getClass()) - getPriority(n2.getClass());
+ }
+
+ private int getPriority(Class<? extends Node> clazz) {
+ /*
+ * Nodes in the descending order of priority.
+ * ProcessorNode has the highest priority so that the topological order iterator
+ * will group as many processor nodes together as possible.
+ */
+ Class<?>[] p = new Class<?>[]{
+ ProcessorNode.class,
+ SpoutNode.class,
+ SinkNode.class,
+ PartitionNode.class,
+ WindowNode.class};
+ for (int i = 0; i < p.length; i++) {
+ if (clazz.equals(p[i])) {
+ return i;
+ }
+ }
+ return Integer.MAX_VALUE;
+ }
+ });
+ }
+
+ /**
+  * Adds a processor node to the group of processors currently being collected, first
+  * flushing that group (or splitting around a window) when the node is stateful and
+  * cannot share a bolt with what has been collected so far.
+  *
+  * @param processorNode the processor node being visited
+  * @param topologyBuilder builder passed on to the helpers that create bolts
+  */
+ private void handleProcessorNode(ProcessorNode processorNode, TopologyBuilder topologyBuilder) {
+ if (processorNode.getProcessor() instanceof StatefulProcessor) {
+ statefulProcessorCount++;
+ // initial processors of the pending group, or just this node when nothing is pending
+ Set<ProcessorNode> initialNodes = initialProcessors(
+ curGroup.isEmpty() ? Collections.singletonList(processorNode) : curGroup);
+ Set<Window<?, ?>> windows = getWindowParams(initialNodes);
+ // if we get more than one stateful operation, we need to process the
+ // current group so that we have one stateful operation per stateful bolt
+ if (statefulProcessorCount > 1 || !windows.isEmpty()) {
+ if (!curGroup.isEmpty()) {
+ // flush what has been collected; this node starts a new group below
+ processCurGroup(topologyBuilder);
+ } else if (!windows.isEmpty()) {
+ // a stateful processor immediately follows a window specification
+ splitStatefulProcessor(processorNode, topologyBuilder);
+ }
+ // the node added below is the only stateful processor of the new group
+ statefulProcessorCount = 1;
+ }
+ }
+ curGroup.add(processorNode);
+ }
+
+ /*
+ * Force-creates a windowed bolt made of identity nodes so that we don't
+ * have a stateful processor inside a windowed bolt.
+ *
+ * For every direct parent of the stateful processor an identity map node is
+ * inserted between the parent and the processor; the identity nodes form the
+ * current group, which is then flushed via processCurGroup.
+ */
+ private void splitStatefulProcessor(ProcessorNode processorNode, TopologyBuilder topologyBuilder) {
+ // getParents returns a fresh list, so the graph can be modified inside the loop
+ for (Node parent : StreamUtil.<Node>getParents(graph, processorNode)) {
+ // pass-through node with a new stream id and the parent's output fields
+ ProcessorNode identity =
+ new ProcessorNode(new MapProcessor<>(new IdentityFunction<>()),
+ UniqueIdGen.getInstance().getUniqueStreamId(),
+ parent.getOutputFields());
+ addNode(parent, identity);
+ // re-route parent -> processorNode through the identity node
+ graph.removeEdge(parent, processorNode);
+ processorNode.removeParentStreams(parent);
+ addNode(identity, processorNode);
+ curGroup.add(identity);
+ }
+ processCurGroup(topologyBuilder);
+ }
+
+ /**
+  * Propagates the timestamp field name, when one was recorded, to every stream bolt
+  * registered so far. Does nothing when no timestamp field is set.
+  */
+ private void mayBeAddTsField() {
+     if (timestampFieldName == null) {
+         return;
+     }
+     for (StreamBolt registeredBolt : streamBolts.keySet()) {
+         registeredBolt.setTimestampField(timestampFieldName);
+     }
+ }
+
+ /**
+  * Records the partition node's grouping against every (upstream node, stream) pair
+  * that feeds it. Partition nodes without grouping info are ignored.
+  *
+  * @param partitionNode the partition node being visited
+  */
+ private void updateNodeGroupingInfo(PartitionNode partitionNode) {
+     if (partitionNode.getGroupingInfo() == null) {
+         return;
+     }
+     for (Node upstream : parentNodes(partitionNode)) {
+         for (String upstreamStream : partitionNode.getParentStreams(upstream)) {
+             nodeGroupingInfo.put(upstream, upstreamStream, partitionNode.getGroupingInfo());
+         }
+     }
+ }
+
+ private void updateWindowInfo(WindowNode windowNode) {
+ for (Node parent : parentNodes(windowNode)) {
+ windowInfo.put(parent, windowNode);
+ }
+ String tsField = windowNode.getWindowParams().getTimestampField();
+ if (tsField != null) {
+ if (timestampFieldName != null && !tsField.equals(timestampFieldName)) {
+ throw new IllegalArgumentException("Cannot set different timestamp field names");
+ }
+ timestampFieldName = tsField;
+ }
+ }
+
+ private Node parentNode(Node curNode) {
+ Set<Node> parentNode = parentNodes(curNode);
+ if (parentNode.size() > 1) {
+ throw new IllegalArgumentException("Node " + curNode + " has more than one parent node.");
+ }
+ if (parentNode.isEmpty()) {
+ throw new IllegalArgumentException("Node " + curNode + " has no parent.");
+ }
+ return parentNode.iterator().next();
+ }
+
+ private Set<Node> parentNodes(Node curNode) {
+ Set<Node> nodes = new HashSet<>();
+ for (Node parent : StreamUtil.<Node>getParents(graph, curNode)) {
+ if (parent instanceof ProcessorNode || parent instanceof SpoutNode) {
+ nodes.add(parent);
+ } else {
+ nodes.addAll(parentNodes(parent));
+ }
+ }
+ return nodes;
+ }
+
+ /**
+  * Turns the processors collected in the current group into one bolt and clears the
+  * group. Does nothing when the group is empty.
+  *
+  * The kind of bolt depends on the window parameters found for the group's initial
+  * processors: none gives a stateful or plain bolt, exactly one gives a windowed bolt.
+  *
+  * @param topologyBuilder builder that receives the new bolt
+  * @throws IllegalStateException if more than one window configuration applies to the group
+  */
+ private void processCurGroup(TopologyBuilder topologyBuilder) {
+ if (curGroup.isEmpty()) {
+ return;
+ }
+
+ String boltId = UniqueIdGen.getInstance().getUniqueBoltId();
+ // isWindowed reads the windowed flag of parent processors, so nodes are updated
+ // in group order
+ for (ProcessorNode processorNode : curGroup) {
+ processorNode.setComponentId(boltId);
+ processorNode.setWindowed(isWindowed(processorNode));
+ processorNode.setWindowedParentStreams(getWindowedParentStreams(processorNode));
+ }
+ final Set<ProcessorNode> initialProcessors = initialProcessors(curGroup);
+ Set<Window<?, ?>> windowParams = getWindowParams(initialProcessors);
+ if (windowParams.isEmpty()) {
+ if (hasStatefulProcessor(curGroup)) {
+ addStatefulBolt(topologyBuilder, boltId, initialProcessors);
+ } else {
+ addBolt(topologyBuilder, boltId, initialProcessors);
+ }
+ } else if (windowParams.size() == 1) {
+ addWindowedBolt(topologyBuilder, boltId, initialProcessors, windowParams.iterator().next());
+ } else {
+ throw new IllegalStateException("More than one window config for current group " + curGroup);
+ }
+ curGroup.clear();
+ }
+
+ /**
+  * Checks whether any of the given nodes wraps a {@code StatefulProcessor}.
+  *
+  * @param processorNodes the nodes to inspect
+  * @return {@code true} if at least one node holds a stateful processor
+  */
+ private boolean hasStatefulProcessor(List<ProcessorNode> processorNodes) {
+     boolean found = false;
+     for (ProcessorNode candidate : processorNodes) {
+         if (candidate.getProcessor() instanceof StatefulProcessor) {
+             found = true;
+             break;
+         }
+     }
+     return found;
+ }
+
+ /**
+  * Returns the parallelism shared by all processors of the current group.
+  *
+  * @return the common parallelism, or 1 when the group is empty
+  * @throws IllegalStateException if the processors of the group disagree
+  */
+ private int getParallelism() {
+     final Set<Integer> distinct = new HashSet<>();
+     for (ProcessorNode member : curGroup) {
+         distinct.add(member.getParallelism());
+     }
+
+     if (distinct.size() > 1) {
+         throw new IllegalStateException("Current group does not have same parallelism " + curGroup);
+     }
+
+     if (distinct.isEmpty()) {
+         return 1;
+     }
+     return distinct.iterator().next();
+ }
+
+ /**
+  * Gathers the window configurations that apply to the given initial processors.
+  *
+  * For a join processor only the parents on its left stream are considered; for any
+  * other processor all resolved parent nodes are. A parent contributes when a window
+  * node has been recorded for it.
+  *
+  * @param initialProcessors the processors at the entry of a group
+  * @return the distinct window parameters found, possibly empty
+  */
+ private Set<Window<?, ?>> getWindowParams(Set<ProcessorNode> initialProcessors) {
+     final Set<WindowNode> matchedWindowNodes = new HashSet<>();
+     for (ProcessorNode initial : initialProcessors) {
+         final Set<Node> upstream;
+         if (initial.getProcessor() instanceof JoinProcessor) {
+             String leftStream = ((JoinProcessor) initial.getProcessor()).getLeftStream();
+             upstream = initial.getParents(leftStream);
+         } else {
+             upstream = parentNodes(initial);
+         }
+         for (Node candidate : upstream) {
+             if (windowInfo.containsKey(candidate)) {
+                 matchedWindowNodes.add(windowInfo.get(candidate));
+             }
+         }
+     }
+
+     final Set<Window<?, ?>> windowParams = new HashSet<>();
+     for (WindowNode windowNode : matchedWindowNodes) {
+         windowParams.add(windowNode.getWindowParams());
+     }
+     return windowParams;
+ }
+
+ private void addSpout(TopologyBuilder topologyBuilder, SpoutNode spout) {
+ topologyBuilder.setSpout(spout.getComponentId(), spout.getSpout(), spout.getParallelism());
+ }
+
+ /**
+  * Registers the bolt wrapped by a sink node with the topology builder and subscribes
+  * it to every stream it consumes from its resolved parent nodes, using the grouping
+  * recorded for that parent stream (shuffle grouping when none is recorded).
+  *
+  * @param topologyBuilder builder that receives the sink bolt
+  * @param sinkNode the sink node being visited
+  * @throws IllegalArgumentException if the sink's bolt is neither an IRichBolt nor an IBasicBolt
+  */
+ private void addSink(TopologyBuilder topologyBuilder, SinkNode sinkNode) {
+     IComponent bolt = sinkNode.getBolt();
+     BoltDeclarer boltDeclarer;
+     if (bolt instanceof IRichBolt) {
+         boltDeclarer = topologyBuilder.setBolt(sinkNode.getComponentId(), (IRichBolt) bolt, sinkNode.getParallelism());
+     } else if (bolt instanceof IBasicBolt) {
+         boltDeclarer = topologyBuilder.setBolt(sinkNode.getComponentId(), (IBasicBolt) bolt, sinkNode.getParallelism());
+     } else {
+         // name the method that actually failed and the offending type
+         throw new IllegalArgumentException("Expect IRichBolt or IBasicBolt in addSink but got "
+             + (bolt == null ? "null" : bolt.getClass().getName()));
+     }
+     for (Node parent : parentNodes(sinkNode)) {
+         for (String stream : sinkNode.getParentStreams(parent)) {
+             declareStream(boltDeclarer, parent, stream, nodeGroupingInfo.get(parent, stream));
+         }
+     }
+ }
+
+ private StreamBolt addBolt(TopologyBuilder topologyBuilder,
+ String boltId,
+ Set<ProcessorNode> initialProcessors) {
+ ProcessorBolt bolt = new ProcessorBolt(boltId, graph, curGroup);
+ BoltDeclarer boltDeclarer = topologyBuilder.setBolt(boltId, bolt, getParallelism());
+ bolt.setStreamToInitialProcessors(wireBolt(curGroup, boltDeclarer, initialProcessors));
+ streamBolts.put(bolt, boltDeclarer);
+ return bolt;
+ }
+
+ private StreamBolt addStatefulBolt(TopologyBuilder topologyBuilder,
+ String boltId,
+ Set<ProcessorNode> initialProcessors) {
+ StateQueryProcessor<?, ?> stateQueryProcessor = getStateQueryProcessor();
+ StatefulProcessorBolt<?, ?> bolt;
+ if (stateQueryProcessor == null) {
+ bolt = new StatefulProcessorBolt<>(boltId, graph, curGroup);
+ BoltDeclarer boltDeclarer = topologyBuilder.setBolt(boltId, bolt, getParallelism());
+ bolt.setStreamToInitialProcessors(wireBolt(curGroup, boltDeclarer, initialProcessors));
+ streamBolts.put(bolt, boltDeclarer);
+ } else {
+ // state query is added to the existing stateful bolt
+ ProcessorNode updateStateNode = stateQueryProcessor.getStreamState().getUpdateStateNode();
+ bolt = findStatefulProcessorBolt(updateStateNode);
+ for (ProcessorNode node : curGroup) {
+ node.setComponentId(bolt.getId());
+ }
+ bolt.addNodes(curGroup);
+ bolt.addStreamToInitialProcessors(wireBolt(bolt.getNodes(), streamBolts.get(bolt), initialProcessors));
+ }
+ return bolt;
+ }
+
+ /**
+  * Finds the first state query processor in the current group.
+  *
+  * @return that processor, or {@code null} when the group has none
+  */
+ private StateQueryProcessor<?, ?> getStateQueryProcessor() {
+     StateQueryProcessor<?, ?> found = null;
+     for (ProcessorNode member : curGroup) {
+         if (member.getProcessor() instanceof StateQueryProcessor) {
+             found = (StateQueryProcessor<?, ?>) member.getProcessor();
+             break;
+         }
+     }
+     return found;
+ }
+
+ private StreamBolt addWindowedBolt(TopologyBuilder topologyBuilder,
+ String boltId,
+ Set<ProcessorNode> initialProcessors,
+ Window<?, ?> windowParam) {
+ WindowedProcessorBolt bolt = new WindowedProcessorBolt(boltId, graph, curGroup, windowParam);
+ BoltDeclarer boltDeclarer = topologyBuilder.setBolt(boltId, bolt, getParallelism());
+ bolt.setStreamToInitialProcessors(wireBolt(curGroup, boltDeclarer, initialProcessors));
+ streamBolts.put(bolt, boltDeclarer);
+ return bolt;
+ }
+
+ private StatefulProcessorBolt<?, ?> findStatefulProcessorBolt(ProcessorNode updateStateNode) {
+ for (StreamBolt bolt : streamBolts.keySet()) {
+ if (bolt instanceof StatefulProcessorBolt) {
+ StatefulProcessorBolt<?, ?> statefulProcessorBolt = (StatefulProcessorBolt) bolt;
+ if (statefulProcessorBolt.getNodes().contains(updateStateNode)) {
+ return statefulProcessorBolt;
+ }
+ }
+ }
+ throw new IllegalArgumentException("Could not find Stateful bolt for node " + updateStateNode);
+ }
+
+ /**
+  * Collects the non-sink output streams of every windowed processor among the resolved
+  * parents of {@code processorNode}.
+  *
+  * @param processorNode the node whose windowed input streams are requested
+  * @return the ids of those streams, possibly empty
+  */
+ private Set<String> getWindowedParentStreams(ProcessorNode processorNode) {
+     final Set<String> windowedStreams = new HashSet<>();
+     for (Node parent : parentNodes(processorNode)) {
+         if (!(parent instanceof ProcessorNode)) {
+             continue;
+         }
+         final ProcessorNode upstream = (ProcessorNode) parent;
+         if (!upstream.isWindowed()) {
+             continue;
+         }
+         for (String outputStream : upstream.getOutputStreams()) {
+             if (!StreamUtil.isSinkStream(outputStream)) {
+                 windowedStreams.add(outputStream);
+             }
+         }
+     }
+     return windowedStreams;
+ }
+
+ /**
+  * Subscribes a bolt to the external streams consumed by its initial processors and
+  * returns, per incoming stream, the initial processors fed by that stream.
+  *
+  * Parents that are themselves part of {@code curGroup} are skipped, since they run
+  * inside the same bolt.
+  *
+  * @param curGroup the processors that make up the bolt
+  * @param boltDeclarer declarer of the bolt being wired
+  * @param initialProcessors processors at the entry of the group
+  * @return mapping from stream key to the initial processors consuming it
+  */
+ private Multimap<String, ProcessorNode> wireBolt(List<ProcessorNode> curGroup,
+ BoltDeclarer boltDeclarer,
+ Set<ProcessorNode> initialProcessors) {
+ LOG.debug("Wiring bolt with boltDeclarer {}, curGroup {}, initialProcessors {}, nodeGroupingInfo {}",
+ boltDeclarer, curGroup, initialProcessors, nodeGroupingInfo);
+ Multimap<String, ProcessorNode> streamToInitialProcessor = ArrayListMultimap.create();
+ Set<ProcessorNode> curSet = new HashSet<>(curGroup);
+ for (ProcessorNode curNode : initialProcessors) {
+ for (Node parent : parentNodes(curNode)) {
+ if (curSet.contains(parent)) {
+ LOG.debug("Parent {} of curNode {} is in curGroup {}", parent, curNode, curGroup);
+ } else {
+ for (String stream : curNode.getParentStreams(parent)) {
+ // the subscription uses the plain stream id; the map key may be prefixed below
+ declareStream(boltDeclarer, parent, stream, nodeGroupingInfo.get(parent, stream));
+ // put global stream id for spouts
+ // NOTE(review): spouts are recognised by a component id starting with "spout";
+ // presumably this relies on the ids produced by UniqueIdGen - confirm
+ if (parent.getComponentId().startsWith("spout")) {
+ stream = parent.getComponentId() + stream;
+ }
+ streamToInitialProcessor.put(stream, curNode);
+ }
+ }
+ }
+ }
+ return streamToInitialProcessor;
+ }
+
+ /**
+  * Subscribes the bolt behind {@code boltDeclarer} to a stream of {@code parent}.
+  *
+  * @param boltDeclarer declarer of the consuming bolt
+  * @param parent the node emitting the stream
+  * @param streamId id of the stream to subscribe to
+  * @param grouping grouping to apply, or {@code null} for shuffle grouping
+  */
+ private void declareStream(BoltDeclarer boltDeclarer, Node parent, String streamId, GroupingInfo grouping) {
+     if (grouping != null) {
+         grouping.declareGrouping(boltDeclarer, parent.getComponentId(), streamId, grouping.getFields());
+         return;
+     }
+     boltDeclarer.shuffleGrouping(parent.getComponentId(), streamId);
+ }
+
+ private Set<ProcessorNode> initialProcessors(List<ProcessorNode> curGroup) {
+ Set<ProcessorNode> nodes = new HashSet<>();
+ Set<ProcessorNode> curSet = new HashSet<>(curGroup);
+ for (ProcessorNode node : curGroup) {
+ for (Node parent : parentNodes(node)) {
+ if (!(parent instanceof ProcessorNode) || !curSet.contains(parent)) {
+ nodes.add(node);
+ }
+ }
+ }
+ return nodes;
+ }
+
+ /**
+  * Tells whether {@code curNode} receives windowed input, i.e. whether any of its
+  * direct parents is a window node, a processor already marked as windowed, or a
+  * partition node that is itself windowed.
+  *
+  * Every parent is examined. Previously the first parent that was neither a window
+  * nor a processor node ended the scan, so the answer depended on the order in which
+  * the graph returned the parents.
+  *
+  * @param curNode the node to inspect
+  * @return {@code true} if at least one parent makes the node windowed
+  */
+ private boolean isWindowed(Node curNode) {
+     for (Node parent : StreamUtil.<Node>getParents(graph, curNode)) {
+         if (parent instanceof WindowNode) {
+             return true;
+         }
+         if (parent instanceof ProcessorNode) {
+             if (((ProcessorNode) parent).isWindowed()) {
+                 return true;
+             }
+         } else if (parent instanceof PartitionNode && isWindowed(parent)) {
+             // partition nodes are transparent: look through them for a window
+             return true;
+         }
+         // any other parent does not decide the outcome; keep checking the rest
+     }
+     return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/StreamState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/StreamState.java b/storm-core/src/jvm/org/apache/storm/streams/StreamState.java
new file mode 100644
index 0000000..a4633f7
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/StreamState.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams;
+
+import java.io.Serializable;
+
+/**
+ * A wrapper for the stream state which can be used to
+ * query the state via {@link Stream#stateQuery(StreamState)}
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public class StreamState<K, V> implements Serializable {
+    // transient: the wrapped stream is not written out when this object is serialized
+    private final transient PairStream<K, V> stream;
+
+    /**
+     * Wraps the given pair stream.
+     *
+     * @param stream the stream whose state this object represents
+     */
+    StreamState(PairStream<K, V> stream) {
+        this.stream = stream;
+    }
+
+    /**
+     * Returns the pair stream wrapped by this state.
+     *
+     * @return the wrapped pair stream
+     */
+    public PairStream<K, V> toPairStream() {
+        return this.stream;
+    }
+
+    /**
+     * Returns the node of the wrapped stream, cast to a processor node.
+     *
+     * @return the wrapped stream's node
+     */
+    ProcessorNode getUpdateStateNode() {
+        return (ProcessorNode) this.stream.node;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/StreamUtil.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/StreamUtil.java b/storm-core/src/jvm/org/apache/storm/streams/StreamUtil.java
new file mode 100644
index 0000000..0531ff6
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/StreamUtil.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams;
+
+import org.jgrapht.DirectedGraph;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class StreamUtil {
+    private static final String SINK_STREAM_SUFFIX = "__sink";
+
+    /**
+     * Returns the source vertices of all edges that end in {@code node}.
+     *
+     * @param graph the graph to inspect
+     * @param node the node whose parents are requested
+     * @param <T> the type the parents are cast to (unchecked)
+     * @return a new list with one entry per incoming edge
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> List<T> getParents(DirectedGraph<Node, Edge> graph, Node node) {
+        final List<T> parents = new ArrayList<>();
+        for (Edge incomingEdge : graph.incomingEdgesOf(node)) {
+            parents.add((T) incomingEdge.getSource());
+        }
+        return parents;
+    }
+
+    /**
+     * Returns the target vertices of all edges that start at {@code node}.
+     *
+     * @param graph the graph to inspect
+     * @param node the node whose children are requested
+     * @param <T> the type the children are cast to (unchecked)
+     * @return a new list with one entry per outgoing edge
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> List<T> getChildren(DirectedGraph<Node, Edge> graph, Node node) {
+        final List<T> children = new ArrayList<>();
+        for (Edge outgoingEdge : graph.outgoingEdgesOf(node)) {
+            children.add((T) outgoingEdge.getTarget());
+        }
+        return children;
+    }
+
+
+    /**
+     * Tells whether the stream id carries the sink suffix.
+     */
+    public static boolean isSinkStream(String streamId) {
+        return streamId.endsWith(SINK_STREAM_SUFFIX);
+    }
+
+    /**
+     * Derives the sink stream id by appending the sink suffix to {@code streamId}.
+     */
+    public static String getSinkStream(String streamId) {
+        return streamId + SINK_STREAM_SUFFIX;
+    }
+
+    /**
+     * Tells whether the value, or the first element when the value is a pair,
+     * equals the punctuation marker.
+     */
+    public static boolean isPunctuation(Object value) {
+        final Object candidate = (value instanceof Pair) ? ((Pair<?, ?>) value).getFirst() : value;
+        return WindowNode.PUNCTUATION.equals(candidate);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/StreamsEdgeFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/StreamsEdgeFactory.java b/storm-core/src/jvm/org/apache/storm/streams/StreamsEdgeFactory.java
new file mode 100644
index 0000000..0078690
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/StreamsEdgeFactory.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams;
+
+import org.jgrapht.EdgeFactory;
+
+import java.io.Serializable;
+
+/**
+ * Edge factory that produces an {@code Edge} for each pair of vertices it is given.
+ */
+class StreamsEdgeFactory implements EdgeFactory<Node, Edge>, Serializable {
+    /**
+     * Creates the edge leading from {@code sourceVertex} to {@code targetVertex}.
+     */
+    @Override
+    public Edge createEdge(Node sourceVertex, Node targetVertex) {
+        final Edge edge = new Edge(sourceVertex, targetVertex);
+        return edge;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/Tuple3.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/Tuple3.java b/storm-core/src/jvm/org/apache/storm/streams/Tuple3.java
new file mode 100644
index 0000000..77973f2
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/Tuple3.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams;
+
+/**
+ * A tuple of three elements along the lines of Scala's Tuple.
+ *
+ * @param <T1> the type of the first element
+ * @param <T2> the type of the second element
+ * @param <T3> the type of the third element
+ */
+public class Tuple3<T1, T2, T3> {
+    /** The first element. */
+    public final T1 _1;
+    /** The second element. */
+    public final T2 _2;
+    /** The third element. */
+    public final T3 _3;
+
+    /**
+     * Builds a tuple holding the three given elements.
+     *
+     * @param _1 the first element
+     * @param _2 the second element
+     * @param _3 the third element
+     */
+    public Tuple3(T1 _1, T2 _2, T3 _3) {
+        this._1 = _1;
+        this._2 = _2;
+        this._3 = _3;
+    }
+
+    /**
+     * Renders the tuple as {@code (first,second,third)}.
+     */
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("(");
+        text.append(_1).append(",");
+        text.append(_2).append(",");
+        text.append(_3).append(")");
+        return text.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/UniqueIdGen.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/UniqueIdGen.java b/storm-core/src/jvm/org/apache/storm/streams/UniqueIdGen.java
new file mode 100644
index 0000000..3cbd141
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/UniqueIdGen.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams;
+
+/**
+ * Process-wide generator of sequential ids for streams, bolts and spouts.
+ * Each kind has its own counter; the first id of every kind ends in 1.
+ */
+class UniqueIdGen {
+    private static final UniqueIdGen instance = new UniqueIdGen();
+
+    private int streamCounter = 0;
+    private int spoutCounter = 0;
+    private int boltCounter = 0;
+
+    private UniqueIdGen() {
+    }
+
+    /**
+     * Returns the shared generator instance.
+     */
+    static UniqueIdGen getInstance() {
+        return instance;
+    }
+
+    /**
+     * Returns the next stream id: "s" followed by the incremented stream counter.
+     */
+    String getUniqueStreamId() {
+        return "s" + (++streamCounter);
+    }
+
+    /**
+     * Returns the next bolt id: "bolt" followed by the incremented bolt counter.
+     */
+    String getUniqueBoltId() {
+        return "bolt" + (++boltCounter);
+    }
+
+    /**
+     * Returns the next spout id: "spout" followed by the incremented spout counter.
+     */
+    String getUniqueSpoutId() {
+        return "spout" + (++spoutCounter);
+    }
+
+    // for unit tests
+    void reset() {
+        boltCounter = 0;
+        spoutCounter = 0;
+        streamCounter = 0;
+    }
+
+}
+
http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/WindowNode.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/WindowNode.java b/storm-core/src/jvm/org/apache/storm/streams/WindowNode.java
new file mode 100644
index 0000000..a0e831a
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/WindowNode.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams;
+
+import org.apache.storm.streams.windowing.Window;
+import org.apache.storm.tuple.Fields;
+
+/**
+ * Node that captures the windowing configurations.
+ */
+public class WindowNode extends Node {
+    /** Marker value used for punctuation. */
+    public static final String PUNCTUATION = "__punctuation";
+
+    private final Window<?, ?> windowParams;
+
+    /**
+     * Creates a window node.
+     *
+     * @param windowParams the windowing configuration captured by this node
+     * @param outputStream passed to the {@code Node} constructor
+     * @param outputFields passed to the {@code Node} constructor
+     */
+    WindowNode(Window<?, ?> windowParams, String outputStream, Fields outputFields) {
+        super(outputStream, outputFields);
+        this.windowParams = windowParams;
+    }
+
+    /**
+     * Returns the windowing configuration captured by this node.
+     */
+    Window<?, ?> getWindowParams() {
+        return this.windowParams;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/WindowedProcessorBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/WindowedProcessorBolt.java b/storm-core/src/jvm/org/apache/storm/streams/WindowedProcessorBolt.java
new file mode 100644
index 0000000..3971346
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/WindowedProcessorBolt.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams;
+
+import com.google.common.collect.Multimap;
+import org.apache.storm.streams.windowing.SlidingWindows;
+import org.apache.storm.streams.windowing.TumblingWindows;
+import org.apache.storm.streams.windowing.Window;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.windowing.TupleWindow;
+import org.jgrapht.DirectedGraph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.storm.streams.WindowNode.PUNCTUATION;
+
+/**
+ * Stream bolt that executes windowing operations.
+ */
+class WindowedProcessorBolt extends BaseWindowedBolt implements StreamBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(WindowedProcessorBolt.class);
+    private final ProcessorBoltDelegate delegate;
+    private final Window<?, ?> window;
+
+    /**
+     * Creates the bolt and applies the window configuration to the underlying
+     * windowed bolt.
+     *
+     * @param id the bolt id handed to the delegate
+     * @param graph the stream graph handed to the delegate
+     * @param nodes the processor nodes handed to the delegate
+     * @param window the window configuration to apply
+     */
+    WindowedProcessorBolt(String id, DirectedGraph<Node, Edge> graph,
+                          List<ProcessorNode> nodes,
+                          Window<?, ?> window) {
+        this.delegate = new ProcessorBoltDelegate(id, graph, nodes);
+        this.window = window;
+        setWindowConfig();
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        delegate.prepare(stormConf, context, collector);
+    }
+
+    /**
+     * Processes every non-punctuation tuple of the window through the delegate and
+     * then sends one punctuation on each initial stream.
+     */
+    @Override
+    public void execute(TupleWindow inputWindow) {
+        LOG.trace("Window triggered at {}, inputWindow {}", new Date(), inputWindow);
+        if (delegate.isEventTimestamp()) {
+            delegate.setEventTimestamp(inputWindow.getTimestamp());
+        }
+        for (Tuple tuple : inputWindow.get()) {
+            final Pair<Object, String> valueAndStream = delegate.getValueAndStream(tuple);
+            if (StreamUtil.isPunctuation(valueAndStream.getFirst())) {
+                // punctuations arriving inside the window are dropped
+                continue;
+            }
+            delegate.process(valueAndStream.getFirst(), valueAndStream.getSecond());
+        }
+        for (String initialStream : delegate.getInitialStreams()) {
+            delegate.process(PUNCTUATION, initialStream);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        delegate.declareOutputFields(declarer);
+    }
+
+    @Override
+    public void setTimestampField(String fieldName) {
+        delegate.setTimestampField(fieldName);
+    }
+
+    @Override
+    public String getId() {
+        return delegate.getId();
+    }
+
+    void setStreamToInitialProcessors(Multimap<String, ProcessorNode> streamToInitialProcessors) {
+        delegate.setStreamToInitialProcessors(streamToInitialProcessors);
+    }
+
+    /*
+     * Translates the window configuration into the corresponding settings of the
+     * windowed bolt. Optional settings are applied only when they are present.
+     */
+    private void setWindowConfig() {
+        if (window instanceof SlidingWindows) {
+            setSlidingWindowParams(window.getWindowLength(), window.getSlidingInterval());
+        } else if (window instanceof TumblingWindows) {
+            setTumblingWindowParams(window.getWindowLength());
+        }
+        if (window.getTimestampField() != null) {
+            withTimestampField(window.getTimestampField());
+        }
+        if (window.getLag() != null) {
+            withLag(window.getLag());
+        }
+        if (window.getLateTupleStream() != null) {
+            withLateTupleStream(window.getLateTupleStream());
+        }
+    }
+
+    /*
+     * Picks the withWindow overload matching the runtime types of the length and
+     * the sliding interval. Any other combination leaves the bolt unchanged.
+     */
+    private void setSlidingWindowParams(Object windowLength, Object slidingInterval) {
+        final boolean countLength = windowLength instanceof Count;
+        final boolean durationLength = !countLength && windowLength instanceof Duration;
+        final boolean countInterval = slidingInterval instanceof Count;
+        final boolean durationInterval = !countInterval && slidingInterval instanceof Duration;
+
+        if (countLength && countInterval) {
+            withWindow((Count) windowLength, (Count) slidingInterval);
+        } else if (countLength && durationInterval) {
+            withWindow((Count) windowLength, (Duration) slidingInterval);
+        } else if (durationLength && countInterval) {
+            withWindow((Duration) windowLength, (Count) slidingInterval);
+        } else if (durationLength && durationInterval) {
+            withWindow((Duration) windowLength, (Duration) slidingInterval);
+        }
+    }
+
+    /*
+     * Picks the withTumblingWindow overload matching the runtime type of the length.
+     * Any other type leaves the bolt unchanged.
+     */
+    private void setTumblingWindowParams(Object windowLength) {
+        if (windowLength instanceof Count) {
+            withTumblingWindow((Count) windowLength);
+            return;
+        }
+        if (windowLength instanceof Duration) {
+            withTumblingWindow((Duration) windowLength);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/operations/Aggregator.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/operations/Aggregator.java b/storm-core/src/jvm/org/apache/storm/streams/operations/Aggregator.java
new file mode 100644
index 0000000..e3feaf4
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/operations/Aggregator.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams.operations;
+
+/**
+ * Interface for aggregating values.
+ *
+ * @param <T> the original value type
+ * @param <R> the aggregated value type
+ */
+public interface Aggregator<T, R> extends Operation {
+ /**
+ * Supplies the value the aggregate starts from, before any input value
+ * has been applied.
+ *
+ * @return the initial value of the aggregate
+ */
+ R init();
+
+ /**
+ * Combines one input value with the current aggregate and returns the
+ * resulting aggregate.
+ *
+ * @param value the value to aggregate
+ * @param aggregate the current aggregate
+ * @return the new aggregate
+ */
+ R apply(T value, R aggregate);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/operations/Consumer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/operations/Consumer.java b/storm-core/src/jvm/org/apache/storm/streams/operations/Consumer.java
new file mode 100644
index 0000000..84653ab
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/operations/Consumer.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams.operations;
+
+/**
+ * An {@link Operation} that accepts a single input argument and returns no result.
+ *
+ * @param <T> the type of the input argument
+ */
+public interface Consumer<T> extends Operation {
+ /**
+ * Performs this operation on the given input.
+ *
+ * @param input the input argument
+ */
+ void accept(T input);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/operations/FlatMapFunction.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/operations/FlatMapFunction.java b/storm-core/src/jvm/org/apache/storm/streams/operations/FlatMapFunction.java
new file mode 100644
index 0000000..bcacd08
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/operations/FlatMapFunction.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams.operations;
+
+/**
+ * A {@link Function} that accepts one argument and returns an {@link Iterable} of elements as its result.
+ *
+ * @param <T> the type of the input to the function
+ * @param <R> the element type of the {@link Iterable} returned from this function
+ */
+public interface FlatMapFunction<T, R> extends Function<T, Iterable<R>> {
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/operations/Function.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/operations/Function.java b/storm-core/src/jvm/org/apache/storm/streams/operations/Function.java
new file mode 100644
index 0000000..7cef0a6
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/operations/Function.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams.operations;
+
+/**
+ * An {@link Operation} that accepts one argument and produces a result.
+ *
+ * @param <T> the type of the input to the function
+ * @param <R> the type of the result of the function
+ */
+public interface Function<T, R> extends Operation {
+ /**
+ * Applies this function to the given input.
+ *
+ * @param input the input to the function
+ * @return the result of applying the function to {@code input}
+ */
+ R apply(T input);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/operations/IdentityFunction.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/operations/IdentityFunction.java b/storm-core/src/jvm/org/apache/storm/streams/operations/IdentityFunction.java
new file mode 100644
index 0000000..abb9327
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/operations/IdentityFunction.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams.operations;
+
+/**
+ * A {@link Function} that returns its input argument unchanged as the result.
+ *
+ * @param <T> the type of both the input and the result
+ */
+public class IdentityFunction<T> implements Function<T, T> {
+
+ @Override
+ public T apply(T input) {
+ return input;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/operations/Operation.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/operations/Operation.java b/storm-core/src/jvm/org/apache/storm/streams/operations/Operation.java
new file mode 100644
index 0000000..77dbe1e
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/operations/Operation.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams.operations;
+
+import java.io.Serializable;
+
+/**
+ * The parent interface for any kind of streaming operation; it extends {@link Serializable}.
+ */
+public interface Operation extends Serializable {
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/operations/PairFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/operations/PairFlatMapFunction.java b/storm-core/src/jvm/org/apache/storm/streams/operations/PairFlatMapFunction.java
new file mode 100644
index 0000000..376c1ba
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/operations/PairFlatMapFunction.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams.operations;
+
+import org.apache.storm.streams.Pair;
+
+/**
+ * A {@link FlatMapFunction} that accepts one argument and returns an {@link Iterable} of {@link Pair}.
+ *
+ * @param <T> the type of the input to the function
+ * @param <K> the key type of the key-value pairs produced as a result
+ * @param <V> the value type of the key-value pairs produced as a result
+ */
+public interface PairFlatMapFunction<T, K, V> extends FlatMapFunction<T, Pair<K, V>> {
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/operations/PairFunction.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/operations/PairFunction.java b/storm-core/src/jvm/org/apache/storm/streams/operations/PairFunction.java
new file mode 100644
index 0000000..b153ff1
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/operations/PairFunction.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams.operations;
+
+import org.apache.storm.streams.Pair;
+
+/**
+ * A {@link Function} that accepts an argument and produces a key-value {@link Pair}.
+ *
+ * @param <T> the type of the input to the function
+ * @param <K> the key type of the produced pair
+ * @param <V> the value type of the produced pair
+ */
+public interface PairFunction<T, K, V> extends Function<T, Pair<K,V>> {
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/operations/PairValueJoiner.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/operations/PairValueJoiner.java b/storm-core/src/jvm/org/apache/storm/streams/operations/PairValueJoiner.java
new file mode 100644
index 0000000..ca81101
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/operations/PairValueJoiner.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams.operations;
+
+import org.apache.storm.streams.Pair;
+
+/**
+ * A {@link ValueJoiner} that joins two values into a {@link Pair} of the two values.
+ *
+ * @param <V1> the type of the first value
+ * @param <V2> the type of the second value
+ */
+public class PairValueJoiner<V1, V2> implements ValueJoiner<V1, V2, Pair<V1, V2>> {
+ /**
+ * Joins the two values by returning the result of {@code Pair.of(value1, value2)}.
+ *
+ * @param value1 the first value
+ * @param value2 the second value
+ * @return a {@link Pair} of {@code value1} and {@code value2}
+ */
+ @Override
+ public Pair<V1, V2> apply(V1 value1, V2 value2) {
+ return Pair.of(value1, value2);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/operations/Predicate.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/operations/Predicate.java b/storm-core/src/jvm/org/apache/storm/streams/operations/Predicate.java
new file mode 100644
index 0000000..ae1be2d
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/operations/Predicate.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams.operations;
+
+/**
+ * An {@link Operation} representing a predicate (boolean-valued function) of a value.
+ *
+ * @param <T> the type of the value being tested
+ */
+public interface Predicate<T> extends Operation {
+ /**
+ * Evaluates this predicate on the given input.
+ *
+ * @param input the input argument
+ * @return {@code true} if the input matches the predicate, {@code false} otherwise
+ */
+ boolean test(T input);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/operations/PrintConsumer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/operations/PrintConsumer.java b/storm-core/src/jvm/org/apache/storm/streams/operations/PrintConsumer.java
new file mode 100644
index 0000000..d301dbf
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/operations/PrintConsumer.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams.operations;
+
+/**
+ * A {@link Consumer} that prints each input to stdout using {@code System.out.println}.
+ *
+ * @param <T> the type of the input being printed
+ */
+public class PrintConsumer<T> implements Consumer<T> {
+ @Override
+ public void accept(T input) {
+ System.out.println(input);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/operations/Reducer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/operations/Reducer.java b/storm-core/src/jvm/org/apache/storm/streams/operations/Reducer.java
new file mode 100644
index 0000000..04ee70d
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/operations/Reducer.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams.operations;
+
+/**
+ * An {@link Operation} that is applied to two values of the same type, producing a result of the same
+ * type.
+ *
+ * @param <T> the type of the arguments and the result
+ */
+public interface Reducer<T> extends Operation {
+ /**
+ * Applies this reducer to the given arguments.
+ *
+ * @param arg1 the first argument
+ * @param arg2 the second argument
+ * @return the result of applying this reducer to {@code arg1} and {@code arg2}
+ */
+ T apply(T arg1, T arg2);
+}