You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/02/07 01:28:21 UTC

[08/12] storm git commit: [STORM-1961] A few fixes and refactoring

[STORM-1961] A few fixes and refactoring

1. Added typed tuples
2. Change groupByKey semantics and refactor examples
3. Handle punctuations correctly
4. Added countByKey and count
5. Added left, right and full outer joins
6. Per partition combine for aggregate/reduce


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3a10865c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3a10865c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3a10865c

Branch: refs/heads/master
Commit: 3a10865c628fa0606456826e28ce8838baf60134
Parents: e251573
Author: Arun Mahadevan <ar...@apache.org>
Authored: Tue Oct 25 01:13:53 2016 +0530
Committer: Arun Mahadevan <ar...@apache.org>
Committed: Fri Jan 13 01:20:44 2017 +0530

----------------------------------------------------------------------
 .../storm/starter/streams/AggregateExample.java |  96 ++++++
 .../starter/streams/StateQueryExample.java      |   4 +-
 .../starter/streams/StatefulWordCount.java      |  16 +-
 .../starter/streams/TypedTupleExample.java      |  64 ++++
 .../starter/streams/WindowedWordCount.java      |  18 +-
 .../storm/starter/streams/WordCountToBolt.java  |   7 +-
 .../org/apache/storm/streams/GroupingInfo.java  |  16 +
 .../src/jvm/org/apache/storm/streams/Node.java  |  53 ++-
 .../src/jvm/org/apache/storm/streams/Pair.java  |  19 +
 .../org/apache/storm/streams/PairStream.java    | 345 +++++++++++++++++--
 .../org/apache/storm/streams/PartitionNode.java |   7 +-
 .../storm/streams/ProcessorBoltDelegate.java    | 111 ++++--
 .../org/apache/storm/streams/ProcessorNode.java |  24 +-
 .../jvm/org/apache/storm/streams/Stream.java    | 177 ++++++++--
 .../org/apache/storm/streams/StreamBuilder.java | 130 ++++---
 .../org/apache/storm/streams/StreamUtil.java    |  25 +-
 .../jvm/org/apache/storm/streams/Tuple3.java    |  49 ---
 .../org/apache/storm/streams/WindowNode.java    |   1 +
 .../storm/streams/operations/Aggregator.java    |  42 ---
 .../storm/streams/operations/BiFunction.java    |  37 ++
 .../streams/operations/CombinerAggregator.java  |  97 ++++++
 .../storm/streams/operations/Reducer.java       |   2 +-
 .../storm/streams/operations/StateUpdater.java  |  67 ++++
 .../streams/operations/aggregators/Count.java   |  18 +-
 .../streams/operations/aggregators/Sum.java     |  16 +-
 .../operations/mappers/TupleValueMappers.java   | 174 ++++++++++
 .../processors/AggregateByKeyProcessor.java     |  47 ++-
 .../streams/processors/AggregateProcessor.java  |  44 ++-
 .../streams/processors/BranchProcessor.java     |   6 +-
 .../processors/EmittingProcessorContext.java    |  35 +-
 .../storm/streams/processors/JoinProcessor.java |  54 ++-
 .../MergeAggregateByKeyProcessor.java           |  54 +++
 .../processors/MergeAggregateProcessor.java     |  47 +++
 .../streams/processors/ReduceProcessor.java     |   6 +-
 .../processors/UpdateStateByKeyProcessor.java   |  12 +-
 .../org/apache/storm/streams/tuple/Tuple10.java | 112 ++++++
 .../org/apache/storm/streams/tuple/Tuple3.java  |  70 ++++
 .../org/apache/storm/streams/tuple/Tuple4.java  |  76 ++++
 .../org/apache/storm/streams/tuple/Tuple5.java  |  82 +++++
 .../org/apache/storm/streams/tuple/Tuple6.java  |  89 +++++
 .../org/apache/storm/streams/tuple/Tuple7.java  |  94 +++++
 .../org/apache/storm/streams/tuple/Tuple8.java  | 100 ++++++
 .../org/apache/storm/streams/tuple/Tuple9.java  | 106 ++++++
 .../apache/storm/streams/ProcessorBoltTest.java |  13 +-
 .../streams/StatefulProcessorBoltTest.java      |  15 +-
 .../apache/storm/streams/StreamBuilderTest.java |  57 ++-
 .../streams/WindowedProcessorBoltTest.java      |   2 +-
 .../streams/processors/JoinProcessorTest.java   | 108 ++++++
 48 files changed, 2471 insertions(+), 373 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/AggregateExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/AggregateExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/AggregateExample.java
