You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this rendering.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/10/06 17:12:23 UTC

[1/5] flink git commit: [FLINK-2283] [streaming] Proper serialization of state in StreamGroupedFold and Reduce

Repository: flink
Updated Branches:
  refs/heads/master 4938ff09f -> e494c2795


[FLINK-2283] [streaming] Proper serialization of state in StreamGroupedFold and Reduce


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c414ea98
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c414ea98
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c414ea98

Branch: refs/heads/master
Commit: c414ea98ec6896fd9c011d421cb3e5c9587c8446
Parents: a76d963
Author: mbalassi <mb...@apache.org>
Authored: Mon Oct 5 22:31:29 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Tue Oct 6 14:46:06 2015 +0200

----------------------------------------------------------------------
 .../api/common/state/StateCheckpointer.java     |  4 +-
 .../streaming/api/datastream/KeyedStream.java   |  7 +-
 .../api/operators/AbstractStreamOperator.java   |  1 -
 .../operators/AbstractUdfStreamOperator.java    |  3 +-
 .../api/operators/StreamGroupedFold.java        | 21 +++--
 .../api/operators/StreamGroupedReduce.java      | 26 ++++--
 .../streaming/api/state/KVMapCheckpointer.java  | 82 +++++++++++++++++++
 .../streaming/api/AggregationFunctionTest.java  | 28 +++----
 .../api/operators/StreamGroupedFoldTest.java    | 42 +++++-----
 .../api/operators/StreamGroupedReduceTest.java  | 86 ++++++++++----------
 .../flink/streaming/api/scala/KeyedStream.scala |  2 +-
 .../UdfStreamOperatorCheckpointingITCase.java   | 20 ++---
 12 files changed, 215 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c414ea98/flink-core/src/main/java/org/apache/flink/api/common/state/StateCheckpointer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateCheckpointer.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateCheckpointer.java
index 488e308..f373846 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateCheckpointer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateCheckpointer.java
@@ -58,7 +58,7 @@ public interface StateCheckpointer<S, C extends Serializable> {
 	 * 
 	 * @return A snapshot of the operator state.
 	 */
-	public C snapshotState(S state, long checkpointId, long checkpointTimestamp);
+	C snapshotState(S state, long checkpointId, long checkpointTimestamp);
 
 	/**
 	 * Restores the operator states from a given snapshot. The restores state
@@ -69,5 +69,5 @@ public interface StateCheckpointer<S, C extends Serializable> {
 	 *            The state snapshot that needs to be restored.
 	 * @return The state corresponding to the snapshot.
 	 */
-	public S restoreState(C stateSnapshot);
+	S restoreState(C stateSnapshot);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c414ea98/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index f7c5b53..edb7981 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -186,7 +186,8 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> reducer) {
-		return transform("Keyed Reduce", getType(), new StreamGroupedReduce<>(clean(reducer), keySelector));
+		return transform("Keyed Reduce", getType(), new StreamGroupedReduce<T>(
+				clean(reducer), keySelector, getType()));
 	}
 
 	/**
@@ -208,7 +209,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 				Utils.getCallLocationName(), true);
 
 		return transform("Keyed Fold", outType, new StreamGroupedFold<>(clean(folder),
-				keySelector, initialValue));
+				keySelector, initialValue, getType()));
 	}
 
 	/**
@@ -443,7 +444,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	}
 
 	protected SingleOutputStreamOperator<T, ?> aggregate(AggregationFunction<T> aggregate) {
-		StreamGroupedReduce<T> operator = new StreamGroupedReduce<>(clean(aggregate), keySelector);
+		StreamGroupedReduce<T> operator = new StreamGroupedReduce<T>(clean(aggregate), keySelector, getType());
 		return transform("Keyed Aggregation", getType(), operator);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c414ea98/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 77bd130..87041eb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -35,7 +35,6 @@ public abstract class AbstractStreamOperator<OUT> implements StreamOperator<OUT>
 
 	private static final long serialVersionUID = 1L;
 
-	
 	protected transient StreamingRuntimeContext runtimeContext;
 
 	protected transient ExecutionConfig executionConfig;

http://git-wip-us.apache.org/repos/asf/flink/blob/c414ea98/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index c0d71e9..dc9a152 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -56,8 +56,7 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
 	private static final long serialVersionUID = 1L;
 	
 	private static final Logger LOG = LoggerFactory.getLogger(AbstractUdfStreamOperator.class);
-	
-	
+
 	/** the user function */
 	protected final F userFunction;
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/c414ea98/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
index ad40b85..732630a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
@@ -30,13 +30,15 @@ import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
 import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
+import org.apache.flink.streaming.api.state.KVMapCheckpointer;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
 public class StreamGroupedFold<IN, OUT> extends AbstractUdfStreamOperator<OUT, FoldFunction<IN, OUT>>
