Posted to commits@flink.apache.org by al...@apache.org on 2020/02/20 17:17:18 UTC
[flink] branch release-1.10 updated: [FLINK-15904][connectors/kafka] Use explicit Serializer for KafkaConsumer unionOffsetStates
This is an automated email from the ASF dual-hosted git repository.
aljoscha pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push:
new cf448ae [FLINK-15904][connectors/kafka] Use explicit Serializer for KafkaConsumer unionOffsetStates
cf448ae is described below
commit cf448ae117f9615c38be075265f6a5572490fcd5
Author: Oleksandr Nitavskyi <o....@criteo.com>
AuthorDate: Wed Feb 19 11:10:03 2020 +0100
[FLINK-15904][connectors/kafka] Use explicit Serializer for KafkaConsumer unionOffsetStates
---
.../connectors/kafka/FlinkKafkaConsumerBase.java | 27 ++++++++++++---
.../kafka/FlinkKafkaConsumerBaseTest.java | 38 ++++++++++++++++++++++
2 files changed, 61 insertions(+), 4 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 712af52..bc4c975 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -23,11 +23,14 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
-import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
@@ -857,9 +860,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
ListState<Tuple2<KafkaTopicPartition, Long>> oldRoundRobinListState =
stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
- this.unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>(
- OFFSETS_STATE_NAME,
- TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {})));
+ this.unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>(OFFSETS_STATE_NAME,
+ createStateSerializer(getRuntimeContext().getExecutionConfig())));
if (context.isRestored() && !restoredFromOldState) {
restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());
@@ -1063,4 +1065,21 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
LinkedMap getPendingOffsetsToCommit() {
return pendingOffsetsToCommit;
}
+
+ /**
+ * Creates the state serializer for the Kafka topic partition to offset tuple.
+ * An explicit state serializer with KryoSerializer is needed, because otherwise
+ * users cannot use the 'disableGenericTypes' setting with the KafkaConsumer.
+ */
+ @VisibleForTesting
+ static TupleSerializer<Tuple2<KafkaTopicPartition, Long>> createStateSerializer(ExecutionConfig executionConfig) {
+ // the explicit serializer keeps compatibility with GenericTypeInformation and allows users to disableGenericTypes
+ TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[]{
+ new KryoSerializer<>(KafkaTopicPartition.class, executionConfig),
+ LongSerializer.INSTANCE
+ };
+ @SuppressWarnings("unchecked")
+ Class<Tuple2<KafkaTopicPartition, Long>> tupleClass = (Class<Tuple2<KafkaTopicPartition, Long>>) (Class<?>) Tuple2.class;
+ return new TupleSerializer<>(tupleClass, fieldSerializers);
+ }
}
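For context, here is a minimal sketch (not part of this commit; class and variable names are illustrative) of the failure mode that the explicit serializer avoids: Flink's type analysis does not recognize KafkaTopicPartition as a POJO, so the old TypeHint path falls back to a GenericTypeInfo, whose createSerializer throws once generic types are disabled.

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

    public class DisableGenericTypesSketch {
        public static void main(String[] args) {
            ExecutionConfig config = new ExecutionConfig();
            config.disableGenericTypes();

            // The TypeHint path treats KafkaTopicPartition as a generic type,
            // so serializer creation throws once generic types are disabled.
            TypeInformation<Tuple2<KafkaTopicPartition, Long>> typeInfo =
                new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {}.getTypeInfo();
            try {
                typeInfo.createSerializer(config);
            } catch (UnsupportedOperationException expected) {
                System.out.println("TypeHint path fails: " + expected.getMessage());
            }

            // By contrast, createStateSerializer above builds the KryoSerializer
            // for the KafkaTopicPartition field directly, so it bypasses the
            // disableGenericTypes() check while producing the same binary format.
        }
    }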
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index b578dac..f8f769c 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -18,13 +18,18 @@
package org.apache.flink.streaming.connectors.kafka;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.OneShotLatch;
@@ -54,6 +59,7 @@ import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
@@ -666,6 +672,38 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger {
1);
}
+ /**
+ * Before using an explicit TypeSerializer for the partition state, the {@link
+ * FlinkKafkaConsumerBase} created a serializer using a {@link TypeHint}. Here, we verify
+ * that the two methods create compatible serializers.
+ */
+ @Test
+ public void testExplicitStateSerializerCompatibility() throws Exception {
+ ExecutionConfig executionConfig = new ExecutionConfig();
+
+ Tuple2<KafkaTopicPartition, Long> tuple =
+ new Tuple2<>(new KafkaTopicPartition("dummy", 0), 42L);
+
+ // This is how the KafkaConsumerBase used to create the TypeSerializer
+ TypeInformation<Tuple2<KafkaTopicPartition, Long>> originalTypeHintTypeInfo =
+ new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {}.getTypeInfo();
+ TypeSerializer<Tuple2<KafkaTopicPartition, Long>> serializerFromTypeHint =
+ originalTypeHintTypeInfo.createSerializer(executionConfig);
+ byte[] bytes = InstantiationUtil.serializeToByteArray(serializerFromTypeHint, tuple);
+
+ // Directly use the Consumer to create the TypeSerializer (using the new method)
+ TupleSerializer<Tuple2<KafkaTopicPartition, Long>> kafkaConsumerSerializer =
+ FlinkKafkaConsumerBase.createStateSerializer(executionConfig);
+ Tuple2<KafkaTopicPartition, Long> actualTuple =
+ InstantiationUtil.deserializeFromByteArray(kafkaConsumerSerializer, bytes);
+
+ Assert.assertEquals(
+ "Explicit Serializer is not compatible with previous method of creating Serializer using TypeHint.",
+ tuple,
+ actualTuple
+ );
+ }
+
@Test
public void testScaleUp() throws Exception {
testRescaling(5, 2, 8, 30);
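With this fix in place, jobs that call disableGenericTypes() can use the Kafka consumer again. A minimal usage sketch, assuming a local broker and the universal FlinkKafkaConsumer on the classpath (topic name, group id, and addresses are illustrative):

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    public class KafkaWithoutGenericTypes {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Before this commit, this setting made the consumer fail when its
            // union offset state serializer was created from the TypeHint.
            env.getConfig().disableGenericTypes();

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092");
            props.setProperty("group.id", "demo");

            env.addSource(new FlinkKafkaConsumer<>("demo-topic", new SimpleStringSchema(), props))
               .print();
            env.execute("kafka-with-disabled-generic-types");
        }
    }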