You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/03/25 10:21:00 UTC

flink git commit: [FLINK-1544] [streaming] POJO types added to AggregationFunctionTest

Repository: flink
Updated Branches:
  refs/heads/master b566e484a -> 597d8b862


[FLINK-1544] [streaming] POJO types added to AggregationFunctionTest

This closes #517


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/597d8b86
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/597d8b86
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/597d8b86

Branch: refs/heads/master
Commit: 597d8b86276fd5b8c501658683758816365c3edb
Parents: b566e48
Author: szape <ne...@gmail.com>
Authored: Wed Mar 11 15:36:15 2015 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Wed Mar 25 10:19:55 2015 +0100

----------------------------------------------------------------------
 .../streaming/api/AggregationFunctionTest.java  | 293 ++++++++++++++-----
 1 file changed, 217 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/597d8b86/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
index 115f614..bc0023a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
@@ -17,12 +17,6 @@
 
 package org.apache.flink.streaming.api;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -40,6 +34,13 @@ import org.apache.flink.streaming.util.MockContext;
 import org.apache.flink.streaming.util.keys.KeySelectorUtil;
 import org.junit.Test;
 
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
 public class AggregationFunctionTest {
 
 	@Test
@@ -73,15 +74,15 @@ public class AggregationFunctionTest {
 
 			int groupedSum;
 			switch (i % 3) {
-			case 0:
-				groupedSum = groupedSum0 += i;
-				break;
-			case 1:
-				groupedSum = groupedSum1 += i;
-				break;
-			default:
-				groupedSum = groupedSum2 += i;
-				break;
+				case 0:
+					groupedSum = groupedSum0 += i;
+					break;
+				case 1:
+					groupedSum = groupedSum1 += i;
+					break;
+				default:
+					groupedSum = groupedSum2 += i;
+					break;
 			}
 
 			expectedGroupSumList.add(new Tuple2<Integer, Integer>(i % 3, groupedSum));
@@ -118,7 +119,7 @@ public class AggregationFunctionTest {
 				.getForObject(new Tuple2<Integer, Integer>(1, 1));
 
 		KeySelector<Tuple2<Integer, Integer>, ?> keySelector = KeySelectorUtil.getSelectorForKeys(
-				new Keys.ExpressionKeys<Tuple2<Integer, Integer>>(new int[] { 0 }, typeInfo),
+				new Keys.ExpressionKeys<Tuple2<Integer, Integer>>(new int[]{0}, typeInfo),
 				typeInfo, new ExecutionConfig());
 
 		List<Tuple2<Integer, Integer>> groupedSumList = MockContext.createAndExecute(
@@ -166,73 +167,101 @@ public class AggregationFunctionTest {
 			// Nothing to do here
 		}
 
-		ReduceFunction<Tuple2<Integer, Integer>> maxByFunctionFirst = ComparableAggregator
-				.getAggregator(0, type1, AggregationType.MAXBY, true);
-		ReduceFunction<Tuple2<Integer, Integer>> maxByFunctionLast = ComparableAggregator
-				.getAggregator(0, type1, AggregationType.MAXBY, false);
+	}
 
-		ReduceFunction<Tuple2<Integer, Integer>> minByFunctionFirst = ComparableAggregator
-				.getAggregator(0, type1, AggregationType.MINBY, true);
-		ReduceFunction<Tuple2<Integer, Integer>> minByFunctionLast = ComparableAggregator
-				.getAggregator(0, type1, AggregationType.MINBY, false);
+	@Test
+	public void pojoGroupSumIntegerTest() {
+		List<MyPojo> expectedSumList = new ArrayList<MyPojo>();
+		List<MyPojo> expectedMinList = new ArrayList<MyPojo>();
+		List<MyPojo> expectedMaxList = new ArrayList<MyPojo>();
+		List<Integer> expectedSumList0 = new ArrayList<Integer>();
+		List<Integer> expectedMinList0 = new ArrayList<Integer>();
+		List<Integer> expectedMaxList0 = new ArrayList<Integer>();
+		List<MyPojo> expectedGroupSumList = new ArrayList<MyPojo>();
+		List<MyPojo> expectedGroupMinList = new ArrayList<MyPojo>();
+		List<MyPojo> expectedGroupMaxList = new ArrayList<MyPojo>();
 
-		List<Tuple2<Integer, Integer>> maxByFirstExpected = new ArrayList<Tuple2<Integer, Integer>>();
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(1, 1));
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
+		List<Integer> simpleInput = new ArrayList<Integer>();
 
-		List<Tuple2<Integer, Integer>> maxByLastExpected = new ArrayList<Tuple2<Integer, Integer>>();
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(1, 1));
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 8));
+		int groupedSum0 = 0;
+		int groupedSum1 = 0;
+		int groupedSum2 = 0;
 