+
 		implements OneInputStreamOperator<IN, OUT>, OutputTypeConfigurable<OUT> {
 
 	private static final long serialVersionUID = 1L;
@@ -50,13 +52,18 @@ public class StreamGroupedFold<IN, OUT> extends AbstractUdfStreamOperator<OUT, F
 	private TypeSerializer<OUT> outTypeSerializer;
 	private transient OUT initialValue;
 
-	public StreamGroupedFold(
-			FoldFunction<IN, OUT> folder,
-			KeySelector<IN, ?> keySelector,
-			OUT initialValue) {
+	// Store the typeinfo, create serializer during runtime
+	private TypeInformation<Object> keyTypeInformation;
+
+	@SuppressWarnings("unchecked")
+	public StreamGroupedFold(FoldFunction<IN, OUT> folder, KeySelector<IN, ?> keySelector,
+								OUT initialValue, TypeInformation<IN> inTypeInformation) {
 		super(folder);
 		this.keySelector = keySelector;
 		this.initialValue = initialValue;
+		keyTypeInformation = (TypeInformation<Object>) TypeExtractor
+				.getKeySelectorTypes(keySelector, inTypeInformation);
+
 	}
 
 	@Override
@@ -75,7 +82,9 @@ public class StreamGroupedFold<IN, OUT> extends AbstractUdfStreamOperator<OUT, F
 		initialValue = outTypeSerializer.deserialize(in);
 
 		values = runtimeContext.getOperatorState("flink_internal_fold_values",
-				new HashMap<Object, OUT>(), false);
+				new HashMap<Object, OUT>(), false,
+				new KVMapCheckpointer<>(keyTypeInformation.createSerializer(executionConfig),
+						outTypeSerializer));
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/c414ea98/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
index 68ebd8f..579814d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
@@ -17,33 +17,48 @@
 
 package org.apache.flink.streaming.api.operators;
 
-import java.util.HashMap;
-
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.state.KVMapCheckpointer;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
+import java.util.HashMap;
+
 public class StreamGroupedReduce<IN> extends AbstractUdfStreamOperator<IN, ReduceFunction<IN>>
-		implements OneInputStreamOperator<IN, IN>{
+		implements OneInputStreamOperator<IN, IN> {
 
 	private static final long serialVersionUID = 1L;
 
 	private KeySelector<IN, ?> keySelector;
 	private transient OperatorState<HashMap<Object, IN>> values;
 
-	public StreamGroupedReduce(ReduceFunction<IN> reducer, KeySelector<IN, ?> keySelector) {
+	// Store the typeinfo, create serializer during runtime
+	private TypeInformation<Object> keyTypeInformation;
+	private TypeInformation<IN> valueTypeInformation;
+
+	@SuppressWarnings("unchecked")
+	public StreamGroupedReduce(ReduceFunction<IN> reducer, KeySelector<IN, ?> keySelector,
+								TypeInformation<IN> typeInformation) {
 		super(reducer);
 		this.keySelector = keySelector;
+		valueTypeInformation = typeInformation;
+		keyTypeInformation = (TypeInformation<Object>) TypeExtractor
+				.getKeySelectorTypes(keySelector, typeInformation);
 	}
 
 	@Override
 	public void open(Configuration parameters) throws Exception {
 		super.open(parameters);
+
 		values = runtimeContext.getOperatorState("flink_internal_reduce_values",
-				new HashMap<Object, IN>(), false);
+				new HashMap<Object, IN>(), false,
+				new KVMapCheckpointer<>(keyTypeInformation.createSerializer(executionConfig),
+						valueTypeInformation.createSerializer(executionConfig)));
 	}
 
 	@Override
@@ -67,4 +82,5 @@ public class StreamGroupedReduce<IN> extends AbstractUdfStreamOperator<IN, Reduc
 		output.emitWatermark(mark);
 	}
 
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c414ea98/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KVMapCheckpointer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KVMapCheckpointer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KVMapCheckpointer.java
new file mode 100644
index 0000000..17cb6a0
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KVMapCheckpointer.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state;
+
+import org.apache.flink.api.common.state.StateCheckpointer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Implementation of the {@link StateCheckpointer} interface for a map storing
+ * types compatible with Flink's serialization system.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class KVMapCheckpointer<K, V> implements StateCheckpointer<HashMap<K, V>, byte[]> {
+	private final TypeSerializer<K> keySerializer;
+	private final TypeSerializer<V> valueSerializer;
+
+	public KVMapCheckpointer(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer) {
+		this.keySerializer = keySerializer;
+		this.valueSerializer = valueSerializer;
+	}
+
+	/** Writes an int entry count, then each key immediately followed by its value. */
+	@Override
+	public byte[] snapshotState(HashMap<K, V> stateMap, long checkpointId, long checkpointTimestamp) {
+		// ~16 bytes/entry hint in long math: size() * 16 as an int turns negative for huge maps
+		ByteArrayOutputStream bos = new ByteArrayOutputStream((int) Math.min(16L * stateMap.size(), Integer.MAX_VALUE - 8));
+		DataOutputView out = new OutputViewDataOutputStreamWrapper(new DataOutputStream(bos));
+		try {
+			out.writeInt(stateMap.size());
+			for (Map.Entry<K, V> kv : stateMap.entrySet()) {
+				keySerializer.serialize(kv.getKey(), out);
+				valueSerializer.serialize(kv.getValue(), out);
+			}
+		} catch (IOException e) {
+			throw new RuntimeException("Failed to write snapshot", e);
+		}
+		return bos.toByteArray();
+	}
+
+	/** Rebuilds the map from bytes in the layout written by {@link #snapshotState}. */
+	@Override
+	public HashMap<K, V> restoreState(byte[] stateSnapshot) {
+		ByteArrayInputView in = new ByteArrayInputView(stateSnapshot);
+		HashMap<K, V> returnMap = new HashMap<>();
+		try {
+			int size = in.readInt();
+			for (int i = 0; i < size; i++) {
+				returnMap.put(keySerializer.deserialize(in), valueSerializer.deserialize(in));
+			}
+		} catch (IOException e) {
+			throw new RuntimeException("Failed to read snapshot", e);
+		}
+		return returnMap;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c414ea98/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
index cdf7aae..e002780 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
@@ -92,15 +92,15 @@ public class AggregationFunctionTest {
 				1, typeInfo, AggregationType.MAX, config);
 
 		List<Tuple2<Integer, Integer>> groupedSumList = MockContext.createAndExecute(
-				new StreamGroupedReduce<>(sumFunction, keySelector),
+				new StreamGroupedReduce<>(sumFunction, keySelector, typeInfo),
 				getInputList());
 
 		List<Tuple2<Integer, Integer>> groupedMinList = MockContext.createAndExecute(
-				new StreamGroupedReduce<>(minFunction, keySelector),
+				new StreamGroupedReduce<>(minFunction, keySelector, typeInfo),
 				getInputList());
 
 		List<Tuple2<Integer, Integer>> groupedMaxList = MockContext.createAndExecute(
-				new StreamGroupedReduce<>(maxFunction, keySelector),
+				new StreamGroupedReduce<>(maxFunction, keySelector, typeInfo),
 				getInputList());
 
 		assertEquals(expectedGroupSumList, groupedSumList);
@@ -156,13 +156,13 @@ public class AggregationFunctionTest {
 				false, config);
 
 		List<MyPojo> groupedSumList = MockContext.createAndExecute(
-				new StreamGroupedReduce<>(sumFunction, keySelector),
+				new StreamGroupedReduce<>(sumFunction, keySelector, typeInfo),
 				getInputPojoList());
 		List<MyPojo> groupedMinList = MockContext.createAndExecute(
-				new StreamGroupedReduce<>(minFunction, keySelector),
+				new StreamGroupedReduce<>(minFunction, keySelector, typeInfo),
 				getInputPojoList());
 		List<MyPojo> groupedMaxList = MockContext.createAndExecute(
-				new StreamGroupedReduce<>(maxFunction, keySelector),
+				new StreamGroupedReduce<>(maxFunction, keySelector, typeInfo),
 				getInputPojoList());
 
 		assertEquals(expectedGroupSumList, groupedSumList);
@@ -216,16 +216,16 @@ public class AggregationFunctionTest {
 				new ComparableAggregator<>(1, typeInfo, AggregationType.MINBY, false, config);
 
 		assertEquals(maxByFirstExpected, MockContext.createAndExecute(
-				new StreamGroupedReduce<>(maxByFunctionFirst, keySelector),
+				new StreamGroupedReduce<>(maxByFunctionFirst, keySelector, typeInfo),
 				getInputByList()));
 		assertEquals(maxByLastExpected, MockContext.createAndExecute(
-				new StreamGroupedReduce<>(maxByFunctionLast, keySelector),
+				new StreamGroupedReduce<>(maxByFunctionLast, keySelector, typeInfo),
 				getInputByList()));
 		assertEquals(minByLastExpected, MockContext.createAndExecute(
-				new StreamGroupedReduce<>(minByFunctionLast, keySelector),
+				new StreamGroupedReduce<>(minByFunctionLast, keySelector, typeInfo),
 				getInputByList()));
 		assertEquals(minByFirstExpected, MockContext.createAndExecute(
-				new StreamGroupedReduce<>(minByFunctionFirst, keySelector),
+				new StreamGroupedReduce<>(minByFunctionFirst, keySelector, typeInfo),
 				getInputByList()));
 	}
 
@@ -274,16 +274,16 @@ public class AggregationFunctionTest {
 				new ComparableAggregator<>("f1", typeInfo, AggregationType.MINBY, false, config);
 
 		assertEquals(maxByFirstExpected, MockContext.createAndExecute(
-				new StreamGroupedReduce<>(maxByFunctionFirst, keySelector),
+				new StreamGroupedReduce<>(maxByFunctionFirst, keySelector, typeInfo),
 				getInputByPojoList()));
 		assertEquals(maxByLastExpected, MockContext.createAndExecute(
-				new StreamGroupedReduce<>(maxByFunctionLast, keySelector),
+				new StreamGroupedReduce<>(maxByFunctionLast, keySelector, typeInfo),
 				getInputByPojoList()));
 		assertEquals(minByLastExpected, MockContext.createAndExecute(
-				new StreamGroupedReduce<>(minByFunctionLast, keySelector),
+				new StreamGroupedReduce<>(minByFunctionLast, keySelector, typeInfo),
 				getInputByPojoList()));
 		assertEquals(minByFirstExpected, MockContext.createAndExecute(
-				new StreamGroupedReduce<>(minByFunctionFirst, keySelector),
+				new StreamGroupedReduce<>(minByFunctionFirst, keySelector, typeInfo),
 				getInputByPojoList()));
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c414ea98/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
index 82dddfe..bc5d614 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
@@ -56,12 +56,14 @@ public class StreamGroupedFoldTest {
 
 	}
 
+	private TypeInformation<Integer> inType = TypeExtractor.getForClass(Integer.class);
+	private TypeInformation<String> outType = TypeExtractor.getForClass(String.class);
+
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testGroupedFold() throws Exception {
-		TypeInformation<String> outType = TypeExtractor.getForObject("A string");
 
-		StreamGroupedFold<Integer, String> operator = new StreamGroupedFold<Integer, String>(
+		StreamGroupedFold<Integer, String> operator = new StreamGroupedFold<>(
 				new MyFolder(), new KeySelector<Integer, String>() {
 
 			private static final long serialVersionUID = 1L;
@@ -70,55 +72,55 @@ public class StreamGroupedFoldTest {
 			public String getKey(Integer value) throws Exception {
 				return value.toString();
 			}
-		}, "100");
+		}, "100", inType);
 
 		operator.setOutputType(outType, new ExecutionConfig());
 
-		OneInputStreamOperatorTestHarness<Integer, String> testHarness = new OneInputStreamOperatorTestHarness<Integer, String>(operator);
+		OneInputStreamOperatorTestHarness<Integer, String> testHarness = new OneInputStreamOperatorTestHarness<>(operator);
 
 		long initialTime = 0L;
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
 		testHarness.open();
 
-		testHarness.processElement(new StreamRecord<Integer>(1, initialTime + 1));
-		testHarness.processElement(new StreamRecord<Integer>(1, initialTime + 2));
+		testHarness.processElement(new StreamRecord<>(1, initialTime + 1));
+		testHarness.processElement(new StreamRecord<>(1, initialTime + 2));
 		testHarness.processWatermark(new Watermark(initialTime + 2));
-		testHarness.processElement(new StreamRecord<Integer>(2, initialTime + 3));
-		testHarness.processElement(new StreamRecord<Integer>(2, initialTime + 4));
-		testHarness.processElement(new StreamRecord<Integer>(3, initialTime + 5));
+		testHarness.processElement(new StreamRecord<>(2, initialTime + 3));
+		testHarness.processElement(new StreamRecord<>(2, initialTime + 4));
+		testHarness.processElement(new StreamRecord<>(3, initialTime + 5));
 
-		expectedOutput.add(new StreamRecord<String>("1001", initialTime + 1));
-		expectedOutput.add(new StreamRecord<String>("10011", initialTime + 2));
+		expectedOutput.add(new StreamRecord<>("1001", initialTime + 1));
+		expectedOutput.add(new StreamRecord<>("10011", initialTime + 2));
 		expectedOutput.add(new Watermark(initialTime + 2));
-		expectedOutput.add(new StreamRecord<String>("1002", initialTime + 3));
-		expectedOutput.add(new StreamRecord<String>("10022", initialTime + 4));
-		expectedOutput.add(new StreamRecord<String>("1003", initialTime + 5));
+		expectedOutput.add(new StreamRecord<>("1002", initialTime + 3));
+		expectedOutput.add(new StreamRecord<>("10022", initialTime + 4));
+		expectedOutput.add(new StreamRecord<>("1003", initialTime + 5));
 
 		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
 	}
 
 	@Test
 	public void testOpenClose() throws Exception {
-		StreamGroupedFold<Integer, String> operator = new StreamGroupedFold<Integer, String>(new TestOpenCloseFoldFunction(), new KeySelector<Integer, Integer>() {
+		StreamGroupedFold<Integer, String> operator = new StreamGroupedFold<>(new TestOpenCloseFoldFunction(), new KeySelector<Integer, Integer>() {
 			private static final long serialVersionUID = 1L;
 
 			@Override
 			public Integer getKey(Integer value) throws Exception {
 				return value;
 			}
-		}, "init");
+		}, "init", inType);
 
 		operator.setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig());
 
-		OneInputStreamOperatorTestHarness<Integer, String> testHarness = new OneInputStreamOperatorTestHarness<Integer, String>(operator);
+		OneInputStreamOperatorTestHarness<Integer, String> testHarness = new OneInputStreamOperatorTestHarness<>(operator);
 
 		long initialTime = 0L;
 
 		testHarness.open();
 
-		testHarness.processElement(new StreamRecord<Integer>(1, initialTime));
-		testHarness.processElement(new StreamRecord<Integer>(2, initialTime));
+		testHarness.processElement(new StreamRecord<>(1, initialTime));
+		testHarness.processElement(new StreamRecord<>(2, initialTime));
 
 		testHarness.close();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c414ea98/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
index a2cd1fd..85d9bc1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
@@ -22,7 +22,9 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -43,71 +45,47 @@ import org.junit.Test;
 
 public class StreamGroupedReduceTest {
 
-	private static class MyReducer implements ReduceFunction<Integer> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Integer reduce(Integer value1, Integer value2) throws Exception {
-			return value1 + value2;
-		}
-
-	}
-
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testGroupedReduce() throws Exception {
-		StreamGroupedReduce<Integer> operator = new StreamGroupedReduce<Integer>(new MyReducer(), new KeySelector<Integer, Integer>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Integer getKey(Integer value) throws Exception {
-				return value;
-			}
-		});
+		StreamGroupedReduce<Integer> operator = new StreamGroupedReduce<>(new MyReducer(), new IntegerKeySelector(), typeInfo);
 
-		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new OneInputStreamOperatorTestHarness<Integer, Integer>(operator);
+		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new OneInputStreamOperatorTestHarness<>(operator);
 
 		long initialTime = 0L;
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
 		testHarness.open();
 
-		testHarness.processElement(new StreamRecord<Integer>(1, initialTime + 1));
-		testHarness.processElement(new StreamRecord<Integer>(1, initialTime + 2));
+		testHarness.processElement(new StreamRecord<>(1, initialTime + 1));
+		testHarness.processElement(new StreamRecord<>(1, initialTime + 2));
 		testHarness.processWatermark(new Watermark(initialTime + 2));
-		testHarness.processElement(new StreamRecord<Integer>(2, initialTime + 3));
-		testHarness.processElement(new StreamRecord<Integer>(2, initialTime + 4));
-		testHarness.processElement(new StreamRecord<Integer>(3, initialTime + 5));
+		testHarness.processElement(new StreamRecord<>(2, initialTime + 3));
+		testHarness.processElement(new StreamRecord<>(2, initialTime + 4));
+		testHarness.processElement(new StreamRecord<>(3, initialTime + 5));
 
-		expectedOutput.add(new StreamRecord<Integer>(1, initialTime + 1));
-		expectedOutput.add(new StreamRecord<Integer>(2, initialTime + 2));
+		expectedOutput.add(new StreamRecord<>(1, initialTime + 1));
+		expectedOutput.add(new StreamRecord<>(2, initialTime + 2));
 		expectedOutput.add(new Watermark(initialTime + 2));
-		expectedOutput.add(new StreamRecord<Integer>(2, initialTime + 3));
-		expectedOutput.add(new StreamRecord<Integer>(4, initialTime + 4));
-		expectedOutput.add(new StreamRecord<Integer>(3, initialTime + 5));
+		expectedOutput.add(new StreamRecord<>(2, initialTime + 3));
+		expectedOutput.add(new StreamRecord<>(4, initialTime + 4));
+		expectedOutput.add(new StreamRecord<>(3, initialTime + 5));
 
 		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
 	}
 
 	@Test
 	public void testOpenClose() throws Exception {
-		StreamGroupedReduce<Integer> operator = new StreamGroupedReduce<Integer>(new TestOpenCloseReduceFunction(), new KeySelector<Integer, Integer>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Integer getKey(Integer value) throws Exception {
-				return value;
-			}
-		});
-		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new OneInputStreamOperatorTestHarness<Integer, Integer>(operator);
+		StreamGroupedReduce<Integer> operator =
+				new StreamGroupedReduce<>(new TestOpenCloseReduceFunction(), new IntegerKeySelector(), typeInfo);
+		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new OneInputStreamOperatorTestHarness<>(operator);
 
 		long initialTime = 0L;
 
 		testHarness.open();
 
-		testHarness.processElement(new StreamRecord<Integer>(1, initialTime));
-		testHarness.processElement(new StreamRecord<Integer>(2, initialTime));
+		testHarness.processElement(new StreamRecord<>(1, initialTime));
+		testHarness.processElement(new StreamRecord<>(2, initialTime));
 
 		testHarness.close();
 
@@ -149,4 +127,28 @@ public class StreamGroupedReduceTest {
 			return in1 + in2;
 		}
 	}
+
+	// Utilities
+
+	private static class MyReducer implements ReduceFunction<Integer> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Integer reduce(Integer value1, Integer value2) throws Exception {
+			return value1 + value2;
+		}
+
+	}
+
+	private static class IntegerKeySelector implements KeySelector<Integer, Integer> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Integer getKey(Integer value) throws Exception {
+			return value;
+		}
+	}
+
+	private static TypeInformation<Integer> typeInfo = TypeExtractor.getForClass(Integer.class);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c414ea98/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index 18b71be..0ce36aa 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -273,7 +273,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
           javaStream.getExecutionConfig)
     }
 
