Posted to commits@flink.apache.org by mb...@apache.org on 2015/10/06 17:12:23 UTC
[1/5] flink git commit: [FLINK-2283] [streaming] Proper serialization of state in StreamGroupedFold and Reduce
Repository: flink
Updated Branches:
refs/heads/master 4938ff09f -> e494c2795
[FLINK-2283] [streaming] Proper serialization of state in StreamGroupedFold and Reduce
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c414ea98
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c414ea98
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c414ea98
Branch: refs/heads/master
Commit: c414ea98ec6896fd9c011d421cb3e5c9587c8446
Parents: a76d963
Author: mbalassi <mb...@apache.org>
Authored: Mon Oct 5 22:31:29 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Tue Oct 6 14:46:06 2015 +0200
----------------------------------------------------------------------
.../api/common/state/StateCheckpointer.java | 4 +-
.../streaming/api/datastream/KeyedStream.java | 7 +-
.../api/operators/AbstractStreamOperator.java | 1 -
.../operators/AbstractUdfStreamOperator.java | 3 +-
.../api/operators/StreamGroupedFold.java | 21 +++--
.../api/operators/StreamGroupedReduce.java | 26 ++++--
.../streaming/api/state/KVMapCheckpointer.java | 82 +++++++++++++++++++
.../streaming/api/AggregationFunctionTest.java | 28 +++----
.../api/operators/StreamGroupedFoldTest.java | 42 +++++-----
.../api/operators/StreamGroupedReduceTest.java | 86 ++++++++++----------
.../flink/streaming/api/scala/KeyedStream.scala | 2 +-
.../UdfStreamOperatorCheckpointingITCase.java | 20 ++---
12 files changed, 215 insertions(+), 107 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c414ea98/flink-core/src/main/java/org/apache/flink/api/common/state/StateCheckpointer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateCheckpointer.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateCheckpointer.java
index 488e308..f373846 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateCheckpointer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateCheckpointer.java
@@ -58,7 +58,7 @@ public interface StateCheckpointer<S, C extends Serializable> {
*
* @return A snapshot of the operator state.
*/
- public C snapshotState(S state, long checkpointId, long checkpointTimestamp);
+ C snapshotState(S state, long checkpointId, long checkpointTimestamp);
/**
* Restores the operator states from a given snapshot. The restores state
@@ -69,5 +69,5 @@ public interface StateCheckpointer<S, C extends Serializable> {
* The state snapshot that needs to be restored.
* @return The state corresponding to the snapshot.
*/
- public S restoreState(C stateSnapshot);
+ S restoreState(C stateSnapshot);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c414ea98/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index f7c5b53..edb7981 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -186,7 +186,8 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> reducer) {
- return transform("Keyed Reduce", getType(), new StreamGroupedReduce<>(clean(reducer), keySelector));
+ return transform("Keyed Reduce", getType(), new StreamGroupedReduce<T>(
+ clean(reducer), keySelector, getType()));
}
/**
@@ -208,7 +209,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
Utils.getCallLocationName(), true);
return transform("Keyed Fold", outType, new StreamGroupedFold<>(clean(folder),
- keySelector, initialValue));
+ keySelector, initialValue, getType()));
}
/**
@@ -443,7 +444,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
}
protected SingleOutputStreamOperator<T, ?> aggregate(AggregationFunction<T> aggregate) {
- StreamGroupedReduce<T> operator = new StreamGroupedReduce<>(clean(aggregate), keySelector);
+ StreamGroupedReduce<T> operator = new StreamGroupedReduce<T>(clean(aggregate), keySelector, getType());
return transform("Keyed Aggregation", getType(), operator);
}
}
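Note that the user-facing API is unchanged by the extra constructor argument; a keyed reduce is still written as in the minimal sketch below (the tuple types and the summing reducer are assumptions for illustration), and it is this call path in KeyedStream that now hands getType() to the StreamGroupedReduce operator:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyedReduceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<Integer, Long>> input =
                env.fromElements(Tuple2.of(1, 1L), Tuple2.of(1, 2L), Tuple2.of(2, 3L));

        // keyBy(0).reduce(...) is the call that ends up constructing the
        // StreamGroupedReduce operator together with the stream's TypeInformation.
        input.keyBy(0)
             .reduce(new ReduceFunction<Tuple2<Integer, Long>>() {
                 @Override
                 public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> a,
                                                     Tuple2<Integer, Long> b) {
                     return Tuple2.of(a.f0, a.f1 + b.f1);
                 }
             })
             .print();

        env.execute("keyed reduce sketch");
    }
}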
http://git-wip-us.apache.org/repos/asf/flink/blob/c414ea98/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 77bd130..87041eb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -35,7 +35,6 @@ public abstract class AbstractStreamOperator<OUT> implements StreamOperator<OUT>
private static final long serialVersionUID = 1L;
-
protected transient StreamingRuntimeContext runtimeContext;
protected transient ExecutionConfig executionConfig;
http://git-wip-us.apache.org/repos/asf/flink/blob/c414ea98/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index c0d71e9..dc9a152 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -56,8 +56,7 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(AbstractUdfStreamOperator.class);
-
-
+
/** the user function */
protected final F userFunction;
http://git-wip-us.apache.org/repos/asf/flink/blob/c414ea98/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
index ad40b85..732630a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
@@ -30,13 +30,15 @@ import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
+import org.apache.flink.streaming.api.state.KVMapCheckpointer;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
public class StreamGroupedFold<IN, OUT> extends AbstractUdfStreamOperator<OUT, FoldFunction<IN, OUT>>
+
implements OneInputStreamOperator<IN, OUT>, OutputTypeConfigurable<OUT> {
private static final long serialVersionUID = 1L;
@@ -50,13 +52,18 @@ public class StreamGroupedFold<IN, OUT> extends AbstractUdfStreamOperator<OUT, F
private TypeSerializer<OUT> outTypeSerializer;
private transient OUT initialValue;
- public StreamGroupedFold(
- FoldFunction<IN, OUT> folder,
- KeySelector<IN, ?> keySelector,
- OUT initialValue) {
+ // Store the typeinfo, create serializer during runtime
+ private TypeInformation<Object> keyTypeInformation;
+
+ @SuppressWarnings("unchecked")
+ public StreamGroupedFold(FoldFunction<IN, OUT> folder, KeySelector<IN, ?> keySelector,
+ OUT initialValue, TypeInformation<IN> inTypeInformation) {
super(folder);
this.keySelector = keySelector;
this.initialValue = initialValue;
+ keyTypeInformation = (TypeInformation<Object>) TypeExtractor
+ .getKeySelectorTypes(keySelector, inTypeInformation);
+
}
@Override
@@ -75,7 +82,9 @@ public class StreamGroupedFold<IN, OUT> extends AbstractUdfStreamOperator<OUT, F
initialValue = outTypeSerializer.deserialize(in);
values = runtimeContext.getOperatorState("flink_internal_fold_values",
- new HashMap<Object, OUT>(), false);
+ new HashMap<Object, OUT>(), false,
+ new KVMapCheckpointer<>(keyTypeInformation.createSerializer(executionConfig),
+ outTypeSerializer));
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/c414ea98/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
index 68ebd8f..579814d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
@@ -17,33 +17,48 @@
package org.apache.flink.streaming.api.operators;
-import java.util.HashMap;
-
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.state.KVMapCheckpointer;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import java.util.HashMap;
+
public class StreamGroupedReduce<IN> extends AbstractUdfStreamOperator<IN, ReduceFunction<IN>>
- implements OneInputStreamOperator<IN, IN>{
+ implements OneInputStreamOperator<IN, IN> {
private static final long serialVersionUID = 1L;
private KeySelector<IN, ?> keySelector;
private transient OperatorState<HashMap<Object, IN>> values;
- public StreamGroupedReduce(ReduceFunction<IN> reducer, KeySelector<IN, ?> keySelector) {
+ // Store the typeinfo, create serializer during runtime
+ private TypeInformation<Object> keyTypeInformation;
+ private TypeInformation<IN> valueTypeInformation;
+
+ @SuppressWarnings("unchecked")
+ public StreamGroupedReduce(ReduceFunction<IN> reducer, KeySelector<IN, ?> keySelector,
+ TypeInformation<IN> typeInformation) {
super(reducer);
this.keySelector = keySelector;
+ valueTypeInformation = typeInformation;
+ keyTypeInformation = (TypeInformation<Object>) TypeExtractor
+ .getKeySelectorTypes(keySelector, typeInformation);
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
+
values = runtimeContext.getOperatorState("flink_internal_reduce_values",
- new HashMap<Object, IN>(), false);
+ new HashMap<Object, IN>(), false,
+ new KVMapCheckpointer<>(keyTypeInformation.createSerializer(executionConfig),
+ valueTypeInformation.createSerializer(executionConfig)));
}
@Override
@@ -67,4 +82,5 @@ public class StreamGroupedReduce<IN> extends AbstractUdfStreamOperator<IN, Reduc
output.emitWatermark(mark);
}
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c414ea98/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KVMapCheckpointer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KVMapCheckpointer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KVMapCheckpointer.java
new file mode 100644
index 0000000..17cb6a0
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/KVMapCheckpointer.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state;
+
+import org.apache.flink.api.common.state.StateCheckpointer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Implementation of the {@link StateCheckpointer} interface for a map storing
+ * types compatible with Flink's serialization system.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class KVMapCheckpointer<K, V> implements StateCheckpointer<HashMap<K, V>, byte[]> {
+
+ private TypeSerializer<K> keySerializer;
+ private TypeSerializer<V> valueSerializer;
+
+ public KVMapCheckpointer(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer) {
+ this.keySerializer = keySerializer;
+ this.valueSerializer = valueSerializer;
+ }
+
+ @Override
+ public byte[] snapshotState(HashMap<K, V> stateMap, long checkpointId, long checkpointTimestamp) {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream(stateMap.size() * 16);
+ DataOutputView out = new OutputViewDataOutputStreamWrapper(new DataOutputStream(bos));
+ try {
+ out.writeInt(stateMap.size());
+ for (Map.Entry<K, V> kv : stateMap.entrySet()) {
+ keySerializer.serialize(kv.getKey(), out);
+ valueSerializer.serialize(kv.getValue(), out);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to write snapshot", e);
+ }
+ return bos.toByteArray();
+ }
+
+ @Override
+ public HashMap<K, V> restoreState(byte[] stateSnapshot) {
+ ByteArrayInputView in = new ByteArrayInputView(stateSnapshot);
+
+ HashMap<K, V> returnMap = new HashMap<>();
+ try {
+ int size = in.readInt();
+ for (int i = 0; i < size; i++) {
+ returnMap.put(keySerializer.deserialize(in), valueSerializer.deserialize(in));
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to read snapshot", e);
+ }
+
+ return returnMap;
+ }
+}
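As a reading aid, a minimal standalone sketch of how the KVMapCheckpointer introduced above can be exercised; the String/Long types and the BasicTypeInfo-derived serializers are assumptions for illustration only, whereas inside StreamGroupedFold/Reduce the serializers come from the extracted key and value TypeInformation:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.state.KVMapCheckpointer;

import java.util.HashMap;

public class KVMapCheckpointerSketch {
    public static void main(String[] args) {
        ExecutionConfig config = new ExecutionConfig();
        // Serializers would normally be created from the operator's extracted TypeInformation.
        TypeSerializer<String> keySerializer =
                BasicTypeInfo.STRING_TYPE_INFO.createSerializer(config);
        TypeSerializer<Long> valueSerializer =
                BasicTypeInfo.LONG_TYPE_INFO.createSerializer(config);

        KVMapCheckpointer<String, Long> checkpointer =
                new KVMapCheckpointer<>(keySerializer, valueSerializer);

        HashMap<String, Long> state = new HashMap<>();
        state.put("a", 1L);
        state.put("b", 2L);

        // snapshotState writes the map size followed by each serialized key/value pair.
        byte[] snapshot = checkpointer.snapshotState(state, 1L, System.currentTimeMillis());

        // restoreState reads the same layout back into a fresh HashMap.
        HashMap<String, Long> restored = checkpointer.restoreState(snapshot);
        System.out.println(restored.equals(state)); // true
    }
}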
http://git-wip-us.apache.org/repos/asf/flink/blob/c414ea98/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
index cdf7aae..e002780 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
@@ -92,15 +92,15 @@ public class AggregationFunctionTest {
1, typeInfo, AggregationType.MAX, config);
List<Tuple2<Integer, Integer>> groupedSumList = MockContext.createAndExecute(
- new StreamGroupedReduce<>(sumFunction, keySelector),
+ new StreamGroupedReduce<>(sumFunction, keySelector, typeInfo),
getInputList());
List<Tuple2<Integer, Integer>> groupedMinList = MockContext.createAndExecute(
- new StreamGroupedReduce<>(minFunction, keySelector),
+ new StreamGroupedReduce<>(minFunction, keySelector, typeInfo),
getInputList());
List<Tuple2<Integer, Integer>> groupedMaxList = MockContext.createAndExecute(
- new StreamGroupedReduce<>(maxFunction, keySelector),
+ new StreamGroupedReduce<>(maxFunction, keySelector, typeInfo),
getInputList());
assertEquals(expectedGroupSumList, groupedSumList);
@@ -156,13 +156,13 @@ public class AggregationFunctionTest {
false, config);
List<MyPojo> groupedSumList = MockContext.createAndExecute(
- new StreamGroupedReduce<>(sumFunction, keySelector),
+ new StreamGroupedReduce<>(sumFunction, keySelector, typeInfo),
getInputPojoList());
List<MyPojo> groupedMinList = MockContext.createAndExecute(
- new StreamGroupedReduce<>(minFunction, keySelector),
+ new StreamGroupedReduce<>(minFunction, keySelector, typeInfo),
getInputPojoList());
List<MyPojo> groupedMaxList = MockContext.createAndExecute(
- new StreamGroupedReduce<>(maxFunction, keySelector),
+ new StreamGroupedReduce<>(maxFunction, keySelector, typeInfo),
getInputPojoList());
assertEquals(expectedGroupSumList, groupedSumList);
@@ -216,16 +216,16 @@ public class AggregationFunctionTest {
new ComparableAggregator<>(1, typeInfo, AggregationType.MINBY, false, config);
assertEquals(maxByFirstExpected, MockContext.createAndExecute(
- new StreamGroupedReduce<>(maxByFunctionFirst, keySelector),
+ new StreamGroupedReduce<>(maxByFunctionFirst, keySelector, typeInfo),
getInputByList()));
assertEquals(maxByLastExpected, MockContext.createAndExecute(
- new StreamGroupedReduce<>(maxByFunctionLast, keySelector),
+ new StreamGroupedReduce<>(maxByFunctionLast, keySelector, typeInfo),
getInputByList()));
assertEquals(minByLastExpected, MockContext.createAndExecute(
- new StreamGroupedReduce<>(minByFunctionLast, keySelector),
+ new StreamGroupedReduce<>(minByFunctionLast, keySelector, typeInfo),
getInputByList()));
assertEquals(minByFirstExpected, MockContext.createAndExecute(
- new StreamGroupedReduce<>(minByFunctionFirst, keySelector),
+ new StreamGroupedReduce<>(minByFunctionFirst, keySelector, typeInfo),
getInputByList()));
}
@@ -274,16 +274,16 @@ public class AggregationFunctionTest {
new ComparableAggregator<>("f1", typeInfo, AggregationType.MINBY, false, config);
assertEquals(maxByFirstExpected, MockContext.createAndExecute(
- new StreamGroupedReduce<>(maxByFunctionFirst, keySelector),
+ new StreamGroupedReduce<>(maxByFunctionFirst, keySelector, typeInfo),
getInputByPojoList()));
assertEquals(maxByLastExpected, MockContext.createAndExecute(
- new StreamGroupedReduce<>(maxByFunctionLast, keySelector),
+ new StreamGroupedReduce<>(maxByFunctionLast, keySelector, typeInfo),
getInputByPojoList()));
assertEquals(minByLastExpected, MockContext.createAndExecute(
- new StreamGroupedReduce<>(minByFunctionLast, keySelector),
+ new StreamGroupedReduce<>(minByFunctionLast, keySelector, typeInfo),
getInputByPojoList()));
assertEquals(minByFirstExpected, MockContext.createAndExecute(
- new StreamGroupedReduce<>(minByFunctionFirst, keySelector),
+ new StreamGroupedReduce<>(minByFunctionFirst, keySelector, typeInfo),
getInputByPojoList()));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c414ea98/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
index 82dddfe..bc5d614 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
@@ -56,12 +56,14 @@ public class StreamGroupedFoldTest {
}
+ private TypeInformation<Integer> inType = TypeExtractor.getForClass(Integer.class);
+ private TypeInformation<String> outType = TypeExtractor.getForClass(String.class);
+
@Test
@SuppressWarnings("unchecked")
public void testGroupedFold() throws Exception {
- TypeInformation<String> outType = TypeExtractor.getForObject("A string");
- StreamGroupedFold<Integer, String> operator = new StreamGroupedFold<Integer, String>(
+ StreamGroupedFold<Integer, String> operator = new StreamGroupedFold<>(
new MyFolder(), new KeySelector<Integer, String>() {
private static final long serialVersionUID = 1L;
@@ -70,55 +72,55 @@ public class StreamGroupedFoldTest {
public String getKey(Integer value) throws Exception {
return value.toString();
}
- }, "100");
+ }, "100", inType);
operator.setOutputType(outType, new ExecutionConfig());
- OneInputStreamOperatorTestHarness<Integer, String> testHarness = new OneInputStreamOperatorTestHarness<Integer, String>(operator);
+ OneInputStreamOperatorTestHarness<Integer, String> testHarness = new OneInputStreamOperatorTestHarness<>(operator);
long initialTime = 0L;
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
testHarness.open();
- testHarness.processElement(new StreamRecord<Integer>(1, initialTime + 1));
- testHarness.processElement(new StreamRecord<Integer>(1, initialTime + 2));
+ testHarness.processElement(new StreamRecord<>(1, initialTime + 1));
+ testHarness.processElement(new StreamRecord<>(1, initialTime + 2));
testHarness.processWatermark(new Watermark(initialTime + 2));
- testHarness.processElement(new StreamRecord<Integer>(2, initialTime + 3));
- testHarness.processElement(new StreamRecord<Integer>(2, initialTime + 4));
- testHarness.processElement(new StreamRecord<Integer>(3, initialTime + 5));
+ testHarness.processElement(new StreamRecord<>(2, initialTime + 3));
+ testHarness.processElement(new StreamRecord<>(2, initialTime + 4));
+ testHarness.processElement(new StreamRecord<>(3, initialTime + 5));
- expectedOutput.add(new StreamRecord<String>("1001", initialTime + 1));
- expectedOutput.add(new StreamRecord<String>("10011", initialTime + 2));
+ expectedOutput.add(new StreamRecord<>("1001", initialTime + 1));
+ expectedOutput.add(new StreamRecord<>("10011", initialTime + 2));
expectedOutput.add(new Watermark(initialTime + 2));
- expectedOutput.add(new StreamRecord<String>("1002", initialTime + 3));
- expectedOutput.add(new StreamRecord<String>("10022", initialTime + 4));
- expectedOutput.add(new StreamRecord<String>("1003", initialTime + 5));
+ expectedOutput.add(new StreamRecord<>("1002", initialTime + 3));
+ expectedOutput.add(new StreamRecord<>("10022", initialTime + 4));
+ expectedOutput.add(new StreamRecord<>("1003", initialTime + 5));
TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
}
@Test
public void testOpenClose() throws Exception {
- StreamGroupedFold<Integer, String> operator = new StreamGroupedFold<Integer, String>(new TestOpenCloseFoldFunction(), new KeySelector<Integer, Integer>() {
+ StreamGroupedFold<Integer, String> operator = new StreamGroupedFold<>(new TestOpenCloseFoldFunction(), new KeySelector<Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer getKey(Integer value) throws Exception {
return value;
}
- }, "init");
+ }, "init", inType);
operator.setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig());
- OneInputStreamOperatorTestHarness<Integer, String> testHarness = new OneInputStreamOperatorTestHarness<Integer, String>(operator);
+ OneInputStreamOperatorTestHarness<Integer, String> testHarness = new OneInputStreamOperatorTestHarness<>(operator);
long initialTime = 0L;
testHarness.open();
- testHarness.processElement(new StreamRecord<Integer>(1, initialTime));
- testHarness.processElement(new StreamRecord<Integer>(2, initialTime));
+ testHarness.processElement(new StreamRecord<>(1, initialTime));
+ testHarness.processElement(new StreamRecord<>(2, initialTime));
testHarness.close();
http://git-wip-us.apache.org/repos/asf/flink/blob/c414ea98/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
index a2cd1fd..85d9bc1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
@@ -22,7 +22,9 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -43,71 +45,47 @@ import org.junit.Test;
public class StreamGroupedReduceTest {
- private static class MyReducer implements ReduceFunction<Integer> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer reduce(Integer value1, Integer value2) throws Exception {
- return value1 + value2;
- }
-
- }
-
@Test
@SuppressWarnings("unchecked")
public void testGroupedReduce() throws Exception {
- StreamGroupedReduce<Integer> operator = new StreamGroupedReduce<Integer>(new MyReducer(), new KeySelector<Integer, Integer>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer getKey(Integer value) throws Exception {
- return value;
- }
- });
+ StreamGroupedReduce<Integer> operator = new StreamGroupedReduce<>(new MyReducer(), new IntegerKeySelector(), typeInfo);
- OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new OneInputStreamOperatorTestHarness<Integer, Integer>(operator);
+ OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new OneInputStreamOperatorTestHarness<>(operator);
long initialTime = 0L;
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
testHarness.open();
- testHarness.processElement(new StreamRecord<Integer>(1, initialTime + 1));
- testHarness.processElement(new StreamRecord<Integer>(1, initialTime + 2));
+ testHarness.processElement(new StreamRecord<>(1, initialTime + 1));
+ testHarness.processElement(new StreamRecord<>(1, initialTime + 2));
testHarness.processWatermark(new Watermark(initialTime + 2));
- testHarness.processElement(new StreamRecord<Integer>(2, initialTime + 3));
- testHarness.processElement(new StreamRecord<Integer>(2, initialTime + 4));
- testHarness.processElement(new StreamRecord<Integer>(3, initialTime + 5));
+ testHarness.processElement(new StreamRecord<>(2, initialTime + 3));
+ testHarness.processElement(new StreamRecord<>(2, initialTime + 4));
+ testHarness.processElement(new StreamRecord<>(3, initialTime + 5));
- expectedOutput.add(new StreamRecord<Integer>(1, initialTime + 1));
- expectedOutput.add(new StreamRecord<Integer>(2, initialTime + 2));
+ expectedOutput.add(new StreamRecord<>(1, initialTime + 1));
+ expectedOutput.add(new StreamRecord<>(2, initialTime + 2));
expectedOutput.add(new Watermark(initialTime + 2));
- expectedOutput.add(new StreamRecord<Integer>(2, initialTime + 3));
- expectedOutput.add(new StreamRecord<Integer>(4, initialTime + 4));
- expectedOutput.add(new StreamRecord<Integer>(3, initialTime + 5));
+ expectedOutput.add(new StreamRecord<>(2, initialTime + 3));
+ expectedOutput.add(new StreamRecord<>(4, initialTime + 4));
+ expectedOutput.add(new StreamRecord<>(3, initialTime + 5));
TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
}
@Test
public void testOpenClose() throws Exception {
- StreamGroupedReduce<Integer> operator = new StreamGroupedReduce<Integer>(new TestOpenCloseReduceFunction(), new KeySelector<Integer, Integer>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer getKey(Integer value) throws Exception {
- return value;
- }
- });
- OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new OneInputStreamOperatorTestHarness<Integer, Integer>(operator);
+ StreamGroupedReduce<Integer> operator =
+ new StreamGroupedReduce<>(new TestOpenCloseReduceFunction(), new IntegerKeySelector(), typeInfo);
+ OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new OneInputStreamOperatorTestHarness<>(operator);
long initialTime = 0L;
testHarness.open();
- testHarness.processElement(new StreamRecord<Integer>(1, initialTime));
- testHarness.processElement(new StreamRecord<Integer>(2, initialTime));
+ testHarness.processElement(new StreamRecord<>(1, initialTime));
+ testHarness.processElement(new StreamRecord<>(2, initialTime));
testHarness.close();
@@ -149,4 +127,28 @@ public class StreamGroupedReduceTest {
return in1 + in2;
}
}
+
+ // Utilities
+
+ private static class MyReducer implements ReduceFunction<Integer> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Integer reduce(Integer value1, Integer value2) throws Exception {
+ return value1 + value2;
+ }
+
+ }
+
+ private static class IntegerKeySelector implements KeySelector<Integer, Integer> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Integer getKey(Integer value) throws Exception {
+ return value;
+ }
+ }
+
+ private static TypeInformation<Integer> typeInfo = TypeExtractor.getForClass(Integer.class);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c414ea98/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index 18b71be..0ce36aa 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -273,7 +273,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
javaStream.getExecutionConfig)
}
- val invokable = new StreamGroupedReduce[T](reducer,javaStream.getKeySelector())
+ val invokable = new StreamGroupedReduce[T](reducer,javaStream.getKeySelector(),getType())
new DataStream[T](javaStream.transform("aggregation", javaStream.getType(),invokable))
.asInstanceOf[DataStream[T]]
http://git-wip-us.apache.org/repos/asf/flink/blob/c414ea98/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
index 263bef1..cb02d2f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
@@ -19,22 +19,20 @@
package org.apache.flink.test.checkpointing;
import com.google.common.collect.EvictingQueue;
-import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.datastream.GroupedDataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamGroupedFold;
import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
-import org.apache.flink.util.Collector;
import org.junit.Assert;
import java.util.Queue;
@@ -47,7 +45,7 @@ import java.util.Random;
*
* <p>
* The topology currently tests the proper behaviour of the {@link StreamGroupedReduce}
- * operator.
+ * and the {@link StreamGroupedFold} operators.
*/
@SuppressWarnings("serial")
public class UdfStreamOperatorCheckpointingITCase extends StreamFaultToleranceTestBase {
@@ -63,8 +61,8 @@ public class UdfStreamOperatorCheckpointingITCase extends StreamFaultToleranceTe
public void testProgram(StreamExecutionEnvironment env) {
// base stream
- GroupedDataStream<Tuple2<Integer, Long>> stream = env.addSource(new StatefulMultipleSequence())
- .groupBy(0);
+ KeyedStream<Tuple2<Integer, Long>, Tuple> stream = env.addSource(new StatefulMultipleSequence())
+ .keyBy(0);
stream
@@ -72,7 +70,7 @@ public class UdfStreamOperatorCheckpointingITCase extends StreamFaultToleranceTe
.min(1)
// failure generation
.map(new OnceFailingIdentityMapFunction(NUM_INPUT))
- .groupBy(0)
+ .keyBy(0)
.addSink(new MinEvictingQueueSink());
stream
@@ -84,7 +82,7 @@ public class UdfStreamOperatorCheckpointingITCase extends StreamFaultToleranceTe
return Tuple2.of(value1.f0, value1.f1 + value2.f1);
}
})
- .groupBy(0)
+ .keyBy(0)
.addSink(new SumEvictingQueueSink());
stream
@@ -96,7 +94,7 @@ public class UdfStreamOperatorCheckpointingITCase extends StreamFaultToleranceTe
return Tuple2.of(value.f0, accumulator.f1 + value.f1);
}
})
- .groupBy(0)
+ .keyBy(0)
.addSink(new FoldEvictingQueueSink());
}
[2/5] flink git commit: [FLINK-2283] [streaming] Test for checkpointing in internal operators
Posted by mb...@apache.org.
[FLINK-2283] [streaming] Test for checkpointing in internal operators
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a76d9638
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a76d9638
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a76d9638
Branch: refs/heads/master
Commit: a76d9638e83947d491e6ae1edb82f020b6ba8f54
Parents: 225704b
Author: mbalassi <mb...@apache.org>
Authored: Sun Sep 20 22:27:11 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Tue Oct 6 14:46:06 2015 +0200
----------------------------------------------------------------------
.../api/operators/StreamGroupedFold.java | 1 -
.../api/operators/StreamGroupedReduce.java | 1 -
.../UdfStreamOperatorCheckpointingITCase.java | 267 +++++++++++++++++++
3 files changed, 267 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a76d9638/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
index f8f167a..ad40b85 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
@@ -23,7 +23,6 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
-import java.util.Map;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FoldFunction;
http://git-wip-us.apache.org/repos/asf/flink/blob/a76d9638/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
index e1f9f06..68ebd8f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
@@ -18,7 +18,6 @@
package org.apache.flink.streaming.api.operators;
import java.util.HashMap;
-import java.util.Map;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.OperatorState;
http://git-wip-us.apache.org/repos/asf/flink/blob/a76d9638/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
new file mode 100644
index 0000000..263bef1
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import com.google.common.collect.EvictingQueue;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.datastream.GroupedDataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+
+import java.util.Queue;
+import java.util.Random;
+
+/**
+ * Integration test ensuring that the persistent state defined by the implementations
+ * of {@link AbstractUdfStreamOperator} is correctly restored in case of recovery from
+ * a failure.
+ *
+ * <p>
+ * The topology currently tests the proper behaviour of the {@link StreamGroupedReduce}
+ * operator.
+ */
+@SuppressWarnings("serial")
+public class UdfStreamOperatorCheckpointingITCase extends StreamFaultToleranceTestBase {
+
+ final private static long NUM_INPUT = 2_500_000L;
+ final private static int NUM_OUTPUT = 1_000;
+
+ /**
+ * Assembles a stream of a grouping field and some long data. Applies reduce functions
+ * on this stream.
+ */
+ @Override
+ public void testProgram(StreamExecutionEnvironment env) {
+
+ // base stream
+ GroupedDataStream<Tuple2<Integer, Long>> stream = env.addSource(new StatefulMultipleSequence())
+ .groupBy(0);
+
+
+ stream
+ // testing built-in aggregate
+ .min(1)
+ // failure generation
+ .map(new OnceFailingIdentityMapFunction(NUM_INPUT))
+ .groupBy(0)
+ .addSink(new MinEvictingQueueSink());
+
+ stream
+ // testing UDF reducer
+ .reduce(new ReduceFunction<Tuple2<Integer, Long>>() {
+ @Override
+ public Tuple2<Integer, Long> reduce(
+ Tuple2<Integer, Long> value1, Tuple2<Integer, Long> value2) throws Exception {
+ return Tuple2.of(value1.f0, value1.f1 + value2.f1);
+ }
+ })
+ .groupBy(0)
+ .addSink(new SumEvictingQueueSink());
+
+ stream
+ // testing UDF folder
+ .fold(Tuple2.of(0, 0L), new FoldFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>>() {
+ @Override
+ public Tuple2<Integer, Long> fold(
+ Tuple2<Integer, Long> accumulator, Tuple2<Integer, Long> value) throws Exception {
+ return Tuple2.of(value.f0, accumulator.f1 + value.f1);
+ }
+ })
+ .groupBy(0)
+ .addSink(new FoldEvictingQueueSink());
+ }
+
+ @Override
+ public void postSubmit() {
+
+ // Note that these checks depend on the ordering of the input
+
+ // Checking the result of the built-in aggregate
+ for (int i = 0; i < PARALLELISM; i++) {
+ for (Long value : MinEvictingQueueSink.queues[i]) {
+ Assert.assertTrue("Value different from 1 found, was " + value + ".", value == 1);
+ }
+ }
+
+ // Checking the result of the UDF reducer
+ for (int i = 0; i < PARALLELISM; i++) {
+ long prevCount = NUM_INPUT - NUM_OUTPUT;
+ long sum = prevCount * (prevCount + 1) / 2;
+ while (!SumEvictingQueueSink.queues[i].isEmpty()) {
+ sum += ++prevCount;
+ Long value = SumEvictingQueueSink.queues[i].remove();
+ Assert.assertTrue("Unexpected reduce value " + value + " instead of " + sum + ".", value == sum);
+ }
+ }
+
+ // Checking the result of the UDF folder
+ for (int i = 0; i < PARALLELISM; i++) {
+ long prevCount = NUM_INPUT - NUM_OUTPUT;
+ long sum = prevCount * (prevCount + 1) / 2;
+ while (!FoldEvictingQueueSink.queues[i].isEmpty()) {
+ sum += ++prevCount;
+ Long value = FoldEvictingQueueSink.queues[i].remove();
+ Assert.assertTrue("Unexpected fold value " + value + " instead of " + sum + ".", value == sum);
+ }
+ }
+
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Custom Functions
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Produces a sequence multiple times for each parallelism instance of downstream operators,
+ * augmented by the designated parallel subtaskId. The source is not parallel to ensure order.
+ */
+ private static class StatefulMultipleSequence extends RichSourceFunction<Tuple2<Integer, Long>>{
+
+ private transient OperatorState<Long> count;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ count = getRuntimeContext().getOperatorState("count", 0L, false);
+ }
+
+ @Override
+ public void run(SourceContext<Tuple2<Integer, Long>> ctx) throws Exception {
+ Object lock = ctx.getCheckpointLock();
+
+ while (count.value() < NUM_INPUT){
+ synchronized (lock){
+ for (int i = 0; i < PARALLELISM; i++) {
+ ctx.collect(Tuple2.of(i, count.value() + 1));
+ }
+ count.update(count.value() + 1);
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ }
+ }
+
+ /**
+ * Mapper that causes one failure between seeing 40% to 70% of the records.
+ */
+ private static class OnceFailingIdentityMapFunction
+ extends RichMapFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>> {
+
+ private static volatile boolean hasFailed = false;
+
+ private final long numElements;
+
+ private long failurePos;
+ private OperatorState<Long> count;
+
+ public OnceFailingIdentityMapFunction(long numElements) {
+ this.numElements = numElements;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ long failurePosMin = (long) (0.4 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
+ long failurePosMax = (long) (0.7 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
+
+ failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
+ count = getRuntimeContext().getOperatorState("count", 0L, false);
+ }
+
+ @Override
+ public Tuple2<Integer, Long> map(Tuple2<Integer, Long> value) throws Exception {
+ if (!hasFailed && count.value() >= failurePos) {
+ hasFailed = true;
+ throw new Exception("Test Failure");
+ }
+ count.update(count.value() + 1);
+ return value;
+ }
+
+ }
+
+ /**
+ * Sink that emits the output to an evicting queue storing the last {@link #NUM_OUTPUT} elements.
+ * A separate queue is initiated for each group, apply a grouping prior to this operator to avoid
+ * parallel access of the queues.
+ */
+ private static class MinEvictingQueueSink implements SinkFunction<Tuple2<Integer, Long>> {
+
+ public static Queue<Long>[] queues = new Queue[PARALLELISM];
+
+ @Override
+ public void invoke(Tuple2<Integer, Long> value) throws Exception {
+ if (queues[value.f0] == null) {
+ queues[value.f0] = EvictingQueue.create(NUM_OUTPUT);
+ }
+ queues[value.f0].add(value.f1);
+ }
+ }
+
+ /**
+ * Sink that emits the output to an evicting queue storing the last {@link #NUM_OUTPUT} elements.
+ * A separate queue is initiated for each group, apply a grouping prior to this operator to avoid
+ * parallel access of the queues.
+ */
+ private static class SumEvictingQueueSink implements SinkFunction<Tuple2<Integer, Long>> {
+
+ public static Queue<Long>[] queues = new Queue[PARALLELISM];
+
+ @Override
+ public void invoke(Tuple2<Integer, Long> value) throws Exception {
+ if (queues[value.f0] == null) {
+ queues[value.f0] = EvictingQueue.create(NUM_OUTPUT);
+ }
+ queues[value.f0].add(value.f1);
+ }
+ }
+
+ /**
+ * Sink that emits the output to an evicting queue storing the last {@link #NUM_OUTPUT} elements.
+ * A separate queue is initiated for each group, apply a grouping prior to this operator to avoid
+ * parallel access of the queues.
+ */
+ private static class FoldEvictingQueueSink implements SinkFunction<Tuple2<Integer, Long>> {
+
+ public static Queue<Long>[] queues = new Queue[PARALLELISM];
+
+ @Override
+ public void invoke(Tuple2<Integer, Long> value) throws Exception {
+ if (queues[value.f0] == null) {
+ queues[value.f0] = EvictingQueue.create(NUM_OUTPUT);
+ }
+ queues[value.f0].add(value.f1);
+ }
+ }
+}
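As a reading aid for the postSubmit checks above: per key the source emits the sequence 1..NUM_INPUT, and both the UDF reducer and the UDF folder keep a per-key running sum, so the record carrying sequence value n reaches the sink with the triangular number n(n+1)/2. The evicting sinks retain only the last NUM_OUTPUT of these running sums, which is why the check seeds sum with prevCount*(prevCount+1)/2 for prevCount = NUM_INPUT - NUM_OUTPUT and then re-adds one sequence value per retained element. A minimal sketch of that recurrence with small, assumed numbers (not the values used in the test):

public class ExpectedSumSketch {
    public static void main(String[] args) {
        // NUM_INPUT = 10 and NUM_OUTPUT = 3 are illustrative assumptions.
        long numInput = 10, numOutput = 3;
        long prevCount = numInput - numOutput;       // 7 elements per key were evicted
        long sum = prevCount * (prevCount + 1) / 2;  // running sum after element 7: 28
        // The sink retained the running sums for elements 8, 9, 10: 36, 45, 55.
        for (long expected : new long[]{36, 45, 55}) {
            sum += ++prevCount;                      // 28+8, 36+9, 45+10
            System.out.println(sum == expected);     // true, true, true
        }
    }
}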
[3/5] flink git commit: [streaming] Removed unused StreamReduce
Posted by mb...@apache.org.
[streaming] Removed unused StreamReduce
Refactored corresponding tests, some minor cleanups.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/906bd6dc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/906bd6dc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/906bd6dc
Branch: refs/heads/master
Commit: 906bd6dcb360665af0331faddb34e7260bfe7f1a
Parents: 4938ff0
Author: mbalassi <mb...@apache.org>
Authored: Fri Sep 11 16:32:09 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Tue Oct 6 14:46:06 2015 +0200
----------------------------------------------------------------------
.../flink/api/common/state/OperatorState.java | 10 +-
.../api/operators/StreamGroupedReduce.java | 11 +-
.../streaming/api/operators/StreamReduce.java | 53 ---
.../streaming/api/AggregationFunctionTest.java | 446 ++++++++-----------
.../flink/streaming/api/scala/DataStream.scala | 26 +-
5 files changed, 221 insertions(+), 325 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/906bd6dc/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
index 3f5e977..3036023 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
@@ -21,6 +21,8 @@ package org.apache.flink.api.common.state;
import java.io.IOException;
import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.configuration.Configuration;
/**
* Base interface for all streaming operator states. It can represent both
@@ -30,9 +32,9 @@ import org.apache.flink.api.common.functions.MapFunction;
* State can be accessed and manipulated using the {@link #value()} and
* {@link #update(T)} methods. These calls are only safe in the
* transformation call the operator represents, for instance inside
- * {@link MapFunction#map()} and can lead tp unexpected behavior in the
- * {@link #open(org.apache.flink.configuration.Configuration)} or
- * {@link #close()} methods.
+ * {@link MapFunction#map(Object)} and can lead tp unexpected behavior in the
+ * {@link AbstractRichFunction#open(Configuration)} or
+ * {@link AbstractRichFunction#close()} methods.
*
* @param <T>
* Type of the operator state
@@ -59,7 +61,7 @@ public interface OperatorState<T> {
* partitioned state is updated with null, the state for the current key
* will be removed and the default value is returned on the next access.
*
- * @param state
+ * @param value
* The new value for the state.
*
* @throws IOException Thrown if the system cannot access the state.
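The corrected Javadoc above describes the value()/update() access pattern for OperatorState. A minimal sketch of that pattern inside a transformation call, modelled on the counting functions in the ITCase earlier in this thread (the class name and the "count" state key are illustrative assumptions):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.configuration.Configuration;

public class CountingMapper extends RichMapFunction<String, String> {

    private transient OperatorState<Long> count;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Register non-partitioned state with a default value of 0L; registering here
        // is fine, but value()/update() belong inside the transformation call itself.
        count = getRuntimeContext().getOperatorState("count", 0L, false);
    }

    @Override
    public String map(String value) throws Exception {
        count.update(count.value() + 1);
        return value;
    }
}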
http://git-wip-us.apache.org/repos/asf/flink/blob/906bd6dc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
index 7533c33..8805138 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
@@ -22,9 +22,11 @@ import java.util.Map;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-public class StreamGroupedReduce<IN> extends StreamReduce<IN> {
+public class StreamGroupedReduce<IN> extends AbstractUdfStreamOperator<IN, ReduceFunction<IN>>
+ implements OneInputStreamOperator<IN, IN>{
private static final long serialVersionUID = 1L;
@@ -41,7 +43,7 @@ public class StreamGroupedReduce<IN> extends StreamReduce<IN> {
Object key = keySelector.getKey(element.getValue());
if (values == null) {
- values = new HashMap<Object, IN>();
+ values = new HashMap<>();
}
IN currentValue = values.get(key);
@@ -56,4 +58,9 @@ public class StreamGroupedReduce<IN> extends StreamReduce<IN> {
}
}
+ @Override
+ public void processWatermark(Watermark mark) throws Exception {
+ output.emitWatermark(mark);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/906bd6dc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
deleted file mode 100644
index af562fe..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-public class StreamReduce<IN> extends AbstractUdfStreamOperator<IN, ReduceFunction<IN>>
- implements OneInputStreamOperator<IN, IN> {
-
- private static final long serialVersionUID = 1L;
-
- private transient IN currentValue;
-
- public StreamReduce(ReduceFunction<IN> reducer) {
- super(reducer);
- currentValue = null;
-
- chainingStrategy = ChainingStrategy.ALWAYS;
- }
-
- @Override
- public void processElement(StreamRecord<IN> element) throws Exception {
-
- if (currentValue != null) {
- currentValue = userFunction.reduce(currentValue, element.getValue());
- } else {
- currentValue = element.getValue();
- }
- output.collect(element.replace(currentValue));
- }
-
- @Override
- public void processWatermark(Watermark mark) throws Exception {
- output.emitWatermark(mark);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/906bd6dc/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
index ff04609..cdf7aae 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
@@ -23,18 +23,19 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
+import com.google.common.collect.ImmutableList;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.Keys;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType;
import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
-import org.apache.flink.streaming.api.operators.StreamReduce;
import org.apache.flink.streaming.util.MockContext;
import org.apache.flink.streaming.util.keys.KeySelectorUtil;
import org.junit.Test;
@@ -44,32 +45,16 @@ public class AggregationFunctionTest {
@Test
public void groupSumIntegerTest() {
- List<Tuple2<Integer, Integer>> expectedSumList = new ArrayList<Tuple2<Integer, Integer>>();
- List<Tuple2<Integer, Integer>> expectedMinList = new ArrayList<Tuple2<Integer, Integer>>();
- List<Tuple2<Integer, Integer>> expectedMaxList = new ArrayList<Tuple2<Integer, Integer>>();
- List<Integer> expectedSumList0 = new ArrayList<Integer>();
- List<Integer> expectedMinList0 = new ArrayList<Integer>();
- List<Integer> expectedMaxList0 = new ArrayList<Integer>();
- List<Tuple2<Integer, Integer>> expectedGroupSumList = new ArrayList<Tuple2<Integer, Integer>>();
- List<Tuple2<Integer, Integer>> expectedGroupMinList = new ArrayList<Tuple2<Integer, Integer>>();
- List<Tuple2<Integer, Integer>> expectedGroupMaxList = new ArrayList<Tuple2<Integer, Integer>>();
-
- List<Integer> simpleInput = new ArrayList<Integer>();
+ // preparing expected outputs
+ List<Tuple2<Integer, Integer>> expectedGroupSumList = new ArrayList<>();
+ List<Tuple2<Integer, Integer>> expectedGroupMinList = new ArrayList<>();
+ List<Tuple2<Integer, Integer>> expectedGroupMaxList = new ArrayList<>();
int groupedSum0 = 0;
int groupedSum1 = 0;
int groupedSum2 = 0;
for (int i = 0; i < 9; i++) {
- simpleInput.add(i);
- expectedSumList.add(new Tuple2<Integer, Integer>(i % 3, (i + 1) * i / 2));
- expectedMinList.add(new Tuple2<Integer, Integer>(i % 3, 0));
- expectedMaxList.add(new Tuple2<Integer, Integer>(i % 3, i));
-
- expectedSumList0.add((i + 1) * i / 2);
- expectedMaxList0.add(i);
- expectedMinList0.add(0);
-
int groupedSum;
switch (i % 3) {
case 0:
@@ -83,99 +68,59 @@ public class AggregationFunctionTest {
break;
}
- expectedGroupSumList.add(new Tuple2<Integer, Integer>(i % 3, groupedSum));
- expectedGroupMinList.add(new Tuple2<Integer, Integer>(i % 3, i % 3));
- expectedGroupMaxList.add(new Tuple2<Integer, Integer>(i % 3, i));
+ expectedGroupSumList.add(new Tuple2<>(i % 3, groupedSum));
+ expectedGroupMinList.add(new Tuple2<>(i % 3, i % 3));
+ expectedGroupMaxList.add(new Tuple2<>(i % 3, i));
}
- TypeInformation<Tuple2<Integer, Integer>> type1 = TypeExtractor
- .getForObject(new Tuple2<Integer, Integer>(0, 0));
- TypeInformation<Integer> type2 = TypeExtractor.getForObject(2);
+ // some necessary boilerplate
+ TypeInformation<Tuple2<Integer, Integer>> typeInfo = TypeExtractor
+ .getForObject(new Tuple2<>(0, 0));
ExecutionConfig config = new ExecutionConfig();
- ReduceFunction<Tuple2<Integer, Integer>> sumFunction =
- new SumAggregator<Tuple2<Integer, Integer>>(1, type1, config);
- ReduceFunction<Integer> sumFunction0 = new SumAggregator<Integer>(0, type2, config);
- ReduceFunction<Tuple2<Integer, Integer>> minFunction = new ComparableAggregator<Tuple2<Integer, Integer>>(
- 1, type1, AggregationType.MIN, config);
- ReduceFunction<Integer> minFunction0 = new ComparableAggregator<Integer>(0, type2,
- AggregationType.MIN, config);
- ReduceFunction<Tuple2<Integer, Integer>> maxFunction = new ComparableAggregator<Tuple2<Integer, Integer>>(
- 1, type1, AggregationType.MAX, config);
- ReduceFunction<Integer> maxFunction0 = new ComparableAggregator<Integer>(0, type2,
- AggregationType.MAX, config);
- List<Tuple2<Integer, Integer>> sumList = MockContext.createAndExecute(
- new StreamReduce<Tuple2<Integer, Integer>>(sumFunction), getInputList());
-
- List<Tuple2<Integer, Integer>> minList = MockContext.createAndExecute(
- new StreamReduce<Tuple2<Integer, Integer>>(minFunction), getInputList());
-
- List<Tuple2<Integer, Integer>> maxList = MockContext.createAndExecute(
- new StreamReduce<Tuple2<Integer, Integer>>(maxFunction), getInputList());
-
- TypeInformation<Tuple2<Integer, Integer>> typeInfo = TypeExtractor
- .getForObject(new Tuple2<Integer, Integer>(1, 1));
-
KeySelector<Tuple2<Integer, Integer>, ?> keySelector = KeySelectorUtil.getSelectorForKeys(
- new Keys.ExpressionKeys<Tuple2<Integer, Integer>>(new int[]{0}, typeInfo),
- typeInfo, new ExecutionConfig());
+ new Keys.ExpressionKeys<>(new int[]{0}, typeInfo),
+ typeInfo, config);
+
+ // aggregations tested
+ ReduceFunction<Tuple2<Integer, Integer>> sumFunction =
+ new SumAggregator<>(1, typeInfo, config);
+ ReduceFunction<Tuple2<Integer, Integer>> minFunction = new ComparableAggregator<>(
+ 1, typeInfo, AggregationType.MIN, config);
+ ReduceFunction<Tuple2<Integer, Integer>> maxFunction = new ComparableAggregator<>(
+ 1, typeInfo, AggregationType.MAX, config);
List<Tuple2<Integer, Integer>> groupedSumList = MockContext.createAndExecute(
- new StreamGroupedReduce<Tuple2<Integer, Integer>>(sumFunction, keySelector),
+ new StreamGroupedReduce<>(sumFunction, keySelector),
getInputList());
List<Tuple2<Integer, Integer>> groupedMinList = MockContext.createAndExecute(
- new StreamGroupedReduce<Tuple2<Integer, Integer>>(minFunction, keySelector),
+ new StreamGroupedReduce<>(minFunction, keySelector),
getInputList());
List<Tuple2<Integer, Integer>> groupedMaxList = MockContext.createAndExecute(
- new StreamGroupedReduce<Tuple2<Integer, Integer>>(maxFunction, keySelector),
+ new StreamGroupedReduce<>(maxFunction, keySelector),
getInputList());
- assertEquals(expectedSumList, sumList);
- assertEquals(expectedMinList, minList);
- assertEquals(expectedMaxList, maxList);
assertEquals(expectedGroupSumList, groupedSumList);
assertEquals(expectedGroupMinList, groupedMinList);
assertEquals(expectedGroupMaxList, groupedMaxList);
- assertEquals(expectedSumList0, MockContext.createAndExecute(
- new StreamReduce<Integer>(sumFunction0), simpleInput));
- assertEquals(expectedMinList0, MockContext.createAndExecute(
- new StreamReduce<Integer>(minFunction0), simpleInput));
- assertEquals(expectedMaxList0, MockContext.createAndExecute(
- new StreamReduce<Integer>(maxFunction0), simpleInput));
-
}
@Test
public void pojoGroupSumIntegerTest() {
- List<MyPojo> expectedSumList = new ArrayList<MyPojo>();
- List<MyPojo> expectedMinList = new ArrayList<MyPojo>();
- List<MyPojo> expectedMaxList = new ArrayList<MyPojo>();
- List<Integer> expectedSumList0 = new ArrayList<Integer>();
- List<Integer> expectedMinList0 = new ArrayList<Integer>();
- List<Integer> expectedMaxList0 = new ArrayList<Integer>();
- List<MyPojo> expectedGroupSumList = new ArrayList<MyPojo>();
- List<MyPojo> expectedGroupMinList = new ArrayList<MyPojo>();
- List<MyPojo> expectedGroupMaxList = new ArrayList<MyPojo>();
-
- List<Integer> simpleInput = new ArrayList<Integer>();
+
+ // preparing expected outputs
+ List<MyPojo> expectedGroupSumList = new ArrayList<>();
+ List<MyPojo> expectedGroupMinList = new ArrayList<>();
+ List<MyPojo> expectedGroupMaxList = new ArrayList<>();
int groupedSum0 = 0;
int groupedSum1 = 0;
int groupedSum2 = 0;
for (int i = 0; i < 9; i++) {
- simpleInput.add(i);
- expectedSumList.add(new MyPojo(i % 3, (i + 1) * i / 2));
- expectedMinList.add(new MyPojo(i % 3, 0));
- expectedMaxList.add(new MyPojo(i % 3, i));
-
- expectedSumList0.add((i + 1) * i / 2);
- expectedMaxList0.add(i);
- expectedMinList0.add(0);
-
int groupedSum;
switch (i % 3) {
case 0:
@@ -194,222 +139,188 @@ public class AggregationFunctionTest {
expectedGroupMaxList.add(new MyPojo(i % 3, i));
}
- TypeInformation<MyPojo> type1 = TypeExtractor.getForObject(new MyPojo(0, 0));
- TypeInformation<Integer> type2 = TypeExtractor.getForObject(0);
- ExecutionConfig config = new ExecutionConfig();
+ // some necessary boilerplate
+ TypeInformation<MyPojo> typeInfo = TypeExtractor.getForObject(new MyPojo(0, 0));
- ReduceFunction<MyPojo> sumFunction = new SumAggregator<MyPojo>("f1", type1, config);
- ReduceFunction<Integer> sumFunction0 = new SumAggregator<Integer>(0, type2, config);
- ReduceFunction<MyPojo> minFunction = new ComparableAggregator<MyPojo>("f1", type1, AggregationType.MIN,
- false, config);
- ReduceFunction<Integer> minFunction0 = new ComparableAggregator<Integer>(0, type2, AggregationType.MIN,
- config);
- ReduceFunction<MyPojo> maxFunction = new ComparableAggregator<MyPojo>("f1", type1, AggregationType.MAX,
- false, config);
- ReduceFunction<Integer> maxFunction0 = new ComparableAggregator<Integer>(0, type2, AggregationType.MAX,
- config);
-
- List<MyPojo> sumList = MockContext.createAndExecute(
- new StreamReduce<MyPojo>(sumFunction), getInputPojoList());
- List<MyPojo> minList = MockContext.createAndExecute(
- new StreamReduce<MyPojo>(minFunction), getInputPojoList());
- List<MyPojo> maxList = MockContext.createAndExecute(
- new StreamReduce<MyPojo>(maxFunction), getInputPojoList());
+ ExecutionConfig config = new ExecutionConfig();
- TypeInformation<MyPojo> typeInfo = TypeExtractor.getForObject(new MyPojo(1, 1));
KeySelector<MyPojo, ?> keySelector = KeySelectorUtil.getSelectorForKeys(
- new Keys.ExpressionKeys<MyPojo>(new String[]{"f0"}, typeInfo),
+ new Keys.ExpressionKeys<>(new String[]{"f0"}, typeInfo),
typeInfo, config);
+ // aggregations tested
+ ReduceFunction<MyPojo> sumFunction = new SumAggregator<>("f1", typeInfo, config);
+ ReduceFunction<MyPojo> minFunction = new ComparableAggregator<>("f1", typeInfo, AggregationType.MIN,
+ false, config);
+ ReduceFunction<MyPojo> maxFunction = new ComparableAggregator<>("f1", typeInfo, AggregationType.MAX,
+ false, config);
+
List<MyPojo> groupedSumList = MockContext.createAndExecute(
- new StreamGroupedReduce<MyPojo>(sumFunction, keySelector),
+ new StreamGroupedReduce<>(sumFunction, keySelector),
getInputPojoList());
List<MyPojo> groupedMinList = MockContext.createAndExecute(
- new StreamGroupedReduce<MyPojo>(minFunction, keySelector),
+ new StreamGroupedReduce<>(minFunction, keySelector),
getInputPojoList());
List<MyPojo> groupedMaxList = MockContext.createAndExecute(
- new StreamGroupedReduce<MyPojo>(maxFunction, keySelector),
+ new StreamGroupedReduce<>(maxFunction, keySelector),
getInputPojoList());
- assertEquals(expectedSumList, sumList);
- assertEquals(expectedMinList, minList);
- assertEquals(expectedMaxList, maxList);
assertEquals(expectedGroupSumList, groupedSumList);
assertEquals(expectedGroupMinList, groupedMinList);
assertEquals(expectedGroupMaxList, groupedMaxList);
- assertEquals(expectedSumList0, MockContext.createAndExecute(
- new StreamReduce<Integer>(sumFunction0), simpleInput));
- assertEquals(expectedMinList0, MockContext.createAndExecute(
- new StreamReduce<Integer>(minFunction0), simpleInput));
- assertEquals(expectedMaxList0, MockContext.createAndExecute(
- new StreamReduce<Integer>(maxFunction0), simpleInput));
}
-
+
@Test
public void minMaxByTest() {
- TypeInformation<Tuple2<Integer, Integer>> type1 = TypeExtractor
- .getForObject(new Tuple2<Integer, Integer>(0, 0));
+ // Tuples are grouped on field 0, aggregated on field 1
+
+ // preparing expected outputs
+ List<Tuple3<Integer, Integer, Integer>> maxByFirstExpected = ImmutableList.of(
+ Tuple3.of(0,0,0), Tuple3.of(0,1,1), Tuple3.of(0,2,2),
+ Tuple3.of(0,2,2), Tuple3.of(0,2,2), Tuple3.of(0,2,2),
+ Tuple3.of(0,2,2), Tuple3.of(0,2,2), Tuple3.of(0,2,2));
+
+ List<Tuple3<Integer, Integer, Integer>> maxByLastExpected = ImmutableList.of(
+ Tuple3.of(0, 0, 0), Tuple3.of(0, 1, 1), Tuple3.of(0, 2, 2),
+ Tuple3.of(0, 2, 2), Tuple3.of(0, 2, 2), Tuple3.of(0, 2, 5),
+ Tuple3.of(0, 2, 5), Tuple3.of(0, 2, 5), Tuple3.of(0, 2, 8));
+
+ List<Tuple3<Integer, Integer, Integer>> minByFirstExpected = ImmutableList.of(
+ Tuple3.of(0,0,0), Tuple3.of(0,0,0), Tuple3.of(0,0,0),
+ Tuple3.of(0,0,0), Tuple3.of(0,0,0), Tuple3.of(0,0,0),
+ Tuple3.of(0,0,0), Tuple3.of(0,0,0), Tuple3.of(0,0,0));
+
+ List<Tuple3<Integer, Integer, Integer>> minByLastExpected = ImmutableList.of(
+ Tuple3.of(0, 0, 0), Tuple3.of(0, 0, 0), Tuple3.of(0, 0, 0),
+ Tuple3.of(0, 0, 3), Tuple3.of(0, 0, 3), Tuple3.of(0, 0, 3),
+ Tuple3.of(0, 0, 6), Tuple3.of(0, 0, 6), Tuple3.of(0, 0, 6));
+
+ // some necessary boilerplate
+ TypeInformation<Tuple3<Integer, Integer, Integer>> typeInfo = TypeExtractor
+ .getForObject(Tuple3.of(0,0,0));
ExecutionConfig config = new ExecutionConfig();
- ReduceFunction<Tuple2<Integer, Integer>> maxByFunctionFirst =
- new ComparableAggregator<Tuple2<Integer, Integer>>(0, type1, AggregationType.MAXBY, true, config);
- ReduceFunction<Tuple2<Integer, Integer>> maxByFunctionLast =
- new ComparableAggregator<Tuple2<Integer, Integer>>(0, type1, AggregationType.MAXBY, false, config);
-
- ReduceFunction<Tuple2<Integer, Integer>> minByFunctionFirst =
- new ComparableAggregator<Tuple2<Integer, Integer>>(0, type1, AggregationType.MINBY, true, config);
- ReduceFunction<Tuple2<Integer, Integer>> minByFunctionLast =
- new ComparableAggregator<Tuple2<Integer, Integer>>(0, type1, AggregationType.MINBY, false, config);
-
- List<Tuple2<Integer, Integer>> maxByFirstExpected = new ArrayList<Tuple2<Integer, Integer>>();
- maxByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
- maxByFirstExpected.add(new Tuple2<Integer, Integer>(1, 1));
- maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
- maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
- maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
- maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
- maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
- maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
- maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
-
- List<Tuple2<Integer, Integer>> maxByLastExpected = new ArrayList<Tuple2<Integer, Integer>>();
- maxByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
- maxByLastExpected.add(new Tuple2<Integer, Integer>(1, 1));
- maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
- maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
- maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
- maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
- maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
- maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
- maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 8));
-
- List<Tuple2<Integer, Integer>> minByFirstExpected = new ArrayList<Tuple2<Integer, Integer>>();
- minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
- minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
- minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
- minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
- minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
- minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
- minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
- minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
- minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-
- List<Tuple2<Integer, Integer>> minByLastExpected = new ArrayList<Tuple2<Integer, Integer>>();
- minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
- minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
- minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
- minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
- minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
- minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
- minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
- minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
- minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
+ KeySelector<Tuple3<Integer, Integer, Integer>, ?> keySelector = KeySelectorUtil.getSelectorForKeys(
+ new Keys.ExpressionKeys<>(new int[]{0}, typeInfo),
+ typeInfo, config);
+
+ // aggregations tested
+ ReduceFunction<Tuple3<Integer, Integer, Integer>> maxByFunctionFirst =
+ new ComparableAggregator<>(1, typeInfo, AggregationType.MAXBY, true, config);
+ ReduceFunction<Tuple3<Integer, Integer, Integer>> maxByFunctionLast =
+ new ComparableAggregator<>(1, typeInfo, AggregationType.MAXBY, false, config);
+ ReduceFunction<Tuple3<Integer, Integer, Integer>> minByFunctionFirst =
+ new ComparableAggregator<>(1, typeInfo, AggregationType.MINBY, true, config);
+ ReduceFunction<Tuple3<Integer, Integer, Integer>> minByFunctionLast =
+ new ComparableAggregator<>(1, typeInfo, AggregationType.MINBY, false, config);
assertEquals(maxByFirstExpected, MockContext.createAndExecute(
- new StreamReduce<Tuple2<Integer, Integer>>(maxByFunctionFirst),
- getInputList()));
+ new StreamGroupedReduce<>(maxByFunctionFirst, keySelector),
+ getInputByList()));
assertEquals(maxByLastExpected, MockContext.createAndExecute(
- new StreamReduce<Tuple2<Integer, Integer>>(maxByFunctionLast),
- getInputList()));
+ new StreamGroupedReduce<>(maxByFunctionLast, keySelector),
+ getInputByList()));
assertEquals(minByLastExpected, MockContext.createAndExecute(
- new StreamReduce<Tuple2<Integer, Integer>>(minByFunctionLast),
- getInputList()));
+ new StreamGroupedReduce<>(minByFunctionLast, keySelector),
+ getInputByList()));
assertEquals(minByFirstExpected, MockContext.createAndExecute(
- new StreamReduce<Tuple2<Integer, Integer>>(minByFunctionFirst),
- getInputList()));
+ new StreamGroupedReduce<>(minByFunctionFirst, keySelector),
+ getInputByList()));
}
@Test
public void pojoMinMaxByTest() {
+ // Pojos are grouped on field 0, aggregated on field 1
+
+ // preparing expected outputs
+ List<MyPojo3> maxByFirstExpected = ImmutableList.of(
+ new MyPojo3(0, 0), new MyPojo3(1, 1), new MyPojo3(2, 2),
+ new MyPojo3(2, 2), new MyPojo3(2, 2), new MyPojo3(2, 2),
+ new MyPojo3(2, 2), new MyPojo3(2, 2), new MyPojo3(2, 2));
+
+ List<MyPojo3> maxByLastExpected = ImmutableList.of(
+ new MyPojo3(0, 0), new MyPojo3(1, 1), new MyPojo3(2, 2),
+ new MyPojo3(2, 2), new MyPojo3(2, 2), new MyPojo3(2, 5),
+ new MyPojo3(2, 5), new MyPojo3(2, 5), new MyPojo3(2, 8));
+
+ List<MyPojo3> minByFirstExpected = ImmutableList.of(
+ new MyPojo3(0, 0), new MyPojo3(0, 0), new MyPojo3(0, 0),
+ new MyPojo3(0, 0), new MyPojo3(0, 0), new MyPojo3(0, 0),
+ new MyPojo3(0, 0), new MyPojo3(0, 0), new MyPojo3(0, 0));
+
+ List<MyPojo3> minByLastExpected = ImmutableList.of(
+ new MyPojo3(0, 0), new MyPojo3(0, 0), new MyPojo3(0, 0),
+ new MyPojo3(0, 3), new MyPojo3(0, 3), new MyPojo3(0, 3),
+ new MyPojo3(0, 6), new MyPojo3(0, 6), new MyPojo3(0, 6));
+
+ // some necessary boilerplate
+ TypeInformation<MyPojo3> typeInfo = TypeExtractor.getForObject(new MyPojo3(0, 0));
+
ExecutionConfig config = new ExecutionConfig();
- TypeInformation<MyPojo> type1 = TypeExtractor
- .getForObject(new MyPojo(0, 0));
-
- ReduceFunction<MyPojo> maxByFunctionFirst =
- new ComparableAggregator<MyPojo>("f0", type1, AggregationType.MAXBY, true, config);
- ReduceFunction<MyPojo> maxByFunctionLast =
- new ComparableAggregator<MyPojo>("f0", type1, AggregationType.MAXBY, false, config);
-
- ReduceFunction<MyPojo> minByFunctionFirst =
- new ComparableAggregator<MyPojo>("f0", type1, AggregationType.MINBY, true, config);
- ReduceFunction<MyPojo> minByFunctionLast =
- new ComparableAggregator<MyPojo>("f0", type1, AggregationType.MINBY, false, config);
-
- List<MyPojo> maxByFirstExpected = new ArrayList<MyPojo>();
- maxByFirstExpected.add(new MyPojo(0, 0));
- maxByFirstExpected.add(new MyPojo(1, 1));
- maxByFirstExpected.add(new MyPojo(2, 2));
- maxByFirstExpected.add(new MyPojo(2, 2));
- maxByFirstExpected.add(new MyPojo(2, 2));
- maxByFirstExpected.add(new MyPojo(2, 2));
- maxByFirstExpected.add(new MyPojo(2, 2));
- maxByFirstExpected.add(new MyPojo(2, 2));
- maxByFirstExpected.add(new MyPojo(2, 2));
-
- List<MyPojo> maxByLastExpected = new ArrayList<MyPojo>();
- maxByLastExpected.add(new MyPojo(0, 0));
- maxByLastExpected.add(new MyPojo(1, 1));
- maxByLastExpected.add(new MyPojo(2, 2));
- maxByLastExpected.add(new MyPojo(2, 2));
- maxByLastExpected.add(new MyPojo(2, 2));
- maxByLastExpected.add(new MyPojo(2, 5));
- maxByLastExpected.add(new MyPojo(2, 5));
- maxByLastExpected.add(new MyPojo(2, 5));
- maxByLastExpected.add(new MyPojo(2, 8));
-
- List<MyPojo> minByFirstExpected = new ArrayList<MyPojo>();
- minByFirstExpected.add(new MyPojo(0, 0));
- minByFirstExpected.add(new MyPojo(0, 0));
- minByFirstExpected.add(new MyPojo(0, 0));
- minByFirstExpected.add(new MyPojo(0, 0));
- minByFirstExpected.add(new MyPojo(0, 0));
- minByFirstExpected.add(new MyPojo(0, 0));
- minByFirstExpected.add(new MyPojo(0, 0));
- minByFirstExpected.add(new MyPojo(0, 0));
- minByFirstExpected.add(new MyPojo(0, 0));
-
- List<MyPojo> minByLastExpected = new ArrayList<MyPojo>();
- minByLastExpected.add(new MyPojo(0, 0));
- minByLastExpected.add(new MyPojo(0, 0));
- minByLastExpected.add(new MyPojo(0, 0));
- minByLastExpected.add(new MyPojo(0, 3));
- minByLastExpected.add(new MyPojo(0, 3));
- minByLastExpected.add(new MyPojo(0, 3));
- minByLastExpected.add(new MyPojo(0, 6));
- minByLastExpected.add(new MyPojo(0, 6));
- minByLastExpected.add(new MyPojo(0, 6));
+
+ KeySelector<MyPojo3, ?> keySelector = KeySelectorUtil.getSelectorForKeys(
+ new Keys.ExpressionKeys<>(new String[]{"f0"}, typeInfo),
+ typeInfo, config);
+
+ // aggregations tested
+ ReduceFunction<MyPojo3> maxByFunctionFirst =
+ new ComparableAggregator<>("f1", typeInfo, AggregationType.MAXBY, true, config);
+ ReduceFunction<MyPojo3> maxByFunctionLast =
+ new ComparableAggregator<>("f1", typeInfo, AggregationType.MAXBY, false, config);
+ ReduceFunction<MyPojo3> minByFunctionFirst =
+ new ComparableAggregator<>("f1", typeInfo, AggregationType.MINBY, true, config);
+ ReduceFunction<MyPojo3> minByFunctionLast =
+ new ComparableAggregator<>("f1", typeInfo, AggregationType.MINBY, false, config);
assertEquals(maxByFirstExpected, MockContext.createAndExecute(
- new StreamReduce<MyPojo>(maxByFunctionFirst),
- getInputPojoList()));
+ new StreamGroupedReduce<>(maxByFunctionFirst, keySelector),
+ getInputByPojoList()));
assertEquals(maxByLastExpected, MockContext.createAndExecute(
- new StreamReduce<MyPojo>(maxByFunctionLast),
- getInputPojoList()));
+ new StreamGroupedReduce<>(maxByFunctionLast, keySelector),
+ getInputByPojoList()));
assertEquals(minByLastExpected, MockContext.createAndExecute(
- new StreamReduce<MyPojo>(minByFunctionLast),
- getInputPojoList()));
+ new StreamGroupedReduce<>(minByFunctionLast, keySelector),
+ getInputByPojoList()));
assertEquals(minByFirstExpected, MockContext.createAndExecute(
- new StreamReduce<MyPojo>(minByFunctionFirst),
- getInputPojoList()));
+ new StreamGroupedReduce<>(minByFunctionFirst, keySelector),
+ getInputByPojoList()));
}
+ // *************************************************************************
+ // UTILS
+ // *************************************************************************
+
private List<Tuple2<Integer, Integer>> getInputList() {
- ArrayList<Tuple2<Integer, Integer>> inputList = new ArrayList<Tuple2<Integer, Integer>>();
+ ArrayList<Tuple2<Integer, Integer>> inputList = new ArrayList<>();
for (int i = 0; i < 9; i++) {
- inputList.add(new Tuple2<Integer, Integer>(i % 3, i));
+ inputList.add(Tuple2.of(i % 3, i));
}
return inputList;
-
}
private List<MyPojo> getInputPojoList() {
- ArrayList<MyPojo> inputList = new ArrayList<MyPojo>();
+ ArrayList<MyPojo> inputList = new ArrayList<>();
for (int i = 0; i < 9; i++) {
inputList.add(new MyPojo(i % 3, i));
}
return inputList;
+ }
+ private List<Tuple3<Integer, Integer, Integer>> getInputByList() {
+ ArrayList<Tuple3<Integer, Integer, Integer>> inputList = new ArrayList<>();
+ for (int i = 0; i < 9; i++) {
+ inputList.add(Tuple3.of(0, i % 3, i));
+ }
+ return inputList;
+ }
+
+ private List<MyPojo3> getInputByPojoList() {
+ ArrayList<MyPojo3> inputList = new ArrayList<>();
+ for (int i = 0; i < 9; i++) {
+ inputList.add(new MyPojo3(i % 3, i));
+ }
+ return inputList;
}
public static class MyPojo implements Serializable {
@@ -439,6 +350,39 @@ public class AggregationFunctionTest {
return false;
}
}
+ }
+
+ public static class MyPojo3 implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+ public int f0;
+ public int f1;
+ public int f2;
+
+ // Field 0 is always initialized to 0
+ public MyPojo3(int f1, int f2) {
+ this.f1 = f1;
+ this.f2 = f2;
+ }
+
+ public MyPojo3() {
+ }
+
+ @Override
+ public String toString() {
+ return "POJO3(" + f0 + "," + f1 + "," + f2 + ")";
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other instanceof MyPojo3) {
+ return this.f0 == ((MyPojo3) other).f0
+ && this.f1 == ((MyPojo3) other).f1
+ && this.f2 == ((MyPojo3) other).f2;
+ } else {
+ return false;
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/906bd6dc/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 0cf1df8..19bcb73 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -18,32 +18,28 @@
package org.apache.flink.streaming.api.scala
-import org.apache.flink.streaming.api.functions.{AscendingTimestampExtractor, TimestampExtractor}
-import org.apache.flink.streaming.api.windowing.assigners._
-import org.apache.flink.streaming.api.windowing.time.{ProcessingTime, EventTime, AbstractTime}
-import org.apache.flink.streaming.api.windowing.windows.{Window, TimeWindow}
-import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWindowedStream}
-
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-
-import org.apache.flink.api.java.tuple.{Tuple => JavaTuple}
-import org.apache.flink.api.common.functions.{FlatMapFunction, MapFunction, Partitioner, FilterFunction}
+import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction, MapFunction, Partitioner, RichFilterFunction, RichFlatMapFunction, RichMapFunction}
import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple => JavaTuple}
import org.apache.flink.api.scala.operators.ScalaCsvOutputFormat
import org.apache.flink.core.fs.{FileSystem, Path}
import org.apache.flink.streaming.api.collector.selector.OutputSelector
-import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, DataStreamSink, SingleOutputStreamOperator}
+import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWindowedStream, DataStream => JavaStream, KeyedStream => JavaKeyedStream, _}
import org.apache.flink.streaming.api.functions.sink.SinkFunction
+import org.apache.flink.streaming.api.functions.{AscendingTimestampExtractor, TimestampExtractor}
+import org.apache.flink.streaming.api.scala.function.StatefulFunction
+import org.apache.flink.streaming.api.windowing.assigners._
import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
import org.apache.flink.streaming.api.windowing.policy.{EvictionPolicy, TriggerPolicy}
+import org.apache.flink.streaming.api.windowing.time.{AbstractTime, EventTime, ProcessingTime}
+import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window}
import org.apache.flink.streaming.util.serialization.SerializationSchema
import org.apache.flink.util.Collector
-import org.apache.flink.api.common.functions.{RichMapFunction, RichFlatMapFunction, RichFilterFunction}
-import org.apache.flink.streaming.api.scala.function.StatefulFunction
-import org.apache.flink.streaming.api.datastream.{KeyedStream => JavaKeyedStream}
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
class DataStream[T](javaStream: JavaStream[T]) {
[5/5] flink git commit: [FLINK-2812] [streaming] KeySelectorUtil
interacts well with type extraction
Posted by mb...@apache.org.
[FLINK-2812] [streaming] KeySelectorUtil interacts well with type extraction
The interaction is tested in the AggregationFunctionTest and the Scala DataStreamTest, amongst others.
Closes #1155
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e494c279
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e494c279
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e494c279
Branch: refs/heads/master
Commit: e494c2795252a7f3db3659b1919cc7c75fc3dbb9
Parents: c414ea9
Author: mbalassi <mb...@apache.org>
Authored: Mon Oct 5 22:19:53 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Tue Oct 6 17:10:27 2015 +0200
----------------------------------------------------------------------
.../streaming/util/keys/KeySelectorUtil.java | 20 +++++++++++++++-----
.../flink/streaming/api/scala/DataStream.scala | 10 +++++++---
2 files changed, 22 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
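For readers following along, a minimal self-contained sketch of the pattern this commit applies to ComparableKeySelector and to the anonymous key extractors in the Scala DataStream: a KeySelector that also implements ResultTypeQueryable, so type extraction can ask the selector for its key type instead of reflecting on the (often generic or anonymous) class. The class name and the choice of Integer keys below are illustrative only and are not taken from the patch.

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;

public class FirstFieldKeySelector
        implements KeySelector<Tuple2<Integer, Integer>, Integer>, ResultTypeQueryable<Integer> {

    private static final long serialVersionUID = 1L;

    @Override
    public Integer getKey(Tuple2<Integer, Integer> value) {
        // key on the first tuple field
        return value.f0;
    }

    @Override
    public TypeInformation<Integer> getProducedType() {
        // the key type is reported explicitly, so no reflective extraction is needed
        return BasicTypeInfo.INT_TYPE_INFO;
    }
}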
http://git-wip-us.apache.org/repos/asf/flink/blob/e494c279/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
index d8839a0..9c76d95 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
@@ -28,6 +28,8 @@ import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.Keys;
import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
/**
* Utility class that contains helper methods to manipulating {@link KeySelector} for streaming.
@@ -47,12 +49,14 @@ public final class KeySelectorUtil {
// use ascending order here, the code paths for that are usually a slight bit faster
boolean[] orders = new boolean[numKeyFields];
+ TypeInformation[] typeInfos = new TypeInformation[numKeyFields];
for (int i = 0; i < numKeyFields; i++) {
orders[i] = true;
+ typeInfos[i] = compositeType.getTypeAt(logicalKeyPositions[i]);
}
-
+
TypeComparator<X> comparator = compositeType.createComparator(logicalKeyPositions, orders, 0, executionConfig);
- return new ComparableKeySelector<X>(comparator, numKeyFields);
+ return new ComparableKeySelector<>(comparator, numKeyFields, new TupleTypeInfo<>(typeInfos));
}
@@ -70,7 +74,7 @@ public final class KeySelectorUtil {
TypeComparator<X> comparator = ((CompositeType<X>) typeInfo).createComparator(
logicalKeyPositions, new boolean[1], 0, executionConfig);
- return new OneKeySelector<X, K>(comparator);
+ return new OneKeySelector<>(comparator);
}
/**
@@ -111,21 +115,23 @@ public final class KeySelectorUtil {
*
* @param <IN> The type from which the key is extracted.
*/
- public static final class ComparableKeySelector<IN> implements KeySelector<IN, Tuple> {
+ public static final class ComparableKeySelector<IN> implements KeySelector<IN, Tuple>, ResultTypeQueryable<Tuple> {
private static final long serialVersionUID = 1L;
private final TypeComparator<IN> comparator;
private final int keyLength;
+ private final TupleTypeInfo tupleTypeInfo;
/** Reusable array to hold the key objects. Since this is initially empty (all positions
* are null), it does not have any serialization problems */
@SuppressWarnings("NonSerializableFieldInSerializableClass")
private final Object[] keyArray;
- public ComparableKeySelector(TypeComparator<IN> comparator, int keyLength) {
+ public ComparableKeySelector(TypeComparator<IN> comparator, int keyLength, TupleTypeInfo tupleTypeInfo) {
this.comparator = comparator;
this.keyLength = keyLength;
+ this.tupleTypeInfo = tupleTypeInfo;
keyArray = new Object[keyLength];
}
@@ -139,6 +145,10 @@ public final class KeySelectorUtil {
return key;
}
+ @Override
+ public TypeInformation<Tuple> getProducedType() {
+ return tupleTypeInfo;
+ }
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e494c279/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 19bcb73..8aeacb4 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.java.tuple.{Tuple => JavaTuple}
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
import org.apache.flink.api.scala.operators.ScalaCsvOutputFormat
import org.apache.flink.core.fs.{FileSystem, Path}
import org.apache.flink.streaming.api.collector.selector.OutputSelector
@@ -230,8 +231,9 @@ class DataStream[T](javaStream: JavaStream[T]) {
def keyBy[K: TypeInformation](fun: T => K): KeyedStream[T, K] = {
val cleanFun = clean(fun)
- val keyExtractor = new KeySelector[T, K] {
+ val keyExtractor = new KeySelector[T, K] with ResultTypeQueryable[K] {
def getKey(in: T) = cleanFun(in)
+ override def getProducedType: TypeInformation[K] = implicitly[TypeInformation[K]]
}
javaStream.keyBy(keyExtractor)
}
@@ -256,8 +258,9 @@ class DataStream[T](javaStream: JavaStream[T]) {
def partitionByHash[K: TypeInformation](fun: T => K): DataStream[T] = {
val cleanFun = clean(fun)
- val keyExtractor = new KeySelector[T, K] {
+ val keyExtractor = new KeySelector[T, K] with ResultTypeQueryable[K] {
def getKey(in: T) = cleanFun(in)
+ override def getProducedType: TypeInformation[K] = implicitly[TypeInformation[K]]
}
javaStream.partitionByHash(keyExtractor)
}
@@ -293,8 +296,9 @@ class DataStream[T](javaStream: JavaStream[T]) {
def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], fun: T => K)
: DataStream[T] = {
val cleanFun = clean(fun)
- val keyExtractor = new KeySelector[T, K] {
+ val keyExtractor = new KeySelector[T, K] with ResultTypeQueryable[K] {
def getKey(in: T) = cleanFun(in)
+ override def getProducedType: TypeInformation[K] = implicitly[TypeInformation[K]]
}
javaStream.partitionCustom(partitioner, keyExtractor)
}
[4/5] flink git commit: [FLINK-2283] [streaming] grouped reduce and
fold operators checkpoint state
Posted by mb...@apache.org.
[FLINK-2283] [streaming] grouped reduce and fold operators checkpoint state
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/225704bc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/225704bc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/225704bc
Branch: refs/heads/master
Commit: 225704bc9912536042027fca1a9880beba3bc2bb
Parents: 906bd6d
Author: mbalassi <mb...@apache.org>
Authored: Sun Sep 13 08:19:07 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Tue Oct 6 14:46:06 2015 +0200
----------------------------------------------------------------------
.../streaming/api/operators/StreamFold.java | 99 --------------------
.../api/operators/StreamGroupedFold.java | 72 ++++++++++++--
.../api/operators/StreamGroupedReduce.java | 21 +++--
3 files changed, 77 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
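A minimal sketch of the user-facing effect of this change, assuming a local job with made-up elements and a one-second checkpoint interval (none of the names or values below come from the patch): with checkpointing enabled, the per-key running aggregate held by the grouped reduce operator becomes part of the checkpoint and is restored after a failure instead of being rebuilt from scratch.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointedGroupedReduceExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000); // snapshot operator state every second

        env.fromElements(Tuple2.of(0, 1), Tuple2.of(1, 2), Tuple2.of(0, 3), Tuple2.of(1, 4))
            .keyBy(0)   // group on the first tuple field
            .sum(1)     // executed by the grouped reduce operator modified here
            .print();

        env.execute("checkpointed grouped reduce");
    }
}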
http://git-wip-us.apache.org/repos/asf/flink/blob/225704bc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
deleted file mode 100644
index 81115f0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
-import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-public class StreamFold<IN, OUT>
- extends AbstractUdfStreamOperator<OUT, FoldFunction<IN, OUT>>
- implements OneInputStreamOperator<IN, OUT>, OutputTypeConfigurable<OUT> {
-
- private static final long serialVersionUID = 1L;
-
- protected transient OUT accumulator;
- private byte[] serializedInitialValue;
-
- protected TypeSerializer<OUT> outTypeSerializer;
-
- public StreamFold(FoldFunction<IN, OUT> folder, OUT initialValue) {
- super(folder);
- this.accumulator = initialValue;
- this.chainingStrategy = ChainingStrategy.FORCE_ALWAYS;
- }
-
- @Override
- public void processElement(StreamRecord<IN> element) throws Exception {
- accumulator = userFunction.fold(outTypeSerializer.copy(accumulator), element.getValue());
- output.collect(element.replace(accumulator));
- }
-
- @Override
- public void open(Configuration config) throws Exception {
- super.open(config);
-
- if (serializedInitialValue == null) {
- throw new RuntimeException("No initial value was serialized for the fold " +
- "operator. Probably the setOutputType method was not called.");
- }
-
- ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue);
- InputViewDataInputStreamWrapper in = new InputViewDataInputStreamWrapper(
- new DataInputStream(bais)
- );
-
- accumulator = outTypeSerializer.deserialize(in);
- }
-
- @Override
- public void processWatermark(Watermark mark) throws Exception {
- output.emitWatermark(mark);
- }
-
- @Override
- public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
- outTypeSerializer = outTypeInfo.createSerializer(executionConfig);
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- OutputViewDataOutputStreamWrapper out = new OutputViewDataOutputStreamWrapper(
- new DataOutputStream(baos)
- );
-
- try {
- outTypeSerializer.serialize(accumulator, out);
- } catch (IOException ioe) {
- throw new RuntimeException("Unable to serialize initial value of type " +
- accumulator.getClass().getSimpleName() + " of fold operator.", ioe);
- }
-
- serializedInitialValue = baos.toByteArray();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/225704bc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
index f4e44c6..f8f167a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
@@ -17,50 +17,106 @@
package org.apache.flink.streaming.api.operators;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
+import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
+import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-public class StreamGroupedFold<IN, OUT> extends StreamFold<IN, OUT> {
+public class StreamGroupedFold<IN, OUT> extends AbstractUdfStreamOperator<OUT, FoldFunction<IN, OUT>>
+ implements OneInputStreamOperator<IN, OUT>, OutputTypeConfigurable<OUT> {
private static final long serialVersionUID = 1L;
+ // Grouped values
private KeySelector<IN, ?> keySelector;
- private transient Map<Object, OUT> values;
+ private transient OperatorState<HashMap<Object, OUT>> values;
+
+ // Initial value serialization
+ private byte[] serializedInitialValue;
+ private TypeSerializer<OUT> outTypeSerializer;
+ private transient OUT initialValue;
public StreamGroupedFold(
FoldFunction<IN, OUT> folder,
KeySelector<IN, ?> keySelector,
OUT initialValue) {
- super(folder, initialValue);
+ super(folder);
this.keySelector = keySelector;
+ this.initialValue = initialValue;
}
@Override
public void open(Configuration configuration) throws Exception {
super.open(configuration);
- values = new HashMap<Object, OUT>();
+ if (serializedInitialValue == null) {
+ throw new RuntimeException("No initial value was serialized for the fold " +
+ "operator. Probably the setOutputType method was not called.");
+ }
+
+ ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue);
+ InputViewDataInputStreamWrapper in = new InputViewDataInputStreamWrapper(
+ new DataInputStream(bais)
+ );
+ initialValue = outTypeSerializer.deserialize(in);
+
+ values = runtimeContext.getOperatorState("flink_internal_fold_values",
+ new HashMap<Object, OUT>(), false);
}
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
Object key = keySelector.getKey(element.getValue());
- OUT value = values.get(key);
+ OUT value = values.value().get(key);
if (value != null) {
OUT folded = userFunction.fold(outTypeSerializer.copy(value), element.getValue());
- values.put(key, folded);
+ values.value().put(key, folded);
output.collect(element.replace(folded));
} else {
- OUT first = userFunction.fold(outTypeSerializer.copy(accumulator), element.getValue());
- values.put(key, first);
+ OUT first = userFunction.fold(outTypeSerializer.copy(initialValue), element.getValue());
+ values.value().put(key, first);
output.collect(element.replace(first));
}
}
+ @Override
+ public void processWatermark(Watermark mark) throws Exception {
+ output.emitWatermark(mark);
+ }
+
+ @Override
+ public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
+ outTypeSerializer = outTypeInfo.createSerializer(executionConfig);
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ OutputViewDataOutputStreamWrapper out = new OutputViewDataOutputStreamWrapper(
+ new DataOutputStream(baos)
+ );
+
+ try {
+ outTypeSerializer.serialize(initialValue, out);
+ } catch (IOException ioe) {
+ throw new RuntimeException("Unable to serialize initial value of type " +
+ initialValue.getClass().getSimpleName() + " of fold operator.", ioe);
+ }
+
+ serializedInitialValue = baos.toByteArray();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/225704bc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
index 8805138..e1f9f06 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
@@ -21,7 +21,9 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -31,7 +33,7 @@ public class StreamGroupedReduce<IN> extends AbstractUdfStreamOperator<IN, Reduc
private static final long serialVersionUID = 1L;
private KeySelector<IN, ?> keySelector;
- private transient Map<Object, IN> values;
+ private transient OperatorState<HashMap<Object, IN>> values;
public StreamGroupedReduce(ReduceFunction<IN> reducer, KeySelector<IN, ?> keySelector) {
super(reducer);
@@ -39,21 +41,24 @@ public class StreamGroupedReduce<IN> extends AbstractUdfStreamOperator<IN, Reduc
}
@Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ values = runtimeContext.getOperatorState("flink_internal_reduce_values",
+ new HashMap<Object, IN>(), false);
+ }
+
+ @Override
public void processElement(StreamRecord<IN> element) throws Exception {
Object key = keySelector.getKey(element.getValue());
- if (values == null) {
- values = new HashMap<>();
- }
-
- IN currentValue = values.get(key);
+ IN currentValue = values.value().get(key);
if (currentValue != null) {
// TODO: find a way to let operators copy elements (maybe)
IN reduced = userFunction.reduce(currentValue, element.getValue());
- values.put(key, reduced);
+ values.value().put(key, reduced);
output.collect(element.replace(reduced));
} else {
- values.put(key, element.getValue());
+ values.value().put(key, element.getValue());
output.collect(element.replace(element.getValue()));
}
}