-		List<Tuple2<Integer, Integer>> minByFirstExpected = new ArrayList<Tuple2<Integer, Integer>>();
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
+		for (int i = 0; i < 9; i++) {
+			simpleInput.add(i);
+			expectedSumList.add(new MyPojo(i % 3, (i + 1) * i / 2));
+			expectedMinList.add(new MyPojo(i % 3, 0));
+			expectedMaxList.add(new MyPojo(i % 3, i));
 
-		List<Tuple2<Integer, Integer>> minByLastExpected = new ArrayList<Tuple2<Integer, Integer>>();
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
+			expectedSumList0.add((i + 1) * i / 2);
+			expectedMaxList0.add(i);
+			expectedMinList0.add(0);
 
-		assertEquals(maxByFirstExpected, MockContext.createAndExecute(
-				new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxByFunctionFirst),
-				getInputList()));
-		assertEquals(maxByLastExpected, MockContext.createAndExecute(
-				new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxByFunctionLast),
-				getInputList()));
-		assertEquals(minByLastExpected, MockContext.createAndExecute(
-				new StreamReduceInvokable<Tuple2<Integer, Integer>>(minByFunctionLast),
-				getInputList()));
-		assertEquals(minByFirstExpected, MockContext.createAndExecute(
-				new StreamReduceInvokable<Tuple2<Integer, Integer>>(minByFunctionFirst),
-				getInputList()));
+			int groupedSum;
+			switch (i % 3) {
+				case 0:
+					groupedSum = groupedSum0 += i;
+					break;
+				case 1:
+					groupedSum = groupedSum1 += i;
+					break;
+				default:
+					groupedSum = groupedSum2 += i;
+					break;
+			}
+
+			expectedGroupSumList.add(new MyPojo(i % 3, groupedSum));
+			expectedGroupMinList.add(new MyPojo(i % 3, i % 3));
+			expectedGroupMaxList.add(new MyPojo(i % 3, i));
+		}
 