-    val invokable =  new StreamGroupedReduce[T](reducer,javaStream.getKeySelector())
+    val invokable =  new StreamGroupedReduce[T](reducer,javaStream.getKeySelector(),getType())
      
     new DataStream[T](javaStream.transform("aggregation", javaStream.getType(),invokable))
       .asInstanceOf[DataStream[T]]

http://git-wip-us.apache.org/repos/asf/flink/blob/c414ea98/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
index 263bef1..cb02d2f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
@@ -19,22 +19,20 @@
 package org.apache.flink.test.checkpointing;
 
 import com.google.common.collect.EvictingQueue;
-import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.datastream.GroupedDataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamGroupedFold;
 import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
-import org.apache.flink.util.Collector;
 import org.junit.Assert;
 
 import java.util.Queue;
@@ -47,7 +45,7 @@ import java.util.Random;
  *
  * <p>
  * The topology currently tests the proper behaviour of the {@link StreamGroupedReduce}
- * operator.
+ * and the {@link StreamGroupedFold} operators.
  */
 @SuppressWarnings("serial")
 public class UdfStreamOperatorCheckpointingITCase extends StreamFaultToleranceTestBase {
@@ -63,8 +61,8 @@ public class UdfStreamOperatorCheckpointingITCase extends StreamFaultToleranceTe
 	public void testProgram(StreamExecutionEnvironment env) {
 
 		// base stream
-		GroupedDataStream<Tuple2<Integer, Long>> stream = env.addSource(new StatefulMultipleSequence())
-				.groupBy(0);
+		KeyedStream<Tuple2<Integer, Long>, Tuple> stream = env.addSource(new StatefulMultipleSequence())
+				.keyBy(0);
 
 
 		stream
@@ -72,7 +70,7 @@ public class UdfStreamOperatorCheckpointingITCase extends StreamFaultToleranceTe
 				.min(1)
 				// failure generation
 				.map(new OnceFailingIdentityMapFunction(NUM_INPUT))
-				.groupBy(0)
+				.keyBy(0)
 				.addSink(new MinEvictingQueueSink());
 
 		stream
@@ -84,7 +82,7 @@ public class UdfStreamOperatorCheckpointingITCase extends StreamFaultToleranceTe
 						return Tuple2.of(value1.f0, value1.f1 + value2.f1);
 					}
 				})
-				.groupBy(0)
+				.keyBy(0)
 				.addSink(new SumEvictingQueueSink());
 
 		stream
@@ -96,7 +94,7 @@ public class UdfStreamOperatorCheckpointingITCase extends StreamFaultToleranceTe
 						return Tuple2.of(value.f0, accumulator.f1 + value.f1);
 					}
 				})
-				.groupBy(0)
+				.keyBy(0)
 				.addSink(new FoldEvictingQueueSink());
 	}
 


[2/5] flink git commit: [FLINK-2283] [streaming] Test for checkpointing in internal operators

Posted by mb...@apache.org.
[FLINK-2283] [streaming] Test for checkpointing in internal operators


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a76d9638
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a76d9638
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a76d9638

Branch: refs/heads/master
Commit: a76d9638e83947d491e6ae1edb82f020b6ba8f54
Parents: 225704b
Author: mbalassi <mb...@apache.org>
Authored: Sun Sep 20 22:27:11 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Tue Oct 6 14:46:06 2015 +0200

----------------------------------------------------------------------
 .../api/operators/StreamGroupedFold.java        |   1 -
 .../api/operators/StreamGroupedReduce.java      |   1 -
 .../UdfStreamOperatorCheckpointingITCase.java   | 267 +++++++++++++++++++
 3 files changed, 267 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a76d9638/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
index f8f167a..ad40b85 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
@@ -23,7 +23,6 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.HashMap;
-import java.util.Map;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FoldFunction;

http://git-wip-us.apache.org/repos/asf/flink/blob/a76d9638/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
index e1f9f06..68ebd8f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
@@ -18,7 +18,6 @@
 package org.apache.flink.streaming.api.operators;
 
 import java.util.HashMap;
-import java.util.Map;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.state.OperatorState;

