You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/10/06 17:12:25 UTC

[3/5] flink git commit: [streaming] Removed unused StreamReduce

[streaming] Removed unused StreamReduce

Refactored corresponding tests, some minor cleanups.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/906bd6dc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/906bd6dc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/906bd6dc

Branch: refs/heads/master
Commit: 906bd6dcb360665af0331faddb34e7260bfe7f1a
Parents: 4938ff0
Author: mbalassi <mb...@apache.org>
Authored: Fri Sep 11 16:32:09 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Tue Oct 6 14:46:06 2015 +0200

----------------------------------------------------------------------
 .../flink/api/common/state/OperatorState.java   |  10 +-
 .../api/operators/StreamGroupedReduce.java      |  11 +-
 .../streaming/api/operators/StreamReduce.java   |  53 ---
 .../streaming/api/AggregationFunctionTest.java  | 446 ++++++++-----------
 .../flink/streaming/api/scala/DataStream.scala  |  26 +-
 5 files changed, 221 insertions(+), 325 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/906bd6dc/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
index 3f5e977..3036023 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
@@ -21,6 +21,8 @@ package org.apache.flink.api.common.state;
 import java.io.IOException;
 
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.configuration.Configuration;
 
 /**
  * Base interface for all streaming operator states. It can represent both
@@ -30,9 +32,9 @@ import org.apache.flink.api.common.functions.MapFunction;
  * State can be accessed and manipulated using the {@link #value()} and
  * {@link #update(T)} methods. These calls are only safe in the
  * transformation call the operator represents, for instance inside
- * {@link MapFunction#map()} and can lead tp unexpected behavior in the
- * {@link #open(org.apache.flink.configuration.Configuration)} or
- * {@link #close()} methods.
+ * {@link MapFunction#map(Object)} and can lead to unexpected behavior in the
+ * {@link AbstractRichFunction#open(Configuration)} or
+ * {@link AbstractRichFunction#close()} methods.
  * 
  * @param <T>
  *            Type of the operator state
@@ -59,7 +61,7 @@ public interface OperatorState<T> {
 	 * partitioned state is updated with null, the state for the current key 
 	 * will be removed and the default value is returned on the next access.
 	 * 
-	 * @param state
+	 * @param value
 	 *            The new value for the state.
 	 *            
 	 * @throws IOException Thrown if the system cannot access the state.

http://git-wip-us.apache.org/repos/asf/flink/blob/906bd6dc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
index 7533c33..8805138 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
@@ -22,9 +22,11 @@ import java.util.Map;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
-public class StreamGroupedReduce<IN> extends StreamReduce<IN> {
+public class StreamGroupedReduce<IN> extends AbstractUdfStreamOperator<IN, ReduceFunction<IN>>
+		implements OneInputStreamOperator<IN, IN>{
 
 	private static final long serialVersionUID = 1L;
 
@@ -41,7 +43,7 @@ public class StreamGroupedReduce<IN> extends StreamReduce<IN> {
 		Object key = keySelector.getKey(element.getValue());
 
 		if (values == null) {
-			values = new HashMap<Object, IN>();
+			values = new HashMap<>();
 		}
 
 		IN currentValue = values.get(key);
@@ -56,4 +58,9 @@ public class StreamGroupedReduce<IN> extends StreamReduce<IN> {
 		}
 	}
 
+	@Override
+	public void processWatermark(Watermark mark) throws Exception {
+		output.emitWatermark(mark);
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/906bd6dc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
deleted file mode 100644
index af562fe..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-public class StreamReduce<IN> extends AbstractUdfStreamOperator<IN, ReduceFunction<IN>>
-		implements OneInputStreamOperator<IN, IN> {
-
-	private static final long serialVersionUID = 1L;
-
-	private transient IN currentValue;
-
-	public StreamReduce(ReduceFunction<IN> reducer) {
-		super(reducer);
-		currentValue = null;
-
-		chainingStrategy = ChainingStrategy.ALWAYS;
-	}
-
-	@Override
-	public void processElement(StreamRecord<IN> element) throws Exception {
-
-		if (currentValue != null) {
-			currentValue = userFunction.reduce(currentValue, element.getValue());
-		} else {
-			currentValue = element.getValue();
-		}
-		output.collect(element.replace(currentValue));
-	}
-
-	@Override
-	public void processWatermark(Watermark mark) throws Exception {
-		output.emitWatermark(mark);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/906bd6dc/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
index ff04609..cdf7aae 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
@@ -23,18 +23,19 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType;
 import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
 import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
 import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
-import org.apache.flink.streaming.api.operators.StreamReduce;
 import org.apache.flink.streaming.util.MockContext;
 import org.apache.flink.streaming.util.keys.KeySelectorUtil;
 import org.junit.Test;
@@ -44,32 +45,16 @@ public class AggregationFunctionTest {
 	@Test
 	public void groupSumIntegerTest() {
 
-		List<Tuple2<Integer, Integer>> expectedSumList = new ArrayList<Tuple2<Integer, Integer>>();
-		List<Tuple2<Integer, Integer>> expectedMinList = new ArrayList<Tuple2<Integer, Integer>>();
-		List<Tuple2<Integer, Integer>> expectedMaxList = new ArrayList<Tuple2<Integer, Integer>>();
-		List<Integer> expectedSumList0 = new ArrayList<Integer>();
-		List<Integer> expectedMinList0 = new ArrayList<Integer>();
-		List<Integer> expectedMaxList0 = new ArrayList<Integer>();
-		List<Tuple2<Integer, Integer>> expectedGroupSumList = new ArrayList<Tuple2<Integer, Integer>>();
-		List<Tuple2<Integer, Integer>> expectedGroupMinList = new ArrayList<Tuple2<Integer, Integer>>();
-		List<Tuple2<Integer, Integer>> expectedGroupMaxList = new ArrayList<Tuple2<Integer, Integer>>();
-
-		List<Integer> simpleInput = new ArrayList<Integer>();
+		// preparing expected outputs
+		List<Tuple2<Integer, Integer>> expectedGroupSumList = new ArrayList<>();
+		List<Tuple2<Integer, Integer>> expectedGroupMinList = new ArrayList<>();
+		List<Tuple2<Integer, Integer>> expectedGroupMaxList = new ArrayList<>();
 
 		int groupedSum0 = 0;
 		int groupedSum1 = 0;
 		int groupedSum2 = 0;
 
 		for (int i = 0; i < 9; i++) {
-			simpleInput.add(i);
-			expectedSumList.add(new Tuple2<Integer, Integer>(i % 3, (i + 1) * i / 2));
-			expectedMinList.add(new Tuple2<Integer, Integer>(i % 3, 0));
-			expectedMaxList.add(new Tuple2<Integer, Integer>(i % 3, i));
-
-			expectedSumList0.add((i + 1) * i / 2);
-			expectedMaxList0.add(i);
-			expectedMinList0.add(0);
-
 			int groupedSum;
 			switch (i % 3) {
 				case 0:
@@ -83,99 +68,59 @@ public class AggregationFunctionTest {
 					break;
 			}
 
-			expectedGroupSumList.add(new Tuple2<Integer, Integer>(i % 3, groupedSum));
-			expectedGroupMinList.add(new Tuple2<Integer, Integer>(i % 3, i % 3));
-			expectedGroupMaxList.add(new Tuple2<Integer, Integer>(i % 3, i));
+			expectedGroupSumList.add(new Tuple2<>(i % 3, groupedSum));
+			expectedGroupMinList.add(new Tuple2<>(i % 3, i % 3));
+			expectedGroupMaxList.add(new Tuple2<>(i % 3, i));
 		}
 
-		TypeInformation<Tuple2<Integer, Integer>> type1 = TypeExtractor
-				.getForObject(new Tuple2<Integer, Integer>(0, 0));
-		TypeInformation<Integer> type2 = TypeExtractor.getForObject(2);
+		// some necessary boilerplate
+		TypeInformation<Tuple2<Integer, Integer>> typeInfo = TypeExtractor
+				.getForObject(new Tuple2<>(0, 0));
 
 		ExecutionConfig config = new ExecutionConfig();
 
-		ReduceFunction<Tuple2<Integer, Integer>> sumFunction =
-				new SumAggregator<Tuple2<Integer, Integer>>(1, type1, config);
-		ReduceFunction<Integer> sumFunction0 = new SumAggregator<Integer>(0, type2, config);
-		ReduceFunction<Tuple2<Integer, Integer>> minFunction = new ComparableAggregator<Tuple2<Integer, Integer>>(
-				1, type1, AggregationType.MIN, config);
-		ReduceFunction<Integer> minFunction0 = new ComparableAggregator<Integer>(0, type2,
-				AggregationType.MIN, config);
-		ReduceFunction<Tuple2<Integer, Integer>> maxFunction = new ComparableAggregator<Tuple2<Integer, Integer>>(
-				1, type1, AggregationType.MAX, config);
-		ReduceFunction<Integer> maxFunction0 = new ComparableAggregator<Integer>(0, type2,
-				AggregationType.MAX, config);
-		List<Tuple2<Integer, Integer>> sumList = MockContext.createAndExecute(
-				new StreamReduce<Tuple2<Integer, Integer>>(sumFunction), getInputList());
-
-		List<Tuple2<Integer, Integer>> minList = MockContext.createAndExecute(
-				new StreamReduce<Tuple2<Integer, Integer>>(minFunction), getInputList());
-
-		List<Tuple2<Integer, Integer>> maxList = MockContext.createAndExecute(
-				new StreamReduce<Tuple2<Integer, Integer>>(maxFunction), getInputList());
-
-		TypeInformation<Tuple2<Integer, Integer>> typeInfo = TypeExtractor
-				.getForObject(new Tuple2<Integer, Integer>(1, 1));
-
 		KeySelector<Tuple2<Integer, Integer>, ?> keySelector = KeySelectorUtil.getSelectorForKeys(
-				new Keys.ExpressionKeys<Tuple2<Integer, Integer>>(new int[]{0}, typeInfo),
-				typeInfo, new ExecutionConfig());
+				new Keys.ExpressionKeys<>(new int[]{0}, typeInfo),
+				typeInfo, config);
+
+		// aggregations tested
+		ReduceFunction<Tuple2<Integer, Integer>> sumFunction =
+				new SumAggregator<>(1, typeInfo, config);
+		ReduceFunction<Tuple2<Integer, Integer>> minFunction = new ComparableAggregator<>(
+				1, typeInfo, AggregationType.MIN, config);
+		ReduceFunction<Tuple2<Integer, Integer>> maxFunction = new ComparableAggregator<>(
+				1, typeInfo, AggregationType.MAX, config);
 
 		List<Tuple2<Integer, Integer>> groupedSumList = MockContext.createAndExecute(
-				new StreamGroupedReduce<Tuple2<Integer, Integer>>(sumFunction, keySelector),
+				new StreamGroupedReduce<>(sumFunction, keySelector),
 				getInputList());
 
 		List<Tuple2<Integer, Integer>> groupedMinList = MockContext.createAndExecute(
-				new StreamGroupedReduce<Tuple2<Integer, Integer>>(minFunction, keySelector),
+				new StreamGroupedReduce<>(minFunction, keySelector),
 				getInputList());
 
 		List<Tuple2<Integer, Integer>> groupedMaxList = MockContext.createAndExecute(
-				new StreamGroupedReduce<Tuple2<Integer, Integer>>(maxFunction, keySelector),
+				new StreamGroupedReduce<>(maxFunction, keySelector),
 				getInputList());
 
-		assertEquals(expectedSumList, sumList);
-		assertEquals(expectedMinList, minList);
-		assertEquals(expectedMaxList, maxList);
 		assertEquals(expectedGroupSumList, groupedSumList);
 		assertEquals(expectedGroupMinList, groupedMinList);
 		assertEquals(expectedGroupMaxList, groupedMaxList);
-		assertEquals(expectedSumList0, MockContext.createAndExecute(
-				new StreamReduce<Integer>(sumFunction0), simpleInput));
-		assertEquals(expectedMinList0, MockContext.createAndExecute(
-				new StreamReduce<Integer>(minFunction0), simpleInput));
-		assertEquals(expectedMaxList0, MockContext.createAndExecute(
-				new StreamReduce<Integer>(maxFunction0), simpleInput));
-		
 	}
 
 	@Test
 	public void pojoGroupSumIntegerTest() {
-		List<MyPojo> expectedSumList = new ArrayList<MyPojo>();
-		List<MyPojo> expectedMinList = new ArrayList<MyPojo>();
-		List<MyPojo> expectedMaxList = new ArrayList<MyPojo>();
-		List<Integer> expectedSumList0 = new ArrayList<Integer>();
-		List<Integer> expectedMinList0 = new ArrayList<Integer>();
-		List<Integer> expectedMaxList0 = new ArrayList<Integer>();
-		List<MyPojo> expectedGroupSumList = new ArrayList<MyPojo>();
-		List<MyPojo> expectedGroupMinList = new ArrayList<MyPojo>();
-		List<MyPojo> expectedGroupMaxList = new ArrayList<MyPojo>();
-
-		List<Integer> simpleInput = new ArrayList<Integer>();
+
+		// preparing expected outputs
+		List<MyPojo> expectedGroupSumList = new ArrayList<>();
+		List<MyPojo> expectedGroupMinList = new ArrayList<>();
+		List<MyPojo> expectedGroupMaxList = new ArrayList<>();
 
 		int groupedSum0 = 0;
 		int groupedSum1 = 0;
 		int groupedSum2 = 0;
 
 		for (int i = 0; i < 9; i++) {
-			simpleInput.add(i);
-			expectedSumList.add(new MyPojo(i % 3, (i + 1) * i / 2));
-			expectedMinList.add(new MyPojo(i % 3, 0));
-			expectedMaxList.add(new MyPojo(i % 3, i));
-
-			expectedSumList0.add((i + 1) * i / 2);
-			expectedMaxList0.add(i);
-			expectedMinList0.add(0);
-
 			int groupedSum;
 			switch (i % 3) {
 				case 0:
@@ -194,222 +139,188 @@ public class AggregationFunctionTest {
 			expectedGroupMaxList.add(new MyPojo(i % 3, i));
 		}
 
-		TypeInformation<MyPojo> type1 = TypeExtractor.getForObject(new MyPojo(0, 0));
-		TypeInformation<Integer> type2 = TypeExtractor.getForObject(0);
-		ExecutionConfig config = new ExecutionConfig();
+		// some necessary boilerplate
+		TypeInformation<MyPojo> typeInfo = TypeExtractor.getForObject(new MyPojo(0, 0));
 
-		ReduceFunction<MyPojo> sumFunction = new SumAggregator<MyPojo>("f1", type1, config);
-		ReduceFunction<Integer> sumFunction0 = new SumAggregator<Integer>(0, type2, config);
-		ReduceFunction<MyPojo> minFunction = new ComparableAggregator<MyPojo>("f1", type1, AggregationType.MIN,
-				false, config);
-		ReduceFunction<Integer> minFunction0 = new ComparableAggregator<Integer>(0, type2, AggregationType.MIN,
-				config);
-		ReduceFunction<MyPojo> maxFunction = new ComparableAggregator<MyPojo>("f1", type1, AggregationType.MAX,
-				false, config);
-		ReduceFunction<Integer> maxFunction0 = new ComparableAggregator<Integer>(0, type2, AggregationType.MAX,
-				config);
-
-		List<MyPojo> sumList = MockContext.createAndExecute(
-				new StreamReduce<MyPojo>(sumFunction), getInputPojoList());
-		List<MyPojo> minList = MockContext.createAndExecute(
-				new StreamReduce<MyPojo>(minFunction), getInputPojoList());
-		List<MyPojo> maxList = MockContext.createAndExecute(
-				new StreamReduce<MyPojo>(maxFunction), getInputPojoList());
+		ExecutionConfig config = new ExecutionConfig();
 
-		TypeInformation<MyPojo> typeInfo = TypeExtractor.getForObject(new MyPojo(1, 1));
 		KeySelector<MyPojo, ?> keySelector = KeySelectorUtil.getSelectorForKeys(
-				new Keys.ExpressionKeys<MyPojo>(new String[]{"f0"}, typeInfo),
+				new Keys.ExpressionKeys<>(new String[]{"f0"}, typeInfo),
 				typeInfo, config);
 
+		// aggregations tested
+		ReduceFunction<MyPojo> sumFunction = new SumAggregator<>("f1", typeInfo, config);
+		ReduceFunction<MyPojo> minFunction = new ComparableAggregator<>("f1", typeInfo, AggregationType.MIN,
+				false, config);
+		ReduceFunction<MyPojo> maxFunction = new ComparableAggregator<>("f1", typeInfo, AggregationType.MAX,
+				false, config);
+
 		List<MyPojo> groupedSumList = MockContext.createAndExecute(
-				new StreamGroupedReduce<MyPojo>(sumFunction, keySelector),
+				new StreamGroupedReduce<>(sumFunction, keySelector),
 				getInputPojoList());
 		List<MyPojo> groupedMinList = MockContext.createAndExecute(
-				new StreamGroupedReduce<MyPojo>(minFunction, keySelector),
+				new StreamGroupedReduce<>(minFunction, keySelector),
 				getInputPojoList());
 		List<MyPojo> groupedMaxList = MockContext.createAndExecute(
-				new StreamGroupedReduce<MyPojo>(maxFunction, keySelector),
+				new StreamGroupedReduce<>(maxFunction, keySelector),
 				getInputPojoList());
 
-		assertEquals(expectedSumList, sumList);
-		assertEquals(expectedMinList, minList);
-		assertEquals(expectedMaxList, maxList);
 		assertEquals(expectedGroupSumList, groupedSumList);
 		assertEquals(expectedGroupMinList, groupedMinList);
 		assertEquals(expectedGroupMaxList, groupedMaxList);
-		assertEquals(expectedSumList0, MockContext.createAndExecute(
-				new StreamReduce<Integer>(sumFunction0), simpleInput));
-		assertEquals(expectedMinList0, MockContext.createAndExecute(
-				new StreamReduce<Integer>(minFunction0), simpleInput));
-		assertEquals(expectedMaxList0, MockContext.createAndExecute(
-				new StreamReduce<Integer>(maxFunction0), simpleInput));
 	}
-
+	
 	@Test
 	public void minMaxByTest() {
-		TypeInformation<Tuple2<Integer, Integer>> type1 = TypeExtractor
-				.getForObject(new Tuple2<Integer, Integer>(0, 0));
+		// Tuples are grouped on field 0, aggregated on field 1
+		
+		// preparing expected outputs
+		List<Tuple3<Integer, Integer, Integer>> maxByFirstExpected = ImmutableList.of(
+				Tuple3.of(0,0,0), Tuple3.of(0,1,1), Tuple3.of(0,2,2),
+				Tuple3.of(0,2,2), Tuple3.of(0,2,2), Tuple3.of(0,2,2),
+				Tuple3.of(0,2,2), Tuple3.of(0,2,2), Tuple3.of(0,2,2));
+
+		List<Tuple3<Integer, Integer, Integer>> maxByLastExpected = ImmutableList.of(
+				Tuple3.of(0, 0, 0), Tuple3.of(0, 1, 1), Tuple3.of(0, 2, 2),
+				Tuple3.of(0, 2, 2), Tuple3.of(0, 2, 2), Tuple3.of(0, 2, 5),
+				Tuple3.of(0, 2, 5), Tuple3.of(0, 2, 5), Tuple3.of(0, 2, 8));
+
+		List<Tuple3<Integer, Integer, Integer>> minByFirstExpected = ImmutableList.of(
+				Tuple3.of(0,0,0), Tuple3.of(0,0,0), Tuple3.of(0,0,0),
+				Tuple3.of(0,0,0), Tuple3.of(0,0,0), Tuple3.of(0,0,0),
+				Tuple3.of(0,0,0), Tuple3.of(0,0,0), Tuple3.of(0,0,0));
+
+		List<Tuple3<Integer, Integer, Integer>> minByLastExpected = ImmutableList.of(
+				Tuple3.of(0, 0, 0), Tuple3.of(0, 0, 0), Tuple3.of(0, 0, 0),
+				Tuple3.of(0, 0, 3), Tuple3.of(0, 0, 3), Tuple3.of(0, 0, 3),
+				Tuple3.of(0, 0, 6), Tuple3.of(0, 0, 6), Tuple3.of(0, 0, 6));
+
+		// some necessary boilerplate
+		TypeInformation<Tuple3<Integer, Integer, Integer>> typeInfo = TypeExtractor
+				.getForObject(Tuple3.of(0,0,0));
 
 		ExecutionConfig config = new ExecutionConfig();
 
-		ReduceFunction<Tuple2<Integer, Integer>> maxByFunctionFirst =
-				new ComparableAggregator<Tuple2<Integer, Integer>>(0, type1, AggregationType.MAXBY, true, config);
-		ReduceFunction<Tuple2<Integer, Integer>> maxByFunctionLast =
-				new ComparableAggregator<Tuple2<Integer, Integer>>(0, type1, AggregationType.MAXBY, false, config);
-
-		ReduceFunction<Tuple2<Integer, Integer>> minByFunctionFirst =
-				new ComparableAggregator<Tuple2<Integer, Integer>>(0, type1, AggregationType.MINBY, true, config);
-		ReduceFunction<Tuple2<Integer, Integer>> minByFunctionLast =
-				new ComparableAggregator<Tuple2<Integer, Integer>>(0, type1, AggregationType.MINBY, false, config);
-
-		List<Tuple2<Integer, Integer>> maxByFirstExpected = new ArrayList<Tuple2<Integer, Integer>>();
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(1, 1));
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
-
-		List<Tuple2<Integer, Integer>> maxByLastExpected = new ArrayList<Tuple2<Integer, Integer>>();
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(1, 1));
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 8));
-
-		List<Tuple2<Integer, Integer>> minByFirstExpected = new ArrayList<Tuple2<Integer, Integer>>();
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-
-		List<Tuple2<Integer, Integer>> minByLastExpected = new ArrayList<Tuple2<Integer, Integer>>();
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
+		KeySelector<Tuple3<Integer, Integer, Integer>, ?> keySelector = KeySelectorUtil.getSelectorForKeys(
+				new Keys.ExpressionKeys<>(new int[]{0}, typeInfo),
+				typeInfo, config);
+		
+		// aggregations tested
+		ReduceFunction<Tuple3<Integer, Integer, Integer>> maxByFunctionFirst =
+				new ComparableAggregator<>(1, typeInfo, AggregationType.MAXBY, true, config);
+		ReduceFunction<Tuple3<Integer, Integer, Integer>> maxByFunctionLast =
+				new ComparableAggregator<>(1, typeInfo, AggregationType.MAXBY, false, config);
+		ReduceFunction<Tuple3<Integer, Integer, Integer>> minByFunctionFirst =
+				new ComparableAggregator<>(1, typeInfo, AggregationType.MINBY, true, config);
+		ReduceFunction<Tuple3<Integer, Integer, Integer>> minByFunctionLast =
+				new ComparableAggregator<>(1, typeInfo, AggregationType.MINBY, false, config);
 
 		assertEquals(maxByFirstExpected, MockContext.createAndExecute(
-				new StreamReduce<Tuple2<Integer, Integer>>(maxByFunctionFirst),
-				getInputList()));
+				new StreamGroupedReduce<>(maxByFunctionFirst, keySelector),
+				getInputByList()));
 		assertEquals(maxByLastExpected, MockContext.createAndExecute(
-				new StreamReduce<Tuple2<Integer, Integer>>(maxByFunctionLast),
-				getInputList()));
+				new StreamGroupedReduce<>(maxByFunctionLast, keySelector),
+				getInputByList()));
 		assertEquals(minByLastExpected, MockContext.createAndExecute(
-				new StreamReduce<Tuple2<Integer, Integer>>(minByFunctionLast),
-				getInputList()));
+				new StreamGroupedReduce<>(minByFunctionLast, keySelector),
+				getInputByList()));
 		assertEquals(minByFirstExpected, MockContext.createAndExecute(
-				new StreamReduce<Tuple2<Integer, Integer>>(minByFunctionFirst),
-				getInputList()));
+				new StreamGroupedReduce<>(minByFunctionFirst, keySelector),
+				getInputByList()));
 	}
 
 	@Test
 	public void pojoMinMaxByTest() {
+		// Pojos are grouped on field 0, aggregated on field 1
+
+		// preparing expected outputs
+		List<MyPojo3> maxByFirstExpected = ImmutableList.of(
+				new MyPojo3(0, 0), new MyPojo3(1, 1), new MyPojo3(2, 2),
+				new MyPojo3(2, 2), new MyPojo3(2, 2), new MyPojo3(2, 2),
+				new MyPojo3(2, 2), new MyPojo3(2, 2), new MyPojo3(2, 2));
+
+		List<MyPojo3> maxByLastExpected = ImmutableList.of(
+				new MyPojo3(0, 0), new MyPojo3(1, 1), new MyPojo3(2, 2),
+				new MyPojo3(2, 2), new MyPojo3(2, 2), new MyPojo3(2, 5),
+				new MyPojo3(2, 5), new MyPojo3(2, 5), new MyPojo3(2, 8));
+
+		List<MyPojo3> minByFirstExpected = ImmutableList.of(
+				new MyPojo3(0, 0), new MyPojo3(0, 0), new MyPojo3(0, 0),
+				new MyPojo3(0, 0), new MyPojo3(0, 0), new MyPojo3(0, 0),
+				new MyPojo3(0, 0), new MyPojo3(0, 0), new MyPojo3(0, 0));
+
+		List<MyPojo3> minByLastExpected = ImmutableList.of(
+				new MyPojo3(0, 0), new MyPojo3(0, 0), new MyPojo3(0, 0),
+				new MyPojo3(0, 3), new MyPojo3(0, 3), new MyPojo3(0, 3),
+				new MyPojo3(0, 6), new MyPojo3(0, 6), new MyPojo3(0, 6));
+
+		// some necessary boilerplate
+		TypeInformation<MyPojo3> typeInfo = TypeExtractor.getForObject(new MyPojo3(0, 0));
+
 		ExecutionConfig config = new ExecutionConfig();
-		TypeInformation<MyPojo> type1 = TypeExtractor
-				.getForObject(new MyPojo(0, 0));
-
-		ReduceFunction<MyPojo> maxByFunctionFirst =
-				new ComparableAggregator<MyPojo>("f0", type1, AggregationType.MAXBY, true, config);
-		ReduceFunction<MyPojo> maxByFunctionLast =
-				new ComparableAggregator<MyPojo>("f0", type1, AggregationType.MAXBY, false, config);
-
-		ReduceFunction<MyPojo> minByFunctionFirst =
-				new ComparableAggregator<MyPojo>("f0", type1, AggregationType.MINBY, true, config);
-		ReduceFunction<MyPojo> minByFunctionLast =
-				new ComparableAggregator<MyPojo>("f0", type1, AggregationType.MINBY, false, config);
-
-		List<MyPojo> maxByFirstExpected = new ArrayList<MyPojo>();
-		maxByFirstExpected.add(new MyPojo(0, 0));
-		maxByFirstExpected.add(new MyPojo(1, 1));
-		maxByFirstExpected.add(new MyPojo(2, 2));
-		maxByFirstExpected.add(new MyPojo(2, 2));
-		maxByFirstExpected.add(new MyPojo(2, 2));
-		maxByFirstExpected.add(new MyPojo(2, 2));
-		maxByFirstExpected.add(new MyPojo(2, 2));
-		maxByFirstExpected.add(new MyPojo(2, 2));
-		maxByFirstExpected.add(new MyPojo(2, 2));
-
-		List<MyPojo> maxByLastExpected = new ArrayList<MyPojo>();
-		maxByLastExpected.add(new MyPojo(0, 0));
-		maxByLastExpected.add(new MyPojo(1, 1));
-		maxByLastExpected.add(new MyPojo(2, 2));
-		maxByLastExpected.add(new MyPojo(2, 2));
-		maxByLastExpected.add(new MyPojo(2, 2));
-		maxByLastExpected.add(new MyPojo(2, 5));
-		maxByLastExpected.add(new MyPojo(2, 5));
-		maxByLastExpected.add(new MyPojo(2, 5));
-		maxByLastExpected.add(new MyPojo(2, 8));
-
-		List<MyPojo> minByFirstExpected = new ArrayList<MyPojo>();
-		minByFirstExpected.add(new MyPojo(0, 0));
-		minByFirstExpected.add(new MyPojo(0, 0));
-		minByFirstExpected.add(new MyPojo(0, 0));
-		minByFirstExpected.add(new MyPojo(0, 0));
-		minByFirstExpected.add(new MyPojo(0, 0));
-		minByFirstExpected.add(new MyPojo(0, 0));
-		minByFirstExpected.add(new MyPojo(0, 0));
-		minByFirstExpected.add(new MyPojo(0, 0));
-		minByFirstExpected.add(new MyPojo(0, 0));
-
-		List<MyPojo> minByLastExpected = new ArrayList<MyPojo>();
-		minByLastExpected.add(new MyPojo(0, 0));
-		minByLastExpected.add(new MyPojo(0, 0));
-		minByLastExpected.add(new MyPojo(0, 0));
-		minByLastExpected.add(new MyPojo(0, 3));
-		minByLastExpected.add(new MyPojo(0, 3));
-		minByLastExpected.add(new MyPojo(0, 3));
-		minByLastExpected.add(new MyPojo(0, 6));
-		minByLastExpected.add(new MyPojo(0, 6));
-		minByLastExpected.add(new MyPojo(0, 6));
+
+		KeySelector<MyPojo3, ?> keySelector = KeySelectorUtil.getSelectorForKeys(
+				new Keys.ExpressionKeys<>(new String[]{"f0"}, typeInfo),
+				typeInfo, config);
+
+		// aggregations tested
+		ReduceFunction<MyPojo3> maxByFunctionFirst =
+				new ComparableAggregator<>("f1", typeInfo, AggregationType.MAXBY, true, config);
+		ReduceFunction<MyPojo3> maxByFunctionLast =
+				new ComparableAggregator<>("f1", typeInfo, AggregationType.MAXBY, false, config);
+		ReduceFunction<MyPojo3> minByFunctionFirst =
+				new ComparableAggregator<>("f1", typeInfo, AggregationType.MINBY, true, config);
+		ReduceFunction<MyPojo3> minByFunctionLast =
+				new ComparableAggregator<>("f1", typeInfo, AggregationType.MINBY, false, config);
 
 		assertEquals(maxByFirstExpected, MockContext.createAndExecute(
-				new StreamReduce<MyPojo>(maxByFunctionFirst),
-				getInputPojoList()));
+				new StreamGroupedReduce<>(maxByFunctionFirst, keySelector),
+				getInputByPojoList()));
 		assertEquals(maxByLastExpected, MockContext.createAndExecute(
-				new StreamReduce<MyPojo>(maxByFunctionLast),
-				getInputPojoList()));
+				new StreamGroupedReduce<>(maxByFunctionLast, keySelector),
+				getInputByPojoList()));
 		assertEquals(minByLastExpected, MockContext.createAndExecute(
-				new StreamReduce<MyPojo>(minByFunctionLast),
-				getInputPojoList()));
+				new StreamGroupedReduce<>(minByFunctionLast, keySelector),
+				getInputByPojoList()));
 		assertEquals(minByFirstExpected, MockContext.createAndExecute(
-				new StreamReduce<MyPojo>(minByFunctionFirst),
-				getInputPojoList()));
+				new StreamGroupedReduce<>(minByFunctionFirst, keySelector),
+				getInputByPojoList()));
 	}
 
+	// *************************************************************************
+	//     UTILS
+	// *************************************************************************
+
 	private List<Tuple2<Integer, Integer>> getInputList() {
-		ArrayList<Tuple2<Integer, Integer>> inputList = new ArrayList<Tuple2<Integer, Integer>>();
+		ArrayList<Tuple2<Integer, Integer>> inputList = new ArrayList<>();
 		for (int i = 0; i < 9; i++) {
-			inputList.add(new Tuple2<Integer, Integer>(i % 3, i));
+			inputList.add(Tuple2.of(i % 3, i));
 		}
 		return inputList;
-
 	}
 
 	private List<MyPojo> getInputPojoList() {
-		ArrayList<MyPojo> inputList = new ArrayList<MyPojo>();
+		ArrayList<MyPojo> inputList = new ArrayList<>();
 		for (int i = 0; i < 9; i++) {
 			inputList.add(new MyPojo(i % 3, i));
 		}
 		return inputList;
+	}
 
+	private List<Tuple3<Integer, Integer, Integer>> getInputByList() {
+		ArrayList<Tuple3<Integer, Integer, Integer>> inputList = new ArrayList<>();
+		for (int i = 0; i < 9; i++) {
+			inputList.add(Tuple3.of(0, i % 3, i));
+		}
+		return inputList;
+	}
+
+	private List<MyPojo3> getInputByPojoList() {
+		ArrayList<MyPojo3> inputList = new ArrayList<>();
+		for (int i = 0; i < 9; i++) {
+			inputList.add(new MyPojo3(i % 3, i));
+		}
+		return inputList;
 	}
 
 	public static class MyPojo implements Serializable {
@@ -439,6 +350,39 @@ public class AggregationFunctionTest {
 				return false;
 			}
 		}
+	}
+
+	public static class MyPojo3 implements Serializable {
+
+		private static final long serialVersionUID = 1L;
+		public int f0;
+		public int f1;
+		public int f2;
+
+		// f0 is never assigned here, so it keeps its default value of 0
+		public MyPojo3(int f1, int f2) {
+			this.f1 = f1;
+			this.f2 = f2;
+		}
+
+		public MyPojo3() {
+		}
+
+		@Override
+		public String toString() {
+			return "POJO3(" + f0 + "," + f1 + "," + f2 + ")";
+		}
+
+		@Override
+		public boolean equals(Object other) {
+			if (other instanceof MyPojo3) {
+				return this.f0 == ((MyPojo3) other).f0
+						&& this.f1 == ((MyPojo3) other).f1
+						&& this.f2 == ((MyPojo3) other).f2;
+			} else {
+				return false;
+			}
+		}
 
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/906bd6dc/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 0cf1df8..19bcb73 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -18,32 +18,28 @@
 
 package org.apache.flink.streaming.api.scala
 
-import org.apache.flink.streaming.api.functions.{AscendingTimestampExtractor, TimestampExtractor}
-import org.apache.flink.streaming.api.windowing.assigners._
-import org.apache.flink.streaming.api.windowing.time.{ProcessingTime, EventTime, AbstractTime}
-import org.apache.flink.streaming.api.windowing.windows.{Window, TimeWindow}
-import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWindowedStream}
-
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-
-import org.apache.flink.api.java.tuple.{Tuple => JavaTuple}
-import org.apache.flink.api.common.functions.{FlatMapFunction, MapFunction, Partitioner, FilterFunction}
+import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction, MapFunction, Partitioner, RichFilterFunction, RichFlatMapFunction, RichMapFunction}
 import org.apache.flink.api.common.io.OutputFormat
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple => JavaTuple}
 import org.apache.flink.api.scala.operators.ScalaCsvOutputFormat
 import org.apache.flink.core.fs.{FileSystem, Path}
 import org.apache.flink.streaming.api.collector.selector.OutputSelector
-import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, DataStreamSink, SingleOutputStreamOperator}
+import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWindowedStream, DataStream => JavaStream, KeyedStream => JavaKeyedStream, _}
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
+import org.apache.flink.streaming.api.functions.{AscendingTimestampExtractor, TimestampExtractor}
+import org.apache.flink.streaming.api.scala.function.StatefulFunction
+import org.apache.flink.streaming.api.windowing.assigners._
 import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
 import org.apache.flink.streaming.api.windowing.policy.{EvictionPolicy, TriggerPolicy}
+import org.apache.flink.streaming.api.windowing.time.{AbstractTime, EventTime, ProcessingTime}
+import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window}
 import org.apache.flink.streaming.util.serialization.SerializationSchema
 import org.apache.flink.util.Collector
-import org.apache.flink.api.common.functions.{RichMapFunction, RichFlatMapFunction, RichFilterFunction}
-import org.apache.flink.streaming.api.scala.function.StatefulFunction
-import org.apache.flink.streaming.api.datastream.{KeyedStream => JavaKeyedStream}
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
 
 class DataStream[T](javaStream: JavaStream[T]) {