Posted to commits@flink.apache.org by tz...@apache.org on 2017/06/07 17:22:51 UTC

[04/12] flink git commit: [FLINK-6808] Implement snapshotConfiguration/ensureCompatibility for CoGroupedStreams.UnionSerializer

[FLINK-6808] Implement snapshotConfiguration/ensureCompatibility for CoGroupedStreams.UnionSerializer

This closes #4052.
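
For readers unfamiliar with the two serializer compatibility hooks named in the subject, below is a minimal illustrative sketch (not part of this commit; MyWrapperSerializer and MyWrapperConfigSnapshot are hypothetical names) of the pattern the patch applies to UnionSerializer: snapshotConfiguration() records the nested serializers' configurations at checkpoint time, and ensureCompatibility() resolves each nested serializer against that snapshot on restore, falling back to state migration when they no longer match. The imports assumed here are the same ones the diff adds to CoGroupedStreams.java.

    // Sketch only: a hypothetical wrapper serializer around a single nested serializer,
    // following the same pattern UnionSerializer adopts in this commit.
    @Override
    public TypeSerializerConfigSnapshot snapshotConfiguration() {
        // capture the nested serializer's configuration so it can be checked on restore
        return new MyWrapperConfigSnapshot<>(nestedSerializer);
    }

    @Override
    public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
        if (configSnapshot instanceof MyWrapperConfigSnapshot) {
            // the previous nested serializer and its config, as written by snapshotConfiguration()
            Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> previous =
                ((MyWrapperConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs().get(0);

            CompatibilityResult<T> result = CompatibilityUtil.resolveCompatibilityResult(
                previous.f0, UnloadableDummyTypeSerializer.class, previous.f1, nestedSerializer);

            if (!result.isRequiresMigration()) {
                // the new nested serializer can read the old state directly
                return CompatibilityResult.compatible();
            }
        }
        // previous state cannot be read with the new serializer as-is
        return CompatibilityResult.requiresMigration();
    }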


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f74caf70
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f74caf70
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f74caf70

Branch: refs/heads/release-1.3
Commit: f74caf7062b1cc23a704f8f8b8171be430b60807
Parents: 1d89dd0
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri Jun 2 15:15:32 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Wed Jun 7 18:52:08 2017 +0200

----------------------------------------------------------------------
 .../api/datastream/CoGroupedStreams.java        | 60 ++++++++++++++++++--
 flink-tests/pom.xml                             |  1 +
 .../streaming/runtime/CoGroupJoinITCase.java    | 47 +++++++++++++++
 3 files changed, 103 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f74caf70/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
index f0c3dc2..ba26623 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@@ -30,10 +30,15 @@ import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -76,7 +81,7 @@ public class CoGroupedStreams<T1, T2> {
 	private final DataStream<T2> input2;
 
 	/**
-	 * Creates new CoGroped data streams, which are the first step towards building a streaming
+	 * Creates new CoGrouped data streams, which are the first step towards building a streaming
 	 * co-group.
 	 *
 	 * @param input1 The first data stream.
@@ -442,8 +447,7 @@ public class CoGroupedStreams<T1, T2> {
 		private final TypeSerializer<T1> oneSerializer;
 		private final TypeSerializer<T2> twoSerializer;
 
-		public UnionSerializer(TypeSerializer<T1> oneSerializer,
-				TypeSerializer<T2> twoSerializer) {
+		public UnionSerializer(TypeSerializer<T1> oneSerializer, TypeSerializer<T2> twoSerializer) {
 			this.oneSerializer = oneSerializer;
 			this.twoSerializer = twoSerializer;
 		}
@@ -552,12 +556,58 @@ public class CoGroupedStreams<T1, T2> {
 
 		@Override
 		public TypeSerializerConfigSnapshot snapshotConfiguration() {
-			throw new UnsupportedOperationException("This serializer is not registered for managed state.");
+			return new UnionSerializerConfigSnapshot<>(oneSerializer, twoSerializer);
 		}
 
 		@Override
 		public CompatibilityResult<TaggedUnion<T1, T2>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-			throw new UnsupportedOperationException("This serializer is not registered for managed state.");
+			if (configSnapshot instanceof UnionSerializerConfigSnapshot) {
+				List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousSerializersAndConfigs =
+					((UnionSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();
+
+				CompatibilityResult<T1> oneSerializerCompatResult = CompatibilityUtil.resolveCompatibilityResult(
+					previousSerializersAndConfigs.get(0).f0,
+					UnloadableDummyTypeSerializer.class,
+					previousSerializersAndConfigs.get(0).f1,
+					oneSerializer);
+
+				CompatibilityResult<T2> twoSerializerCompatResult = CompatibilityUtil.resolveCompatibilityResult(
+					previousSerializersAndConfigs.get(1).f0,
+					UnloadableDummyTypeSerializer.class,
+					previousSerializersAndConfigs.get(1).f1,
+					twoSerializer);
+
+				if (!oneSerializerCompatResult.isRequiresMigration() && !twoSerializerCompatResult.isRequiresMigration()) {
+					return CompatibilityResult.compatible();
+				} else if (oneSerializerCompatResult.getConvertDeserializer() != null && twoSerializerCompatResult.getConvertDeserializer() != null) {
+					return CompatibilityResult.requiresMigration(
+						new UnionSerializer<>(
+							new TypeDeserializerAdapter<>(oneSerializerCompatResult.getConvertDeserializer()),
+							new TypeDeserializerAdapter<>(twoSerializerCompatResult.getConvertDeserializer())));
+				}
+			}
+
+			return CompatibilityResult.requiresMigration();
+		}
+	}
+
+	/**
+	 * The {@link TypeSerializerConfigSnapshot} for the {@link UnionSerializer}.
+	 */
+	public static class UnionSerializerConfigSnapshot<T1, T2> extends CompositeTypeSerializerConfigSnapshot {
+
+		private static final int VERSION = 1;
+
+		/** This empty nullary constructor is required for deserializing the configuration. */
+		public UnionSerializerConfigSnapshot() {}
+
+		public UnionSerializerConfigSnapshot(TypeSerializer<T1> oneSerializer, TypeSerializer<T2> twoSerializer) {
+			super(oneSerializer, twoSerializer);
+		}
+
+		@Override
+		public int getVersion() {
+			return VERSION;
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f74caf70/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 62feff3..017c213 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -115,6 +115,7 @@ under the License.
 			<artifactId>flink-streaming-java_2.10</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
+			<type>test-jar</type>
 		</dependency>
 
 		<dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/f74caf70/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoGroupJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoGroupJoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoGroupJoinITCase.java
index da3de3d..a82b965 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoGroupJoinITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoGroupJoinITCase.java
@@ -19,18 +19,24 @@ package org.apache.flink.test.streaming.runtime;
 
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.CoGroupedStreams;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
 import org.junit.Assert;
@@ -324,6 +330,47 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
 		Assert.assertEquals(expectedResult, testResults);
 	}
 
+	/**
+	 * Verifies that pipelines including {@link CoGroupedStreams} can be checkpointed properly,
+	 * which includes snapshotting configurations of any involved serializers.
+	 *
+	 * @see <a href="https://issues.apache.org/jira/browse/FLINK-6808">FLINK-6808</a>
+	 */
+	@Test
+	public void testCoGroupOperatorWithCheckpoint() throws Exception {
+
+		// generate an operator for the co-group operation
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		env.setParallelism(1);
+
+		DataStream<Tuple2<String, Integer>> source1 = env.fromElements(Tuple2.of("a", 0), Tuple2.of("b", 3));
+		DataStream<Tuple2<String, Integer>> source2 = env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 6));
+
+		DataStream<String> coGroupWindow = source1.coGroup(source2)
+			.where(new Tuple2KeyExtractor())
+			.equalTo(new Tuple2KeyExtractor())
+			.window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+			.apply(new CoGroupFunction<Tuple2<String,Integer>, Tuple2<String,Integer>, String>() {
+				@Override
+				public void coGroup(Iterable<Tuple2<String, Integer>> first,
+									Iterable<Tuple2<String, Integer>> second,
+									Collector<String> out) throws Exception {
+					out.collect(first + ":" + second);
+				}
+			});
+
+		OneInputTransformation<Tuple2<String, Integer>, String> transform = (OneInputTransformation<Tuple2<String, Integer>, String>) coGroupWindow.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, String> operator = transform.getOperator();
+
+		// wrap the operator in the test harness, and perform a snapshot
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, String> testHarness =
+			new KeyedOneInputStreamOperatorTestHarness<>(operator, new Tuple2KeyExtractor(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.open();
+		testHarness.snapshot(0L, 0L);
+	}
+
 	private static class Tuple2TimestampExtractor implements AssignerWithPunctuatedWatermarks<Tuple2<String, Integer>> {
 		
 		@Override