http://git-wip-us.apache.org/repos/asf/flink/blob/a76d9638/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
new file mode 100644
index 0000000..263bef1
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import com.google.common.collect.EvictingQueue;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.datastream.GroupedDataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+
+import java.util.Queue;
+import java.util.Random;
+
+/**
+ * Integration test ensuring that the persistent state defined by the implementations
+ * of {@link AbstractUdfStreamOperator} is correctly restored in case of recovery from
+ * a failure.
+ *
+ * <p>
+ * The topology currently tests the proper behaviour of the {@link StreamGroupedReduce}
+ * operator.
+ */
+@SuppressWarnings("serial")
+public class UdfStreamOperatorCheckpointingITCase extends StreamFaultToleranceTestBase {
+
+	final private static long NUM_INPUT = 2_500_000L;
+	final private static int NUM_OUTPUT = 1_000;
+
+	/**
+	 * Assembles a stream of (key, long value) pairs keyed on the first field, and applies a
+	 * built-in min aggregation, a reduce function and a fold function to it.
+	 */
+	@Override
+	public void testProgram(StreamExecutionEnvironment env) {
+
+		// base stream
+		GroupedDataStream<Tuple2<Integer, Long>> stream = env.addSource(new StatefulMultipleSequence())
+				.groupBy(0);
+
+
+		stream
+				// testing built-in aggregate
+				.min(1)
+				// failure generation
+				.map(new OnceFailingIdentityMapFunction(NUM_INPUT))
+				.groupBy(0)
+				.addSink(new MinEvictingQueueSink());
+
+		stream
+				// testing UDF reducer
+				.reduce(new ReduceFunction<Tuple2<Integer, Long>>() {
+					@Override
+					public Tuple2<Integer, Long> reduce(
+							Tuple2<Integer, Long> value1, Tuple2<Integer, Long> value2) throws Exception {
+						return Tuple2.of(value1.f0, value1.f1 + value2.f1);
+					}
+				})
+				.groupBy(0)
+				.addSink(new SumEvictingQueueSink());
+
+		stream
+				// testing UDF folder
+				.fold(Tuple2.of(0, 0L), new FoldFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>>() {
+					@Override
+					public Tuple2<Integer, Long> fold(
+							Tuple2<Integer, Long> accumulator, Tuple2<Integer, Long> value) throws Exception {
+						return Tuple2.of(value.f0, accumulator.f1 + value.f1);
+					}
+				})
+				.groupBy(0)
+				.addSink(new FoldEvictingQueueSink());
+	}
+
+	@Override
+	public void postSubmit() {
+
+		// Note that these checks depend on the ordering of the input
+
+		// Checking the result of the built-in aggregate
+		for (int i = 0; i < PARALLELISM; i++) {
+			for (Long value : MinEvictingQueueSink.queues[i]) {
+				Assert.assertTrue("Value different from 1 found, was " + value + ".", value == 1);
+			}
+		}
+
+		// Checking the result of the UDF reducer
+		for (int i = 0; i < PARALLELISM; i++) {
+			long prevCount = NUM_INPUT - NUM_OUTPUT;
+			long sum = prevCount * (prevCount + 1) / 2;
+			while (!SumEvictingQueueSink.queues[i].isEmpty()) {
+				sum += ++prevCount;
+				Long value = SumEvictingQueueSink.queues[i].remove();
+				Assert.assertTrue("Unexpected reduce value " + value + " instead of " + sum + ".", value == sum);
+			}
+		}
+
+		// Checking the result of the UDF folder
+		for (int i = 0; i < PARALLELISM; i++) {
+			long prevCount = NUM_INPUT - NUM_OUTPUT;
+			long sum = prevCount * (prevCount + 1) / 2;
+			while (!FoldEvictingQueueSink.queues[i].isEmpty()) {
+				sum += ++prevCount;
+				Long value = FoldEvictingQueueSink.queues[i].remove();
+				Assert.assertTrue("Unexpected fold value " + value + " instead of " + sum + ".", value == sum);
+			}
+		}
+
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Custom Functions
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Emits (i, v) for every key i in [0, PARALLELISM) and every v in 1..NUM_INPUT, keeping the
+	 * position in operator state. Not parallel, to ensure order; stops early once cancelled.
+	 */
+	private static class StatefulMultipleSequence extends RichSourceFunction<Tuple2<Integer, Long>>{
+
+		private transient OperatorState<Long> count;
+		private volatile boolean isRunning = true; // cleared by cancel() so that run() terminates
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+			count = getRuntimeContext().getOperatorState("count", 0L, false);
+		}
+
+		@Override
+		public void run(SourceContext<Tuple2<Integer, Long>> ctx) throws Exception {
+			while (isRunning && count.value() < NUM_INPUT) {
+				synchronized (ctx.getCheckpointLock()) {
+					for (int i = 0; i < PARALLELISM; i++) {
+						ctx.collect(Tuple2.of(i, count.value() + 1));
+					}
+					count.update(count.value() + 1);
+				}
+			}
+		}
+
+		@Override
+		public void cancel() {
+			isRunning = false;
+		}
+	}
+
+	/**
+	 * Identity mapper that throws once, after a subtask has seen 40% to 70% of its share of records.
+	 */
+	private static class OnceFailingIdentityMapFunction
+			extends RichMapFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>> {
+
+		private static volatile boolean hasFailed = false; // static: at most one failure per JVM
+
+		private final long numElements;
+
+		private long failurePos;
+		private transient OperatorState<Long> count;
+
+		public OnceFailingIdentityMapFunction(long numElements) {
+			this.numElements = numElements;
+		}
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			long failurePosMin = (long) (0.4 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
+			long failurePosMax = (long) (0.7 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
+			// nextLong() may be negative and % keeps its sign: use |x % range| to stay in [min, max)
+			failurePos = Math.abs(new Random().nextLong() % Math.max(1L, failurePosMax - failurePosMin)) + failurePosMin;
+			count = getRuntimeContext().getOperatorState("count", 0L, false);
+		}
+
+		@Override
+		public Tuple2<Integer, Long> map(Tuple2<Integer, Long> value) throws Exception {
+			if (!hasFailed && count.value() >= failurePos) {
+				hasFailed = true;
+				throw new Exception("Test Failure");
+			}
+			count.update(count.value() + 1);
+			return value;
+		}
+
+	}
+
+	/**
+	 * Sink that emits the output to an evicting queue storing the last {@link #NUM_OUTPUT} elements.
+	 * A separate queue is lazily created for each key; group the stream by key before this sink
+	 * to avoid concurrent access to the same queue.
+	 */
+	private static class MinEvictingQueueSink implements SinkFunction<Tuple2<Integer, Long>> {
+
+		public static Queue<Long>[] queues = new Queue[PARALLELISM];
+
+		@Override
+		public void invoke(Tuple2<Integer, Long> value) throws Exception {
+			if (queues[value.f0] == null) {
+				queues[value.f0] = EvictingQueue.create(NUM_OUTPUT);
+			}
+			queues[value.f0].add(value.f1);
+		}
+	}
+
+	/**
+	 * Sink that emits the output to an evicting queue storing the last {@link #NUM_OUTPUT} elements.
+	 * A separate queue is lazily created for each key; group the stream by key before this sink
+	 * to avoid concurrent access to the same queue.
+	 */
+	private static class SumEvictingQueueSink implements SinkFunction<Tuple2<Integer, Long>> {
+
+		public static Queue<Long>[] queues = new Queue[PARALLELISM];
+
+		@Override
+		public void invoke(Tuple2<Integer, Long> value) throws Exception {
+			if (queues[value.f0] == null) {
+				queues[value.f0] = EvictingQueue.create(NUM_OUTPUT);
+			}
+			queues[value.f0].add(value.f1);
+		}
+	}
+
+	/**
+	 * Sink that emits the output to an evicting queue storing the last {@link #NUM_OUTPUT} elements.
+	 * A separate queue is lazily created for each key; group the stream by key before this sink
+	 * to avoid concurrent access to the same queue.
+	 */
+	private static class FoldEvictingQueueSink implements SinkFunction<Tuple2<Integer, Long>> {
+
+		public static Queue<Long>[] queues = new Queue[PARALLELISM];
+
+		@Override
+		public void invoke(Tuple2<Integer, Long> value) throws Exception {
+			if (queues[value.f0] == null) {
+				queues[value.f0] = EvictingQueue.create(NUM_OUTPUT);
+			}
+			queues[value.f0].add(value.f1);
+		}
+	}
+}


[3/5] flink git commit: [streaming] Removed unused StreamReduce

Posted by mb...@apache.org.
[streaming] Removed unused StreamReduce

Refactored corresponding tests, some minor cleanups.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/906bd6dc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/906bd6dc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/906bd6dc

Branch: refs/heads/master
Commit: 906bd6dcb360665af0331faddb34e7260bfe7f1a
Parents: 4938ff0
Author: mbalassi <mb...@apache.org>
Authored: Fri Sep 11 16:32:09 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Tue Oct 6 14:46:06 2015 +0200

----------------------------------------------------------------------
 .../flink/api/common/state/OperatorState.java   |  10 +-
 .../api/operators/StreamGroupedReduce.java      |  11 +-
 .../streaming/api/operators/StreamReduce.java   |  53 ---
 .../streaming/api/AggregationFunctionTest.java  | 446 ++++++++-----------
 .../flink/streaming/api/scala/DataStream.scala  |  26 +-
 5 files changed, 221 insertions(+), 325 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/906bd6dc/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
index 3f5e977..3036023 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
@@ -21,6 +21,8 @@ package org.apache.flink.api.common.state;
 import java.io.IOException;
 
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.configuration.Configuration;
 
 /**
  * Base interface for all streaming operator states. It can represent both
@@ -30,9 +32,9 @@ import org.apache.flink.api.common.functions.MapFunction;
  * State can be accessed and manipulated using the {@link #value()} and
  * {@link #update(T)} methods. These calls are only safe in the
  * transformation call the operator represents, for instance inside
- * {@link MapFunction#map()} and can lead tp unexpected behavior in the
- * {@link #open(org.apache.flink.configuration.Configuration)} or
- * {@link #close()} methods.
+ * {@link MapFunction#map(Object)}, and can lead to unexpected behavior in the
+ * {@link AbstractRichFunction#open(Configuration)} or
+ * {@link AbstractRichFunction#close()} methods.
  * 
  * @param <T>
  *            Type of the operator state
@@ -59,7 +61,7 @@ public interface OperatorState<T> {
 	 * partitioned state is updated with null, the state for the current key 
 	 * will be removed and the default value is returned on the next access.
 	 * 
-	 * @param state
+	 * @param value
 	 *            The new value for the state.
 	 *            
 	 * @throws IOException Thrown if the system cannot access the state.

http://git-wip-us.apache.org/repos/asf/flink/blob/906bd6dc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
index 7533c33..8805138 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
@@ -22,9 +22,11 @@ import java.util.Map;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
-public class StreamGroupedReduce<IN> extends StreamReduce<IN> {
+public class StreamGroupedReduce<IN> extends AbstractUdfStreamOperator<IN, ReduceFunction<IN>>
+		implements OneInputStreamOperator<IN, IN>{
 
 	private static final long serialVersionUID = 1L;
 
@@ -41,7 +43,7 @@ public class StreamGroupedReduce<IN> extends StreamReduce<IN> {
 		Object key = keySelector.getKey(element.getValue());
 
 		if (values == null) {
-			values = new HashMap<Object, IN>();
+			values = new HashMap<>();
 		}
 
 		IN currentValue = values.get(key);
@@ -56,4 +58,9 @@ public class StreamGroupedReduce<IN> extends StreamReduce<IN> {
 		}
 	}
 
+	@Override
+	public void processWatermark(Watermark mark) throws Exception {
+		output.emitWatermark(mark);
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/906bd6dc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
deleted file mode 100644
index af562fe..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-public class StreamReduce<IN> extends AbstractUdfStreamOperator<IN, ReduceFunction<IN>>
-		implements OneInputStreamOperator<IN, IN> {
-
-	private static final long serialVersionUID = 1L;
-
-	private transient IN currentValue;
-
-	public StreamReduce(ReduceFunction<IN> reducer) {
-		super(reducer);
-		currentValue = null;
-
-		chainingStrategy = ChainingStrategy.ALWAYS;
-	}
-
-	@Override
-	public void processElement(StreamRecord<IN> element) throws Exception {
-
-		if (currentValue != null) {
-			currentValue = userFunction.reduce(currentValue, element.getValue());
-		} else {
-			currentValue = element.getValue();
-		}
-		output.collect(element.replace(currentValue));
-	}
-
-	@Override
-	public void processWatermark(Watermark mark) throws Exception {
-		output.emitWatermark(mark);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/906bd6dc/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
index ff04609..cdf7aae 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
@@ -23,18 +23,19 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType;
 import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
 import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
 import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
-import org.apache.flink.streaming.api.operators.StreamReduce;
 import org.apache.flink.streaming.util.MockContext;
 import org.apache.flink.streaming.util.keys.KeySelectorUtil;
 import org.junit.Test;
@@ -44,32 +45,16 @@ public class AggregationFunctionTest {
 	@Test
 	public void groupSumIntegerTest() {
 
-		List<Tuple2<Integer, Integer>> expectedSumList = new ArrayList<Tuple2<Integer, Integer>>();
-		List<Tuple2<Integer, Integer>> expectedMinList = new ArrayList<Tuple2<Integer, Integer>>();
-		List<Tuple2<Integer, Integer>> expectedMaxList = new ArrayList<Tuple2<Integer, Integer>>();
-		List<Integer> expectedSumList0 = new ArrayList<Integer>();
-		List<Integer> expectedMinList0 = new ArrayList<Integer>();
-		List<Integer> expectedMaxList0 = new ArrayList<Integer>();
-		List<Tuple2<Integer, Integer>> expectedGroupSumList = new ArrayList<Tuple2<Integer, Integer>>();
-		List<Tuple2<Integer, Integer>> expectedGroupMinList = new ArrayList<Tuple2<Integer, Integer>>();
-		List<Tuple2<Integer, Integer>> expectedGroupMaxList = new ArrayList<Tuple2<Integer, Integer>>();
-
-		List<Integer> simpleInput = new ArrayList<Integer>();
+		// preparing expected outputs
+		List<Tuple2<Integer, Integer>> expectedGroupSumList = new ArrayList<>();
+		List<Tuple2<Integer, Integer>> expectedGroupMinList = new ArrayList<>();
+		List<Tuple2<Integer, Integer>> expectedGroupMaxList = new ArrayList<>();
 
 		int groupedSum0 = 0;
 		int groupedSum1 = 0;
 		int groupedSum2 = 0;
 
 		for (int i = 0; i < 9; i++) {
-			simpleInput.add(i);
-			expectedSumList.add(new Tuple2<Integer, Integer>(i % 3, (i + 1) * i / 2));
-			expectedMinList.add(new Tuple2<Integer, Integer>(i % 3, 0));
-			expectedMaxList.add(new Tuple2<Integer, Integer>(i % 3, i));
-
-			expectedSumList0.add((i + 1) * i / 2);
-			expectedMaxList0.add(i);
-			expectedMinList0.add(0);
-
 			int groupedSum;
 			switch (i % 3) {
 				case 0:
@@ -83,99 +68,59 @@ public class AggregationFunctionTest {
 					break;
 			}
 
-			expectedGroupSumList.add(new Tuple2<Integer, Integer>(i % 3, groupedSum));
-			expectedGroupMinList.add(new Tuple2<Integer, Integer>(i % 3, i % 3));
-			expectedGroupMaxList.add(new Tuple2<Integer, Integer>(i % 3, i));
+			expectedGroupSumList.add(new Tuple2<>(i % 3, groupedSum));
+			expectedGroupMinList.add(new Tuple2<>(i % 3, i % 3));
+			expectedGroupMaxList.add(new Tuple2<>(i % 3, i));
 		}
 
-		TypeInformation<Tuple2<Integer, Integer>> type1 = TypeExtractor
-				.getForObject(new Tuple2<Integer, Integer>(0, 0));
-		TypeInformation<Integer> type2 = TypeExtractor.getForObject(2);
+		// some necessary boiler plate
+		TypeInformation<Tuple2<Integer, Integer>> typeInfo = TypeExtractor
+				.getForObject(new Tuple2<>(0, 0));
 
 		ExecutionConfig config = new ExecutionConfig();
 
-		ReduceFunction<Tuple2<Integer, Integer>> sumFunction =
-				new SumAggregator<Tuple2<Integer, Integer>>(1, type1, config);
-		ReduceFunction<Integer> sumFunction0 = new SumAggregator<Integer>(0, type2, config);
-		ReduceFunction<Tuple2<Integer, Integer>> minFunction = new ComparableAggregator<Tuple2<Integer, Integer>>(
-				1, type1, AggregationType.MIN, config);
-		ReduceFunction<Integer> minFunction0 = new ComparableAggregator<Integer>(0, type2,
-				AggregationType.MIN, config);
-		ReduceFunction<Tuple2<Integer, Integer>> maxFunction = new ComparableAggregator<Tuple2<Integer, Integer>>(
-				1, type1, AggregationType.MAX, config);
-		ReduceFunction<Integer> maxFunction0 = new ComparableAggregator<Integer>(0, type2,
-				AggregationType.MAX, config);
-		List<Tuple2<Integer, Integer>> sumList = MockContext.createAndExecute(
-				new StreamReduce<Tuple2<Integer, Integer>>(sumFunction), getInputList());
-
-		List<Tuple2<Integer, Integer>> minList = MockContext.createAndExecute(
-				new StreamReduce<Tuple2<Integer, Integer>>(minFunction), getInputList());
-
-		List<Tuple2<Integer, Integer>> maxList = MockContext.createAndExecute(
-				new StreamReduce<Tuple2<Integer, Integer>>(maxFunction), getInputList());
-
-		TypeInformation<Tuple2<Integer, Integer>> typeInfo = TypeExtractor
-				.getForObject(new Tuple2<Integer, Integer>(1, 1));
-
 		KeySelector<Tuple2<Integer, Integer>, ?> keySelector = KeySelectorUtil.getSelectorForKeys(
-				new Keys.ExpressionKeys<Tuple2<Integer, Integer>>(new int[]{0}, typeInfo),
-				typeInfo, new ExecutionConfig());
+				new Keys.ExpressionKeys<>(new int[]{0}, typeInfo),
+				typeInfo, config);
+
+		// aggregations tested
+		ReduceFunction<Tuple2<Integer, Integer>> sumFunction =
+				new SumAggregator<>(1, typeInfo, config);
+		ReduceFunction<Tuple2<Integer, Integer>> minFunction = new ComparableAggregator<>(
+				1, typeInfo, AggregationType.MIN, config);
+		ReduceFunction<Tuple2<Integer, Integer>> maxFunction = new ComparableAggregator<>(
+				1, typeInfo, AggregationType.MAX, config);
 
 		List<Tuple2<Integer, Integer>> groupedSumList = MockContext.createAndExecute(
-				new StreamGroupedReduce<Tuple2<Integer, Integer>>(sumFunction, keySelector),
+				new StreamGroupedReduce<>(sumFunction, keySelector),
 				getInputList());
 
 		List<Tuple2<Integer, Integer>> groupedMinList = MockContext.createAndExecute(
-				new StreamGroupedReduce<Tuple2<Integer, Integer>>(minFunction, keySelector),
+				new StreamGroupedReduce<>(minFunction, keySelector),
 				getInputList());
 
 		List<Tuple2<Integer, Integer>> groupedMaxList = MockContext.createAndExecute(
-				new StreamGroupedReduce<Tuple2<Integer, Integer>>(maxFunction, keySelector),
+				new StreamGroupedReduce<>(maxFunction, keySelector),
 				getInputList());
 
-		assertEquals(expectedSumList, sumList);
-		assertEquals(expectedMinList, minList);
-		assertEquals(expectedMaxList, maxList);
 		assertEquals(expectedGroupSumList, groupedSumList);
 		assertEquals(expectedGroupMinList, groupedMinList);
 		assertEquals(expectedGroupMaxList, groupedMaxList);
-		assertEquals(expectedSumList0, MockContext.createAndExecute(
-				new StreamReduce<Integer>(sumFunction0), simpleInput));
-		assertEquals(expectedMinList0, MockContext.createAndExecute(
-				new StreamReduce<Integer>(minFunction0), simpleInput));
-		assertEquals(expectedMaxList0, MockContext.createAndExecute(
-				new StreamReduce<Integer>(maxFunction0), simpleInput));
-		
 	}
 
 	@Test
 	public void pojoGroupSumIntegerTest() {
-		List<MyPojo> expectedSumList = new ArrayList<MyPojo>();
-		List<MyPojo> expectedMinList = new ArrayList<MyPojo>();
-		List<MyPojo> expectedMaxList = new ArrayList<MyPojo>();
-		List<Integer> expectedSumList0 = new ArrayList<Integer>();
-		List<Integer> expectedMinList0 = new ArrayList<Integer>();
-		List<Integer> expectedMaxList0 = new ArrayList<Integer>();
-		List<MyPojo> expectedGroupSumList = new ArrayList<MyPojo>();
-		List<MyPojo> expectedGroupMinList = new ArrayList<MyPojo>();
-		List<MyPojo> expectedGroupMaxList = new ArrayList<MyPojo>();
-
-		List<Integer> simpleInput = new ArrayList<Integer>();
+
+		// preparing expected outputs
+		List<MyPojo> expectedGroupSumList = new ArrayList<>();
+		List<MyPojo> expectedGroupMinList = new ArrayList<>();
+		List<MyPojo> expectedGroupMaxList = new ArrayList<>();
 
 		int groupedSum0 = 0;
 		int groupedSum1 = 0;
 		int groupedSum2 = 0;
 
 		for (int i = 0; i < 9; i++) {
-			simpleInput.add(i);
-			expectedSumList.add(new MyPojo(i % 3, (i + 1) * i / 2));
-			expectedMinList.add(new MyPojo(i % 3, 0));
-			expectedMaxList.add(new MyPojo(i % 3, i));
-
-			expectedSumList0.add((i + 1) * i / 2);
-			expectedMaxList0.add(i);
-			expectedMinList0.add(0);
-
 			int groupedSum;
 			switch (i % 3) {
 				case 0:
@@ -194,222 +139,188 @@ public class AggregationFunctionTest {
 			expectedGroupMaxList.add(new MyPojo(i % 3, i));
 		}
 
-		TypeInformation<MyPojo> type1 = TypeExtractor.getForObject(new MyPojo(0, 0));
-		TypeInformation<Integer> type2 = TypeExtractor.getForObject(0);
-		ExecutionConfig config = new ExecutionConfig();
+		// some necessary boiler plate
+		TypeInformation<MyPojo> typeInfo = TypeExtractor.getForObject(new MyPojo(0, 0));
 
-		ReduceFunction<MyPojo> sumFunction = new SumAggregator<MyPojo>("f1", type1, config);
-		ReduceFunction<Integer> sumFunction0 = new SumAggregator<Integer>(0, type2, config);
-		ReduceFunction<MyPojo> minFunction = new ComparableAggregator<MyPojo>("f1", type1, AggregationType.MIN,
-				false, config);
-		ReduceFunction<Integer> minFunction0 = new ComparableAggregator<Integer>(0, type2, AggregationType.MIN,
-				config);
-		ReduceFunction<MyPojo> maxFunction = new ComparableAggregator<MyPojo>("f1", type1, AggregationType.MAX,
-				false, config);
-		ReduceFunction<Integer> maxFunction0 = new ComparableAggregator<Integer>(0, type2, AggregationType.MAX,
-				config);
-
-		List<MyPojo> sumList = MockContext.createAndExecute(
-				new StreamReduce<MyPojo>(sumFunction), getInputPojoList());
-		List<MyPojo> minList = MockContext.createAndExecute(
-				new StreamReduce<MyPojo>(minFunction), getInputPojoList());
-		List<MyPojo> maxList = MockContext.createAndExecute(
-				new StreamReduce<MyPojo>(maxFunction), getInputPojoList());
+		ExecutionConfig config = new ExecutionConfig();
 
-		TypeInformation<MyPojo> typeInfo = TypeExtractor.getForObject(new MyPojo(1, 1));
 		KeySelector<MyPojo, ?> keySelector = KeySelectorUtil.getSelectorForKeys(
-				new Keys.ExpressionKeys<MyPojo>(new String[]{"f0"}, typeInfo),
+				new Keys.ExpressionKeys<>(new String[]{"f0"}, typeInfo),
 				typeInfo, config);
 
+		// aggregations tested
+		ReduceFunction<MyPojo> sumFunction = new SumAggregator<>("f1", typeInfo, config);
+		ReduceFunction<MyPojo> minFunction = new ComparableAggregator<>("f1", typeInfo, AggregationType.MIN,
+				false, config);
+		ReduceFunction<MyPojo> maxFunction = new ComparableAggregator<>("f1", typeInfo, AggregationType.MAX,
+				false, config);
+
 		List<MyPojo> groupedSumList = MockContext.createAndExecute(
-				new StreamGroupedReduce<MyPojo>(sumFunction, keySelector),
+				new StreamGroupedReduce<>(sumFunction, keySelector),
 				getInputPojoList());
 		List<MyPojo> groupedMinList = MockContext.createAndExecute(
-				new StreamGroupedReduce<MyPojo>(minFunction, keySelector),
+				new StreamGroupedReduce<>(minFunction, keySelector),
 				getInputPojoList());
 		List<MyPojo> groupedMaxList = MockContext.createAndExecute(
-				new StreamGroupedReduce<MyPojo>(maxFunction, keySelector),
+				new StreamGroupedReduce<>(maxFunction, keySelector),
 				getInputPojoList());
 
-		assertEquals(expectedSumList, sumList);
-		assertEquals(expectedMinList, minList);
-		assertEquals(expectedMaxList, maxList);
 		assertEquals(expectedGroupSumList, groupedSumList);
 		assertEquals(expectedGroupMinList, groupedMinList);
 		assertEquals(expectedGroupMaxList, groupedMaxList);
-		assertEquals(expectedSumList0, MockContext.createAndExecute(
-				new StreamReduce<Integer>(sumFunction0), simpleInput));
-		assertEquals(expectedMinList0, MockContext.createAndExecute(
-				new StreamReduce<Integer>(minFunction0), simpleInput));
-		assertEquals(expectedMaxList0, MockContext.createAndExecute(
-				new StreamReduce<Integer>(maxFunction0), simpleInput));
 	}
-
+	
 	@Test
 	public void minMaxByTest() {
-		TypeInformation<Tuple2<Integer, Integer>> type1 = TypeExtractor
-				.getForObject(new Tuple2<Integer, Integer>(0, 0));
+		// Tuples are grouped on field 0, aggregated on field 1
+		
+		// preparing expected outputs
+		List<Tuple3<Integer, Integer, Integer>> maxByFirstExpected = ImmutableList.of(
+				Tuple3.of(0,0,0), Tuple3.of(0,1,1), Tuple3.of(0,2,2),
+				Tuple3.of(0,2,2), Tuple3.of(0,2,2), Tuple3.of(0,2,2),
+				Tuple3.of(0,2,2), Tuple3.of(0,2,2), Tuple3.of(0,2,2));
+
+		List<Tuple3<Integer, Integer, Integer>> maxByLastExpected = ImmutableList.of(
+				Tuple3.of(0, 0, 0), Tuple3.of(0, 1, 1), Tuple3.of(0, 2, 2),
+				Tuple3.of(0, 2, 2), Tuple3.of(0, 2, 2), Tuple3.of(0, 2, 5),
+				Tuple3.of(0, 2, 5), Tuple3.of(0, 2, 5), Tuple3.of(0, 2, 8));
+
+		List<Tuple3<Integer, Integer, Integer>> minByFirstExpected = ImmutableList.of(
+				Tuple3.of(0,0,0), Tuple3.of(0,0,0), Tuple3.of(0,0,0),
+				Tuple3.of(0,0,0), Tuple3.of(0,0,0), Tuple3.of(0,0,0),
+				Tuple3.of(0,0,0), Tuple3.of(0,0,0), Tuple3.of(0,0,0));
+
+		List<Tuple3<Integer, Integer, Integer>> minByLastExpected = ImmutableList.of(
+				Tuple3.of(0, 0, 0), Tuple3.of(0, 0, 0), Tuple3.of(0, 0, 0),
+				Tuple3.of(0, 0, 3), Tuple3.of(0, 0, 3), Tuple3.of(0, 0, 3),
+				Tuple3.of(0, 0, 6), Tuple3.of(0, 0, 6), Tuple3.of(0, 0, 6));
+
+		// some necessary boiler plate
+		TypeInformation<Tuple3<Integer, Integer, Integer>> typeInfo = TypeExtractor
+				.getForObject(Tuple3.of(0,0,0));
 
 		ExecutionConfig config = new ExecutionConfig();
 
-		ReduceFunction<Tuple2<Integer, Integer>> maxByFunctionFirst =
-				new ComparableAggregator<Tuple2<Integer, Integer>>(0, type1, AggregationType.MAXBY, true, config);
-		ReduceFunction<Tuple2<Integer, Integer>> maxByFunctionLast =
-				new ComparableAggregator<Tuple2<Integer, Integer>>(0, type1, AggregationType.MAXBY, false, config);
-
-		ReduceFunction<Tuple2<Integer, Integer>> minByFunctionFirst =
-				new ComparableAggregator<Tuple2<Integer, Integer>>(0, type1, AggregationType.MINBY, true, config);
-		ReduceFunction<Tuple2<Integer, Integer>> minByFunctionLast =
-				new ComparableAggregator<Tuple2<Integer, Integer>>(0, type1, AggregationType.MINBY, false, config);
-
-		List<Tuple2<Integer, Integer>> maxByFirstExpected = new ArrayList<Tuple2<Integer, Integer>>();
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(1, 1));
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
-
-		List<Tuple2<Integer, Integer>> maxByLastExpected = new ArrayList<Tuple2<Integer, Integer>>();
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(1, 1));
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 8));
-
-		List<Tuple2<Integer, Integer>> minByFirstExpected = new ArrayList<Tuple2<Integer, Integer>>();
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-
-		List<Tuple2<Integer, Integer>> minByLastExpected = new ArrayList<Tuple2<Integer, Integer>>();
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
+		KeySelector<Tuple3<Integer, Integer, Integer>, ?> keySelector = KeySelectorUtil.getSelectorForKeys(
+				new Keys.ExpressionKeys<>(new int[]{0}, typeInfo),
+				typeInfo, config);
+		
+		// aggregations tested
+		ReduceFunction<Tuple3<Integer, Integer, Integer>> maxByFunctionFirst =
+				new ComparableAggregator<>(1, typeInfo, AggregationType.MAXBY, true, config);
+		ReduceFunction<Tuple3<Integer, Integer, Integer>> maxByFunctionLast =
+				new ComparableAggregator<>(1, typeInfo, AggregationType.MAXBY, false, config);
+		ReduceFunction<Tuple3<Integer, Integer, Integer>> minByFunctionFirst =
+				new ComparableAggregator<>(1, typeInfo, AggregationType.MINBY, true, config);
+		ReduceFunction<Tuple3<Integer, Integer, Integer>> minByFunctionLast =
+				new ComparableAggregator<>(1, typeInfo, AggregationType.MINBY, false, config);
 
 		assertEquals(maxByFirstExpected, MockContext.createAndExecute(
-				new StreamReduce<Tuple2<Integer, Integer>>(maxByFunctionFirst),
-				getInputList()));
+				new StreamGroupedReduce<>(maxByFunctionFirst, keySelector),
+				getInputByList()));
 		assertEquals(maxByLastExpected, MockContext.createAndExecute(
-				new StreamReduce<Tuple2<Integer, Integer>>(maxByFunctionLast),
-				getInputList()));
+				new StreamGroupedReduce<>(maxByFunctionLast, keySelector),
+				getInputByList()));
 		assertEquals(minByLastExpected, MockContext.createAndExecute(
-				new StreamReduce<Tuple2<Integer, Integer>>(minByFunctionLast),
-				getInputList()));
+				new StreamGroupedReduce<>(minByFunctionLast, keySelector),
+				getInputByList()));
 		assertEquals(minByFirstExpected, MockContext.createAndExecute(
-				new StreamReduce<Tuple2<Integer, Integer>>(minByFunctionFirst),
-				getInputList()));
+				new StreamGroupedReduce<>(minByFunctionFirst, keySelector),
+				getInputByList()));
 	}
 
 	@Test
 	public void pojoMinMaxByTest() {
+		// Pojos are grouped on field 0, aggregated on field 1
+
+		// preparing expected outputs
+		List<MyPojo3> maxByFirstExpected = ImmutableList.of(
+				new MyPojo3(0, 0), new MyPojo3(1, 1), new MyPojo3(2, 2),
+				new MyPojo3(2, 2), new MyPojo3(2, 2), new MyPojo3(2, 2),
+				new MyPojo3(2, 2), new MyPojo3(2, 2), new MyPojo3(2, 2));
+
+		List<MyPojo3> maxByLastExpected = ImmutableList.of(
+				new MyPojo3(0, 0), new MyPojo3(1, 1), new MyPojo3(2, 2),
+				new MyPojo3(2, 2), new MyPojo3(2, 2), new MyPojo3(2, 5),
+				new MyPojo3(2, 5), new MyPojo3(2, 5), new MyPojo3(2, 8));
+
+		List<MyPojo3> minByFirstExpected = ImmutableList.of(
+				new MyPojo3(0, 0), new MyPojo3(0, 0), new MyPojo3(0, 0),
+				new MyPojo3(0, 0), new MyPojo3(0, 0), new MyPojo3(0, 0),
+				new MyPojo3(0, 0), new MyPojo3(0, 0), new MyPojo3(0, 0));
+
+		List<MyPojo3> minByLastExpected = ImmutableList.of(
+				new MyPojo3(0, 0), new MyPojo3(0, 0), new MyPojo3(0, 0),
+				new MyPojo3(0, 3), new MyPojo3(0, 3), new MyPojo3(0, 3),
+				new MyPojo3(0, 6), new MyPojo3(0, 6), new MyPojo3(0, 6));
+
+		// some necessary boiler plate
+		TypeInformation<MyPojo3> typeInfo = TypeExtractor.getForObject(new MyPojo3(0, 0));
+
 		ExecutionConfig config = new ExecutionConfig();
-		TypeInformation<MyPojo> type1 = TypeExtractor
-				.getForObject(new MyPojo(0, 0));
-
-		ReduceFunction<MyPojo> maxByFunctionFirst =
-				new ComparableAggregator<MyPojo>("f0", type1, AggregationType.MAXBY, true, config);
-		ReduceFunction<MyPojo> maxByFunctionLast =
-				new ComparableAggregator<MyPojo>("f0", type1, AggregationType.MAXBY, false, config);
-
-		ReduceFunction<MyPojo> minByFunctionFirst =
-				new ComparableAggregator<MyPojo>("f0", type1, AggregationType.MINBY, true, config);
-		ReduceFunction<MyPojo> minByFunctionLast =
-				new ComparableAggregator<MyPojo>("f0", type1, AggregationType.MINBY, false, config);
-
-		List<MyPojo> maxByFirstExpected = new ArrayList<MyPojo>();
-		maxByFirstExpected.add(new MyPojo(0, 0));
-		maxByFirstExpected.add(new MyPojo(1, 1));
-		maxByFirstExpected.add(new MyPojo(2, 2));
-		maxByFirstExpected.add(new MyPojo(2, 2));
-		maxByFirstExpected.add(new MyPojo(2, 2));
-		maxByFirstExpected.add(new MyPojo(2, 2));
-		maxByFirstExpected.add(new MyPojo(2, 2));
-		maxByFirstExpected.add(new MyPojo(2, 2));
-		maxByFirstExpected.add(new MyPojo(2, 2));
-
-		List<MyPojo> maxByLastExpected = new ArrayList<MyPojo>();
-		maxByLastExpected.add(new MyPojo(0, 0));
-		maxByLastExpected.add(new MyPojo(1, 1));
-		maxByLastExpected.add(new MyPojo(2, 2));
-		maxByLastExpected.add(new MyPojo(2, 2));
-		maxByLastExpected.add(new MyPojo(2, 2));
-		maxByLastExpected.add(new MyPojo(2, 5));
-		maxByLastExpected.add(new MyPojo(2, 5));
-		maxByLastExpected.add(new MyPojo(2, 5));
-		maxByLastExpected.add(new MyPojo(2, 8));
-
-		List<MyPojo> minByFirstExpected = new ArrayList<MyPojo>();
-		minByFirstExpected.add(new MyPojo(0, 0));
-		minByFirstExpected.add(new MyPojo(0, 0));
-		minByFirstExpected.add(new MyPojo(0, 0));
-		minByFirstExpected.add(new MyPojo(0, 0));
-		minByFirstExpected.add(new MyPojo(0, 0));
-		minByFirstExpected.add(new MyPojo(0, 0));
-		minByFirstExpected.add(new MyPojo(0, 0));
-		minByFirstExpected.add(new MyPojo(0, 0));
-		minByFirstExpected.add(new MyPojo(0, 0));
-
-		List<MyPojo> minByLastExpected = new ArrayList<MyPojo>();
-		minByLastExpected.add(new MyPojo(0, 0));
-		minByLastExpected.add(new MyPojo(0, 0));
-		minByLastExpected.add(new MyPojo(0, 0));
-		minByLastExpected.add(new MyPojo(0, 3));
-		minByLastExpected.add(new MyPojo(0, 3));
-		minByLastExpected.add(new MyPojo(0, 3));
-		minByLastExpected.add(new MyPojo(0, 6));
-		minByLastExpected.add(new MyPojo(0, 6));
-		minByLastExpected.add(new MyPojo(0, 6));
+
+		KeySelector<MyPojo3, ?> keySelector = KeySelectorUtil.getSelectorForKeys(
+				new Keys.ExpressionKeys<>(new String[]{"f0"}, typeInfo),
+				typeInfo, config);
+
+		// aggregations tested
+		ReduceFunction<MyPojo3> maxByFunctionFirst =
+				new ComparableAggregator<>("f1", typeInfo, AggregationType.MAXBY, true, config);
+		ReduceFunction<MyPojo3> maxByFunctionLast =
+				new ComparableAggregator<>("f1", typeInfo, AggregationType.MAXBY, false, config);
+		ReduceFunction<MyPojo3> minByFunctionFirst =
+				new ComparableAggregator<>("f1", typeInfo, AggregationType.MINBY, true, config);
+		ReduceFunction<MyPojo3> minByFunctionLast =
+				new ComparableAggregator<>("f1", typeInfo, AggregationType.MINBY, false, config);
 
 		assertEquals(maxByFirstExpected, MockContext.createAndExecute(
-				new StreamReduce<MyPojo>(maxByFunctionFirst),
-				getInputPojoList()));
+				new StreamGroupedReduce<>(maxByFunctionFirst, keySelector),
+				getInputByPojoList()));
 		assertEquals(maxByLastExpected, MockContext.createAndExecute(
-				new StreamReduce<MyPojo>(maxByFunctionLast),
-				getInputPojoList()));
+				new StreamGroupedReduce<>(maxByFunctionLast, keySelector),
+				getInputByPojoList()));
 		assertEquals(minByLastExpected, MockContext.createAndExecute(
-				new StreamReduce<MyPojo>(minByFunctionLast),
-				getInputPojoList()));
+				new StreamGroupedReduce<>(minByFunctionLast, keySelector),
+				getInputByPojoList()));
 		assertEquals(minByFirstExpected, MockContext.createAndExecute(
-				new StreamReduce<MyPojo>(minByFunctionFirst),
-				getInputPojoList()));
+				new StreamGroupedReduce<>(minByFunctionFirst, keySelector),
+				getInputByPojoList()));
 	}
 
+	// *************************************************************************
+	//     UTILS
+	// *************************************************************************
+
 	private List<Tuple2<Integer, Integer>> getInputList() {
-		ArrayList<Tuple2<Integer, Integer>> inputList = new ArrayList<Tuple2<Integer, Integer>>();
+		ArrayList<Tuple2<Integer, Integer>> inputList = new ArrayList<>();
 		for (int i = 0; i < 9; i++) {
-			inputList.add(new Tuple2<Integer, Integer>(i % 3, i));
+			inputList.add(Tuple2.of(i % 3, i));
 		}
 		return inputList;
-
 	}
 
 	private List<MyPojo> getInputPojoList() {
-		ArrayList<MyPojo> inputList = new ArrayList<MyPojo>();
+		ArrayList<MyPojo> inputList = new ArrayList<>();
 		for (int i = 0; i < 9; i++) {
 			inputList.add(new MyPojo(i % 3, i));
 		}
 		return inputList;
+	}
 
+	private List<Tuple3<Integer, Integer, Integer>> getInputByList() {
+		ArrayList<Tuple3<Integer, Integer, Integer>> inputList = new ArrayList<>();
+		for (int i = 0; i < 9; i++) {
+			inputList.add(Tuple3.of(0, i % 3, i));
+		}
+		return inputList;
+	}
+
+	private List<MyPojo3> getInputByPojoList() {
+		ArrayList<MyPojo3> inputList = new ArrayList<>();
+		for (int i = 0; i < 9; i++) {
+			inputList.add(new MyPojo3(i % 3, i));
+		}
+		return inputList;
 	}
 
 	public static class MyPojo implements Serializable {
@@ -439,6 +350,39 @@ public class AggregationFunctionTest {
 				return false;
 			}
 		}
+	}
+
+	public static class MyPojo3 implements Serializable {
+
+		private static final long serialVersionUID = 1L;
+		public int f0;
+		public int f1;
+		public int f2;
+
+		// Field 0 is always initialized to 0
+		public MyPojo3(int f1, int f2) {
+			this.f1 = f1;
+			this.f2 = f2;
+		}
+
+		public MyPojo3() {
+		}
+
+		@Override
+		public String toString() {
+			return "POJO3(" + f0 + "," + f1 + "," + f2 + ")";
+		}
+
+		@Override
+		public boolean equals(Object other) {
+			if (other instanceof MyPojo3) {
+				return this.f0 == ((MyPojo3) other).f0
+						&& this.f1 == ((MyPojo3) other).f1
+						&& this.f2 == ((MyPojo3) other).f2;
+			} else {
+				return false;
+			}
+		}
 
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/906bd6dc/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 0cf1df8..19bcb73 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -18,32 +18,28 @@
 
 package org.apache.flink.streaming.api.scala
 
-import org.apache.flink.streaming.api.functions.{AscendingTimestampExtractor, TimestampExtractor}
-import org.apache.flink.streaming.api.windowing.assigners._
-import org.apache.flink.streaming.api.windowing.time.{ProcessingTime, EventTime, AbstractTime}
-import org.apache.flink.streaming.api.windowing.windows.{Window, TimeWindow}
-import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWindowedStream}
-
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-
-import org.apache.flink.api.java.tuple.{Tuple => JavaTuple}
-import org.apache.flink.api.common.functions.{FlatMapFunction, MapFunction, Partitioner, FilterFunction}
+import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction, MapFunction, Partitioner, RichFilterFunction, RichFlatMapFunction, RichMapFunction}
 import org.apache.flink.api.common.io.OutputFormat
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple => JavaTuple}
 import org.apache.flink.api.scala.operators.ScalaCsvOutputFormat
 import org.apache.flink.core.fs.{FileSystem, Path}
 import org.apache.flink.streaming.api.collector.selector.OutputSelector
-import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, DataStreamSink, SingleOutputStreamOperator}
+import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWindowedStream, DataStream => JavaStream, KeyedStream => JavaKeyedStream, _}
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
+import org.apache.flink.streaming.api.functions.{AscendingTimestampExtractor, TimestampExtractor}
+import org.apache.flink.streaming.api.scala.function.StatefulFunction
+import org.apache.flink.streaming.api.windowing.assigners._
 import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
 import org.apache.flink.streaming.api.windowing.policy.{EvictionPolicy, TriggerPolicy}
+import org.apache.flink.streaming.api.windowing.time.{AbstractTime, EventTime, ProcessingTime}
+import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window}
 import org.apache.flink.streaming.util.serialization.SerializationSchema
 import org.apache.flink.util.Collector
-import org.apache.flink.api.common.functions.{RichMapFunction, RichFlatMapFunction, RichFilterFunction}
-import org.apache.flink.streaming.api.scala.function.StatefulFunction
-import org.apache.flink.streaming.api.datastream.{KeyedStream => JavaKeyedStream}
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
 
 class DataStream[T](javaStream: JavaStream[T]) {
 


[5/5] flink git commit: [FLINK-2812] [streaming] KeySelectorUtil interacts well with type extraction

Posted by mb...@apache.org.
[FLINK-2812] [streaming] KeySelectorUtil interacts well with type extraction

The interaction is tested in AggregationFunctionTest and the Scala DataStreamTest, among others.

Closes #1155


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e494c279
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e494c279
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e494c279

Branch: refs/heads/master
Commit: e494c2795252a7f3db3659b1919cc7c75fc3dbb9
Parents: c414ea9
Author: mbalassi <mb...@apache.org>
Authored: Mon Oct 5 22:19:53 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Tue Oct 6 17:10:27 2015 +0200

----------------------------------------------------------------------
 .../streaming/util/keys/KeySelectorUtil.java    | 20 +++++++++++++++-----
 .../flink/streaming/api/scala/DataStream.scala  | 10 +++++++---
 2 files changed, 22 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e494c279/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
index d8839a0..9c76d95 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
@@ -28,6 +28,8 @@ import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 
 /**
  * Utility class that contains helper methods to manipulating {@link KeySelector} for streaming.
@@ -47,12 +49,14 @@ public final class KeySelectorUtil {
 		
 		// use ascending order here, the code paths for that are usually a slight bit faster
 		boolean[] orders = new boolean[numKeyFields];
+		TypeInformation[] typeInfos = new TypeInformation[numKeyFields];
 		for (int i = 0; i < numKeyFields; i++) {
 			orders[i] = true;
+			typeInfos[i] = compositeType.getTypeAt(logicalKeyPositions[i]);
 		}
-		
+
 		TypeComparator<X> comparator = compositeType.createComparator(logicalKeyPositions, orders, 0, executionConfig);
-		return new ComparableKeySelector<X>(comparator, numKeyFields);
+		return new ComparableKeySelector<>(comparator, numKeyFields, new TupleTypeInfo<>(typeInfos));
 	}
 
 	
@@ -70,7 +74,7 @@ public final class KeySelectorUtil {
 
 		TypeComparator<X> comparator = ((CompositeType<X>) typeInfo).createComparator(
 				logicalKeyPositions, new boolean[1], 0, executionConfig);
-		return new OneKeySelector<X, K>(comparator);
+		return new OneKeySelector<>(comparator);
 	}
 
 	/**
@@ -111,21 +115,23 @@ public final class KeySelectorUtil {
 	 *
 	 * @param <IN> The type from which the key is extracted.
 	 */
-	public static final class ComparableKeySelector<IN> implements KeySelector<IN, Tuple> {
+	public static final class ComparableKeySelector<IN> implements KeySelector<IN, Tuple>, ResultTypeQueryable<Tuple> {
 
 		private static final long serialVersionUID = 1L;
 
 		private final TypeComparator<IN> comparator;
 		private final int keyLength;
+		private final TupleTypeInfo tupleTypeInfo;
 
 		/** Reusable array to hold the key objects. Since this is initially empty (all positions
 		 * are null), it does not have any serialization problems */
 		@SuppressWarnings("NonSerializableFieldInSerializableClass")
 		private final Object[] keyArray;
 
-		public ComparableKeySelector(TypeComparator<IN> comparator, int keyLength) {
+		public ComparableKeySelector(TypeComparator<IN> comparator, int keyLength, TupleTypeInfo tupleTypeInfo) {
 			this.comparator = comparator;
 			this.keyLength = keyLength;
+			this.tupleTypeInfo = tupleTypeInfo;
 			keyArray = new Object[keyLength];
 		}
 
@@ -139,6 +145,10 @@ public final class KeySelectorUtil {
 			return key;
 		}
 
+		@Override
+		public TypeInformation<Tuple> getProducedType() {
+			return tupleTypeInfo;
+		}
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/e494c279/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 19bcb73..8aeacb4 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.io.OutputFormat
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.api.java.tuple.{Tuple => JavaTuple}
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.api.scala.operators.ScalaCsvOutputFormat
 import org.apache.flink.core.fs.{FileSystem, Path}
 import org.apache.flink.streaming.api.collector.selector.OutputSelector
@@ -230,8 +231,9 @@ class DataStream[T](javaStream: JavaStream[T]) {
   def keyBy[K: TypeInformation](fun: T => K): KeyedStream[T, K] = {
 
     val cleanFun = clean(fun)
-    val keyExtractor = new KeySelector[T, K] {
+    val keyExtractor = new KeySelector[T, K] with ResultTypeQueryable[K] {
       def getKey(in: T) = cleanFun(in)
+      override def getProducedType: TypeInformation[K] = implicitly[TypeInformation[K]]
     }
     javaStream.keyBy(keyExtractor)
   }
@@ -256,8 +258,9 @@ class DataStream[T](javaStream: JavaStream[T]) {
   def partitionByHash[K: TypeInformation](fun: T => K): DataStream[T] = {
 
     val cleanFun = clean(fun)
-    val keyExtractor = new KeySelector[T, K] {
+    val keyExtractor = new KeySelector[T, K] with ResultTypeQueryable[K] {
       def getKey(in: T) = cleanFun(in)
+      override def getProducedType: TypeInformation[K] = implicitly[TypeInformation[K]]
     }
     javaStream.partitionByHash(keyExtractor)
   }
@@ -293,8 +296,9 @@ class DataStream[T](javaStream: JavaStream[T]) {
   def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], fun: T => K)
   : DataStream[T] = {
     val cleanFun = clean(fun)
-    val keyExtractor = new KeySelector[T, K] {
+    val keyExtractor = new KeySelector[T, K] with ResultTypeQueryable[K] {
       def getKey(in: T) = cleanFun(in)
+      override def getProducedType: TypeInformation[K] = implicitly[TypeInformation[K]]
     }
     javaStream.partitionCustom(partitioner, keyExtractor)
   }


[4/5] flink git commit: [FLINK-2283] [streaming] grouped reduce and fold operators checkpoint state

Posted by mb...@apache.org.
[FLINK-2283] [streaming] grouped reduce and fold operators checkpoint state


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/225704bc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/225704bc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/225704bc

Branch: refs/heads/master
Commit: 225704bc9912536042027fca1a9880beba3bc2bb
Parents: 906bd6d
Author: mbalassi <mb...@apache.org>
Authored: Sun Sep 13 08:19:07 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Tue Oct 6 14:46:06 2015 +0200

----------------------------------------------------------------------
 .../streaming/api/operators/StreamFold.java     | 99 --------------------
 .../api/operators/StreamGroupedFold.java        | 72 ++++++++++++--
 .../api/operators/StreamGroupedReduce.java      | 21 +++--
 3 files changed, 77 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/225704bc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
deleted file mode 100644
index 81115f0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
-import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-public class StreamFold<IN, OUT>
-		extends AbstractUdfStreamOperator<OUT, FoldFunction<IN, OUT>>
-		implements OneInputStreamOperator<IN, OUT>, OutputTypeConfigurable<OUT> {
-
-	private static final long serialVersionUID = 1L;
-
-	protected transient OUT accumulator;
-	private byte[] serializedInitialValue;
-
-	protected TypeSerializer<OUT> outTypeSerializer;
-
-	public StreamFold(FoldFunction<IN, OUT> folder, OUT initialValue) {
-		super(folder);
-		this.accumulator = initialValue;
-		this.chainingStrategy = ChainingStrategy.FORCE_ALWAYS;
-	}
-
-	@Override
-	public void processElement(StreamRecord<IN> element) throws Exception {
-		accumulator = userFunction.fold(outTypeSerializer.copy(accumulator), element.getValue());
-		output.collect(element.replace(accumulator));
-	}
-
-	@Override
-	public void open(Configuration config) throws Exception {
-		super.open(config);
-
-		if (serializedInitialValue == null) {
-			throw new RuntimeException("No initial value was serialized for the fold " +
-					"operator. Probably the setOutputType method was not called.");
-		}
-
-		ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue);
-		InputViewDataInputStreamWrapper in = new InputViewDataInputStreamWrapper(
-			new DataInputStream(bais)
-		);
-
-		accumulator = outTypeSerializer.deserialize(in);
-	}
-
-	@Override
-	public void processWatermark(Watermark mark) throws Exception {
-		output.emitWatermark(mark);
-	}
-
-	@Override
-	public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
-		outTypeSerializer = outTypeInfo.createSerializer(executionConfig);
-
-		ByteArrayOutputStream baos = new ByteArrayOutputStream();
-		OutputViewDataOutputStreamWrapper out = new OutputViewDataOutputStreamWrapper(
-			new DataOutputStream(baos)
-		);
-
-		try {
-			outTypeSerializer.serialize(accumulator, out);
-		} catch (IOException ioe) {
-			throw new RuntimeException("Unable to serialize initial value of type " +
-					accumulator.getClass().getSimpleName() + " of fold operator.", ioe);
-		}
-
-		serializedInitialValue = baos.toByteArray();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/225704bc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
index f4e44c6..f8f167a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
@@ -17,50 +17,106 @@
 
 package org.apache.flink.streaming.api.operators;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
+import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
-public class StreamGroupedFold<IN, OUT> extends StreamFold<IN, OUT> {
+public class StreamGroupedFold<IN, OUT> extends AbstractUdfStreamOperator<OUT, FoldFunction<IN, OUT>>
+		implements OneInputStreamOperator<IN, OUT>, OutputTypeConfigurable<OUT> {
 
 	private static final long serialVersionUID = 1L;
 
+	// Grouped values
 	private KeySelector<IN, ?> keySelector;
-	private transient Map<Object, OUT> values;
+	private transient OperatorState<HashMap<Object, OUT>> values;
+
+	// Initial value serialization
+	private byte[] serializedInitialValue;
+	private TypeSerializer<OUT> outTypeSerializer;
+	private transient OUT initialValue;
 
 	public StreamGroupedFold(
 			FoldFunction<IN, OUT> folder,
 			KeySelector<IN, ?> keySelector,
 			OUT initialValue) {
-		super(folder, initialValue);
+		super(folder);
 		this.keySelector = keySelector;
+		this.initialValue = initialValue;
 	}
 
 	@Override
 	public void open(Configuration configuration) throws Exception {
 		super.open(configuration);
 
-		values = new HashMap<Object, OUT>();
+		if (serializedInitialValue == null) {
+			throw new RuntimeException("No initial value was serialized for the fold " +
+					"operator. Probably the setOutputType method was not called.");
+		}
+
+		ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue);
+		InputViewDataInputStreamWrapper in = new InputViewDataInputStreamWrapper(
+				new DataInputStream(bais)
+		);
+		initialValue = outTypeSerializer.deserialize(in);
+
+		values = runtimeContext.getOperatorState("flink_internal_fold_values",
+				new HashMap<Object, OUT>(), false);
 	}
 
 	@Override
 	public void processElement(StreamRecord<IN> element) throws Exception {
 		Object key = keySelector.getKey(element.getValue());
-		OUT value = values.get(key);
+		OUT value = values.value().get(key);
 
 		if (value != null) {
 			OUT folded = userFunction.fold(outTypeSerializer.copy(value), element.getValue());
-			values.put(key, folded);
+			values.value().put(key, folded);
 			output.collect(element.replace(folded));
 		} else {
-			OUT first = userFunction.fold(outTypeSerializer.copy(accumulator), element.getValue());
-			values.put(key, first);
+			OUT first = userFunction.fold(outTypeSerializer.copy(initialValue), element.getValue());
+			values.value().put(key, first);
 			output.collect(element.replace(first));
 		}
 	}
 
+	@Override
+	public void processWatermark(Watermark mark) throws Exception {
+		output.emitWatermark(mark);
+	}
+
+	@Override
+	public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
+		outTypeSerializer = outTypeInfo.createSerializer(executionConfig);
+
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		OutputViewDataOutputStreamWrapper out = new OutputViewDataOutputStreamWrapper(
+				new DataOutputStream(baos)
+		);
+
+		try {
+			outTypeSerializer.serialize(initialValue, out);
+		} catch (IOException ioe) {
+			throw new RuntimeException("Unable to serialize initial value of type " +
+					initialValue.getClass().getSimpleName() + " of fold operator.", ioe);
+		}
+
+		serializedInitialValue = baos.toByteArray();
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/225704bc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
index 8805138..e1f9f06 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
@@ -21,7 +21,9 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
@@ -31,7 +33,7 @@ public class StreamGroupedReduce<IN> extends AbstractUdfStreamOperator<IN, Reduc
 	private static final long serialVersionUID = 1L;
 
 	private KeySelector<IN, ?> keySelector;
-	private transient Map<Object, IN> values;
+	private transient OperatorState<HashMap<Object, IN>> values;
 
 	public StreamGroupedReduce(ReduceFunction<IN> reducer, KeySelector<IN, ?> keySelector) {
 		super(reducer);
@@ -39,21 +41,24 @@ public class StreamGroupedReduce<IN> extends AbstractUdfStreamOperator<IN, Reduc
 	}
 
 	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		values = runtimeContext.getOperatorState("flink_internal_reduce_values",
+				new HashMap<Object, IN>(), false);
+	}
+
+	@Override
 	public void processElement(StreamRecord<IN> element) throws Exception {
 		Object key = keySelector.getKey(element.getValue());
 
-		if (values == null) {
-			values = new HashMap<>();
-		}
-
-		IN currentValue = values.get(key);
+		IN currentValue = values.value().get(key);
 		if (currentValue != null) {
 			// TODO: find a way to let operators copy elements (maybe)
 			IN reduced = userFunction.reduce(currentValue, element.getValue());
-			values.put(key, reduced);
+			values.value().put(key, reduced);
 			output.collect(element.replace(reduced));
 		} else {
-			values.put(key, element.getValue());
+			values.value().put(key, element.getValue());
 			output.collect(element.replace(element.getValue()));
 		}
 	}