You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by nw...@apache.org on 2018/12/22 22:07:08 UTC
[incubator-heron] branch master updated: Add sum/min/max reducers
(#3132)
This is an automated email from the ASF dual-hosted git repository.
nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new 738ba82 Add sum/min/max reducers (#3132)
738ba82 is described below
commit 738ba82015a30c25cfb19b985533eb25b85439f3
Author: Ning Wang <nw...@twitter.com>
AuthorDate: Sat Dec 22 14:07:03 2018 -0800
Add sum/min/max reducers (#3132)
* Add sum/min/max reducers
---
.../streamlet/WindowedWordCountTopology.java | 3 +-
.../apache/heron/streamlet/StreamletReducers.java | 85 ++++++++++++++++++++++
.../streamlets/CountByKeyAndWindowStreamlet.java | 3 +-
.../impl/streamlets/CountByKeyStreamlet.java | 3 +-
.../heron/streamlet/StreamletReducersTest.java | 58 +++++++++++++++
.../heron/streamlet/impl/StreamletImplTest.java | 5 +-
6 files changed, 152 insertions(+), 5 deletions(-)
diff --git a/examples/src/java/org/apache/heron/examples/streamlet/WindowedWordCountTopology.java b/examples/src/java/org/apache/heron/examples/streamlet/WindowedWordCountTopology.java
index bce855c..7805a7e 100644
--- a/examples/src/java/org/apache/heron/examples/streamlet/WindowedWordCountTopology.java
+++ b/examples/src/java/org/apache/heron/examples/streamlet/WindowedWordCountTopology.java
@@ -28,6 +28,7 @@ import org.apache.heron.examples.streamlet.utils.StreamletUtils;
import org.apache.heron.streamlet.Builder;
import org.apache.heron.streamlet.Config;
import org.apache.heron.streamlet.Runner;
+import org.apache.heron.streamlet.StreamletReducers;
import org.apache.heron.streamlet.WindowConfig;
/**
@@ -71,7 +72,7 @@ public final class WindowedWordCountTopology {
// Value extractor (the value is always 1)
word -> 1,
WindowConfig.TumblingCountWindow(50),
- (x, y) -> x + y
+ StreamletReducers::sum
)
.setName("reduce-operation")
// The final output is logged using a user-supplied format
diff --git a/heron/api/src/java/org/apache/heron/streamlet/StreamletReducers.java b/heron/api/src/java/org/apache/heron/streamlet/StreamletReducers.java
new file mode 100644
index 0000000..ce6e28d
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/StreamletReducers.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.streamlet;
+
/**
 * Standard reducer functions for use with Streamlet reduce operations such as
 * {@code reduceByKey} and {@code reduceByKeyAndWindow}.
 *
 * <p>Example: assuming {@code s} is a {@code Streamlet<T>} and {@code T} provides
 * <ul>
 *   <li>{@code Integer getKey()}</li>
 *   <li>{@code Double getValue()}</li>
 * </ul>
 * the following produce streams of the sum, min and max of all values up to the current one:
 * <pre>{@code
 * s.reduceByKey(T::getKey, T::getValue, StreamletReducers::sum);
 * s.reduceByKey(T::getKey, T::getValue, StreamletReducers::min);
 * s.reduceByKey(T::getKey, T::getValue, StreamletReducers::max);
 * }</pre>
 *
 * <p>Semantics shared by all methods:
 * <ul>
 *   <li>Arguments are unboxed, so a {@code null} argument throws {@link NullPointerException}.</li>
 *   <li>Integral sums use Java's two's-complement arithmetic and wrap silently on overflow.</li>
 *   <li>Floating-point {@code min}/{@code max} follow {@link Math#min(double, double)} and
 *       {@link Math#max(double, double)}: a NaN argument yields NaN, and {@code -0.0} is
 *       treated as smaller than {@code 0.0}.</li>
 * </ul>
 */
public final class StreamletReducers {
  // This is a utility class and shouldn't have public constructor.
  private StreamletReducers() {
  }

  /** Returns {@code a + b}; wraps on overflow. */
  public static Integer sum(Integer a, Integer b) {
    return a + b;
  }

  /** Returns {@code a + b}; wraps on overflow. */
  public static Long sum(Long a, Long b) {
    return a + b;
  }

  /** Returns {@code a + b} using float arithmetic. */
  public static Float sum(Float a, Float b) {
    return a + b;
  }

  /** Returns {@code a + b} using double arithmetic. */
  public static Double sum(Double a, Double b) {
    return a + b;
  }

  /** Returns the larger of {@code a} and {@code b}. */
  public static Integer max(Integer a, Integer b) {
    return Math.max(a, b);
  }

  /** Returns the larger of {@code a} and {@code b}. */
  public static Long max(Long a, Long b) {
    return Math.max(a, b);
  }

  /** Returns the larger of {@code a} and {@code b}; NaN if either is NaN. */
  public static Float max(Float a, Float b) {
    return Math.max(a, b);
  }

  /** Returns the larger of {@code a} and {@code b}; NaN if either is NaN. */
  public static Double max(Double a, Double b) {
    return Math.max(a, b);
  }

  /** Returns the smaller of {@code a} and {@code b}. */
  public static Integer min(Integer a, Integer b) {
    return Math.min(a, b);
  }

  /** Returns the smaller of {@code a} and {@code b}. */
  public static Long min(Long a, Long b) {
    return Math.min(a, b);
  }

  /** Returns the smaller of {@code a} and {@code b}; NaN if either is NaN. */
  public static Float min(Float a, Float b) {
    return Math.min(a, b);
  }

  /** Returns the smaller of {@code a} and {@code b}; NaN if either is NaN. */
  public static Double min(Double a, Double b) {
    return Math.min(a, b);
  }
}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyAndWindowStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyAndWindowStreamlet.java
index aac859f..4a19e2a 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyAndWindowStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyAndWindowStreamlet.java
@@ -25,6 +25,7 @@ import java.util.Set;
import org.apache.heron.api.topology.TopologyBuilder;
import org.apache.heron.streamlet.KeyedWindow;
import org.apache.heron.streamlet.SerializableFunction;
+import org.apache.heron.streamlet.StreamletReducers;
import org.apache.heron.streamlet.WindowConfig;
import org.apache.heron.streamlet.impl.KVStreamletImpl;
import org.apache.heron.streamlet.impl.StreamletImpl;
@@ -58,7 +59,7 @@ public class CountByKeyAndWindowStreamlet<R, K>
// Count is a special case of reduce operation. Hence ReduceByKeyAndWindowOperator
// is used here. Every tuple has a value of 1 and the reduce operation is a simple sum.
ReduceByKeyAndWindowOperator<R, K, Long> bolt =
- new ReduceByKeyAndWindowOperator<R, K, Long>(keyExtractor, x -> 1L, (c1, c2) -> c1 + c2);
+ new ReduceByKeyAndWindowOperator<R, K, Long>(keyExtractor, x -> 1L, StreamletReducers::sum);
windowCfg.applyTo(bolt);
bldr.setBolt(getName(), bolt, getNumPartitions())
.customGrouping(parent.getName(), parent.getStreamId(),
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyStreamlet.java
index 965d897..5c8328b 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyStreamlet.java
@@ -24,6 +24,7 @@ import java.util.Set;
import org.apache.heron.api.topology.TopologyBuilder;
import org.apache.heron.streamlet.SerializableFunction;
+import org.apache.heron.streamlet.StreamletReducers;
import org.apache.heron.streamlet.impl.KVStreamletImpl;
import org.apache.heron.streamlet.impl.StreamletImpl;
import org.apache.heron.streamlet.impl.groupings.ReduceByKeyAndWindowCustomGrouping;
@@ -53,7 +54,7 @@ public class CountByKeyStreamlet<R, K> extends KVStreamletImpl<K, Long> {
// Count is a special case of reduce operation. Hence ReduceByKeyAndWindowOperator
// is used here. Every tuple has a value of 1 and the reduce operation is a simple sum.
ReduceByKeyOperator<R, K, Long> bolt =
- new ReduceByKeyOperator<R, K, Long>(keyExtractor, x -> 1L, (c1, c2) -> c1 + c2);
+ new ReduceByKeyOperator<R, K, Long>(keyExtractor, x -> 1L, StreamletReducers::sum);
bldr.setBolt(getName(), bolt, getNumPartitions())
.customGrouping(parent.getName(), parent.getStreamId(),
new ReduceByKeyAndWindowCustomGrouping<R, K>(keyExtractor));
diff --git a/heron/api/tests/java/org/apache/heron/streamlet/StreamletReducersTest.java b/heron/api/tests/java/org/apache/heron/streamlet/StreamletReducersTest.java
new file mode 100644
index 0000000..9c3cb2a
--- /dev/null
+++ b/heron/api/tests/java/org/apache/heron/streamlet/StreamletReducersTest.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.streamlet;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class StreamletReducersTest {
+
+ @Test
+ public void testSum() {
+ Assert.assertEquals(StreamletReducers.sum(1, 2), (Integer) 3);
+ Assert.assertEquals(StreamletReducers.sum(1L, 2L), (Long) 3L);
+ Assert.assertEquals(StreamletReducers.sum(1.0f, 2.0f), (Float) 3.0f);
+ Assert.assertEquals(StreamletReducers.sum(1.0, 2.0), (Double) 3.0);
+ }
+
+ @Test
+ public void testMax() {
+ Assert.assertEquals(StreamletReducers.max(1, 2), (Integer) 2);
+ Assert.assertEquals(StreamletReducers.max(2, 1), (Integer) 2);
+ Assert.assertEquals(StreamletReducers.max(1L, 2L), (Long) 2L);
+ Assert.assertEquals(StreamletReducers.max(2L, 1L), (Long) 2L);
+ Assert.assertEquals(StreamletReducers.max(1.0f, 2.0f), (Float) 2.0f);
+ Assert.assertEquals(StreamletReducers.max(2.0f, 1.0f), (Float) 2.0f);
+ Assert.assertEquals(StreamletReducers.max(1.0, 2.0), (Double) 2.0);
+ Assert.assertEquals(StreamletReducers.max(2.0, 1.0), (Double) 2.0);
+ }
+
+ @Test
+ public void testMin() {
+ Assert.assertEquals(StreamletReducers.min(1, 2), (Integer) 1);
+ Assert.assertEquals(StreamletReducers.min(2, 1), (Integer) 1);
+ Assert.assertEquals(StreamletReducers.min(1L, 2L), (Long) 1L);
+ Assert.assertEquals(StreamletReducers.min(2L, 1L), (Long) 1L);
+ Assert.assertEquals(StreamletReducers.min(1.0f, 2.0f), (Float) 1.0f);
+ Assert.assertEquals(StreamletReducers.min(2.0f, 1.0f), (Float) 1.0f);
+ Assert.assertEquals(StreamletReducers.min(1.0, 2.0), (Double) 1.0);
+ Assert.assertEquals(StreamletReducers.min(2.0, 1.0), (Double) 1.0);
+ }
+}
diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
index fa3aff7..fdb860c 100644
--- a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
+++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
@@ -51,6 +51,7 @@ import org.apache.heron.streamlet.SerializablePredicate;
import org.apache.heron.streamlet.SerializableTransformer;
import org.apache.heron.streamlet.Source;
import org.apache.heron.streamlet.Streamlet;
+import org.apache.heron.streamlet.StreamletReducers;
import org.apache.heron.streamlet.WindowConfig;
import org.apache.heron.streamlet.impl.streamlets.ConsumerStreamlet;
import org.apache.heron.streamlet.impl.streamlets.CountByKeyAndWindowStreamlet;
@@ -380,7 +381,7 @@ public class StreamletImplTest {
KVStreamlet<String, Double> streamlet = baseStreamlet.setNumPartitions(20)
.<String, Double>reduceByKey(x -> (x > 0) ? "positive" : ((x < 0) ? "negative" : "zero"),
x -> x,
- (x, y) -> x + y); // A sum operation
+ StreamletReducers::sum);
assertTrue(streamlet instanceof ReduceByKeyStreamlet);
ReduceByKeyStreamlet<Double, String, Double> mStreamlet =
@@ -398,7 +399,7 @@ public class StreamletImplTest {
KVStreamlet<String, Double> streamlet = baseStreamlet.setNumPartitions(20)
.reduceByKey(x -> (x > 0) ? "positive" : ((x < 0) ? "negative" : "zero"),
0.0,
- (x, y) -> x + y); // A sum operation
+ StreamletReducers::sum);
assertTrue(streamlet instanceof GeneralReduceByKeyStreamlet);
GeneralReduceByKeyStreamlet<Double, String, Double> mStreamlet =