Posted to commits@flink.apache.org by pn...@apache.org on 2020/05/18 12:58:44 UTC
[flink] 03/05: [FLINK-15670][connector] Adds the consumer for KafkaShuffle.
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit c3b42fb924d571c6a88223411b414969bf8b89d4
Author: Yuan Mei <yu...@gmail.com>
AuthorDate: Mon May 18 16:57:56 2020 +0800
[FLINK-15670][connector] Adds the consumer for KafkaShuffle.
KafkaShuffle provides a transparent Kafka source and sink pair, through which the network traffic of a shuffle step is persisted and redirected.
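For context, a minimal usage sketch of the shuffle pair (illustrative only: `FlinkKafkaShuffle.persistentKeyBy` and its parameter order are taken from the entry point added elsewhere in this FLINK-15670 series and are assumptions here, not part of this commit):

    import java.util.Properties;

    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle;

    public class KafkaShuffleUsageSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // the shuffle producer writes transactionally, so checkpointing is typically enabled
            env.enableCheckpointing(1000);

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092");

            DataStream<Tuple2<String, Long>> input =
                env.fromElements(Tuple2.of("a", 1L), Tuple2.of("b", 2L));

            // The keyBy exchange is persisted in the Kafka topic "shuffle-topic": the
            // producer side writes records and watermarks into the topic, and the
            // consumer side (this commit) reads them back for the downstream operator.
            KeyedStream<Tuple2<String, Long>, String> keyed = FlinkKafkaShuffle.persistentKeyBy(
                input,
                "shuffle-topic", // topic backing the shuffle
                2,               // producer parallelism
                4,               // number of Kafka partitions (= downstream parallelism)
                props,
                new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> value) {
                        return value.f0;
                    }
                });

            keyed.sum(1).print();
            env.execute("kafka-shuffle-sketch");
        }
    }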
---
.../connectors/kafka/FlinkKafkaConsumerBase.java | 2 +-
.../kafka/internals/AbstractFetcher.java | 2 +-
.../connectors/kafka/internal/KafkaFetcher.java | 55 ++--
.../kafka/internal/KafkaShuffleFetcher.java | 297 +++++++++++++++++++++
.../kafka/shuffle/FlinkKafkaShuffleConsumer.java | 94 +++++++
5 files changed, 424 insertions(+), 26 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 46688f6..f9f835a 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -264,7 +264,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
* @param properties - Kafka configuration properties to be adjusted
* @param offsetCommitMode offset commit mode
*/
- static void adjustAutoCommitConfig(Properties properties, OffsetCommitMode offsetCommitMode) {
+ protected static void adjustAutoCommitConfig(Properties properties, OffsetCommitMode offsetCommitMode) {
if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) {
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
}
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 9ad685c..978cd97 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -84,7 +84,7 @@ public abstract class AbstractFetcher<T, KPH> {
/** The lock that guarantees that record emission and state updates are atomic,
* from the view of taking a checkpoint. */
- private final Object checkpointLock;
+ protected final Object checkpointLock;
/** All partitions (and their state) that this fetcher is subscribed to. */
private final List<KafkaTopicPartitionState<T, KPH>> subscribedPartitionStates;
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaFetcher.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaFetcher.java
index d591f58..d2be85e 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaFetcher.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaFetcher.java
@@ -63,14 +63,17 @@ public class KafkaFetcher<T> extends AbstractFetcher<T, TopicPartition> {
/** The schema to convert between Kafka's byte messages, and Flink's objects. */
private final KafkaDeserializationSchema<T> deserializer;
+ /** A collector to emit records in batch (bundle). **/
+ private final KafkaCollector kafkaCollector;
+
/** The handover of data and exceptions between the consumer thread and the task thread. */
- private final Handover handover;
+ final Handover handover;
/** The thread that runs the actual KafkaConsumer and hands the record batches to this fetcher. */
- private final KafkaConsumerThread consumerThread;
+ final KafkaConsumerThread consumerThread;
/** Flag to mark the main work loop as alive. */
- private volatile boolean running = true;
+ volatile boolean running = true;
// ------------------------------------------------------------------------
@@ -111,19 +114,16 @@ public class KafkaFetcher<T> extends AbstractFetcher<T, TopicPartition> {
useMetrics,
consumerMetricGroup,
subtaskMetricGroup);
+ this.kafkaCollector = new KafkaCollector();
}
// ------------------------------------------------------------------------
// Fetcher work methods
// ------------------------------------------------------------------------
- private final KafkaCollector kafkaCollector = new KafkaCollector();
-
@Override
public void runFetchLoop() throws Exception {
try {
- final Handover handover = this.handover;
-
// kick off the actual Kafka consumer
consumerThread.start();
@@ -138,23 +138,7 @@ public class KafkaFetcher<T> extends AbstractFetcher<T, TopicPartition> {
List<ConsumerRecord<byte[], byte[]>> partitionRecords =
records.records(partition.getKafkaPartitionHandle());
- for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
- deserializer.deserialize(record, kafkaCollector);
-
- // emit the actual records. this also updates offset state atomically and emits
- // watermarks
- emitRecordsWithTimestamps(
- kafkaCollector.getRecords(),
- partition,
- record.offset(),
- record.timestamp());
-
- if (kafkaCollector.isEndOfStreamSignalled()) {
- // end of stream signaled
- running = false;
- break;
- }
- }
+ partitionConsumerRecordsHandler(partitionRecords, partition);
}
}
}
@@ -189,6 +173,29 @@ public class KafkaFetcher<T> extends AbstractFetcher<T, TopicPartition> {
return "Kafka Fetcher";
}
+ protected void partitionConsumerRecordsHandler(
+ List<ConsumerRecord<byte[], byte[]>> partitionRecords,
+ KafkaTopicPartitionState<T, TopicPartition> partition) throws Exception {
+
+ for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
+ deserializer.deserialize(record, kafkaCollector);
+
+ // emit the actual records. this also updates offset state atomically and emits
+ // watermarks
+ emitRecordsWithTimestamps(
+ kafkaCollector.getRecords(),
+ partition,
+ record.offset(),
+ record.timestamp());
+
+ if (kafkaCollector.isEndOfStreamSignalled()) {
+ // end of stream signaled
+ running = false;
+ break;
+ }
+ }
+ }
+
// ------------------------------------------------------------------------
// Implement Methods of the AbstractFetcher
// ------------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaShuffleFetcher.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaShuffleFetcher.java
new file mode 100644
index 0000000..5d380da
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaShuffleFetcher.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ByteSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITHOUT_TIMESTAMP;
+import static org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITH_TIMESTAMP;
+import static org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_WATERMARK;
+
+/**
+ * Fetch data from Kafka for Kafka Shuffle.
+ */
+@Internal
+public class KafkaShuffleFetcher<T> extends KafkaFetcher<T> {
+ /** The handler to check and generate watermarks from fetched records. **/
+ private final WatermarkHandler watermarkHandler;
+
+ /** The schema to convert between Kafka's byte messages, and Flink's objects. */
+ private final KafkaShuffleElementDeserializer kafkaShuffleDeserializer;
+
+ public KafkaShuffleFetcher(
+ SourceFunction.SourceContext<T> sourceContext,
+ Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
+ SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
+ ProcessingTimeService processingTimeProvider,
+ long autoWatermarkInterval,
+ ClassLoader userCodeClassLoader,
+ String taskNameWithSubtasks,
+ KafkaDeserializationSchema<T> deserializer,
+ Properties kafkaProperties,
+ long pollTimeout,
+ MetricGroup subtaskMetricGroup,
+ MetricGroup consumerMetricGroup,
+ boolean useMetrics,
+ TypeSerializer<T> typeSerializer,
+ int producerParallelism) throws Exception {
+ super(
+ sourceContext,
+ assignedPartitionsWithInitialOffsets,
+ watermarkStrategy,
+ processingTimeProvider,
+ autoWatermarkInterval,
+ userCodeClassLoader,
+ taskNameWithSubtasks,
+ deserializer,
+ kafkaProperties,
+ pollTimeout,
+ subtaskMetricGroup,
+ consumerMetricGroup,
+ useMetrics);
+
+ this.kafkaShuffleDeserializer = new KafkaShuffleElementDeserializer<>(typeSerializer);
+ this.watermarkHandler = new WatermarkHandler(producerParallelism);
+ }
+
+ @Override
+ protected String getFetcherName() {
+ return "Kafka Shuffle Fetcher";
+ }
+
+ @Override
+ protected void partitionConsumerRecordsHandler(
+ List<ConsumerRecord<byte[], byte[]>> partitionRecords,
+ KafkaTopicPartitionState<T, TopicPartition> partition) throws Exception {
+
+ for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
+ final KafkaShuffleElement element = kafkaShuffleDeserializer.deserialize(record);
+
+ // TODO: Do we need to check for the end of stream when the end watermark is reached?
+ // TODO: Currently, if one of the partitions sends an end-of-stream signal, the fetcher stops running.
+ // The current "end of stream" logic in KafkaFetcher is a bit strange: if any partition has a record
+ // signaled as "END_OF_STREAM", the fetcher will stop running. Notice that the signal comes from
+ // the deserializer, which means from the Kafka data itself. But it is possible that other topics
+ // and partitions still have data to read. Finishing reading Partition0 cannot guarantee that Partition1
+ // also finishes.
+ if (element.isRecord()) {
+ // The timestamp is inherited from upstream.
+ // If using ProcessingTime, the timestamp is ignored (upstream does not include a timestamp either).
+ // If using IngestionTime, the timestamp is overwritten.
+ // If using EventTime, the timestamp is used.
+ synchronized (checkpointLock) {
+ KafkaShuffleRecord<T> elementAsRecord = element.asRecord();
+ sourceContext.collectWithTimestamp(
+ elementAsRecord.value,
+ elementAsRecord.timestamp == null ? record.timestamp() : elementAsRecord.timestamp);
+ partition.setOffset(record.offset());
+ }
+ } else if (element.isWatermark()) {
+ final KafkaShuffleWatermark watermark = element.asWatermark();
+ Optional<Watermark> newWatermark = watermarkHandler.checkAndGetNewWatermark(watermark);
+ newWatermark.ifPresent(sourceContext::emitWatermark);
+ }
+ }
+ }
+
+ /**
+ * An element in a KafkaShuffle. Can be a record or a Watermark.
+ */
+ @VisibleForTesting
+ public abstract static class KafkaShuffleElement {
+
+ public boolean isRecord() {
+ return getClass() == KafkaShuffleRecord.class;
+ }
+
+ public boolean isWatermark() {
+ return getClass() == KafkaShuffleWatermark.class;
+ }
+
+ public <T> KafkaShuffleRecord<T> asRecord() {
+ return (KafkaShuffleRecord<T>) this;
+ }
+
+ public KafkaShuffleWatermark asWatermark() {
+ return (KafkaShuffleWatermark) this;
+ }
+ }
+
+ /**
+ * A watermark element in a KafkaShuffle. It includes
+ * - subtask index where the watermark is coming from
+ * - watermark timestamp
+ */
+ @VisibleForTesting
+ public static class KafkaShuffleWatermark extends KafkaShuffleElement {
+ final int subtask;
+ final long watermark;
+
+ KafkaShuffleWatermark(int subtask, long watermark) {
+ this.subtask = subtask;
+ this.watermark = watermark;
+ }
+
+ public int getSubtask() {
+ return subtask;
+ }
+
+ public long getWatermark() {
+ return watermark;
+ }
+ }
+
+ /**
+ * One value of type T in a KafkaShuffle. It stores the value and an optional associated timestamp.
+ */
+ @VisibleForTesting
+ public static class KafkaShuffleRecord<T> extends KafkaShuffleElement {
+ final T value;
+ final Long timestamp;
+
+ KafkaShuffleRecord(T value) {
+ this.value = value;
+ this.timestamp = null;
+ }
+
+ KafkaShuffleRecord(long timestamp, T value) {
+ this.value = value;
+ this.timestamp = timestamp;
+ }
+
+ public T getValue() {
+ return value;
+ }
+
+ public Long getTimestamp() {
+ return timestamp;
+ }
+ }
+
+ /**
+ * Deserializer for KafkaShuffleElement.
+ */
+ @VisibleForTesting
+ public static class KafkaShuffleElementDeserializer<T> implements Serializable {
+ private static final long serialVersionUID = 1000001L;
+
+ private final TypeSerializer<T> typeSerializer;
+
+ private transient DataInputDeserializer dis;
+
+ @VisibleForTesting
+ public KafkaShuffleElementDeserializer(TypeSerializer<T> typeSerializer) {
+ this.typeSerializer = typeSerializer;
+ }
+
+ @VisibleForTesting
+ public KafkaShuffleElement deserialize(ConsumerRecord<byte[], byte[]> record)
+ throws Exception {
+ byte[] value = record.value();
+
+ if (dis != null) {
+ dis.setBuffer(value);
+ } else {
+ dis = new DataInputDeserializer(value);
+ }
+
+ // version byte
+ ByteSerializer.INSTANCE.deserialize(dis);
+ int tag = ByteSerializer.INSTANCE.deserialize(dis);
+
+ if (tag == TAG_REC_WITHOUT_TIMESTAMP) {
+ return new KafkaShuffleRecord<>(typeSerializer.deserialize(dis));
+ } else if (tag == TAG_REC_WITH_TIMESTAMP) {
+ return new KafkaShuffleRecord<>(
+ LongSerializer.INSTANCE.deserialize(dis),
+ typeSerializer.deserialize(dis));
+ } else if (tag == TAG_WATERMARK) {
+ return new KafkaShuffleWatermark(
+ IntSerializer.INSTANCE.deserialize(dis), LongSerializer.INSTANCE.deserialize(dis));
+ }
+
+ throw new UnsupportedOperationException("Unsupported tag format");
+ }
+ }
+
+ /**
+ * WatermarkHandler to check and generate watermarks from fetched records.
+ */
+ private static class WatermarkHandler {
+ private final int producerParallelism;
+ private final Map<Integer, Long> subtaskWatermark;
+
+ private long currentMinWatermark = Long.MIN_VALUE;
+
+ WatermarkHandler(int producerParallelism) {
+ this.producerParallelism = producerParallelism;
+ this.subtaskWatermark = new HashMap<>(producerParallelism);
+ }
+
+ private Optional<Watermark> checkAndGetNewWatermark(KafkaShuffleWatermark newWatermark) {
+ // watermarks are monotonically increasing for the same partition and PRODUCER subtask
+ Long currentSubTaskWatermark = subtaskWatermark.get(newWatermark.subtask);
+
+ // watermark is strictly increasing
+ Preconditions.checkState(
+ (currentSubTaskWatermark == null) || (currentSubTaskWatermark < newWatermark.watermark),
+ "Watermark should always increase: current=" + currentSubTaskWatermark + ", new=" + newWatermark.watermark);
+
+ subtaskWatermark.put(newWatermark.subtask, newWatermark.watermark);
+
+ if (subtaskWatermark.values().size() < producerParallelism) {
+ return Optional.empty();
+ }
+
+ long minWatermark = subtaskWatermark.values().stream().min(Comparator.naturalOrder()).orElse(Long.MIN_VALUE);
+ if (currentMinWatermark < minWatermark) {
+ currentMinWatermark = minWatermark;
+ return Optional.of(new Watermark(minWatermark));
+ } else {
+ return Optional.empty();
+ }
+ }
+ }
+}
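To make the WatermarkHandler semantics concrete, here is a small self-contained sketch of the same min-tracking logic with a worked sequence in the comments (class and method names are hypothetical; it mirrors checkAndGetNewWatermark rather than reusing it):

    import java.util.Comparator;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Optional;

    // Mirrors KafkaShuffleFetcher.WatermarkHandler: the consumer only emits a new
    // watermark once every producer subtask has reported one, and only when the
    // minimum across subtasks advances. (The real handler additionally enforces
    // that each subtask's watermark strictly increases.)
    public class WatermarkHandlerSketch {
        private final int producerParallelism;
        private final Map<Integer, Long> subtaskWatermark = new HashMap<>();
        private long currentMinWatermark = Long.MIN_VALUE;

        WatermarkHandlerSketch(int producerParallelism) {
            this.producerParallelism = producerParallelism;
        }

        Optional<Long> onWatermark(int subtask, long watermark) {
            subtaskWatermark.put(subtask, watermark);
            if (subtaskWatermark.size() < producerParallelism) {
                // not all producer subtasks have reported a watermark yet
                return Optional.empty();
            }
            long min = subtaskWatermark.values().stream()
                .min(Comparator.naturalOrder()).orElse(Long.MIN_VALUE);
            if (min > currentMinWatermark) {
                currentMinWatermark = min;
                return Optional.of(min);
            }
            return Optional.empty();
        }

        public static void main(String[] args) {
            WatermarkHandlerSketch h = new WatermarkHandlerSketch(2);
            System.out.println(h.onWatermark(0, 5)); // Optional.empty: subtask 1 unseen
            System.out.println(h.onWatermark(1, 3)); // Optional[3]: min{5, 3} = 3
            System.out.println(h.onWatermark(0, 7)); // Optional.empty: min{7, 3} is still 3
            System.out.println(h.onWatermark(1, 6)); // Optional[6]: min{7, 6} = 6 advances
        }
    }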
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleConsumer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleConsumer.java
new file mode 100644
index 0000000..6403f42
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleConsumer.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
+import org.apache.flink.streaming.connectors.kafka.internal.KafkaShuffleFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+import org.apache.flink.util.SerializedValue;
+
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle.PRODUCER_PARALLELISM;
+
+/**
+ * Flink Kafka Shuffle Consumer Function.
+ */
+@Internal
+public class FlinkKafkaShuffleConsumer<T> extends FlinkKafkaConsumer<T> {
+ private final TypeSerializer<T> typeSerializer;
+ private final int producerParallelism;
+
+ FlinkKafkaShuffleConsumer(
+ String topic,
+ TypeInformationSerializationSchema<T> schema,
+ TypeSerializer<T> typeSerializer,
+ Properties props) {
+ // The schema is needed only to call the right FlinkKafkaConsumer constructor.
+ // It is never used; it could be `null`, but a literal `null` makes the constructor call ambiguous.
+ super(topic, schema, props);
+ this.typeSerializer = typeSerializer;
+
+ Preconditions.checkArgument(
+ props.getProperty(PRODUCER_PARALLELISM) != null,
+ "Missing producer parallelism for Kafka Shuffle");
+ producerParallelism = PropertiesUtil.getInt(props, PRODUCER_PARALLELISM, Integer.MAX_VALUE);
+ }
+
+ @Override
+ protected AbstractFetcher<T, ?> createFetcher(
+ SourceContext<T> sourceContext,
+ Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
+ SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
+ StreamingRuntimeContext runtimeContext,
+ OffsetCommitMode offsetCommitMode,
+ MetricGroup consumerMetricGroup,
+ boolean useMetrics) throws Exception {
+ // make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS;
+ // this overwrites whatever setting the user configured in the properties
+ adjustAutoCommitConfig(properties, offsetCommitMode);
+
+ return new KafkaShuffleFetcher<>(
+ sourceContext,
+ assignedPartitionsWithInitialOffsets,
+ watermarkStrategy,
+ runtimeContext.getProcessingTimeService(),
+ runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
+ runtimeContext.getUserCodeClassLoader(),
+ runtimeContext.getTaskNameWithSubtasks(),
+ deserializer,
+ properties,
+ pollTimeout,
+ runtimeContext.getMetricGroup(),
+ consumerMetricGroup,
+ useMetrics,
+ typeSerializer,
+ producerParallelism);
+ }
+}