+		TypeInformation<MyPojo> type1 = TypeExtractor.getForObject(new MyPojo(0, 0));
+		TypeInformation<Integer> type2 = TypeExtractor.getForObject(0);
+		ExecutionConfig config = new ExecutionConfig();
+
+		ReduceFunction<MyPojo> sumFunction = SumAggregator.getSumFunction("f1", type1, config);
+		ReduceFunction<Integer> sumFunction0 = SumAggregator.getSumFunction(0, Integer.class, type2);
+		ReduceFunction<MyPojo> minFunction = ComparableAggregator.getAggregator("f1", type1, AggregationType.MIN,
+				false, config);
+		ReduceFunction<Integer> minFunction0 = ComparableAggregator.getAggregator(0, type2, AggregationType.MIN);
+		ReduceFunction<MyPojo> maxFunction = ComparableAggregator.getAggregator("f1", type1, AggregationType.MAX,
+				false, config);
+		ReduceFunction<Integer> maxFunction0 = ComparableAggregator.getAggregator(0, type2, AggregationType.MAX);
+
+		List<MyPojo> sumList = MockContext.createAndExecute(
+				new StreamReduceInvokable<MyPojo>(sumFunction), getInputPojoList());
+		List<MyPojo> minList = MockContext.createAndExecute(
+				new StreamReduceInvokable<MyPojo>(minFunction), getInputPojoList());
+		List<MyPojo> maxList = MockContext.createAndExecute(
+				new StreamReduceInvokable<MyPojo>(maxFunction), getInputPojoList());
+
+		TypeInformation<MyPojo> typeInfo = TypeExtractor.getForObject(new MyPojo(1, 1));
+		KeySelector<MyPojo, ?> keySelector = KeySelectorUtil.getSelectorForKeys(
+				new Keys.ExpressionKeys<MyPojo>(new String[]{"f0"}, typeInfo),
+				typeInfo, config);
+
+		List<MyPojo> groupedSumList = MockContext.createAndExecute(
+				new GroupedReduceInvokable<MyPojo>(sumFunction, keySelector),
+				getInputPojoList());
+		List<MyPojo> groupedMinList = MockContext.createAndExecute(
+				new GroupedReduceInvokable<MyPojo>(minFunction, keySelector),
+				getInputPojoList());
+		List<MyPojo> groupedMaxList = MockContext.createAndExecute(
+				new GroupedReduceInvokable<MyPojo>(maxFunction, keySelector),
+				getInputPojoList());
+
+		assertEquals(expectedSumList, sumList);
+		assertEquals(expectedMinList, minList);
+		assertEquals(expectedMaxList, maxList);
+		assertEquals(expectedGroupSumList, groupedSumList);
+		assertEquals(expectedGroupMinList, groupedMinList);
+		assertEquals(expectedGroupMaxList, groupedMaxList);
+		assertEquals(expectedSumList0, MockContext.createAndExecute(
+				new StreamReduceInvokable<Integer>(sumFunction0), simpleInput));
+		assertEquals(expectedMinList0, MockContext.createAndExecute(
+				new StreamReduceInvokable<Integer>(minFunction0), simpleInput));
+		assertEquals(expectedMaxList0, MockContext.createAndExecute(
+				new StreamReduceInvokable<Integer>(maxFunction0), simpleInput));
 	}
 
 	@Test
@@ -308,6 +337,80 @@ public class AggregationFunctionTest {
 				getInputList()));
 	}
 
