You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/02/07 01:28:16 UTC

[03/12] storm git commit: [STORM-1961] Stream api for storm core use cases

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/StatefulProcessorBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/StatefulProcessorBolt.java b/storm-core/src/jvm/org/apache/storm/streams/StatefulProcessorBolt.java
new file mode 100644
index 0000000..c123658
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/StatefulProcessorBolt.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams;
+
+import com.google.common.collect.Multimap;
+import org.apache.storm.state.KeyValueState;
+import org.apache.storm.streams.processors.StatefulProcessor;
+import org.apache.storm.streams.processors.UpdateStateByKeyProcessor;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseStatefulBolt;
+import org.apache.storm.tuple.Tuple;
+import org.jgrapht.DirectedGraph;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Stream bolt that executes stateful operations like update state and state query.
+ */
+class StatefulProcessorBolt<K, V> extends BaseStatefulBolt<KeyValueState<K, V>> implements StreamBolt {
+    private final ProcessorBoltDelegate delegate;
+    // can be UpdateStateByKey or StateQuery processors
+    private final Set<StatefulProcessor<K, V>> statefulProcessors;
+
+    StatefulProcessorBolt(String boltId, DirectedGraph<Node, Edge> graph, List<ProcessorNode> nodes) {
+        delegate = new ProcessorBoltDelegate(boltId, graph, nodes);
+        statefulProcessors = getStatefulProcessors(nodes);
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        delegate.prepare(stormConf, context, collector);
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        delegate.processAndAck(input);
+    }
+
+    @Override
+    public void initState(KeyValueState<K, V> state) {
+        for (StatefulProcessor<K, V> statefulProcessor : statefulProcessors) {
+            statefulProcessor.initState(state);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        delegate.declareOutputFields(declarer);
+    }
+
+    @Override
+    public void setTimestampField(String fieldName) {
+        delegate.setTimestampField(fieldName);
+    }
+
+    @Override
+    public String getId() {
+        return delegate.getId();
+    }
+
+    void setStreamToInitialProcessors(Multimap<String, ProcessorNode> streamToInitialProcessors) {
+        delegate.setStreamToInitialProcessors(streamToInitialProcessors);
+    }
+
+    public List<ProcessorNode> getNodes() {
+        return delegate.getNodes();
+    }
+
+    void addStreamToInitialProcessors(Multimap<String, ProcessorNode> streamToInitialProcessors) {
+        delegate.addStreamToInitialProcessors(streamToInitialProcessors);
+    }
+
+    void addNodes(List<ProcessorNode> nodes) {
+        delegate.addNodes(nodes);
+        statefulProcessors.addAll(getStatefulProcessors(nodes));
+    }
+
+    @SuppressWarnings("unchecked")
+    private Set<StatefulProcessor<K, V>> getStatefulProcessors(List<ProcessorNode> nodes) {
+        Set<StatefulProcessor<K, V>> statefulProcessors = new HashSet<>();
+        int updateStateByKeyCount = 0;
+        for (ProcessorNode node : nodes) {
+            if (node.getProcessor() instanceof StatefulProcessor) {
+                statefulProcessors.add((StatefulProcessor<K, V>) node.getProcessor());
+                if (node.getProcessor() instanceof UpdateStateByKeyProcessor) {
+                    if (++updateStateByKeyCount > 1) {
+                        throw new IllegalArgumentException("Cannot have more than one updateStateByKey processor " +
+                                "in a StatefulProcessorBolt");
+                    }
+                }
+
+            }
+        }
+        return statefulProcessors;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/Stream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/Stream.java b/storm-core/src/jvm/org/apache/storm/streams/Stream.java
new file mode 100644
index 0000000..e50e7a2
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/Stream.java
@@ -0,0 +1,393 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams;
+
+import org.apache.storm.streams.operations.Aggregator;
+import org.apache.storm.streams.operations.Consumer;
+import org.apache.storm.streams.operations.FlatMapFunction;
+import org.apache.storm.streams.operations.Function;
+import org.apache.storm.streams.operations.IdentityFunction;
+import org.apache.storm.streams.operations.PairFlatMapFunction;
+import org.apache.storm.streams.operations.PairFunction;
+import org.apache.storm.streams.operations.Predicate;
+import org.apache.storm.streams.operations.PrintConsumer;
+import org.apache.storm.streams.operations.Reducer;
+import org.apache.storm.streams.processors.AggregateProcessor;
+import org.apache.storm.streams.processors.BranchProcessor;
+import org.apache.storm.streams.processors.FilterProcessor;
+import org.apache.storm.streams.processors.FlatMapProcessor;
+import org.apache.storm.streams.processors.ForEachProcessor;
+import org.apache.storm.streams.processors.MapProcessor;
+import org.apache.storm.streams.processors.PeekProcessor;
+import org.apache.storm.streams.processors.Processor;
+import org.apache.storm.streams.processors.ReduceProcessor;
+import org.apache.storm.streams.processors.StateQueryProcessor;
+import org.apache.storm.streams.windowing.Window;
+import org.apache.storm.topology.IBasicBolt;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Represents a stream of values.
+ *
+ * @param <T> the type of the value
+ */
+public class Stream<T> {
+    protected static final Fields KEY = new Fields("key");
+    protected static final Fields VALUE = new Fields("value");
+    protected static final Fields KEY_VALUE = new Fields("key", "value");
+    // the stream builder
+    protected final StreamBuilder streamBuilder;
+    // the current node
+    protected final Node node;
+    // the stream id from node's output stream(s) that this stream represents
+    protected final String stream;
+
+    Stream(StreamBuilder streamBuilder, Node node) {
+        this(streamBuilder, node, node.getOutputStreams().iterator().next());
+    }
+
+    private Stream(StreamBuilder streamBuilder, Node node, String stream) {
+        this.streamBuilder = streamBuilder;
+        this.node = node;
+        this.stream = stream;
+    }
+
+    /**
+     * Returns a stream consisting of the elements of this stream that match the given predicate.
+     *
+     * @param predicate the predicate to apply to each element to determine if it should be included
+     * @return the new stream
+     */
+    public Stream<T> filter(Predicate<? super T> predicate) {
+        return new Stream<>(streamBuilder, addProcessorNode(new FilterProcessor<>(predicate), VALUE));
+    }
+
+    /**
+     * Returns a stream consisting of the result of applying the given mapping function to the values of this stream.
+     *
+     * @param function a mapping function to be applied to each value in this stream.
+     * @return the new stream
+     */
+    public <R> Stream<R> map(Function<? super T, ? extends R> function) {
+        return new Stream<>(streamBuilder, addProcessorNode(new MapProcessor<>(function), VALUE));
+    }
+
+    /**
+     * Returns a stream of key-value pairs by applying a {@link PairFunction} on each value of this stream.
+     *
+     * @param function the mapping function to be applied to each value in this stream
+     * @param <K>      the key type
+     * @param <V>      the value type
+     * @return the new stream of key-value pairs
+     */
+    public <K, V> PairStream<K, V> mapToPair(PairFunction<T, K, V> function) {
+        return new PairStream<>(streamBuilder, addProcessorNode(new MapProcessor<>(function), KEY_VALUE));
+    }
+
+    /**
+     * Returns a stream consisting of the results of replacing each value of this stream with the contents
+     * produced by applying the provided mapping function to each value. This has the effect of applying
+     * a one-to-many transformation to the values of the stream, and then flattening the resulting elements
+     * into a new stream.
+     *
+     * @param function a mapping function to be applied to each value in this stream which produces new values.
+     * @return the new stream
+     */
+    public <R> Stream<R> flatMap(FlatMapFunction<T, R> function) {
+        return new Stream<>(streamBuilder, addProcessorNode(new FlatMapProcessor<>(function), VALUE));
+    }
+
+    /**
+     * Returns a stream consisting of the results of replacing each value of this stream with the key-value pairs
+     * produced by applying the provided mapping function to each value.
+     *
+     * @param function the mapping function to be applied to each value in this stream which produces new key-value pairs.
+     * @param <K>      the key type
+     * @param <V>      the value type
+     * @return the new stream of key-value pairs
+     * @see #flatMap(FlatMapFunction)
+     * @see #mapToPair(PairFunction)
+     */
+    public <K, V> PairStream<K, V> flatMapToPair(PairFlatMapFunction<T, K, V> function) {
+        return new PairStream<>(streamBuilder, addProcessorNode(new FlatMapProcessor<>(function), KEY_VALUE));
+    }
+
+    /**
+     * Returns a new stream consisting of the elements that fall within the window as specified by the window parameter.
+     * The {@link Window} specification could be used to specify sliding or tumbling windows based on
+     * time duration or event count. For example,
+     * <pre>
+     * // time duration based sliding window
+     * stream.window(SlidingWindows.of(Duration.minutes(10), Duration.minutes(1)));
+     *
+     * // count based sliding window
+     * stream.window(SlidingWindows.of(Count.of(10), Count.of(2)));
+     *
+     * // time duration based tumbling window
+     * stream.window(TumblingWindows.of(Duration.seconds(10)));
+     * </pre>
+     *
+     * @see org.apache.storm.streams.windowing.SlidingWindows
+     * @see org.apache.storm.streams.windowing.TumblingWindows
+     * @param window the window configuration
+     * @return the new stream
+     */
+    public Stream<T> window(Window<?, ?> window) {
+        return new Stream<>(streamBuilder, addNode(new WindowNode(window, stream, node.getOutputFields())));
+    }
+
+    /**
+     * Performs an action for each element of this stream.
+     *
+     * @param action an action to perform on the elements
+     */
+    public void forEach(Consumer<? super T> action) {
+        addProcessorNode(new ForEachProcessor<>(action), new Fields());
+    }
+
+    /**
+     * Returns a stream consisting of the elements of this stream, additionally performing the provided action on
+     * each element as it is consumed from the resulting stream.
+     *
+     * @param action the action to perform on each element as it is consumed from the stream
+     * @return the new stream
+     */
+    public Stream<T> peek(Consumer<? super T> action) {
+        return new Stream<>(streamBuilder, addProcessorNode(new PeekProcessor<>(action), node.getOutputFields()));
+    }
+
+    /**
+     * Aggregates the values in this stream using the aggregator. This does a global aggregation, i.e. the elements
+     * across all the partitions are forwarded to a single task for computing the aggregate.
+     * <p>
+     * If the stream is windowed, the aggregate result is emitted after each window activation and represents the
+     * aggregate of elements that fall within that window.
+     * If the stream is not windowed, the aggregate result is emitted as each new element in the stream is processed.
+     * </p>
+     *
+     * @param aggregator the aggregator
+     * @param <R>        the result type
+     * @return the new stream
+     */
+    public <R> Stream<R> aggregate(Aggregator<? super T, ? extends R> aggregator) {
+        return new Stream<>(streamBuilder, global().addProcessorNode(new AggregateProcessor<>(aggregator), VALUE));
+    }
+
+    /**
+     * Performs a reduction on the elements of this stream, by repeatedly applying the reducer.
+     * <p>
+     * If the stream is windowed, the result is emitted after each window activation and represents the
+     * reduction of elements that fall within that window.
+     * If the stream is not windowed, the result is emitted as each new element in the stream is processed.
+     * </p>
+     *
+     * @param reducer the reducer
+     * @return the new stream
+     */
+    public Stream<T> reduce(Reducer<T> reducer) {
+        return new Stream<>(streamBuilder, global().addProcessorNode(new ReduceProcessor<>(reducer), VALUE));
+    }
+
+    /**
+     * Returns a new stream with the given value of parallelism. Further operations on this stream
+     * would execute at this level of parallelism.
+     *
+     * @param parallelism the parallelism value
+     * @return the new stream
+     */
+    public Stream<T> repartition(int parallelism) {
+        if (parallelism < 1) {
+            throw new IllegalArgumentException("Parallelism should be >= 1");
+        }
+        Node partitionNode = addNode(node, new PartitionNode(stream, node.getOutputFields()), parallelism);
+        return new Stream<>(streamBuilder, partitionNode);
+    }
+
+    /**
+     * Returns an array of streams by splitting the given stream into multiple branches based on the given
+     * predicates. The predicates are applied in the given order to the values of this stream and the result
+     * is forwarded to the corresponding (index based) result stream based on the (index of) predicate that matches.
+     * If none of the predicates match a value, that value is dropped.
+     *
+     * @param predicates the predicates
+     * @return an array of result streams (branches) corresponding to the given predicates
+     */
+    @SuppressWarnings("unchecked")
+    public Stream<T>[] branch(Predicate<T>... predicates) {
+        List<Stream<T>> childStreams = new ArrayList<>();
+        if (predicates.length > 0) {
+            BranchProcessor<T> branchProcessor = new BranchProcessor<>();
+            Node branchNode = addProcessorNode(branchProcessor, VALUE);
+            for (Predicate<T> predicate : predicates) {
+                // create a child node (identity) per branch
+                ProcessorNode child = makeProcessorNode(new MapProcessor<>(new IdentityFunction<>()), node.getOutputFields());
+                String branchStream = child.getOutputStreams().iterator().next() + "-branch";
+                // branchStream is the parent stream that connects branch processor to this child
+                branchNode.addOutputStream(branchStream);
+                addNode(branchNode, child, branchStream);
+                childStreams.add(new Stream<>(streamBuilder, child));
+                branchProcessor.addPredicate(predicate, branchStream);
+            }
+        }
+        return childStreams.toArray((Stream<T>[]) new Stream[childStreams.size()]);
+    }
+
+    /**
+     * Print the values in this stream.
+     */
+    public void print() {
+        forEach(new PrintConsumer<T>());
+    }
+
+    /**
+     * Sends the elements of this stream to a bolt. This could be used to plug in existing bolts as
+     * sinks in the stream, e.g. a {@code RedisStoreBolt}. The bolt would have a parallelism of 1.
+     * <p>
+     * <b>Note:</b> This would provide guarantees only based on what the bolt provides.
+     * </p>
+     *
+     * @param bolt the bolt
+     */
+    public void to(IRichBolt bolt) {
+        to(bolt, 1);
+    }
+
+    /**
+     * Sends the elements of this stream to a bolt. This could be used to plug in existing bolts as
+     * sinks in the stream, e.g. a {@code RedisStoreBolt}.
+     * <p>
+     * <b>Note:</b> This would provide guarantees only based on what the bolt provides.
+     * </p>
+     *
+     * @param bolt        the bolt
+     * @param parallelism the parallelism of the bolt
+     */
+    public void to(IRichBolt bolt, int parallelism) {
+        addSinkNode(new SinkNode(bolt), parallelism);
+    }
+
+    /**
+     * Sends the elements of this stream to a bolt. This could be used to plug in existing bolts as
+     * sinks in the stream, e.g. a {@code RedisStoreBolt}. The bolt would have a parallelism of 1.
+     * <p>
+     * <b>Note:</b> This would provide guarantees only based on what the bolt provides.
+     * </p>
+     *
+     * @param bolt the bolt
+     */
+    public void to(IBasicBolt bolt) {
+        to(bolt, 1);
+    }
+
+    /**
+     * Sends the elements of this stream to a bolt. This could be used to plug in existing bolts as
+     * sinks in the stream, e.g. a {@code RedisStoreBolt}.
+     * <p>
+     * <b>Note:</b> This would provide guarantees only based on what the bolt provides.
+     * </p>
+     *
+     * @param bolt        the bolt
+     * @param parallelism the parallelism of the bolt
+     */
+    public void to(IBasicBolt bolt, int parallelism) {
+        addSinkNode(new SinkNode(bolt), parallelism);
+    }
+
+    /**
+     * Queries the given stream state with the values in this stream as the keys.
+     *
+     * @param streamState the stream state
+     * @param <V>         the value type
+     * @return the result stream
+     */
+    public <V> PairStream<T, V> stateQuery(StreamState<T, V> streamState) {
+        // need all grouping for state query since the state is local
+        Node node = all().addProcessorNode(new StateQueryProcessor<>(streamState), KEY_VALUE);
+        // add 'updateState' node as parent so that state query gets processed after update state
+        addNode(streamState.getUpdateStateNode(), node, node.getParallelism());
+        return new PairStream<>(streamBuilder, node);
+    }
+
+    Node getNode() {
+        return node;
+    }
+
+    Node addNode(Node parent, Node child, int parallelism) {
+        return streamBuilder.addNode(parent, child, parallelism);
+    }
+
+    Node addNode(Node child) {
+        return addNode(this.node, child);
+    }
+
+    Node addProcessorNode(Processor<?> processor, Fields outputFields) {
+        return addNode(makeProcessorNode(processor, outputFields));
+    }
+
+    String getStream() {
+        return stream;
+    }
+
+    private Node addNode(Node parent, Node child) {
+        return streamBuilder.addNode(parent, child);
+    }
+
+    private Node addNode(Node parent, Node child, String parentStreamId) {
+        return streamBuilder.addNode(parent, child, parentStreamId);
+    }
+
+    private Node addNode(Node child, int parallelism, String parentStreamId) {
+        return streamBuilder.addNode(this.node, child, parallelism, parentStreamId);
+    }
+
+    private ProcessorNode makeProcessorNode(Processor<?> processor, Fields outputFields) {
+        return new ProcessorNode(processor, UniqueIdGen.getInstance().getUniqueStreamId(), outputFields);
+    }
+
+    private void addSinkNode(SinkNode sinkNode, int parallelism) {
+        String boltId = UniqueIdGen.getInstance().getUniqueBoltId();
+        sinkNode.setComponentId(boltId);
+        sinkNode.setParallelism(parallelism);
+        if (node instanceof SpoutNode) {
+            addNode(sinkNode, parallelism, Utils.DEFAULT_STREAM_ID);
+        } else {
+            /*
+              * add a stream__sink stream to the current node (parent) for emitting
+              * just the values (no punctuation) to the bolt.
+              */
+            String sinkStream = StreamUtil.getSinkStream(stream);
+            node.addOutputStream(sinkStream);
+            addNode(sinkNode, parallelism, sinkStream);
+        }
+    }
+
+    private Stream<T> global() {
+        Node partitionNode = addNode(new PartitionNode(stream, node.getOutputFields(), GroupingInfo.global()));
+        return new Stream<>(streamBuilder, partitionNode);
+    }
+
+    private Stream<T> all() {
+        Node partitionNode = addNode(new PartitionNode(stream, node.getOutputFields(), GroupingInfo.all()));
+        return new Stream<>(streamBuilder, partitionNode);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/StreamBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/StreamBolt.java b/storm-core/src/jvm/org/apache/storm/streams/StreamBolt.java
new file mode 100644
index 0000000..6170ea8
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/StreamBolt.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams;
+
+/**
+ * Interface for bolts that execute the streaming operations via the
+ * processors.
+ */
+interface StreamBolt {
+    /**
+     * The bolt component id. This is an auto-generated value like (bolt-N).
+     *
+     * @return the bolt id.
+     */
+    String getId();
+
+    /**
+     * Set the timestamp field name for event time based processing.
+     *
+     * @param fieldName the timestamp field name
+     */
+    void setTimestampField(String fieldName);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/StreamBuilder.java b/storm-core/src/jvm/org/apache/storm/streams/StreamBuilder.java
new file mode 100644
index 0000000..e19a0c6
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/StreamBuilder.java
@@ -0,0 +1,591 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Table;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.streams.operations.IdentityFunction;
+import org.apache.storm.streams.operations.mappers.PairValueMapper;
+import org.apache.storm.streams.operations.mappers.TupleValueMapper;
+import org.apache.storm.streams.processors.JoinProcessor;
+import org.apache.storm.streams.processors.MapProcessor;
+import org.apache.storm.streams.processors.StateQueryProcessor;
+import org.apache.storm.streams.processors.StatefulProcessor;
+import org.apache.storm.streams.windowing.Window;
+import org.apache.storm.topology.BoltDeclarer;
+import org.apache.storm.topology.IBasicBolt;
+import org.apache.storm.topology.IComponent;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Tuple;
+import org.jgrapht.graph.DefaultDirectedGraph;
+import org.jgrapht.traverse.TopologicalOrderIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+/**
+ * A builder for constructing a {@link StormTopology} via storm streams api (DSL)
+ */
+public class StreamBuilder {
+    private static final Logger LOG = LoggerFactory.getLogger(StreamBuilder.class);
+    private final DefaultDirectedGraph<Node, Edge> graph;
+    private final Table<Node, String, GroupingInfo> nodeGroupingInfo = HashBasedTable.create();
+    private final Map<Node, WindowNode> windowInfo = new HashMap<>();
+    private final List<ProcessorNode> curGroup = new ArrayList<>();
+    private int statefulProcessorCount = 0;
+    private final Map<StreamBolt, BoltDeclarer> streamBolts = new HashMap<>();
+    private String timestampFieldName = null;
+
+    /**
+     * Creates a new {@link StreamBuilder}
+     */
+    public StreamBuilder() {
+        graph = new DefaultDirectedGraph<>(new StreamsEdgeFactory());
+    }
+
+    /**
+     * Creates a new {@link Stream} of tuples from the given {@link IRichSpout}
+     *
+     * @param spout the spout
+     * @return the new stream
+     */
+    public Stream<Tuple> newStream(IRichSpout spout) {
+        return newStream(spout, 1);
+    }
+
+    /**
+     * Creates a new {@link Stream} of tuples from the given {@link IRichSpout} with the given
+     * parallelism.
+     *
+     * @param spout       the spout
+     * @param parallelism the parallelism of the stream
+     * @return the new stream
+     */
+    public Stream<Tuple> newStream(IRichSpout spout, int parallelism) {
+        SpoutNode spoutNode = new SpoutNode(spout);
+        String spoutId = UniqueIdGen.getInstance().getUniqueSpoutId();
+        spoutNode.setComponentId(spoutId);
+        spoutNode.setParallelism(parallelism);
+        graph.addVertex(spoutNode);
+        return new Stream<>(this, spoutNode);
+    }
+
+    /**
+     * Creates a new {@link Stream} of values from the given {@link IRichSpout} by extracting field(s)
+     * from tuples via the supplied {@link TupleValueMapper}.
+     *
+     * @param spout       the spout
+     * @param valueMapper the value mapper
+     * @param <T>         the type of values in the resultant stream
+     * @return the new stream
+     */
+    public <T> Stream<T> newStream(IRichSpout spout, TupleValueMapper<T> valueMapper) {
+        return newStream(spout).map(valueMapper);
+    }
+
+
+    /**
+     * Creates a new {@link Stream} of values from the given {@link IRichSpout} by extracting field(s)
+     * from tuples via the supplied {@link TupleValueMapper} with the given parallelism.
+     *
+     * @param spout       the spout
+     * @param valueMapper the value mapper
+     * @param parallelism the parallelism of the stream
+     * @param <T>         the type of values in the resultant stream
+     * @return the new stream
+     */
+    public <T> Stream<T> newStream(IRichSpout spout, TupleValueMapper<T> valueMapper, int parallelism) {
+        return newStream(spout, parallelism).map(valueMapper);
+    }
+
+    /**
+     * Creates a new {@link PairStream} of key-value pairs from the given {@link IRichSpout} by extracting key and
+     * value from tuples via the supplied {@link PairValueMapper}.
+     *
+     * @param spout           the spout
+     * @param pairValueMapper the pair value mapper
+     * @param <K>             the key type
+     * @param <V>             the value type
+     * @return the new stream of key-value pairs
+     */
+    public <K, V> PairStream<K, V> newStream(IRichSpout spout, PairValueMapper<K, V> pairValueMapper) {
+        return newStream(spout).mapToPair(pairValueMapper);
+    }
+
+    /**
+     * Creates a new {@link PairStream} of key-value pairs from the given {@link IRichSpout} by extracting key and
+     * value from tuples via the supplied {@link PairValueMapper} and with the given value of parallelism.
+     *
+     * @param spout           the spout
+     * @param pairValueMapper the pair value mapper
+     * @param parallelism     the parallelism of the stream
+     * @param <K>             the key type
+     * @param <V>             the value type
+     * @return the new stream of key-value pairs
+     */
+    public <K, V> PairStream<K, V> newStream(IRichSpout spout, PairValueMapper<K, V> pairValueMapper, int parallelism) {
+        return newStream(spout, parallelism).mapToPair(pairValueMapper);
+    }
+
+
+    /**
+     * Builds a new {@link StormTopology} for the computation expressed
+     * via the stream api.
+     *
+     * @return the storm topology
+     */
+    public StormTopology build() {
+        // Start from clean per-build bookkeeping.
+        // NOTE(review): statefulProcessorCount and streamBolts are not reset here; confirm whether
+        // calling build() more than once on the same builder is expected to work.
+        nodeGroupingInfo.clear();
+        windowInfo.clear();
+        curGroup.clear();
+        // Walk the graph in topological order; queue() supplies the tie-breaking priority between node types.
+        TopologicalOrderIterator<Node, Edge> iterator = new TopologicalOrderIterator<>(graph, queue());
+        TopologyBuilder topologyBuilder = new TopologyBuilder();
+        while (iterator.hasNext()) {
+            Node node = iterator.next();
+            if (node instanceof SpoutNode) {
+                addSpout(topologyBuilder, (SpoutNode) node);
+            } else if (node instanceof ProcessorNode) {
+                // Processor nodes are accumulated in curGroup and later emitted together as one bolt.
+                handleProcessorNode((ProcessorNode) node, topologyBuilder);
+            } else if (node instanceof PartitionNode) {
+                // Record the grouping, then flush the processors collected so far into a bolt.
+                updateNodeGroupingInfo((PartitionNode) node);
+                processCurGroup(topologyBuilder);
+            } else if (node instanceof WindowNode) {
+                // Record the window config, then flush the processors collected so far into a bolt.
+                updateWindowInfo((WindowNode) node);
+                processCurGroup(topologyBuilder);
+            } else if (node instanceof SinkNode) {
+                // Flush pending processors first so their component ids are set before the sink is wired.
+                processCurGroup(topologyBuilder);
+                addSink(topologyBuilder, (SinkNode) node);
+            }
+        }
+        // Flush whatever processors remain after the last node.
+        processCurGroup(topologyBuilder);
+        mayBeAddTsField();
+        return topologyBuilder.createTopology();
+    }
+
+    /*
+     * Attaches child under parent, reusing the parent's parallelism and the first
+     * output stream reported by the parent.
+     */
+    Node addNode(Node parent, Node child) {
+        int inheritedParallelism = parent.getParallelism();
+        String firstParentStream = parent.getOutputStreams().iterator().next();
+        return addNode(parent, child, inheritedParallelism, firstParentStream);
+    }
+
+    /*
+     * Attaches child under parent with an explicit parallelism, consuming the first
+     * output stream reported by the parent.
+     */
+    Node addNode(Node parent, Node child, int parallelism) {
+        String firstParentStream = parent.getOutputStreams().iterator().next();
+        return addNode(parent, child, parallelism, firstParentStream);
+    }
+
+    /*
+     * Attaches child under parent on the given parent stream, reusing the parent's parallelism.
+     */
+    Node addNode(Node parent, Node child, String parentStreamId) {
+        int inheritedParallelism = parent.getParallelism();
+        return addNode(parent, child, inheritedParallelism, parentStreamId);
+    }
+
+    /*
+     * Adds child to the graph with an edge from parent, sets its parallelism and records
+     * which node the child's parent stream is read from. For window and partition parents
+     * the stream is recorded against the node returned by parentNode(parent) instead of
+     * the parent itself.
+     */
+    Node addNode(Node parent, Node child, int parallelism, String parentStreamId) {
+        graph.addVertex(child);
+        graph.addEdge(parent, child);
+        child.setParallelism(parallelism);
+        boolean windowOrPartitionParent = parent instanceof WindowNode || parent instanceof PartitionNode;
+        Node streamOwner = windowOrPartitionParent ? parentNode(parent) : parent;
+        child.addParentStream(streamOwner, parentStreamId);
+        return child;
+    }
+
+    /*
+     * Creates the priority queue handed to the topological order iterator in build().
+     * Nodes are ordered by their class; classes that are not listed sort last.
+     */
+    private PriorityQueue<Node> queue() {
+        // min-heap
+        return new PriorityQueue<>(new Comparator<Node>() {
+            /*
+             * Nodes in the descending order of priority.
+             * ProcessorNode has the highest priority so that the topological order iterator
+             * will group as many processor nodes together as possible.
+             * Built once per comparator rather than once per comparison.
+             */
+            private final Class<?>[] priorityOrder = new Class<?>[]{
+                    ProcessorNode.class,
+                    SpoutNode.class,
+                    SinkNode.class,
+                    PartitionNode.class,
+                    WindowNode.class};
+
+            @Override
+            public int compare(Node n1, Node n2) {
+                // Integer.compare avoids the subtraction idiom, which is only safe while
+                // both operands are known to be non-negative.
+                return Integer.compare(getPriority(n1.getClass()), getPriority(n2.getClass()));
+            }
+
+            // Index of the class in priorityOrder (lower index = higher priority), or
+            // Integer.MAX_VALUE when the class is not listed.
+            private int getPriority(Class<? extends Node> clazz) {
+                for (int i = 0; i < priorityOrder.length; i++) {
+                    if (clazz.equals(priorityOrder[i])) {
+                        return i;
+                    }
+                }
+                return Integer.MAX_VALUE;
+            }
+        });
+    }
+
+    /*
+     * Adds the processor node to the current group. If the node holds a stateful processor,
+     * the pending group may first be flushed (or a windowed bolt force-created) so that a
+     * bolt ends up with a single stateful operation and a stateful processor does not share
+     * a bolt with a window.
+     */
+    private void handleProcessorNode(ProcessorNode processorNode, TopologyBuilder topologyBuilder) {
+        if (processorNode.getProcessor() instanceof StatefulProcessor) {
+            statefulProcessorCount++;
+            // When nothing is pending, look at the window config upstream of this node itself.
+            Set<ProcessorNode> initialNodes = initialProcessors(
+                    curGroup.isEmpty() ? Collections.singletonList(processorNode) : curGroup);
+            Set<Window<?, ?>> windows = getWindowParams(initialNodes);
+            // if we get more than one stateful operation, we need to process the
+            // current group so that we have one stateful operation per stateful bolt
+            if (statefulProcessorCount > 1 || !windows.isEmpty()) {
+                if (!curGroup.isEmpty()) {
+                    processCurGroup(topologyBuilder);
+                } else if (!windows.isEmpty()) {
+                    // a stateful processor immediately follows a window specification
+                    splitStatefulProcessor(processorNode, topologyBuilder);
+                }
+                // This node is the one stateful processor counted for the group that starts here.
+                statefulProcessorCount = 1;
+            }
+        }
+        curGroup.add(processorNode);
+    }
+
+    /*
+     * force create a windowed bolt with identity nodes so that we don't
+     * have a stateful processor inside a windowed bolt.
+     */
+    private void splitStatefulProcessor(ProcessorNode processorNode, TopologyBuilder topologyBuilder) {
+        // getParents returns a fresh list, so the graph can be modified while iterating over it.
+        for (Node parent : StreamUtil.<Node>getParents(graph, processorNode)) {
+            // Pass-through processor on a new unique stream, carrying the parent's output fields.
+            ProcessorNode identity =
+                    new ProcessorNode(new MapProcessor<>(new IdentityFunction<>()),
+                            UniqueIdGen.getInstance().getUniqueStreamId(),
+                            parent.getOutputFields());
+            // Rewire parent -> processorNode into parent -> identity -> processorNode.
+            addNode(parent, identity);
+            graph.removeEdge(parent, processorNode);
+            processorNode.removeParentStreams(parent);
+            addNode(identity, processorNode);
+            curGroup.add(identity);
+        }
+        // Emit the identity nodes as their own bolt before the stateful processor is grouped.
+        processCurGroup(topologyBuilder);
+    }
+
+    /*
+     * If a timestamp field name was recorded, sets it on every stream bolt registered so far.
+     */
+    private void mayBeAddTsField() {
+        if (timestampFieldName == null) {
+            return;
+        }
+        for (StreamBolt bolt : streamBolts.keySet()) {
+            bolt.setTimestampField(timestampFieldName);
+        }
+    }
+
+    /*
+     * Records the partition node's grouping against every (parent, parent stream) pair
+     * that feeds it. Does nothing when the partition node carries no grouping info.
+     */
+    private void updateNodeGroupingInfo(PartitionNode partitionNode) {
+        if (partitionNode.getGroupingInfo() == null) {
+            return;
+        }
+        for (Node upstream : parentNodes(partitionNode)) {
+            for (String upstreamStream : partitionNode.getParentStreams(upstream)) {
+                nodeGroupingInfo.put(upstream, upstreamStream, partitionNode.getGroupingInfo());
+            }
+        }
+    }
+
+    /*
+     * Associates the window node with each of its parents and remembers the window's
+     * timestamp field, if one is set.
+     *
+     * Throws IllegalArgumentException when a different timestamp field name was already recorded.
+     */
+    private void updateWindowInfo(WindowNode windowNode) {
+        for (Node upstream : parentNodes(windowNode)) {
+            windowInfo.put(upstream, windowNode);
+        }
+        String requestedTsField = windowNode.getWindowParams().getTimestampField();
+        if (requestedTsField == null) {
+            return;
+        }
+        boolean conflictsWithExisting =
+                timestampFieldName != null && !requestedTsField.equals(timestampFieldName);
+        if (conflictsWithExisting) {
+            throw new IllegalArgumentException("Cannot set different timestamp field names");
+        }
+        timestampFieldName = requestedTsField;
+    }
+
+    /*
+     * Returns the single node yielded by parentNodes(curNode).
+     *
+     * Throws IllegalArgumentException when there is no such node or more than one.
+     */
+    private Node parentNode(Node curNode) {
+        Set<Node> candidates = parentNodes(curNode);
+        if (candidates.isEmpty()) {
+            throw new IllegalArgumentException("Node " + curNode + " has no parent.");
+        }
+        if (candidates.size() > 1) {
+            throw new IllegalArgumentException("Node " + curNode + " has more than one parent node.");
+        }
+        return candidates.iterator().next();
+    }
+
+    /*
+     * Collects the nearest ancestors of curNode that are processor or spout nodes.
+     * Any other kind of parent is skipped over by recursing into its own parents.
+     */
+    private Set<Node> parentNodes(Node curNode) {
+        Set<Node> result = new HashSet<>();
+        for (Node candidate : StreamUtil.<Node>getParents(graph, curNode)) {
+            boolean processorOrSpout = candidate instanceof ProcessorNode || candidate instanceof SpoutNode;
+            if (!processorOrSpout) {
+                result.addAll(parentNodes(candidate));
+                continue;
+            }
+            result.add(candidate);
+        }
+        return result;
+    }
+
+    /*
+     * Turns the processor nodes collected in curGroup into a single bolt and clears the group.
+     * The bolt kind depends on the group: windowed when exactly one window config applies,
+     * otherwise stateful when the group contains a stateful processor, otherwise a plain bolt.
+     *
+     * Throws IllegalStateException when more than one window config applies to the group.
+     */
+    private void processCurGroup(TopologyBuilder topologyBuilder) {
+        if (curGroup.isEmpty()) {
+            return;
+        }
+
+        // All nodes of the group share one freshly generated bolt id.
+        String boltId = UniqueIdGen.getInstance().getUniqueBoltId();
+        for (ProcessorNode processorNode : curGroup) {
+            processorNode.setComponentId(boltId);
+            processorNode.setWindowed(isWindowed(processorNode));
+            processorNode.setWindowedParentStreams(getWindowedParentStreams(processorNode));
+        }
+        // Nodes of the group that have at least one parent outside the group.
+        final Set<ProcessorNode> initialProcessors = initialProcessors(curGroup);
+        Set<Window<?, ?>> windowParams = getWindowParams(initialProcessors);
+        if (windowParams.isEmpty()) {
+            if (hasStatefulProcessor(curGroup)) {
+                addStatefulBolt(topologyBuilder, boltId, initialProcessors);
+            } else {
+                addBolt(topologyBuilder, boltId, initialProcessors);
+            }
+        } else if (windowParams.size() == 1) {
+            addWindowedBolt(topologyBuilder, boltId, initialProcessors, windowParams.iterator().next());
+        } else {
+            throw new IllegalStateException("More than one window config for current group " + curGroup);
+        }
+        curGroup.clear();
+    }
+
+    /*
+     * Returns true when at least one of the given nodes wraps a StatefulProcessor.
+     */
+    private boolean hasStatefulProcessor(List<ProcessorNode> processorNodes) {
+        boolean statefulSeen = false;
+        for (int i = 0; i < processorNodes.size() && !statefulSeen; i++) {
+            statefulSeen = processorNodes.get(i).getProcessor() instanceof StatefulProcessor;
+        }
+        return statefulSeen;
+    }
+
+    /*
+     * Returns the parallelism shared by all nodes in the current group, or 1 when the
+     * group is empty.
+     *
+     * Throws IllegalStateException when the nodes do not all have the same parallelism.
+     */
+    private int getParallelism() {
+        Set<Integer> distinctParallelisms = new HashSet<>();
+        for (ProcessorNode member : curGroup) {
+            distinctParallelisms.add(member.getParallelism());
+        }
+
+        if (distinctParallelisms.size() > 1) {
+            throw new IllegalStateException("Current group does not have same parallelism " + curGroup);
+        }
+
+        if (distinctParallelisms.isEmpty()) {
+            return 1;
+        }
+        return distinctParallelisms.iterator().next();
+    }
+
+    /*
+     * Returns the distinct window configurations registered in windowInfo for the parents
+     * of the given initial processors. For a join processor only the parents on its left
+     * stream are considered.
+     */
+    private Set<Window<?, ?>> getWindowParams(Set<ProcessorNode> initialProcessors) {
+        Set<WindowNode> upstreamWindows = new HashSet<>();
+        for (ProcessorNode initial : initialProcessors) {
+            Set<Node> sources;
+            if (initial.getProcessor() instanceof JoinProcessor) {
+                JoinProcessor joinProcessor = (JoinProcessor) initial.getProcessor();
+                String leftStream = joinProcessor.getLeftStream();
+                sources = initial.getParents(leftStream);
+            } else {
+                sources = parentNodes(initial);
+            }
+            for (Node source : sources) {
+                if (windowInfo.containsKey(source)) {
+                    upstreamWindows.add(windowInfo.get(source));
+                }
+            }
+        }
+
+        // Several window nodes may carry equal window params; the set collapses them.
+        Set<Window<?, ?>> windowParams = new HashSet<>();
+        for (WindowNode upstreamWindow : upstreamWindows) {
+            windowParams.add(upstreamWindow.getWindowParams());
+        }
+        return windowParams;
+    }
+
+    /*
+     * Registers the spout node with the topology builder under the node's component id
+     * and parallelism.
+     */
+    private void addSpout(TopologyBuilder topologyBuilder, SpoutNode spout) {
+        String spoutId = spout.getComponentId();
+        topologyBuilder.setSpout(spoutId, spout.getSpout(), spout.getParallelism());
+    }
+
+    /*
+     * Registers the sink node's bolt with the topology builder and subscribes it to every
+     * parent stream of the sink, using the grouping recorded for that (parent, stream)
+     * pair (shuffle grouping when none was recorded).
+     *
+     * Throws IllegalArgumentException when the sink's bolt is neither an IRichBolt nor an IBasicBolt.
+     */
+    private void addSink(TopologyBuilder topologyBuilder, SinkNode sinkNode) {
+        IComponent bolt = sinkNode.getBolt();
+        BoltDeclarer boltDeclarer;
+        if (bolt instanceof IRichBolt) {
+            boltDeclarer = topologyBuilder.setBolt(sinkNode.getComponentId(), (IRichBolt) bolt, sinkNode.getParallelism());
+        } else if (bolt instanceof IBasicBolt) {
+            boltDeclarer = topologyBuilder.setBolt(sinkNode.getComponentId(), (IBasicBolt) bolt, sinkNode.getParallelism());
+        } else {
+            // Name this method (not addBolt) and report what was actually supplied.
+            throw new IllegalArgumentException("Expect IRichBolt or IBasicBolt in addSink, but got "
+                    + (bolt == null ? "null" : bolt.getClass().getName()));
+        }
+        for (Node parent : parentNodes(sinkNode)) {
+            for (String stream : sinkNode.getParentStreams(parent)) {
+                declareStream(boltDeclarer, parent, stream, nodeGroupingInfo.get(parent, stream));
+            }
+        }
+    }
+
+    /*
+     * Creates a ProcessorBolt for the current group, registers it with the topology builder,
+     * wires its input streams and remembers the bolt together with its declarer.
+     */
+    private StreamBolt addBolt(TopologyBuilder topologyBuilder,
+                               String boltId,
+                               Set<ProcessorNode> initialProcessors) {
+        ProcessorBolt processorBolt = new ProcessorBolt(boltId, graph, curGroup);
+        BoltDeclarer declarer = topologyBuilder.setBolt(boltId, processorBolt, getParallelism());
+        Multimap<String, ProcessorNode> streamToInitial = wireBolt(curGroup, declarer, initialProcessors);
+        processorBolt.setStreamToInitialProcessors(streamToInitial);
+        streamBolts.put(processorBolt, declarer);
+        return processorBolt;
+    }
+
+    /*
+     * Adds the current group to a stateful bolt. When the group contains no state query
+     * processor a new StatefulProcessorBolt is created and registered; otherwise the group
+     * is merged into the existing stateful bolt that holds the queried update-state node.
+     */
+    private StreamBolt addStatefulBolt(TopologyBuilder topologyBuilder,
+                                       String boltId,
+                                       Set<ProcessorNode> initialProcessors) {
+        StateQueryProcessor<?, ?> stateQueryProcessor = getStateQueryProcessor();
+        StatefulProcessorBolt<?, ?> bolt;
+        if (stateQueryProcessor == null) {
+            bolt = new StatefulProcessorBolt<>(boltId, graph, curGroup);
+            BoltDeclarer boltDeclarer = topologyBuilder.setBolt(boltId, bolt, getParallelism());
+            bolt.setStreamToInitialProcessors(wireBolt(curGroup, boltDeclarer, initialProcessors));
+            streamBolts.put(bolt, boltDeclarer);
+        } else {
+            // state query is added to the existing stateful bolt
+            ProcessorNode updateStateNode = stateQueryProcessor.getStreamState().getUpdateStateNode();
+            // Throws IllegalArgumentException if no registered stateful bolt contains that node.
+            bolt = findStatefulProcessorBolt(updateStateNode);
+            // Replace the component id assigned earlier with the id of the bolt being reused.
+            for (ProcessorNode node : curGroup) {
+                node.setComponentId(bolt.getId());
+            }
+            bolt.addNodes(curGroup);
+            // Wire against the bolt's full node list, reusing the declarer stored for it.
+            bolt.addStreamToInitialProcessors(wireBolt(bolt.getNodes(), streamBolts.get(bolt), initialProcessors));
+        }
+        return bolt;
+    }
+
+    /*
+     * Returns the first StateQueryProcessor found in the current group, or null when the
+     * group contains none.
+     */
+    private StateQueryProcessor<?, ?> getStateQueryProcessor() {
+        StateQueryProcessor<?, ?> match = null;
+        for (ProcessorNode member : curGroup) {
+            if (member.getProcessor() instanceof StateQueryProcessor) {
+                match = (StateQueryProcessor<?, ?>) member.getProcessor();
+                break;
+            }
+        }
+        return match;
+    }
+
+    /*
+     * Creates a WindowedProcessorBolt for the current group with the given window config,
+     * registers it with the topology builder, wires its input streams and remembers the
+     * bolt together with its declarer.
+     */
+    private StreamBolt addWindowedBolt(TopologyBuilder topologyBuilder,
+                                       String boltId,
+                                       Set<ProcessorNode> initialProcessors,
+                                       Window<?, ?> windowParam) {
+        WindowedProcessorBolt windowedBolt = new WindowedProcessorBolt(boltId, graph, curGroup, windowParam);
+        BoltDeclarer declarer = topologyBuilder.setBolt(boltId, windowedBolt, getParallelism());
+        Multimap<String, ProcessorNode> streamToInitial = wireBolt(curGroup, declarer, initialProcessors);
+        windowedBolt.setStreamToInitialProcessors(streamToInitial);
+        streamBolts.put(windowedBolt, declarer);
+        return windowedBolt;
+    }
+
+    /*
+     * Finds the registered StatefulProcessorBolt whose node list contains the given node.
+     *
+     * Throws IllegalArgumentException when no such bolt has been registered.
+     */
+    private StatefulProcessorBolt<?, ?> findStatefulProcessorBolt(ProcessorNode updateStateNode) {
+        for (StreamBolt candidate : streamBolts.keySet()) {
+            if (!(candidate instanceof StatefulProcessorBolt)) {
+                continue;
+            }
+            StatefulProcessorBolt<?, ?> stateful = (StatefulProcessorBolt<?, ?>) candidate;
+            if (stateful.getNodes().contains(updateStateNode)) {
+                return stateful;
+            }
+        }
+        throw new IllegalArgumentException("Could not find Stateful bolt for node " + updateStateNode);
+    }
+
+    /*
+     * Returns the non-sink output streams of every windowed processor node among the
+     * parents of the given node.
+     */
+    private Set<String> getWindowedParentStreams(ProcessorNode processorNode) {
+        Set<String> windowedStreams = new HashSet<>();
+        for (Node parent : parentNodes(processorNode)) {
+            if (!(parent instanceof ProcessorNode)) {
+                continue;
+            }
+            ProcessorNode upstream = (ProcessorNode) parent;
+            if (!upstream.isWindowed()) {
+                continue;
+            }
+            for (String stream : upstream.getOutputStreams()) {
+                if (!StreamUtil.isSinkStream(stream)) {
+                    windowedStreams.add(stream);
+                }
+            }
+        }
+        return windowedStreams;
+    }
+
+    /*
+     * Subscribes the bolt to the streams of every parent that lies outside the given group
+     * and returns a mapping from incoming stream id to the initial processors that consume it.
+     * Note that the curGroup parameter shadows the field of the same name.
+     */
+    private Multimap<String, ProcessorNode> wireBolt(List<ProcessorNode> curGroup,
+                                                     BoltDeclarer boltDeclarer,
+                                                     Set<ProcessorNode> initialProcessors) {
+        LOG.debug("Wiring bolt with boltDeclarer {}, curGroup {}, initialProcessors {}, nodeGroupingInfo {}",
+                boltDeclarer, curGroup, initialProcessors, nodeGroupingInfo);
+        Multimap<String, ProcessorNode> streamToInitialProcessor = ArrayListMultimap.create();
+        Set<ProcessorNode> curSet = new HashSet<>(curGroup);
+        for (ProcessorNode curNode : initialProcessors) {
+            for (Node parent : parentNodes(curNode)) {
+                if (curSet.contains(parent)) {
+                    // Parent lives in the same bolt, so no input stream has to be declared.
+                    LOG.debug("Parent {} of curNode {} is in curGroup {}", parent, curNode, curGroup);
+                } else {
+                    for (String stream : curNode.getParentStreams(parent)) {
+                        declareStream(boltDeclarer, parent, stream, nodeGroupingInfo.get(parent, stream));
+                        // put global stream id for spouts
+                        // NOTE(review): spouts are detected by the "spout" component-id prefix; this
+                        // presumably relies on ids coming from UniqueIdGen.getUniqueSpoutId() - confirm.
+                        if (parent.getComponentId().startsWith("spout")) {
+                            // Key becomes the component id concatenated directly with the stream id.
+                            stream = parent.getComponentId() + stream;
+                        }
+                        streamToInitialProcessor.put(stream, curNode);
+                    }
+                }
+            }
+        }
+        return streamToInitialProcessor;
+    }
+
+    /*
+     * Subscribes the bolt to the given stream of the parent's component. Uses the supplied
+     * grouping when present and falls back to shuffle grouping when it is null.
+     */
+    private void declareStream(BoltDeclarer boltDeclarer, Node parent, String streamId, GroupingInfo grouping) {
+        if (grouping != null) {
+            grouping.declareGrouping(boltDeclarer, parent.getComponentId(), streamId, grouping.getFields());
+            return;
+        }
+        boltDeclarer.shuffleGrouping(parent.getComponentId(), streamId);
+    }
+
+    /*
+     * Returns the nodes of the group that have at least one parent (as computed by
+     * parentNodes) which is not itself a processor node of the same group.
+     */
+    private Set<ProcessorNode> initialProcessors(List<ProcessorNode> curGroup) {
+        Set<ProcessorNode> groupMembers = new HashSet<>(curGroup);
+        Set<ProcessorNode> entryNodes = new HashSet<>();
+        for (ProcessorNode candidate : curGroup) {
+            for (Node upstream : parentNodes(candidate)) {
+                boolean insideGroup = upstream instanceof ProcessorNode && groupMembers.contains(upstream);
+                if (!insideGroup) {
+                    entryNodes.add(candidate);
+                    // One outside parent is enough; further parents cannot change the outcome.
+                    break;
+                }
+            }
+        }
+        return entryNodes;
+    }
+
+    /*
+     * Returns true when any direct parent of curNode is a window node, a processor node
+     * already marked as windowed, or a partition node that is itself windowed.
+     *
+     * Every parent is examined. Previously the first parent that was neither a window node
+     * nor a processor node ended the search, so for a node with several parents (e.g. a
+     * join) the answer depended on the order in which the graph returned them.
+     */
+    private boolean isWindowed(Node curNode) {
+        for (Node parent : StreamUtil.<Node>getParents(graph, curNode)) {
+            if (parent instanceof WindowNode) {
+                return true;
+            }
+            if (parent instanceof ProcessorNode) {
+                if (((ProcessorNode) parent).isWindowed()) {
+                    return true;
+                }
+            } else if (parent instanceof PartitionNode && isWindowed(parent)) {
+                // Partition nodes are transparent: look through them to their own parents.
+                return true;
+            }
+        }
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/StreamState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/StreamState.java b/storm-core/src/jvm/org/apache/storm/streams/StreamState.java
new file mode 100644
index 0000000..a4633f7
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/StreamState.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams;
+
+import java.io.Serializable;
+
+/**
+ * A wrapper for the stream state which can be used to
+ * query the state via {@link Stream#stateQuery(StreamState)}
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public class StreamState<K, V> implements Serializable {
+    private final transient PairStream<K, V> stream;
+
+    StreamState(PairStream<K, V> stream) {
+        this.stream = stream;
+    }
+
+    public PairStream<K, V> toPairStream() {
+        return stream;
+    }
+
+    ProcessorNode getUpdateStateNode() {
+        return (ProcessorNode) stream.node;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/StreamUtil.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/StreamUtil.java b/storm-core/src/jvm/org/apache/storm/streams/StreamUtil.java
new file mode 100644
index 0000000..0531ff6
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/StreamUtil.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams;
+
+import org.jgrapht.DirectedGraph;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class StreamUtil {
+    @SuppressWarnings("unchecked")
+    public static <T> List<T> getParents(DirectedGraph<Node, Edge> graph, Node node) {
+        List<Edge> incoming = new ArrayList<>(graph.incomingEdgesOf(node));
+        List<T> ret = new ArrayList<>();
+        for (Edge e : incoming) {
+            ret.add((T) e.getSource());
+        }
+        return ret;
+    }
+
+    @SuppressWarnings("unchecked")
+    public static <T> List<T> getChildren(DirectedGraph<Node, Edge> graph, Node node) {
+        List<Edge> outgoing = new ArrayList<>(graph.outgoingEdgesOf(node));
+        List<T> ret = new ArrayList<>();
+        for (Edge e : outgoing) {
+            ret.add((T) e.getTarget());
+        }
+        return ret;
+    }
+
+
+    public static boolean isSinkStream(String streamId) {
+        return streamId.endsWith("__sink");
+    }
+
+    public static String getSinkStream(String streamId) {
+        return streamId + "__sink";
+    }
+
+    public static boolean isPunctuation(Object value) {
+        if (value instanceof Pair) {
+            value = ((Pair) value).getFirst();
+        }
+        return WindowNode.PUNCTUATION.equals(value);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/StreamsEdgeFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/StreamsEdgeFactory.java b/storm-core/src/jvm/org/apache/storm/streams/StreamsEdgeFactory.java
new file mode 100644
index 0000000..0078690
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/StreamsEdgeFactory.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams;
+
+import org.jgrapht.EdgeFactory;
+
+import java.io.Serializable;
+
+class StreamsEdgeFactory implements EdgeFactory<Node, Edge>, Serializable {
+    @Override
+    public Edge createEdge(Node sourceVertex, Node targetVertex) {
+        return new Edge(sourceVertex, targetVertex);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/Tuple3.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/Tuple3.java b/storm-core/src/jvm/org/apache/storm/streams/Tuple3.java
new file mode 100644
index 0000000..77973f2
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/Tuple3.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams;
+
+/**
+ * A tuple of three elements along the lines of Scala's Tuple.
+ *
+ * @param <T1> the type of the first element
+ * @param <T2> the type of the second element
+ * @param <T3> the type of the third element
+ */
+public class Tuple3<T1, T2, T3> {
+    public final T1 _1;
+    public final T2 _2;
+    public final T3 _3;
+
+    /**
+     * Constructs a new tuple of three elements.
+     *
+     * @param _1 the first element
+     * @param _2 the second element
+     * @param _3 the third element
+     */
+    public Tuple3(T1 _1, T2 _2, T3 _3) {
+        this._1 = _1;
+        this._2 = _2;
+        this._3 = _3;
+    }
+
+    /**
+     * Compares this tuple with another object element by element, so that two tuples
+     * holding equal elements are equal. Null elements are allowed.
+     *
+     * @param o the object to compare with
+     * @return true if {@code o} is a {@code Tuple3} of the same class whose three elements equal this tuple's
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        Tuple3<?, ?, ?> other = (Tuple3<?, ?, ?>) o;
+        // Fully qualified so that the file needs no additional import.
+        return java.util.Objects.equals(_1, other._1)
+                && java.util.Objects.equals(_2, other._2)
+                && java.util.Objects.equals(_3, other._3);
+    }
+
+    /**
+     * Returns a hash code derived from the three elements, consistent with {@link #equals(Object)}.
+     */
+    @Override
+    public int hashCode() {
+        return java.util.Objects.hash(_1, _2, _3);
+    }
+
+    @Override
+    public String toString() {
+        return "(" + _1 + "," + _2 + "," + _3 + ")";
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/UniqueIdGen.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/UniqueIdGen.java b/storm-core/src/jvm/org/apache/storm/streams/UniqueIdGen.java
new file mode 100644
index 0000000..3cbd141
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/UniqueIdGen.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams;
+
+/**
+ * Process-wide generator of sequential ids for streams ("s" + n), bolts ("bolt" + n) and
+ * spouts ("spout" + n). Each kind has its own counter, and the first id of a kind ends in 1.
+ */
+class UniqueIdGen {
+    private static final UniqueIdGen instance = new UniqueIdGen();
+
+    private int streamCounter = 0;
+    private int spoutCounter = 0;
+    private int boltCounter = 0;
+
+    private UniqueIdGen() {
+    }
+
+    /**
+     * Returns the single shared generator.
+     */
+    static UniqueIdGen getInstance() {
+        return instance;
+    }
+
+    /**
+     * Advances the stream counter and returns the next stream id.
+     */
+    String getUniqueStreamId() {
+        return "s" + (++streamCounter);
+    }
+
+    /**
+     * Advances the bolt counter and returns the next bolt id.
+     */
+    String getUniqueBoltId() {
+        return "bolt" + (++boltCounter);
+    }
+
+    /**
+     * Advances the spout counter and returns the next spout id.
+     */
+    String getUniqueSpoutId() {
+        return "spout" + (++spoutCounter);
+    }
+
+    // for unit tests
+    void reset() {
+        boltCounter = 0;
+        spoutCounter = 0;
+        streamCounter = 0;
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/WindowNode.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/WindowNode.java b/storm-core/src/jvm/org/apache/storm/streams/WindowNode.java
new file mode 100644
index 0000000..a0e831a
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/WindowNode.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams;
+
+import org.apache.storm.streams.windowing.Window;
+import org.apache.storm.tuple.Fields;
+
+/**
+ * Node that captures the windowing configurations.
+ */
+public class WindowNode extends Node {
+    /** Marker value compared against by {@code StreamUtil.isPunctuation}. */
+    public static final String PUNCTUATION = "__punctuation";
+
+    // The windowing configuration captured by this node.
+    private final Window<?, ?> windowParams;
+
+    /**
+     * Creates a window node.
+     *
+     * @param windowParams the windowing configuration
+     * @param outputStream the output stream, passed to the {@code Node} constructor
+     * @param outputFields the output fields, passed to the {@code Node} constructor
+     */
+    WindowNode(Window<?, ?> windowParams, String outputStream, Fields outputFields) {
+        super(outputStream, outputFields);
+        this.windowParams = windowParams;
+    }
+
+    /**
+     * Returns the windowing configuration given at construction.
+     */
+    Window<?, ?> getWindowParams() {
+        return this.windowParams;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/WindowedProcessorBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/WindowedProcessorBolt.java b/storm-core/src/jvm/org/apache/storm/streams/WindowedProcessorBolt.java
new file mode 100644
index 0000000..3971346
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/WindowedProcessorBolt.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams;
+
+import com.google.common.collect.Multimap;
+import org.apache.storm.streams.windowing.SlidingWindows;
+import org.apache.storm.streams.windowing.TumblingWindows;
+import org.apache.storm.streams.windowing.Window;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.windowing.TupleWindow;
+import org.jgrapht.DirectedGraph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.storm.streams.WindowNode.PUNCTUATION;
+
+/**
+ * Stream bolt that executes windowing operations. The window settings are taken from a
+ * {@link Window} and the actual processing is handed off to a {@link ProcessorBoltDelegate}.
+ */
+class WindowedProcessorBolt extends BaseWindowedBolt implements StreamBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(WindowedProcessorBolt.class);
+    private final Window<?, ?> window;
+    private final ProcessorBoltDelegate delegate;
+
+    WindowedProcessorBolt(String id, DirectedGraph<Node, Edge> graph, List<ProcessorNode> nodes,
+                          Window<?, ?> window) {
+        this.window = window;
+        this.delegate = new ProcessorBoltDelegate(id, graph, nodes);
+        configureWindow();
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        delegate.prepare(stormConf, context, collector);
+    }
+
+    @Override
+    public void execute(TupleWindow inputWindow) {
+        LOG.trace("Window triggered at {}, inputWindow {}", new Date(), inputWindow);
+        if (delegate.isEventTimestamp()) {
+            delegate.setEventTimestamp(inputWindow.getTimestamp());
+        }
+        for (Tuple tuple : inputWindow.get()) {
+            Pair<Object, String> valueAndStream = delegate.getValueAndStream(tuple);
+            Object value = valueAndStream.getFirst();
+            if (StreamUtil.isPunctuation(value)) {
+                continue;
+            }
+            delegate.process(value, valueAndStream.getSecond());
+        }
+        // after the window's tuples, push the punctuation marker through every initial stream
+        for (String stream : delegate.getInitialStreams()) {
+            delegate.process(PUNCTUATION, stream);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        delegate.declareOutputFields(declarer);
+    }
+
+    @Override
+    public void setTimestampField(String fieldName) {
+        delegate.setTimestampField(fieldName);
+    }
+
+    @Override
+    public String getId() {
+        return delegate.getId();
+    }
+
+    void setStreamToInitialProcessors(Multimap<String, ProcessorNode> streamToInitialProcessors) {
+        delegate.setStreamToInitialProcessors(streamToInitialProcessors);
+    }
+
+    // Copies the settings held by the Window onto this bolt's BaseWindowedBolt configuration.
+    private void configureWindow() {
+        if (window instanceof SlidingWindows) {
+            configureSlidingWindow(window.getWindowLength(), window.getSlidingInterval());
+        } else if (window instanceof TumblingWindows) {
+            configureTumblingWindow(window.getWindowLength());
+        }
+        if (window.getTimestampField() != null) {
+            withTimestampField(window.getTimestampField());
+        }
+        if (window.getLag() != null) {
+            withLag(window.getLag());
+        }
+        if (window.getLateTupleStream() != null) {
+            withLateTupleStream(window.getLateTupleStream());
+        }
+    }
+
+    private void configureSlidingWindow(Object windowLength, Object slidingInterval) {
+        if (windowLength instanceof Count && slidingInterval instanceof Count) {
+            withWindow((Count) windowLength, (Count) slidingInterval);
+        } else if (windowLength instanceof Count && slidingInterval instanceof Duration) {
+            withWindow((Count) windowLength, (Duration) slidingInterval);
+        } else if (windowLength instanceof Duration && slidingInterval instanceof Count) {
+            withWindow((Duration) windowLength, (Count) slidingInterval);
+        } else if (windowLength instanceof Duration && slidingInterval instanceof Duration) {
+            withWindow((Duration) windowLength, (Duration) slidingInterval);
+        }
+    }
+
+    private void configureTumblingWindow(Object windowLength) {
+        if (windowLength instanceof Count) {
+            withTumblingWindow((Count) windowLength);
+        } else if (windowLength instanceof Duration) {
+            withTumblingWindow((Duration) windowLength);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/operations/Aggregator.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/operations/Aggregator.java b/storm-core/src/jvm/org/apache/storm/streams/operations/Aggregator.java
new file mode 100644
index 0000000..e3feaf4
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/operations/Aggregator.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams.operations;
+
+/**
+ * Contract for folding values of type {@code T} into a running aggregate of type {@code R}.
+ *
+ * @param <T> the original value type
+ * @param <R> the aggregated value type
+ */
+public interface Aggregator<T, R> extends Operation {
+    /**
+     * Supplies the aggregate to start from, before any value has been applied.
+     *
+     * @return the initial value
+     */
+    R init();
+
+    /**
+     * Combines a value with the current aggregate and returns the resulting aggregate.
+     *
+     * @param value     the value to aggregate
+     * @param aggregate the current aggregate
+     * @return the new aggregate
+     */
+    R apply(T value, R aggregate);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/operations/Consumer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/operations/Consumer.java b/storm-core/src/jvm/org/apache/storm/streams/operations/Consumer.java
new file mode 100644
index 0000000..84653ab
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/operations/Consumer.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams.operations;
+
+/**
+ * An {@link Operation} that accepts a single input argument and returns no result.
+ *
+ * @param <T> the type of the input argument
+ */
+public interface Consumer<T> extends Operation {
+    /**
+     * Performs this operation on the given argument.
+     *
+     * @param input the value to consume
+     */
+    void accept(T input);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/operations/FlatMapFunction.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/operations/FlatMapFunction.java b/storm-core/src/jvm/org/apache/storm/streams/operations/FlatMapFunction.java
new file mode 100644
index 0000000..bcacd08
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/operations/FlatMapFunction.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams.operations;
+
+/**
+ * A {@link Function} that accepts one argument and returns an {@link Iterable} of elements as its result.
+ *
+ * @param <T> the type of the input to the function
+ * @param <R> the element type of the {@link Iterable} returned by this function
+ */
+public interface FlatMapFunction<T, R> extends Function<T, Iterable<R>> {
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/operations/Function.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/operations/Function.java b/storm-core/src/jvm/org/apache/storm/streams/operations/Function.java
new file mode 100644
index 0000000..7cef0a6
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/operations/Function.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams.operations;
+
+/**
+ * Represents a function that maps one argument of type {@code T} to a result of type {@code R}.
+ *
+ * @param <T> the type of the input to the function
+ * @param <R> the type of the result of the function
+ */
+public interface Function<T, R> extends Operation {
+    /**
+     * Applies this function to the given argument.
+     *
+     * @param input the input to the function
+     * @return the result of applying this function to {@code input}
+     */
+    R apply(T input);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/operations/IdentityFunction.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/operations/IdentityFunction.java b/storm-core/src/jvm/org/apache/storm/streams/operations/IdentityFunction.java
new file mode 100644
index 0000000..abb9327
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/operations/IdentityFunction.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams.operations;
+
+/**
+ * A {@link Function} that returns its input argument unchanged as the result.
+ *
+ * @param <T> the type of the input, which is also the type of the result
+ */
+public class IdentityFunction<T> implements Function<T, T> {
+
+    @Override
+    public T apply(T input) {
+        return input;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/operations/Operation.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/operations/Operation.java b/storm-core/src/jvm/org/apache/storm/streams/operations/Operation.java
new file mode 100644
index 0000000..77dbe1e
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/operations/Operation.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams.operations;
+
+import java.io.Serializable;
+
+/**
+ * The parent interface for any kind of streaming operation; every operation is {@link Serializable}.
+ */
+public interface Operation extends Serializable {
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/operations/PairFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/operations/PairFlatMapFunction.java b/storm-core/src/jvm/org/apache/storm/streams/operations/PairFlatMapFunction.java
new file mode 100644
index 0000000..376c1ba
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/operations/PairFlatMapFunction.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams.operations;
+
+import org.apache.storm.streams.Pair;
+
+/**
+ * A {@link FlatMapFunction} that accepts one argument and returns an {@link Iterable} of {@link Pair}.
+ *
+ * @param <T> the type of the input to the function
+ * @param <K> the key type of the key-value pairs produced as a result
+ * @param <V> the value type of the key-value pairs produced as a result
+ */
+public interface PairFlatMapFunction<T, K, V> extends FlatMapFunction<T, Pair<K, V>> {
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/operations/PairFunction.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/operations/PairFunction.java b/storm-core/src/jvm/org/apache/storm/streams/operations/PairFunction.java
new file mode 100644
index 0000000..b153ff1
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/operations/PairFunction.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams.operations;
+
+import org.apache.storm.streams.Pair;
+
+/**
+ * A {@link Function} that accepts an argument and produces a key-value {@link Pair}.
+ *
+ * @param <T> the type of the input to the function
+ * @param <K> the key type of the produced pair
+ * @param <V> the value type of the produced pair
+ */
+public interface PairFunction<T, K, V> extends Function<T, Pair<K,V>> {
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/operations/PairValueJoiner.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/operations/PairValueJoiner.java b/storm-core/src/jvm/org/apache/storm/streams/operations/PairValueJoiner.java
new file mode 100644
index 0000000..ca81101
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/operations/PairValueJoiner.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams.operations;
+
+import org.apache.storm.streams.Pair;
+
+/**
+ * A {@link ValueJoiner} that joins two values by wrapping both of them into a {@link Pair}.
+ *
+ * @param <V1> the type of the first value
+ * @param <V2> the type of the second value
+ */
+public class PairValueJoiner<V1, V2> implements ValueJoiner<V1, V2, Pair<V1, V2>> {
+    /**
+     * Joins two values and produces a {@link Pair} of the values as the result.
+     *
+     * @param value1 the first value, passed as the first argument to {@code Pair.of}
+     * @param value2 the second value, passed as the second argument to {@code Pair.of}
+     * @return the result of {@code Pair.of(value1, value2)}
+     */
+    @Override
+    public Pair<V1, V2> apply(V1 value1, V2 value2) {
+        return Pair.of(value1, value2);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/operations/Predicate.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/operations/Predicate.java b/storm-core/src/jvm/org/apache/storm/streams/operations/Predicate.java
new file mode 100644
index 0000000..ae1be2d
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/operations/Predicate.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams.operations;
+
+/**
+ * Represents a predicate (boolean-valued function) of a single value.
+ *
+ * @param <T> the type of the value being tested
+ */
+public interface Predicate<T> extends Operation {
+    /**
+     * Evaluates this predicate on the given argument.
+     *
+     * @param input the input argument
+     * @return {@code true} if the input matches the predicate, {@code false} otherwise
+     */
+    boolean test(T input);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/operations/PrintConsumer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/operations/PrintConsumer.java b/storm-core/src/jvm/org/apache/storm/streams/operations/PrintConsumer.java
new file mode 100644
index 0000000..d301dbf
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/operations/PrintConsumer.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams.operations;
+
+/**
+ * A {@link Consumer} that prints each input to stdout using {@code System.out.println}.
+ *
+ * @param <T> the value type
+ */
+public class PrintConsumer<T> implements Consumer<T> {
+    @Override
+    public void accept(T input) {
+        System.out.println(input);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/operations/Reducer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/operations/Reducer.java b/storm-core/src/jvm/org/apache/storm/streams/operations/Reducer.java
new file mode 100644
index 0000000..04ee70d
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/operations/Reducer.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams.operations;
+
+/**
+ * The {@link Reducer} performs an operation on two values of the same type, producing a result
+ * of that same type.
+ *
+ * @param <T> the type of the arguments and the result
+ */
+public interface Reducer<T> extends Operation {
+    /**
+     * Applies this reducer to the given pair of arguments.
+     *
+     * @param arg1 the first argument
+     * @param arg2 the second argument
+     * @return the result of combining {@code arg1} and {@code arg2}
+     */
+    T apply(T arg1, T arg2);
+}