You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/10/06 17:12:25 UTC
[3/5] flink git commit: [streaming] Removed unused StreamReduce
[streaming] Removed unused StreamReduce
Refactored corresponding tests, some minor cleanups.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/906bd6dc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/906bd6dc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/906bd6dc
Branch: refs/heads/master
Commit: 906bd6dcb360665af0331faddb34e7260bfe7f1a
Parents: 4938ff0
Author: mbalassi <mb...@apache.org>
Authored: Fri Sep 11 16:32:09 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Tue Oct 6 14:46:06 2015 +0200
----------------------------------------------------------------------
.../flink/api/common/state/OperatorState.java | 10 +-
.../api/operators/StreamGroupedReduce.java | 11 +-
.../streaming/api/operators/StreamReduce.java | 53 ---
.../streaming/api/AggregationFunctionTest.java | 446 ++++++++-----------
.../flink/streaming/api/scala/DataStream.scala | 26 +-
5 files changed, 221 insertions(+), 325 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/906bd6dc/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
index 3f5e977..3036023 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
@@ -21,6 +21,8 @@ package org.apache.flink.api.common.state;
import java.io.IOException;
import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.configuration.Configuration;
/**
* Base interface for all streaming operator states. It can represent both
@@ -30,9 +32,9 @@ import org.apache.flink.api.common.functions.MapFunction;
* State can be accessed and manipulated using the {@link #value()} and
* {@link #update(T)} methods. These calls are only safe in the
* transformation call the operator represents, for instance inside
- * {@link MapFunction#map()} and can lead tp unexpected behavior in the
- * {@link #open(org.apache.flink.configuration.Configuration)} or
- * {@link #close()} methods.
+ * {@link MapFunction#map(Object)} and can lead to unexpected behavior in the
+ * {@link AbstractRichFunction#open(Configuration)} or
+ * {@link AbstractRichFunction#close()} methods.
*
* @param <T>
* Type of the operator state
@@ -59,7 +61,7 @@ public interface OperatorState<T> {
* partitioned state is updated with null, the state for the current key
* will be removed and the default value is returned on the next access.
*
- * @param state
+ * @param value
* The new value for the state.
*
* @throws IOException Thrown if the system cannot access the state.
http://git-wip-us.apache.org/repos/asf/flink/blob/906bd6dc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
index 7533c33..8805138 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
@@ -22,9 +22,11 @@ import java.util.Map;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-public class StreamGroupedReduce<IN> extends StreamReduce<IN> {
+public class StreamGroupedReduce<IN> extends AbstractUdfStreamOperator<IN, ReduceFunction<IN>>
+ implements OneInputStreamOperator<IN, IN>{
private static final long serialVersionUID = 1L;
@@ -41,7 +43,7 @@ public class StreamGroupedReduce<IN> extends StreamReduce<IN> {
Object key = keySelector.getKey(element.getValue());
if (values == null) {
- values = new HashMap<Object, IN>();
+ values = new HashMap<>();
}
IN currentValue = values.get(key);
@@ -56,4 +58,9 @@ public class StreamGroupedReduce<IN> extends StreamReduce<IN> {
}
}
+ @Override
+ public void processWatermark(Watermark mark) throws Exception {
+ output.emitWatermark(mark);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/906bd6dc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
deleted file mode 100644
index af562fe..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-public class StreamReduce<IN> extends AbstractUdfStreamOperator<IN, ReduceFunction<IN>>
- implements OneInputStreamOperator<IN, IN> {
-
- private static final long serialVersionUID = 1L;
-
- private transient IN currentValue;
-
- public StreamReduce(ReduceFunction<IN> reducer) {
- super(reducer);
- currentValue = null;
-
- chainingStrategy = ChainingStrategy.ALWAYS;
- }
-
- @Override
- public void processElement(StreamRecord<IN> element) throws Exception {
-
- if (currentValue != null) {
- currentValue = userFunction.reduce(currentValue, element.getValue());
- } else {
- currentValue = element.getValue();
- }
- output.collect(element.replace(currentValue));
- }
-
- @Override
- public void processWatermark(Watermark mark) throws Exception {
- output.emitWatermark(mark);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/906bd6dc/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
index ff04609..cdf7aae 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
@@ -23,18 +23,19 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
+import com.google.common.collect.ImmutableList;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.Keys;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType;
import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
-import org.apache.flink.streaming.api.operators.StreamReduce;
import org.apache.flink.streaming.util.MockContext;
import org.apache.flink.streaming.util.keys.KeySelectorUtil;
import org.junit.Test;
@@ -44,32 +45,16 @@ public class AggregationFunctionTest {
@Test
public void groupSumIntegerTest() {
- List<Tuple2<Integer, Integer>> expectedSumList = new ArrayList<Tuple2<Integer, Integer>>();
- List<Tuple2<Integer, Integer>> expectedMinList = new ArrayList<Tuple2<Integer, Integer>>();
- List<Tuple2<Integer, Integer>> expectedMaxList = new ArrayList<Tuple2<Integer, Integer>>();
- List<Integer> expectedSumList0 = new ArrayList<Integer>();
- List<Integer> expectedMinList0 = new ArrayList<Integer>();
- List<Integer> expectedMaxList0 = new ArrayList<Integer>();
- List<Tuple2<Integer, Integer>> expectedGroupSumList = new ArrayList<Tuple2<Integer, Integer>>();
- List<Tuple2<Integer, Integer>> expectedGroupMinList = new ArrayList<Tuple2<Integer, Integer>>();
- List<Tuple2<Integer, Integer>> expectedGroupMaxList = new ArrayList<Tuple2<Integer, Integer>>();
-
- List<Integer> simpleInput = new ArrayList<Integer>();
+ // preparing expected outputs
+ List<Tuple2<Integer, Integer>> expectedGroupSumList = new ArrayList<>();
+ List<Tuple2<Integer, Integer>> expectedGroupMinList = new ArrayList<>();
+ List<Tuple2<Integer, Integer>> expectedGroupMaxList = new ArrayList<>();
int groupedSum0 = 0;
int groupedSum1 = 0;
int groupedSum2 = 0;
for (int i = 0; i < 9; i++) {
- simpleInput.add(i);
- expectedSumList.add(new Tuple2<Integer, Integer>(i % 3, (i + 1) * i / 2));
- expectedMinList.add(new Tuple2<Integer, Integer>(i % 3, 0));
- expectedMaxList.add(new Tuple2<Integer, Integer>(i % 3, i));
-
- expectedSumList0.add((i + 1) * i / 2);
- expectedMaxList0.add(i);
- expectedMinList0.add(0);
-
int groupedSum;
switch (i % 3) {
case 0:
@@ -83,99 +68,59 @@ public class AggregationFunctionTest {
break;
}
- expectedGroupSumList.add(new Tuple2<Integer, Integer>(i % 3, groupedSum));
- expectedGroupMinList.add(new Tuple2<Integer, Integer>(i % 3, i % 3));
- expectedGroupMaxList.add(new Tuple2<Integer, Integer>(i % 3, i));
+ expectedGroupSumList.add(new Tuple2<>(i % 3, groupedSum));
+ expectedGroupMinList.add(new Tuple2<>(i % 3, i % 3));
+ expectedGroupMaxList.add(new Tuple2<>(i % 3, i));
}
- TypeInformation<Tuple2<Integer, Integer>> type1 = TypeExtractor
- .getForObject(new Tuple2<Integer, Integer>(0, 0));
- TypeInformation<Integer> type2 = TypeExtractor.getForObject(2);
+ // some necessary boilerplate
+ TypeInformation<Tuple2<Integer, Integer>> typeInfo = TypeExtractor
+ .getForObject(new Tuple2<>(0, 0));
ExecutionConfig config = new ExecutionConfig();
- ReduceFunction<Tuple2<Integer, Integer>> sumFunction =
- new SumAggregator<Tuple2<Integer, Integer>>(1, type1, config);
- ReduceFunction<Integer> sumFunction0 = new SumAggregator<Integer>(0, type2, config);
- ReduceFunction<Tuple2<Integer, Integer>> minFunction = new ComparableAggregator<Tuple2<Integer, Integer>>(
- 1, type1, AggregationType.MIN, config);
- ReduceFunction<Integer> minFunction0 = new ComparableAggregator<Integer>(0, type2,
- AggregationType.MIN, config);
- ReduceFunction<Tuple2<Integer, Integer>> maxFunction = new ComparableAggregator<Tuple2<Integer, Integer>>(
- 1, type1, AggregationType.MAX, config);
- ReduceFunction<Integer> maxFunction0 = new ComparableAggregator<Integer>(0, type2,
- AggregationType.MAX, config);
- List<Tuple2<Integer, Integer>> sumList = MockContext.createAndExecute(
- new StreamReduce<Tuple2<Integer, Integer>>(sumFunction), getInputList());
-
- List<Tuple2<Integer, Integer>> minList = MockContext.createAndExecute(
- new StreamReduce<Tuple2<Integer, Integer>>(minFunction), getInputList());
-
- List<Tuple2<Integer, Integer>> maxList = MockContext.createAndExecute(
- new StreamReduce<Tuple2<Integer, Integer>>(maxFunction), getInputList());
-
- TypeInformation<Tuple2<Integer, Integer>> typeInfo = TypeExtractor
- .getForObject(new Tuple2<Integer, Integer>(1, 1));
-
KeySelector<Tuple2<Integer, Integer>, ?> keySelector = KeySelectorUtil.getSelectorForKeys(
- new Keys.ExpressionKeys<Tuple2<Integer, Integer>>(new int[]{0}, typeInfo),
- typeInfo, new ExecutionConfig());
+ new Keys.ExpressionKeys<>(new int[]{0}, typeInfo),
+ typeInfo, config);
+
+ // aggregations tested
+ ReduceFunction<Tuple2<Integer, Integer>> sumFunction =
+ new SumAggregator<>(1, typeInfo, config);
+ ReduceFunction<Tuple2<Integer, Integer>> minFunction = new ComparableAggregator<>(
+ 1, typeInfo, AggregationType.MIN, config);
+ ReduceFunction<Tuple2<Integer, Integer>> maxFunction = new ComparableAggregator<>(
+ 1, typeInfo, AggregationType.MAX, config);
List<Tuple2<Integer, Integer>> groupedSumList = MockContext.createAndExecute(
- new StreamGroupedReduce<Tuple2<Integer, Integer>>(sumFunction, keySelector),
+ new StreamGroupedReduce<>(sumFunction, keySelector),
getInputList());
List<Tuple2<Integer, Integer>> groupedMinList = MockContext.createAndExecute(
- new StreamGroupedReduce<Tuple2<Integer, Integer>>(minFunction, keySelector),
+ new StreamGroupedReduce<>(minFunction, keySelector),
getInputList());
List<Tuple2<Integer, Integer>> groupedMaxList = MockContext.createAndExecute(
- new StreamGroupedReduce<Tuple2<Integer, Integer>>(maxFunction, keySelector),
+ new StreamGroupedReduce<>(maxFunction, keySelector),
getInputList());
- assertEquals(expectedSumList, sumList);
- assertEquals(expectedMinList, minList);
- assertEquals(expectedMaxList, maxList);
assertEquals(expectedGroupSumList, groupedSumList);
assertEquals(expectedGroupMinList, groupedMinList);
assertEquals(expectedGroupMaxList, groupedMaxList);
- assertEquals(expectedSumList0, MockContext.createAndExecute(
- new StreamReduce<Integer>(sumFunction0), simpleInput));
- assertEquals(expectedMinList0, MockContext.createAndExecute(
- new StreamReduce<Integer>(minFunction0), simpleInput));
- assertEquals(expectedMaxList0, MockContext.createAndExecute(
- new StreamReduce<Integer>(maxFunction0), simpleInput));
-
}
@Test
public void pojoGroupSumIntegerTest() {
- List<MyPojo> expectedSumList = new ArrayList<MyPojo>();
- List<MyPojo> expectedMinList = new ArrayList<MyPojo>();
- List<MyPojo> expectedMaxList = new ArrayList<MyPojo>();
- List<Integer> expectedSumList0 = new ArrayList<Integer>();
- List<Integer> expectedMinList0 = new ArrayList<Integer>();
- List<Integer> expectedMaxList0 = new ArrayList<Integer>();
- List<MyPojo> expectedGroupSumList = new ArrayList<MyPojo>();
- List<MyPojo> expectedGroupMinList = new ArrayList<MyPojo>();
- List<MyPojo> expectedGroupMaxList = new ArrayList<MyPojo>();
-
- List<Integer> simpleInput = new ArrayList<Integer>();
+
+ // preparing expected outputs
+ List<MyPojo> expectedGroupSumList = new ArrayList<>();
+ List<MyPojo> expectedGroupMinList = new ArrayList<>();
+ List<MyPojo> expectedGroupMaxList = new ArrayList<>();
int groupedSum0 = 0;
int groupedSum1 = 0;
int groupedSum2 = 0;
for (int i = 0; i < 9; i++) {
- simpleInput.add(i);
- expectedSumList.add(new MyPojo(i % 3, (i + 1) * i / 2));
- expectedMinList.add(new MyPojo(i % 3, 0));
- expectedMaxList.add(new MyPojo(i % 3, i));
-
- expectedSumList0.add((i + 1) * i / 2);
- expectedMaxList0.add(i);
- expectedMinList0.add(0);
-
int groupedSum;
switch (i % 3) {
case 0:
@@ -194,222 +139,188 @@ public class AggregationFunctionTest {
expectedGroupMaxList.add(new MyPojo(i % 3, i));
}
- TypeInformation<MyPojo> type1 = TypeExtractor.getForObject(new MyPojo(0, 0));
- TypeInformation<Integer> type2 = TypeExtractor.getForObject(0);
- ExecutionConfig config = new ExecutionConfig();
+ // some necessary boilerplate
+ TypeInformation<MyPojo> typeInfo = TypeExtractor.getForObject(new MyPojo(0, 0));
- ReduceFunction<MyPojo> sumFunction = new SumAggregator<MyPojo>("f1", type1, config);
- ReduceFunction<Integer> sumFunction0 = new SumAggregator<Integer>(0, type2, config);
- ReduceFunction<MyPojo> minFunction = new ComparableAggregator<MyPojo>("f1", type1, AggregationType.MIN,
- false, config);
- ReduceFunction<Integer> minFunction0 = new ComparableAggregator<Integer>(0, type2, AggregationType.MIN,
- config);
- ReduceFunction<MyPojo> maxFunction = new ComparableAggregator<MyPojo>("f1", type1, AggregationType.MAX,
- false, config);
- ReduceFunction<Integer> maxFunction0 = new ComparableAggregator<Integer>(0, type2, AggregationType.MAX,
- config);
-
- List<MyPojo> sumList = MockContext.createAndExecute(
- new StreamReduce<MyPojo>(sumFunction), getInputPojoList());
- List<MyPojo> minList = MockContext.createAndExecute(
- new StreamReduce<MyPojo>(minFunction), getInputPojoList());
- List<MyPojo> maxList = MockContext.createAndExecute(
- new StreamReduce<MyPojo>(maxFunction), getInputPojoList());
+ ExecutionConfig config = new ExecutionConfig();
- TypeInformation<MyPojo> typeInfo = TypeExtractor.getForObject(new MyPojo(1, 1));
KeySelector<MyPojo, ?> keySelector = KeySelectorUtil.getSelectorForKeys(
- new Keys.ExpressionKeys<MyPojo>(new String[]{"f0"}, typeInfo),
+ new Keys.ExpressionKeys<>(new String[]{"f0"}, typeInfo),
typeInfo, config);
+ // aggregations tested
+ ReduceFunction<MyPojo> sumFunction = new SumAggregator<>("f1", typeInfo, config);
+ ReduceFunction<MyPojo> minFunction = new ComparableAggregator<>("f1", typeInfo, AggregationType.MIN,
+ false, config);
+ ReduceFunction<MyPojo> maxFunction = new ComparableAggregator<>("f1", typeInfo, AggregationType.MAX,
+ false, config);
+
List<MyPojo> groupedSumList = MockContext.createAndExecute(
- new StreamGroupedReduce<MyPojo>(sumFunction, keySelector),
+ new StreamGroupedReduce<>(sumFunction, keySelector),
getInputPojoList());
List<MyPojo> groupedMinList = MockContext.createAndExecute(
- new StreamGroupedReduce<MyPojo>(minFunction, keySelector),
+ new StreamGroupedReduce<>(minFunction, keySelector),
getInputPojoList());
List<MyPojo> groupedMaxList = MockContext.createAndExecute(
- new StreamGroupedReduce<MyPojo>(maxFunction, keySelector),
+ new StreamGroupedReduce<>(maxFunction, keySelector),
getInputPojoList());
- assertEquals(expectedSumList, sumList);
- assertEquals(expectedMinList, minList);
- assertEquals(expectedMaxList, maxList);
assertEquals(expectedGroupSumList, groupedSumList);
assertEquals(expectedGroupMinList, groupedMinList);
assertEquals(expectedGroupMaxList, groupedMaxList);
- assertEquals(expectedSumList0, MockContext.createAndExecute(
- new StreamReduce<Integer>(sumFunction0), simpleInput));
- assertEquals(expectedMinList0, MockContext.createAndExecute(
- new StreamReduce<Integer>(minFunction0), simpleInput));
- assertEquals(expectedMaxList0, MockContext.createAndExecute(
- new StreamReduce<Integer>(maxFunction0), simpleInput));
}
-
+
@Test
public void minMaxByTest() {
- TypeInformation<Tuple2<Integer, Integer>> type1 = TypeExtractor
- .getForObject(new Tuple2<Integer, Integer>(0, 0));
+ // Tuples are grouped on field 0, aggregated on field 1
+
+ // preparing expected outputs
+ List<Tuple3<Integer, Integer, Integer>> maxByFirstExpected = ImmutableList.of(
+ Tuple3.of(0,0,0), Tuple3.of(0,1,1), Tuple3.of(0,2,2),
+ Tuple3.of(0,2,2), Tuple3.of(0,2,2), Tuple3.of(0,2,2),
+ Tuple3.of(0,2,2), Tuple3.of(0,2,2), Tuple3.of(0,2,2));
+
+ List<Tuple3<Integer, Integer, Integer>> maxByLastExpected = ImmutableList.of(
+ Tuple3.of(0, 0, 0), Tuple3.of(0, 1, 1), Tuple3.of(0, 2, 2),
+ Tuple3.of(0, 2, 2), Tuple3.of(0, 2, 2), Tuple3.of(0, 2, 5),
+ Tuple3.of(0, 2, 5), Tuple3.of(0, 2, 5), Tuple3.of(0, 2, 8));
+
+ List<Tuple3<Integer, Integer, Integer>> minByFirstExpected = ImmutableList.of(
+ Tuple3.of(0,0,0), Tuple3.of(0,0,0), Tuple3.of(0,0,0),
+ Tuple3.of(0,0,0), Tuple3.of(0,0,0), Tuple3.of(0,0,0),
+ Tuple3.of(0,0,0), Tuple3.of(0,0,0), Tuple3.of(0,0,0));
+
+ List<Tuple3<Integer, Integer, Integer>> minByLastExpected = ImmutableList.of(
+ Tuple3.of(0, 0, 0), Tuple3.of(0, 0, 0), Tuple3.of(0, 0, 0),
+ Tuple3.of(0, 0, 3), Tuple3.of(0, 0, 3), Tuple3.of(0, 0, 3),
+ Tuple3.of(0, 0, 6), Tuple3.of(0, 0, 6), Tuple3.of(0, 0, 6));
+
+ // some necessary boilerplate
+ TypeInformation<Tuple3<Integer, Integer, Integer>> typeInfo = TypeExtractor
+ .getForObject(Tuple3.of(0,0,0));
ExecutionConfig config = new ExecutionConfig();
- ReduceFunction<Tuple2<Integer, Integer>> maxByFunctionFirst =
- new ComparableAggregator<Tuple2<Integer, Integer>>(0, type1, AggregationType.MAXBY, true, config);
- ReduceFunction<Tuple2<Integer, Integer>> maxByFunctionLast =
- new ComparableAggregator<Tuple2<Integer, Integer>>(0, type1, AggregationType.MAXBY, false, config);
-
- ReduceFunction<Tuple2<Integer, Integer>> minByFunctionFirst =
- new ComparableAggregator<Tuple2<Integer, Integer>>(0, type1, AggregationType.MINBY, true, config);
- ReduceFunction<Tuple2<Integer, Integer>> minByFunctionLast =
- new ComparableAggregator<Tuple2<Integer, Integer>>(0, type1, AggregationType.MINBY, false, config);
-
- List<Tuple2<Integer, Integer>> maxByFirstExpected = new ArrayList<Tuple2<Integer, Integer>>();
- maxByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
- maxByFirstExpected.add(new Tuple2<Integer, Integer>(1, 1));
- maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
- maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
- maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
- maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
- maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
- maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
- maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
-
- List<Tuple2<Integer, Integer>> maxByLastExpected = new ArrayList<Tuple2<Integer, Integer>>();
- maxByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
- maxByLastExpected.add(new Tuple2<Integer, Integer>(1, 1));
- maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
- maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
- maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
- maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
- maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
- maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
- maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 8));
-
- List<Tuple2<Integer, Integer>> minByFirstExpected = new ArrayList<Tuple2<Integer, Integer>>();
- minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
- minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
- minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
- minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
- minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
- minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
- minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
- minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
- minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-
- List<Tuple2<Integer, Integer>> minByLastExpected = new ArrayList<Tuple2<Integer, Integer>>();
- minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
- minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
- minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
- minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
- minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
- minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
- minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
- minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
- minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
+ KeySelector<Tuple3<Integer, Integer, Integer>, ?> keySelector = KeySelectorUtil.getSelectorForKeys(
+ new Keys.ExpressionKeys<>(new int[]{0}, typeInfo),
+ typeInfo, config);
+
+ // aggregations tested
+ ReduceFunction<Tuple3<Integer, Integer, Integer>> maxByFunctionFirst =
+ new ComparableAggregator<>(1, typeInfo, AggregationType.MAXBY, true, config);
+ ReduceFunction<Tuple3<Integer, Integer, Integer>> maxByFunctionLast =
+ new ComparableAggregator<>(1, typeInfo, AggregationType.MAXBY, false, config);
+ ReduceFunction<Tuple3<Integer, Integer, Integer>> minByFunctionFirst =
+ new ComparableAggregator<>(1, typeInfo, AggregationType.MINBY, true, config);
+ ReduceFunction<Tuple3<Integer, Integer, Integer>> minByFunctionLast =
+ new ComparableAggregator<>(1, typeInfo, AggregationType.MINBY, false, config);
assertEquals(maxByFirstExpected, MockContext.createAndExecute(
- new StreamReduce<Tuple2<Integer, Integer>>(maxByFunctionFirst),
- getInputList()));
+ new StreamGroupedReduce<>(maxByFunctionFirst, keySelector),
+ getInputByList()));
assertEquals(maxByLastExpected, MockContext.createAndExecute(
- new StreamReduce<Tuple2<Integer, Integer>>(maxByFunctionLast),
- getInputList()));
+ new StreamGroupedReduce<>(maxByFunctionLast, keySelector),
+ getInputByList()));
assertEquals(minByLastExpected, MockContext.createAndExecute(
- new StreamReduce<Tuple2<Integer, Integer>>(minByFunctionLast),
- getInputList()));
+ new StreamGroupedReduce<>(minByFunctionLast, keySelector),
+ getInputByList()));
assertEquals(minByFirstExpected, MockContext.createAndExecute(
- new StreamReduce<Tuple2<Integer, Integer>>(minByFunctionFirst),
- getInputList()));
+ new StreamGroupedReduce<>(minByFunctionFirst, keySelector),
+ getInputByList()));
}
@Test
public void pojoMinMaxByTest() {
+ // Pojos are grouped on field 0, aggregated on field 1
+
+ // preparing expected outputs
+ List<MyPojo3> maxByFirstExpected = ImmutableList.of(
+ new MyPojo3(0, 0), new MyPojo3(1, 1), new MyPojo3(2, 2),
+ new MyPojo3(2, 2), new MyPojo3(2, 2), new MyPojo3(2, 2),
+ new MyPojo3(2, 2), new MyPojo3(2, 2), new MyPojo3(2, 2));
+
+ List<MyPojo3> maxByLastExpected = ImmutableList.of(
+ new MyPojo3(0, 0), new MyPojo3(1, 1), new MyPojo3(2, 2),
+ new MyPojo3(2, 2), new MyPojo3(2, 2), new MyPojo3(2, 5),
+ new MyPojo3(2, 5), new MyPojo3(2, 5), new MyPojo3(2, 8));
+
+ List<MyPojo3> minByFirstExpected = ImmutableList.of(
+ new MyPojo3(0, 0), new MyPojo3(0, 0), new MyPojo3(0, 0),
+ new MyPojo3(0, 0), new MyPojo3(0, 0), new MyPojo3(0, 0),
+ new MyPojo3(0, 0), new MyPojo3(0, 0), new MyPojo3(0, 0));
+
+ List<MyPojo3> minByLastExpected = ImmutableList.of(
+ new MyPojo3(0, 0), new MyPojo3(0, 0), new MyPojo3(0, 0),
+ new MyPojo3(0, 3), new MyPojo3(0, 3), new MyPojo3(0, 3),
+ new MyPojo3(0, 6), new MyPojo3(0, 6), new MyPojo3(0, 6));
+
+ // some necessary boilerplate
+ TypeInformation<MyPojo3> typeInfo = TypeExtractor.getForObject(new MyPojo3(0, 0));
+
ExecutionConfig config = new ExecutionConfig();
- TypeInformation<MyPojo> type1 = TypeExtractor
- .getForObject(new MyPojo(0, 0));
-
- ReduceFunction<MyPojo> maxByFunctionFirst =
- new ComparableAggregator<MyPojo>("f0", type1, AggregationType.MAXBY, true, config);
- ReduceFunction<MyPojo> maxByFunctionLast =
- new ComparableAggregator<MyPojo>("f0", type1, AggregationType.MAXBY, false, config);
-
- ReduceFunction<MyPojo> minByFunctionFirst =
- new ComparableAggregator<MyPojo>("f0", type1, AggregationType.MINBY, true, config);
- ReduceFunction<MyPojo> minByFunctionLast =
- new ComparableAggregator<MyPojo>("f0", type1, AggregationType.MINBY, false, config);
-
- List<MyPojo> maxByFirstExpected = new ArrayList<MyPojo>();
- maxByFirstExpected.add(new MyPojo(0, 0));
- maxByFirstExpected.add(new MyPojo(1, 1));
- maxByFirstExpected.add(new MyPojo(2, 2));
- maxByFirstExpected.add(new MyPojo(2, 2));
- maxByFirstExpected.add(new MyPojo(2, 2));
- maxByFirstExpected.add(new MyPojo(2, 2));
- maxByFirstExpected.add(new MyPojo(2, 2));
- maxByFirstExpected.add(new MyPojo(2, 2));
- maxByFirstExpected.add(new MyPojo(2, 2));
-
- List<MyPojo> maxByLastExpected = new ArrayList<MyPojo>();
- maxByLastExpected.add(new MyPojo(0, 0));
- maxByLastExpected.add(new MyPojo(1, 1));
- maxByLastExpected.add(new MyPojo(2, 2));
- maxByLastExpected.add(new MyPojo(2, 2));
- maxByLastExpected.add(new MyPojo(2, 2));
- maxByLastExpected.add(new MyPojo(2, 5));
- maxByLastExpected.add(new MyPojo(2, 5));
- maxByLastExpected.add(new MyPojo(2, 5));
- maxByLastExpected.add(new MyPojo(2, 8));
-
- List<MyPojo> minByFirstExpected = new ArrayList<MyPojo>();
- minByFirstExpected.add(new MyPojo(0, 0));
- minByFirstExpected.add(new MyPojo(0, 0));
- minByFirstExpected.add(new MyPojo(0, 0));
- minByFirstExpected.add(new MyPojo(0, 0));
- minByFirstExpected.add(new MyPojo(0, 0));
- minByFirstExpected.add(new MyPojo(0, 0));
- minByFirstExpected.add(new MyPojo(0, 0));
- minByFirstExpected.add(new MyPojo(0, 0));
- minByFirstExpected.add(new MyPojo(0, 0));
-
- List<MyPojo> minByLastExpected = new ArrayList<MyPojo>();
- minByLastExpected.add(new MyPojo(0, 0));
- minByLastExpected.add(new MyPojo(0, 0));
- minByLastExpected.add(new MyPojo(0, 0));
- minByLastExpected.add(new MyPojo(0, 3));
- minByLastExpected.add(new MyPojo(0, 3));
- minByLastExpected.add(new MyPojo(0, 3));
- minByLastExpected.add(new MyPojo(0, 6));
- minByLastExpected.add(new MyPojo(0, 6));
- minByLastExpected.add(new MyPojo(0, 6));
+
+ KeySelector<MyPojo3, ?> keySelector = KeySelectorUtil.getSelectorForKeys(
+ new Keys.ExpressionKeys<>(new String[]{"f0"}, typeInfo),
+ typeInfo, config);
+
+ // aggregations tested
+ ReduceFunction<MyPojo3> maxByFunctionFirst =
+ new ComparableAggregator<>("f1", typeInfo, AggregationType.MAXBY, true, config);
+ ReduceFunction<MyPojo3> maxByFunctionLast =
+ new ComparableAggregator<>("f1", typeInfo, AggregationType.MAXBY, false, config);
+ ReduceFunction<MyPojo3> minByFunctionFirst =
+ new ComparableAggregator<>("f1", typeInfo, AggregationType.MINBY, true, config);
+ ReduceFunction<MyPojo3> minByFunctionLast =
+ new ComparableAggregator<>("f1", typeInfo, AggregationType.MINBY, false, config);
assertEquals(maxByFirstExpected, MockContext.createAndExecute(
- new StreamReduce<MyPojo>(maxByFunctionFirst),
- getInputPojoList()));
+ new StreamGroupedReduce<>(maxByFunctionFirst, keySelector),
+ getInputByPojoList()));
assertEquals(maxByLastExpected, MockContext.createAndExecute(
- new StreamReduce<MyPojo>(maxByFunctionLast),
- getInputPojoList()));
+ new StreamGroupedReduce<>(maxByFunctionLast, keySelector),
+ getInputByPojoList()));
assertEquals(minByLastExpected, MockContext.createAndExecute(
- new StreamReduce<MyPojo>(minByFunctionLast),
- getInputPojoList()));
+ new StreamGroupedReduce<>(minByFunctionLast, keySelector),
+ getInputByPojoList()));
assertEquals(minByFirstExpected, MockContext.createAndExecute(
- new StreamReduce<MyPojo>(minByFunctionFirst),
- getInputPojoList()));
+ new StreamGroupedReduce<>(minByFunctionFirst, keySelector),
+ getInputByPojoList()));
}
+ // *************************************************************************
+ // UTILS
+ // *************************************************************************
+
private List<Tuple2<Integer, Integer>> getInputList() {
- ArrayList<Tuple2<Integer, Integer>> inputList = new ArrayList<Tuple2<Integer, Integer>>();
+ ArrayList<Tuple2<Integer, Integer>> inputList = new ArrayList<>();
for (int i = 0; i < 9; i++) {
- inputList.add(new Tuple2<Integer, Integer>(i % 3, i));
+ inputList.add(Tuple2.of(i % 3, i));
}
return inputList;
-
}
private List<MyPojo> getInputPojoList() {
- ArrayList<MyPojo> inputList = new ArrayList<MyPojo>();
+ ArrayList<MyPojo> inputList = new ArrayList<>();
for (int i = 0; i < 9; i++) {
inputList.add(new MyPojo(i % 3, i));
}
return inputList;
+ }
+ private List<Tuple3<Integer, Integer, Integer>> getInputByList() {
+ ArrayList<Tuple3<Integer, Integer, Integer>> inputList = new ArrayList<>();
+ for (int i = 0; i < 9; i++) {
+ inputList.add(Tuple3.of(0, i % 3, i));
+ }
+ return inputList;
+ }
+
+ private List<MyPojo3> getInputByPojoList() {
+ ArrayList<MyPojo3> inputList = new ArrayList<>();
+ for (int i = 0; i < 9; i++) {
+ inputList.add(new MyPojo3(i % 3, i));
+ }
+ return inputList;
}
public static class MyPojo implements Serializable {
@@ -439,6 +350,39 @@ public class AggregationFunctionTest {
return false;
}
}
+ }
+
+ public static class MyPojo3 implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+ public int f0;
+ public int f1;
+ public int f2;
+
+ // Field 0 is always initialized to 0
+ public MyPojo3(int f1, int f2) {
+ this.f1 = f1;
+ this.f2 = f2;
+ }
+
+ public MyPojo3() {
+ }
+
+ @Override
+ public String toString() {
+ return "POJO3(" + f0 + "," + f1 + "," + f2 + ")";
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other instanceof MyPojo3) {
+ return this.f0 == ((MyPojo3) other).f0
+ && this.f1 == ((MyPojo3) other).f1
+ && this.f2 == ((MyPojo3) other).f2;
+ } else {
+ return false;
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/906bd6dc/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 0cf1df8..19bcb73 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -18,32 +18,28 @@
package org.apache.flink.streaming.api.scala
-import org.apache.flink.streaming.api.functions.{AscendingTimestampExtractor, TimestampExtractor}
-import org.apache.flink.streaming.api.windowing.assigners._
-import org.apache.flink.streaming.api.windowing.time.{ProcessingTime, EventTime, AbstractTime}
-import org.apache.flink.streaming.api.windowing.windows.{Window, TimeWindow}
-import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWindowedStream}
-
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-
-import org.apache.flink.api.java.tuple.{Tuple => JavaTuple}
-import org.apache.flink.api.common.functions.{FlatMapFunction, MapFunction, Partitioner, FilterFunction}
+import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction, MapFunction, Partitioner, RichFilterFunction, RichFlatMapFunction, RichMapFunction}
import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple => JavaTuple}
import org.apache.flink.api.scala.operators.ScalaCsvOutputFormat
import org.apache.flink.core.fs.{FileSystem, Path}
import org.apache.flink.streaming.api.collector.selector.OutputSelector
-import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, DataStreamSink, SingleOutputStreamOperator}
+import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWindowedStream, DataStream => JavaStream, KeyedStream => JavaKeyedStream, _}
import org.apache.flink.streaming.api.functions.sink.SinkFunction
+import org.apache.flink.streaming.api.functions.{AscendingTimestampExtractor, TimestampExtractor}
+import org.apache.flink.streaming.api.scala.function.StatefulFunction
+import org.apache.flink.streaming.api.windowing.assigners._
import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
import org.apache.flink.streaming.api.windowing.policy.{EvictionPolicy, TriggerPolicy}
+import org.apache.flink.streaming.api.windowing.time.{AbstractTime, EventTime, ProcessingTime}
+import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window}
import org.apache.flink.streaming.util.serialization.SerializationSchema
import org.apache.flink.util.Collector
-import org.apache.flink.api.common.functions.{RichMapFunction, RichFlatMapFunction, RichFilterFunction}
-import org.apache.flink.streaming.api.scala.function.StatefulFunction
-import org.apache.flink.streaming.api.datastream.{KeyedStream => JavaKeyedStream}
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
class DataStream[T](javaStream: JavaStream[T]) {