+	@Test
+	public void pojoMinMaxByTest() {
+		ExecutionConfig config = new ExecutionConfig();
+		TypeInformation<MyPojo> type1 = TypeExtractor
+				.getForObject(new MyPojo(0, 0));
+
+		ReduceFunction<MyPojo> maxByFunctionFirst = ComparableAggregator
+				.getAggregator("f0", type1, AggregationType.MAXBY, true, config);
+		ReduceFunction<MyPojo> maxByFunctionLast = ComparableAggregator
+				.getAggregator("f0", type1, AggregationType.MAXBY, false, config);
+
+		ReduceFunction<MyPojo> minByFunctionFirst = ComparableAggregator
+				.getAggregator("f0", type1, AggregationType.MINBY, true, config);
+		ReduceFunction<MyPojo> minByFunctionLast = ComparableAggregator
+				.getAggregator("f0", type1, AggregationType.MINBY, false, config);
+
+		List<MyPojo> maxByFirstExpected = new ArrayList<MyPojo>();
+		maxByFirstExpected.add(new MyPojo(0, 0));
+		maxByFirstExpected.add(new MyPojo(1, 1));
+		maxByFirstExpected.add(new MyPojo(2, 2));
+		maxByFirstExpected.add(new MyPojo(2, 2));
+		maxByFirstExpected.add(new MyPojo(2, 2));
+		maxByFirstExpected.add(new MyPojo(2, 2));
+		maxByFirstExpected.add(new MyPojo(2, 2));
+		maxByFirstExpected.add(new MyPojo(2, 2));
+		maxByFirstExpected.add(new MyPojo(2, 2));
+
+		List<MyPojo> maxByLastExpected = new ArrayList<MyPojo>();
+		maxByLastExpected.add(new MyPojo(0, 0));
+		maxByLastExpected.add(new MyPojo(1, 1));
+		maxByLastExpected.add(new MyPojo(2, 2));
+		maxByLastExpected.add(new MyPojo(2, 2));
+		maxByLastExpected.add(new MyPojo(2, 2));
+		maxByLastExpected.add(new MyPojo(2, 5));
+		maxByLastExpected.add(new MyPojo(2, 5));
+		maxByLastExpected.add(new MyPojo(2, 5));
+		maxByLastExpected.add(new MyPojo(2, 8));
+
+		List<MyPojo> minByFirstExpected = new ArrayList<MyPojo>();
+		minByFirstExpected.add(new MyPojo(0, 0));
+		minByFirstExpected.add(new MyPojo(0, 0));
+		minByFirstExpected.add(new MyPojo(0, 0));
+		minByFirstExpected.add(new MyPojo(0, 0));
+		minByFirstExpected.add(new MyPojo(0, 0));
+		minByFirstExpected.add(new MyPojo(0, 0));
+		minByFirstExpected.add(new MyPojo(0, 0));
+		minByFirstExpected.add(new MyPojo(0, 0));
+		minByFirstExpected.add(new MyPojo(0, 0));
+
+		List<MyPojo> minByLastExpected = new ArrayList<MyPojo>();
+		minByLastExpected.add(new MyPojo(0, 0));
+		minByLastExpected.add(new MyPojo(0, 0));
+		minByLastExpected.add(new MyPojo(0, 0));
+		minByLastExpected.add(new MyPojo(0, 3));
+		minByLastExpected.add(new MyPojo(0, 3));
+		minByLastExpected.add(new MyPojo(0, 3));
+		minByLastExpected.add(new MyPojo(0, 6));
+		minByLastExpected.add(new MyPojo(0, 6));
+		minByLastExpected.add(new MyPojo(0, 6));
+
+		assertEquals(maxByFirstExpected, MockContext.createAndExecute(
+				new StreamReduceInvokable<MyPojo>(maxByFunctionFirst),
+				getInputPojoList()));
+		assertEquals(maxByLastExpected, MockContext.createAndExecute(
+				new StreamReduceInvokable<MyPojo>(maxByFunctionLast),
+				getInputPojoList()));
+		assertEquals(minByLastExpected, MockContext.createAndExecute(
+				new StreamReduceInvokable<MyPojo>(minByFunctionLast),
+				getInputPojoList()));
+		assertEquals(minByFirstExpected, MockContext.createAndExecute(
+				new StreamReduceInvokable<MyPojo>(minByFunctionFirst),
+				getInputPojoList()));
+	}
+
 	private List<Tuple2<Integer, Integer>> getInputList() {
 		ArrayList<Tuple2<Integer, Integer>> inputList = new ArrayList<Tuple2<Integer, Integer>>();
 		for (int i = 0; i < 9; i++) {
@@ -316,4 +419,42 @@ public class AggregationFunctionTest {
 		return inputList;
 
 	}
+
+	private List<MyPojo> getInputPojoList() {
+		ArrayList<MyPojo> inputList = new ArrayList<MyPojo>();
+		for (int i = 0; i < 9; i++) {
+			inputList.add(new MyPojo(i % 3, i));
+		}
+		return inputList;
+
+	}
+
+	public static class MyPojo implements Serializable {
+
+		public int f0;
+		public int f1;
+
+		public MyPojo(int f0, int f1) {
+			this.f0 = f0;
+			this.f1 = f1;
+		}
+
+		public MyPojo() {
+		}
+
+		@Override
+		public String toString() {
+			return "POJO(" + f0 + "," + f1 + ")";
+		}
+
+		@Override
+		public boolean equals(Object other) {
+			if (other instanceof MyPojo) {
+				return this.f0 == ((MyPojo) other).f0 && this.f1 == ((MyPojo) other).f1;
+			} else {
+				return false;
+			}
+		}
+
+	}
 }