Posted to commits@flink.apache.org by al...@apache.org on 2020/02/20 17:17:18 UTC

[flink] branch release-1.10 updated: [FLINK-15904][connectors/kafka] Use explicit Serializer for KafkaConsumer unionOffsetStates

This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new cf448ae  [FLINK-15904][connectors/kafka] Use explicit Serializer for KafkaConsumer unionOffsetStates
cf448ae is described below

commit cf448ae117f9615c38be075265f6a5572490fcd5
Author: Oleksandr Nitavskyi <o....@criteo.com>
AuthorDate: Wed Feb 19 11:10:03 2020 +0100

    [FLINK-15904][connectors/kafka] Use explicit Serializer for KafkaConsumer unionOffsetStates
---
 .../connectors/kafka/FlinkKafkaConsumerBase.java   | 27 ++++++++++++---
 .../kafka/FlinkKafkaConsumerBaseTest.java          | 38 ++++++++++++++++++++++
 2 files changed, 61 insertions(+), 4 deletions(-)
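
Background (not part of the commit): KafkaTopicPartition has final fields and no
default constructor, so Flink's type extraction treats it as a generic type. The
TypeHint-based state descriptor used before this commit therefore carried a
GenericTypeInfo, and creating its serializer throws as soon as a user calls
disableGenericTypes(). A minimal sketch of how that surfaces, using the public
Flink 1.10 API (the standalone class is illustrative only):

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

    public class Flink15904Repro {
        public static void main(String[] args) {
            ExecutionConfig config = new ExecutionConfig();
            // the same switch a user flips via env.getConfig().disableGenericTypes()
            config.disableGenericTypes();

            TypeInformation<Tuple2<KafkaTopicPartition, Long>> typeInfo =
                TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {});

            // KafkaTopicPartition resolves to a GenericTypeInfo, so this call throws
            // UnsupportedOperationException once generic types are disabled; this is
            // the failure the consumer hit while initializing its offset state.
            typeInfo.createSerializer(config);
        }
    }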

diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 712af52..bc4c975 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -23,11 +23,14 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
-import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.MetricGroup;
@@ -857,9 +860,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 		ListState<Tuple2<KafkaTopicPartition, Long>> oldRoundRobinListState =
 			stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
 
-		this.unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>(
-				OFFSETS_STATE_NAME,
-				TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {})));
+		this.unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>(OFFSETS_STATE_NAME,
+			createStateSerializer(getRuntimeContext().getExecutionConfig())));
 
 		if (context.isRestored() && !restoredFromOldState) {
 			restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());
@@ -1063,4 +1065,21 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	LinkedMap getPendingOffsetsToCommit() {
 		return pendingOffsetsToCommit;
 	}
+
+	/**
+	 * Creates the state serializer for the Kafka topic partition to offset tuple.
+	 * An explicit serializer built on KryoSerializer is needed because otherwise
+	 * users cannot use the 'disableGenericTypes' setting with the KafkaConsumer.
+	 */
+	@VisibleForTesting
+	static TupleSerializer<Tuple2<KafkaTopicPartition, Long>> createStateSerializer(ExecutionConfig executionConfig) {
+		// the explicit serializer keeps compatibility with GenericTypeInformation and allows users to disableGenericTypes
+		TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[]{
+			new KryoSerializer<>(KafkaTopicPartition.class, executionConfig),
+			LongSerializer.INSTANCE
+		};
+		@SuppressWarnings("unchecked")
+		Class<Tuple2<KafkaTopicPartition, Long>> tupleClass = (Class<Tuple2<KafkaTopicPartition, Long>>) (Class<?>) Tuple2.class;
+		return new TupleSerializer<>(tupleClass, fieldSerializers);
+	}
 }
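
The user-visible effect of the fix, as a sketch (not part of the commit; the topic,
properties, and checkpoint interval are placeholders, and the universal
FlinkKafkaConsumer stands in for any consumer built on this base class):

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    public class DisableGenericTypesJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // before this fix, disabling generic types made the consumer fail when
            // it initialized its offset state from the TypeHint-derived serializer
            env.getConfig().disableGenericTypes();
            env.enableCheckpointing(10_000);

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
            props.setProperty("group.id", "demo");                    // placeholder

            env.addSource(new FlinkKafkaConsumer<>("demo-topic", new SimpleStringSchema(), props))
                .print();

            env.execute("FLINK-15904 demo");
        }
    }
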
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index b578dac..f8f769c 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -18,13 +18,18 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.BroadcastState;
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.OneShotLatch;
@@ -54,6 +59,7 @@ import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
@@ -666,6 +672,38 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger {
 			1);
 	}
 
+	/**
+	 * Before switching to an explicit TypeSerializer for the partition state, the {@link
+	 * FlinkKafkaConsumerBase} created a serializer from a {@link TypeHint}. Here, we verify
+	 * that the two approaches create compatible serializers.
+	 */
+	@Test
+	public void testExplicitStateSerializerCompatibility() throws Exception {
+		ExecutionConfig executionConfig = new ExecutionConfig();
+
+		Tuple2<KafkaTopicPartition, Long> tuple =
+				new Tuple2<>(new KafkaTopicPartition("dummy", 0), 42L);
+
+		// This is how the KafkaConsumerBase used to create the TypeSerializer
+		TypeInformation<Tuple2<KafkaTopicPartition, Long>> originalTypeHintTypeInfo =
+				new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {}.getTypeInfo();
+		TypeSerializer<Tuple2<KafkaTopicPartition, Long>> serializerFromTypeHint =
+				originalTypeHintTypeInfo.createSerializer(executionConfig);
+		byte[] bytes = InstantiationUtil.serializeToByteArray(serializerFromTypeHint, tuple);
+
+		// Directly use the Consumer to create the TypeSerializer (using the new method)
+		TupleSerializer<Tuple2<KafkaTopicPartition, Long>> kafkaConsumerSerializer =
+				FlinkKafkaConsumerBase.createStateSerializer(executionConfig);
+		Tuple2<KafkaTopicPartition, Long> actualTuple =
+				InstantiationUtil.deserializeFromByteArray(kafkaConsumerSerializer, bytes);
+
+		Assert.assertEquals(
+				"Explicit Serializer is not compatible with previous method of creating Serializer using TypeHint.",
+				tuple,
+				actualTuple
+		);
+	}
+
 	@Test
 	public void testScaleUp() throws Exception {
 		testRescaling(5, 2, 8, 30);
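
Why the compatibility holds: TupleSerializer's binary format is just its field
serializers applied in order, and the old TypeHint path already produced a
TupleSerializer with a KryoSerializer for KafkaTopicPartition and LongSerializer
for the offset, so the explicit construction writes identical bytes and existing
checkpoints stay readable. The reverse direction can be checked the same way
(sketch, not part of the commit; the class must sit in the
org.apache.flink.streaming.connectors.kafka package because
createStateSerializer is package-private):

    package org.apache.flink.streaming.connectors.kafka;

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
    import org.apache.flink.util.InstantiationUtil;

    public class ReverseCompatibilityCheck {
        public static void main(String[] args) throws Exception {
            ExecutionConfig config = new ExecutionConfig();
            Tuple2<KafkaTopicPartition, Long> tuple =
                new Tuple2<>(new KafkaTopicPartition("dummy", 0), 42L);

            // write with the new explicit serializer ...
            byte[] bytes = InstantiationUtil.serializeToByteArray(
                FlinkKafkaConsumerBase.createStateSerializer(config), tuple);

            // ... and read back with the serializer the old TypeHint path produced
            TypeSerializer<Tuple2<KafkaTopicPartition, Long>> legacySerializer =
                new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {}.getTypeInfo()
                    .createSerializer(config);
            System.out.println(InstantiationUtil.deserializeFromByteArray(legacySerializer, bytes));
        }
    }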