You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/06/07 17:22:51 UTC
[04/12] flink git commit: [FLINK-6808] Implement
snapshotConfiguration/ensureCompatibility for
CoGroupedStreams.UnionSerializer
[FLINK-6808] Implement snapshotConfiguration/ensureCompatibility for CoGroupedStreams.UnionSerializer
This closes #4052.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f74caf70
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f74caf70
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f74caf70
Branch: refs/heads/release-1.3
Commit: f74caf7062b1cc23a704f8f8b8171be430b60807
Parents: 1d89dd0
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri Jun 2 15:15:32 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Wed Jun 7 18:52:08 2017 +0200
----------------------------------------------------------------------
.../api/datastream/CoGroupedStreams.java | 60 ++++++++++++++++++--
flink-tests/pom.xml | 1 +
.../streaming/runtime/CoGroupJoinITCase.java | 47 +++++++++++++++
3 files changed, 103 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f74caf70/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
index f0c3dc2..ba26623 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@@ -30,10 +30,15 @@ import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
@@ -76,7 +81,7 @@ public class CoGroupedStreams<T1, T2> {
private final DataStream<T2> input2;
/**
- * Creates new CoGroped data streams, which are the first step towards building a streaming
+ * Creates new CoGrouped data streams, which are the first step towards building a streaming
* co-group.
*
* @param input1 The first data stream.
@@ -442,8 +447,7 @@ public class CoGroupedStreams<T1, T2> {
private final TypeSerializer<T1> oneSerializer;
private final TypeSerializer<T2> twoSerializer;
- public UnionSerializer(TypeSerializer<T1> oneSerializer,
- TypeSerializer<T2> twoSerializer) {
+ public UnionSerializer(TypeSerializer<T1> oneSerializer, TypeSerializer<T2> twoSerializer) {
this.oneSerializer = oneSerializer;
this.twoSerializer = twoSerializer;
}
@@ -552,12 +556,58 @@ public class CoGroupedStreams<T1, T2> {
@Override
public TypeSerializerConfigSnapshot snapshotConfiguration() {
- throw new UnsupportedOperationException("This serializer is not registered for managed state.");
+ return new UnionSerializerConfigSnapshot<>(oneSerializer, twoSerializer);
}
@Override
public CompatibilityResult<TaggedUnion<T1, T2>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
- throw new UnsupportedOperationException("This serializer is not registered for managed state.");
+ if (configSnapshot instanceof UnionSerializerConfigSnapshot) {
+ List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousSerializersAndConfigs =
+ ((UnionSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();
+
+ CompatibilityResult<T1> oneSerializerCompatResult = CompatibilityUtil.resolveCompatibilityResult(
+ previousSerializersAndConfigs.get(0).f0,
+ UnloadableDummyTypeSerializer.class,
+ previousSerializersAndConfigs.get(0).f1,
+ oneSerializer);
+
+ CompatibilityResult<T2> twoSerializerCompatResult = CompatibilityUtil.resolveCompatibilityResult(
+ previousSerializersAndConfigs.get(1).f0,
+ UnloadableDummyTypeSerializer.class,
+ previousSerializersAndConfigs.get(1).f1,
+ twoSerializer);
+
+ if (!oneSerializerCompatResult.isRequiresMigration() && !twoSerializerCompatResult.isRequiresMigration()) {
+ return CompatibilityResult.compatible();
+ } else if (oneSerializerCompatResult.getConvertDeserializer() != null && twoSerializerCompatResult.getConvertDeserializer() != null) {
+ return CompatibilityResult.requiresMigration(
+ new UnionSerializer<>(
+ new TypeDeserializerAdapter<>(oneSerializerCompatResult.getConvertDeserializer()),
+ new TypeDeserializerAdapter<>(twoSerializerCompatResult.getConvertDeserializer())));
+ }
+ }
+
+ return CompatibilityResult.requiresMigration();
+ }
+ }
+
+ /**
+ * The {@link TypeSerializerConfigSnapshot} for the {@link UnionSerializer}.
+ */
+ public static class UnionSerializerConfigSnapshot<T1, T2> extends CompositeTypeSerializerConfigSnapshot {
+
+ private static final int VERSION = 1;
+
+ /** This empty nullary constructor is required for deserializing the configuration. */
+ public UnionSerializerConfigSnapshot() {}
+
+ public UnionSerializerConfigSnapshot(TypeSerializer<T1> oneSerializer, TypeSerializer<T2> twoSerializer) {
+ super(oneSerializer, twoSerializer);
+ }
+
+ @Override
+ public int getVersion() {
+ return VERSION;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f74caf70/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 62feff3..017c213 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -115,6 +115,7 @@ under the License.
<artifactId>flink-streaming-java_2.10</artifactId>
<version>${project.version}</version>
<scope>test</scope>
+ <type>test-jar</type>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/flink/blob/f74caf70/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoGroupJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoGroupJoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoGroupJoinITCase.java
index da3de3d..a82b965 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoGroupJoinITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoGroupJoinITCase.java
@@ -19,18 +19,24 @@ package org.apache.flink.test.streaming.runtime;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.CoGroupedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.util.Collector;
import org.junit.Assert;
@@ -324,6 +330,47 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
Assert.assertEquals(expectedResult, testResults);
}
+ /**
+ * Verifies that pipelines including {@link CoGroupedStreams} can be checkpointed properly,
+ * which includes snapshotting configurations of any involved serializers.
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/FLINK-6808">FLINK-6808</a>
+ */
+ @Test
+ public void testCoGroupOperatorWithCheckpoint() throws Exception {
+
+ // generate an operator for the co-group operation
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ env.setParallelism(1);
+
+ DataStream<Tuple2<String, Integer>> source1 = env.fromElements(Tuple2.of("a", 0), Tuple2.of("b", 3));
+ DataStream<Tuple2<String, Integer>> source2 = env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 6));
+
+ DataStream<String> coGroupWindow = source1.coGroup(source2)
+ .where(new Tuple2KeyExtractor())
+ .equalTo(new Tuple2KeyExtractor())
+ .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+ .apply(new CoGroupFunction<Tuple2<String,Integer>, Tuple2<String,Integer>, String>() {
+ @Override
+ public void coGroup(Iterable<Tuple2<String, Integer>> first,
+ Iterable<Tuple2<String, Integer>> second,
+ Collector<String> out) throws Exception {
+ out.collect(first + ":" + second);
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, String> transform = (OneInputTransformation<Tuple2<String, Integer>, String>) coGroupWindow.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, String> operator = transform.getOperator();
+
+ // wrap the operator in the test harness, and perform a snapshot
+ OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, String> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new Tuple2KeyExtractor(), BasicTypeInfo.STRING_TYPE_INFO);
+
+ testHarness.open();
+ testHarness.snapshot(0L, 0L);
+ }
+
private static class Tuple2TimestampExtractor implements AssignerWithPunctuatedWatermarks<Tuple2<String, Integer>> {
@Override