Posted to commits@flink.apache.org by pn...@apache.org on 2020/05/18 12:58:45 UTC
[flink] 04/05: [FLINK-15670][connector] Kafka Shuffle API Part
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit e2305f234d6f88c503aaef1576d6b06910bbf1ea
Author: Yuan Mei <yu...@gmail.com>
AuthorDate: Mon May 18 17:05:12 2020 +0800
[FLINK-15670][connector] Kafka Shuffle API Part
KafkaShuffle provides a transparent Kafka source and sink pair, through which the network traffic of a shuffle step is persisted and redirected.
---
.../kafka/shuffle/FlinkKafkaShuffle.java | 391 +++++++++++++++++++++
.../kafka/shuffle/StreamKafkaShuffleSink.java | 43 +++
2 files changed, 434 insertions(+)
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java
new file mode 100644
index 0000000..6408360
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import java.util.Properties;
+
+/**
+ * {@link FlinkKafkaShuffle} uses Kafka as a message bus to shuffle and persist data at the same time.
+ *
+ * <p>Persisting shuffle data is useful when
+ * - you would like to reuse the shuffle data, and/or
+ * - you would like to avoid a full restart of the pipeline during failure recovery.
+ *
+ * <p>Persisting the shuffle is achieved by wrapping a {@link FlinkKafkaShuffleProducer} and
+ * a {@link FlinkKafkaShuffleConsumer} together into a {@link FlinkKafkaShuffle}.
+ * Here is an example of how to use a {@link FlinkKafkaShuffle}.
+ *
+ * <p><pre>{@code
+ *     StreamExecutionEnvironment env = ...                  // create execution environment
+ *     DataStream<X> source = env.addSource(...)             // add data stream source
+ *     DataStream<Y> dataStream = ...                        // some transformation(s) based on source
+ *
+ *     KeyedStream<Y, KEY> keyedStream = FlinkKafkaShuffle
+ *         .persistentKeyBy(                                 // keyBy shuffle through kafka
+ *             dataStream,                                   // data stream to be shuffled
+ *             topic,                                        // Kafka topic written to
+ *             producerParallelism,                          // the number of tasks of a Kafka Producer
+ *             numberOfPartitions,                           // the number of partitions of the Kafka topic written to
+ *             kafkaProperties,                              // kafka properties for Kafka Producer and Consumer
+ *             keySelector<Y, KEY>);                         // key selector to retrieve key from `dataStream'
+ *
+ *     keyedStream.transform...                              // some other transformation(s)
+ *
+ *     KeyedStream<Y, KEY> keyedStreamReuse = FlinkKafkaShuffle
+ *         .readKeyBy(                                       // read the Kafka shuffle data again for other usages
+ *             topic,                                        // the topic of Kafka where data is persisted
+ *             env,                                          // execution environment, and it can be a new environment
+ *             typeInformation<Y>,                           // type information of the data persisted in Kafka
+ *             kafkaProperties,                              // kafka properties for Kafka Consumer
+ *             keySelector<Y, KEY>);                         // key selector to retrieve key
+ *
+ *     keyedStreamReuse.transform...                         // some other transformation(s)
+ * }</pre>
+ *
+ * <p>Usage of {@link FlinkKafkaShuffle#persistentKeyBy} is similar to {@link DataStream#keyBy(KeySelector)}.
+ * The differences are:
+ *
+ * <p>1). Partitioning is done through {@link FlinkKafkaShuffleProducer}. {@link FlinkKafkaShuffleProducer} decides
+ * which partition a key goes to when writing to Kafka.
+ *
+ * <p>2). Shuffle data can be reused through {@link FlinkKafkaShuffle#readKeyBy}, as shown in the example above.
+ *
+ * <p>3). Job execution is decoupled by the persistent Kafka message bus. In the example, the job execution graph is
+ * decoupled into three regions: `KafkaShuffleProducer', `KafkaShuffleConsumer' and `KafkaShuffleConsumerReuse'
+ * through `PERSISTENT DATA` as shown below. If any region fails, the other two can keep progressing.
+ *
+ * <p><pre>
+ *     source -> ... KafkaShuffleProducer -> PERSISTENT DATA -> KafkaShuffleConsumer -> ...
+ *                                                  |
+ *                                                  | ----------> KafkaShuffleConsumerReuse -> ...
+ * </pre>
+ */
+@Experimental
+public class FlinkKafkaShuffle {
+ static final String PRODUCER_PARALLELISM = "producer parallelism";
+ static final String PARTITION_NUMBER = "partition number";
+
+ /**
+ * Uses Kafka as a message bus to persist a keyBy shuffle.
+ *
+ * <p>Persisting the keyBy shuffle is achieved by wrapping a {@link FlinkKafkaShuffleProducer} and
+ * a {@link FlinkKafkaShuffleConsumer} together.
+ *
+ * <p>On the producer side, {@link FlinkKafkaShuffleProducer}
+ * is similar to {@link DataStream#keyBy(KeySelector)}. Both use the same key group assignment function
+ * {@link KeyGroupRangeAssignment#assignKeyToParallelOperator} to decide which partition a key goes to.
+ * Hence, each producer task can potentially write to each Kafka partition, depending on where the key goes.
+ * Here, `numberOfPartitions` equals the key group size.
+ * When {@link TimeCharacteristic#EventTime} is used, each producer task broadcasts its watermark
+ * to ALL of the Kafka partitions to make sure watermark information is propagated correctly.
+ *
+ * <p>On the consumer side, each consumer task reads the partitions that correspond to the key group
+ * indices assigned to it. `numberOfPartitions` is the maximum parallelism of the consumer. This version only
+ * supports numberOfPartitions = consumerParallelism.
+ * When {@link TimeCharacteristic#EventTime} is used, a consumer task is responsible for emitting
+ * watermarks; watermarks are read from the corresponding Kafka partitions. Notice that a consumer task only
+ * starts to emit a watermark after reading at least one watermark from each producer task, to make sure
+ * watermarks are monotonically increasing. Hence, a consumer task needs to know `producerParallelism` as well.
+ *
+ * @see FlinkKafkaShuffle#writeKeyBy
+ * @see FlinkKafkaShuffle#readKeyBy
+ *
+ * @param dataStream Data stream to be shuffled
+ * @param topic Kafka topic written to
+ * @param producerParallelism Parallelism of producer
+ * @param numberOfPartitions Number of partitions
+ * @param properties Kafka properties
+ * @param keySelector Key selector to retrieve key from `dataStream'
+ * @param <T> Type of the input data stream
+ * @param <K> Type of key
+ */
+ public static <T, K> KeyedStream<T, K> persistentKeyBy(
+ DataStream<T> dataStream,
+ String topic,
+ int producerParallelism,
+ int numberOfPartitions,
+ Properties properties,
+ KeySelector<T, K> keySelector) {
+ // KafkaProducer#propsToMap uses Properties purely as a HashMap, ignoring the default properties,
+ // so we have to flatten the default properties into first-level elements.
+ Properties kafkaProperties = PropertiesUtil.flatten(properties);
+ kafkaProperties.setProperty(PRODUCER_PARALLELISM, String.valueOf(producerParallelism));
+ kafkaProperties.setProperty(PARTITION_NUMBER, String.valueOf(numberOfPartitions));
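+ // Note: the two settings above are internal to the Kafka shuffle; they are read back by
+ // writeKeyBy/readKeyBy below and are not standard Kafka client properties.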
+
+ StreamExecutionEnvironment env = dataStream.getExecutionEnvironment();
+
+ writeKeyBy(dataStream, topic, kafkaProperties, keySelector);
+ return readKeyBy(topic, env, dataStream.getType(), kafkaProperties, keySelector);
+ }
+
+ /**
+ * Uses Kafka as a message bus to persist a keyBy shuffle.
+ *
+ * <p>Persisting the keyBy shuffle is achieved by wrapping a {@link FlinkKafkaShuffleProducer} and
+ * a {@link FlinkKafkaShuffleConsumer} together.
+ *
+ * <p>On the producer side, {@link FlinkKafkaShuffleProducer}
+ * is similar to {@link DataStream#keyBy(KeySelector)}. Both use the same key group assignment function
+ * {@link KeyGroupRangeAssignment#assignKeyToParallelOperator} to decide which partition a key goes to.
+ * Hence, each producer task can potentially write to each Kafka partition, depending on where the key goes.
+ * Here, `numberOfPartitions` equals the key group size.
+ * When {@link TimeCharacteristic#EventTime} is used, each producer task broadcasts its watermark
+ * to ALL of the Kafka partitions to make sure watermark information is propagated correctly.
+ *
+ * <p>On the consumer side, each consumer task reads the partitions that correspond to the key group
+ * indices assigned to it. `numberOfPartitions` is the maximum parallelism of the consumer. This version only
+ * supports numberOfPartitions = consumerParallelism.
+ * When {@link TimeCharacteristic#EventTime} is used, a consumer task is responsible for emitting
+ * watermarks; watermarks are read from the corresponding Kafka partitions. Notice that a consumer task only
+ * starts to emit a watermark after reading at least one watermark from each producer task, to make sure
+ * watermarks are monotonically increasing. Hence, a consumer task needs to know `producerParallelism` as well.
+ *
+ * @see FlinkKafkaShuffle#writeKeyBy
+ * @see FlinkKafkaShuffle#readKeyBy
+ *
+ * @param dataStream Data stream to be shuffled
+ * @param topic Kafka topic written to
+ * @param producerParallelism Parallelism of producer
+ * @param numberOfPartitions Number of partitions
+ * @param properties Kafka properties
+ * @param fields Key positions from the input data stream
+ * @param <T> Type of the input data stream
+ */
+ public static <T> KeyedStream<T, Tuple> persistentKeyBy(
+ DataStream<T> dataStream,
+ String topic,
+ int producerParallelism,
+ int numberOfPartitions,
+ Properties properties,
+ int... fields) {
+ return persistentKeyBy(
+ dataStream,
+ topic,
+ producerParallelism,
+ numberOfPartitions,
+ properties,
+ keySelector(dataStream, fields));
+ }
+
+ /**
+ * The write side of {@link FlinkKafkaShuffle#persistentKeyBy}.
+ *
+ * <p>This function contains a {@link FlinkKafkaShuffleProducer} to shuffle and persist data in Kafka.
+ * {@link FlinkKafkaShuffleProducer} uses the same key group assignment function
+ * {@link KeyGroupRangeAssignment#assignKeyToParallelOperator} to decide which partition a key goes to.
+ * Hence, each producer task can potentially write to each Kafka partition, depending on the key.
+ * Here, the number of partitions equals the key group size.
+ * When {@link TimeCharacteristic#EventTime} is used, each producer task broadcasts each watermark
+ * to all of the Kafka partitions to make sure watermark information is propagated properly.
+ *
+ * <p>Attention: make sure kafkaProperties contains
+ * {@link FlinkKafkaShuffle#PRODUCER_PARALLELISM} and {@link FlinkKafkaShuffle#PARTITION_NUMBER} explicitly.
+ * {@link FlinkKafkaShuffle#PRODUCER_PARALLELISM} is the parallelism of the producer.
+ * {@link FlinkKafkaShuffle#PARTITION_NUMBER} is the number of partitions.
+ * They are not necessarily the same and can be set independently.
+ *
+ * @see FlinkKafkaShuffle#persistentKeyBy
+ * @see FlinkKafkaShuffle#readKeyBy
+ *
+ * @param dataStream Data stream to be shuffled
+ * @param topic Kafka topic written to
+ * @param kafkaProperties Kafka properties for Kafka Producer
+ * @param keySelector Key selector to retrieve key from `dataStream'
+ * @param <T> Type of the input data stream
+ * @param <K> Type of key
+ */
+ public static <T, K> void writeKeyBy(
+ DataStream<T> dataStream,
+ String topic,
+ Properties kafkaProperties,
+ KeySelector<T, K> keySelector) {
+
+ StreamExecutionEnvironment env = dataStream.getExecutionEnvironment();
+ TypeSerializer<T> typeSerializer = dataStream.getType().createSerializer(env.getConfig());
+
+ // write data to Kafka
+ FlinkKafkaShuffleProducer<T, K> kafkaProducer = new FlinkKafkaShuffleProducer<>(
+ topic,
+ typeSerializer,
+ kafkaProperties,
+ env.clean(keySelector),
+ FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
+ FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
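+
+ // EXACTLY_ONCE makes the producer use Kafka transactions; depending on the checkpoint
+ // interval, the broker-side transaction timeout may need to be raised accordingly.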
+
+ // make sure the sink parallelism is set to producerParallelism
+ Preconditions.checkArgument(
+ kafkaProperties.getProperty(PRODUCER_PARALLELISM) != null,
+ "Missing producer parallelism for Kafka Shuffle");
+ int producerParallelism = PropertiesUtil.getInt(kafkaProperties, PRODUCER_PARALLELISM, Integer.MIN_VALUE);
+
+ addKafkaShuffle(dataStream, kafkaProducer, producerParallelism);
+ }
+
+ /**
+ * The write side of {@link FlinkKafkaShuffle#persistentKeyBy}.
+ *
+ * <p>This function contains a {@link FlinkKafkaShuffleProducer} to shuffle and persist data in Kafka.
+ * {@link FlinkKafkaShuffleProducer} uses the same key group assignment function
+ * {@link KeyGroupRangeAssignment#assignKeyToParallelOperator} to decide which partition a key goes to.
+ *
+ * <p>Hence, each producer task can potentially write to each Kafka partition, depending on the key.
+ * Here, the number of partitions equals the key group size.
+ * When {@link TimeCharacteristic#EventTime} is used, each producer task broadcasts each watermark
+ * to all of the Kafka partitions to make sure watermark information is propagated properly.
+ *
+ * <p>Attention: make sure kafkaProperties contains
+ * {@link FlinkKafkaShuffle#PRODUCER_PARALLELISM} and {@link FlinkKafkaShuffle#PARTITION_NUMBER} explicitly.
+ * {@link FlinkKafkaShuffle#PRODUCER_PARALLELISM} is the parallelism of the producer.
+ * {@link FlinkKafkaShuffle#PARTITION_NUMBER} is the number of partitions.
+ * They are not necessarily the same and can be set independently.
+ *
+ * @see FlinkKafkaShuffle#persistentKeyBy
+ * @see FlinkKafkaShuffle#readKeyBy
+ *
+ * @param dataStream Data stream to be shuffled
+ * @param topic Kafka topic written to
+ * @param kafkaProperties Kafka properties for Kafka Producer
+ * @param fields Key positions from the input data stream
+ * @param <T> Type of the input data stream
+ */
+ public static <T> void writeKeyBy(
+ DataStream<T> dataStream,
+ String topic,
+ Properties kafkaProperties,
+ int... fields) {
+ writeKeyBy(dataStream, topic, kafkaProperties, keySelector(dataStream, fields));
+ }
+
+ /**
+ * The read side of {@link FlinkKafkaShuffle#persistentKeyBy}.
+ *
+ * <p>Each consumer task reads the Kafka partitions that correspond to the key group indices assigned to it.
+ * The number of Kafka partitions is the maximum parallelism of the consumer.
+ * This version only supports numberOfPartitions = consumerParallelism.
+ * When {@link TimeCharacteristic#EventTime} is used, a consumer task is responsible for emitting
+ * watermarks; watermarks are read from the corresponding Kafka partitions. Notice that a consumer task only
+ * starts to emit a watermark after receiving at least one watermark from each producer task, to make sure
+ * watermarks are monotonically increasing. Hence, a consumer task needs to know `producerParallelism` as well.
+ *
+ * <p>Attention: make sure kafkaProperties contains
+ * {@link FlinkKafkaShuffle#PRODUCER_PARALLELISM} and {@link FlinkKafkaShuffle#PARTITION_NUMBER} explicitly.
+ * {@link FlinkKafkaShuffle#PRODUCER_PARALLELISM} is the parallelism of the producer.
+ * {@link FlinkKafkaShuffle#PARTITION_NUMBER} is the number of partitions.
+ * They are not necessarily the same and can be set independently.
+ *
+ * @see FlinkKafkaShuffle#persistentKeyBy
+ * @see FlinkKafkaShuffle#writeKeyBy
+ *
+ * @param topic The topic of Kafka where data is persisted
+ * @param env Execution environment. readKeyBy's environment can be different from writeKeyBy's
+ * @param typeInformation Type information of the data persisted in Kafka
+ * @param kafkaProperties Kafka properties for the Kafka Consumer
+ * @param keySelector Key selector to retrieve the key
+ * @param <T> Schema type
+ * @param <K> Key type
+ * @return Keyed data stream
+ */
+ public static <T, K> KeyedStream<T, K> readKeyBy(
+ String topic,
+ StreamExecutionEnvironment env,
+ TypeInformation<T> typeInformation,
+ Properties kafkaProperties,
+ KeySelector<T, K> keySelector) {
+
+ TypeSerializer<T> typeSerializer = typeInformation.createSerializer(env.getConfig());
+ TypeInformationSerializationSchema<T> schema =
+ new TypeInformationSerializationSchema<>(typeInformation, typeSerializer);
+
+ SourceFunction<T> kafkaConsumer =
+ new FlinkKafkaShuffleConsumer<>(topic, schema, typeSerializer, kafkaProperties);
+
+ // TODO: consider situations where numberOfPartitions != consumerParallelism
+ Preconditions.checkArgument(
+ kafkaProperties.getProperty(PARTITION_NUMBER) != null,
+ "Missing partition number for Kafka Shuffle");
+ int numberOfPartitions = PropertiesUtil.getInt(kafkaProperties, PARTITION_NUMBER, Integer.MIN_VALUE);
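+ // With numberOfPartitions == consumerParallelism (the only supported setup in this version,
+ // see the TODO above), each consumer subtask reads exactly one partition, i.e. one key group.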
+ DataStream<T> outputDataStream = env.addSource(kafkaConsumer).setParallelism(numberOfPartitions);
+
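+ // The data in Kafka is already partitioned by key group, so the consumer output can be
+ // re-interpreted as a KeyedStream instead of being shuffled again.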
+ return DataStreamUtils.reinterpretAsKeyedStream(outputDataStream, keySelector);
+ }
+
+ /**
+ * Adds a {@link StreamKafkaShuffleSink} to {@link DataStream}.
+ *
+ * <p>{@link StreamKafkaShuffleSink} is associated with a {@link FlinkKafkaShuffleProducer}.
+ *
+ * @param inputStream Input data stream connected to the shuffle
+ * @param kafkaShuffleProducer Kafka shuffle sink function that can handle both records and watermarks
+ * @param producerParallelism The number of tasks writing to the Kafka shuffle
+ */
+ private static <T, K> void addKafkaShuffle(
+ DataStream<T> inputStream,
+ FlinkKafkaShuffleProducer<T, K> kafkaShuffleProducer,
+ int producerParallelism) {
+
+ // read the output type of the input Transform to coax out errors about MissingTypeInfo
+ inputStream.getTransformation().getOutputType();
+
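+ // Register the sink transformation manually (instead of going through DataStream#addSink)
+ // so that the watermark-aware StreamKafkaShuffleSink operator is used.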
+ StreamKafkaShuffleSink<T> shuffleSinkOperator = new StreamKafkaShuffleSink<>(kafkaShuffleProducer);
+ SinkTransformation<T> transformation = new SinkTransformation<>(
+ inputStream.getTransformation(),
+ "kafka_shuffle",
+ shuffleSinkOperator,
+ inputStream.getExecutionEnvironment().getParallelism());
+ inputStream.getExecutionEnvironment().addOperator(transformation);
+ transformation.setParallelism(producerParallelism);
+ }
+
+ // A better place for this function would be DataStream, but it lives here for now to avoid changing DataStream
+ private static <T> KeySelector<T, Tuple> keySelector(DataStream<T> source, int... fields) {
+ KeySelector<T, Tuple> keySelector;
+ if (source.getType() instanceof BasicArrayTypeInfo || source.getType() instanceof PrimitiveArrayTypeInfo) {
+ keySelector = KeySelectorUtil.getSelectorForArray(fields, source.getType());
+ } else {
+ Keys<T> keys = new Keys.ExpressionKeys<>(fields, source.getType());
+ keySelector = KeySelectorUtil.getSelectorForKeys(
+ keys,
+ source.getType(),
+ source.getExecutionEnvironment().getConfig());
+ }
+
+ return keySelector;
+ }
+}
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/StreamKafkaShuffleSink.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/StreamKafkaShuffleSink.java
new file mode 100644
index 0000000..ddb3e07
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/StreamKafkaShuffleSink.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+/**
+ * A customized {@link StreamOperator} for executing a {@link FlinkKafkaShuffleProducer} that handles
+ * both elements and watermarks. If the shuffle sink proves useful to other sinks in the future,
+ * we should abstract this operator into the DataStream API. For now, we keep the operator as it is
+ * to avoid changing a public interface.
+ */
+@Internal
+class StreamKafkaShuffleSink<IN> extends StreamSink<IN> {
+
+ public StreamKafkaShuffleSink(FlinkKafkaShuffleProducer flinkKafkaShuffleProducer) {
+ super(flinkKafkaShuffleProducer);
+ }
+
+ @Override
+ public void processWatermark(Watermark mark) throws Exception {
+ super.processWatermark(mark);
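+ // In addition to the regular watermark handling above, hand the watermark to the shuffle
+ // producer so it can be broadcast to all Kafka partitions (see FlinkKafkaShuffle).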
+ ((FlinkKafkaShuffleProducer) userFunction).invoke(mark);
+ }
+}
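
For reference, a minimal end-to-end job wiring persistentKeyBy into a pipeline might look
like the sketch below. This is an illustrative sketch, not part of the commit: the broker
address (localhost:9092), the topic name ("shuffle-topic", assumed to already exist with
4 partitions), and the parallelism values are placeholder assumptions.

    import java.util.Properties;

    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle;

    public class KafkaShuffleExample {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            // Match the consumer parallelism to the partition count used below.
            env.setParallelism(4);

            Properties kafkaProperties = new Properties();
            kafkaProperties.setProperty("bootstrap.servers", "localhost:9092");

            DataStream<Tuple2<Integer, String>> dataStream = env
                    .fromElements(Tuple2.of(1, "a"), Tuple2.of(2, "b"), Tuple2.of(1, "c"));

            // Shuffle through Kafka, keyed by tuple field 0: 4 producer tasks and a
            // 4-partition topic (this version requires consumerParallelism == numberOfPartitions).
            KeyedStream<Tuple2<Integer, String>, Tuple> keyedStream = FlinkKafkaShuffle.persistentKeyBy(
                    dataStream, "shuffle-topic", 4, 4, kafkaProperties, 0);

            keyedStream.print();
            env.execute("kafka-shuffle-example");
        }
    }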