You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/02/07 01:28:23 UTC

[10/12] storm git commit: [STORM-1961] Addressed review comments and a few other refactorings

[STORM-1961] Addressed review comments and a few other refactorings


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/37ee12cc
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/37ee12cc
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/37ee12cc

Branch: refs/heads/master
Commit: 37ee12ccd9a0465dc6cd6d2f9fb59929a798b6b5
Parents: a07039f
Author: Arun Mahadevan <ar...@apache.org>
Authored: Tue Jan 24 17:35:32 2017 +0530
Committer: Arun Mahadevan <ar...@apache.org>
Committed: Tue Jan 24 22:25:21 2017 +0530

----------------------------------------------------------------------
 .../storm/starter/streams/AggregateExample.java | 17 ++++----
 .../storm/starter/streams/BranchExample.java    |  9 ++--
 .../streams/GroupByKeyAndWindowExample.java     | 23 ++++++----
 .../storm/starter/streams/JoinExample.java      |  9 ++--
 .../starter/streams/StateQueryExample.java      | 16 ++++---
 .../starter/streams/StatefulWordCount.java      | 15 ++++---
 .../starter/streams/TypedTupleExample.java      |  9 ++--
 .../starter/streams/WindowedWordCount.java      |  9 ++--
 .../storm/starter/streams/WordCountToBolt.java  |  9 ++--
 .../src/jvm/org/apache/storm/streams/Pair.java  | 28 +++++++-----
 .../org/apache/storm/streams/PairStream.java    |  3 +-
 .../jvm/org/apache/storm/streams/Stream.java    |  5 ++-
 .../streams/operations/aggregators/LongSum.java | 45 ++++++++++++++++++++
 .../streams/operations/aggregators/Sum.java     | 45 --------------------
 .../processors/AggregateByKeyProcessor.java     |  5 ++-
 .../streams/processors/AggregateProcessor.java  |  4 +-
 .../storm/streams/processors/BaseProcessor.java |  8 ++--
 .../MergeAggregateByKeyProcessor.java           |  2 +-
 .../processors/MergeAggregateProcessor.java     |  2 +-
 .../storm/streams/processors/Processor.java     |  2 +-
 .../streams/processors/ProcessorContext.java    |  4 +-
 .../processors/ReduceByKeyProcessor.java        |  6 +--
 .../streams/processors/ReduceProcessor.java     |  2 +-
 .../apache/storm/streams/ProcessorBoltTest.java |  4 +-
 24 files changed, 149 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/37ee12cc/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/AggregateExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/AggregateExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/AggregateExample.java
index 91dfadb..7467634 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/AggregateExample.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/AggregateExample.java
@@ -60,11 +60,10 @@ public class AggregateExample {
             config.setNumWorkers(1);
             StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
         } else {
-            LocalCluster cluster = new LocalCluster();
-            cluster.submitTopology("test", config, builder.build());
-            Utils.sleep(60000);
-            cluster.killTopology("test");
-            cluster.shutdown();
+            try (LocalCluster cluster = new LocalCluster();
+                 LocalCluster.LocalTopology topo = cluster.submitTopology("test", config, builder.build())) {
+                Utils.sleep(60_000);
+            }
         }
     }
 
