Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/10 08:24:14 UTC
[inlong] branch master updated: [INLONG-5377][Sort] Add reporting metrics for kafka connector sink and audit SDK (#5429)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 458f4604e [INLONG-5377][Sort] Add reporting metrics for kafka connector sink and audit SDK (#5429)
458f4604e is described below
commit 458f4604efbfc13106d0efa9bf64e91cd7f63527
Author: Oneal65 <li...@foxmail.com>
AuthorDate: Wed Aug 10 16:24:10 2022 +0800
[INLONG-5377][Sort] Add reporting metrics for kafka connector sink and audit SDK (#5429)
---
.../sort/protocol/node/load/KafkaLoadNode.java | 2 +-
.../inlong/sort/kafka/FlinkKafkaProducer.java | 2053 ++++++++++++++++++++
.../apache/inlong/sort/kafka/KafkaDynamicSink.java | 47 +-
.../sort/kafka/table/KafkaDynamicTableFactory.java | 16 +-
.../table/UpsertKafkaDynamicTableFactory.java | 404 ++++
.../org.apache.flink.table.factories.Factory | 3 +-
6 files changed, 2507 insertions(+), 18 deletions(-)
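As context for the diff below: the new public FlinkKafkaProducer constructor takes two extra
arguments, inLongMetric and auditHostAndPorts, which open() later splits on Constants.DELIMITER
into groupId/streamId/nodeId and into the audit proxy endpoints. A minimal, hypothetical wiring
sketch follows (topic, broker address, labels and the serialization schema are placeholders, not
part of this commit):

    // Illustrative sketch only; all literal values are made-up placeholders.
    Properties props = new Properties();
    props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");

    // groupId, streamId and nodeId joined by the same delimiter that open() splits on
    String inLongMetric = String.join(Constants.DELIMITER, "my_group", "my_stream", "kafka_sink");
    String auditHostAndPorts = "audit-host:10081"; // assumed audit proxy address

    FlinkKafkaProducer<RowData> producer = new FlinkKafkaProducer<>(
            "output_topic",
            serializationSchema, // an existing KafkaSerializationSchema<RowData>
            props,
            FlinkKafkaProducer.Semantic.AT_LEAST_ONCE,
            FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE,
            inLongMetric,
            auditHostAndPorts);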
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
index 18132f4e9..010bc7efb 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
@@ -116,7 +116,7 @@ public class KafkaLoadNode extends LoadNode implements Metadata, Serializable {
options.put("sink.ignore.changelog", "true");
options.putAll(format.generateOptions(false));
} else {
- options.put("connector", "upsert-kafka");
+ options.put("connector", "upsert-kafka-inlong");
options.putAll(format.generateOptions(true));
}
} else if (format instanceof CanalJsonFormat || format instanceof DebeziumJsonFormat) {
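The hunk above changes the connector identifier that KafkaLoadNode emits for upsert writes, so
tables routed through the InLong sort connectors are now declared with 'upsert-kafka-inlong'
(served by the UpsertKafkaDynamicTableFactory added below). A rough, hypothetical Table API
sketch (table name, columns and broker address are invented; the new 'inlong.metric' option can
be added to the WITH clause in the same way):

    // Illustrative DDL only; values are placeholders, not taken from this commit.
    tableEnv.executeSql(
            "CREATE TABLE kafka_upsert_sink (\n"
                    + "  id BIGINT,\n"
                    + "  name STRING,\n"
                    + "  PRIMARY KEY (id) NOT ENFORCED\n"
                    + ") WITH (\n"
                    + "  'connector' = 'upsert-kafka-inlong',\n"
                    + "  'topic' = 'output_topic',\n"
                    + "  'properties.bootstrap.servers' = 'broker1:9092',\n"
                    + "  'key.format' = 'json',\n"
                    + "  'value.format' = 'json'\n"
                    + ")");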
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
new file mode 100644
index 000000000..2df2b0893
--- /dev/null
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
@@ -0,0 +1,2053 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kafka;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaErrorCode;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaException;
+import org.apache.flink.streaming.connectors.kafka.KafkaContextAware;
+import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaSerializationSchemaWrapper;
+import org.apache.flink.streaming.connectors.kafka.internals.TransactionalIdsGenerator;
+import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricMutableWrapper;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.TemporaryClassLoaderContext;
+import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.sort.base.Constants;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.metric.ThreadSafeCounter;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.apache.inlong.sort.base.Constants.DELIMITER;
+import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES;
+import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT_PER_SECOND;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_PER_SECOND;
+
+/**
+ * Copy from org.apache.flink:flink-connector-kafka_2.11:1.13.5
+ *
+ * Flink Sink to produce data into a Kafka topic. By default, the producer will use the {@link
+ * FlinkKafkaProducer.Semantic#AT_LEAST_ONCE} semantic. Before using {@link
+ * FlinkKafkaProducer.Semantic#EXACTLY_ONCE}, please refer to Flink's Kafka connector documentation.
+ *
+ * Add an option `inlong.metric` to support metrics.
+ */
+@PublicEvolving
+public class FlinkKafkaProducer<IN>
+ extends TwoPhaseCommitSinkFunction<
+ IN,
+ FlinkKafkaProducer.KafkaTransactionState,
+ FlinkKafkaProducer.KafkaTransactionContext> {
+
+ /**
+ * This coefficient determines the safe scale-down factor.
+ *
+ * <p>If the Flink application previously failed before the first checkpoint completed, or if we are
+ * starting a new batch of {@link FlinkKafkaProducer} from scratch without a clean shutdown of the
+ * previous one, {@link FlinkKafkaProducer} doesn't know which Kafka transactionalIds were used
+ * previously. In that case, it will play it safe and abort all of the possible transactionalIds
+ * from the range of: {@code [0, getNumberOfParallelSubtasks() * kafkaProducersPoolSize *
+ * SAFE_SCALE_DOWN_FACTOR) }
+ *
+ * <p>The range of transactional ids available for use is: {@code [0,
+ * getNumberOfParallelSubtasks() * kafkaProducersPoolSize) }
+ *
+ * <p>This means that if we decrease {@code getNumberOfParallelSubtasks()} by a factor larger
+ * than {@code SAFE_SCALE_DOWN_FACTOR}, we can be left with some lingering transactions.
+ */
+ public static final int SAFE_SCALE_DOWN_FACTOR = 5;
+ /**
+ * Default number of KafkaProducers in the pool. See {@link
+ * FlinkKafkaProducer.Semantic#EXACTLY_ONCE}.
+ */
+ public static final int DEFAULT_KAFKA_PRODUCERS_POOL_SIZE = 5;
+ /**
+ * Default value for kafka transaction timeout.
+ */
+ public static final Time DEFAULT_KAFKA_TRANSACTION_TIMEOUT = Time.hours(1);
+ /**
+ * Configuration key for disabling the metrics reporting.
+ */
+ public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer.class);
+ private static final long serialVersionUID = 1L;
+ /**
+ * Number of characters to truncate the taskName to for the Kafka transactionalId. The maximum
+ * this can possibly be set to is 32,767 - (length of operatorUniqueId).
+ */
+ private static final short maxTaskNameSize = 1_000;
+ /**
+ * Descriptor of the transactional IDs list. Note: This state is serialized by the Kryo serializer,
+ * which has compatibility problems, so it will be removed later. Please use
+ * NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2.
+ */
+ @Deprecated
+ private static final ListStateDescriptor<FlinkKafkaProducer.NextTransactionalIdHint>
+ NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR =
+ new ListStateDescriptor<>(
+ "next-transactional-id-hint",
+ TypeInformation.of(NextTransactionalIdHint.class));
+ private static final ListStateDescriptor<FlinkKafkaProducer.NextTransactionalIdHint>
+ NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2 =
+ new ListStateDescriptor<>(
+ "next-transactional-id-hint-v2",
+ new NextTransactionalIdHintSerializer());
+ /**
+ * User defined properties for the Producer.
+ */
+ protected final Properties producerConfig;
+ /**
+ * The name of the default topic this producer is writing data to.
+ */
+ protected final String defaultTopicId;
+ /**
+ * Partitions of each topic.
+ */
+ protected final Map<String, int[]> topicPartitionsMap;
+ /**
+ * Number of unacknowledged records.
+ */
+ protected final AtomicLong pendingRecords = new AtomicLong();
+ /**
+ * (Serializable) SerializationSchema for turning objects used with Flink into byte[] for
+ * Kafka.
+ */
+ @Nullable
+ private final KeyedSerializationSchema<IN> keyedSchema;
+ /**
+ * (Serializable) serialization schema for serializing records to {@link ProducerRecord
+ * ProducerRecords}.
+ */
+ @Nullable
+ private final KafkaSerializationSchema<IN> kafkaSchema;
+ /**
+ * User-provided partitioner for assigning an object to a Kafka partition for each topic.
+ */
+ @Nullable
+ private final FlinkKafkaPartitioner<IN> flinkKafkaPartitioner;
+ /**
+ * Max number of producers in the pool. If all producers are in use, snapshotting state will
+ * throw an exception.
+ */
+ private final int kafkaProducersPoolSize;
+ /**
+ * Pool of available transactional ids.
+ */
+ private final BlockingDeque<String> availableTransactionalIds = new LinkedBlockingDeque<>();
+ /**
+ * Cache of metrics to replace already registered metrics instead of overwriting existing ones.
+ */
+ private final Map<String, KafkaMetricMutableWrapper> previouslyCreatedMetrics = new HashMap<>();
+ /**
+ * Metric label for InLong, carrying the groupId, streamId and nodeId
+ */
+ private final String inLongMetric;
+ /**
+ * audit host and ports
+ */
+ private final String auditHostAndPorts;
+ /**
+ * audit implementation
+ */
+ private transient AuditImp auditImp;
+ /**
+ * inLong groupId
+ */
+ private String inLongGroupId;
+ /**
+ * inLong streamId
+ */
+ private String inLongStreamId;
+
+ /**
+ * Flag controlling whether we are writing the Flink record's timestamp into Kafka.
+ */
+ protected boolean writeTimestampToKafka = false;
+ /**
+ * Semantic chosen for this instance.
+ */
+ protected FlinkKafkaProducer.Semantic semantic;
+ /**
+ * The callback that handles error propagation or logging callbacks.
+ */
+ @Nullable
+ protected transient Callback callback;
+ /**
+ * Errors encountered in the async producer are stored here.
+ */
+ @Nullable
+ protected transient volatile Exception asyncException;
+ /**
+ * sink metric data
+ */
+ private SinkMetricData metricData;
+ private Long dataSize = 0L;
+ private Long rowSize = 0L;
+ /**
+ * State for nextTransactionalIdHint.
+ */
+ private transient ListState<FlinkKafkaProducer.NextTransactionalIdHint>
+ nextTransactionalIdHintState;
+
+ // -------------------------------- Runtime fields ------------------------------------------
+ /**
+ * Generator for Transactional IDs.
+ */
+ private transient TransactionalIdsGenerator transactionalIdsGenerator;
+ /**
+ * Hint for picking next transactional id.
+ */
+ private transient FlinkKafkaProducer.NextTransactionalIdHint nextTransactionalIdHint;
+ /**
+ * Flag indicating whether to accept failures (and log them), or to fail on failures.
+ */
+ private boolean logFailuresOnly;
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink writes its input DataStream to the topic.
+ *
+ * @param brokerList Comma separated addresses of the brokers
+ * @param topicId ID of the Kafka topic.
+ * @param serializationSchema User defined (keyless) serialization schema.
+ */
+ public FlinkKafkaProducer(
+ String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
+ this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList));
+ }
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink writes its input DataStream to the topic.
+ *
+ * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as the
+ * partitioner. This default partitioner maps each sink subtask to a single Kafka partition
+ * (i.e. all records received by a sink subtask will end up in the same Kafka partition).
+ *
+ * <p>To use a custom partitioner, please use {@link #FlinkKafkaProducer(String,
+ * SerializationSchema, Properties, Optional)} instead.
+ *
+ * @param topicId ID of the Kafka topic.
+ * @param serializationSchema User defined key-less serialization schema.
+ * @param producerConfig Properties with the producer configuration.
+ */
+ public FlinkKafkaProducer(
+ String topicId,
+ SerializationSchema<IN> serializationSchema,
+ Properties producerConfig) {
+ this(
+ topicId,
+ serializationSchema,
+ producerConfig,
+ Optional.of(new FlinkFixedPartitioner<>()));
+ }
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to the topic. It
+ * accepts a key-less {@link SerializationSchema} and possibly a custom {@link
+ * FlinkKafkaPartitioner}.
+ *
+ * <p>Since a key-less {@link SerializationSchema} is used, all records sent to Kafka will not
+ * have an attached key. Therefore, if a partitioner is also not provided, records will be
+ * distributed to Kafka partitions in a round-robin fashion.
+ *
+ * @param topicId The topic to write data to
+ * @param serializationSchema A key-less serializable serialization schema for turning user
+ * objects into a kafka-consumable byte[]
+ * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is
+ * the only required argument.
+ * @param customPartitioner A serializable partitioner for assigning messages to Kafka
+ * partitions. If a partitioner is not provided, records will be distributed to Kafka
+ * partitions in a round-robin fashion.
+ */
+ public FlinkKafkaProducer(
+ String topicId,
+ SerializationSchema<IN> serializationSchema,
+ Properties producerConfig,
+ Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
+ this(
+ topicId,
+ serializationSchema,
+ producerConfig,
+ customPartitioner.orElse(null),
+ Semantic.AT_LEAST_ONCE,
+ DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
+ }
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to the topic. It
+ * accepts a key-less {@link SerializationSchema} and possibly a custom {@link
+ * FlinkKafkaPartitioner}.
+ *
+ * <p>Since a key-less {@link SerializationSchema} is used, all records sent to Kafka will not
+ * have an attached key. Therefore, if a partitioner is also not provided, records will be
+ * distributed to Kafka partitions in a round-robin fashion.
+ *
+ * @param topicId The topic to write data to
+ * @param serializationSchema A key-less serializable serialization schema for turning user
+ * objects into a kafka-consumable byte[]
+ * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is
+ * the only required argument.
+ * @param customPartitioner A serializable partitioner for assigning messages to Kafka
+ * partitions. If a partitioner is not provided, records will be distributed to Kafka
+ * partitions in a round-robin fashion.
+ * @param semantic Defines semantic that will be used by this producer (see {@link
+ * FlinkKafkaProducer.Semantic}).
+ * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link
+ * FlinkKafkaProducer.Semantic#EXACTLY_ONCE}).
+ */
+ public FlinkKafkaProducer(
+ String topicId,
+ SerializationSchema<IN> serializationSchema,
+ Properties producerConfig,
+ @Nullable FlinkKafkaPartitioner<IN> customPartitioner,
+ FlinkKafkaProducer.Semantic semantic,
+ int kafkaProducersPoolSize) {
+ this(
+ topicId,
+ null,
+ null,
+ new KafkaSerializationSchemaWrapper<>(
+ topicId, customPartitioner, false, serializationSchema),
+ producerConfig,
+ semantic,
+ kafkaProducersPoolSize,
+ null,
+ null);
+ }
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink writes its input DataStream to the topic.
+ *
+ * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as the
+ * partitioner. This default partitioner maps each sink subtask to a single Kafka partition
+ * (i.e. all records received by a sink subtask will end up in the same Kafka partition).
+ *
+ * <p>To use a custom partitioner, please use {@link #FlinkKafkaProducer(String,
+ * KeyedSerializationSchema, Properties, Optional)} instead.
+ *
+ * @param brokerList Comma separated addresses of the brokers
+ * @param topicId ID of the Kafka topic.
+ * @param serializationSchema User defined serialization schema supporting key/value messages
+ * @deprecated use {@link #FlinkKafkaProducer(String, KafkaSerializationSchema, Properties,
+ * FlinkKafkaProducer.Semantic)}
+ */
+ @Deprecated
+ public FlinkKafkaProducer(
+ String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) {
+ this(
+ topicId,
+ serializationSchema,
+ getPropertiesFromBrokerList(brokerList),
+ Optional.of(new FlinkFixedPartitioner<IN>()));
+ }
+
+ // ------------------- Key/Value serialization schema constructors ----------------------
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink writes its input DataStream to the topic.
+ *
+ * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as the
+ * partitioner. This default partitioner maps each sink subtask to a single Kafka partition
+ * (i.e. all records received by a sink subtask will end up in the same Kafka partition).
+ *
+ * <p>To use a custom partitioner, please use {@link #FlinkKafkaProducer(String,
+ * KeyedSerializationSchema, Properties, Optional)} instead.
+ *
+ * @param topicId ID of the Kafka topic.
+ * @param serializationSchema User defined serialization schema supporting key/value messages
+ * @param producerConfig Properties with the producer configuration.
+ * @deprecated use {@link #FlinkKafkaProducer(String, KafkaSerializationSchema, Properties,
+ * FlinkKafkaProducer.Semantic)}
+ */
+ @Deprecated
+ public FlinkKafkaProducer(
+ String topicId,
+ KeyedSerializationSchema<IN> serializationSchema,
+ Properties producerConfig) {
+ this(
+ topicId,
+ serializationSchema,
+ producerConfig,
+ Optional.of(new FlinkFixedPartitioner<IN>()));
+ }
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink writes its input DataStream to the topic.
+ *
+ * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as the
+ * partitioner. This default partitioner maps each sink subtask to a single Kafka partition
+ * (i.e. all records received by a sink subtask will end up in the same Kafka partition).
+ *
+ * @param topicId ID of the Kafka topic.
+ * @param serializationSchema User defined serialization schema supporting key/value messages
+ * @param producerConfig Properties with the producer configuration.
+ * @param semantic Defines semantic that will be used by this producer (see {@link
+ * FlinkKafkaProducer.Semantic}).
+ * @deprecated use {@link #FlinkKafkaProducer(String, KafkaSerializationSchema, Properties,
+ * FlinkKafkaProducer.Semantic)}
+ */
+ @Deprecated
+ public FlinkKafkaProducer(
+ String topicId,
+ KeyedSerializationSchema<IN> serializationSchema,
+ Properties producerConfig,
+ FlinkKafkaProducer.Semantic semantic) {
+ this(
+ topicId,
+ serializationSchema,
+ producerConfig,
+ Optional.of(new FlinkFixedPartitioner<IN>()),
+ semantic,
+ DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
+ }
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to the topic. It
+ * accepts a keyed {@link KeyedSerializationSchema} and possibly a custom {@link
+ * FlinkKafkaPartitioner}.
+ *
+ * <p>If a partitioner is not provided, written records will be partitioned by the attached key
+ * of each record (as determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If
+ * written records do not have a key (i.e., {@link
+ * KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they will be
+ * distributed to Kafka partitions in a round-robin fashion.
+ *
+ * @param defaultTopicId The default topic to write data to
+ * @param serializationSchema A serializable serialization schema for turning user objects into
+ * a kafka-consumable byte[] supporting key/value messages
+ * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is
+ * the only required argument.
+ * @param customPartitioner A serializable partitioner for assigning messages to Kafka
+ * partitions. If a partitioner is not provided, records will be partitioned by the key of
+ * each record (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the
+ * keys are {@code null}, then records will be distributed to Kafka partitions in a
+ * round-robin fashion.
+ * @deprecated use {@link #FlinkKafkaProducer(String, KafkaSerializationSchema, Properties,
+ * FlinkKafkaProducer.Semantic)}
+ */
+ @Deprecated
+ public FlinkKafkaProducer(
+ String defaultTopicId,
+ KeyedSerializationSchema<IN> serializationSchema,
+ Properties producerConfig,
+ Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
+ this(
+ defaultTopicId,
+ serializationSchema,
+ producerConfig,
+ customPartitioner,
+ FlinkKafkaProducer.Semantic.AT_LEAST_ONCE,
+ DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
+ }
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to the topic. It
+ * accepts a keyed {@link KeyedSerializationSchema} and possibly a custom {@link
+ * FlinkKafkaPartitioner}.
+ *
+ * <p>If a partitioner is not provided, written records will be partitioned by the attached key
+ * of each record (as determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If
+ * written records do not have a key (i.e., {@link
+ * KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they will be
+ * distributed to Kafka partitions in a round-robin fashion.
+ *
+ * @param defaultTopicId The default topic to write data to
+ * @param serializationSchema A serializable serialization schema for turning user objects into
+ * a kafka-consumable byte[] supporting key/value messages
+ * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is
+ * the only required argument.
+ * @param customPartitioner A serializable partitioner for assigning messages to Kafka
+ * partitions. If a partitioner is not provided, records will be partitioned by the key of
+ * each record (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the
+ * keys are {@code null}, then records will be distributed to Kafka partitions in a
+ * round-robin fashion.
+ * @param semantic Defines semantic that will be used by this producer (see {@link
+ * FlinkKafkaProducer.Semantic}).
+ * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link
+ * FlinkKafkaProducer.Semantic#EXACTLY_ONCE}).
+ * @deprecated use {@link #FlinkKafkaProducer(String, KafkaSerializationSchema, Properties,
+ * FlinkKafkaProducer.Semantic)}
+ */
+ @Deprecated
+ public FlinkKafkaProducer(
+ String defaultTopicId,
+ KeyedSerializationSchema<IN> serializationSchema,
+ Properties producerConfig,
+ Optional<FlinkKafkaPartitioner<IN>> customPartitioner,
+ FlinkKafkaProducer.Semantic semantic,
+ int kafkaProducersPoolSize) {
+ this(
+ defaultTopicId,
+ serializationSchema,
+ customPartitioner.orElse(null),
+ null, /* kafka serialization schema */
+ producerConfig,
+ semantic,
+ kafkaProducersPoolSize,
+ null,
+ null);
+ }
+
+ /**
+ * Creates a {@link FlinkKafkaProducer} for a given topic. The sink produces its input to the
+ * topic. It accepts a {@link KafkaSerializationSchema} for serializing records to a {@link
+ * ProducerRecord}, including partitioning information.
+ *
+ * @param defaultTopic The default topic to write data to
+ * @param serializationSchema A serializable serialization schema for turning user objects into
+ * a kafka-consumable byte[] supporting key/value messages
+ * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is
+ * the only required argument.
+ * @param semantic Defines semantic that will be used by this producer (see {@link
+ * FlinkKafkaProducer.Semantic}).
+ */
+ public FlinkKafkaProducer(
+ String defaultTopic,
+ KafkaSerializationSchema<IN> serializationSchema,
+ Properties producerConfig,
+ FlinkKafkaProducer.Semantic semantic) {
+ this(
+ defaultTopic,
+ serializationSchema,
+ producerConfig,
+ semantic,
+ DEFAULT_KAFKA_PRODUCERS_POOL_SIZE,
+ null,
+ null);
+ }
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to the topic. It
+ * accepts a {@link KafkaSerializationSchema} and possibly a custom {@link
+ * FlinkKafkaPartitioner}.
+ *
+ * @param defaultTopic The default topic to write data to
+ * @param serializationSchema A serializable serialization schema for turning user objects into
+ * a kafka-consumable byte[] supporting key/value messages
+ * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is
+ * the only required argument.
+ * @param semantic Defines semantic that will be used by this producer (see {@link
+ * FlinkKafkaProducer.Semantic}).
+ * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link
+ * FlinkKafkaProducer.Semantic#EXACTLY_ONCE}).
+ */
+ public FlinkKafkaProducer(
+ String defaultTopic,
+ KafkaSerializationSchema<IN> serializationSchema,
+ Properties producerConfig,
+ FlinkKafkaProducer.Semantic semantic,
+ int kafkaProducersPoolSize,
+ String inLongMetric,
+ String auditHostAndPorts) {
+ this(
+ defaultTopic,
+ null,
+ null, /* keyed schema and FlinkKafkaPartitioner */
+ serializationSchema,
+ producerConfig,
+ semantic,
+ kafkaProducersPoolSize,
+ inLongMetric,
+ auditHostAndPorts);
+ }
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to the topic. It
+ * accepts a {@link KafkaSerializationSchema} and possibly a custom {@link
+ * FlinkKafkaPartitioner}.
+ *
+ * <p>If a partitioner is not provided, written records will be partitioned by the attached key
+ * of each record (as determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If
+ * written records do not have a key (i.e., {@link
+ * KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they will be
+ * distributed to Kafka partitions in a round-robin fashion.
+ *
+ * @param defaultTopic The default topic to write data to
+ * @param keyedSchema A serializable serialization schema for turning user objects into a
+ * kafka-consumable byte[] supporting key/value messages
+ * @param customPartitioner A serializable partitioner for assigning messages to Kafka
+ * partitions. If a partitioner is not provided, records will be partitioned by the key of
+ * each record (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the
+ * keys are {@code null}, then records will be distributed to Kafka partitions in a
+ * round-robin fashion.
+ * @param kafkaSchema A serializable serialization schema for turning user objects into a
+ * kafka-consumable byte[] supporting key/value messages
+ * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is
+ * the only required argument.
+ * @param semantic Defines semantic that will be used by this producer (see {@link
+ * FlinkKafkaProducer.Semantic}).
+ * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link
+ * FlinkKafkaProducer.Semantic#EXACTLY_ONCE}).
+ */
+ private FlinkKafkaProducer(
+ String defaultTopic,
+ KeyedSerializationSchema<IN> keyedSchema,
+ FlinkKafkaPartitioner<IN> customPartitioner,
+ KafkaSerializationSchema<IN> kafkaSchema,
+ Properties producerConfig,
+ FlinkKafkaProducer.Semantic semantic,
+ int kafkaProducersPoolSize,
+ String inLongMetric,
+ String auditHostAndPorts) {
+ super(
+ new FlinkKafkaProducer.TransactionStateSerializer(),
+ new FlinkKafkaProducer.ContextStateSerializer());
+
+ this.inLongMetric = inLongMetric;
+ this.auditHostAndPorts = auditHostAndPorts;
+
+ this.defaultTopicId = checkNotNull(defaultTopic, "defaultTopic is null");
+
+ if (kafkaSchema != null) {
+ this.keyedSchema = null;
+ this.kafkaSchema = kafkaSchema;
+ this.flinkKafkaPartitioner = null;
+ ClosureCleaner.clean(
+ this.kafkaSchema, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+
+ if (customPartitioner != null) {
+ throw new IllegalArgumentException(
+ "Customer partitioner can only be used when"
+ + "using a KeyedSerializationSchema or SerializationSchema.");
+ }
+ } else if (keyedSchema != null) {
+ this.kafkaSchema = null;
+ this.keyedSchema = keyedSchema;
+ this.flinkKafkaPartitioner = customPartitioner;
+ ClosureCleaner.clean(
+ this.flinkKafkaPartitioner,
+ ExecutionConfig.ClosureCleanerLevel.RECURSIVE,
+ true);
+ ClosureCleaner.clean(
+ this.keyedSchema, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+ } else {
+ throw new IllegalArgumentException(
+ "You must provide either a KafkaSerializationSchema or a"
+ + "KeyedSerializationSchema.");
+ }
+
+ this.producerConfig = checkNotNull(producerConfig, "producerConfig is null");
+ this.semantic = checkNotNull(semantic, "semantic is null");
+ this.kafkaProducersPoolSize = kafkaProducersPoolSize;
+ checkState(kafkaProducersPoolSize > 0, "kafkaProducersPoolSize must be greater than 0");
+
+ // set the producer configuration properties for kafka record key value serializers.
+ if (!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
+ this.producerConfig.put(
+ ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+ ByteArraySerializer.class.getName());
+ } else {
+ LOG.warn(
+ "Overwriting the '{}' is not recommended",
+ ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
+ }
+
+ if (!producerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
+ this.producerConfig.put(
+ ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+ ByteArraySerializer.class.getName());
+ } else {
+ LOG.warn(
+ "Overwriting the '{}' is not recommended",
+ ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
+ }
+
+ // eagerly ensure that bootstrap servers are set.
+ if (!this.producerConfig.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
+ throw new IllegalArgumentException(
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
+ + " must be supplied in the producer config properties.");
+ }
+
+ if (!producerConfig.containsKey(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG)) {
+ long timeout = DEFAULT_KAFKA_TRANSACTION_TIMEOUT.toMilliseconds();
+ checkState(
+ timeout < Integer.MAX_VALUE && timeout > 0,
+ "timeout does not fit into 32 bit integer");
+ this.producerConfig.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, (int) timeout);
+ LOG.warn(
+ "Property [{}] not specified. Setting it to {}",
+ ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
+ DEFAULT_KAFKA_TRANSACTION_TIMEOUT);
+ }
+
+ // Enable transactionTimeoutWarnings to avoid silent data loss
+ // See KAFKA-6119 (affects versions 0.11.0.0 and 0.11.0.1):
+ // The KafkaProducer may not throw an exception if the transaction failed to commit
+ if (semantic == FlinkKafkaProducer.Semantic.EXACTLY_ONCE) {
+ final Object object =
+ this.producerConfig.get(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
+ final long transactionTimeout;
+ if (object instanceof String && StringUtils.isNumeric((String) object)) {
+ transactionTimeout = Long.parseLong((String) object);
+ } else if (object instanceof Number) {
+ transactionTimeout = ((Number) object).longValue();
+ } else {
+ throw new IllegalArgumentException(
+ ProducerConfig.TRANSACTION_TIMEOUT_CONFIG
+ + " must be numeric, was "
+ + object);
+ }
+ super.setTransactionTimeout(transactionTimeout);
+ super.enableTransactionTimeoutWarnings(0.8);
+ }
+
+ this.topicPartitionsMap = new HashMap<>();
+ }
+
+ private static void initTransactionalProducerConfig(
+ Properties producerConfig, String transactionalId) {
+ producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
+ }
+
+ // ---------------------------------- Properties --------------------------
+
+ private static Properties getPropertiesFromBrokerList(String brokerList) {
+ String[] elements = brokerList.split(",");
+
+ // validate the broker addresses
+ for (String broker : elements) {
+ NetUtils.getCorrectHostnamePort(broker);
+ }
+
+ Properties props = new Properties();
+ props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+ return props;
+ }
+
+ protected static int[] getPartitionsByTopic(String topic, Producer<byte[], byte[]> producer) {
+ // the fetched list is immutable, so we're creating a mutable copy in order to sort it
+ List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(topic));
+
+ // sort the partitions by partition id to make sure the fetched partition list is the same
+ // across subtasks
+ Collections.sort(
+ partitionsList,
+ new Comparator<PartitionInfo>() {
+ @Override
+ public int compare(PartitionInfo o1, PartitionInfo o2) {
+ return Integer.compare(o1.partition(), o2.partition());
+ }
+ });
+
+ int[] partitions = new int[partitionsList.size()];
+ for (int i = 0; i < partitions.length; i++) {
+ partitions[i] = partitionsList.get(i).partition();
+ }
+
+ return partitions;
+ }
+
+ /**
+ * If set to true, Flink will write the (event time) timestamp attached to each record into
+ * Kafka. Timestamps must be positive for Kafka to accept them.
+ *
+ * @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to
+ * Kafka.
+ */
+ public void setWriteTimestampToKafka(boolean writeTimestampToKafka) {
+ this.writeTimestampToKafka = writeTimestampToKafka;
+ if (kafkaSchema instanceof KafkaSerializationSchemaWrapper) {
+ ((KafkaSerializationSchemaWrapper<IN>) kafkaSchema)
+ .setWriteTimestamp(writeTimestampToKafka);
+ }
+ }
+
+ // ----------------------------------- Utilities --------------------------
+
+ /**
+ * Defines whether the producer should fail on errors, or only log them. If this is set to true,
+ * then exceptions will only be logged; if set to false, exceptions will eventually be thrown
+ * and cause the streaming program to fail (and enter recovery).
+ *
+ * @param logFailuresOnly The flag to indicate logging-only on exceptions.
+ */
+ public void setLogFailuresOnly(boolean logFailuresOnly) {
+ this.logFailuresOnly = logFailuresOnly;
+ }
+
+ /**
+ * Disables the propagation of exceptions thrown when committing presumably timed out Kafka
+ * transactions during recovery of the job. If a Kafka transaction is timed out, a commit will
+ * never be successful. Hence, use this feature to avoid recovery loops of the Job. Exceptions
+ * will still be logged to inform the user that data loss might have occurred.
+ *
+ * <p>Note that we use {@link System#currentTimeMillis()} to track the age of a transaction.
+ * Moreover, only exceptions thrown during the recovery are caught, i.e., the producer will
+ * attempt at least one commit of the transaction before giving up.
+ */
+ @Override
+ public FlinkKafkaProducer<IN> ignoreFailuresAfterTransactionTimeout() {
+ super.ignoreFailuresAfterTransactionTimeout();
+ return this;
+ }
+
+ /**
+ * Initializes the connection to Kafka.
+ */
+ @Override
+ public void open(Configuration configuration) throws Exception {
+ if (logFailuresOnly) {
+ callback =
+ new Callback() {
+ @Override
+ public void onCompletion(RecordMetadata metadata, Exception e) {
+ if (e != null) {
+ sendDirtyMetrics(rowSize, dataSize);
+ LOG.error(
+ "Error while sending record to Kafka: " + e.getMessage(),
+ e);
+ }
+ acknowledgeMessage();
+ }
+ };
+ } else {
+ callback =
+ new Callback() {
+ @Override
+ public void onCompletion(RecordMetadata metadata, Exception exception) {
+ if (exception != null && asyncException == null) {
+ asyncException = exception;
+ sendDirtyMetrics(rowSize, dataSize);
+ }
+ acknowledgeMessage();
+ }
+ };
+ }
+
+ RuntimeContext ctx = getRuntimeContext();
+
+ if (flinkKafkaPartitioner != null) {
+ flinkKafkaPartitioner.open(
+ ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
+ }
+
+ if (kafkaSchema instanceof KafkaContextAware) {
+ KafkaContextAware<IN> contextAwareSchema = (KafkaContextAware<IN>) kafkaSchema;
+ contextAwareSchema.setParallelInstanceId(ctx.getIndexOfThisSubtask());
+ contextAwareSchema.setNumParallelInstances(ctx.getNumberOfParallelSubtasks());
+ }
+
+ if (kafkaSchema != null) {
+ kafkaSchema.open(
+ RuntimeContextInitializationContextAdapters.serializationAdapter(
+ getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));
+ }
+
+ metricData = new SinkMetricData(ctx.getMetricGroup());
+ if (inLongMetric != null && !inLongMetric.isEmpty()) {
+ String[] inLongMetricArray = inLongMetric.split(DELIMITER);
+ inLongGroupId = inLongMetricArray[0];
+ inLongStreamId = inLongMetricArray[1];
+ String nodeId = inLongMetricArray[2];
+ metricData.registerMetricsForDirtyBytes(inLongGroupId, inLongStreamId, nodeId, DIRTY_BYTES,
+ new ThreadSafeCounter());
+ metricData.registerMetricsForDirtyRecords(inLongGroupId, inLongStreamId, nodeId, DIRTY_RECORDS,
+ new ThreadSafeCounter());
+ metricData.registerMetricsForNumBytesOut(inLongGroupId, inLongStreamId, nodeId, NUM_BYTES_OUT,
+ new ThreadSafeCounter());
+ metricData.registerMetricsForNumRecordsOut(inLongGroupId, inLongStreamId, nodeId, NUM_RECORDS_OUT,
+ new ThreadSafeCounter());
+ metricData.registerMetricsForNumBytesOutPerSecond(inLongGroupId, inLongStreamId, nodeId,
+ NUM_BYTES_OUT_PER_SECOND);
+ metricData.registerMetricsForNumRecordsOutPerSecond(inLongGroupId, inLongStreamId, nodeId,
+ NUM_RECORDS_OUT_PER_SECOND);
+ }
+
+ if (auditHostAndPorts != null) {
+ AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER))));
+ auditImp = AuditImp.getInstance();
+ }
+
+ super.open(configuration);
+ }
+
+ private void sendOutMetrics(Long rowSize, Long dataSize) {
+ if (metricData.getNumRecordsOut() != null) {
+ metricData.getNumRecordsOut().inc(rowSize);
+ }
+ if (metricData.getNumBytesOut() != null) {
+ metricData.getNumBytesOut().inc(dataSize);
+ }
+ }
+
+ private void sendDirtyMetrics(Long rowSize, Long dataSize) {
+ if (metricData.getDirtyRecords() != null) {
+ metricData.getDirtyRecords().inc(rowSize);
+ }
+ if (metricData.getDirtyBytes() != null) {
+ metricData.getDirtyBytes().inc(dataSize);
+ }
+ }
+
+ private void outputMetricForAudit(ProducerRecord<byte[], byte[]> record) {
+ if (auditImp != null) {
+ auditImp.add(
+ Constants.AUDIT_SORT_OUTPUT,
+ inLongGroupId,
+ inLongStreamId,
+ System.currentTimeMillis(),
+ 1,
+ record.value().length);
+ }
+ }
+
+ private void resetMetricSize() {
+ dataSize = 0L;
+ rowSize = 0L;
+ }
+
+ // ------------------- Logic for handling checkpoint flushing -------------------------- //
+
+ @Override
+ public void invoke(
+ FlinkKafkaProducer.KafkaTransactionState transaction, IN next, Context context)
+ throws FlinkKafkaException {
+ checkErroneous();
+ resetMetricSize();
+
+ ProducerRecord<byte[], byte[]> record;
+ if (keyedSchema != null) {
+ byte[] serializedKey = keyedSchema.serializeKey(next);
+ byte[] serializedValue = keyedSchema.serializeValue(next);
+ String targetTopic = keyedSchema.getTargetTopic(next);
+ if (targetTopic == null) {
+ targetTopic = defaultTopicId;
+ }
+
+ Long timestamp = null;
+ if (this.writeTimestampToKafka) {
+ timestamp = context.timestamp();
+ }
+
+ int[] partitions = topicPartitionsMap.get(targetTopic);
+ if (null == partitions) {
+ partitions = getPartitionsByTopic(targetTopic, transaction.producer);
+ topicPartitionsMap.put(targetTopic, partitions);
+ }
+ if (flinkKafkaPartitioner != null) {
+ record =
+ new ProducerRecord<>(
+ targetTopic,
+ flinkKafkaPartitioner.partition(
+ next,
+ serializedKey,
+ serializedValue,
+ targetTopic,
+ partitions),
+ timestamp,
+ serializedKey,
+ serializedValue);
+ } else {
+ record =
+ new ProducerRecord<>(
+ targetTopic, null, timestamp, serializedKey, serializedValue);
+ }
+ } else if (kafkaSchema != null) {
+ if (kafkaSchema instanceof KafkaContextAware) {
+ @SuppressWarnings("unchecked")
+ KafkaContextAware<IN> contextAwareSchema = (KafkaContextAware<IN>) kafkaSchema;
+
+ String targetTopic = contextAwareSchema.getTargetTopic(next);
+ if (targetTopic == null) {
+ targetTopic = defaultTopicId;
+ }
+ int[] partitions = topicPartitionsMap.get(targetTopic);
+
+ if (null == partitions) {
+ partitions = getPartitionsByTopic(targetTopic, transaction.producer);
+ topicPartitionsMap.put(targetTopic, partitions);
+ }
+
+ contextAwareSchema.setPartitions(partitions);
+ }
+ record = kafkaSchema.serialize(next, context.timestamp());
+ } else {
+ throw new RuntimeException(
+ "We have neither KafkaSerializationSchema nor KeyedSerializationSchema, this"
+ + "is a bug.");
+ }
+
+ rowSize++;
+ dataSize = dataSize + record.value().length;
+ sendOutMetrics(rowSize, dataSize);
+ outputMetricForAudit(record);
+
+ pendingRecords.incrementAndGet();
+ transaction.producer.send(record, callback);
+ }
+
+ @Override
+ public void close() throws FlinkKafkaException {
+ // First close the producer for current transaction.
+ try {
+ final KafkaTransactionState currentTransaction = currentTransaction();
+ if (currentTransaction != null) {
+ // to avoid exceptions on aborting transactions with some pending records
+ flush(currentTransaction);
+
+ // a normal abort for AT_LEAST_ONCE and NONE does not clean up resources because of
+ // producer reuse, so we need to close the producer manually
+ switch (semantic) {
+ case EXACTLY_ONCE:
+ break;
+ case AT_LEAST_ONCE:
+ case NONE:
+ currentTransaction.producer.flush();
+ currentTransaction.producer.close(Duration.ofSeconds(0));
+ break;
+ }
+ }
+ super.close();
+ } catch (Exception e) {
+ asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
+ } finally {
+ // We may have to close the producer of the current transaction in case some exception
+ // was thrown before the normal close routine finished.
+ if (currentTransaction() != null) {
+ try {
+ currentTransaction().producer.close(Duration.ofSeconds(0));
+ } catch (Throwable t) {
+ LOG.warn("Error closing producer.", t);
+ }
+ }
+ // Make sure all the producers for pending transactions are closed.
+ pendingTransactions()
+ .forEach(
+ transaction -> {
+ try {
+ transaction.getValue().producer.close(Duration.ofSeconds(0));
+ } catch (Throwable t) {
+ LOG.warn("Error closing producer.", t);
+ }
+ });
+ // make sure we propagate pending errors
+ checkErroneous();
+ }
+ }
+
+ @Override
+ protected FlinkKafkaProducer.KafkaTransactionState beginTransaction()
+ throws FlinkKafkaException {
+ switch (semantic) {
+ case EXACTLY_ONCE:
+ FlinkKafkaInternalProducer<byte[], byte[]> producer = createTransactionalProducer();
+ producer.beginTransaction();
+ return new FlinkKafkaProducer.KafkaTransactionState(
+ producer.getTransactionalId(), producer);
+ case AT_LEAST_ONCE:
+ case NONE:
+ // Do not create new producer on each beginTransaction() if it is not necessary
+ final FlinkKafkaProducer.KafkaTransactionState currentTransaction =
+ currentTransaction();
+ if (currentTransaction != null && currentTransaction.producer != null) {
+ return new FlinkKafkaProducer.KafkaTransactionState(
+ currentTransaction.producer);
+ }
+ return new FlinkKafkaProducer.KafkaTransactionState(
+ initNonTransactionalProducer(true));
+ default:
+ throw new UnsupportedOperationException("Not implemented semantic");
+ }
+ }
+
+ @Override
+ protected void preCommit(FlinkKafkaProducer.KafkaTransactionState transaction)
+ throws FlinkKafkaException {
+ switch (semantic) {
+ case EXACTLY_ONCE:
+ case AT_LEAST_ONCE:
+ flush(transaction);
+ break;
+ case NONE:
+ break;
+ default:
+ throw new UnsupportedOperationException("Not implemented semantic");
+ }
+ checkErroneous();
+ }
+
+ @Override
+ protected void commit(FlinkKafkaProducer.KafkaTransactionState transaction) {
+ if (transaction.isTransactional()) {
+ try {
+ transaction.producer.commitTransaction();
+ } finally {
+ recycleTransactionalProducer(transaction.producer);
+ }
+ }
+ }
+
+ @Override
+ protected void recoverAndCommit(FlinkKafkaProducer.KafkaTransactionState transaction) {
+ if (transaction.isTransactional()) {
+ FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
+ try {
+ producer = initTransactionalProducer(transaction.transactionalId, false);
+ producer.resumeTransaction(transaction.producerId, transaction.epoch);
+ producer.commitTransaction();
+ } catch (InvalidTxnStateException | ProducerFencedException ex) {
+ // That means we have committed this transaction before.
+ LOG.warn(
+ "Encountered error {} while recovering transaction {}. "
+ + "Presumably this transaction has been already committed before",
+ ex,
+ transaction);
+ } finally {
+ if (producer != null) {
+ producer.close(0, TimeUnit.SECONDS);
+ }
+ }
+ }
+ }
+
+ @Override
+ protected void abort(FlinkKafkaProducer.KafkaTransactionState transaction) {
+ if (transaction.isTransactional()) {
+ transaction.producer.abortTransaction();
+ recycleTransactionalProducer(transaction.producer);
+ }
+ }
+
+ @Override
+ protected void recoverAndAbort(FlinkKafkaProducer.KafkaTransactionState transaction) {
+ if (transaction.isTransactional()) {
+ FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
+ try {
+ producer = initTransactionalProducer(transaction.transactionalId, false);
+ producer.initTransactions();
+ } finally {
+ if (producer != null) {
+ producer.close(0, TimeUnit.SECONDS);
+ }
+ }
+ }
+ }
+
+ /**
+ * <b>ATTENTION to subclass implementors:</b> When overriding this method, please always call
+ * {@code super.acknowledgeMessage()} to keep the invariants of the internal bookkeeping of the
+ * producer. If not, be sure to know what you are doing.
+ */
+ protected void acknowledgeMessage() {
+ pendingRecords.decrementAndGet();
+ }
+
+ /**
+ * Flush pending records.
+ *
+ * @param transaction
+ */
+ private void flush(FlinkKafkaProducer.KafkaTransactionState transaction)
+ throws FlinkKafkaException {
+ if (transaction.producer != null) {
+ transaction.producer.flush();
+ }
+ long pendingRecordsCount = pendingRecords.get();
+ if (pendingRecordsCount != 0) {
+ throw new IllegalStateException(
+ "Pending record count must be zero at this point: " + pendingRecordsCount);
+ }
+
+ // if the flushed requests have errors, we should also propagate them and fail the checkpoint
+ checkErroneous();
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ super.snapshotState(context);
+
+ nextTransactionalIdHintState.clear();
+ // To avoid duplication, only the first subtask keeps track of the next transactional id hint.
+ // Otherwise, all of the subtasks would write exactly the same information.
+ if (getRuntimeContext().getIndexOfThisSubtask() == 0
+ && semantic == FlinkKafkaProducer.Semantic.EXACTLY_ONCE) {
+ checkState(
+ nextTransactionalIdHint != null,
+ "nextTransactionalIdHint must be set for EXACTLY_ONCE");
+ long nextFreeTransactionalId = nextTransactionalIdHint.nextFreeTransactionalId;
+
+ // If we scaled up, some (unknown) subtask must have created new transactional ids from
+ // scratch. In that case we adjust nextFreeTransactionalId by the range of transactionalIds
+ // that could be used for this scaling up.
+ if (getRuntimeContext().getNumberOfParallelSubtasks()
+ > nextTransactionalIdHint.lastParallelism) {
+ nextFreeTransactionalId +=
+ getRuntimeContext().getNumberOfParallelSubtasks() * kafkaProducersPoolSize;
+ }
+
+ nextTransactionalIdHintState.add(
+ new FlinkKafkaProducer.NextTransactionalIdHint(
+ getRuntimeContext().getNumberOfParallelSubtasks(),
+ nextFreeTransactionalId));
+ }
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ if (semantic != FlinkKafkaProducer.Semantic.NONE
+ && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {
+ LOG.warn(
+ "Using {} semantic, but checkpointing is not enabled. Switching to {} semantic.",
+ semantic,
+ FlinkKafkaProducer.Semantic.NONE);
+ semantic = FlinkKafkaProducer.Semantic.NONE;
+ }
+
+ nextTransactionalIdHintState =
+ context.getOperatorStateStore()
+ .getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2);
+
+ if (context.getOperatorStateStore()
+ .getRegisteredStateNames()
+ .contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR)) {
+ migrateNextTransactionalIdHindState(context);
+ }
+
+ String taskName = getRuntimeContext().getTaskName();
+ // Kafka transactional IDs are limited in length to be less than the max value of a short,
+ // so we truncate here if necessary to a more reasonable length string.
+ if (taskName.length() > maxTaskNameSize) {
+ taskName = taskName.substring(0, maxTaskNameSize);
+ LOG.warn(
+ "Truncated task name for Kafka TransactionalId from {} to {}.",
+ getRuntimeContext().getTaskName(),
+ taskName);
+ }
+ transactionalIdsGenerator =
+ new TransactionalIdsGenerator(
+ taskName
+ + "-"
+ + ((StreamingRuntimeContext) getRuntimeContext())
+ .getOperatorUniqueID(),
+ getRuntimeContext().getIndexOfThisSubtask(),
+ getRuntimeContext().getNumberOfParallelSubtasks(),
+ kafkaProducersPoolSize,
+ SAFE_SCALE_DOWN_FACTOR);
+
+ if (semantic != FlinkKafkaProducer.Semantic.EXACTLY_ONCE) {
+ nextTransactionalIdHint = null;
+ } else {
+ ArrayList<FlinkKafkaProducer.NextTransactionalIdHint> transactionalIdHints =
+ Lists.newArrayList(nextTransactionalIdHintState.get());
+ if (transactionalIdHints.size() > 1) {
+ throw new IllegalStateException(
+ "There should be at most one next transactional id hint written by the first subtask");
+ } else if (transactionalIdHints.size() == 0) {
+ nextTransactionalIdHint = new FlinkKafkaProducer.NextTransactionalIdHint(0, 0);
+
+ // this means that this is either:
+ // (1) the first execution of this application
+ // (2) previous execution has failed before first checkpoint completed
+ //
+ // in case of (2) we have to abort all previous transactions
+ abortTransactions(transactionalIdsGenerator.generateIdsToAbort());
+ } else {
+ nextTransactionalIdHint = transactionalIdHints.get(0);
+ }
+ }
+
+ super.initializeState(context);
+ }
+
+ @Override
+ protected Optional<FlinkKafkaProducer.KafkaTransactionContext> initializeUserContext() {
+ if (semantic != FlinkKafkaProducer.Semantic.EXACTLY_ONCE) {
+ return Optional.empty();
+ }
+
+ Set<String> transactionalIds = generateNewTransactionalIds();
+ resetAvailableTransactionalIdsPool(transactionalIds);
+ return Optional.of(new FlinkKafkaProducer.KafkaTransactionContext(transactionalIds));
+ }
+
+ private Set<String> generateNewTransactionalIds() {
+ checkState(
+ nextTransactionalIdHint != null,
+ "nextTransactionalIdHint must be present for EXACTLY_ONCE");
+
+ Set<String> transactionalIds =
+ transactionalIdsGenerator.generateIdsToUse(
+ nextTransactionalIdHint.nextFreeTransactionalId);
+ LOG.info("Generated new transactionalIds {}", transactionalIds);
+ return transactionalIds;
+ }
+
+ @Override
+ protected void finishRecoveringContext(
+ Collection<FlinkKafkaProducer.KafkaTransactionState> handledTransactions) {
+ cleanUpUserContext(handledTransactions);
+ resetAvailableTransactionalIdsPool(getUserContext().get().transactionalIds);
+ LOG.info("Recovered transactionalIds {}", getUserContext().get().transactionalIds);
+ }
+
+ protected FlinkKafkaInternalProducer<byte[], byte[]> createProducer() {
+ return new FlinkKafkaInternalProducer<>(this.producerConfig);
+ }
+
+ // ----------------------------------- Utilities --------------------------
+
+ /**
+ * After initialization make sure that all previous transactions from the current user context
+ * have been completed.
+ *
+ * @param handledTransactions transactions which were already committed or aborted and do not
+ * need further handling
+ */
+ private void cleanUpUserContext(
+ Collection<FlinkKafkaProducer.KafkaTransactionState> handledTransactions) {
+ if (!getUserContext().isPresent()) {
+ return;
+ }
+ HashSet<String> abortTransactions = new HashSet<>(getUserContext().get().transactionalIds);
+ handledTransactions.forEach(
+ kafkaTransactionState ->
+ abortTransactions.remove(kafkaTransactionState.transactionalId));
+ abortTransactions(abortTransactions);
+ }
+
+ private void resetAvailableTransactionalIdsPool(Collection<String> transactionalIds) {
+ availableTransactionalIds.clear();
+ availableTransactionalIds.addAll(transactionalIds);
+ }
+
+ private void abortTransactions(final Set<String> transactionalIds) {
+ final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+ transactionalIds
+ .parallelStream()
+ .forEach(
+ transactionalId -> {
+ // The parallelStream executes the consumer in a separate thread pool.
+ // Because the consumer (e.g. Kafka) uses the context classloader to
+ // construct some classes, we should set the correct classloader for it.
+ try (TemporaryClassLoaderContext ignored =
+ TemporaryClassLoaderContext.of(classLoader)) {
+ // don't mess with the original configuration or any other properties of the
+ // original object -> create an internal kafka producer on our own and do not
+ // rely on initTransactionalProducer().
+ final Properties myConfig = new Properties();
+ myConfig.putAll(producerConfig);
+ initTransactionalProducerConfig(myConfig, transactionalId);
+ FlinkKafkaInternalProducer<byte[], byte[]> kafkaProducer = null;
+ try {
+ kafkaProducer = new FlinkKafkaInternalProducer<>(myConfig);
+ // it suffices to call initTransactions - this will abort any
+ // lingering transactions
+ kafkaProducer.initTransactions();
+ } finally {
+ if (kafkaProducer != null) {
+ kafkaProducer.close(Duration.ofSeconds(0));
+ }
+ }
+ }
+ });
+ }
+
+ int getTransactionCoordinatorId() {
+ final FlinkKafkaProducer.KafkaTransactionState currentTransaction = currentTransaction();
+ if (currentTransaction == null || currentTransaction.producer == null) {
+ throw new IllegalArgumentException();
+ }
+ return currentTransaction.producer.getTransactionCoordinatorId();
+ }
+
+ /**
+ * For each checkpoint we create new {@link FlinkKafkaInternalProducer} so that new transactions
+ * will not clash with transactions created during previous checkpoints ({@code
+ * producer.initTransactions()} assures that we obtain new producerId and epoch counters).
+ */
+ private FlinkKafkaInternalProducer<byte[], byte[]> createTransactionalProducer()
+ throws FlinkKafkaException {
+ String transactionalId = availableTransactionalIds.poll();
+ if (transactionalId == null) {
+ throw new FlinkKafkaException(
+ FlinkKafkaErrorCode.PRODUCERS_POOL_EMPTY,
+ "Too many ongoing snapshots. Increase kafka producers pool size or "
+ + "decrease number of concurrent checkpoints.");
+ }
+ FlinkKafkaInternalProducer<byte[], byte[]> producer =
+ initTransactionalProducer(transactionalId, true);
+ producer.initTransactions();
+ return producer;
+ }
+
+ private void recycleTransactionalProducer(FlinkKafkaInternalProducer<byte[], byte[]> producer) {
+ availableTransactionalIds.add(producer.getTransactionalId());
+ producer.flush();
+ producer.close(Duration.ofSeconds(0));
+ }
+
+ private FlinkKafkaInternalProducer<byte[], byte[]> initTransactionalProducer(
+ String transactionalId, boolean registerMetrics) {
+ initTransactionalProducerConfig(producerConfig, transactionalId);
+ return initProducer(registerMetrics);
+ }
+
+ private FlinkKafkaInternalProducer<byte[], byte[]> initNonTransactionalProducer(
+ boolean registerMetrics) {
+ producerConfig.remove(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
+ return initProducer(registerMetrics);
+ }
+
+ private FlinkKafkaInternalProducer<byte[], byte[]> initProducer(boolean registerMetrics) {
+ FlinkKafkaInternalProducer<byte[], byte[]> producer = createProducer();
+
+ LOG.info(
+ "Starting FlinkKafkaInternalProducer ({}/{}) to produce into default topic {}",
+ getRuntimeContext().getIndexOfThisSubtask() + 1,
+ getRuntimeContext().getNumberOfParallelSubtasks(),
+ defaultTopicId);
+
+ // register Kafka producer metrics as gauges in Flink's metric group
+ if (registerMetrics
+ && !Boolean.parseBoolean(
+ producerConfig.getProperty(KEY_DISABLE_METRICS, "false"))) {
+ Map<MetricName, ? extends Metric> metrics = producer.metrics();
+
+ if (metrics == null) {
+ // MapR's Kafka implementation returns null here.
+ LOG.info("Producer implementation does not support metrics");
+ } else {
+ final MetricGroup kafkaMetricGroup =
+ getRuntimeContext().getMetricGroup().addGroup("KafkaProducer");
+ for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
+ String name = entry.getKey().name();
+ Metric metric = entry.getValue();
+
+ KafkaMetricMutableWrapper wrapper = previouslyCreatedMetrics.get(name);
+ if (wrapper != null) {
+ wrapper.setKafkaMetric(metric);
+ } else {
+ // TODO: somehow merge metrics from all active producers?
+ wrapper = new KafkaMetricMutableWrapper(metric);
+ previouslyCreatedMetrics.put(name, wrapper);
+ kafkaMetricGroup.gauge(name, wrapper);
+ }
+ }
+ }
+ }
+ return producer;
+ }
+
+ protected void checkErroneous() throws FlinkKafkaException {
+ Exception e = asyncException;
+ if (e != null) {
+ // prevent double throwing
+ asyncException = null;
+ throw new FlinkKafkaException(
+ FlinkKafkaErrorCode.EXTERNAL_ERROR,
+ "Failed to send data to Kafka: " + e.getMessage(),
+ e);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in)
+ throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+ }
+
+ private void migrateNextTransactionalIdHindState(FunctionInitializationContext context)
+ throws Exception {
+ ListState<NextTransactionalIdHint> oldNextTransactionalIdHintState =
+ context.getOperatorStateStore()
+ .getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
+ nextTransactionalIdHintState =
+ context.getOperatorStateStore()
+ .getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2);
+
+ ArrayList<NextTransactionalIdHint> oldTransactionalIdHints =
+ Lists.newArrayList(oldNextTransactionalIdHintState.get());
+ if (!oldTransactionalIdHints.isEmpty()) {
+ nextTransactionalIdHintState.addAll(oldTransactionalIdHints);
+ // clear old state
+ oldNextTransactionalIdHintState.clear();
+ }
+ }
+
+ /**
+ * Semantics that can be chosen.
+ * <li>{@link #EXACTLY_ONCE}
+ * <li>{@link #AT_LEAST_ONCE}
+ * <li>{@link #NONE}
+ */
+ public enum Semantic {
+
+ /**
+ * Semantic.EXACTLY_ONCE: the Flink producer will write all messages in a Kafka transaction
+ * that will be committed to Kafka on a checkpoint.
+ *
+ * <p>In this mode {@link FlinkKafkaProducer} sets up a pool of {@link
+ * FlinkKafkaInternalProducer}. Between each checkpoint a Kafka transaction is created,
+ * which is committed on {@link FlinkKafkaProducer#notifyCheckpointComplete(long)}. If
+ * checkpoint complete notifications are running late, {@link FlinkKafkaProducer} can run
+ * out of {@link FlinkKafkaInternalProducer}s in the pool. In that case any subsequent
+ * {@link FlinkKafkaProducer#snapshotState(FunctionSnapshotContext)} requests will fail and
+ * {@link FlinkKafkaProducer} will keep using the {@link FlinkKafkaInternalProducer} from
+ * the previous checkpoint. To decrease the chance of failing checkpoints there are four
+ * options (see the configuration sketch after this enum):
+ * <li>decrease number of max concurrent checkpoints
+ * <li>make checkpoints more reliable (so that they complete faster)
+ * <li>increase the delay between checkpoints
+ * <li>increase the size of {@link FlinkKafkaInternalProducer}s pool
+ */
+ EXACTLY_ONCE,
+
+ /**
+ * Semantic.AT_LEAST_ONCE: the Flink producer will wait for all outstanding messages in the
+ * Kafka buffers to be acknowledged by the Kafka producer on a checkpoint.
+ */
+ AT_LEAST_ONCE,
+
+ /**
+ * Semantic.NONE means that nothing will be guaranteed. Messages can be lost and/or
+ * duplicated in case of failure.
+ */
+ NONE
+ }
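+
+ // A minimal sketch (assuming the standard Flink checkpointing API; values are illustrative)
+ // of job-level settings that reduce the risk of exhausting the transactional producer pool
+ // under Semantic.EXACTLY_ONCE:
+ //
+ //   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ //   env.enableCheckpointing(60_000L);                                  // larger checkpoint interval
+ //   env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);          // fewer concurrent checkpoints
+ //   env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000L);  // more delay between checkpoints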
+
+ /**
+ * State for handling transactions.
+ */
+ @VisibleForTesting
+ @Internal
+ public static class KafkaTransactionState {
+
+ @Nullable
+ final String transactionalId;
+ final long producerId;
+ final short epoch;
+ private final transient FlinkKafkaInternalProducer<byte[], byte[]> producer;
+
+ @VisibleForTesting
+ public KafkaTransactionState(
+ String transactionalId, FlinkKafkaInternalProducer<byte[], byte[]> producer) {
+ this(transactionalId, producer.getProducerId(), producer.getEpoch(), producer);
+ }
+
+ @VisibleForTesting
+ public KafkaTransactionState(FlinkKafkaInternalProducer<byte[], byte[]> producer) {
+ this(null, -1, (short) -1, producer);
+ }
+
+ @VisibleForTesting
+ public KafkaTransactionState(
+ @Nullable String transactionalId,
+ long producerId,
+ short epoch,
+ FlinkKafkaInternalProducer<byte[], byte[]> producer) {
+ this.transactionalId = transactionalId;
+ this.producerId = producerId;
+ this.epoch = epoch;
+ this.producer = producer;
+ }
+
+ boolean isTransactional() {
+ return transactionalId != null;
+ }
+
+ public FlinkKafkaInternalProducer<byte[], byte[]> getProducer() {
+ return producer;
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "%s [transactionalId=%s, producerId=%s, epoch=%s]",
+ this.getClass().getSimpleName(), transactionalId, producerId, epoch);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ FlinkKafkaProducer.KafkaTransactionState that =
+ (FlinkKafkaProducer.KafkaTransactionState) o;
+
+ if (producerId != that.producerId) {
+ return false;
+ }
+ if (epoch != that.epoch) {
+ return false;
+ }
+ return transactionalId != null
+ ? transactionalId.equals(that.transactionalId)
+ : that.transactionalId == null;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = transactionalId != null ? transactionalId.hashCode() : 0;
+ result = 31 * result + (int) (producerId ^ (producerId >>> 32));
+ result = 31 * result + (int) epoch;
+ return result;
+ }
+ }
+
+ /**
+ * Context associated with this instance of the {@link FlinkKafkaProducer}. Used for keeping
+ * track of the transactionalIds.
+ */
+ @VisibleForTesting
+ @Internal
+ public static class KafkaTransactionContext {
+
+ final Set<String> transactionalIds;
+
+ @VisibleForTesting
+ public KafkaTransactionContext(Set<String> transactionalIds) {
+ checkNotNull(transactionalIds);
+ this.transactionalIds = transactionalIds;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ FlinkKafkaProducer.KafkaTransactionContext that =
+ (FlinkKafkaProducer.KafkaTransactionContext) o;
+
+ return transactionalIds.equals(that.transactionalIds);
+ }
+
+ @Override
+ public int hashCode() {
+ return transactionalIds.hashCode();
+ }
+ }
+
+ /**
+ * {@link org.apache.flink.api.common.typeutils.TypeSerializer} for {@link
+ * FlinkKafkaProducer.KafkaTransactionState}.
+ */
+ @VisibleForTesting
+ @Internal
+ public static class TransactionStateSerializer
+ extends TypeSerializerSingleton<FlinkKafkaProducer.KafkaTransactionState> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean isImmutableType() {
+ return true;
+ }
+
+ @Override
+ public FlinkKafkaProducer.KafkaTransactionState createInstance() {
+ return null;
+ }
+
+ @Override
+ public FlinkKafkaProducer.KafkaTransactionState copy(
+ FlinkKafkaProducer.KafkaTransactionState from) {
+ return from;
+ }
+
+ @Override
+ public FlinkKafkaProducer.KafkaTransactionState copy(
+ FlinkKafkaProducer.KafkaTransactionState from,
+ FlinkKafkaProducer.KafkaTransactionState reuse) {
+ return from;
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ boolean hasTransactionalId = source.readBoolean();
+ target.writeBoolean(hasTransactionalId);
+ if (hasTransactionalId) {
+ target.writeUTF(source.readUTF());
+ }
+ target.writeLong(source.readLong());
+ target.writeShort(source.readShort());
+ }
+
+ @Override
+ public int getLength() {
+ return -1;
+ }
+
+ @Override
+ public void serialize(
+ FlinkKafkaProducer.KafkaTransactionState record, DataOutputView target)
+ throws IOException {
+ if (record.transactionalId == null) {
+ target.writeBoolean(false);
+ } else {
+ target.writeBoolean(true);
+ target.writeUTF(record.transactionalId);
+ }
+ target.writeLong(record.producerId);
+ target.writeShort(record.epoch);
+ }
+
+ @Override
+ public FlinkKafkaProducer.KafkaTransactionState deserialize(DataInputView source)
+ throws IOException {
+ String transactionalId = null;
+ if (source.readBoolean()) {
+ transactionalId = source.readUTF();
+ }
+ long producerId = source.readLong();
+ short epoch = source.readShort();
+ return new FlinkKafkaProducer.KafkaTransactionState(
+ transactionalId, producerId, epoch, null);
+ }
+
+ @Override
+ public FlinkKafkaProducer.KafkaTransactionState deserialize(
+ FlinkKafkaProducer.KafkaTransactionState reuse, DataInputView source)
+ throws IOException {
+ return deserialize(source);
+ }
+
+ // -----------------------------------------------------------------------------------
+
+ @Override
+ public TypeSerializerSnapshot<FlinkKafkaProducer.KafkaTransactionState>
+ snapshotConfiguration() {
+ return new TransactionStateSerializerSnapshot();
+ }
+
+ /**
+ * Serializer configuration snapshot for compatibility and format evolution.
+ */
+ @SuppressWarnings("WeakerAccess")
+ public static final class TransactionStateSerializerSnapshot
+ extends SimpleTypeSerializerSnapshot<FlinkKafkaProducer.KafkaTransactionState> {
+
+ public TransactionStateSerializerSnapshot() {
+ super(TransactionStateSerializer::new);
+ }
+ }
+ }
+
+ /**
+ * {@link org.apache.flink.api.common.typeutils.TypeSerializer} for {@link
+ * FlinkKafkaProducer.KafkaTransactionContext}.
+ */
+ @VisibleForTesting
+ @Internal
+ public static class ContextStateSerializer
+ extends TypeSerializerSingleton<FlinkKafkaProducer.KafkaTransactionContext> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean isImmutableType() {
+ return true;
+ }
+
+ @Override
+ public FlinkKafkaProducer.KafkaTransactionContext createInstance() {
+ return null;
+ }
+
+ @Override
+ public FlinkKafkaProducer.KafkaTransactionContext copy(
+ FlinkKafkaProducer.KafkaTransactionContext from) {
+ return from;
+ }
+
+ @Override
+ public FlinkKafkaProducer.KafkaTransactionContext copy(
+ FlinkKafkaProducer.KafkaTransactionContext from,
+ FlinkKafkaProducer.KafkaTransactionContext reuse) {
+ return from;
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ int numIds = source.readInt();
+ target.writeInt(numIds);
+ for (int i = 0; i < numIds; i++) {
+ target.writeUTF(source.readUTF());
+ }
+ }
+
+ @Override
+ public int getLength() {
+ return -1;
+ }
+
+ @Override
+ public void serialize(
+ FlinkKafkaProducer.KafkaTransactionContext record, DataOutputView target)
+ throws IOException {
+ int numIds = record.transactionalIds.size();
+ target.writeInt(numIds);
+ for (String id : record.transactionalIds) {
+ target.writeUTF(id);
+ }
+ }
+
+ @Override
+ public FlinkKafkaProducer.KafkaTransactionContext deserialize(DataInputView source)
+ throws IOException {
+ int numIds = source.readInt();
+ Set<String> ids = new HashSet<>(numIds);
+ for (int i = 0; i < numIds; i++) {
+ ids.add(source.readUTF());
+ }
+ return new FlinkKafkaProducer.KafkaTransactionContext(ids);
+ }
+
+ @Override
+ public FlinkKafkaProducer.KafkaTransactionContext deserialize(
+ FlinkKafkaProducer.KafkaTransactionContext reuse, DataInputView source)
+ throws IOException {
+ return deserialize(source);
+ }
+
+ // -----------------------------------------------------------------------------------
+
+ @Override
+ public TypeSerializerSnapshot<KafkaTransactionContext> snapshotConfiguration() {
+ return new ContextStateSerializerSnapshot();
+ }
+
+ /**
+ * Serializer configuration snapshot for compatibility and format evolution.
+ */
+ @SuppressWarnings("WeakerAccess")
+ public static final class ContextStateSerializerSnapshot
+ extends SimpleTypeSerializerSnapshot<KafkaTransactionContext> {
+
+ public ContextStateSerializerSnapshot() {
+ super(ContextStateSerializer::new);
+ }
+ }
+ }
+
+ /**
+ * Keeps the information required to deduce the next safe-to-use transactional id.
+ */
+ public static class NextTransactionalIdHint {
+
+ public int lastParallelism = 0;
+ public long nextFreeTransactionalId = 0;
+
+ public NextTransactionalIdHint() {
+ this(0, 0);
+ }
+
+ public NextTransactionalIdHint(int parallelism, long nextFreeTransactionalId) {
+ this.lastParallelism = parallelism;
+ this.nextFreeTransactionalId = nextFreeTransactionalId;
+ }
+
+ @Override
+ public String toString() {
+ return "NextTransactionalIdHint["
+ + "lastParallelism="
+ + lastParallelism
+ + ", nextFreeTransactionalId="
+ + nextFreeTransactionalId
+ + ']';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ NextTransactionalIdHint that = (NextTransactionalIdHint) o;
+
+ if (lastParallelism != that.lastParallelism) {
+ return false;
+ }
+ return nextFreeTransactionalId == that.nextFreeTransactionalId;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = lastParallelism;
+ result =
+ 31 * result
+ + (int) (nextFreeTransactionalId ^ (nextFreeTransactionalId >>> 32));
+ return result;
+ }
+ }
+
+ /**
+ * {@link org.apache.flink.api.common.typeutils.TypeSerializer} for {@link
+ * FlinkKafkaProducer.NextTransactionalIdHint}.
+ */
+ @VisibleForTesting
+ @Internal
+ public static class NextTransactionalIdHintSerializer
+ extends TypeSerializerSingleton<NextTransactionalIdHint> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean isImmutableType() {
+ return true;
+ }
+
+ @Override
+ public NextTransactionalIdHint createInstance() {
+ return new NextTransactionalIdHint();
+ }
+
+ @Override
+ public NextTransactionalIdHint copy(NextTransactionalIdHint from) {
+ return from;
+ }
+
+ @Override
+ public NextTransactionalIdHint copy(
+ NextTransactionalIdHint from, NextTransactionalIdHint reuse) {
+ return from;
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ target.writeLong(source.readLong());
+ target.writeInt(source.readInt());
+ }
+
+ @Override
+ public int getLength() {
+ return Long.BYTES + Integer.BYTES;
+ }
+
+ @Override
+ public void serialize(NextTransactionalIdHint record, DataOutputView target)
+ throws IOException {
+ target.writeLong(record.nextFreeTransactionalId);
+ target.writeInt(record.lastParallelism);
+ }
+
+ @Override
+ public NextTransactionalIdHint deserialize(DataInputView source) throws IOException {
+ long nextFreeTransactionalId = source.readLong();
+ int lastParallelism = source.readInt();
+ return new NextTransactionalIdHint(lastParallelism, nextFreeTransactionalId);
+ }
+
+ @Override
+ public NextTransactionalIdHint deserialize(
+ NextTransactionalIdHint reuse, DataInputView source) throws IOException {
+ return deserialize(source);
+ }
+
+ @Override
+ public TypeSerializerSnapshot<NextTransactionalIdHint> snapshotConfiguration() {
+ return new NextTransactionalIdHintSerializerSnapshot();
+ }
+
+ /**
+ * Serializer configuration snapshot for compatibility and format evolution.
+ */
+ @SuppressWarnings("WeakerAccess")
+ public static final class NextTransactionalIdHintSerializerSnapshot
+ extends SimpleTypeSerializerSnapshot<NextTransactionalIdHint> {
+
+ public NextTransactionalIdHintSerializerSnapshot() {
+ super(NextTransactionalIdHintSerializer::new);
+ }
+ }
+ }
+}
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java
index f2d42d3a2..387baee79 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java
@@ -20,7 +20,6 @@ package org.apache.inlong.sort.kafka;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.table.BufferedUpsertSinkFunction;
import org.apache.flink.streaming.connectors.kafka.table.KafkaSinkSemantic;
@@ -60,9 +59,12 @@ import static org.apache.inlong.sort.kafka.table.KafkaOptions.KAFKA_IGNORE_ALL_C
/**
* A version-agnostic Kafka {@link DynamicTableSink}.
+ *
+ * Add an option `inlong.metric` to support metrics.
*/
@Internal
public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetadata {
+
private static final Logger LOG = LoggerFactory.getLogger(KafkaDynamicSink.class);
// --------------------------------------------------------------------------------------------
@@ -105,19 +107,14 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
* Properties for the Kafka producer.
*/
protected final Properties properties;
-
/**
- * CatalogTable for KAFKA_IGNORE_ALL_CHANGELOG
+ * Partitioner to select Kafka partition for each item.
*/
- private final CatalogTable catalogTable;
+ protected final @Nullable FlinkKafkaPartitioner<RowData> partitioner;
// --------------------------------------------------------------------------------------------
// Kafka-specific attributes
// --------------------------------------------------------------------------------------------
- /**
- * Partitioner to select Kafka partition for each item.
- */
- protected final @Nullable FlinkKafkaPartitioner<RowData> partitioner;
/**
* Sink commit semantic.
*/
@@ -135,6 +132,18 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
* Parallelism of the physical Kafka producer. *
*/
protected final @Nullable Integer parallelism;
+ /**
+ * CatalogTable for KAFKA_IGNORE_ALL_CHANGELOG
+ */
+ private final CatalogTable catalogTable;
+ /**
+ * InLong metric option value (`inlong.metric`)
+ */
+ private final String inLongMetric;
+ /**
+ * Audit host and ports for the InLong audit SDK
+ */
+ private final String auditHostAndPorts;
/**
* Metadata that is appended at the end of a physical sink row.
*/
@@ -162,7 +171,9 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
KafkaSinkSemantic semantic,
boolean upsertMode,
SinkBufferFlushMode flushMode,
- @Nullable Integer parallelism) {
+ @Nullable Integer parallelism,
+ String inLongMetric,
+ String auditHostAndPorts) {
// Format attributes
this.consumedDataType =
checkNotNull(consumedDataType, "Consumed data type must not be null.");
@@ -189,6 +200,8 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
"Sink buffer flush is only supported in upsert-kafka.");
}
this.parallelism = parallelism;
+ this.inLongMetric = inLongMetric;
+ this.auditHostAndPorts = auditHostAndPorts;
}
@Override
@@ -288,7 +301,9 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
semantic,
upsertMode,
flushMode,
- parallelism);
+ parallelism,
+ inLongMetric,
+ auditHostAndPorts);
copy.metadataKeys = metadataKeys;
return copy;
}
@@ -321,7 +336,9 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
&& Objects.equals(semantic, that.semantic)
&& Objects.equals(upsertMode, that.upsertMode)
&& Objects.equals(flushMode, that.flushMode)
- && Objects.equals(parallelism, that.parallelism);
+ && Objects.equals(parallelism, that.parallelism)
+ && Objects.equals(inLongMetric, that.inLongMetric)
+ && Objects.equals(auditHostAndPorts, that.auditHostAndPorts);
}
@Override
@@ -341,7 +358,9 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
semantic,
upsertMode,
flushMode,
- parallelism);
+ parallelism,
+ inLongMetric,
+ auditHostAndPorts);
}
// --------------------------------------------------------------------------------------------
@@ -400,7 +419,9 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
kafkaSerializer,
properties,
FlinkKafkaProducer.Semantic.valueOf(semantic.toString()),
- FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
+ FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE,
+ inLongMetric,
+ auditHostAndPorts);
}
private @Nullable SerializationSchema<RowData> createSerialization(
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
index c0e41f1b0..8efee1317 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
@@ -307,6 +307,10 @@ public class KafkaDynamicTableFactory
final Integer parallelism = tableOptions.getOptional(SINK_PARALLELISM).orElse(null);
+ final String inLongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null);
+
+ final String auditHostAndPorts = tableOptions.getOptional(INLONG_AUDIT).orElse(null);
+
return createKafkaTableSink(
physicalDataType,
keyEncodingFormat.orElse(null),
@@ -319,7 +323,9 @@ public class KafkaDynamicTableFactory
context.getCatalogTable(),
getFlinkKafkaPartitioner(tableOptions, context.getClassLoader()).orElse(null),
getSinkSemantic(tableOptions),
- parallelism);
+ parallelism,
+ inLongMetric,
+ auditHostAndPorts);
}
// --------------------------------------------------------------------------------------------
@@ -369,7 +375,9 @@ public class KafkaDynamicTableFactory
CatalogTable table,
FlinkKafkaPartitioner<RowData> partitioner,
KafkaSinkSemantic semantic,
- Integer parallelism) {
+ Integer parallelism,
+ String inLongMetric,
+ String auditHostAndPorts) {
return new KafkaDynamicSink(
physicalDataType,
physicalDataType,
@@ -385,6 +393,8 @@ public class KafkaDynamicTableFactory
semantic,
false,
SinkBufferFlushMode.DISABLED,
- parallelism);
+ parallelism,
+ inLongMetric,
+ auditHostAndPorts);
}
}
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
new file mode 100644
index 000000000..6e803cfd9
--- /dev/null
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kafka.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource;
+import org.apache.flink.streaming.connectors.kafka.table.KafkaOptions;
+import org.apache.flink.streaming.connectors.kafka.table.KafkaSinkSemantic;
+import org.apache.flink.streaming.connectors.kafka.table.SinkBufferFlushMode;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.format.Format;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.kafka.KafkaDynamicSink;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.KEY_FIELDS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.KEY_FIELDS_PREFIX;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.KEY_FORMAT;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPS_BOOTSTRAP_SERVERS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SINK_BUFFER_FLUSH_INTERVAL;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.TOPIC;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.VALUE_FIELDS_INCLUDE;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.VALUE_FORMAT;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.autoCompleteSchemaRegistrySubject;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.createKeyFormatProjection;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.createValueFormatProjection;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getKafkaProperties;
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
+import static org.apache.inlong.sort.kafka.table.KafkaOptions.KAFKA_IGNORE_ALL_CHANGELOG;
+
+/**
+ * Copy from org.apache.flink:flink-connector-kafka_2.11:1.13.5
+ *
+ * Upsert-Kafka factory.
+ * Add an option `inlong.metric` to support metrics.
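+ *
+ * <p>A hypothetical DDL sketch (table name, columns, topic and broker address are placeholders,
+ * and the `inlong.metric` value format follows whatever Constants.INLONG_METRIC expects):
+ * <pre>{@code
+ * CREATE TABLE upsert_sink_example (
+ *     id BIGINT,
+ *     name STRING,
+ *     PRIMARY KEY (id) NOT ENFORCED
+ * ) WITH (
+ *     'connector' = 'upsert-kafka-inlong',
+ *     'topic' = 'example_topic',
+ *     'properties.bootstrap.servers' = 'localhost:9092',
+ *     'key.format' = 'json',
+ *     'value.format' = 'json',
+ *     'inlong.metric' = '...'
+ * )
+ * }</pre>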
+ */
+public class UpsertKafkaDynamicTableFactory
+ implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+ public static final String IDENTIFIER = "upsert-kafka-inlong";
+
+ private static void validateSource(
+ ReadableConfig tableOptions, Format keyFormat, Format valueFormat, TableSchema schema) {
+ validateTopic(tableOptions);
+ validateFormat(keyFormat, valueFormat, tableOptions);
+ validatePKConstraints(schema);
+ }
+
+ private static void validateSink(
+ ReadableConfig tableOptions, Format keyFormat, Format valueFormat, TableSchema schema) {
+ validateTopic(tableOptions);
+ validateFormat(keyFormat, valueFormat, tableOptions);
+ validatePKConstraints(schema);
+ validateSinkBufferFlush(tableOptions);
+ }
+
+ private static void validateTopic(ReadableConfig tableOptions) {
+ List<String> topic = tableOptions.get(TOPIC);
+ if (topic.size() > 1) {
+ throw new ValidationException(
+ "The 'upsert-kafka' connector doesn't support topic list now. "
+ + "Please use single topic as the value of the parameter 'topic'.");
+ }
+ }
+
+ private static void validateFormat(
+ Format keyFormat, Format valueFormat, ReadableConfig tableOptions) {
+ if (!keyFormat.getChangelogMode().containsOnly(RowKind.INSERT)) {
+ String identifier = tableOptions.get(KEY_FORMAT);
+ throw new ValidationException(
+ String.format(
+ "'upsert-kafka' connector doesn't support '%s' as key format, "
+ + "because '%s' is not in insert-only mode.",
+ identifier, identifier));
+ }
+ if (!valueFormat.getChangelogMode().containsOnly(RowKind.INSERT)) {
+ String identifier = tableOptions.get(VALUE_FORMAT);
+ throw new ValidationException(
+ String.format(
+ "'upsert-kafka' connector doesn't support '%s' as value format, "
+ + "because '%s' is not in insert-only mode.",
+ identifier, identifier));
+ }
+ }
+
+ private static void validatePKConstraints(TableSchema schema) {
+ if (!schema.getPrimaryKey().isPresent()) {
+ throw new ValidationException(
+ "'upsert-kafka' tables require to define a PRIMARY KEY constraint. "
+ + "The PRIMARY KEY specifies which columns should be read from "
+ + "or write to the Kafka message key. "
+ + "The PRIMARY KEY also defines records in the 'upsert-kafka' table "
+ + "should update or delete on which keys.");
+ }
+ }
+
+ private static void validateSinkBufferFlush(ReadableConfig tableOptions) {
+ int flushMaxRows = tableOptions.get(SINK_BUFFER_FLUSH_MAX_ROWS);
+ long flushIntervalMs = tableOptions.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis();
+ if (flushMaxRows > 0 && flushIntervalMs > 0) {
+ // flush is enabled
+ return;
+ }
+ if (flushMaxRows <= 0 && flushIntervalMs <= 0) {
+ // flush is disabled
+ return;
+ }
+ // one of them is set which is not allowed
+ throw new ValidationException(
+ String.format(
+ "'%s' and '%s' must be set to be greater than zero together to enable sink buffer flushing.",
+ SINK_BUFFER_FLUSH_MAX_ROWS.key(), SINK_BUFFER_FLUSH_INTERVAL.key()));
+ }
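+
+ // A hedged example of enabling buffered flushing: both options below must be set together
+ // in the table options (key names as defined by Flink's upsert-kafka connector options):
+ //   'sink.buffer-flush.max-rows' = '1000',
+ //   'sink.buffer-flush.interval' = '1s'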
+
+ // --------------------------------------------------------------------------------------------
+ // Validation
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(PROPS_BOOTSTRAP_SERVERS);
+ options.add(TOPIC);
+ options.add(KEY_FORMAT);
+ options.add(VALUE_FORMAT);
+ return options;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(KEY_FIELDS_PREFIX);
+ options.add(VALUE_FIELDS_INCLUDE);
+ options.add(FactoryUtil.SINK_PARALLELISM);
+ options.add(SINK_BUFFER_FLUSH_INTERVAL);
+ options.add(SINK_BUFFER_FLUSH_MAX_ROWS);
+ options.add(KAFKA_IGNORE_ALL_CHANGELOG);
+ options.add(INLONG_METRIC);
+ return options;
+ }
+
+ @Override
+ public DynamicTableSource createDynamicTableSource(Context context) {
+ FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+ ReadableConfig tableOptions = helper.getOptions();
+ DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat =
+ helper.discoverDecodingFormat(DeserializationFormatFactory.class, KEY_FORMAT);
+ DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat =
+ helper.discoverDecodingFormat(DeserializationFormatFactory.class, VALUE_FORMAT);
+
+ // Validate the option data type.
+ helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX);
+ TableSchema schema = context.getCatalogTable().getSchema();
+ validateSource(tableOptions, keyDecodingFormat, valueDecodingFormat, schema);
+
+ Tuple2<int[], int[]> keyValueProjections =
+ createKeyValueProjections(context.getCatalogTable());
+ String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
+ Properties properties = getKafkaProperties(context.getCatalogTable().getOptions());
+ // always use earliest to keep data integrity
+ StartupMode earliest = StartupMode.EARLIEST;
+
+ return new KafkaDynamicSource(
+ schema.toPhysicalRowDataType(),
+ keyDecodingFormat,
+ new DecodingFormatWrapper(valueDecodingFormat),
+ keyValueProjections.f0,
+ keyValueProjections.f1,
+ keyPrefix,
+ KafkaOptions.getSourceTopics(tableOptions),
+ KafkaOptions.getSourceTopicPattern(tableOptions),
+ properties,
+ earliest,
+ Collections.emptyMap(),
+ 0,
+ true);
+ }
+
+ @Override
+ public DynamicTableSink createDynamicTableSink(Context context) {
+ FactoryUtil.TableFactoryHelper helper =
+ FactoryUtil.createTableFactoryHelper(
+ this, autoCompleteSchemaRegistrySubject(context));
+
+ final ReadableConfig tableOptions = helper.getOptions();
+
+ EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat =
+ helper.discoverEncodingFormat(SerializationFormatFactory.class, KEY_FORMAT);
+ EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat =
+ helper.discoverEncodingFormat(SerializationFormatFactory.class, VALUE_FORMAT);
+
+ // Validate the option data type.
+ helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX);
+ TableSchema schema = context.getCatalogTable().getSchema();
+ validateSink(tableOptions, keyEncodingFormat, valueEncodingFormat, schema);
+
+ Tuple2<int[], int[]> keyValueProjections =
+ createKeyValueProjections(context.getCatalogTable());
+ final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
+ final Properties properties = getKafkaProperties(context.getCatalogTable().getOptions());
+
+ Integer parallelism = tableOptions.get(FactoryUtil.SINK_PARALLELISM);
+
+ int batchSize = tableOptions.get(SINK_BUFFER_FLUSH_MAX_ROWS);
+ Duration batchInterval = tableOptions.get(SINK_BUFFER_FLUSH_INTERVAL);
+ SinkBufferFlushMode flushMode =
+ new SinkBufferFlushMode(batchSize, batchInterval.toMillis());
+ String inLongMetric = tableOptions.get(INLONG_METRIC);
+ final String auditHostAndPorts = tableOptions.getOptional(INLONG_AUDIT).orElse(null);
+
+ // use {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}:
+ // it partitions by the hash of the key if a key is set, otherwise round-robin.
+ return new KafkaDynamicSink(
+ schema.toPhysicalRowDataType(),
+ schema.toPhysicalRowDataType(),
+ keyEncodingFormat,
+ new EncodingFormatWrapper(valueEncodingFormat),
+ keyValueProjections.f0,
+ keyValueProjections.f1,
+ keyPrefix,
+ tableOptions.get(TOPIC).get(0),
+ properties,
+ context.getCatalogTable(),
+ null,
+ KafkaSinkSemantic.AT_LEAST_ONCE,
+ true,
+ flushMode,
+ parallelism,
+ inLongMetric,
+ auditHostAndPorts);
+ }
+
+ private Tuple2<int[], int[]> createKeyValueProjections(CatalogTable catalogTable) {
+ TableSchema schema = catalogTable.getSchema();
+ // the primary key has already been validated earlier
+ List<String> keyFields = schema.getPrimaryKey().get().getColumns();
+ DataType physicalDataType = schema.toPhysicalRowDataType();
+
+ Configuration tableOptions = Configuration.fromMap(catalogTable.getOptions());
+ // upsert-kafka will set key.fields to primary key fields by default
+ tableOptions.set(KEY_FIELDS, keyFields);
+
+ int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);
+ int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType);
+
+ return Tuple2.of(keyProjection, valueProjection);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Format wrapper
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * It is used to wrap the decoding format and expose the desired changelog mode. It only works
+ * for insert-only formats.
+ */
+ protected static class DecodingFormatWrapper
+ implements DecodingFormat<DeserializationSchema<RowData>> {
+
+ private static final ChangelogMode SOURCE_CHANGELOG_MODE =
+ ChangelogMode.newBuilder()
+ .addContainedKind(RowKind.UPDATE_AFTER)
+ .addContainedKind(RowKind.DELETE)
+ .build();
+ private final DecodingFormat<DeserializationSchema<RowData>> innerDecodingFormat;
+
+ public DecodingFormatWrapper(
+ DecodingFormat<DeserializationSchema<RowData>> innerDecodingFormat) {
+ this.innerDecodingFormat = innerDecodingFormat;
+ }
+
+ @Override
+ public DeserializationSchema<RowData> createRuntimeDecoder(
+ DynamicTableSource.Context context, DataType producedDataType) {
+ return innerDecodingFormat.createRuntimeDecoder(context, producedDataType);
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return SOURCE_CHANGELOG_MODE;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+
+ DecodingFormatWrapper that = (DecodingFormatWrapper) obj;
+ return Objects.equals(innerDecodingFormat, that.innerDecodingFormat);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(innerDecodingFormat);
+ }
+ }
+
+ /**
+ * It is used to wrap the encoding format and expose the desired changelog mode. It only works
+ * for insert-only formats.
+ */
+ protected static class EncodingFormatWrapper
+ implements EncodingFormat<SerializationSchema<RowData>> {
+
+ public static final ChangelogMode SINK_CHANGELOG_MODE =
+ ChangelogMode.newBuilder()
+ .addContainedKind(RowKind.INSERT)
+ .addContainedKind(RowKind.UPDATE_AFTER)
+ .addContainedKind(RowKind.DELETE)
+ .build();
+ private final EncodingFormat<SerializationSchema<RowData>> innerEncodingFormat;
+
+ public EncodingFormatWrapper(
+ EncodingFormat<SerializationSchema<RowData>> innerEncodingFormat) {
+ this.innerEncodingFormat = innerEncodingFormat;
+ }
+
+ @Override
+ public SerializationSchema<RowData> createRuntimeEncoder(
+ DynamicTableSink.Context context, DataType consumedDataType) {
+ return innerEncodingFormat.createRuntimeEncoder(context, consumedDataType);
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return SINK_CHANGELOG_MODE;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+
+ EncodingFormatWrapper that = (EncodingFormatWrapper) obj;
+ return Objects.equals(innerEncodingFormat, that.innerEncodingFormat);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(innerEncodingFormat);
+ }
+ }
+}
diff --git a/inlong-sort/sort-connectors/kafka/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-connectors/kafka/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 543f247b3..7eeb7cd7a 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/inlong-sort/sort-connectors/kafka/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -13,4 +13,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.inlong.sort.kafka.table.KafkaDynamicTableFactory
\ No newline at end of file
+org.apache.inlong.sort.kafka.table.KafkaDynamicTableFactory
+org.apache.inlong.sort.kafka.table.UpsertKafkaDynamicTableFactory
\ No newline at end of file