You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by nw...@apache.org on 2018/12/22 22:07:08 UTC

[incubator-heron] branch master updated: Add sum/min/max reducers (#3132)

This is an automated email from the ASF dual-hosted git repository.

nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new 738ba82  Add sum/min/max reducers (#3132)
738ba82 is described below

commit 738ba82015a30c25cfb19b985533eb25b85439f3
Author: Ning Wang <nw...@twitter.com>
AuthorDate: Sat Dec 22 14:07:03 2018 -0800

    Add sum/min/max reducers (#3132)
    
    * Add sum/min/max reducers
---
 .../streamlet/WindowedWordCountTopology.java       |  3 +-
 .../apache/heron/streamlet/StreamletReducers.java  | 85 ++++++++++++++++++++++
 .../streamlets/CountByKeyAndWindowStreamlet.java   |  3 +-
 .../impl/streamlets/CountByKeyStreamlet.java       |  3 +-
 .../heron/streamlet/StreamletReducersTest.java     | 58 +++++++++++++++
 .../heron/streamlet/impl/StreamletImplTest.java    |  5 +-
 6 files changed, 152 insertions(+), 5 deletions(-)

diff --git a/examples/src/java/org/apache/heron/examples/streamlet/WindowedWordCountTopology.java b/examples/src/java/org/apache/heron/examples/streamlet/WindowedWordCountTopology.java
index bce855c..7805a7e 100644
--- a/examples/src/java/org/apache/heron/examples/streamlet/WindowedWordCountTopology.java
+++ b/examples/src/java/org/apache/heron/examples/streamlet/WindowedWordCountTopology.java
@@ -28,6 +28,7 @@ import org.apache.heron.examples.streamlet.utils.StreamletUtils;
 import org.apache.heron.streamlet.Builder;
 import org.apache.heron.streamlet.Config;
 import org.apache.heron.streamlet.Runner;
+import org.apache.heron.streamlet.StreamletReducers;
 import org.apache.heron.streamlet.WindowConfig;
 
 /**
@@ -71,7 +72,7 @@ public final class WindowedWordCountTopology {
             // Value extractor (the value is always 1)
             word -> 1,
             WindowConfig.TumblingCountWindow(50),
-            (x, y) -> x + y
+            StreamletReducers::sum
         )
         .setName("reduce-operation")
         // The final output is logged using a user-supplied format
diff --git a/heron/api/src/java/org/apache/heron/streamlet/StreamletReducers.java b/heron/api/src/java/org/apache/heron/streamlet/StreamletReducers.java
new file mode 100644
index 0000000..ce6e28d
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/StreamletReducers.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.streamlet;
+
+/**
+ * This class contains a few standard reducers that can be used with
+ * Streamlet reduce functions such as reduceByKeyAndWindow.
+ * For example, assuming s is a Streamlet<T> object and each tuple has these functions:
+ *   - Integer getKey() and
+ *   - Double getValue()
+ * To get streams of sum, min and max of all values up to the current one:
+ *   s.reduceByKey(T::getKey, T::getValue, StreamletReducers::sum);
+ *   s.reduceByKey(T::getKey, T::getValue, StreamletReducers::min);
+ *   s.reduceByKey(T::getKey, T::getValue, StreamletReducers::max);
+ */
+public final class StreamletReducers {
+  // This is a utility class and shouldn't have a public constructor.
+  private StreamletReducers() {
+  }
+
+  public static Integer sum(Integer a, Integer b) {
+    return a + b;
+  }
+
+  public static Long sum(Long a, Long b) {
+    return a + b;
+  }
+
+  public static Float sum(Float a, Float b) {
+    return a + b;
+  }
+
+  public static Double sum(Double a, Double b) {
+    return a + b;
+  }
+
+  public static Integer max(Integer a, Integer b) {
+    return Math.max(a, b);
+  }
+
+  public static Long max(Long a, Long b) {
+    return Math.max(a, b);
+  }
+
+  public static Float max(Float a, Float b) {
+    return Math.max(a, b);
+  }
+
+  public static Double max(Double a, Double b) {
+    return Math.max(a, b);
+  }
+
+  public static Integer min(Integer a, Integer b) {
+    return Math.min(a, b);
+  }
+
+  public static Long min(Long a, Long b) {
+    return Math.min(a, b);
+  }
+
+  public static Float min(Float a, Float b) {
+    return Math.min(a, b);
+  }
+
+  public static Double min(Double a, Double b) {
+    return Math.min(a, b);
+  }
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyAndWindowStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyAndWindowStreamlet.java
index aac859f..4a19e2a 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyAndWindowStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyAndWindowStreamlet.java
@@ -25,6 +25,7 @@ import java.util.Set;
 import org.apache.heron.api.topology.TopologyBuilder;
 import org.apache.heron.streamlet.KeyedWindow;
 import org.apache.heron.streamlet.SerializableFunction;
+import org.apache.heron.streamlet.StreamletReducers;
 import org.apache.heron.streamlet.WindowConfig;
 import org.apache.heron.streamlet.impl.KVStreamletImpl;
 import org.apache.heron.streamlet.impl.StreamletImpl;
@@ -58,7 +59,7 @@ public class CountByKeyAndWindowStreamlet<R, K>
     // Count is a special case of reduce operation. Hence ReduceByKeyAndWindowOperator
     // is used here. Every tuple has a value of 1 and the reduce operation is a simple sum.
     ReduceByKeyAndWindowOperator<R, K, Long> bolt =
-        new ReduceByKeyAndWindowOperator<R, K, Long>(keyExtractor, x -> 1L, (c1, c2) -> c1 + c2);
+        new ReduceByKeyAndWindowOperator<R, K, Long>(keyExtractor, x -> 1L, StreamletReducers::sum);
     windowCfg.applyTo(bolt);
     bldr.setBolt(getName(), bolt, getNumPartitions())
         .customGrouping(parent.getName(), parent.getStreamId(),
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyStreamlet.java
index 965d897..5c8328b 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyStreamlet.java
@@ -24,6 +24,7 @@ import java.util.Set;
 
 import org.apache.heron.api.topology.TopologyBuilder;
 import org.apache.heron.streamlet.SerializableFunction;
+import org.apache.heron.streamlet.StreamletReducers;
 import org.apache.heron.streamlet.impl.KVStreamletImpl;
 import org.apache.heron.streamlet.impl.StreamletImpl;
 import org.apache.heron.streamlet.impl.groupings.ReduceByKeyAndWindowCustomGrouping;
@@ -53,7 +54,7 @@ public class CountByKeyStreamlet<R, K> extends KVStreamletImpl<K, Long> {
     // Count is a special case of reduce operation. Hence ReduceByKeyOperator
     // is used here. Every tuple has a value of 1 and the reduce operation is a simple sum.
     ReduceByKeyOperator<R, K, Long> bolt =
-        new ReduceByKeyOperator<R, K, Long>(keyExtractor, x -> 1L, (c1, c2) -> c1 + c2);
+        new ReduceByKeyOperator<R, K, Long>(keyExtractor, x -> 1L, StreamletReducers::sum);
     bldr.setBolt(getName(), bolt, getNumPartitions())
         .customGrouping(parent.getName(), parent.getStreamId(),
             new ReduceByKeyAndWindowCustomGrouping<R, K>(keyExtractor));
diff --git a/heron/api/tests/java/org/apache/heron/streamlet/StreamletReducersTest.java b/heron/api/tests/java/org/apache/heron/streamlet/StreamletReducersTest.java
new file mode 100644
index 0000000..9c3cb2a
--- /dev/null
+++ b/heron/api/tests/java/org/apache/heron/streamlet/StreamletReducersTest.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.streamlet;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class StreamletReducersTest {
+
+  @Test
+  public void testSum() {
+    Assert.assertEquals(StreamletReducers.sum(1, 2), (Integer) 3);
+    Assert.assertEquals(StreamletReducers.sum(1L, 2L), (Long) 3L);
+    Assert.assertEquals(StreamletReducers.sum(1.0f, 2.0f), (Float) 3.0f);
+    Assert.assertEquals(StreamletReducers.sum(1.0, 2.0), (Double) 3.0);
+  }
+
+  @Test
+  public void testMax() {
+    Assert.assertEquals(StreamletReducers.max(1, 2), (Integer) 2);
+    Assert.assertEquals(StreamletReducers.max(2, 1), (Integer) 2);
+    Assert.assertEquals(StreamletReducers.max(1L, 2L), (Long) 2L);
+    Assert.assertEquals(StreamletReducers.max(2L, 1L), (Long) 2L);
+    Assert.assertEquals(StreamletReducers.max(1.0f, 2.0f), (Float) 2.0f);
+    Assert.assertEquals(StreamletReducers.max(2.0f, 1.0f), (Float) 2.0f);
+    Assert.assertEquals(StreamletReducers.max(1.0, 2.0), (Double) 2.0);
+    Assert.assertEquals(StreamletReducers.max(2.0, 1.0), (Double) 2.0);
+  }
+
+  @Test
+  public void testMin() {
+    Assert.assertEquals(StreamletReducers.min(1, 2), (Integer) 1);
+    Assert.assertEquals(StreamletReducers.min(2, 1), (Integer) 1);
+    Assert.assertEquals(StreamletReducers.min(1L, 2L), (Long) 1L);
+    Assert.assertEquals(StreamletReducers.min(2L, 1L), (Long) 1L);
+    Assert.assertEquals(StreamletReducers.min(1.0f, 2.0f), (Float) 1.0f);
+    Assert.assertEquals(StreamletReducers.min(2.0f, 1.0f), (Float) 1.0f);
+    Assert.assertEquals(StreamletReducers.min(1.0, 2.0), (Double) 1.0);
+    Assert.assertEquals(StreamletReducers.min(2.0, 1.0), (Double) 1.0);
+  }
+}
diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
index fa3aff7..fdb860c 100644
--- a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
+++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
@@ -51,6 +51,7 @@ import org.apache.heron.streamlet.SerializablePredicate;
 import org.apache.heron.streamlet.SerializableTransformer;
 import org.apache.heron.streamlet.Source;
 import org.apache.heron.streamlet.Streamlet;
+import org.apache.heron.streamlet.StreamletReducers;
 import org.apache.heron.streamlet.WindowConfig;
 import org.apache.heron.streamlet.impl.streamlets.ConsumerStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.CountByKeyAndWindowStreamlet;
@@ -380,7 +381,7 @@ public class StreamletImplTest {
     KVStreamlet<String, Double> streamlet = baseStreamlet.setNumPartitions(20)
         .<String, Double>reduceByKey(x -> (x > 0) ? "positive" : ((x < 0) ? "negative" : "zero"),
             x -> x,
-            (x, y) -> x + y);  // A sum operation
+            StreamletReducers::sum);
 
     assertTrue(streamlet instanceof ReduceByKeyStreamlet);
     ReduceByKeyStreamlet<Double, String, Double> mStreamlet =
@@ -398,7 +399,7 @@ public class StreamletImplTest {
     KVStreamlet<String, Double> streamlet = baseStreamlet.setNumPartitions(20)
         .reduceByKey(x -> (x > 0) ? "positive" : ((x < 0) ? "negative" : "zero"),
             0.0,
-            (x, y) -> x + y);  // A sum operation
+            StreamletReducers::sum);
 
     assertTrue(streamlet instanceof GeneralReduceByKeyStreamlet);
     GeneralReduceByKeyStreamlet<Double, String, Double> mStreamlet =