@@ -76,21 +75,21 @@ public class AggregateExample {
 
         @Override
         public Pair<Integer, Integer> apply(Pair<Integer, Integer> sumAndCount, Integer value) {
-            return Pair.of(sumAndCount.getFirst() + value, sumAndCount.getSecond() + 1);
+            return Pair.of(sumAndCount._1 + value, sumAndCount._2 + 1);
         }
 
         @Override
         public Pair<Integer, Integer> merge(Pair<Integer, Integer> sumAndCount1, Pair<Integer, Integer> sumAndCount2) {
             System.out.println("Merge " + sumAndCount1 + " and " + sumAndCount2);
             return Pair.of(
-                    sumAndCount1.getFirst() + sumAndCount2.getFirst(),
-                    sumAndCount1.getSecond() + sumAndCount2.getSecond()
+                    sumAndCount1._1 + sumAndCount2._1,
+                    sumAndCount1._2 + sumAndCount2._2
             );
         }
 
         @Override
         public Double result(Pair<Integer, Integer> sumAndCount) {
-            return (double) sumAndCount.getFirst()/sumAndCount.getSecond();
+            return (double) sumAndCount._1/sumAndCount._2;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/37ee12cc/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/BranchExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/BranchExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/BranchExample.java
index f5400a5..027b432 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/BranchExample.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/BranchExample.java
@@ -61,11 +61,10 @@ public class BranchExample {
             config.setNumWorkers(1);
             StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
         } else {
-            LocalCluster cluster = new LocalCluster();
-            cluster.submitTopology("test", config, builder.build());
-            Utils.sleep(60000);
-            cluster.killTopology("test");
-            cluster.shutdown();
+            try (LocalCluster cluster = new LocalCluster();
+                 LocalCluster.LocalTopology topo = cluster.submitTopology("test", config, builder.build())) {
+                Utils.sleep(60_000);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/37ee12cc/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/GroupByKeyAndWindowExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/GroupByKeyAndWindowExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/GroupByKeyAndWindowExample.java
index 6b505bd..dd7e97f 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/GroupByKeyAndWindowExample.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/GroupByKeyAndWindowExample.java
@@ -52,6 +52,9 @@ public class GroupByKeyAndWindowExample {
                 /*
                  * The elements having the same key within the window will be grouped
                  * together and the corresponding values will be merged.
+                 *
+                 * The result is a PairStream<String, Iterable<Double>> with
+                 * 'stock symbol' as the key and 'stock prices' for that symbol within the window as the value.
                  */
                 .groupByKeyAndWindow(SlidingWindows.of(Count.of(6), Count.of(3)))
                 .print();
@@ -61,8 +64,11 @@ public class GroupByKeyAndWindowExample {
                 /*
                  * The elements having the same key within the window will be grouped
                  * together and their values will be reduced using the given reduce function.
+                 *
+                 * Here the result is a PairStream<String, Double> with
+                 * 'stock symbol' as the key and the maximum price for that symbol within the window as the value.
                  */
-                .reduceByKeyAndWindow((x, y) -> (x + y) / 2.0, SlidingWindows.of(Count.of(6), Count.of(3)))
+                .reduceByKeyAndWindow((x, y) -> x > y ? x : y, SlidingWindows.of(Count.of(6), Count.of(3)))
                 .print();
 
         Config config = new Config();
@@ -70,19 +76,18 @@ public class GroupByKeyAndWindowExample {
             config.setNumWorkers(1);
             StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
         } else {
-            LocalCluster cluster = new LocalCluster();
-            config.put(Config.TOPOLOGY_STATE_PROVIDER, "org.apache.storm.redis.state.RedisKeyValueStateProvider");
-            cluster.submitTopology("test", config, builder.build());
-            Utils.sleep(60000);
-            cluster.killTopology("test");
-            cluster.shutdown();
+            try (LocalCluster cluster = new LocalCluster();
+                 LocalCluster.LocalTopology topo = cluster.submitTopology("test", config, builder.build())) {
+                Utils.sleep(60_000);
+            }
         }
     }
 
     private static class StockQuotes extends BaseRichSpout {
         private final List<List<Values>> values = Arrays.asList(
                 Arrays.asList(new Values("AAPL", 100.0), new Values("GOOG", 780.0), new Values("FB", 125.0)),
-                Arrays.asList(new Values("AAPL", 105.0), new Values("GOOG", 790.0), new Values("FB", 130.0))
+                Arrays.asList(new Values("AAPL", 105.0), new Values("GOOG", 790.0), new Values("FB", 130.0)),
+                Arrays.asList(new Values("AAPL", 102.0), new Values("GOOG", 788.0), new Values("FB", 128.0))
         );
         private SpoutOutputCollector collector;
         private int index = 0;
@@ -103,7 +108,7 @@ public class GroupByKeyAndWindowExample {
 
         @Override
         public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("key", "val"));
+            declarer.declare(new Fields("symbol", "price"));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/37ee12cc/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/JoinExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/JoinExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/JoinExample.java
index 0b15615..4aa6253 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/JoinExample.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/JoinExample.java
@@ -71,11 +71,10 @@ public class JoinExample {
             config.setNumWorkers(1);
             StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
         } else {
-            LocalCluster cluster = new LocalCluster();
-            cluster.submitTopology("test", config, builder.build());
-            Utils.sleep(60000);
-            cluster.killTopology("test");
-            cluster.shutdown();
+            try (LocalCluster cluster = new LocalCluster();
+                 LocalCluster.LocalTopology topo = cluster.submitTopology("test", config, builder.build())) {
+                Utils.sleep(60_000);
+            }
         }
 
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/37ee12cc/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StateQueryExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StateQueryExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StateQueryExample.java
index 6d6a4b3..ab6cac3 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StateQueryExample.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StateQueryExample.java
@@ -54,7 +54,14 @@ public class StateQueryExample {
     public static void main(String[] args) throws Exception {
         StreamBuilder builder = new StreamBuilder();
         StreamState<String, Long> ss = builder.newStream(new TestWordSpout(), new ValueMapper<String>(0), 2)
+                /*
+                 * Transform the stream of words to a stream of (word, 1) pairs
+                 */
                 .mapToPair(w -> Pair.of(w, 1))
+                /*
+                 *  Update the count in the state. Here the first argument 0L is the initial value for the count and
+                 *  the second argument is a function that increments the count for each value received.
+                 */
                 .updateStateByKey(0L, (count, val) -> count + 1);
 
         /*
@@ -77,11 +84,10 @@ public class StateQueryExample {
             config.setNumWorkers(1);
             StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
         } else {
-            LocalCluster cluster = new LocalCluster();
-            cluster.submitTopology("test", config, builder.build());
-            Utils.sleep(60000);
-            cluster.killTopology("test");
-            cluster.shutdown();
+            try (LocalCluster cluster = new LocalCluster();
+                 LocalCluster.LocalTopology topo = cluster.submitTopology("test", config, builder.build())) {
+                Utils.sleep(60_000);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/37ee12cc/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StatefulWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StatefulWordCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StatefulWordCount.java
index ce7470d..ddd318a 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StatefulWordCount.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StatefulWordCount.java
@@ -60,9 +60,11 @@ public class StatefulWordCount {
                  */
                 .countByKey()
                 /*
-                 * update the word counts in the state
+                 * update the word counts in the state.
+                 * Here the first argument 0L is the initial value for the state
+                 * and the second argument is a function that adds the count to the current value in the state.
                  */
-                .updateStateByKey(0L, (x, y) -> x + y)
+                .updateStateByKey(0L, (state, count) -> state + count)
                  /*
                   * convert the state back to a stream and print the results
                   */
@@ -77,11 +79,10 @@ public class StatefulWordCount {
             config.setNumWorkers(1);
             StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
         } else {
-            LocalCluster cluster = new LocalCluster();
-            cluster.submitTopology("test", config, builder.build());
-            Utils.sleep(60000);
-            cluster.killTopology("test");
-            cluster.shutdown();
+            try (LocalCluster cluster = new LocalCluster();
+                 LocalCluster.LocalTopology topo = cluster.submitTopology("test", config, builder.build())) {
+                Utils.sleep(60_000);
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/37ee12cc/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/TypedTupleExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/TypedTupleExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/TypedTupleExample.java
index 193ad661..11e89bf 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/TypedTupleExample.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/TypedTupleExample.java
@@ -54,11 +54,10 @@ public class TypedTupleExample {
             config.setNumWorkers(1);
             StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
         } else {
-            LocalCluster cluster = new LocalCluster();
-            cluster.submitTopology("test", config, builder.build());
-            Utils.sleep(60000);
-            cluster.killTopology("test");
-            cluster.shutdown();
+            try (LocalCluster cluster = new LocalCluster();
+                 LocalCluster.LocalTopology topo = cluster.submitTopology("test", config, builder.build())) {
+                Utils.sleep(60_000);
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/37ee12cc/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WindowedWordCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WindowedWordCount.java
index 0765a74..0f30b7c 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WindowedWordCount.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WindowedWordCount.java
@@ -70,11 +70,10 @@ public class WindowedWordCount {
             config.setNumWorkers(1);
             StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
         } else {
-            LocalCluster cluster = new LocalCluster();
-            cluster.submitTopology("test", config, builder.build());
-            Utils.sleep(60000);
-            cluster.killTopology("test");
-            cluster.shutdown();
+            try (LocalCluster cluster = new LocalCluster();
+                 LocalCluster.LocalTopology topo = cluster.submitTopology("test", config, builder.build())) {
+                Utils.sleep(60_000);
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/37ee12cc/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WordCountToBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WordCountToBolt.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WordCountToBolt.java
index dd7923a..1c0aae1 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WordCountToBolt.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WordCountToBolt.java
@@ -71,11 +71,10 @@ public class WordCountToBolt {
             config.setNumWorkers(1);
             StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
         } else {
-            LocalCluster cluster = new LocalCluster();
-            cluster.submitTopology("test", config, builder.build());
-            Utils.sleep(60000);
-            cluster.killTopology("test");
-            cluster.shutdown();
+            try (LocalCluster cluster = new LocalCluster();
+                 LocalCluster.LocalTopology topo = cluster.submitTopology("test", config, builder.build())) {
+                Utils.sleep(60_000);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/37ee12cc/storm-core/src/jvm/org/apache/storm/streams/Pair.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/Pair.java b/storm-core/src/jvm/org/apache/storm/streams/Pair.java
index e5eb792..26d53b7 100644
--- a/storm-core/src/jvm/org/apache/storm/streams/Pair.java
+++ b/storm-core/src/jvm/org/apache/storm/streams/Pair.java
@@ -26,8 +26,14 @@ import java.io.Serializable;
  * @param <T2> the type of the second value
  */
 public final class Pair<T1, T2> implements Serializable {
-    private final T1 first;
-    private final T2 second;
+    /**
+     * The first value
+     */
+    public final T1 _1;
+    /**
+     * The second value
+     */
+    public final T2 _2;
 
     /**
      * Constructs a new pair of values
@@ -36,8 +42,8 @@ public final class Pair<T1, T2> implements Serializable {
      * @param second the second value
      */
     private Pair(T1 first, T2 second) {
-        this.first = first;
-        this.second = second;
+        _1 = first;
+        _2 = second;
     }
 
     /**
@@ -46,7 +52,7 @@ public final class Pair<T1, T2> implements Serializable {
      * @return the first value
      */
     public T1 getFirst() {
-        return first;
+        return _1;
     }
 
     /**
@@ -55,7 +61,7 @@ public final class Pair<T1, T2> implements Serializable {
      * @return the second value
      */
     public T2 getSecond() {
-        return second;
+        return _2;
     }
 
     /**
@@ -78,20 +84,20 @@ public final class Pair<T1, T2> implements Serializable {
 
         Pair<?, ?> pair = (Pair<?, ?>) o;
 
-        if (first != null ? !first.equals(pair.first) : pair.first != null) return false;
-        return second != null ? second.equals(pair.second) : pair.second == null;
+        if (_1 != null ? !_1.equals(pair._1) : pair._1 != null) return false;
+        return _2 != null ? _2.equals(pair._2) : pair._2 == null;
 
     }
 
     @Override
     public int hashCode() {
-        int result = first != null ? first.hashCode() : 0;
-        result = 31 * result + (second != null ? second.hashCode() : 0);
+        int result = _1 != null ? _1.hashCode() : 0;
+        result = 31 * result + (_2 != null ? _2.hashCode() : 0);
         return result;
     }
 
     @Override
     public String toString() {
-        return "(" + first + ", " + second + ')';
+        return "(" + _1 + ", " + _2 + ')';
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/37ee12cc/storm-core/src/jvm/org/apache/storm/streams/PairStream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/PairStream.java b/storm-core/src/jvm/org/apache/storm/streams/PairStream.java
index 3c08a05..da332a5 100644
--- a/storm-core/src/jvm/org/apache/storm/streams/PairStream.java
+++ b/storm-core/src/jvm/org/apache/storm/streams/PairStream.java
@@ -376,7 +376,8 @@ public class PairStream<K, V> extends Stream<Pair<K, V>> {
      * @return the {@link StreamState} which can be used to query the state
      */
     public <R> StreamState<K, R> updateStateByKey(StateUpdater<? super V, ? extends R> stateUpdater) {
-        return partitionByKey().updateStateByKeyPartition(stateUpdater);
+        // repartition so that state query fields grouping works correctly. this can be optimized further
+        return partitionBy(KEY).updateStateByKeyPartition(stateUpdater);
     }
 
     private <R> StreamState<K, R> updateStateByKeyPartition(StateUpdater<? super V, ? extends R> stateUpdater) {

http://git-wip-us.apache.org/repos/asf/storm/blob/37ee12cc/storm-core/src/jvm/org/apache/storm/streams/Stream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/Stream.java b/storm-core/src/jvm/org/apache/storm/streams/Stream.java
index 087e760..272c954 100644
--- a/storm-core/src/jvm/org/apache/storm/streams/Stream.java
+++ b/storm-core/src/jvm/org/apache/storm/streams/Stream.java
@@ -278,8 +278,9 @@ public class Stream<T> {
      * Returns an array of streams by splitting the given stream into multiple branches based on the given
      * predicates. The predicates are applied in the given order to the values of this stream and the result
      * is forwarded to the corresponding (index based) result stream based on the (index of) predicate that matches.
-     * If none of the predicates match a value, that value is dropped.
-     *
+     * <p>
+     * <b>Note:</b> If none of the predicates match a value, that value is dropped.
+     * </p>
      * @param predicates the predicates
      * @return an array of result streams (branches) corresponding to the given predicates
      */

http://git-wip-us.apache.org/repos/asf/storm/blob/37ee12cc/storm-core/src/jvm/org/apache/storm/streams/operations/aggregators/LongSum.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/operations/aggregators/LongSum.java b/storm-core/src/jvm/org/apache/storm/streams/operations/aggregators/LongSum.java
new file mode 100644
index 0000000..afc7ee7
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/operations/aggregators/LongSum.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams.operations.aggregators;
+
+import org.apache.storm.streams.operations.CombinerAggregator;
+
+/**
+ * Computes the long sum of the input values
+ */
+public class LongSum implements CombinerAggregator<Number, Long, Long> {
+    @Override
+    public Long init() {
+        return 0L;
+    }
+
+    @Override
+    public Long apply(Long aggregate, Number value) {
+        return value.longValue() + aggregate;
+    }
+
+    @Override
+    public Long merge(Long accum1, Long accum2) {
+        return accum1 + accum2;
+    }
+
+    @Override
+    public Long result(Long accum) {
+        return accum;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/37ee12cc/storm-core/src/jvm/org/apache/storm/streams/operations/aggregators/Sum.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/operations/aggregators/Sum.java b/storm-core/src/jvm/org/apache/storm/streams/operations/aggregators/Sum.java
deleted file mode 100644
index df11d99..0000000
--- a/storm-core/src/jvm/org/apache/storm/streams/operations/aggregators/Sum.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.streams.operations.aggregators;
-
-import org.apache.storm.streams.operations.CombinerAggregator;
-
-/**
- * Computes the long sum of the input values
- */
-public class Sum implements CombinerAggregator<Number, Long, Long> {
-    @Override
-    public Long init() {
-        return 0L;
-    }
-
-    @Override
-    public Long apply(Long aggregate, Number value) {
-        return value.longValue() + aggregate;
-    }
-
-    @Override
-    public Long merge(Long accum1, Long accum2) {
-        return accum1 + accum2;
-    }
-
-    @Override
-    public Long result(Long accum) {
-        return accum;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/37ee12cc/storm-core/src/jvm/org/apache/storm/streams/processors/AggregateByKeyProcessor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/processors/AggregateByKeyProcessor.java b/storm-core/src/jvm/org/apache/storm/streams/processors/AggregateByKeyProcessor.java
index c10113b..3a7c812 100644
--- a/storm-core/src/jvm/org/apache/storm/streams/processors/AggregateByKeyProcessor.java
+++ b/storm-core/src/jvm/org/apache/storm/streams/processors/AggregateByKeyProcessor.java
@@ -22,6 +22,7 @@ import org.apache.storm.streams.operations.CombinerAggregator;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.function.Supplier;
 
 public class AggregateByKeyProcessor<K, V, A, R> extends BaseProcessor<Pair<K, V>> implements BatchProcessor {
     private final CombinerAggregator<V, A, R> aggregator;
@@ -47,9 +48,9 @@ public class AggregateByKeyProcessor<K, V, A, R> extends BaseProcessor<Pair<K, V
         }
         state.put(key, aggregator.apply(accumulator, val));
         if (emitAggregate) {
-            mayBeForwardAggUpdate(Pair.of(key, state.get(key)));
+            mayBeForwardAggUpdate(() -> Pair.of(key, state.get(key)));
         } else {
-            mayBeForwardAggUpdate(Pair.of(key, aggregator.result(state.get(key))));
+            mayBeForwardAggUpdate(() -> Pair.of(key, aggregator.result(state.get(key))));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/37ee12cc/storm-core/src/jvm/org/apache/storm/streams/processors/AggregateProcessor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/processors/AggregateProcessor.java b/storm-core/src/jvm/org/apache/storm/streams/processors/AggregateProcessor.java
index d169345..d8b78ad 100644
--- a/storm-core/src/jvm/org/apache/storm/streams/processors/AggregateProcessor.java
+++ b/storm-core/src/jvm/org/apache/storm/streams/processors/AggregateProcessor.java
@@ -40,9 +40,9 @@ public class AggregateProcessor<T, A, R> extends BaseProcessor<T> implements Bat
         }
         state = aggregator.apply(state, input);
         if (emitAggregate) {
-            mayBeForwardAggUpdate(state);
+            mayBeForwardAggUpdate(() -> state);
         } else {
-            mayBeForwardAggUpdate(aggregator.result(state));
+            mayBeForwardAggUpdate(() -> aggregator.result(state));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/37ee12cc/storm-core/src/jvm/org/apache/storm/streams/processors/BaseProcessor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/processors/BaseProcessor.java b/storm-core/src/jvm/org/apache/storm/streams/processors/BaseProcessor.java
index 3ea469c..8413a2a 100644
--- a/storm-core/src/jvm/org/apache/storm/streams/processors/BaseProcessor.java
+++ b/storm-core/src/jvm/org/apache/storm/streams/processors/BaseProcessor.java
@@ -19,6 +19,8 @@ package org.apache.storm.streams.processors;
 
 import java.util.HashSet;
 import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Supplier;
 
 import static org.apache.storm.streams.WindowNode.PUNCTUATION;
 
@@ -91,12 +93,12 @@ abstract class BaseProcessor<T> implements Processor<T> {
      * can use this to emit the partial results on each input
      * if they are operating in non-windowed mode.
      *
-     * @param result the result
+     * @param result the result function
      * @param <R>    the result type
      */
-    protected final <R> void mayBeForwardAggUpdate(R result) {
+    protected final <R> void mayBeForwardAggUpdate(Supplier<R> result) {
         if (!context.isWindowed()) {
-            context.forward(result);
+            context.forward(result.get());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/37ee12cc/storm-core/src/jvm/org/apache/storm/streams/processors/MergeAggregateByKeyProcessor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/processors/MergeAggregateByKeyProcessor.java b/storm-core/src/jvm/org/apache/storm/streams/processors/MergeAggregateByKeyProcessor.java
index 57ad845..a66c88a 100644
--- a/storm-core/src/jvm/org/apache/storm/streams/processors/MergeAggregateByKeyProcessor.java
+++ b/storm-core/src/jvm/org/apache/storm/streams/processors/MergeAggregateByKeyProcessor.java
@@ -40,7 +40,7 @@ public class MergeAggregateByKeyProcessor<K, V, A, R> extends BaseProcessor<Pair
             accumulator = aggregator.init();
         }
         state.put(key, aggregator.merge(accumulator, val));
-        mayBeForwardAggUpdate(Pair.of(key, aggregator.result(state.get(key))));
+        mayBeForwardAggUpdate(() -> Pair.of(key, aggregator.result(state.get(key))));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/37ee12cc/storm-core/src/jvm/org/apache/storm/streams/processors/MergeAggregateProcessor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/processors/MergeAggregateProcessor.java b/storm-core/src/jvm/org/apache/storm/streams/processors/MergeAggregateProcessor.java
index 61b555b..9c5b1f0 100644
--- a/storm-core/src/jvm/org/apache/storm/streams/processors/MergeAggregateProcessor.java
+++ b/storm-core/src/jvm/org/apache/storm/streams/processors/MergeAggregateProcessor.java
@@ -33,7 +33,7 @@ public class MergeAggregateProcessor<T, A, R> extends BaseProcessor<A> implement
             state = aggregator.init();
         }
         state = aggregator.merge(state, input);
-        mayBeForwardAggUpdate(aggregator.result(state));
+        mayBeForwardAggUpdate(() -> aggregator.result(state));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/37ee12cc/storm-core/src/jvm/org/apache/storm/streams/processors/Processor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/processors/Processor.java b/storm-core/src/jvm/org/apache/storm/streams/processors/Processor.java
index 3b4518d..83daf2a 100644
--- a/storm-core/src/jvm/org/apache/storm/streams/processors/Processor.java
+++ b/storm-core/src/jvm/org/apache/storm/streams/processors/Processor.java
@@ -37,7 +37,7 @@ public interface Processor<T> extends Serializable {
     void init(ProcessorContext context);
 
     /**
-     * Executes some operation on the input and possibly emits some result.
+     * Executes some operations on the input and possibly emits some results.
      *
      * @param input    the input to be processed
      * @param streamId the source stream id from where the input is received

http://git-wip-us.apache.org/repos/asf/storm/blob/37ee12cc/storm-core/src/jvm/org/apache/storm/streams/processors/ProcessorContext.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/processors/ProcessorContext.java b/storm-core/src/jvm/org/apache/storm/streams/processors/ProcessorContext.java
index a69a0a9..45f58a1 100644
--- a/storm-core/src/jvm/org/apache/storm/streams/processors/ProcessorContext.java
+++ b/storm-core/src/jvm/org/apache/storm/streams/processors/ProcessorContext.java
@@ -45,10 +45,10 @@ public interface ProcessorContext extends Serializable {
     <T> void forward(T input, String stream);
 
     /**
-     * Returns if the processing is in a windowed context and should wait for
+     * Returns true if the processing is in a windowed context and should wait for
      * punctuation before emitting results.
      *
-     * @return if this is a windowed context or not
+     * @return whether this is a windowed context or not
      */
     boolean isWindowed();
 

http://git-wip-us.apache.org/repos/asf/storm/blob/37ee12cc/storm-core/src/jvm/org/apache/storm/streams/processors/ReduceByKeyProcessor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/processors/ReduceByKeyProcessor.java b/storm-core/src/jvm/org/apache/storm/streams/processors/ReduceByKeyProcessor.java
index eba0de6..f20c041 100644
--- a/storm-core/src/jvm/org/apache/storm/streams/processors/ReduceByKeyProcessor.java
+++ b/storm-core/src/jvm/org/apache/storm/streams/processors/ReduceByKeyProcessor.java
@@ -36,9 +36,9 @@ public class ReduceByKeyProcessor<K, V> extends BaseProcessor<Pair<K, V>> implem
         K key = input.getFirst();
         V val = input.getSecond();
         V agg = state.get(key);
-        agg = (agg == null) ? val : reducer.apply(agg, val);
-        state.put(key, agg);
-        mayBeForwardAggUpdate(Pair.of(key, agg));
+        final V res = (agg == null) ? val : reducer.apply(agg, val);
+        state.put(key, res);
+        mayBeForwardAggUpdate(() -> Pair.of(key, res));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/37ee12cc/storm-core/src/jvm/org/apache/storm/streams/processors/ReduceProcessor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/processors/ReduceProcessor.java b/storm-core/src/jvm/org/apache/storm/streams/processors/ReduceProcessor.java
index 0b90fb9..93badf2 100644
--- a/storm-core/src/jvm/org/apache/storm/streams/processors/ReduceProcessor.java
+++ b/storm-core/src/jvm/org/apache/storm/streams/processors/ReduceProcessor.java
@@ -30,7 +30,7 @@ public class ReduceProcessor<T> extends BaseProcessor<T> implements BatchProcess
     @Override
     public void execute(T input) {
         agg = (agg == null) ? input : reducer.apply(agg, input);
-        mayBeForwardAggUpdate(agg);
+        mayBeForwardAggUpdate(() -> agg);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/37ee12cc/storm-core/test/jvm/org/apache/storm/streams/ProcessorBoltTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/streams/ProcessorBoltTest.java b/storm-core/test/jvm/org/apache/storm/streams/ProcessorBoltTest.java
index aa877f9..9380714 100644
--- a/storm-core/test/jvm/org/apache/storm/streams/ProcessorBoltTest.java
+++ b/storm-core/test/jvm/org/apache/storm/streams/ProcessorBoltTest.java
@@ -19,7 +19,7 @@ package org.apache.storm.streams;
 
 import com.google.common.collect.Multimap;
 import org.apache.storm.generated.GlobalStreamId;
-import org.apache.storm.streams.operations.aggregators.Sum;
+import org.apache.storm.streams.operations.aggregators.LongSum;
 import org.apache.storm.streams.processors.AggregateProcessor;
 import org.apache.storm.streams.processors.FilterProcessor;
 import org.apache.storm.streams.processors.Processor;
@@ -89,7 +89,7 @@ public class ProcessorBoltTest {
 
     @Test
     public void testAggResultAndAck() throws Exception {
-        setUpProcessorBolt(new AggregateProcessor<>(new Sum()), Collections.singleton("inputstream"), true, null);
+        setUpProcessorBolt(new AggregateProcessor<>(new LongSum()), Collections.singleton("inputstream"), true, null);
         bolt.execute(mockTuple2);
         bolt.execute(mockTuple3);
         bolt.execute(punctuation);