You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/03/25 10:21:00 UTC
flink git commit: [FLINK-1544] [streaming] POJO types added to
AggregationFunctionTest
Repository: flink
Updated Branches:
refs/heads/master b566e484a -> 597d8b862
[FLINK-1544] [streaming] POJO types added to AggregationFunctionTest
This closes #517
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/597d8b86
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/597d8b86
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/597d8b86
Branch: refs/heads/master
Commit: 597d8b86276fd5b8c501658683758816365c3edb
Parents: b566e48
Author: szape <ne...@gmail.com>
Authored: Wed Mar 11 15:36:15 2015 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Wed Mar 25 10:19:55 2015 +0100
----------------------------------------------------------------------
.../streaming/api/AggregationFunctionTest.java | 293 ++++++++++++++-----
1 file changed, 217 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/597d8b86/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
index 115f614..bc0023a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
@@ -17,12 +17,6 @@
package org.apache.flink.streaming.api;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.util.ArrayList;
-import java.util.List;
-
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -40,6 +34,13 @@ import org.apache.flink.streaming.util.MockContext;
import org.apache.flink.streaming.util.keys.KeySelectorUtil;
import org.junit.Test;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
public class AggregationFunctionTest {
@Test
@@ -73,15 +74,15 @@ public class AggregationFunctionTest {
int groupedSum;
switch (i % 3) {
- case 0:
- groupedSum = groupedSum0 += i;
- break;
- case 1:
- groupedSum = groupedSum1 += i;
- break;
- default:
- groupedSum = groupedSum2 += i;
- break;
+ case 0:
+ groupedSum = groupedSum0 += i;
+ break;
+ case 1:
+ groupedSum = groupedSum1 += i;
+ break;
+ default:
+ groupedSum = groupedSum2 += i;
+ break;
}
expectedGroupSumList.add(new Tuple2<Integer, Integer>(i % 3, groupedSum));
@@ -118,7 +119,7 @@ public class AggregationFunctionTest {
.getForObject(new Tuple2<Integer, Integer>(1, 1));
KeySelector<Tuple2<Integer, Integer>, ?> keySelector = KeySelectorUtil.getSelectorForKeys(
- new Keys.ExpressionKeys<Tuple2<Integer, Integer>>(new int[] { 0 }, typeInfo),
+ new Keys.ExpressionKeys<Tuple2<Integer, Integer>>(new int[]{0}, typeInfo),
typeInfo, new ExecutionConfig());
List<Tuple2<Integer, Integer>> groupedSumList = MockContext.createAndExecute(
@@ -166,73 +167,101 @@ public class AggregationFunctionTest {
// Nothing to do here
}
- ReduceFunction<Tuple2<Integer, Integer>> maxByFunctionFirst = ComparableAggregator
- .getAggregator(0, type1, AggregationType.MAXBY, true);
- ReduceFunction<Tuple2<Integer, Integer>> maxByFunctionLast = ComparableAggregator
- .getAggregator(0, type1, AggregationType.MAXBY, false);
+ }
- ReduceFunction<Tuple2<Integer, Integer>> minByFunctionFirst = ComparableAggregator
- .getAggregator(0, type1, AggregationType.MINBY, true);
- ReduceFunction<Tuple2<Integer, Integer>> minByFunctionLast = ComparableAggregator
- .getAggregator(0, type1, AggregationType.MINBY, false);
+ @Test
+ public void pojoGroupSumIntegerTest() {
+ List<MyPojo> expectedSumList = new ArrayList<MyPojo>();
+ List<MyPojo> expectedMinList = new ArrayList<MyPojo>();
+ List<MyPojo> expectedMaxList = new ArrayList<MyPojo>();
+ List<Integer> expectedSumList0 = new ArrayList<Integer>();
+ List<Integer> expectedMinList0 = new ArrayList<Integer>();
+ List<Integer> expectedMaxList0 = new ArrayList<Integer>();
+ List<MyPojo> expectedGroupSumList = new ArrayList<MyPojo>();
+ List<MyPojo> expectedGroupMinList = new ArrayList<MyPojo>();
+ List<MyPojo> expectedGroupMaxList = new ArrayList<MyPojo>();
- List<Tuple2<Integer, Integer>> maxByFirstExpected = new ArrayList<Tuple2<Integer, Integer>>();
- maxByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
- maxByFirstExpected.add(new Tuple2<Integer, Integer>(1, 1));
- maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
- maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
- maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
- maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
- maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
- maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
- maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
+ List<Integer> simpleInput = new ArrayList<Integer>();
- List<Tuple2<Integer, Integer>> maxByLastExpected = new ArrayList<Tuple2<Integer, Integer>>();
- maxByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
- maxByLastExpected.add(new Tuple2<Integer, Integer>(1, 1));
- maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
- maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
- maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
- maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
- maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
- maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
- maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 8));
+ int groupedSum0 = 0;
+ int groupedSum1 = 0;
+ int groupedSum2 = 0;
- List<Tuple2<Integer, Integer>> minByFirstExpected = new ArrayList<Tuple2<Integer, Integer>>();
- minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
- minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
- minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
- minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
- minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
- minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
- minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
- minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
- minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
+ for (int i = 0; i < 9; i++) {
+ simpleInput.add(i);
+ expectedSumList.add(new MyPojo(i % 3, (i + 1) * i / 2));
+ expectedMinList.add(new MyPojo(i % 3, 0));
+ expectedMaxList.add(new MyPojo(i % 3, i));
- List<Tuple2<Integer, Integer>> minByLastExpected = new ArrayList<Tuple2<Integer, Integer>>();
- minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
- minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
- minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
- minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
- minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
- minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
- minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
- minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
- minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
+ expectedSumList0.add((i + 1) * i / 2);
+ expectedMaxList0.add(i);
+ expectedMinList0.add(0);
- assertEquals(maxByFirstExpected, MockContext.createAndExecute(
- new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxByFunctionFirst),
- getInputList()));
- assertEquals(maxByLastExpected, MockContext.createAndExecute(
- new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxByFunctionLast),
- getInputList()));
- assertEquals(minByLastExpected, MockContext.createAndExecute(
- new StreamReduceInvokable<Tuple2<Integer, Integer>>(minByFunctionLast),
- getInputList()));
- assertEquals(minByFirstExpected, MockContext.createAndExecute(
- new StreamReduceInvokable<Tuple2<Integer, Integer>>(minByFunctionFirst),
- getInputList()));
+ int groupedSum;
+ switch (i % 3) {
+ case 0:
+ groupedSum = groupedSum0 += i;
+ break;
+ case 1:
+ groupedSum = groupedSum1 += i;
+ break;
+ default:
+ groupedSum = groupedSum2 += i;
+ break;
+ }
+
+ expectedGroupSumList.add(new MyPojo(i % 3, groupedSum));
+ expectedGroupMinList.add(new MyPojo(i % 3, i % 3));
+ expectedGroupMaxList.add(new MyPojo(i % 3, i));
+ }
+ TypeInformation<MyPojo> type1 = TypeExtractor.getForObject(new MyPojo(0, 0));
+ TypeInformation<Integer> type2 = TypeExtractor.getForObject(0);
+ ExecutionConfig config = new ExecutionConfig();
+
+ ReduceFunction<MyPojo> sumFunction = SumAggregator.getSumFunction("f1", type1, config);
+ ReduceFunction<Integer> sumFunction0 = SumAggregator.getSumFunction(0, Integer.class, type2);
+ ReduceFunction<MyPojo> minFunction = ComparableAggregator.getAggregator("f1", type1, AggregationType.MIN,
+ false, config);
+ ReduceFunction<Integer> minFunction0 = ComparableAggregator.getAggregator(0, type2, AggregationType.MIN);
+ ReduceFunction<MyPojo> maxFunction = ComparableAggregator.getAggregator("f1", type1, AggregationType.MAX,
+ false, config);
+ ReduceFunction<Integer> maxFunction0 = ComparableAggregator.getAggregator(0, type2, AggregationType.MAX);
+
+ List<MyPojo> sumList = MockContext.createAndExecute(
+ new StreamReduceInvokable<MyPojo>(sumFunction), getInputPojoList());
+ List<MyPojo> minList = MockContext.createAndExecute(
+ new StreamReduceInvokable<MyPojo>(minFunction), getInputPojoList());
+ List<MyPojo> maxList = MockContext.createAndExecute(
+ new StreamReduceInvokable<MyPojo>(maxFunction), getInputPojoList());
+
+ TypeInformation<MyPojo> typeInfo = TypeExtractor.getForObject(new MyPojo(1, 1));
+ KeySelector<MyPojo, ?> keySelector = KeySelectorUtil.getSelectorForKeys(
+ new Keys.ExpressionKeys<MyPojo>(new String[]{"f0"}, typeInfo),
+ typeInfo, config);
+
+ List<MyPojo> groupedSumList = MockContext.createAndExecute(
+ new GroupedReduceInvokable<MyPojo>(sumFunction, keySelector),
+ getInputPojoList());
+ List<MyPojo> groupedMinList = MockContext.createAndExecute(
+ new GroupedReduceInvokable<MyPojo>(minFunction, keySelector),
+ getInputPojoList());
+ List<MyPojo> groupedMaxList = MockContext.createAndExecute(
+ new GroupedReduceInvokable<MyPojo>(maxFunction, keySelector),
+ getInputPojoList());
+
+ assertEquals(expectedSumList, sumList);
+ assertEquals(expectedMinList, minList);
+ assertEquals(expectedMaxList, maxList);
+ assertEquals(expectedGroupSumList, groupedSumList);
+ assertEquals(expectedGroupMinList, groupedMinList);
+ assertEquals(expectedGroupMaxList, groupedMaxList);
+ assertEquals(expectedSumList0, MockContext.createAndExecute(
+ new StreamReduceInvokable<Integer>(sumFunction0), simpleInput));
+ assertEquals(expectedMinList0, MockContext.createAndExecute(
+ new StreamReduceInvokable<Integer>(minFunction0), simpleInput));
+ assertEquals(expectedMaxList0, MockContext.createAndExecute(
+ new StreamReduceInvokable<Integer>(maxFunction0), simpleInput));
}
@Test
@@ -308,6 +337,80 @@ public class AggregationFunctionTest {
getInputList()));
}
+ @Test
+ public void pojoMinMaxByTest() {
+ ExecutionConfig config = new ExecutionConfig();
+ TypeInformation<MyPojo> type1 = TypeExtractor
+ .getForObject(new MyPojo(0, 0));
+
+ ReduceFunction<MyPojo> maxByFunctionFirst = ComparableAggregator
+ .getAggregator("f0", type1, AggregationType.MAXBY, true, config);
+ ReduceFunction<MyPojo> maxByFunctionLast = ComparableAggregator
+ .getAggregator("f0", type1, AggregationType.MAXBY, false, config);
+
+ ReduceFunction<MyPojo> minByFunctionFirst = ComparableAggregator
+ .getAggregator("f0", type1, AggregationType.MINBY, true, config);
+ ReduceFunction<MyPojo> minByFunctionLast = ComparableAggregator
+ .getAggregator("f0", type1, AggregationType.MINBY, false, config);
+
+ List<MyPojo> maxByFirstExpected = new ArrayList<MyPojo>();
+ maxByFirstExpected.add(new MyPojo(0, 0));
+ maxByFirstExpected.add(new MyPojo(1, 1));
+ maxByFirstExpected.add(new MyPojo(2, 2));
+ maxByFirstExpected.add(new MyPojo(2, 2));
+ maxByFirstExpected.add(new MyPojo(2, 2));
+ maxByFirstExpected.add(new MyPojo(2, 2));
+ maxByFirstExpected.add(new MyPojo(2, 2));
+ maxByFirstExpected.add(new MyPojo(2, 2));
+ maxByFirstExpected.add(new MyPojo(2, 2));
+
+ List<MyPojo> maxByLastExpected = new ArrayList<MyPojo>();
+ maxByLastExpected.add(new MyPojo(0, 0));
+ maxByLastExpected.add(new MyPojo(1, 1));
+ maxByLastExpected.add(new MyPojo(2, 2));
+ maxByLastExpected.add(new MyPojo(2, 2));
+ maxByLastExpected.add(new MyPojo(2, 2));
+ maxByLastExpected.add(new MyPojo(2, 5));
+ maxByLastExpected.add(new MyPojo(2, 5));
+ maxByLastExpected.add(new MyPojo(2, 5));
+ maxByLastExpected.add(new MyPojo(2, 8));
+
+ List<MyPojo> minByFirstExpected = new ArrayList<MyPojo>();
+ minByFirstExpected.add(new MyPojo(0, 0));
+ minByFirstExpected.add(new MyPojo(0, 0));
+ minByFirstExpected.add(new MyPojo(0, 0));
+ minByFirstExpected.add(new MyPojo(0, 0));
+ minByFirstExpected.add(new MyPojo(0, 0));
+ minByFirstExpected.add(new MyPojo(0, 0));
+ minByFirstExpected.add(new MyPojo(0, 0));
+ minByFirstExpected.add(new MyPojo(0, 0));
+ minByFirstExpected.add(new MyPojo(0, 0));
+
+ List<MyPojo> minByLastExpected = new ArrayList<MyPojo>();
+ minByLastExpected.add(new MyPojo(0, 0));
+ minByLastExpected.add(new MyPojo(0, 0));
+ minByLastExpected.add(new MyPojo(0, 0));
+ minByLastExpected.add(new MyPojo(0, 3));
+ minByLastExpected.add(new MyPojo(0, 3));
+ minByLastExpected.add(new MyPojo(0, 3));
+ minByLastExpected.add(new MyPojo(0, 6));
+ minByLastExpected.add(new MyPojo(0, 6));
+ minByLastExpected.add(new MyPojo(0, 6));
+
+ assertEquals(maxByFirstExpected, MockContext.createAndExecute(
+ new StreamReduceInvokable<MyPojo>(maxByFunctionFirst),
+ getInputPojoList()));
+ assertEquals(maxByLastExpected, MockContext.createAndExecute(
+ new StreamReduceInvokable<MyPojo>(maxByFunctionLast),
+ getInputPojoList()));
+ assertEquals(minByLastExpected, MockContext.createAndExecute(
+ new StreamReduceInvokable<MyPojo>(minByFunctionLast),
+ getInputPojoList()));
+ assertEquals(minByFirstExpected, MockContext.createAndExecute(
+ new StreamReduceInvokable<MyPojo>(minByFunctionFirst),
+ getInputPojoList()));
+ }
+
private List<Tuple2<Integer, Integer>> getInputList() {
ArrayList<Tuple2<Integer, Integer>> inputList = new ArrayList<Tuple2<Integer, Integer>>();
for (int i = 0; i < 9; i++) {
@@ -316,4 +419,42 @@ public class AggregationFunctionTest {
return inputList;
}
+
+ private List<MyPojo> getInputPojoList() {
+ ArrayList<MyPojo> inputList = new ArrayList<MyPojo>();
+ for (int i = 0; i < 9; i++) {
+ inputList.add(new MyPojo(i % 3, i));
+ }
+ return inputList;
+
+ }
+
+ public static class MyPojo implements Serializable {
+
+ public int f0;
+ public int f1;
+
+ public MyPojo(int f0, int f1) {
+ this.f0 = f0;
+ this.f1 = f1;
+ }
+
+ public MyPojo() {
+ }
+
+ @Override
+ public String toString() {
+ return "POJO(" + f0 + "," + f1 + ")";
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other instanceof MyPojo) {
+ return this.f0 == ((MyPojo) other).f0 && this.f1 == ((MyPojo) other).f1;
+ } else {
+ return false;
+ }
+ }
+
+ }
}