new file mode 100644
index 0000000..91dfadb
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/AggregateExample.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.streams;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.starter.spout.RandomIntegerSpout;
+import org.apache.storm.streams.Pair;
+import org.apache.storm.streams.PairStream;
+import org.apache.storm.streams.Stream;
+import org.apache.storm.streams.StreamBuilder;
+import org.apache.storm.streams.operations.CombinerAggregator;
+import org.apache.storm.streams.operations.mappers.TupleValueMapper;
+import org.apache.storm.streams.operations.mappers.TupleValueMappers;
+import org.apache.storm.streams.operations.mappers.ValueMapper;
+import org.apache.storm.streams.tuple.Tuple3;
+import org.apache.storm.streams.windowing.TumblingWindows;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.trident.windowing.config.TumblingDurationWindow;
+import org.apache.storm.utils.Utils;
+
+import static org.apache.storm.topology.base.BaseWindowedBolt.Count;
+
+/**
+ * An example that illustrates a global aggregate (the average) over a windowed stream.
+ */
+public class AggregateExample {
+    @SuppressWarnings("unchecked")
+    public static void main(String[] args) throws Exception {
+        StreamBuilder builder = new StreamBuilder();
+        /**
+         * Computes the average of the stream of numbers emitted by the spout. Internally the per-partition
+         * sums and counts are accumulated and emitted to a downstream task where the partially accumulated
+         * results are merged and the final result is emitted.
+         */
+        builder.newStream(new RandomIntegerSpout(), new ValueMapper<Integer>(0), 2)
+                .window(TumblingWindows.of(BaseWindowedBolt.Duration.seconds(5)))
+                .filter(x -> x > 0 && x < 500)
+                .aggregate(new Avg())
+                .print();
+
+        Config config = new Config();
+        if (args.length > 0) {
+            config.setNumWorkers(1);
+            StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
+        } else {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("test", config, builder.build());
+            Utils.sleep(60000);
+            cluster.killTopology("test");
+            cluster.shutdown();
+        }
+    }
+
+    private static class Avg implements CombinerAggregator<Integer, Pair<Integer, Integer>, Double> {
+        @Override
+        public Pair<Integer, Integer> init() {
+            return Pair.of(0, 0);
+        }
+
+        @Override
+        public Pair<Integer, Integer> apply(Pair<Integer, Integer> sumAndCount, Integer value) {
+            return Pair.of(sumAndCount.getFirst() + value, sumAndCount.getSecond() + 1);
+        }
+
+        @Override
+        public Pair<Integer, Integer> merge(Pair<Integer, Integer> sumAndCount1, Pair<Integer, Integer> sumAndCount2) {
+            System.out.println("Merge " + sumAndCount1 + " and " + sumAndCount2);
+            return Pair.of(
+                    sumAndCount1.getFirst() + sumAndCount2.getFirst(),
+                    sumAndCount1.getSecond() + sumAndCount2.getSecond()
+            );
+        }
+
+        @Override
+        public Double result(Pair<Integer, Integer> sumAndCount) {
+            return (double) sumAndCount.getFirst()/sumAndCount.getSecond();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StateQueryExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StateQueryExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StateQueryExample.java
index e76dd3c..2f0a4a3 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StateQueryExample.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StateQueryExample.java
@@ -25,7 +25,6 @@ import org.apache.storm.streams.Pair;
 import org.apache.storm.streams.Stream;
 import org.apache.storm.streams.StreamBuilder;
 import org.apache.storm.streams.StreamState;
-import org.apache.storm.streams.operations.aggregators.Count;
 import org.apache.storm.streams.operations.mappers.ValueMapper;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.testing.TestWordSpout;
@@ -56,8 +55,7 @@ public class StateQueryExample {
         StreamBuilder builder = new StreamBuilder();
         StreamState<String, Long> ss = builder.newStream(new TestWordSpout(), new ValueMapper<String>(0))
                 .mapToPair(w -> Pair.of(w, 1))
-                .groupByKey()
-                .updateStateByKey(new Count<>());
+                .updateStateByKey(0L, (count, val) -> count + 1);
 
         /*
          * A stream of words emitted by the QuerySpout is used as

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StatefulWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StatefulWordCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StatefulWordCount.java
index f6ae6b0..ce7470d 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StatefulWordCount.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StatefulWordCount.java
@@ -23,14 +23,15 @@ import org.apache.storm.StormSubmitter;
 import org.apache.storm.streams.Pair;
 import org.apache.storm.streams.PairStream;
 import org.apache.storm.streams.StreamBuilder;
-import org.apache.storm.streams.operations.Aggregator;
-import org.apache.storm.streams.operations.aggregators.Count;
+import org.apache.storm.streams.operations.StateUpdater;
 import org.apache.storm.streams.operations.mappers.ValueMapper;
+import org.apache.storm.streams.windowing.TumblingWindows;
 import org.apache.storm.testing.TestWordSpout;
+import org.apache.storm.topology.base.BaseWindowedBolt;
 import org.apache.storm.utils.Utils;
 
 /**
- * A stateful word count that uses {@link PairStream#updateStateByKey(Aggregator)} to
+ * A stateful word count that uses {@link PairStream#updateStateByKey(StateUpdater)} to
  * save the counts in a key value state. This example uses Redis state store.
  * <p>
  * You should start a local redis instance before running the 'storm jar' command. By default
@@ -48,19 +49,20 @@ public class StatefulWordCount {
     public static void main(String[] args) throws Exception {
         StreamBuilder builder = new StreamBuilder();
         // a stream of words
-        builder.newStream(new TestWordSpout(), new ValueMapper<String>(0))
+        builder.newStream(new TestWordSpout(), new ValueMapper<String>(0), 2)
+                .window(TumblingWindows.of(BaseWindowedBolt.Duration.seconds(2)))
                 /*
                  * create a stream of (word, 1) pairs
                  */
                 .mapToPair(w -> Pair.of(w, 1))
                 /*
-                 * group by the word
+                 * compute the word counts in the last two-second window
                  */
-                .groupByKey()
+                .countByKey()
                 /*
                  * update the word counts in the state
                  */
-                .updateStateByKey(new Count<>())
+                .updateStateByKey(0L, (x, y) -> x + y)
                  /*
                   * convert the state back to a stream and print the results
                   */

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/TypedTupleExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/TypedTupleExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/TypedTupleExample.java
new file mode 100644
index 0000000..193ad661
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/TypedTupleExample.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.streams;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.starter.spout.RandomIntegerSpout;
+import org.apache.storm.streams.Pair;
+import org.apache.storm.streams.PairStream;
+import org.apache.storm.streams.Stream;
+import org.apache.storm.streams.StreamBuilder;
+import org.apache.storm.streams.operations.mappers.TupleValueMappers;
+import org.apache.storm.streams.tuple.Tuple3;
+import org.apache.storm.streams.windowing.TumblingWindows;
+import org.apache.storm.utils.Utils;
+
+import static org.apache.storm.topology.base.BaseWindowedBolt.Count;
+
+/**
+ * An example that illustrates the usage of typed tuples ({@code TupleN<..>}) and {@link TupleValueMappers}.
+ */
+public class TypedTupleExample {
+    @SuppressWarnings("unchecked")
+    public static void main(String[] args) throws Exception {
+        StreamBuilder builder = new StreamBuilder();
+        /**
+         * The spout emits sequences of (Integer, Long, Long). TupleValueMapper can be used to extract fields
+         * from the values and produce a stream of typed tuples (Tuple3<Integer, Long, Long> in this case).
+         */
+        Stream<Tuple3<Integer, Long, Long>> stream = builder.newStream(new RandomIntegerSpout(), TupleValueMappers.of(0, 1, 2));
+
+        PairStream<Long, Integer> pairs = stream.mapToPair(t -> Pair.of(t._2 / 10000, t._1));
+
+        pairs.window(TumblingWindows.of(Count.of(10))).groupByKey().print();
+
+        Config config = new Config();
+        if (args.length > 0) {
+            config.setNumWorkers(1);
+            StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
+        } else {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("test", config, builder.build());
+            Utils.sleep(60000);
+            cluster.killTopology("test");
+            cluster.shutdown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WindowedWordCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WindowedWordCount.java
index c6e2f4a..0765a74 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WindowedWordCount.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WindowedWordCount.java
@@ -23,7 +23,6 @@ import org.apache.storm.StormSubmitter;
 import org.apache.storm.starter.spout.RandomSentenceSpout;
 import org.apache.storm.streams.Pair;
 import org.apache.storm.streams.StreamBuilder;
-import org.apache.storm.streams.operations.aggregators.Count;
 import org.apache.storm.streams.operations.mappers.ValueMapper;
 import org.apache.storm.streams.windowing.TumblingWindows;
 import org.apache.storm.utils.Utils;
@@ -39,12 +38,11 @@ public class WindowedWordCount {
     public static void main(String[] args) throws Exception {
         StreamBuilder builder = new StreamBuilder();
         // A stream of random sentences
-        builder.newStream(new RandomSentenceSpout(), new ValueMapper<String>(0))
+        builder.newStream(new RandomSentenceSpout(), new ValueMapper<String>(0), 2)
                 /*
-                 * Increase the parallelism of this stream. Further operations
-                 * on this stream will execute at this level of parallelism.
+                 * a two-second tumbling window
                  */
-                .repartition(2)
+                .window(TumblingWindows.of(Duration.seconds(2)))
                 /*
                  * split the sentences to words
                  */
@@ -54,17 +52,9 @@ public class WindowedWordCount {
                  */
                 .mapToPair(w -> Pair.of(w, 1))
                 /*
-                 * group by word so that the same words end up in the same partition
-                 */
-                .groupByKey()
-                /*
-                 * a two seconds tumbling window
-                 */
-                .window(TumblingWindows.of(Duration.seconds(2)))
-                /*
                  * compute the word counts in the last two second window
                  */
-                .aggregateByKey(new Count<>())
+                .countByKey()
                 /*
                  * emit the count for the words that occurred
                  * at-least five times in the last two seconds

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WordCountToBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WordCountToBolt.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WordCountToBolt.java
index a711696..dd7923a 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WordCountToBolt.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WordCountToBolt.java
@@ -26,7 +26,6 @@ import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 import org.apache.storm.redis.common.mapper.RedisStoreMapper;
 import org.apache.storm.streams.Pair;
 import org.apache.storm.streams.StreamBuilder;
-import org.apache.storm.streams.operations.aggregators.Count;
 import org.apache.storm.streams.operations.mappers.ValueMapper;
 import org.apache.storm.testing.TestWordSpout;
 import org.apache.storm.topology.IRichBolt;
@@ -56,13 +55,9 @@ public class WordCountToBolt {
                  */
                 .mapToPair(w -> Pair.of(w, 1))
                 /*
-                 * group by key (word)
-                 */
-                .groupByKey()
-                /*
                  * aggregate the count
                  */
-                .aggregateByKey(new Count<>())
+                .countByKey()
                 /*
                  * The result of aggregation is forwarded to
                  * the RedisStoreBolt. The forwarded tuple is a

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/storm-core/src/jvm/org/apache/storm/streams/GroupingInfo.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/GroupingInfo.java b/storm-core/src/jvm/org/apache/storm/streams/GroupingInfo.java
index 81def4b..9e8c893 100644
--- a/storm-core/src/jvm/org/apache/storm/streams/GroupingInfo.java
+++ b/storm-core/src/jvm/org/apache/storm/streams/GroupingInfo.java
@@ -76,6 +76,22 @@ abstract class GroupingInfo implements Serializable {
     }
 
     @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        GroupingInfo that = (GroupingInfo) o;
+
+        return fields != null ? fields.equals(that.fields) : that.fields == null;
+
+    }
+
+    @Override
+    public int hashCode() {
+        return fields != null ? fields.hashCode() : 0;
+    }
+
+    @Override
     public String toString() {
         return "GroupingInfo{" +
                 "fields=" + fields +

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/storm-core/src/jvm/org/apache/storm/streams/Node.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/Node.java b/storm-core/src/jvm/org/apache/storm/streams/Node.java
index f9de390..3507f50 100644
--- a/storm-core/src/jvm/org/apache/storm/streams/Node.java
+++ b/storm-core/src/jvm/org/apache/storm/streams/Node.java
@@ -38,30 +38,47 @@ import java.util.Set;
 abstract class Node implements Serializable {
     private final Set<String> outputStreams;
     protected final Fields outputFields;
+    protected GroupingInfo groupingInfo;
     protected String componentId;
     protected int parallelism;
     // the parent streams that this node subscribes to
     private final Multimap<Node, String> parentStreams = ArrayListMultimap.create();
+    private boolean windowed;
+    private boolean emitsPair;
 
-    Node(Set<String> outputStreams, Fields outputFields, String componentId, int parallelism) {
+    Node(Set<String> outputStreams, Fields outputFields, String componentId, int parallelism,
+         GroupingInfo groupingInfo) {
         this.outputStreams = new HashSet<>(outputStreams);
         this.outputFields = outputFields;
         this.componentId = componentId;
         this.parallelism = parallelism;
+        this.groupingInfo = groupingInfo;
     }
 
-    Node(String outputStream, Fields outputFields, String componentId, int parallelism) {
-        this(Collections.singleton(outputStream), outputFields, componentId, parallelism);
+    Node(String outputStream, Fields outputFields, String componentId, int parallelism, GroupingInfo groupingInfo) {
+        this(Collections.singleton(outputStream), outputFields, componentId, parallelism, groupingInfo);
     }
 
-    Node(String outputStream, Fields outputFields, String componentId) {
-        this(outputStream, outputFields, componentId, 1);
+    Node(String outputStream, Fields outputFields, String componentId, GroupingInfo groupingInfo) {
+        this(outputStream, outputFields, componentId, 1, groupingInfo);
     }
 
     Node(String outputStream, Fields outputFields) {
         this(outputStream, outputFields, null);
     }
 
+    Node(String outputStream, Fields outputFields, GroupingInfo groupingInfo) {
+        this(outputStream, outputFields, null, groupingInfo);
+    }
+
+    GroupingInfo getGroupingInfo() {
+        return groupingInfo;
+    }
+
+    void setGroupingInfo(GroupingInfo groupingInfo) {
+        this.groupingInfo = groupingInfo;
+    }
+
     public Fields getOutputFields() {
         return outputFields;
     }
@@ -94,6 +111,14 @@ abstract class Node implements Serializable {
         return Collections.unmodifiableSet(outputStreams);
     }
 
+    public boolean isWindowed() {
+        return windowed;
+    }
+
+    public void setWindowed(boolean windowed) {
+        this.windowed = windowed;
+    }
+
     Collection<String> getParentStreams(Node parent) {
         return parentStreams.get(parent);
     }
@@ -103,6 +128,10 @@ abstract class Node implements Serializable {
         return new HashSet<>(rev.get(stream));
     }
 
+    Set<Node> getParents() {
+        return parentStreams.keySet();
+    }
+
     void addOutputStream(String streamId) {
         outputStreams.add(streamId);
     }
@@ -117,13 +146,25 @@ abstract class Node implements Serializable {
         return new Fields();
     }
 
+    public boolean emitsPair() {
+        return emitsPair;
+    }
+
+    public void setEmitsPair(boolean emitsPair) {
+        this.emitsPair = emitsPair;
+    }
+
     @Override
     public String toString() {
         return "Node{" +
-                "outputStreams='" + outputStreams + '\'' +
+                "outputStreams=" + outputStreams +
                 ", outputFields=" + outputFields +
+                ", groupingInfo=" + groupingInfo +
                 ", componentId='" + componentId + '\'' +
                 ", parallelism=" + parallelism +
+                ", parentStreams=" + parentStreams +
+                ", windowed=" + windowed +
+                ", emitsPair=" + emitsPair +
                 '}';
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/storm-core/src/jvm/org/apache/storm/streams/Pair.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/Pair.java b/storm-core/src/jvm/org/apache/storm/streams/Pair.java
index 0044359..e5eb792 100644
--- a/storm-core/src/jvm/org/apache/storm/streams/Pair.java
+++ b/storm-core/src/jvm/org/apache/storm/streams/Pair.java
@@ -72,6 +72,25 @@ public final class Pair<T1, T2> implements Serializable {
     }
 
     @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        Pair<?, ?> pair = (Pair<?, ?>) o;
+
+        if (first != null ? !first.equals(pair.first) : pair.first != null) return false;
+        return second != null ? second.equals(pair.second) : pair.second == null;
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = first != null ? first.hashCode() : 0;
+        result = 31 * result + (second != null ? second.hashCode() : 0);
+        return result;
+    }
+
+    @Override
     public String toString() {
         return "(" + first + ", " + second + ')';
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/storm-core/src/jvm/org/apache/storm/streams/PairStream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/PairStream.java b/storm-core/src/jvm/org/apache/storm/streams/PairStream.java
index 2d18b30..964cdba 100644
--- a/storm-core/src/jvm/org/apache/storm/streams/PairStream.java
+++ b/storm-core/src/jvm/org/apache/storm/streams/PairStream.java
@@ -18,26 +18,31 @@
 package org.apache.storm.streams;
 
 import org.apache.storm.Config;
-import org.apache.storm.streams.operations.Aggregator;
+import org.apache.storm.streams.operations.BiFunction;
+import org.apache.storm.streams.operations.CombinerAggregator;
 import org.apache.storm.streams.operations.Consumer;
 import org.apache.storm.streams.operations.FlatMapFunction;
 import org.apache.storm.streams.operations.Function;
 import org.apache.storm.streams.operations.PairValueJoiner;
 import org.apache.storm.streams.operations.Predicate;
 import org.apache.storm.streams.operations.Reducer;
+import org.apache.storm.streams.operations.StateUpdater;
 import org.apache.storm.streams.operations.ValueJoiner;
+import org.apache.storm.streams.operations.aggregators.Count;
 import org.apache.storm.streams.processors.AggregateByKeyProcessor;
 import org.apache.storm.streams.processors.FlatMapValuesProcessor;
 import org.apache.storm.streams.processors.JoinProcessor;
 import org.apache.storm.streams.processors.MapValuesProcessor;
+import org.apache.storm.streams.processors.MergeAggregateByKeyProcessor;
 import org.apache.storm.streams.processors.ReduceByKeyProcessor;
 import org.apache.storm.streams.processors.UpdateStateByKeyProcessor;
 import org.apache.storm.streams.windowing.Window;
 import org.apache.storm.tuple.Fields;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
+import java.util.Optional;
+import java.util.Set;
 
 /**
  * Represents a stream of key-value pairs.
@@ -49,6 +54,7 @@ public class PairStream<K, V> extends Stream<Pair<K, V>> {
 
     PairStream(StreamBuilder topology, Node node) {
         super(topology, node);
+        node.setEmitsPair(true);
     }
 
     /**
@@ -60,7 +66,8 @@ public class PairStream<K, V> extends Stream<Pair<K, V>> {
      * @return the new stream
      */
     public <R> PairStream<K, R> mapValues(Function<? super V, ? extends R> function) {
-        return new PairStream<>(streamBuilder, addProcessorNode(new MapValuesProcessor<>(function), KEY_VALUE));
+        return new PairStream<>(streamBuilder,
+                addProcessorNode(new MapValuesProcessor<>(function), KEY_VALUE, true));
     }
 
     /**
@@ -71,19 +78,45 @@ public class PairStream<K, V> extends Stream<Pair<K, V>> {
      * @param <R>      the result type
      * @return the new stream
      */
-    public <R> PairStream<K, R> flatMapValues(FlatMapFunction<V, R> function) {
-        return new PairStream<>(streamBuilder, addProcessorNode(new FlatMapValuesProcessor<>(function), KEY_VALUE));
+    public <R> PairStream<K, R> flatMapValues(FlatMapFunction<? super V, ? extends R> function) {
+        return new PairStream<>(streamBuilder,
+                addProcessorNode(new FlatMapValuesProcessor<>(function), KEY_VALUE, true));
     }
 
     /**
-     * Aggregates the values for each key of this stream using the given {@link Aggregator}.
+     * Aggregates the values for each key of this stream using the given initial value, accumulator and combiner.
      *
-     * @param aggregator the aggregator
+     * @param initialValue the initial value of the result
+     * @param accumulator  the accumulator
+     * @param combiner     the combiner
+     * @param <R>          the result type
+     * @return the new stream
+     */
+    public <R> PairStream<K, R> aggregateByKey(R initialValue,
+                                               BiFunction<? super R, ? super V, ? extends R> accumulator,
+                                               BiFunction<? super R, ? super R, ? extends R> combiner) {
+        return combineByKey(CombinerAggregator.of(initialValue, accumulator, combiner));
+    }
+
+    /**
+     * Aggregates the values for each key of this stream using the given {@link CombinerAggregator}.
+     *
+     * @param aggregator the combiner aggregator
+     * @param <A>        the accumulator type
      * @param <R>        the result type
      * @return the new stream
      */
-    public <R> PairStream<K, R> aggregateByKey(Aggregator<? super V, ? extends R> aggregator) {
-        return new PairStream<>(streamBuilder, addProcessorNode(new AggregateByKeyProcessor<>(aggregator), KEY_VALUE));
+    public <A, R> PairStream<K, R> aggregateByKey(CombinerAggregator<? super V, A, ? extends R> aggregator) {
+        return combineByKey(aggregator);
+    }
+
+    /**
+     * Counts the values for each key of this stream.
+     *
+     * @return the new stream
+     */
+    public PairStream<K, Long> countByKey() {
+        return aggregateByKey(new Count<>());
     }
 
     /**
@@ -93,7 +126,7 @@ public class PairStream<K, V> extends Stream<Pair<K, V>> {
      * @return the new stream
      */
     public PairStream<K, V> reduceByKey(Reducer<V> reducer) {
-        return new PairStream<>(streamBuilder, addProcessorNode(new ReduceByKeyProcessor<>(reducer), KEY_VALUE));
+        return combineByKey(reducer);
     }
 
     /**
@@ -101,8 +134,8 @@ public class PairStream<K, V> extends Stream<Pair<K, V>> {
      *
      * @return the new stream
      */
-    public PairStream<K, V> groupByKey() {
-        return partitionBy(KEY);
+    public PairStream<K, Iterable<V>> groupByKey() {
+        return partitionByKey().aggregatePartition(new MergeValues<>());
     }
 
     /**
@@ -114,7 +147,7 @@ public class PairStream<K, V> extends Stream<Pair<K, V>> {
      * @return the new stream
      */
     public PairStream<K, Iterable<V>> groupByKeyAndWindow(Window<?, ?> window) {
-        return groupByKey().window(window).aggregateByKey(new MergeValues<>());
+        return partitionByKey().window(window).aggregatePartition(new MergeValues<>());
     }
 
     /**
@@ -126,7 +159,7 @@ public class PairStream<K, V> extends Stream<Pair<K, V>> {
      * @return the new stream
      */
     public PairStream<K, V> reduceByKeyAndWindow(Reducer<V> reducer, Window<?, ?> window) {
-        return groupByKey().window(window).reduceByKey(reducer);
+        return partitionByKey().window(window).reduceByKey(reducer);
     }
 
     /**
@@ -138,9 +171,16 @@ public class PairStream<K, V> extends Stream<Pair<K, V>> {
     }
 
     /**
+     * {@inheritDoc}
+     */
+    public PairStream<K, V> filter(Predicate<? super Pair<K, V>> predicate) {
+        return toPairStream(super.filter(predicate));
+    }
+
+    /**
      * Join the values of this stream with the values having the same key from the other stream.
      * <p>
-     * Note: The parallelism and windowing parameters (if windowed) of this stream is carried forward to the joined stream.
+     * Note: The parallelism of this stream is carried forward to the joined stream.
      * </p>
      *
      * @param otherStream the other stream
@@ -152,9 +192,51 @@ public class PairStream<K, V> extends Stream<Pair<K, V>> {
     }
 
     /**
+     * Does a left outer join of the values of this stream with the values having the same key from the other stream.
+     * <p>
+     * Note: The parallelism of this stream is carried forward to the joined stream.
+     * </p>
+     *
+     * @param otherStream the other stream
+     * @param <V1>        the type of the values in the other stream
+     * @return the new stream
+     */
+    public <V1> PairStream<K, Pair<V, V1>> leftOuterJoin(PairStream<K, V1> otherStream) {
+        return leftOuterJoin(otherStream, new PairValueJoiner<>());
+    }
+
+    /**
+     * Does a right outer join of the values of this stream with the values having the same key from the other stream.
+     * <p>
+     * Note: The parallelism of this stream is carried forward to the joined stream.
+     * </p>
+     *
+     * @param otherStream the other stream
+     * @param <V1>        the type of the values in the other stream
+     * @return the new stream
+     */
+    public <V1> PairStream<K, Pair<V, V1>> rightOuterJoin(PairStream<K, V1> otherStream) {
+        return rightOuterJoin(otherStream, new PairValueJoiner<>());
+    }
+
+    /**
+     * Does a full outer join of the values of this stream with the values having the same key from the other stream.
+     * <p>
+     * Note: The parallelism of this stream is carried forward to the joined stream.
+     * </p>
+     *
+     * @param otherStream the other stream
+     * @param <V1>        the type of the values in the other stream
+     * @return the new stream
+     */
+    public <V1> PairStream<K, Pair<V, V1>> fullOuterJoin(PairStream<K, V1> otherStream) {
+        return fullOuterJoin(otherStream, new PairValueJoiner<>());
+    }
+
+    /**
      * Join the values of this stream with the values having the same key from the other stream.
      * <p>
-     * Note: The parallelism and windowing parameters (if windowed) of this stream is carried forward to the joined stream.
+     * Note: The parallelism of this stream is carried forward to the joined stream.
      * </p>
      *
      * @param otherStream the other stream
@@ -165,11 +247,78 @@ public class PairStream<K, V> extends Stream<Pair<K, V>> {
      */
     public <R, V1> PairStream<K, R> join(PairStream<K, V1> otherStream,
                                          ValueJoiner<? super V, ? super V1, ? extends R> valueJoiner) {
-        String leftStream = stream;
-        String rightStream = otherStream.stream;
-        Node joinNode = addProcessorNode(new JoinProcessor<>(leftStream, rightStream, valueJoiner), KEY_VALUE);
-        addNode(otherStream.getNode(), joinNode, joinNode.getParallelism());
-        return new PairStream<>(streamBuilder, joinNode);
+        return partitionByKey()
+                .joinPartition(
+                        otherStream.partitionByKey(),
+                        valueJoiner,
+                        JoinProcessor.JoinType.INNER,
+                        JoinProcessor.JoinType.INNER);
+    }
+
+    /**
+     * Does a left outer join of the values of this stream with the values having the same key from the other stream.
+     * <p>
+     * Note: The parallelism of this stream is carried forward to the joined stream.
+     * </p>
+     *
+     * @param otherStream the other stream
+     * @param valueJoiner the {@link ValueJoiner}
+     * @param <R>         the type of the values resulting from the join
+     * @param <V1>        the type of the values in the other stream
+     * @return the new stream
+     */
+    public <R, V1> PairStream<K, R> leftOuterJoin(PairStream<K, V1> otherStream,
+                                         ValueJoiner<? super V, ? super V1, ? extends R> valueJoiner) {
+        return partitionByKey()
+                .joinPartition(
+                        otherStream.partitionByKey(),
+                        valueJoiner,
+                        JoinProcessor.JoinType.OUTER,
+                        JoinProcessor.JoinType.INNER);
+    }
+
+    /**
+     * Does a right outer join of the values of this stream with the values having the same key from the other stream.
+     * <p>
+     * Note: The parallelism of this stream is carried forward to the joined stream.
+     * </p>
+     *
+     * @param otherStream the other stream
+     * @param valueJoiner the {@link ValueJoiner}
+     * @param <R>         the type of the values resulting from the join
+     * @param <V1>        the type of the values in the other stream
+     * @return the new stream
+     */
+    public <R, V1> PairStream<K, R> rightOuterJoin(PairStream<K, V1> otherStream,
+                                                  ValueJoiner<? super V, ? super V1, ? extends R> valueJoiner) {
+        return partitionByKey()
+                .joinPartition(
+                        otherStream.partitionByKey(),
+                        valueJoiner,
+                        JoinProcessor.JoinType.INNER,
+                        JoinProcessor.JoinType.OUTER);
+    }
+
+    /**
+     * Does a full outer join of the values of this stream with the values having the same key from the other stream.
+     * <p>
+     * Note: The parallelism of this stream is carried forward to the joined stream.
+     * </p>
+     *
+     * @param otherStream the other stream
+     * @param valueJoiner the {@link ValueJoiner}
+     * @param <R>         the type of the values resulting from the join
+     * @param <V1>        the type of the values in the other stream
+     * @return the new stream
+     */
+    public <R, V1> PairStream<K, R> fullOuterJoin(PairStream<K, V1> otherStream,
+                                                  ValueJoiner<? super V, ? super V1, ? extends R> valueJoiner) {
+        return partitionByKey()
+                .joinPartition(
+                        otherStream.partitionByKey(),
+                        valueJoiner,
+                        JoinProcessor.JoinType.OUTER,
+                        JoinProcessor.JoinType.OUTER);
     }
 
     /**
@@ -193,7 +342,7 @@ public class PairStream<K, V> extends Stream<Pair<K, V>> {
      */
     @Override
     @SuppressWarnings("unchecked")
-    public PairStream<K, V>[] branch(Predicate<Pair<K, V>>... predicates) {
+    public PairStream<K, V>[] branch(Predicate<? super Pair<K, V>>... predicates) {
         List<PairStream<K, V>> pairStreams = new ArrayList<>();
         for (Stream<Pair<K, V>> stream : super.branch(predicates)) {
             pairStreams.add(toPairStream(stream));
@@ -202,40 +351,178 @@ public class PairStream<K, V> extends Stream<Pair<K, V>> {
     }
 
     /**
-     * Update the state by applying the given aggregator to the previous state of the
+     * Update the state by applying the given state update function to the previous state of the
      * key and the new value for the key. This internally uses {@link org.apache.storm.topology.IStatefulBolt}
      * to save the state. Use {@link Config#TOPOLOGY_STATE_PROVIDER} to choose the state implementation.
      *
-     * @param aggregator the aggregator
+     * @param stateUpdateFn the state update function
      * @param <R>        the result type
      * @return the {@link StreamState} which can be used to query the state
      */
-    public <R> StreamState<K, R> updateStateByKey(Aggregator<? super V, ? extends R> aggregator) {
+    public <R> StreamState<K, R> updateStateByKey(R initialValue,
+                                                  BiFunction<? super R, ? super V, ? extends R> stateUpdateFn) {
+        return updateStateByKey(StateUpdater.of(initialValue, stateUpdateFn));
+    }
+
+    /**
+     * Update the state by applying the given state update function to the previous state of the
+     * key and the new value for the key. This internally uses {@link org.apache.storm.topology.IStatefulBolt}
+     * to save the state. Use {@link Config#TOPOLOGY_STATE_PROVIDER} to choose the state implementation.
+     *
+     * @param stateUpdater the state updater
+     * @param <R>          the result type
+     * @return the {@link StreamState} which can be used to query the state
+     */
+    public <R> StreamState<K, R> updateStateByKey(StateUpdater<? super V, ? extends R> stateUpdater) {
+        return partitionByKey().updateStateByKeyPartition(stateUpdater);
+    }
+
+    private <R> StreamState<K, R> updateStateByKeyPartition(StateUpdater<? super V, ? extends R> stateUpdater) {
         return new StreamState<>(
-                new PairStream<>(streamBuilder, addProcessorNode(new UpdateStateByKeyProcessor<>(aggregator), KEY_VALUE)));
+                new PairStream<>(streamBuilder,
+                        addProcessorNode(new UpdateStateByKeyProcessor<>(stateUpdater), KEY_VALUE, true)));
+    }
+
+    private <R, V1> PairStream<K, R> joinPartition(PairStream<K, V1> otherStream,
+                                                   ValueJoiner<? super V, ? super V1, ? extends R> valueJoiner,
+                                                   JoinProcessor.JoinType leftType,
+                                                   JoinProcessor.JoinType rightType) {
+        String leftStream = stream;
+        String rightStream = otherStream.stream;
+        Node joinNode = addProcessorNode(
+                new JoinProcessor<>(leftStream, rightStream, valueJoiner, leftType, rightType),
+                KEY_VALUE,
+                true);
+        addNode(otherStream.getNode(), joinNode, joinNode.getParallelism());
+        return new PairStream<>(streamBuilder, joinNode);
+    }
+
+    private PairStream<K, V> partitionByKey() {
+        return shouldPartitionByKey() ? partitionBy(KEY) : this;
+    }
+
+    private boolean shouldPartitionByKey() {
+        if (node.getParallelism() == 1) {
+            return false;
+        }
+        /*
+         * if the current processor preserves the key and is
+         * already partitioned on key, skip the re-partition.
+         */
+        if (node instanceof ProcessorNode) {
+            ProcessorNode pn = (ProcessorNode) node;
+            Fields fields = pn.getGroupingInfo() == null ? null : pn.getGroupingInfo().getFields();
+            if (pn.isPreservesKey() && fields != null && fields.equals(KEY)) {
+                return false;
+            }
+        }
+        return true;
     }
 
     private PairStream<K, V> partitionBy(Fields fields) {
+        return partitionBy(fields, node.parallelism);
+    }
+
+    private PairStream<K, V> partitionBy(Fields fields, int parallelism) {
         return new PairStream<>(
                 streamBuilder,
-                addNode(new PartitionNode(stream, node.getOutputFields(), GroupingInfo.fields(fields))));
+                addNode(node, new PartitionNode(stream, node.getOutputFields(), GroupingInfo.fields(fields)), parallelism));
     }
 
     private PairStream<K, V> toPairStream(Stream<Pair<K, V>> stream) {
         return new PairStream<>(stream.streamBuilder, stream.node);
     }
 
+    private <A, R> PairStream<K, R> aggregatePartition(CombinerAggregator<? super V, A, ? extends R> aggregator) {
+        return new PairStream<>(streamBuilder,
+                addProcessorNode(new AggregateByKeyProcessor<>(aggregator), KEY_VALUE, true));
+    }
+
+    private <A> PairStream<K, A> combinePartition(CombinerAggregator<? super V, A, ?> aggregator) {
+        return new PairStream<>(streamBuilder,
+                addProcessorNode(new AggregateByKeyProcessor<>(aggregator, true), KEY_VALUE, true));
+    }
+
+    private <R> PairStream<K, R> merge(CombinerAggregator<?, V, ? extends R> aggregator) {
+        return new PairStream<>(streamBuilder,
+                addProcessorNode(new MergeAggregateByKeyProcessor<>(aggregator), KEY_VALUE, true));
+    }
+
+    private PairStream<K, V> reducePartition(Reducer<V> reducer) {
+        return new PairStream<>(streamBuilder,
+                addProcessorNode(new ReduceByKeyProcessor<>(reducer), KEY_VALUE, true));
+    }
+
+    // if re-partitioning is involved on a windowed stream, aggregates by key per partition first and merges the partial results downstream
+    private <A, R> PairStream<K, R> combineByKey(CombinerAggregator<? super V, A, ? extends R> aggregator) {
+        if (shouldPartitionByKey()) {
+            if (node instanceof ProcessorNode) {
+                if (node.isWindowed()) {
+                    return combinePartition(aggregator).partitionBy(KEY).merge(aggregator);
+                }
+            } else if (node instanceof WindowNode) {
+                Set<Node> parents = node.getParents();
+                Optional<Node> nonWindowed = parents.stream().filter(p -> !p.isWindowed()).findAny();
+                if (!nonWindowed.isPresent()) {
+                    parents.forEach(p -> {
+                        Node localAggregateNode = makeProcessorNode(
+                                new AggregateByKeyProcessor<>(aggregator, true), KEY_VALUE, true);
+                        streamBuilder.insert(p, localAggregateNode);
+                    });
+                    return ((PairStream<K, A>)partitionBy(KEY)).merge(aggregator);
+                }
+            }
+            return partitionBy(KEY).aggregatePartition(aggregator);
+        } else {
+            return aggregatePartition(aggregator);
+        }
+    }
+
+    // if re-partitioning is involved on a windowed stream, reduces by key per partition first and reduces the partial results again downstream
+    private PairStream<K, V> combineByKey(Reducer<V> reducer) {
+        if (shouldPartitionByKey()) {
+            if (node instanceof ProcessorNode) {
+                if (node.isWindowed()) {
+                    return reducePartition(reducer).partitionBy(KEY).reducePartition(reducer);
+                }
+            } else if (node instanceof WindowNode) {
+                for (Node p : node.getParents()) {
+                    if (p.isWindowed()) {
+                        Node localReduceNode = makeProcessorNode(new ReduceByKeyProcessor<>(reducer), KEY_VALUE, true);
+                        streamBuilder.insert(p, localReduceNode);
+                    }
+                }
+            }
+            return partitionBy(KEY).reducePartition(reducer);
+        } else {
+            return reducePartition(reducer);
+        }
+    }
+
     // used internally to merge values in groupByKeyAndWindow
-    private static class MergeValues<V> implements Aggregator<V, ArrayList<V>> {
+    private static class MergeValues<V> implements CombinerAggregator<V, ArrayList<V>, ArrayList<V>> {
         @Override
         public ArrayList<V> init() {
             return new ArrayList<>();
         }
 
         @Override
-        public ArrayList<V> apply(V value, ArrayList<V> aggregate) {
+        public ArrayList<V> apply(ArrayList<V> aggregate, V value) {
             aggregate.add(value);
             return aggregate;
         }
+
+        @Override
+        public ArrayList<V> merge(ArrayList<V> accum1, ArrayList<V> accum2) {
+            ArrayList<V> res = new ArrayList<V>();
+            res.addAll(accum1);
+            res.addAll(accum2);
+            return res;
+        }
+
+        @Override
+        public ArrayList<V> result(ArrayList<V> accum) {
+            return accum;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/storm-core/src/jvm/org/apache/storm/streams/PartitionNode.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/PartitionNode.java b/storm-core/src/jvm/org/apache/storm/streams/PartitionNode.java
index ca92def..90b4d7c 100644
--- a/storm-core/src/jvm/org/apache/storm/streams/PartitionNode.java
+++ b/storm-core/src/jvm/org/apache/storm/streams/PartitionNode.java
@@ -25,18 +25,13 @@ import org.apache.storm.tuple.Fields;
  * aggregate/reduce (global grouping), state query (all grouping).
  */
 class PartitionNode extends Node {
-    private final GroupingInfo groupingInfo;
 
     PartitionNode(String outputStream, Fields outputFields, GroupingInfo groupingInfo) {
-        super(outputStream, outputFields);
-        this.groupingInfo = groupingInfo;
+        super(outputStream, outputFields, groupingInfo);
     }
 
     PartitionNode(String outputStream, Fields outputFields) {
         this(outputStream, outputFields, GroupingInfo.shuffle());
     }
 
-    GroupingInfo getGroupingInfo() {
-        return groupingInfo;
-    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/storm-core/src/jvm/org/apache/storm/streams/ProcessorBoltDelegate.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/ProcessorBoltDelegate.java b/storm-core/src/jvm/org/apache/storm/streams/ProcessorBoltDelegate.java
index 5bc6fff..3f3b5c1 100644
--- a/storm-core/src/jvm/org/apache/storm/streams/ProcessorBoltDelegate.java
+++ b/storm-core/src/jvm/org/apache/storm/streams/ProcessorBoltDelegate.java
@@ -18,7 +18,10 @@
 package org.apache.storm.streams;
 
 import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.Multimap;
+import com.google.common.collect.Table;
+import org.apache.storm.generated.GlobalStreamId;
 import org.apache.storm.streams.processors.ChainedProcessorContext;
 import org.apache.storm.streams.processors.EmittingProcessorContext;
 import org.apache.storm.streams.processors.ForwardingProcessorContext;
@@ -55,8 +58,9 @@ class ProcessorBoltDelegate implements Serializable {
     private OutputCollector outputCollector;
     private final List<ProcessorNode> outgoingProcessors = new ArrayList<>();
     private final Set<EmittingProcessorContext> emittingProcessorContexts = new HashSet<>();
-    private final Map<ProcessorNode, Set<String>> punctuationState = new HashMap<>();
+    private final Table<ProcessorNode, String, Integer> punctuationState = HashBasedTable.create();
     private Multimap<String, ProcessorNode> streamToInitialProcessors;
+    private final Map<String, Integer> streamToInputTaskCount = new HashMap<>();
     private String timestampField;
 
     ProcessorBoltDelegate(String id, DirectedGraph<Node, Edge> graph, List<ProcessorNode> nodes) {
@@ -114,6 +118,9 @@ class ProcessorBoltDelegate implements Serializable {
                 ctx.setTimestampField(timestampField);
             }
         }
+        for (String stream : streamToInitialProcessors.keySet()) {
+            streamToInputTaskCount.put(stream, getStreamInputTaskCount(context, stream));
+        }
     }
 
     void declareOutputFields(OutputFieldsDeclarer declarer) {
@@ -127,6 +134,12 @@ class ProcessorBoltDelegate implements Serializable {
                     fields.add(timestampField);
                     declarer.declareStream(stream, new Fields(fields));
                 }
+                /*
+                 * Declare a separate 'punctuation' stream per output stream so that the receiving bolt
+                 * can subscribe to this stream with 'ALL' grouping and process the punctuation once it
+                 * has been received from all upstream tasks.
+                 */
+                declarer.declareStream(StreamUtil.getPunctuationStream(stream), StreamUtil.getPunctuationFields());
             }
         }
     }
@@ -168,20 +181,32 @@ class ProcessorBoltDelegate implements Serializable {
 
     void process(Object value, String sourceStreamId) {
         LOG.debug("Process value {}, sourceStreamId {}", value, sourceStreamId);
+        if (StreamUtil.isPunctuation(value)) {
+            punctuateInitialProcessors(sourceStreamId);
+        } else {
+            executeInitialProcessors(value, sourceStreamId);
+        }
+    }
+
+    private void punctuateInitialProcessors(String punctuationStreamId) {
+        String sourceStreamId = StreamUtil.getSourceStream(punctuationStreamId);
         Collection<ProcessorNode> initialProcessors = streamToInitialProcessors.get(sourceStreamId);
         for (ProcessorNode processorNode : initialProcessors) {
-            Processor processor = processorNode.getProcessor();
-            if (StreamUtil.isPunctuation(value)) {
-                if (shouldPunctuate(processorNode, sourceStreamId)) {
-                    processor.punctuate(null);
-                    clearPunctuationState(processorNode);
-                }
-            } else {
-                processor.execute(value, sourceStreamId);
+            if (shouldPunctuate(processorNode, sourceStreamId)) {
+                processorNode.getProcessor().punctuate(null);
+                clearPunctuationState(processorNode);
             }
         }
     }
 
+    private void executeInitialProcessors(Object value, String sourceStreamId) {
+        Collection<ProcessorNode> initialProcessors = streamToInitialProcessors.get(sourceStreamId);
+        for (ProcessorNode processorNode : initialProcessors) {
+            Processor processor = processorNode.getProcessor();
+            processor.execute(value, sourceStreamId);
+        }
+    }
+
     void setStreamToInitialProcessors(Multimap<String, ProcessorNode> streamToInitialProcessors) {
         this.streamToInitialProcessors = streamToInitialProcessors;
     }
@@ -225,9 +250,6 @@ class ProcessorBoltDelegate implements Serializable {
         List<EmittingProcessorContext> emittingContexts = new ArrayList<>();
         for (String stream : processorNode.getOutputStreams()) {
             EmittingProcessorContext emittingContext = new EmittingProcessorContext(processorNode, outputCollector, stream);
-            if (StreamUtil.isSinkStream(stream)) {
-                emittingContext.setEmitPunctuation(false);
-            }
             emittingContexts.add(emittingContext);
         }
         emittingProcessorContexts.addAll(emittingContexts);
@@ -257,24 +279,48 @@ class ProcessorBoltDelegate implements Serializable {
         return children;
     }
 
-    // if we received punctuation from all parent windowed streams
+    // true if the given processor node has no windowed parent streams, or punctuation has been received from all tasks of each of them
     private boolean shouldPunctuate(ProcessorNode processorNode, String sourceStreamId) {
-        if (processorNode.getWindowedParentStreams().size() <= 1) {
-            return true;
+        if (!processorNode.getWindowedParentStreams().isEmpty()) {
+            updateCount(processorNode, sourceStreamId);
+            if (punctuationState.row(processorNode).size() != processorNode.getWindowedParentStreams().size()) {
+                return false;
+            }
+            // size matches, check if the streams are expected
+            Set<String> receivedStreams = punctuationState.row(processorNode).keySet();
+            if (!receivedStreams.equals(processorNode.getWindowedParentStreams())) {
+                throw new IllegalStateException("Received punctuation from streams " + receivedStreams + " expected "
+                        + processorNode.getWindowedParentStreams());
+            }
+            for (String receivedStream : receivedStreams) {
+                Integer expected = streamToInputTaskCount.get(receivedStream);
+                if (expected == null) {
+                    throw new IllegalStateException("Punctuation received on unexpected stream '" + receivedStream +
+                            "' for which input task count is not set.");
+                }
+                if (punctuationState.get(processorNode, receivedStream) < streamToInputTaskCount.get(receivedStream)) {
+                    return false;
+                }
+            }
         }
-        Set<String> receivedStreams = punctuationState.get(processorNode);
-        if (receivedStreams == null) {
-            receivedStreams = new HashSet<>();
-            punctuationState.put(processorNode, receivedStreams);
+        return true;
+    }
+
+    private void updateCount(ProcessorNode processorNode, String sourceStreamId) {
+        Integer count = punctuationState.get(processorNode, sourceStreamId);
+        if (count == null) {
+            punctuationState.put(processorNode, sourceStreamId, 1);
+        } else {
+            punctuationState.put(processorNode, sourceStreamId, count + 1);
         }
-        receivedStreams.add(sourceStreamId);
-        return receivedStreams.equals(processorNode.getWindowedParentStreams());
     }
 
     private void clearPunctuationState(ProcessorNode processorNode) {
-        Set<String> state = punctuationState.get(processorNode);
-        if (state != null) {
-            state.clear();
+        if (!punctuationState.isEmpty()) {
+            Map<String, Integer> state = punctuationState.row(processorNode);
+            if (!state.isEmpty()) {
+                state.clear();
+            }
         }
     }
 
@@ -282,4 +328,21 @@ class ProcessorBoltDelegate implements Serializable {
         return input.size() == (timestampField == null ? 2 : 3);
     }
 
+    private int getStreamInputTaskCount(TopologyContext context, String stream) {
+        int count = 0;
+        for (GlobalStreamId inputStream : context.getThisSources().keySet()) {
+            if (stream.equals(getStreamId(inputStream))) {
+                count += context.getComponentTasks(inputStream.get_componentId()).size();
+            }
+        }
+        return count;
+    }
+
+    private String getStreamId(GlobalStreamId globalStreamId) {
+        if (globalStreamId.get_componentId().startsWith("spout")) {
+            return globalStreamId.get_componentId() + globalStreamId.get_streamId();
+        }
+        return globalStreamId.get_streamId();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/storm-core/src/jvm/org/apache/storm/streams/ProcessorNode.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/ProcessorNode.java b/storm-core/src/jvm/org/apache/storm/streams/ProcessorNode.java
index 4771f4f..b68dd48 100644
--- a/storm-core/src/jvm/org/apache/storm/streams/ProcessorNode.java
+++ b/storm-core/src/jvm/org/apache/storm/streams/ProcessorNode.java
@@ -32,32 +32,29 @@ import java.util.Set;
 public class ProcessorNode extends Node {
     private final Processor<?> processor;
     private final boolean isBatch;
-    private boolean windowed;
+    private final boolean preservesKey;
     // Windowed parent streams
     private Set<String> windowedParentStreams = Collections.emptySet();
 
-    public ProcessorNode(Processor<?> processor, String outputStream, Fields outputFields) {
+    public ProcessorNode(Processor<?> processor, String outputStream, Fields outputFields, boolean preservesKey) {
         super(outputStream, outputFields);
         this.isBatch = processor instanceof BatchProcessor;
         this.processor = processor;
+        this.preservesKey = preservesKey;
     }
 
-    public Processor<?> getProcessor() {
-        return processor;
+    public ProcessorNode(Processor<?> processor, String outputStream, Fields outputFields) {
+        this(processor, outputStream, outputFields, false);
     }
 
-    public boolean isWindowed() {
-        return windowed;
+    public Processor<?> getProcessor() {
+        return processor;
     }
 
     public boolean isBatch() {
         return isBatch;
     }
 
-    public void setWindowed(boolean windowed) {
-        this.windowed = windowed;
-    }
-
     public Set<String> getWindowedParentStreams() {
         return Collections.unmodifiableSet(windowedParentStreams);
     }
@@ -70,11 +67,16 @@ public class ProcessorNode extends Node {
         this.windowedParentStreams = new HashSet<>(windowedParentStreams);
     }
 
+    public boolean isPreservesKey() {
+        return preservesKey;
+    }
+
     @Override
     public String toString() {
         return "ProcessorNode{" +
                 "processor=" + processor +
-                ", windowed=" + windowed +
+                ", isBatch=" + isBatch +
+                ", preservesKey=" + preservesKey +
                 ", windowedParentStreams=" + windowedParentStreams +
                 "} " + super.toString();
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/storm-core/src/jvm/org/apache/storm/streams/Stream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/Stream.java b/storm-core/src/jvm/org/apache/storm/streams/Stream.java
index e50e7a2..d553390 100644
--- a/storm-core/src/jvm/org/apache/storm/streams/Stream.java
+++ b/storm-core/src/jvm/org/apache/storm/streams/Stream.java
@@ -17,7 +17,8 @@
  */
 package org.apache.storm.streams;
 
-import org.apache.storm.streams.operations.Aggregator;
+import org.apache.storm.streams.operations.BiFunction;
+import org.apache.storm.streams.operations.CombinerAggregator;
 import org.apache.storm.streams.operations.Consumer;
 import org.apache.storm.streams.operations.FlatMapFunction;
 import org.apache.storm.streams.operations.Function;
@@ -27,12 +28,14 @@ import org.apache.storm.streams.operations.PairFunction;
 import org.apache.storm.streams.operations.Predicate;
 import org.apache.storm.streams.operations.PrintConsumer;
 import org.apache.storm.streams.operations.Reducer;
+import org.apache.storm.streams.operations.aggregators.Count;
 import org.apache.storm.streams.processors.AggregateProcessor;
 import org.apache.storm.streams.processors.BranchProcessor;
 import org.apache.storm.streams.processors.FilterProcessor;
 import org.apache.storm.streams.processors.FlatMapProcessor;
 import org.apache.storm.streams.processors.ForEachProcessor;
 import org.apache.storm.streams.processors.MapProcessor;
+import org.apache.storm.streams.processors.MergeAggregateProcessor;
 import org.apache.storm.streams.processors.PeekProcessor;
 import org.apache.storm.streams.processors.Processor;
 import org.apache.storm.streams.processors.ReduceProcessor;
@@ -42,9 +45,13 @@ import org.apache.storm.topology.IBasicBolt;
 import org.apache.storm.topology.IRichBolt;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
+import java.util.Set;
 
 /**
  * Represents a stream of values.
@@ -52,6 +59,8 @@ import java.util.List;
  * @param <T> the type of the value
  */
 public class Stream<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(Stream.class);
+
     protected static final Fields KEY = new Fields("key");
     protected static final Fields VALUE = new Fields("value");
     protected static final Fields KEY_VALUE = new Fields("key", "value");
@@ -79,7 +88,7 @@ public class Stream<T> {
      * @return the new stream
      */
     public Stream<T> filter(Predicate<? super T> predicate) {
-        return new Stream<>(streamBuilder, addProcessorNode(new FilterProcessor<>(predicate), VALUE));
+        return new Stream<>(streamBuilder, addProcessorNode(new FilterProcessor<>(predicate), VALUE, true));
     }
 
     /**
@@ -100,7 +109,7 @@ public class Stream<T> {
      * @param <V>      the value type
      * @return the new stream of key-value pairs
      */
-    public <K, V> PairStream<K, V> mapToPair(PairFunction<T, K, V> function) {
+    public <K, V> PairStream<K, V> mapToPair(PairFunction<? super T, ? extends K, ? extends V> function) {
         return new PairStream<>(streamBuilder, addProcessorNode(new MapProcessor<>(function), KEY_VALUE));
     }
 
@@ -113,7 +122,7 @@ public class Stream<T> {
      * @param function a mapping function to be applied to each value in this stream which produces new values.
      * @return the new stream
      */
-    public <R> Stream<R> flatMap(FlatMapFunction<T, R> function) {
+    public <R> Stream<R> flatMap(FlatMapFunction<? super T, ? extends R> function) {
         return new Stream<>(streamBuilder, addProcessorNode(new FlatMapProcessor<>(function), VALUE));
     }
 
@@ -128,7 +137,7 @@ public class Stream<T> {
      * @see #flatMap(FlatMapFunction)
      * @see #mapToPair(PairFunction)
      */
-    public <K, V> PairStream<K, V> flatMapToPair(PairFlatMapFunction<T, K, V> function) {
+    public <K, V> PairStream<K, V> flatMapToPair(PairFlatMapFunction<? super T, ? extends K, ? extends V> function) {
         return new PairStream<>(streamBuilder, addProcessorNode(new FlatMapProcessor<>(function), KEY_VALUE));
     }
 
@@ -173,12 +182,11 @@ public class Stream<T> {
      * @return the new stream
      */
     public Stream<T> peek(Consumer<? super T> action) {
-        return new Stream<>(streamBuilder, addProcessorNode(new PeekProcessor<>(action), node.getOutputFields()));
+        return new Stream<>(streamBuilder, addProcessorNode(new PeekProcessor<>(action), node.getOutputFields(), true));
     }
 
     /**
-     * Aggregates the values in this stream using the aggregator. This does a global aggregation, i.e. the elements
-     * across all the partitions are forwarded to a single task for computing the aggregate.
+     * Aggregates the values in this stream using the aggregator. This does a global aggregation of values across all partitions.
      * <p>
      * If the stream is windowed, the aggregate result is emitted after each window activation and represents the
      * aggregate of elements that fall within that window.
@@ -186,15 +194,52 @@ public class Stream<T> {
      * </p>
      *
      * @param aggregator the aggregator
+     * @param <A>        the accumulator type
      * @param <R>        the result type
      * @return the new stream
      */
-    public <R> Stream<R> aggregate(Aggregator<? super T, ? extends R> aggregator) {
-        return new Stream<>(streamBuilder, global().addProcessorNode(new AggregateProcessor<>(aggregator), VALUE));
+    public <A, R> Stream<R> aggregate(CombinerAggregator<? super T, A, ? extends R> aggregator) {
+        return combine(aggregator);
+    }
+
+    /**
+     * Aggregates the values in this stream using the given initial value, accumulator and combiner. This does a global
+     * aggregation of values across all partitions.
+     * <p>
+     * If the stream is windowed, the aggregate result is emitted after each window activation and represents the
+     * aggregate of elements that fall within that window.
+     * If the stream is not windowed, the aggregate result is emitted as each new element in the stream is processed.
+     * </p>
+     *
+     * @param initialValue the initial value of the result
+     * @param accumulator  the accumulator
+     * @param combiner     the combiner
+     * @param <R>          the result type
+     * @return the new stream
+     */
+    public <R> Stream<R> aggregate(R initialValue,
+                                   BiFunction<? super R, ? super T, ? extends R> accumulator,
+                                   BiFunction<? super R, ? super R, ? extends R> combiner) {
+        return combine(CombinerAggregator.of(initialValue, accumulator, combiner));
+    }
+
+    /**
+     * Counts the number of values in this stream. This does a global count of values across all partitions.
+     * <p>
+     * If the stream is windowed, the count is emitted after each window activation and represents the
+     * count of elements that fall within that window.
+     * If the stream is not windowed, the count is emitted as each new element in the stream is processed.
+     * </p>
+     *
+     * @return the new stream
+     */
+    public Stream<Long> count() {
+        return aggregate(new Count<>());
     }
 
     /**
      * Performs a reduction on the elements of this stream, by repeatedly applying the reducer.
+     * This does a global reduction of values across all partitions.
      * <p>
      * If the stream is windowed, the result is emitted after each window activation and represents the
      * reduction of elements that fall within that window.
@@ -205,7 +250,7 @@ public class Stream<T> {
      * @return the new stream
      */
     public Stream<T> reduce(Reducer<T> reducer) {
-        return new Stream<>(streamBuilder, global().addProcessorNode(new ReduceProcessor<>(reducer), VALUE));
+        return combine(reducer);
     }
 
     /**
@@ -219,6 +264,10 @@ public class Stream<T> {
         if (parallelism < 1) {
             throw new IllegalArgumentException("Parallelism should be >= 1");
         }
+        if (node.getParallelism() == parallelism) {
+            LOG.debug("Node's current parallelism {}, new parallelism {}", node.getParallelism(), parallelism);
+            return this;
+        }
         Node partitionNode = addNode(node, new PartitionNode(stream, node.getOutputFields()), parallelism);
         return new Stream<>(streamBuilder, partitionNode);
     }
@@ -233,12 +282,12 @@ public class Stream<T> {
      * @return an array of result streams (branches) corresponding to the given predicates
      */
     @SuppressWarnings("unchecked")
-    public Stream<T>[] branch(Predicate<T>... predicates) {
+    public Stream<T>[] branch(Predicate<? super T>... predicates) {
         List<Stream<T>> childStreams = new ArrayList<>();
         if (predicates.length > 0) {
             BranchProcessor<T> branchProcessor = new BranchProcessor<>();
             Node branchNode = addProcessorNode(branchProcessor, VALUE);
-            for (Predicate<T> predicate : predicates) {
+            for (Predicate<? super T> predicate : predicates) {
                 // create a child node (identity) per branch
                 ProcessorNode child = makeProcessorNode(new MapProcessor<>(new IdentityFunction<>()), node.getOutputFields());
                 String branchStream = child.getOutputStreams().iterator().next() + "-branch";
@@ -321,10 +370,8 @@ public class Stream<T> {
      * @return the result stream
      */
     public <V> PairStream<T, V> stateQuery(StreamState<T, V> streamState) {
-        // need all grouping for state query since the state is local
+        // need all grouping for state query since the state is per-task
         Node node = all().addProcessorNode(new StateQueryProcessor<>(streamState), KEY_VALUE);
-        // add 'updateState' node as parent so that state query gets processed after update state
-        addNode(streamState.getUpdateStateNode(), node, node.getParallelism());
         return new PairStream<>(streamBuilder, node);
     }
 
@@ -337,13 +384,17 @@ public class Stream<T> {
     }
 
     Node addNode(Node child) {
-        return addNode(this.node, child);
+        return addNode(node, child);
     }
 
     Node addProcessorNode(Processor<?> processor, Fields outputFields) {
         return addNode(makeProcessorNode(processor, outputFields));
     }
 
+    Node addProcessorNode(Processor<?> processor, Fields outputFields, boolean preservesKey) {
+        return addNode(makeProcessorNode(processor, outputFields, preservesKey));
+    }
+
     String getStream() {
         return stream;
     }
@@ -356,12 +407,16 @@ public class Stream<T> {
         return streamBuilder.addNode(parent, child, parentStreamId);
     }
 
-    private Node addNode(Node child, int parallelism, String parentStreamId) {
-        return streamBuilder.addNode(this.node, child, parallelism, parentStreamId);
+    private Node addNode(Node parent, Node child, String parentStreamId, int parallelism) {
+        return streamBuilder.addNode(parent, child, parentStreamId, parallelism);
     }
 
     private ProcessorNode makeProcessorNode(Processor<?> processor, Fields outputFields) {
-        return new ProcessorNode(processor, UniqueIdGen.getInstance().getUniqueStreamId(), outputFields);
+        return makeProcessorNode(processor, outputFields, false);
+    }
+
+    ProcessorNode makeProcessorNode(Processor<?> processor, Fields outputFields, boolean preservesKey) {
+        return new ProcessorNode(processor, UniqueIdGen.getInstance().getUniqueStreamId(), outputFields, preservesKey);
     }
 
     private void addSinkNode(SinkNode sinkNode, int parallelism) {
@@ -369,15 +424,9 @@ public class Stream<T> {
         sinkNode.setComponentId(boltId);
         sinkNode.setParallelism(parallelism);
         if (node instanceof SpoutNode) {
-            addNode(sinkNode, parallelism, Utils.DEFAULT_STREAM_ID);
+            addNode(node, sinkNode, Utils.DEFAULT_STREAM_ID, parallelism);
         } else {
-            /*
-              * add a stream__sink stream to the current node (parent) for emitting
-              * just the values (no punctuation) to the bolt.
-              */
-            String sinkStream = StreamUtil.getSinkStream(stream);
-            node.addOutputStream(sinkStream);
-            addNode(sinkNode, parallelism, sinkStream);
+            addNode(node, sinkNode, parallelism);
         }
     }
 
@@ -387,7 +436,79 @@ public class Stream<T> {
     }
 
     private Stream<T> all() {
+        if (node.getParallelism() == 1) {
+            return this;
+        }
         Node partitionNode = addNode(new PartitionNode(stream, node.getOutputFields(), GroupingInfo.all()));
         return new Stream<>(streamBuilder, partitionNode);
     }
+
+    private boolean shouldPartition() {
+        return node.getParallelism() > 1;
+    }
+
+    private <A> Stream<A> combinePartition(CombinerAggregator<? super T, A, ?> aggregator) {
+        return new Stream<>(streamBuilder,
+                addProcessorNode(new AggregateProcessor<>(aggregator, true), VALUE, true));
+    }
+
+    private <R> Stream<R> merge(CombinerAggregator<?, T, ? extends R> aggregator) {
+        return new Stream<>(streamBuilder,
+                addProcessorNode(new MergeAggregateProcessor<>(aggregator), VALUE));
+    }
+
+    private  <A, R> Stream<R> aggregatePartition(CombinerAggregator<? super T, A, ? extends R> aggregator) {
+        return new Stream<>(streamBuilder, addProcessorNode(new AggregateProcessor<>(aggregator), VALUE));
+    }
+
+    private Stream<T> reducePartition(Reducer<T> reducer) {
+        return new Stream<>(streamBuilder, addProcessorNode(new ReduceProcessor<>(reducer), VALUE));
+    }
+
+    // if re-partitioning is involved, does a per-partition aggregate before emitting the results downstream
+    private <A, R> Stream<R> combine(CombinerAggregator<? super T, A, ? extends R> aggregator) {
+        if (shouldPartition()) {
+            if (node instanceof ProcessorNode) {
+                if (node.isWindowed()) {
+                    return combinePartition(aggregator).global().merge(aggregator);
+                }
+            } else if (node instanceof WindowNode) {
+                Set<Node> parents = node.getParents();
+                Optional<Node> nonWindowed = parents.stream().filter(p -> !p.isWindowed()).findAny();
+                if (!nonWindowed.isPresent()) {
+                    parents.forEach(p -> {
+                        Node localAggregateNode = makeProcessorNode(
+                                new AggregateProcessor<>(aggregator, true), VALUE, true);
+                        streamBuilder.insert(p, localAggregateNode);
+                    });
+                    return ((Stream<A>)global()).merge(aggregator);
+                }
+            }
+            return global().aggregatePartition(aggregator);
+        } else {
+            return aggregatePartition(aggregator);
+        }
+    }
+
+    // if re-partitioning is involved, does a per-partition reduce before emitting the results downstream
+    private Stream<T> combine(Reducer<T> reducer) {
+        if (shouldPartition()) {
+            if (node instanceof ProcessorNode) {
+                if (node.isWindowed()) {
+                    return reducePartition(reducer).global().reducePartition(reducer);
+                }
+            } else if (node instanceof WindowNode) {
+                for (Node p : node.getParents()) {
+                    if (p.isWindowed()) {
+                        Node localReduceNode = makeProcessorNode(new ReduceProcessor<>(reducer), VALUE);
+                        streamBuilder.insert(p, localReduceNode);
+                    }
+                }
+            }
+            return global().reducePartition(reducer);
+        } else {
+            return reducePartition(reducer);
+        }
+    }
+
 }