Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/19 11:30:14 UTC
[inlong] 01/02: [INLONG-5922][Sort] Add metric state for kafka and modify mysql metric init (#5927)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
commit aaf175ba471fdadd09f11f31cb29ac0d69e42de7
Author: Xin Gong <ge...@gmail.com>
AuthorDate: Mon Sep 19 11:37:57 2022 +0800
[INLONG-5922][Sort] Add metric state for kafka and modify mysql metric init (#5927)
---
.../org/apache/inlong/sort/base/Constants.java | 4 +
.../inlong/sort/base/metric/MetricState.java | 8 +
.../inlong/sort/base/metric/SinkMetricData.java | 82 +-
.../inlong/sort/base/util/MetricStateUtils.java | 24 +
inlong-sort/sort-connectors/kafka/pom.xml | 6 +
.../inlong/sort/kafka/FlinkKafkaConsumer.java | 352 +++++
.../inlong/sort/kafka/FlinkKafkaConsumerBase.java | 1350 ++++++++++++++++++++
.../inlong/sort/kafka/FlinkKafkaProducer.java | 69 +-
.../table/DynamicKafkaDeserializationSchema.java | 61 +-
.../sort/kafka/table/KafkaDynamicSource.java | 82 +-
.../sort/cdc/debezium/DebeziumSourceFunction.java | 2 +
licenses/inlong-sort-connectors/LICENSE | 12 +
12 files changed, 1933 insertions(+), 119 deletions(-)
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index b7bf91ef9..18ff408f2 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -37,6 +37,10 @@ public final class Constants {
public static final String NUM_RECORDS_OUT = "numRecordsOut";
+ public static final String NUM_BYTES_OUT_FOR_METER = "numBytesOutForMeter";
+
+ public static final String NUM_RECORDS_OUT_FOR_METER = "numRecordsOutForMeter";
+
public static final String NUM_BYTES_OUT_PER_SECOND = "numBytesOutPerSecond";
public static final String NUM_RECORDS_OUT_PER_SECOND = "numRecordsOutPerSecond";
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java
index 9240c0c8a..604800ccf 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java
@@ -62,4 +62,12 @@ public class MetricState implements Serializable {
}
return 0L;
}
+
+ @Override
+ public String toString() {
+ return "MetricState{"
+ + "subtaskIndex=" + subtaskIndex
+ + ", metrics=" + metrics.toString()
+ + '}';
+ }
}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
index 67b47657e..4073ddd44 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
@@ -34,8 +34,10 @@ import static org.apache.inlong.sort.base.Constants.DELIMITER;
import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES;
import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS;
import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT_FOR_METER;
import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT_PER_SECOND;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_FOR_METER;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_PER_SECOND;
/**
@@ -50,6 +52,8 @@ public class SinkMetricData implements MetricData {
private AuditImp auditImp;
private Counter numRecordsOut;
private Counter numBytesOut;
+ private Counter numRecordsOutForMeter;
+ private Counter numBytesOutForMeter;
private Counter dirtyRecords;
private Counter dirtyBytes;
private Meter numRecordsOutPerSecond;
@@ -76,6 +80,43 @@ public class SinkMetricData implements MetricData {
}
}
+ /**
+ * Default counter is {@link SimpleCounter}.
+ * groupId, streamId and nodeId are label values; users can use them to filter metric data when using a metric
+ * reporter such as Prometheus.
+ */
+ public void registerMetricsForNumRecordsOutForMeter() {
+ registerMetricsForNumRecordsOutForMeter(new SimpleCounter());
+ }
+
+ /**
+ * Users can supply a custom counter that implements {@link Counter}.
+ * groupId, streamId and nodeId are label values; users can use them to filter metric data when using a metric
+ * reporter such as Prometheus.
+ */
+ public void registerMetricsForNumRecordsOutForMeter(Counter counter) {
+ numRecordsOutForMeter = registerCounter(NUM_RECORDS_OUT_FOR_METER, counter);
+ }
+
+ /**
+ * Default counter is {@link SimpleCounter}.
+ * groupId, streamId and nodeId are label values; users can use them to filter metric data when using a metric
+ * reporter such as Prometheus.
+ */
+ public void registerMetricsForNumBytesOutForMeter() {
+ registerMetricsForNumBytesOutForMeter(new SimpleCounter());
+ }
+
+ /**
+ * Users can supply a custom counter that implements {@link Counter}.
+ * groupId, streamId and nodeId are label values; users can use them to filter metric data when using a metric
+ * reporter such as Prometheus.
+ */
+ public void registerMetricsForNumBytesOutForMeter(Counter counter) {
+ numBytesOutForMeter = registerCounter(NUM_BYTES_OUT_FOR_METER, counter);
+ }
+
/**
* Default counter is {@link SimpleCounter}
* groupId and streamId and nodeId are label value, user can use it filter metric data when use metric reporter
@@ -114,11 +155,11 @@ public class SinkMetricData implements MetricData {
}
public void registerMetricsForNumRecordsOutPerSecond() {
- numRecordsOutPerSecond = registerMeter(NUM_RECORDS_OUT_PER_SECOND, this.numRecordsOut);
+ numRecordsOutPerSecond = registerMeter(NUM_RECORDS_OUT_PER_SECOND, this.numRecordsOutForMeter);
}
public void registerMetricsForNumBytesOutPerSecond() {
- numBytesOutPerSecond = registerMeter(NUM_BYTES_OUT_PER_SECOND, this.numBytesOut);
+ numBytesOutPerSecond = registerMeter(NUM_BYTES_OUT_PER_SECOND, this.numBytesOutForMeter);
}
public void registerMetricsForDirtyRecords() {
@@ -191,10 +232,20 @@ public class SinkMetricData implements MetricData {
return nodeId;
}
+ public Counter getNumRecordsOutForMeter() {
+ return numRecordsOutForMeter;
+ }
+
+ public Counter getNumBytesOutForMeter() {
+ return numBytesOutForMeter;
+ }
+
public void invokeWithEstimate(Object o) {
long size = o.toString().getBytes(StandardCharsets.UTF_8).length;
- getNumRecordsOut().inc();
- getNumBytesOut().inc(size);
+ this.numRecordsOut.inc();
+ this.numBytesOut.inc(size);
+ this.numRecordsOutForMeter.inc();
+ this.numBytesOutForMeter.inc(size);
if (auditImp != null) {
auditImp.add(
Constants.AUDIT_SORT_OUTPUT,
@@ -207,8 +258,10 @@ public class SinkMetricData implements MetricData {
}
public void invoke(long rowCount, long rowSize) {
- getNumRecordsOut().inc(rowCount);
- getNumBytesOut().inc(rowSize);
+ this.numRecordsOut.inc(rowCount);
+ this.numBytesOut.inc(rowSize);
+ this.numRecordsOutForMeter.inc(rowCount);
+ this.numBytesOutForMeter.inc(rowSize);
if (auditImp != null) {
auditImp.add(
Constants.AUDIT_SORT_OUTPUT,
@@ -219,4 +272,21 @@ public class SinkMetricData implements MetricData {
rowSize);
}
}
+
+ @Override
+ public String toString() {
+ return "SinkMetricData{"
+ + "groupId='" + groupId + '\''
+ + ", streamId='" + streamId + '\''
+ + ", nodeId='" + nodeId + '\''
+ + ", numRecordsOut=" + numRecordsOut.getCount()
+ + ", numBytesOut=" + numBytesOut.getCount()
+ + ", numRecordsOutForMeter=" + numRecordsOutForMeter.getCount()
+ + ", numBytesOutForMeter=" + numBytesOutForMeter.getCount()
+ + ", dirtyRecords=" + dirtyRecords.getCount()
+ + ", dirtyBytes=" + dirtyBytes.getCount()
+ + ", numRecordsOutPerSecond=" + numRecordsOutPerSecond.getRate()
+ + ", numBytesOutPerSecond=" + numBytesOutPerSecond.getRate()
+ + '}';
+ }
}
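The per-second meters now read the new numRecordsOutForMeter/numBytesOutForMeter counters instead of
numRecordsOut/numBytesOut, which keeps the rate calculation separate from the totals that are snapshotted into
metric state (see the MetricStateUtils change below). A minimal usage sketch, assuming a SinkMetricData instance
has already been created with its groupId/streamId/nodeId labels (its constructor is not shown in this diff):

    // Register the meter-backing counters before the meters that read from them;
    // registration of the plain numRecordsOut/numBytesOut counters is assumed to happen the same way.
    sinkMetricData.registerMetricsForNumRecordsOutForMeter();
    sinkMetricData.registerMetricsForNumBytesOutForMeter();
    sinkMetricData.registerMetricsForNumRecordsOutPerSecond();
    sinkMetricData.registerMetricsForNumBytesOutPerSecond();

    // In the sink's write path, both counter pairs are advanced together:
    sinkMetricData.invoke(rowCount, rowSize);       // known record count and byte size
    sinkMetricData.invokeWithEstimate(rowData);     // or estimate the byte size from toString()
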
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java
index d878381ba..416c8b719 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java
@@ -21,6 +21,7 @@ package org.apache.inlong.sort.base.util;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.state.ListState;
import org.apache.inlong.sort.base.metric.MetricState;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
import org.apache.inlong.sort.base.metric.SourceMetricData;
import java.util.ArrayList;
@@ -29,7 +30,9 @@ import java.util.List;
import java.util.Map;
import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
/**
* metric state for {@link MetricState} supporting snapshot and restore
@@ -125,4 +128,25 @@ public class MetricStateUtils {
metricStateListState.add(metricState);
}
+ /**
+ * Snapshot metric state data for {@link SinkMetricData}.
+ *
+ * @param metricStateListState state data list
+ * @param sinkMetricData {@link SinkMetricData}, a collection class for handling metrics
+ * @param subtaskIndex subtask index
+ * @throws Exception thrown when adding the metric state fails
+ */
+ public static void snapshotMetricStateForSinkMetricData(ListState<MetricState> metricStateListState,
+ SinkMetricData sinkMetricData, Integer subtaskIndex)
+ throws Exception {
+ log.info("snapshotMetricStateForSinkMetricData:{}, sinkMetricData:{}, subtaskIndex:{}",
+ metricStateListState, sinkMetricData, subtaskIndex);
+ metricStateListState.clear();
+ Map<String, Long> metricDataMap = new HashMap<>();
+ metricDataMap.put(NUM_RECORDS_OUT, sinkMetricData.getNumRecordsOut().getCount());
+ metricDataMap.put(NUM_BYTES_OUT, sinkMetricData.getNumBytesOut().getCount());
+ MetricState metricState = new MetricState(subtaskIndex, metricDataMap);
+ metricStateListState.add(metricState);
+ }
+
}
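The new helper clears the list state and stores the current NUM_RECORDS_OUT / NUM_BYTES_OUT totals for the given
subtask. A hedged sketch of how a sink might call it from snapshotState (assuming a RichSinkFunction that also
implements CheckpointedFunction; the field names here are illustrative, not part of this commit):

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        if (sinkMetricData != null && metricStateListState != null) {
            MetricStateUtils.snapshotMetricStateForSinkMetricData(
                    metricStateListState,
                    sinkMetricData,
                    getRuntimeContext().getIndexOfThisSubtask());
        }
    }
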
diff --git a/inlong-sort/sort-connectors/kafka/pom.xml b/inlong-sort/sort-connectors/kafka/pom.xml
index b3a844cd8..401c0dfbb 100644
--- a/inlong-sort/sort-connectors/kafka/pom.xml
+++ b/inlong-sort/sort-connectors/kafka/pom.xml
@@ -92,6 +92,12 @@
</filter>
</filters>
<relocations>
+ <relocation>
+ <pattern>org.apache.inlong.sort.base</pattern>
+ <shadedPattern>
+ org.apache.inlong.sort.kafka.shaded.org.apache.inlong.sort.base
+ </shadedPattern>
+ </relocation>
<relocation>
<pattern>org.apache.kafka</pattern>
<shadedPattern>org.apache.flink.kafka.shaded.org.apache.kafka</shadedPattern>
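The added relocation repackages the bundled sort-base classes under the Kafka connector's own shaded namespace,
presumably so they do not clash with other copies of the same package on the classpath. For illustration only
(the class name below is derived from the relocation rule above, not from this commit):

    // After shading, org.apache.inlong.sort.base.metric.SinkMetricData is loaded as:
    Class<?> shaded = Class.forName(
            "org.apache.inlong.sort.kafka.shaded.org.apache.inlong.sort.base.metric.SinkMetricData");
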
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumer.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumer.java
new file mode 100644
index 000000000..924944188
--- /dev/null
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumer.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kafka;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaPartitionDiscoverer;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
+import org.apache.flink.util.PropertiesUtil;
+import org.apache.flink.util.SerializedValue;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.PropertiesUtil.getBoolean;
+import static org.apache.flink.util.PropertiesUtil.getLong;
+
+/**
+ * Copy from org.apache.flink:flink-connector-kafka_2.11:1.13.5
+ *
+ * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from Apache
+ * Kafka. The consumer can run in multiple parallel instances, each of which will pull data from one
+ * or more Kafka partitions.
+ *
+ * <p>The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost
+ * during a failure, and that the computation processes elements "exactly once". (Note: These
+ * guarantees naturally assume that Kafka itself does not lose any data.)
+ *
+ * <p>Please note that Flink snapshots the offsets internally as part of its distributed
+ * checkpoints. The offsets committed to Kafka are only to bring the outside view of progress in
+ * sync with Flink's view of the progress. That way, monitoring and other jobs can get a view of how
+ * far the Flink Kafka consumer has consumed a topic.
+ *
+ * <p>Please refer to Kafka's documentation for the available configuration properties:
+ * http://kafka.apache.org/documentation.html#newconsumerconfigs
+ */
+@PublicEvolving
+public class FlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Configuration key to change the polling timeout. *
+ */
+ public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";
+
+ /**
+ * From Kafka's Javadoc: The time, in milliseconds, spent waiting in poll if data is not
+ * available. If 0, returns immediately with any records that are available now.
+ */
+ public static final long DEFAULT_POLL_TIMEOUT = 100L;
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * User-supplied properties for Kafka. *
+ */
+ protected final Properties properties;
+
+ /**
+ * From Kafka's Javadoc: The time, in milliseconds, spent waiting in poll if data is not
+ * available. If 0, returns immediately with any records that are available now
+ */
+ protected final long pollTimeout;
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates a new Kafka streaming source consumer.
+ *
+ * @param topic The name of the topic that should be consumed.
+ * @param valueDeserializer The de-/serializer used to convert between Kafka's byte messages and
+ * Flink's objects.
+ * @param props
+ */
+ public FlinkKafkaConsumer(
+ String topic, DeserializationSchema<T> valueDeserializer, Properties props, String inlongMetric,
+ String auditHostAndPorts) {
+ this(Collections.singletonList(topic), valueDeserializer, props, inlongMetric, auditHostAndPorts);
+ }
+
+ /**
+ * Creates a new Kafka streaming source consumer.
+ *
+ * <p>This constructor allows passing a {@see KafkaDeserializationSchema} for reading key/value
+ * pairs, offsets, and topic names from Kafka.
+ *
+ * @param topic The name of the topic that should be consumed.
+ * @param deserializer The keyed de-/serializer used to convert between Kafka's byte messages
+ * and Flink's objects.
+ * @param props
+ */
+ public FlinkKafkaConsumer(
+ String topic, KafkaDeserializationSchema<T> deserializer, Properties props, String inlongMetric,
+ String auditHostAndPorts) {
+ this(Collections.singletonList(topic), deserializer, props, inlongMetric, auditHostAndPorts);
+ }
+
+ /**
+ * Creates a new Kafka streaming source consumer.
+ *
+ * <p>This constructor allows passing multiple topics to the consumer.
+ *
+ * @param topics The Kafka topics to read from.
+ * @param deserializer The de-/serializer used to convert between Kafka's byte messages and
+ * Flink's objects.
+ * @param props
+ */
+ public FlinkKafkaConsumer(
+ List<String> topics, DeserializationSchema<T> deserializer, Properties props, String inlongMetric,
+ String auditHostAndPorts) {
+ this(topics, new KafkaDeserializationSchemaWrapper<>(deserializer), props, inlongMetric, auditHostAndPorts);
+ }
+
+ /**
+ * Creates a new Kafka streaming source consumer.
+ *
+ * <p>This constructor allows passing multiple topics and a key/value deserialization schema.
+ *
+ * @param topics The Kafka topics to read from.
+ * @param deserializer The keyed de-/serializer used to convert between Kafka's byte messages
+ * and Flink's objects.
+ * @param props
+ */
+ public FlinkKafkaConsumer(
+ List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props, String inlongMetric,
+ String auditHostAndPorts) {
+ this(topics, null, deserializer, props, inlongMetric, auditHostAndPorts);
+ }
+
+ /**
+ * Creates a new Kafka streaming source consumer. Use this constructor to subscribe to multiple
+ * topics based on a regular expression pattern.
+ *
+ * <p>If partition discovery is enabled (by setting a non-negative value for {@link
+ * FlinkKafkaConsumer#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties), topics with
+ * names matching the pattern will also be subscribed to as they are created on the fly.
+ *
+ * @param subscriptionPattern The regular expression for a pattern of topic names to subscribe
+ * to.
+ * @param valueDeserializer The de-/serializer used to convert between Kafka's byte messages and
+ * Flink's objects.
+ * @param props
+ */
+ public FlinkKafkaConsumer(
+ Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer,
+ Properties props, String inlongMetric, String auditHostAndPorts) {
+ this(
+ null,
+ subscriptionPattern,
+ new KafkaDeserializationSchemaWrapper<>(valueDeserializer),
+ props, inlongMetric, auditHostAndPorts);
+ }
+
+ /**
+ * Creates a new Kafka streaming source consumer. Use this constructor to subscribe to multiple
+ * topics based on a regular expression pattern.
+ *
+ * <p>If partition discovery is enabled (by setting a non-negative value for {@link
+ * FlinkKafkaConsumer#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties), topics with
+ * names matching the pattern will also be subscribed to as they are created on the fly.
+ *
+ * <p>This constructor allows passing a {@see KafkaDeserializationSchema} for reading key/value
+ * pairs, offsets, and topic names from Kafka.
+ *
+ * @param subscriptionPattern The regular expression for a pattern of topic names to subscribe
+ * to.
+ * @param deserializer The keyed de-/serializer used to convert between Kafka's byte messages
+ * and Flink's objects.
+ * @param props
+ */
+ public FlinkKafkaConsumer(
+ Pattern subscriptionPattern,
+ KafkaDeserializationSchema<T> deserializer,
+ Properties props, String inlongMetric, String auditHostAndPorts) {
+ this(null, subscriptionPattern, deserializer, props, inlongMetric, auditHostAndPorts);
+ }
+
+ private FlinkKafkaConsumer(
+ List<String> topics,
+ Pattern subscriptionPattern,
+ KafkaDeserializationSchema<T> deserializer,
+ Properties props, String inlongMetric,
+ String auditHostAndPorts) {
+
+ super(
+ topics,
+ subscriptionPattern,
+ deserializer,
+ getLong(
+ checkNotNull(props, "props"),
+ KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
+ PARTITION_DISCOVERY_DISABLED),
+ !getBoolean(props, KEY_DISABLE_METRICS, false), inlongMetric, auditHostAndPorts);
+
+ this.properties = props;
+ setDeserializer(this.properties);
+
+ // configure the polling timeout
+ try {
+ if (properties.containsKey(KEY_POLL_TIMEOUT)) {
+ this.pollTimeout = Long.parseLong(properties.getProperty(KEY_POLL_TIMEOUT));
+ } else {
+ this.pollTimeout = DEFAULT_POLL_TIMEOUT;
+ }
+ } catch (Exception e) {
+ throw new IllegalArgumentException(
+ "Cannot parse poll timeout for '" + KEY_POLL_TIMEOUT + '\'', e);
+ }
+ }
+
+ @Override
+ protected AbstractFetcher<T, ?> createFetcher(
+ SourceContext<T> sourceContext,
+ Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
+ SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
+ StreamingRuntimeContext runtimeContext,
+ OffsetCommitMode offsetCommitMode,
+ MetricGroup consumerMetricGroup,
+ boolean useMetrics)
+ throws Exception {
+
+ // make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS;
+ // this overwrites whatever setting the user configured in the properties
+ adjustAutoCommitConfig(properties, offsetCommitMode);
+
+ return new KafkaFetcher<>(
+ sourceContext,
+ assignedPartitionsWithInitialOffsets,
+ watermarkStrategy,
+ runtimeContext.getProcessingTimeService(),
+ runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
+ runtimeContext.getUserCodeClassLoader(),
+ runtimeContext.getTaskNameWithSubtasks(),
+ deserializer,
+ properties,
+ pollTimeout,
+ runtimeContext.getMetricGroup(),
+ consumerMetricGroup,
+ useMetrics);
+ }
+
+ @Override
+ protected AbstractPartitionDiscoverer createPartitionDiscoverer(
+ KafkaTopicsDescriptor topicsDescriptor,
+ int indexOfThisSubtask,
+ int numParallelSubtasks) {
+
+ return new KafkaPartitionDiscoverer(
+ topicsDescriptor, indexOfThisSubtask, numParallelSubtasks, properties);
+ }
+
+ @Override
+ protected Map<KafkaTopicPartition, Long> fetchOffsetsWithTimestamp(
+ Collection<KafkaTopicPartition> partitions, long timestamp) {
+
+ Map<TopicPartition, Long> partitionOffsetsRequest = new HashMap<>(partitions.size());
+ for (KafkaTopicPartition partition : partitions) {
+ partitionOffsetsRequest.put(
+ new TopicPartition(partition.getTopic(), partition.getPartition()), timestamp);
+ }
+
+ final Map<KafkaTopicPartition, Long> result = new HashMap<>(partitions.size());
+ // use a short-lived consumer to fetch the offsets;
+ // this is ok because this is a one-time operation that happens only on startup
+ try (KafkaConsumer<?, ?> consumer = new KafkaConsumer(properties)) {
+ for (Map.Entry<TopicPartition, OffsetAndTimestamp> partitionToOffset :
+ consumer.offsetsForTimes(partitionOffsetsRequest).entrySet()) {
+
+ result.put(
+ new KafkaTopicPartition(
+ partitionToOffset.getKey().topic(),
+ partitionToOffset.getKey().partition()),
+ (partitionToOffset.getValue() == null)
+ ? null
+ : partitionToOffset.getValue().offset());
+ }
+ }
+ return result;
+ }
+
+ @Override
+ protected boolean getIsAutoCommitEnabled() {
+ return getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
+ && PropertiesUtil.getLong(
+ properties, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000)
+ > 0;
+ }
+
+ /**
+ * Makes sure that the ByteArrayDeserializer is registered in the Kafka properties.
+ *
+ * @param props The Kafka properties to register the serializer in.
+ */
+ private static void setDeserializer(Properties props) {
+ final String deSerName = ByteArrayDeserializer.class.getName();
+
+ Object keyDeSer = props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
+ Object valDeSer = props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+
+ if (keyDeSer != null && !keyDeSer.equals(deSerName)) {
+ LOG.warn(
+ "Ignoring configured key DeSerializer ({})",
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
+ }
+ if (valDeSer != null && !valDeSer.equals(deSerName)) {
+ LOG.warn(
+ "Ignoring configured value DeSerializer ({})",
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+ }
+
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deSerName);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deSerName);
+ }
+}
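Compared with the upstream Flink class it is copied from, every constructor takes two extra arguments,
inlongMetric and auditHostAndPorts, which are forwarded to FlinkKafkaConsumerBase. A hedged construction sketch
(the topic, group id and helper method are illustrative, not part of this commit):

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import java.util.Properties;

    public static FlinkKafkaConsumer<String> buildConsumer(String inlongMetric, String auditHostAndPorts) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "inlong-sort-demo");
        return new FlinkKafkaConsumer<>(
                "demo_topic",
                new SimpleStringSchema(),
                props,
                inlongMetric,         // InLong metric labels, forwarded to FlinkKafkaConsumerBase
                auditHostAndPorts);   // InLong audit service address(es)
    }
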
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java
new file mode 100644
index 000000000..0d0ab4544
--- /dev/null
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java
@@ -0,0 +1,1350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kafka;
+
+import org.apache.commons.collections.map.LinkedMap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
+import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitModes;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionAssigner;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
+import org.apache.flink.streaming.runtime.operators.util.AssignerWithPeriodicWatermarksAdapter;
+import org.apache.flink.streaming.runtime.operators.util.AssignerWithPunctuatedWatermarksAdapter;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.SerializedValue;
+import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.sort.base.metric.MetricState;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.base.metric.ThreadSafeCounter;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
+import org.apache.inlong.sort.kafka.table.DynamicKafkaDeserializationSchema;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaConsumerMetricConstants.COMMITS_FAILED_METRICS_COUNTER;
+import static org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaConsumerMetricConstants.COMMITS_SUCCEEDED_METRICS_COUNTER;
+import static org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaConsumerMetricConstants.KAFKA_CONSUMER_METRICS_GROUP;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.inlong.sort.base.Constants.DELIMITER;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
+
+/**
+ * Copy from org.apache.flink:flink-connector-kafka_2.11:1.13.5
+ *
+ * Base class of all Flink Kafka Consumer data sources. This implements the common behavior across
+ * all Kafka versions.
+ *
+ * <p>The Kafka version specific behavior is defined mainly in the specific subclasses of the {@link
+ * AbstractFetcher}.
+ *
+ * @param <T> The type of records produced by this data source
+ */
+@Internal
+public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T>
+ implements CheckpointListener, ResultTypeQueryable<T>, CheckpointedFunction {
+
+ private static final long serialVersionUID = -6272159445203409112L;
+
+ protected static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumerBase.class);
+
+ /**
+ * The maximum number of pending non-committed checkpoints to track, to avoid memory leaks.
+ */
+ public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
+
+ /**
+ * The default interval to execute partition discovery, in milliseconds ({@code Long.MIN_VALUE},
+ * i.e. disabled by default).
+ */
+ public static final long PARTITION_DISCOVERY_DISABLED = Long.MIN_VALUE;
+
+ /**
+ * Boolean configuration key to disable metrics tracking. *
+ */
+ public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
+
+ /**
+ * Configuration key to define the consumer's partition discovery interval, in milliseconds.
+ */
+ public static final String KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS =
+ "flink.partition-discovery.interval-millis";
+
+ /**
+ * State name of the consumer's partition offset states.
+ */
+ private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";
+
+ // ------------------------------------------------------------------------
+ // configuration state, set on the client relevant for all subtasks
+ // ------------------------------------------------------------------------
+
+ /**
+ * Describes whether we are discovering partitions for fixed topics or a topic pattern.
+ */
+ private final KafkaTopicsDescriptor topicsDescriptor;
+
+ /**
+ * The schema to convert between Kafka's byte messages, and Flink's objects.
+ */
+ protected final KafkaDeserializationSchema<T> deserializer;
+
+ /**
+ * The set of topic partitions that the source will read, with their initial offsets to start
+ * reading from.
+ */
+ private Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets;
+
+ /**
+ * Optional watermark strategy that will be run per Kafka partition, to exploit per-partition
+ * timestamp characteristics. The watermark strategy is kept in serialized form, to deserialize
+ * it into multiple copies.
+ */
+ private SerializedValue<WatermarkStrategy<T>> watermarkStrategy;
+
+ /**
+ * User-set flag determining whether or not to commit on checkpoints. Note: this flag does not
+ * represent the final offset commit mode.
+ */
+ private boolean enableCommitOnCheckpoints = true;
+
+ /**
+ * User-set flag to disable filtering restored partitions with current topics descriptor.
+ */
+ private boolean filterRestoredPartitionsWithCurrentTopicsDescriptor = true;
+
+ /**
+ * The offset commit mode for the consumer. The value of this can only be determined in {@link
+ * FlinkKafkaConsumerBase#open(Configuration)} since it depends on whether or not checkpointing
+ * is enabled for the job.
+ */
+ private OffsetCommitMode offsetCommitMode;
+
+ /**
+ * User configured value for discovery interval, in milliseconds.
+ */
+ private final long discoveryIntervalMillis;
+
+ /**
+ * The startup mode for the consumer (default is {@link StartupMode#GROUP_OFFSETS}).
+ */
+ private StartupMode startupMode = StartupMode.GROUP_OFFSETS;
+
+ /**
+ * Specific startup offsets; only relevant when startup mode is {@link
+ * StartupMode#SPECIFIC_OFFSETS}.
+ */
+ private Map<KafkaTopicPartition, Long> specificStartupOffsets;
+
+ /**
+ * Timestamp to determine startup offsets; only relevant when startup mode is {@link
+ * StartupMode#TIMESTAMP}.
+ */
+ private Long startupOffsetsTimestamp;
+
+ // ------------------------------------------------------------------------
+ // runtime state (used individually by each parallel subtask)
+ // ------------------------------------------------------------------------
+
+ /**
+ * Data for pending but uncommitted offsets.
+ */
+ private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
+
+ /**
+ * The fetcher implements the connections to the Kafka brokers.
+ */
+ private transient volatile AbstractFetcher<T, ?> kafkaFetcher;
+
+ /**
+ * The partition discoverer, used to find new partitions.
+ */
+ private transient volatile AbstractPartitionDiscoverer partitionDiscoverer;
+
+ /**
+ * The offsets to restore to, if the consumer restores state from a checkpoint.
+ *
+ * <p>This map will be populated by the {@link #initializeState(FunctionInitializationContext)}
+ * method.
+ *
+ * <p>Using a sorted map as the ordering is important when using restored state to seed the
+ * partition discoverer.
+ */
+ private transient volatile TreeMap<KafkaTopicPartition, Long> restoredState;
+
+ /**
+ * Accessor for state in the operator state backend.
+ */
+ private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;
+
+ /**
+ * Discovery loop, executed in a separate thread.
+ */
+ private transient volatile Thread discoveryLoopThread;
+
+ /**
+ * Flag indicating whether the consumer is still running.
+ */
+ private volatile boolean running = true;
+
+ // ------------------------------------------------------------------------
+ // internal metrics
+ // ------------------------------------------------------------------------
+
+ /**
+ * Flag indicating whether or not metrics should be exposed. If {@code true}, offset metrics
+ * (e.g. current offset, committed offset) and Kafka-shipped metrics will be registered.
+ */
+ private final boolean useMetrics;
+
+ /**
+ * Counter for successful Kafka offset commits.
+ */
+ private transient Counter successfulCommits;
+
+ /**
+ * Counter for failed Kafka offset commits.
+ */
+ private transient Counter failedCommits;
+
+ /**
+ * Callback interface that will be invoked upon async Kafka commit completion. Please be aware
+ * that default callback implementation in base class does not provide any guarantees on
+ * thread-safety. This is sufficient for now because current supported Kafka connectors
+ * guarantee no more than 1 concurrent async pending offset commit.
+ */
+ private transient KafkaCommitCallback offsetCommitCallback;
+
+ private transient ListState<MetricState> metricStateListState;
+
+ private MetricState metricState;
+
+ /**
+ * Metric for InLong
+ */
+ private String inlongMetric;
+ /**
+ * Audit host and ports
+ */
+ private String inlongAudit;
+
+ private SourceMetricData sourceMetricData;
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Base constructor.
+ *
+ * @param topics fixed list of topics to subscribe to (null, if using topic pattern)
+ * @param topicPattern the topic pattern to subscribe to (null, if using fixed topics)
+ * @param deserializer The deserializer to turn raw byte messages into Java/Scala objects.
+ * @param discoveryIntervalMillis the topic / partition discovery interval, in milliseconds
+ * ({@link #PARTITION_DISCOVERY_DISABLED} if discovery is disabled).
+ */
+ public FlinkKafkaConsumerBase(
+ List<String> topics,
+ Pattern topicPattern,
+ KafkaDeserializationSchema<T> deserializer,
+ long discoveryIntervalMillis,
+ boolean useMetrics, String inlongMetric, String auditHostAndPorts) {
+ this.topicsDescriptor = new KafkaTopicsDescriptor(topics, topicPattern);
+ this.deserializer = checkNotNull(deserializer, "valueDeserializer");
+
+ checkArgument(
+ discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED
+ || discoveryIntervalMillis >= 0,
+ "Cannot define a negative value for the topic / partition discovery interval.");
+ this.discoveryIntervalMillis = discoveryIntervalMillis;
+
+ this.useMetrics = useMetrics;
+ this.inlongMetric = inlongMetric;
+ this.inlongAudit = auditHostAndPorts;
+ }
+
+ /**
+ * Make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS. This
+ * overwrites whatever setting the user configured in the properties.
+ *
+ * @param properties - Kafka configuration properties to be adjusted
+ * @param offsetCommitMode offset commit mode
+ */
+ protected static void adjustAutoCommitConfig(
+ Properties properties, OffsetCommitMode offsetCommitMode) {
+ if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS
+ || offsetCommitMode == OffsetCommitMode.DISABLED) {
+ properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+ }
+ }
+ // ------------------------------------------------------------------------
+ // Configuration
+ // ------------------------------------------------------------------------
+
+ /**
+ * Sets the given {@link WatermarkStrategy} on this consumer. These will be used to assign
+ * timestamps to records and generate watermarks to signal event time progress.
+ *
+ * <p>Running timestamp extractors / watermark generators directly inside the Kafka source
+ * (which you can do by using this method), per Kafka partition, allows users to let them
+ * exploit the per-partition characteristics.
+ *
+ * <p>When a subtask of a FlinkKafkaConsumer source reads multiple Kafka partitions, the streams
+ * from the partitions are unioned in a "first come first serve" fashion. Per-partition
+ * characteristics are usually lost that way. For example, if the timestamps are strictly
+ * ascending per Kafka partition, they will not be strictly ascending in the resulting Flink
+ * DataStream, if the parallel source subtask reads more than one partition.
+ *
+ * <p>Common watermark generation patterns can be found as static methods in the {@link
+ * WatermarkStrategy} class.
+ *
+ * @return The consumer object, to allow function chaining.
+ */
+ public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(
+ WatermarkStrategy<T> watermarkStrategy) {
+ checkNotNull(watermarkStrategy);
+
+ try {
+ ClosureCleaner.clean(
+ watermarkStrategy, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+ this.watermarkStrategy = new SerializedValue<>(watermarkStrategy);
+ } catch (Exception e) {
+ throw new IllegalArgumentException(
+ "The given WatermarkStrategy is not serializable", e);
+ }
+
+ return this;
+ }
+
+ /**
+ * Specifies an {@link AssignerWithPunctuatedWatermarks} to emit watermarks in a punctuated
+ * manner. The watermark extractor will run per Kafka partition, watermarks will be merged
+ * across partitions in the same way as in the Flink runtime, when streams are merged.
+ *
+ * <p>When a subtask of a FlinkKafkaConsumer source reads multiple Kafka partitions, the streams
+ * from the partitions are unioned in a "first come first serve" fashion. Per-partition
+ * characteristics are usually lost that way. For example, if the timestamps are strictly
+ * ascending per Kafka partition, they will not be strictly ascending in the resulting Flink
+ * DataStream, if the parallel source subtask reads more than one partition.
+ *
+ * <p>Running timestamp extractors / watermark generators directly inside the Kafka source, per
+ * Kafka partition, allows users to let them exploit the per-partition characteristics.
+ *
+ * <p>Note: One can use either an {@link AssignerWithPunctuatedWatermarks} or an {@link
+ * AssignerWithPeriodicWatermarks}, not both at the same time.
+ *
+ * <p>This method uses the deprecated watermark generator interfaces. Please switch to {@link
+ * #assignTimestampsAndWatermarks(WatermarkStrategy)} to use the new interfaces instead. The new
+ * interfaces support watermark idleness and no longer need to differentiate between "periodic"
+ * and "punctuated" watermarks.
+ *
+ * @param assigner The timestamp assigner / watermark generator to use.
+ * @return The consumer object, to allow function chaining.
+ * @deprecated Please use {@link #assignTimestampsAndWatermarks(WatermarkStrategy)} instead.
+ */
+ @Deprecated
+ public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(
+ AssignerWithPunctuatedWatermarks<T> assigner) {
+ checkNotNull(assigner);
+
+ if (this.watermarkStrategy != null) {
+ throw new IllegalStateException("Some watermark strategy has already been set.");
+ }
+
+ try {
+ ClosureCleaner.clean(assigner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+ final WatermarkStrategy<T> wms =
+ new AssignerWithPunctuatedWatermarksAdapter.Strategy<>(assigner);
+
+ return assignTimestampsAndWatermarks(wms);
+ } catch (Exception e) {
+ throw new IllegalArgumentException("The given assigner is not serializable", e);
+ }
+ }
+
+ /**
+ * Specifies an {@link AssignerWithPeriodicWatermarks} to emit watermarks in a periodic
+ * manner. The watermark extractor will run per Kafka partition, watermarks will be merged
+ * across partitions in the same way as in the Flink runtime, when streams are merged.
+ *
+ * <p>When a subtask of a FlinkKafkaConsumer source reads multiple Kafka partitions, the streams
+ * from the partitions are unioned in a "first come first serve" fashion. Per-partition
+ * characteristics are usually lost that way. For example, if the timestamps are strictly
+ * ascending per Kafka partition, they will not be strictly ascending in the resulting Flink
+ * DataStream, if the parallel source subtask reads more than one partition.
+ *
+ * <p>Running timestamp extractors / watermark generators directly inside the Kafka source, per
+ * Kafka partition, allows users to let them exploit the per-partition characteristics.
+ *
+ * <p>Note: One can use either an {@link AssignerWithPunctuatedWatermarks} or an {@link
+ * AssignerWithPeriodicWatermarks}, not both at the same time.
+ *
+ * <p>This method uses the deprecated watermark generator interfaces. Please switch to {@link
+ * #assignTimestampsAndWatermarks(WatermarkStrategy)} to use the new interfaces instead. The new
+ * interfaces support watermark idleness and no longer need to differentiate between "periodic"
+ * and "punctuated" watermarks.
+ *
+ * @param assigner The timestamp assigner / watermark generator to use.
+ * @return The consumer object, to allow function chaining.
+ * @deprecated Please use {@link #assignTimestampsAndWatermarks(WatermarkStrategy)} instead.
+ */
+ @Deprecated
+ public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(
+ AssignerWithPeriodicWatermarks<T> assigner) {
+ checkNotNull(assigner);
+
+ if (this.watermarkStrategy != null) {
+ throw new IllegalStateException("Some watermark strategy has already been set.");
+ }
+
+ try {
+ ClosureCleaner.clean(assigner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+ final WatermarkStrategy<T> wms =
+ new AssignerWithPeriodicWatermarksAdapter.Strategy<>(assigner);
+
+ return assignTimestampsAndWatermarks(wms);
+ } catch (Exception e) {
+ throw new IllegalArgumentException("The given assigner is not serializable", e);
+ }
+ }
+
+ /**
+ * Specifies whether or not the consumer should commit offsets back to Kafka on checkpoints.
+ *
+ * <p>This setting will only have effect if checkpointing is enabled for the job. If
+ * checkpointing isn't enabled, only the "auto.commit.enable" (for 0.8) / "enable.auto.commit"
+ * (for 0.9+) property settings will be used.
+ *
+ * @return The consumer object, to allow function chaining.
+ */
+ public FlinkKafkaConsumerBase<T> setCommitOffsetsOnCheckpoints(boolean commitOnCheckpoints) {
+ this.enableCommitOnCheckpoints = commitOnCheckpoints;
+ return this;
+ }
+
+ /**
+ * Specifies the consumer to start reading from the earliest offset for all partitions. This
+ * lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
+ *
+ * <p>This method does not affect where partitions are read from when the consumer is restored
+ * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or savepoint,
+ * only the offsets in the restored state will be used.
+ *
+ * @return The consumer object, to allow function chaining.
+ */
+ public FlinkKafkaConsumerBase<T> setStartFromEarliest() {
+ this.startupMode = StartupMode.EARLIEST;
+ this.startupOffsetsTimestamp = null;
+ this.specificStartupOffsets = null;
+ return this;
+ }
+
+ /**
+ * Specifies the consumer to start reading from the latest offset for all partitions. This lets
+ * the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
+ *
+ * <p>This method does not affect where partitions are read from when the consumer is restored
+ * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or savepoint,
+ * only the offsets in the restored state will be used.
+ *
+ * @return The consumer object, to allow function chaining.
+ */
+ public FlinkKafkaConsumerBase<T> setStartFromLatest() {
+ this.startupMode = StartupMode.LATEST;
+ this.startupOffsetsTimestamp = null;
+ this.specificStartupOffsets = null;
+ return this;
+ }
+
+ /**
+ * Specifies the consumer to start reading partitions from a specified timestamp. The specified
+ * timestamp must be before the current timestamp. This lets the consumer ignore any committed
+ * group offsets in Zookeeper / Kafka brokers.
+ *
+ * <p>The consumer will look up the earliest offset whose timestamp is greater than or equal to
+ * the specific timestamp from Kafka. If there's no such offset, the consumer will use the
+ * latest offset to read data from kafka.
+ *
+ * <p>This method does not affect where partitions are read from when the consumer is restored
+ * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or savepoint,
+ * only the offsets in the restored state will be used.
+ *
+ * @param startupOffsetsTimestamp timestamp for the startup offsets, as milliseconds from epoch.
+ * @return The consumer object, to allow function chaining.
+ */
+ public FlinkKafkaConsumerBase<T> setStartFromTimestamp(long startupOffsetsTimestamp) {
+ checkArgument(
+ startupOffsetsTimestamp >= 0,
+ "The provided value for the startup offsets timestamp is invalid.");
+
+ long currentTimestamp = System.currentTimeMillis();
+ checkArgument(
+ startupOffsetsTimestamp <= currentTimestamp,
+ "Startup time[%s] must be before current time[%s].",
+ startupOffsetsTimestamp,
+ currentTimestamp);
+
+ this.startupMode = StartupMode.TIMESTAMP;
+ this.startupOffsetsTimestamp = startupOffsetsTimestamp;
+ this.specificStartupOffsets = null;
+ return this;
+ }
+
+ /**
+ * Specifies the consumer to start reading from any committed group offsets found in Zookeeper /
+ * Kafka brokers. The "group.id" property must be set in the configuration properties. If no
+ * offset can be found for a partition, the behaviour in "auto.offset.reset" set in the
+ * configuration properties will be used for the partition.
+ *
+ * <p>This method does not affect where partitions are read from when the consumer is restored
+ * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or savepoint,
+ * only the offsets in the restored state will be used.
+ *
+ * @return The consumer object, to allow function chaining.
+ */
+ public FlinkKafkaConsumerBase<T> setStartFromGroupOffsets() {
+ this.startupMode = StartupMode.GROUP_OFFSETS;
+ this.startupOffsetsTimestamp = null;
+ this.specificStartupOffsets = null;
+ return this;
+ }
+
+ /**
+ * Specifies the consumer to start reading partitions from specific offsets, set independently
+ * for each partition. The specified offset should be the offset of the next record that will be
+ * read from partitions. This lets the consumer ignore any committed group offsets in Zookeeper
+ * / Kafka brokers.
+ *
+ * <p>If the provided map of offsets contains entries whose {@link KafkaTopicPartition} is not
+ * subscribed by the consumer, the entry will be ignored. If the consumer subscribes to a
+ * partition that does not exist in the provided map of offsets, the consumer will fallback to
+ * the default group offset behaviour (see {@link
+ * FlinkKafkaConsumerBase#setStartFromGroupOffsets()}) for that particular partition.
+ *
+ * <p>If the specified offset for a partition is invalid, or the behaviour for that partition is
+ * defaulted to group offsets but still no group offset could be found for it, then the
+ * "auto.offset.reset" behaviour set in the configuration properties will be used for the
+ * partition
+ *
+ * <p>This method does not affect where partitions are read from when the consumer is restored
+ * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or savepoint,
+ * only the offsets in the restored state will be used.
+ *
+ * @return The consumer object, to allow function chaining.
+ */
+ public FlinkKafkaConsumerBase<T> setStartFromSpecificOffsets(
+ Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+ this.startupMode = StartupMode.SPECIFIC_OFFSETS;
+ this.startupOffsetsTimestamp = null;
+ this.specificStartupOffsets = checkNotNull(specificStartupOffsets);
+ return this;
+ }
+
+ /**
+ * By default, when restoring from a checkpoint / savepoint, the consumer always ignores
+ * restored partitions that are no longer associated with the current specified topics or topic
+ * pattern to subscribe to.
+ *
+ * <p>This method configures the consumer to not filter the restored partitions, therefore
+ * always attempting to consume whatever partition was present in the previous execution
+ * regardless of the specified topics to subscribe to in the current execution.
+ *
+ * @return The consumer object, to allow function chaining.
+ */
+ public FlinkKafkaConsumerBase<T> disableFilterRestoredPartitionsWithSubscribedTopics() {
+ this.filterRestoredPartitionsWithCurrentTopicsDescriptor = false;
+ return this;
+ }
+
+ // ------------------------------------------------------------------------
+ // Work methods
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void open(Configuration configuration) throws Exception {
+ // determine the offset commit mode
+ this.offsetCommitMode =
+ OffsetCommitModes.fromConfiguration(
+ getIsAutoCommitEnabled(),
+ enableCommitOnCheckpoints,
+ ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());
+
+ // create the partition discoverer
+ this.partitionDiscoverer =
+ createPartitionDiscoverer(
+ topicsDescriptor,
+ getRuntimeContext().getIndexOfThisSubtask(),
+ getRuntimeContext().getNumberOfParallelSubtasks());
+ this.partitionDiscoverer.open();
+
+ subscribedPartitionsToStartOffsets = new HashMap<>();
+ final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();
+ if (restoredState != null) {
+ for (KafkaTopicPartition partition : allPartitions) {
+ if (!restoredState.containsKey(partition)) {
+ restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
+ }
+ }
+
+ for (Map.Entry<KafkaTopicPartition, Long> restoredStateEntry :
+ restoredState.entrySet()) {
+ // seed the partition discoverer with the union state while filtering out
+ // restored partitions that should not be subscribed by this subtask
+ if (KafkaTopicPartitionAssigner.assign(
+ restoredStateEntry.getKey(),
+ getRuntimeContext().getNumberOfParallelSubtasks())
+ == getRuntimeContext().getIndexOfThisSubtask()) {
+ subscribedPartitionsToStartOffsets.put(
+ restoredStateEntry.getKey(), restoredStateEntry.getValue());
+ }
+ }
+
+ if (filterRestoredPartitionsWithCurrentTopicsDescriptor) {
+ subscribedPartitionsToStartOffsets
+ .entrySet()
+ .removeIf(
+ entry -> {
+ if (!topicsDescriptor.isMatchingTopic(
+ entry.getKey().getTopic())) {
+ LOG.warn(
+ "{} is removed from subscribed partitions since it is no longer "
+ + "associated with topics descriptor of current execution.",
+ entry.getKey());
+ return true;
+ }
+ return false;
+ });
+ }
+
+ LOG.info(
+ "Consumer subtask {} will start reading {} partitions with offsets in restored state: {}",
+ getRuntimeContext().getIndexOfThisSubtask(),
+ subscribedPartitionsToStartOffsets.size(),
+ subscribedPartitionsToStartOffsets);
+ } else {
+ // use the partition discoverer to fetch the initial seed partitions,
+ // and set their initial offsets depending on the startup mode.
+ // for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the specific offsets now;
+ // for other modes (EARLIEST, LATEST, and GROUP_OFFSETS), the offset is lazily
+ // determined
+ // when the partition is actually read.
+ switch (startupMode) {
+ case SPECIFIC_OFFSETS:
+ if (specificStartupOffsets == null) {
+ throw new IllegalStateException(
+ "Startup mode for the consumer set to "
+ + StartupMode.SPECIFIC_OFFSETS
+ + ", but no specific offsets were specified.");
+ }
+
+ for (KafkaTopicPartition seedPartition : allPartitions) {
+ Long specificOffset = specificStartupOffsets.get(seedPartition);
+ if (specificOffset != null) {
+ // since the specified offsets represent the next record to read, we
+ // subtract
+ // it by one so that the initial state of the consumer will be correct
+ subscribedPartitionsToStartOffsets.put(
+ seedPartition, specificOffset - 1);
+ } else {
+ // default to group offset behaviour if the user-provided specific
+ // offsets
+ // do not contain a value for this partition
+ subscribedPartitionsToStartOffsets.put(
+ seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+ }
+ }
+
+ break;
+ case TIMESTAMP:
+ if (startupOffsetsTimestamp == null) {
+ throw new IllegalStateException(
+ "Startup mode for the consumer set to "
+ + StartupMode.TIMESTAMP
+ + ", but no startup timestamp was specified.");
+ }
+
+ for (Map.Entry<KafkaTopicPartition, Long> partitionToOffset :
+ fetchOffsetsWithTimestamp(allPartitions, startupOffsetsTimestamp)
+ .entrySet()) {
+ subscribedPartitionsToStartOffsets.put(
+ partitionToOffset.getKey(),
+ (partitionToOffset.getValue() == null)
+ // if an offset cannot be retrieved for a partition with the given timestamp,
+ // we default to using the latest offset for the partition
+ ? KafkaTopicPartitionStateSentinel.LATEST_OFFSET
+ // since the specified offsets represent the next record to read,
+ // we subtract one so that the initial state of the consumer will be correct
+ : partitionToOffset.getValue() - 1);
+ }
+
+ break;
+ default:
+ for (KafkaTopicPartition seedPartition : allPartitions) {
+ subscribedPartitionsToStartOffsets.put(
+ seedPartition, startupMode.getStateSentinel());
+ }
+ }
+
+ if (!subscribedPartitionsToStartOffsets.isEmpty()) {
+ switch (startupMode) {
+ case EARLIEST:
+ LOG.info(
+ "Consumer subtask {} will start reading the following {} partitions from the earliest"
+ + " offsets: {}",
+ getRuntimeContext().getIndexOfThisSubtask(),
+ subscribedPartitionsToStartOffsets.size(),
+ subscribedPartitionsToStartOffsets.keySet());
+ break;
+ case LATEST:
+ LOG.info(
+ "Consumer subtask {} will start reading the following {} partitions from the latest "
+ + "offsets: {}",
+ getRuntimeContext().getIndexOfThisSubtask(),
+ subscribedPartitionsToStartOffsets.size(),
+ subscribedPartitionsToStartOffsets.keySet());
+ break;
+ case TIMESTAMP:
+ LOG.info(
+ "Consumer subtask {} will start reading the following {} partitions from timestamp "
+ + "{}: {}",
+ getRuntimeContext().getIndexOfThisSubtask(),
+ subscribedPartitionsToStartOffsets.size(),
+ startupOffsetsTimestamp,
+ subscribedPartitionsToStartOffsets.keySet());
+ break;
+ case SPECIFIC_OFFSETS:
+ LOG.info(
+ "Consumer subtask {} will start reading the following {} partitions from the "
+ + "specified startup offsets {}: {}",
+ getRuntimeContext().getIndexOfThisSubtask(),
+ subscribedPartitionsToStartOffsets.size(),
+ specificStartupOffsets,
+ subscribedPartitionsToStartOffsets.keySet());
+
+ List<KafkaTopicPartition> partitionsDefaultedToGroupOffsets =
+ new ArrayList<>(subscribedPartitionsToStartOffsets.size());
+ for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition :
+ subscribedPartitionsToStartOffsets.entrySet()) {
+ if (subscribedPartition.getValue()
+ == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
+ partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey());
+ }
+ }
+
+ if (partitionsDefaultedToGroupOffsets.size() > 0) {
+ LOG.warn(
+ "Consumer subtask {} cannot find offsets for the following {} partitions in the "
+ + "specified startup offsets: {}"
+ + "; their startup offsets will be defaulted to their committed group "
+ + "offsets in Kafka.",
+ getRuntimeContext().getIndexOfThisSubtask(),
+ partitionsDefaultedToGroupOffsets.size(),
+ partitionsDefaultedToGroupOffsets);
+ }
+ break;
+ case GROUP_OFFSETS:
+ LOG.info(
+ "Consumer subtask {} will start reading the following {} partitions from the "
+ + "committed group offsets in Kafka: {}",
+ getRuntimeContext().getIndexOfThisSubtask(),
+ subscribedPartitionsToStartOffsets.size(),
+ subscribedPartitionsToStartOffsets.keySet());
+ }
+ } else {
+ LOG.info(
+ "Consumer subtask {} initially has no partitions to read from.",
+ getRuntimeContext().getIndexOfThisSubtask());
+ }
+ }
+
+ this.deserializer.open(
+ RuntimeContextInitializationContextAdapters.deserializationAdapter(
+ getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));
+ }
+
+ @Override
+ public void run(SourceContext<T> sourceContext) throws Exception {
+
+ if (StringUtils.isNotEmpty(this.inlongMetric)) {
+ String[] inlongMetricArray = inlongMetric.split(DELIMITER);
+ String groupId = inlongMetricArray[0];
+ String streamId = inlongMetricArray[1];
+ String nodeId = inlongMetricArray[2];
+ AuditImp auditImp = null;
+ if (inlongAudit != null) {
+ AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(inlongAudit.split(DELIMITER))));
+ auditImp = AuditImp.getInstance();
+ }
+ sourceMetricData = new SourceMetricData(groupId, streamId, nodeId, getRuntimeContext().getMetricGroup(),
+ auditImp);
+ ThreadSafeCounter recordsInCounter = new ThreadSafeCounter();
+ ThreadSafeCounter bytesInCounter = new ThreadSafeCounter();
+ if (metricState != null) {
+ recordsInCounter.inc(metricState.getMetricValue(NUM_RECORDS_IN));
+ bytesInCounter.inc(metricState.getMetricValue(NUM_BYTES_IN));
+ }
+ sourceMetricData.registerMetricsForNumRecordsIn(recordsInCounter);
+ sourceMetricData.registerMetricsForNumBytesIn(bytesInCounter);
+ sourceMetricData.registerMetricsForNumRecordsInForMeter(new ThreadSafeCounter());
+ sourceMetricData.registerMetricsForNumBytesInForMeter(new ThreadSafeCounter());
+ sourceMetricData.registerMetricsForNumBytesInPerSecond();
+ sourceMetricData.registerMetricsForNumRecordsInPerSecond();
+ if (this.deserializer instanceof DynamicKafkaDeserializationSchema) {
+ DynamicKafkaDeserializationSchema dynamicKafkaDeserializationSchema =
+ (DynamicKafkaDeserializationSchema) deserializer;
+ dynamicKafkaDeserializationSchema.setMetricData(sourceMetricData);
+ }
+ }
+
+ if (subscribedPartitionsToStartOffsets == null) {
+ throw new Exception("The partitions were not set for the consumer");
+ }
+
+ // initialize commit metrics and default offset callback method
+ this.successfulCommits =
+ this.getRuntimeContext()
+ .getMetricGroup()
+ .counter(COMMITS_SUCCEEDED_METRICS_COUNTER);
+ this.failedCommits =
+ this.getRuntimeContext().getMetricGroup().counter(COMMITS_FAILED_METRICS_COUNTER);
+ final int subtaskIndex = this.getRuntimeContext().getIndexOfThisSubtask();
+
+ this.offsetCommitCallback =
+ new KafkaCommitCallback() {
+ @Override
+ public void onSuccess() {
+ successfulCommits.inc();
+ }
+
+ @Override
+ public void onException(Throwable cause) {
+ LOG.warn(
+ String.format(
+ "Consumer subtask %d failed async Kafka commit.",
+ subtaskIndex),
+ cause);
+ failedCommits.inc();
+ }
+ };
+
+ // mark the subtask as temporarily idle if there are no initial seed partitions;
+ // once this subtask discovers some partitions and starts collecting records, the subtask's
+ // status will automatically be triggered back to be active.
+ if (subscribedPartitionsToStartOffsets.isEmpty()) {
+ sourceContext.markAsTemporarilyIdle();
+ }
+
+ LOG.info(
+ "Consumer subtask {} creating fetcher with offsets {}.",
+ getRuntimeContext().getIndexOfThisSubtask(),
+ subscribedPartitionsToStartOffsets);
+ // from this point forward:
+ // - 'snapshotState' will draw offsets from the fetcher,
+ // instead of being built from `subscribedPartitionsToStartOffsets`
+ // - 'notifyCheckpointComplete' will start to do work (i.e. commit offsets to
+ // Kafka through the fetcher, if configured to do so)
+ this.kafkaFetcher =
+ createFetcher(
+ sourceContext,
+ subscribedPartitionsToStartOffsets,
+ watermarkStrategy,
+ (StreamingRuntimeContext) getRuntimeContext(),
+ offsetCommitMode,
+ getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
+ useMetrics);
+
+ if (!running) {
+ return;
+ }
+
+ // depending on whether we were restored with the current state version (1.3),
+ // remaining logic branches off into 2 paths:
+ // 1) New state - partition discovery loop executed as separate thread, with this
+ // thread running the main fetcher loop
+ // 2) Old state - partition discovery is disabled and only the main fetcher loop is
+ // executed
+ if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
+ kafkaFetcher.runFetchLoop();
+ } else {
+ runWithPartitionDiscovery();
+ }
+ }
+
+ private void runWithPartitionDiscovery() throws Exception {
+ final AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference<>();
+ createAndStartDiscoveryLoop(discoveryLoopErrorRef);
+
+ kafkaFetcher.runFetchLoop();
+
+ // make sure that the partition discoverer is woken up so that
+ // the discoveryLoopThread exits
+ partitionDiscoverer.wakeup();
+ joinDiscoveryLoopThread();
+
+ // rethrow any discovery loop errors
+ final Exception discoveryLoopError = discoveryLoopErrorRef.get();
+ if (discoveryLoopError != null) {
+ throw new RuntimeException(discoveryLoopError);
+ }
+ }
+
+ @VisibleForTesting
+ void joinDiscoveryLoopThread() throws InterruptedException {
+ if (discoveryLoopThread != null) {
+ discoveryLoopThread.join();
+ }
+ }
+
+ private void createAndStartDiscoveryLoop(AtomicReference<Exception> discoveryLoopErrorRef) {
+ discoveryLoopThread =
+ new Thread(
+ () -> {
+ try {
+ // --------------------- partition discovery loop ---------------------
+
+ // throughout the loop, we always eagerly check if we are still running
+ // before performing the next operation, so that we can escape the loop
+ // as soon as possible
+
+ while (running) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Consumer subtask {} is trying to discover new partitions ...",
+ getRuntimeContext().getIndexOfThisSubtask());
+ }
+
+ final List<KafkaTopicPartition> discoveredPartitions;
+ try {
+ discoveredPartitions =
+ partitionDiscoverer.discoverPartitions();
+ } catch (AbstractPartitionDiscoverer.WakeupException
+ | AbstractPartitionDiscoverer.ClosedException e) {
+ // the partition discoverer may have been closed or woken up
+ // before or during the discovery;
+ // this would only happen if the consumer was canceled;
+ // simply escape the loop
+ break;
+ }
+
+ // no need to add the discovered partitions if we were closed
+ // in the meantime
+ if (running && !discoveredPartitions.isEmpty()) {
+ kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
+ }
+
+ // do not waste any time sleeping if we're not running anymore
+ if (running && discoveryIntervalMillis != 0) {
+ try {
+ Thread.sleep(discoveryIntervalMillis);
+ } catch (InterruptedException iex) {
+ // may be interrupted if the consumer was canceled
+ // midway; simply escape the loop
+ break;
+ }
+ }
+ }
+ } catch (Exception e) {
+ discoveryLoopErrorRef.set(e);
+ } finally {
+ // calling cancel will also let the fetcher loop escape
+ // (if not running, cancel() was already called)
+ if (running) {
+ cancel();
+ }
+ }
+ },
+ "Kafka Partition Discovery for "
+ + getRuntimeContext().getTaskNameWithSubtasks());
+
+ discoveryLoopThread.start();
+ }
+
+ @Override
+ public void cancel() {
+ // set ourselves as not running;
+ // this would let the main discovery loop escape as soon as possible
+ running = false;
+
+ if (discoveryLoopThread != null) {
+
+ if (partitionDiscoverer != null) {
+ // we cannot close the discoverer here, as it is error-prone to concurrent access;
+ // only wake up the discoverer; the discovery loop will clean itself up after it
+ // escapes
+ partitionDiscoverer.wakeup();
+ }
+
+ // the discovery loop may currently be sleeping in between
+ // consecutive discoveries; interrupt to shut down faster
+ discoveryLoopThread.interrupt();
+ }
+
+ // abort the fetcher, if there is one
+ if (kafkaFetcher != null) {
+ kafkaFetcher.cancel();
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ cancel();
+
+ joinDiscoveryLoopThread();
+
+ Exception exception = null;
+ if (partitionDiscoverer != null) {
+ try {
+ partitionDiscoverer.close();
+ } catch (Exception e) {
+ exception = e;
+ }
+ }
+
+ try {
+ super.close();
+ } catch (Exception e) {
+ exception = ExceptionUtils.firstOrSuppressed(e, exception);
+ }
+
+ if (exception != null) {
+ throw exception;
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Checkpoint and restore
+ // ------------------------------------------------------------------------
+
+ @Override
+ public final void initializeState(FunctionInitializationContext context) throws Exception {
+
+ OperatorStateStore stateStore = context.getOperatorStateStore();
+
+ this.unionOffsetStates =
+ stateStore.getUnionListState(
+ new ListStateDescriptor<>(
+ OFFSETS_STATE_NAME,
+ createStateSerializer(getRuntimeContext().getExecutionConfig())));
+
+ if (this.inlongMetric != null) {
+ this.metricStateListState =
+ stateStore.getUnionListState(
+ new ListStateDescriptor<>(
+ INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {
+ })));
+ }
+
+ if (context.isRestored()) {
+ restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());
+ // populate actual holder for restored state
+ for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {
+ restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
+ }
+
+ metricState = MetricStateUtils.restoreMetricState(metricStateListState,
+ getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
+
+ LOG.info(
+ "Consumer subtask {} restored state: {}.",
+ getRuntimeContext().getIndexOfThisSubtask(),
+ restoredState);
+ } else {
+ LOG.info(
+ "Consumer subtask {} has no restore state.",
+ getRuntimeContext().getIndexOfThisSubtask());
+ }
+ }
+
+ @Override
+ public final void snapshotState(FunctionSnapshotContext context) throws Exception {
+ if (!running) {
+ LOG.debug("snapshotState() called on closed source");
+ } else {
+ unionOffsetStates.clear();
+
+ final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
+ if (fetcher == null) {
+ // the fetcher has not yet been initialized, which means we need to return the
+ // originally restored offsets or the assigned partitions
+ for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition :
+ subscribedPartitionsToStartOffsets.entrySet()) {
+ unionOffsetStates.add(
+ Tuple2.of(
+ subscribedPartition.getKey(), subscribedPartition.getValue()));
+ }
+
+ if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
+ // the map cannot be asynchronously updated, because only one checkpoint call
+ // can happen on this function at a time: either snapshotState() or
+ // notifyCheckpointComplete()
+ pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
+ }
+ } else {
+ HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();
+
+ if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
+ // the map cannot be asynchronously updated, because only one checkpoint call
+ // can happen on this function at a time: either snapshotState() or
+ // notifyCheckpointComplete()
+ pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
+ }
+
+ for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry :
+ currentOffsets.entrySet()) {
+ unionOffsetStates.add(
+ Tuple2.of(
+ kafkaTopicPartitionLongEntry.getKey(),
+ kafkaTopicPartitionLongEntry.getValue()));
+ }
+ }
+
+ if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
+ // truncate the map of pending offsets to commit, to prevent infinite growth
+ while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
+ pendingOffsetsToCommit.remove(0);
+ }
+ }
+ if (sourceMetricData != null && metricStateListState != null) {
+ MetricStateUtils.snapshotMetricStateForSourceMetricData(metricStateListState, sourceMetricData,
+ getRuntimeContext().getIndexOfThisSubtask());
+ }
+ }
+ }
+
+ @Override
+ public final void notifyCheckpointComplete(long checkpointId) throws Exception {
+ if (!running) {
+ LOG.debug("notifyCheckpointComplete() called on closed source");
+ return;
+ }
+
+ final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
+ if (fetcher == null) {
+ LOG.debug("notifyCheckpointComplete() called on uninitialized source");
+ return;
+ }
+
+ if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
+ // only one commit operation must be in progress
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Consumer subtask {} committing offsets to Kafka/ZooKeeper for checkpoint {}.",
+ getRuntimeContext().getIndexOfThisSubtask(),
+ checkpointId);
+ }
+
+ try {
+ final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
+ if (posInMap == -1) {
+ LOG.warn(
+ "Consumer subtask {} received confirmation for unknown checkpoint id {}",
+ getRuntimeContext().getIndexOfThisSubtask(),
+ checkpointId);
+ return;
+ }
+
+ @SuppressWarnings("unchecked")
+ Map<KafkaTopicPartition, Long> offsets =
+ (Map<KafkaTopicPartition, Long>) pendingOffsetsToCommit.remove(posInMap);
+
+ // remove older checkpoints in map
+ for (int i = 0; i < posInMap; i++) {
+ pendingOffsetsToCommit.remove(0);
+ }
+
+ if (offsets == null || offsets.size() == 0) {
+ LOG.debug(
+ "Consumer subtask {} has empty checkpoint state.",
+ getRuntimeContext().getIndexOfThisSubtask());
+ return;
+ }
+
+ fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);
+ } catch (Exception e) {
+ if (running) {
+ throw e;
+ }
+ // else ignore exception if we are no longer running
+ }
+ }
+ }
+
+ @Override
+ public void notifyCheckpointAborted(long checkpointId) {
+ }
+
+ // ------------------------------------------------------------------------
+ // Kafka Consumer specific methods
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates the fetcher that connects to the Kafka brokers, pulls data, deserializes the data, and
+ * emits it into the data streams.
+ *
+ * @param sourceContext The source context to emit data to.
+ * @param subscribedPartitionsToStartOffsets The set of partitions that this subtask should
+ * handle, with their start offsets.
+ * @param watermarkStrategy Optional, a serialized WatermarkStrategy.
+ * @param runtimeContext The task's runtime context.
+ * @return The instantiated fetcher
+ * @throws Exception The method should forward exceptions
+ */
+ protected abstract AbstractFetcher<T, ?> createFetcher(
+ SourceContext<T> sourceContext,
+ Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets,
+ SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
+ StreamingRuntimeContext runtimeContext,
+ OffsetCommitMode offsetCommitMode,
+ MetricGroup kafkaMetricGroup,
+ boolean useMetrics)
+ throws Exception;
+
+ /**
+ * Creates the partition discoverer that is used to find new partitions for this subtask.
+ *
+ * @param topicsDescriptor Descriptor that describes whether we are discovering partitions for
+ * fixed topics or a topic pattern.
+ * @param indexOfThisSubtask The index of this consumer subtask.
+ * @param numParallelSubtasks The total number of parallel consumer subtasks.
+ * @return The instantiated partition discoverer
+ */
+ protected abstract AbstractPartitionDiscoverer createPartitionDiscoverer(
+ KafkaTopicsDescriptor topicsDescriptor,
+ int indexOfThisSubtask,
+ int numParallelSubtasks);
+
+ protected abstract boolean getIsAutoCommitEnabled();
+
+ protected abstract Map<KafkaTopicPartition, Long> fetchOffsetsWithTimestamp(
+ Collection<KafkaTopicPartition> partitions, long timestamp);
+
+ // ------------------------------------------------------------------------
+ // ResultTypeQueryable methods
+ // ------------------------------------------------------------------------
+
+ @Override
+ public TypeInformation<T> getProducedType() {
+ return deserializer.getProducedType();
+ }
+
+ // ------------------------------------------------------------------------
+ // Test utilities
+ // ------------------------------------------------------------------------
+
+ @VisibleForTesting
+ Map<KafkaTopicPartition, Long> getSubscribedPartitionsToStartOffsets() {
+ return subscribedPartitionsToStartOffsets;
+ }
+
+ @VisibleForTesting
+ TreeMap<KafkaTopicPartition, Long> getRestoredState() {
+ return restoredState;
+ }
+
+ @VisibleForTesting
+ OffsetCommitMode getOffsetCommitMode() {
+ return offsetCommitMode;
+ }
+
+ @VisibleForTesting
+ LinkedMap getPendingOffsetsToCommit() {
+ return pendingOffsetsToCommit;
+ }
+
+ @VisibleForTesting
+ public boolean getEnableCommitOnCheckpoints() {
+ return enableCommitOnCheckpoints;
+ }
+
+ /**
+ * Creates the state serializer for the Kafka topic partition to offset tuple. Using an explicit
+ * state serializer with KryoSerializer is needed because otherwise users cannot use the
+ * 'disableGenericTypes' property with the KafkaConsumer.
+ */
+ @VisibleForTesting
+ static TupleSerializer<Tuple2<KafkaTopicPartition, Long>> createStateSerializer(
+ ExecutionConfig executionConfig) {
+ // an explicit serializer keeps compatibility with GenericTypeInformation and allows users
+ // to disableGenericTypes
+ TypeSerializer<?>[] fieldSerializers =
+ new TypeSerializer<?>[]{
+ new KryoSerializer<>(KafkaTopicPartition.class, executionConfig),
+ LongSerializer.INSTANCE
+ };
+ @SuppressWarnings("unchecked")
+ Class<Tuple2<KafkaTopicPartition, Long>> tupleClass =
+ (Class<Tuple2<KafkaTopicPartition, Long>>) (Class<?>) Tuple2.class;
+ return new TupleSerializer<>(tupleClass, fieldSerializers);
+ }
+}
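Note: the run()/initializeState() additions above boil down to a restore-then-seed pattern for the inlong metrics: a per-subtask MetricState snapshot lives in a union list state, and after a restore its saved totals pre-increment the counters before they are registered, so numRecordsIn/numBytesIn continue from the last checkpoint. A minimal sketch of that pattern, reusing the sort-connector-base types from this patch; the Flink state plumbing is elided, the helper class name is illustrative only, and NUM_RECORDS_IN/NUM_BYTES_IN are assumed to be the Constants keys used by the consumer above.

    import org.apache.inlong.sort.base.metric.MetricState;
    import org.apache.inlong.sort.base.metric.SourceMetricData;
    import org.apache.inlong.sort.base.metric.ThreadSafeCounter;

    import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
    import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;

    /** Illustrative helper (not part of the patch): seed counters from a restored MetricState. */
    public final class MetricStateSeedSketch {

        /** metricState may be null on a fresh start; the counters then simply begin at zero. */
        static void seedAndRegister(SourceMetricData sourceMetricData, MetricState metricState) {
            ThreadSafeCounter recordsInCounter = new ThreadSafeCounter();
            ThreadSafeCounter bytesInCounter = new ThreadSafeCounter();
            if (metricState != null) {
                // continue from the totals captured in the last successful checkpoint
                recordsInCounter.inc(metricState.getMetricValue(NUM_RECORDS_IN));
                bytesInCounter.inc(metricState.getMetricValue(NUM_BYTES_IN));
            }
            sourceMetricData.registerMetricsForNumRecordsIn(recordsInCounter);
            sourceMetricData.registerMetricsForNumBytesIn(bytesInCounter);
        }
    }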
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
index b2efd2c3e..3f0902c0c 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
@@ -57,9 +58,10 @@ import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TemporaryClassLoaderContext;
import org.apache.inlong.audit.AuditImp;
-import org.apache.inlong.sort.base.Constants;
+import org.apache.inlong.sort.base.metric.MetricState;
import org.apache.inlong.sort.base.metric.SinkMetricData;
import org.apache.inlong.sort.base.metric.ThreadSafeCounter;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -78,7 +80,6 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -93,9 +94,13 @@ import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
import static org.apache.inlong.sort.base.Constants.DELIMITER;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
/**
* Copy from org.apache.flink:flink-connector-kafka_2.11:1.13.5
@@ -256,6 +261,10 @@ public class FlinkKafkaProducer<IN>
private SinkMetricData metricData;
private Long dataSize = 0L;
private Long rowSize = 0L;
+
+ private transient ListState<MetricState> metricStateListState;
+
+ private MetricState metricState;
/**
* State for nextTransactionalIdHint.
*/
@@ -910,27 +919,27 @@ public class FlinkKafkaProducer<IN>
inlongGroupId = inlongMetricArray[0];
inlongStreamId = inlongMetricArray[1];
String nodeId = inlongMetricArray[2];
- metricData = new SinkMetricData(inlongGroupId, inlongStreamId, nodeId, ctx.getMetricGroup());
+ metricData = new SinkMetricData(inlongGroupId, inlongStreamId, nodeId, ctx.getMetricGroup(),
+ auditHostAndPorts);
metricData.registerMetricsForDirtyBytes(new ThreadSafeCounter());
metricData.registerMetricsForDirtyRecords(new ThreadSafeCounter());
metricData.registerMetricsForNumBytesOut(new ThreadSafeCounter());
metricData.registerMetricsForNumRecordsOut(new ThreadSafeCounter());
+ metricData.registerMetricsForNumBytesOutForMeter(new ThreadSafeCounter());
+ metricData.registerMetricsForNumRecordsOutForMeter(new ThreadSafeCounter());
metricData.registerMetricsForNumBytesOutPerSecond();
metricData.registerMetricsForNumRecordsOutPerSecond();
}
-
- if (auditHostAndPorts != null) {
- AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER))));
- auditImp = AuditImp.getInstance();
+ if (metricState != null && metricData != null) {
+ metricData.getNumBytesOut().inc(metricState.getMetricValue(NUM_BYTES_OUT));
+ metricData.getNumRecordsOut().inc(metricState.getMetricValue(NUM_RECORDS_OUT));
}
-
super.open(configuration);
}
private void sendOutMetrics(Long rowSize, Long dataSize) {
if (metricData != null) {
- metricData.getNumRecordsOut().inc(rowSize);
- metricData.getNumBytesOut().inc(dataSize);
+ metricData.invoke(rowSize, dataSize);
}
}
@@ -941,23 +950,6 @@ public class FlinkKafkaProducer<IN>
}
}
- private void outputMetricForAudit(ProducerRecord<byte[], byte[]> record) {
- if (auditImp != null) {
- auditImp.add(
- Constants.AUDIT_SORT_OUTPUT,
- inlongGroupId,
- inlongStreamId,
- System.currentTimeMillis(),
- 1,
- record.value().length);
- }
- }
-
- private void resetMetricSize() {
- dataSize = 0L;
- rowSize = 0L;
- }
-
// ------------------- Logic for handling checkpoint flushing -------------------------- //
@Override
@@ -965,7 +957,6 @@ public class FlinkKafkaProducer<IN>
FlinkKafkaProducer.KafkaTransactionState transaction, IN next, Context context)
throws FlinkKafkaException {
checkErroneous();
- resetMetricSize();
ProducerRecord<byte[], byte[]> record;
if (keyedSchema != null) {
@@ -1029,10 +1020,7 @@ public class FlinkKafkaProducer<IN>
+ "is a bug.");
}
- rowSize++;
- dataSize = dataSize + record.value().length;
- sendOutMetrics(rowSize, dataSize);
- outputMetricForAudit(record);
+ sendOutMetrics(1L, (long) record.value().length);
pendingRecords.incrementAndGet();
transaction.producer.send(record, callback);
@@ -1247,6 +1235,10 @@ public class FlinkKafkaProducer<IN>
getRuntimeContext().getNumberOfParallelSubtasks(),
nextFreeTransactionalId));
}
+ if (metricData != null && metricStateListState != null) {
+ MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, metricData,
+ getRuntimeContext().getIndexOfThisSubtask());
+ }
}
@Override
@@ -1260,6 +1252,14 @@ public class FlinkKafkaProducer<IN>
semantic = FlinkKafkaProducer.Semantic.NONE;
}
+ if (this.inlongMetric != null) {
+ this.metricStateListState =
+ context.getOperatorStateStore().getUnionListState(
+ new ListStateDescriptor<>(
+ INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {
+ })));
+ }
+
nextTransactionalIdHintState =
context.getOperatorStateStore()
.getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2);
@@ -1313,6 +1313,11 @@ public class FlinkKafkaProducer<IN>
}
}
+ if (context.isRestored()) {
+ metricState = MetricStateUtils.restoreMetricState(metricStateListState,
+ getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
+ }
+
super.initializeState(context);
}
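Note: on the producer side the same state travels through the standard checkpoint lifecycle: initializeState() reads this subtask's MetricState back from the INLONG_METRIC_STATE_NAME union list state, open() seeds the numBytesOut/numRecordsOut counters from it, and snapshotState() writes the current SinkMetricData totals back on every checkpoint. A condensed sketch of those three touch points, with the surrounding Flink boilerplate omitted and field names as used in the patch:

    // initializeState(): recover this subtask's snapshot after a restart or rescale
    if (context.isRestored()) {
        metricState = MetricStateUtils.restoreMetricState(metricStateListState,
                getRuntimeContext().getIndexOfThisSubtask(),
                getRuntimeContext().getNumberOfParallelSubtasks());
    }

    // open(): continue counting from the restored totals
    if (metricState != null && metricData != null) {
        metricData.getNumBytesOut().inc(metricState.getMetricValue(NUM_BYTES_OUT));
        metricData.getNumRecordsOut().inc(metricState.getMetricValue(NUM_RECORDS_OUT));
    }

    // snapshotState(): persist the current totals for the next restore
    if (metricData != null && metricStateListState != null) {
        MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, metricData,
                getRuntimeContext().getIndexOfThisSubtask());
    }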
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
index 17e92abda..c6b5c11a9 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
@@ -28,23 +28,18 @@ import org.apache.flink.types.DeserializationException;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
-import org.apache.inlong.audit.AuditImp;
-import org.apache.inlong.sort.base.Constants;
import org.apache.inlong.sort.base.metric.SourceMetricData;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
import java.util.List;
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
/**
* deserialization schema for {@link KafkaDynamicSource}.
*/
-class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<RowData> {
+public class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<RowData> {
private static final long serialVersionUID = 1L;
@@ -63,18 +58,8 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
private final boolean upsertMode;
- private final String inlongMetric;
-
private SourceMetricData metricData;
- private String inlongGroupId;
-
- private String auditHostAndPorts;
-
- private String inlongStreamId;
-
- private transient AuditImp auditImp;
-
DynamicKafkaDeserializationSchema(
int physicalArity,
@Nullable DeserializationSchema<RowData> keyDeserialization,
@@ -84,9 +69,7 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
boolean hasMetadata,
MetadataConverter[] metadataConverters,
TypeInformation<RowData> producedTypeInfo,
- boolean upsertMode,
- String inlongMetric,
- String auditHostAndPorts) {
+ boolean upsertMode) {
if (upsertMode) {
Preconditions.checkArgument(
keyDeserialization != null && keyProjection.length > 0,
@@ -105,9 +88,10 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
upsertMode);
this.producedTypeInfo = producedTypeInfo;
this.upsertMode = upsertMode;
- this.inlongMetric = inlongMetric;
- this.auditHostAndPorts = auditHostAndPorts;
+ }
+ public void setMetricData(SourceMetricData metricData) {
+ this.metricData = metricData;
}
@Override
@@ -116,21 +100,6 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
keyDeserialization.open(context);
}
valueDeserialization.open(context);
- if (inlongMetric != null && !inlongMetric.isEmpty()) {
- String[] inlongMetricArray = inlongMetric.split(DELIMITER);
- inlongGroupId = inlongMetricArray[0];
- inlongStreamId = inlongMetricArray[1];
- String nodeId = inlongMetricArray[2];
- metricData = new SourceMetricData(inlongGroupId, inlongStreamId, nodeId, context.getMetricGroup());
- metricData.registerMetricsForNumBytesIn();
- metricData.registerMetricsForNumBytesInPerSecond();
- metricData.registerMetricsForNumRecordsIn();
- metricData.registerMetricsForNumRecordsInPerSecond();
- }
- if (auditHostAndPorts != null) {
- AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER))));
- auditImp = AuditImp.getInstance();
- }
}
@Override
@@ -178,26 +147,8 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
}
private void outputMetrics(ConsumerRecord<byte[], byte[]> record) {
- outputMetricForFlink(record);
- outputMetricForAudit(record);
- }
-
- private void outputMetricForAudit(ConsumerRecord<byte[], byte[]> record) {
- if (auditImp != null) {
- auditImp.add(
- Constants.AUDIT_SORT_INPUT,
- inlongGroupId,
- inlongStreamId,
- System.currentTimeMillis(),
- 1,
- record.value().length);
- }
- }
-
- private void outputMetricForFlink(ConsumerRecord<byte[], byte[]> record) {
if (metricData != null) {
- metricData.getNumBytesIn().inc(record.value().length);
- metricData.getNumRecordsIn().inc(1);
+ metricData.outputMetrics(1, record.value().length);
}
}
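Note: with these changes the deserialization schema no longer creates its own metric or audit objects; the consumer owns a single SourceMetricData and injects it, and the schema only reports per-record counts through it. A minimal sketch of that wiring and the per-record call, using the method names from the patch with the record handling around them elided:

    // in FlinkKafkaConsumerBase.run(): hand the already-registered metrics to the schema
    if (deserializer instanceof DynamicKafkaDeserializationSchema) {
        ((DynamicKafkaDeserializationSchema) deserializer).setMetricData(sourceMetricData);
    }

    // in DynamicKafkaDeserializationSchema, invoked once per consumed record
    private void outputMetrics(ConsumerRecord<byte[], byte[]> record) {
        if (metricData != null) {
            metricData.outputMetrics(1, record.value().length);
        }
    }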
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
index f3580a8f1..af784aad4 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
@@ -41,13 +40,12 @@ import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.util.Preconditions;
-
+import org.apache.inlong.sort.kafka.FlinkKafkaConsumer;
import org.apache.inlong.sort.kafka.table.DynamicKafkaDeserializationSchema.MetadataConverter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import javax.annotation.Nullable;
-
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -74,14 +72,21 @@ public class KafkaDynamicSource
// Mutable attributes
// --------------------------------------------------------------------------------------------
- /** Data type that describes the final output of the source. */
+ /**
+ * Data type that describes the final output of the source.
+ */
protected DataType producedDataType;
- /** Metadata that is appended at the end of a physical source row. */
+ /**
+ * Metadata that is appended at the end of a physical source row.
+ */
protected List<String> metadataKeys;
- /** Watermark strategy that is used to generate per-partition watermark. */
- protected @Nullable WatermarkStrategy<RowData> watermarkStrategy;
+ /**
+ * Watermark strategy that is used to generate per-partition watermark.
+ */
+ protected @Nullable
+ WatermarkStrategy<RowData> watermarkStrategy;
// --------------------------------------------------------------------------------------------
// Format attributes
@@ -89,35 +94,55 @@ public class KafkaDynamicSource
private static final String VALUE_METADATA_PREFIX = "value.";
- /** Data type to configure the formats. */
+ /**
+ * Data type to configure the formats.
+ */
protected final DataType physicalDataType;
- /** Optional format for decoding keys from Kafka. */
- protected final @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat;
+ /**
+ * Optional format for decoding keys from Kafka.
+ */
+ protected final @Nullable
+ DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat;
- /** Format for decoding values from Kafka. */
+ /**
+ * Format for decoding values from Kafka.
+ */
protected final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat;
- /** Indices that determine the key fields and the target position in the produced row. */
+ /**
+ * Indices that determine the key fields and the target position in the produced row.
+ */
protected final int[] keyProjection;
- /** Indices that determine the value fields and the target position in the produced row. */
+ /**
+ * Indices that determine the value fields and the target position in the produced row.
+ */
protected final int[] valueProjection;
- /** Prefix that needs to be removed from fields when constructing the physical data type. */
- protected final @Nullable String keyPrefix;
+ /**
+ * Prefix that needs to be removed from fields when constructing the physical data type.
+ */
+ protected final @Nullable
+ String keyPrefix;
// --------------------------------------------------------------------------------------------
// Kafka-specific attributes
// --------------------------------------------------------------------------------------------
- /** The Kafka topics to consume. */
+ /**
+ * The Kafka topics to consume.
+ */
protected final List<String> topics;
- /** The Kafka topic pattern to consume. */
+ /**
+ * The Kafka topic pattern to consume.
+ */
protected final Pattern topicPattern;
- /** Properties for the Kafka consumer. */
+ /**
+ * Properties for the Kafka consumer.
+ */
protected final Properties properties;
/**
@@ -137,7 +162,9 @@ public class KafkaDynamicSource
*/
protected final long startupTimestampMillis;
- /** Flag to determine source mode. In upsert mode, it will keep the tombstone message. * */
+ /**
+ * Flag to determine source mode. In upsert mode, it will keep the tombstone message. *
+ */
protected final boolean upsertMode;
protected final String inlongMetric;
@@ -214,7 +241,7 @@ public class KafkaDynamicSource
final FlinkKafkaConsumer<RowData> kafkaConsumer =
createKafkaConsumer(keyDeserialization, valueDeserialization,
- producedTypeInfo, inlongMetric, auditHostAndPorts);
+ producedTypeInfo, inlongMetric, auditHostAndPorts);
return SourceFunctionProvider.of(kafkaConsumer, false);
}
@@ -350,8 +377,8 @@ public class KafkaDynamicSource
DeserializationSchema<RowData> keyDeserialization,
DeserializationSchema<RowData> valueDeserialization,
TypeInformation<RowData> producedTypeInfo,
- String inlongMetric,
- String auditHostAndPorts) {
+ String inlongMetric,
+ String auditHostAndPorts) {
final MetadataConverter[] metadataConverters =
metadataKeys.stream()
@@ -390,13 +417,15 @@ public class KafkaDynamicSource
hasMetadata,
metadataConverters,
producedTypeInfo,
- upsertMode, inlongMetric, auditHostAndPorts);
+ upsertMode);
final FlinkKafkaConsumer<RowData> kafkaConsumer;
if (topics != null) {
- kafkaConsumer = new FlinkKafkaConsumer<>(topics, kafkaDeserializer, properties);
+ kafkaConsumer = new FlinkKafkaConsumer<>(topics, kafkaDeserializer, properties, inlongMetric,
+ auditHostAndPorts);
} else {
- kafkaConsumer = new FlinkKafkaConsumer<>(topicPattern, kafkaDeserializer, properties);
+ kafkaConsumer = new FlinkKafkaConsumer<>(topicPattern, kafkaDeserializer, properties, inlongMetric,
+ auditHostAndPorts);
}
switch (startupMode) {
@@ -425,7 +454,8 @@ public class KafkaDynamicSource
return kafkaConsumer;
}
- private @Nullable DeserializationSchema<RowData> createDeserialization(
+ private @Nullable
+ DeserializationSchema<RowData> createDeserialization(
DynamicTableSource.Context context,
@Nullable DecodingFormat<DeserializationSchema<RowData>> format,
int[] projection,
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
index a7eebdbcd..e7084d2fa 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
@@ -429,6 +429,8 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
sourceMetricData = new SourceMetricData(groupId, streamId, nodeId, metricGroup, auditImp);
sourceMetricData.registerMetricsForNumRecordsIn();
sourceMetricData.registerMetricsForNumBytesIn();
+ sourceMetricData.registerMetricsForNumBytesInForMeter();
+ sourceMetricData.registerMetricsForNumRecordsInForMeter();
sourceMetricData.registerMetricsForNumBytesInPerSecond();
sourceMetricData.registerMetricsForNumRecordsInPerSecond();
}
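Note: the extra ...ForMeter registrations for the MySQL CDC source mirror what the Kafka connector does above: the per-second meters get their own counters, separate from the cumulative numRecordsIn/numBytesIn counters that are pre-incremented from restored state. A plausible reading (an assumption, not spelled out in this patch) is that this keeps restored totals from inflating the computed rate. A generic Flink sketch of a rate meter backed by a dedicated counter, with a hypothetical helper name:

    import org.apache.flink.metrics.Counter;
    import org.apache.flink.metrics.Meter;
    import org.apache.flink.metrics.MeterView;
    import org.apache.flink.metrics.MetricGroup;

    // Hypothetical helper: the meter only sees increments made through forMeterCounter,
    // so values seeded from a restored snapshot never show up as a rate spike.
    static Meter registerRecordsInRate(MetricGroup group, Counter forMeterCounter) {
        return group.meter("numRecordsInPerSecond", new MeterView(forMeterCounter, 60));
    }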
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index 980c471b7..5dce25de4 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -570,6 +570,18 @@
Source : flink-table-runtime-blink_2.11-13.2-rc2 2.2.1 (Please note that the software have been modified.)
License : https://github.com/apache/flink/blob/release-1.13.2-rc2/LICENSE
+ 1.3.11 inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
+ inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
+ inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
+ inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
+ inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java
+ inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumer.java
+ inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java
+ inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
+ inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java
+ Source : org.apache.flink:flink-connector-kafka_2.11:1.13.5 (Please note that the software have been modified.)
+ License : https://github.com/apache/flink/blob/master/LICENSE
+
=======================================================================
Apache InLong Subcomponents: