You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/05/18 12:58:44 UTC

[flink] 03/05: [FLINK-15670][connector] Adds the consumer for KafkaShuffle.

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c3b42fb924d571c6a88223411b414969bf8b89d4
Author: Yuan Mei <yu...@gmail.com>
AuthorDate: Mon May 18 16:57:56 2020 +0800

    [FLINK-15670][connector] Adds the consumer for KafkaShuffle.
    
    KafkaShuffle provides a transparent Kafka source and sink pair, through which the network traffic of a shuffle step is persisted and redirected.
---
 .../connectors/kafka/FlinkKafkaConsumerBase.java   |   2 +-
 .../kafka/internals/AbstractFetcher.java           |   2 +-
 .../connectors/kafka/internal/KafkaFetcher.java    |  55 ++--
 .../kafka/internal/KafkaShuffleFetcher.java        | 297 +++++++++++++++++++++
 .../kafka/shuffle/FlinkKafkaShuffleConsumer.java   |  94 +++++++
 5 files changed, 424 insertions(+), 26 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 46688f6..f9f835a 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -264,7 +264,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	 * @param properties - Kafka configuration properties to be adjusted
 	 * @param offsetCommitMode offset commit mode
 	 */
-	static void adjustAutoCommitConfig(Properties properties, OffsetCommitMode offsetCommitMode) {
+	protected static void adjustAutoCommitConfig(Properties properties, OffsetCommitMode offsetCommitMode) {
 		if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) {
 			properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
 		}
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 9ad685c..978cd97 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -84,7 +84,7 @@ public abstract class AbstractFetcher<T, KPH> {
 
 	/** The lock that guarantees that record emission and state updates are atomic,
 	 * from the view of taking a checkpoint. */
-	private final Object checkpointLock;
+	protected final Object checkpointLock;
 
 	/** All partitions (and their state) that this fetcher is subscribed to. */
 	private final List<KafkaTopicPartitionState<T, KPH>> subscribedPartitionStates;
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaFetcher.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaFetcher.java
index d591f58..d2be85e 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaFetcher.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaFetcher.java
@@ -63,14 +63,17 @@ public class KafkaFetcher<T> extends AbstractFetcher<T, TopicPartition> {
 	/** The schema to convert between Kafka's byte messages, and Flink's objects. */
 	private final KafkaDeserializationSchema<T> deserializer;
 
+	/** A collector to emit records in batch (bundle). **/
+	private final KafkaCollector kafkaCollector;
+
 	/** The handover of data and exceptions between the consumer thread and the task thread. */
-	private final Handover handover;
+	final Handover handover;
 
 	/** The thread that runs the actual KafkaConsumer and hand the record batches to this fetcher. */
-	private final KafkaConsumerThread consumerThread;
+	final KafkaConsumerThread consumerThread;
 
 	/** Flag to mark the main work loop as alive. */
-	private volatile boolean running = true;
+	volatile boolean running = true;
 
 	// ------------------------------------------------------------------------
 
@@ -111,19 +114,16 @@ public class KafkaFetcher<T> extends AbstractFetcher<T, TopicPartition> {
 			useMetrics,
 			consumerMetricGroup,
 			subtaskMetricGroup);
+		this.kafkaCollector = new KafkaCollector();
 	}
 
 	// ------------------------------------------------------------------------
 	//  Fetcher work methods
 	// ------------------------------------------------------------------------
 
-	private final KafkaCollector kafkaCollector = new KafkaCollector();
-
 	@Override
 	public void runFetchLoop() throws Exception {
 		try {
-			final Handover handover = this.handover;
-
 			// kick off the actual Kafka consumer
 			consumerThread.start();
 
@@ -138,23 +138,7 @@ public class KafkaFetcher<T> extends AbstractFetcher<T, TopicPartition> {
 					List<ConsumerRecord<byte[], byte[]>> partitionRecords =
 						records.records(partition.getKafkaPartitionHandle());
 
-					for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
-						deserializer.deserialize(record, kafkaCollector);
-
-						// emit the actual records. this also updates offset state atomically and emits
-						// watermarks
-						emitRecordsWithTimestamps(
-							kafkaCollector.getRecords(),
-							partition,
-							record.offset(),
-							record.timestamp());
-
-						if (kafkaCollector.isEndOfStreamSignalled()) {
-							// end of stream signaled
-							running = false;
-							break;
-						}
-					}
+					partitionConsumerRecordsHandler(partitionRecords, partition);
 				}
 			}
 		}
@@ -189,6 +173,29 @@ public class KafkaFetcher<T> extends AbstractFetcher<T, TopicPartition> {
 		return "Kafka Fetcher";
 	}
 
+	protected void partitionConsumerRecordsHandler(
+			List<ConsumerRecord<byte[], byte[]>> partitionRecords,
+			KafkaTopicPartitionState<T, TopicPartition> partition) throws Exception {
+
+		for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
+			deserializer.deserialize(record, kafkaCollector);
+
+			// emit the actual records. this also updates offset state atomically and emits
+			// watermarks
+			emitRecordsWithTimestamps(
+				kafkaCollector.getRecords(),
+				partition,
+				record.offset(),
+				record.timestamp());
+
+			if (kafkaCollector.isEndOfStreamSignalled()) {
+				// end of stream signaled
+				running = false;
+				break;
+			}
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  Implement Methods of the AbstractFetcher
 	// ------------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaShuffleFetcher.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaShuffleFetcher.java
new file mode 100644
index 0000000..5d380da
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaShuffleFetcher.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ByteSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITHOUT_TIMESTAMP;
+import static org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITH_TIMESTAMP;
+import static org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_WATERMARK;
+
+/**
+ * Fetch data from Kafka for Kafka Shuffle.
+ */
+@Internal
+public class KafkaShuffleFetcher<T> extends KafkaFetcher<T> {
+	/** The handler to check and generate watermarks from fetched records. **/
+	private final WatermarkHandler watermarkHandler;
+
+	/** The schema to convert between Kafka's byte messages, and Flink's objects. */
+	private final KafkaShuffleElementDeserializer kafkaShuffleDeserializer;
+
+	public KafkaShuffleFetcher(
+			SourceFunction.SourceContext<T> sourceContext,
+			Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
+			SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
+			ProcessingTimeService processingTimeProvider,
+			long autoWatermarkInterval,
+			ClassLoader userCodeClassLoader,
+			String taskNameWithSubtasks,
+			KafkaDeserializationSchema<T> deserializer,
+			Properties kafkaProperties,
+			long pollTimeout,
+			MetricGroup subtaskMetricGroup,
+			MetricGroup consumerMetricGroup,
+			boolean useMetrics,
+			TypeSerializer<T> typeSerializer,
+			int producerParallelism) throws Exception {
+		super(
+			sourceContext,
+			assignedPartitionsWithInitialOffsets,
+			watermarkStrategy,
+			processingTimeProvider,
+			autoWatermarkInterval,
+			userCodeClassLoader,
+			taskNameWithSubtasks,
+			deserializer,
+			kafkaProperties,
+			pollTimeout,
+			subtaskMetricGroup,
+			consumerMetricGroup,
+			useMetrics);
+
+		this.kafkaShuffleDeserializer = new KafkaShuffleElementDeserializer<>(typeSerializer);
+		this.watermarkHandler = new WatermarkHandler(producerParallelism);
+	}
+
+	@Override
+	protected String getFetcherName() {
+		return "Kafka Shuffle Fetcher";
+	}
+
+	@Override
+	protected void partitionConsumerRecordsHandler(
+			List<ConsumerRecord<byte[], byte[]>> partitionRecords,
+			KafkaTopicPartitionState<T, TopicPartition> partition) throws Exception {
+
+		for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
+			final KafkaShuffleElement element = kafkaShuffleDeserializer.deserialize(record);
+
+			// TODO: Do we need to check the end of stream if reaching the end watermark
+			// TODO: Currently, if one of the partition sends an end-of-stream signal the fetcher stops running.
+			// The current "ending of stream" logic in KafkaFetcher a bit strange: if any partition has a record
+			// signaled as "END_OF_STREAM", the fetcher will stop running. Notice that the signal is coming from
+			// the deserializer, which means from Kafka data itself. But it is possible that other topics
+			// and partitions still have data to read. Finishing reading Partition0 can not guarantee that Partition1
+			// also finishes.
+			if (element.isRecord()) {
+				// timestamp is inherent from upstream
+				// If using ProcessTime, timestamp is going to be ignored (upstream does not include timestamp as well)
+				// If using IngestionTime, timestamp is going to be overwritten
+				// If using EventTime, timestamp is going to be used
+				synchronized (checkpointLock) {
+					KafkaShuffleRecord<T> elementAsRecord = element.asRecord();
+					sourceContext.collectWithTimestamp(
+						elementAsRecord.value,
+						elementAsRecord.timestamp == null ? record.timestamp() : elementAsRecord.timestamp);
+					partition.setOffset(record.offset());
+				}
+			} else if (element.isWatermark()) {
+				final KafkaShuffleWatermark watermark = element.asWatermark();
+				Optional<Watermark> newWatermark = watermarkHandler.checkAndGetNewWatermark(watermark);
+				newWatermark.ifPresent(sourceContext::emitWatermark);
+			}
+		}
+	}
+
+	/**
+	 * An element in a KafkaShuffle. Can be a record or a Watermark.
+	 */
+	@VisibleForTesting
+	public abstract static class KafkaShuffleElement {
+
+		public boolean isRecord() {
+			return getClass() == KafkaShuffleRecord.class;
+		}
+
+		public boolean isWatermark() {
+			return getClass() == KafkaShuffleWatermark.class;
+		}
+
+		public <T> KafkaShuffleRecord<T> asRecord() {
+			return (KafkaShuffleRecord<T>) this;
+		}
+
+		public KafkaShuffleWatermark asWatermark() {
+			return (KafkaShuffleWatermark) this;
+		}
+	}
+
+	/**
+	 * A watermark element in a KafkaShuffle. It includes
+	 * - subtask index where the watermark is coming from
+	 * - watermark timestamp
+	 */
+	@VisibleForTesting
+	public static class KafkaShuffleWatermark extends KafkaShuffleElement {
+		final int subtask;
+		final long watermark;
+
+		KafkaShuffleWatermark(int subtask, long watermark) {
+			this.subtask = subtask;
+			this.watermark = watermark;
+		}
+
+		public int getSubtask() {
+			return subtask;
+		}
+
+		public long getWatermark() {
+			return watermark;
+		}
+	}
+
+	/**
+	 * One value with Type T in a KafkaShuffle. This stores the value and an optional associated timestamp.
+	 */
+	@VisibleForTesting
+	public static class KafkaShuffleRecord<T> extends KafkaShuffleElement {
+		final T value;
+		final Long timestamp;
+
+		KafkaShuffleRecord(T value) {
+			this.value = value;
+			this.timestamp = null;
+		}
+
+		KafkaShuffleRecord(long timestamp, T value) {
+			this.value = value;
+			this.timestamp = timestamp;
+		}
+
+		public T getValue() {
+			return value;
+		}
+
+		public Long getTimestamp() {
+			return timestamp;
+		}
+	}
+
+	/**
+	 * Deserializer for KafkaShuffleElement.
+	 */
+	@VisibleForTesting
+	public static class KafkaShuffleElementDeserializer<T> implements Serializable {
+		private static final long serialVersionUID = 1000001L;
+
+		private final TypeSerializer<T> typeSerializer;
+
+		private transient DataInputDeserializer dis;
+
+		@VisibleForTesting
+		public KafkaShuffleElementDeserializer(TypeSerializer<T> typeSerializer) {
+			this.typeSerializer = typeSerializer;
+		}
+
+		@VisibleForTesting
+		public KafkaShuffleElement deserialize(ConsumerRecord<byte[], byte[]> record)
+			throws Exception {
+			byte[] value = record.value();
+
+			if (dis != null) {
+				dis.setBuffer(value);
+			} else {
+				dis = new DataInputDeserializer(value);
+			}
+
+			// version byte
+			ByteSerializer.INSTANCE.deserialize(dis);
+			int tag = ByteSerializer.INSTANCE.deserialize(dis);
+
+			if (tag == TAG_REC_WITHOUT_TIMESTAMP) {
+				return new KafkaShuffleRecord<>(typeSerializer.deserialize(dis));
+			} else if (tag == TAG_REC_WITH_TIMESTAMP) {
+				return new KafkaShuffleRecord<>(
+					LongSerializer.INSTANCE.deserialize(dis),
+					typeSerializer.deserialize(dis));
+			} else if (tag == TAG_WATERMARK) {
+				return new KafkaShuffleWatermark(
+					IntSerializer.INSTANCE.deserialize(dis), LongSerializer.INSTANCE.deserialize(dis));
+			}
+
+			throw new UnsupportedOperationException("Unsupported tag format");
+		}
+	}
+
+	/**
+	 * WatermarkHandler to check and generate watermarks from fetched records.
+	 */
+	private static class WatermarkHandler {
+		private final int producerParallelism;
+		private final Map<Integer, Long> subtaskWatermark;
+
+		private long currentMinWatermark = Long.MIN_VALUE;
+
+		WatermarkHandler(int producerParallelism) {
+			this.producerParallelism = producerParallelism;
+			this.subtaskWatermark = new HashMap<>(producerParallelism);
+		}
+
+		private Optional<Watermark> checkAndGetNewWatermark(KafkaShuffleWatermark newWatermark) {
+			// watermarks is incremental for the same partition and PRODUCER subtask
+			Long currentSubTaskWatermark = subtaskWatermark.get(newWatermark.subtask);
+
+			// watermark is strictly increasing
+			Preconditions.checkState(
+				(currentSubTaskWatermark == null) || (currentSubTaskWatermark < newWatermark.watermark),
+				"Watermark should always increase: current : new " + currentSubTaskWatermark + ":" + newWatermark.watermark);
+
+			subtaskWatermark.put(newWatermark.subtask, newWatermark.watermark);
+
+			if (subtaskWatermark.values().size() < producerParallelism) {
+				return Optional.empty();
+			}
+
+			long minWatermark = subtaskWatermark.values().stream().min(Comparator.naturalOrder()).orElse(Long.MIN_VALUE);
+			if (currentMinWatermark < minWatermark) {
+				currentMinWatermark = minWatermark;
+				return Optional.of(new Watermark(minWatermark));
+			} else {
+				return Optional.empty();
+			}
+		}
+	}
+}
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleConsumer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleConsumer.java
new file mode 100644
index 0000000..6403f42
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleConsumer.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
+import org.apache.flink.streaming.connectors.kafka.internal.KafkaShuffleFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+import org.apache.flink.util.SerializedValue;
+
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle.PRODUCER_PARALLELISM;
+
+/**
+ * Flink Kafka Shuffle Consumer Function.
+ */
+@Internal
+public class FlinkKafkaShuffleConsumer<T> extends FlinkKafkaConsumer<T> {
+	private final TypeSerializer<T> typeSerializer;
+	private final int producerParallelism;
+
+	FlinkKafkaShuffleConsumer(
+			String topic,
+			TypeInformationSerializationSchema<T> schema,
+			TypeSerializer<T> typeSerializer,
+			Properties props) {
+		// The schema is needed to call the right FlinkKafkaConsumer constructor.
+		// It is never used, can be `null`, but `null` confuses the compiler.
+		super(topic, schema, props);
+		this.typeSerializer = typeSerializer;
+
+		Preconditions.checkArgument(
+			props.getProperty(PRODUCER_PARALLELISM) != null,
+			"Missing producer parallelism for Kafka Shuffle");
+		producerParallelism = PropertiesUtil.getInt(props, PRODUCER_PARALLELISM, Integer.MAX_VALUE);
+	}
+
+	@Override
+	protected AbstractFetcher<T, ?> createFetcher(
+			SourceContext<T> sourceContext,
+			Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
+			SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
+			StreamingRuntimeContext runtimeContext,
+			OffsetCommitMode offsetCommitMode,
+			MetricGroup consumerMetricGroup,
+			boolean useMetrics) throws Exception {
+		// make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS;
+		// this overwrites whatever setting the user configured in the properties
+		adjustAutoCommitConfig(properties, offsetCommitMode);
+
+		return new KafkaShuffleFetcher<>(
+			sourceContext,
+			assignedPartitionsWithInitialOffsets,
+			watermarkStrategy,
+			runtimeContext.getProcessingTimeService(),
+			runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
+			runtimeContext.getUserCodeClassLoader(),
+			runtimeContext.getTaskNameWithSubtasks(),
+			deserializer,
+			properties,
+			pollTimeout,
+			runtimeContext.getMetricGroup(),
+			consumerMetricGroup,
+			useMetrics,
+			typeSerializer,
+			producerParallelism);
+	}
+}