Posted to commits@inlong.apache.org by zi...@apache.org on 2022/08/09 09:41:16 UTC
[inlong] branch master updated: [INLONG-5158][Sort] Sort add metric for kafka source with flink metrics group and audit sdk (#5254)
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 73cec9e61 [INLONG-5158][Sort] Sort add metric for kafka source with flink metrics group and audit sdk (#5254)
73cec9e61 is described below
commit 73cec9e61111a70fefe575d8cb56448f841fc6a1
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Tue Aug 9 17:41:11 2022 +0800
[INLONG-5158][Sort] Sort add metric for kafka source with flink metrics group and audit sdk (#5254)
---
.../org/apache/inlong/sort/protocol/GroupInfo.java | 14 +
.../apache/inlong/sort/protocol/InlongMetric.java | 5 +
.../org/apache/inlong/sort/base/Constants.java | 23 +
inlong-sort/sort-connectors/kafka/pom.xml | 12 +
.../table/DynamicKafkaDeserializationSchema.java | 346 +++++++++++++
.../sort/kafka/table/KafkaDynamicSource.java | 551 +++++++++++++++++++++
.../sort/kafka/table/KafkaDynamicTableFactory.java | 21 +-
.../inlong/sort/parser/impl/FlinkSqlParser.java | 4 +
8 files changed, 972 insertions(+), 4 deletions(-)
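
For orientation before the diffs: the two options this commit introduces, 'inlong.metric' and 'inlong.audit', are ordinary connector options, so they are passed through the Kafka source DDL. A minimal sketch of such a declaration, assuming a hypothetical connector identifier and placeholder topic, server, and address values (none of these literals come from the commit):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class KafkaSourceMetricSketch {
        public static void main(String[] args) {
            TableEnvironment tableEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inStreamingMode().build());
            // 'inlong.metric' carries GROUP_ID + '&' + STREAM_ID + '&' + NODE_ID;
            // 'inlong.audit' carries the audit proxy address list. All literals are placeholders.
            tableEnv.executeSql(
                    "CREATE TABLE kafka_source (\n"
                            + "  id BIGINT,\n"
                            + "  name STRING\n"
                            + ") WITH (\n"
                            + "  'connector' = 'kafka-inlong',\n" // hypothetical identifier
                            + "  'topic' = 'demo_topic',\n"
                            + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                            + "  'format' = 'json',\n"
                            + "  'inlong.metric' = 'demo_group&demo_stream&node_1',\n"
                            + "  'inlong.audit' = '127.0.0.1:10081'\n"
                            + ")");
        }
    }
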
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/GroupInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/GroupInfo.java
index 0ec0c728d..516bea0b4 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/GroupInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/GroupInfo.java
@@ -18,6 +18,8 @@
package org.apache.inlong.sort.protocol;
import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
import lombok.Data;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
@@ -39,6 +41,8 @@ public class GroupInfo implements Serializable {
@JsonProperty("streams")
private List<StreamInfo> streams;
+ private Map<String, String> properties;
+
/**
* Information of group.
*
@@ -50,6 +54,16 @@ public class GroupInfo implements Serializable {
@JsonProperty("streams") List<StreamInfo> streams) {
this.groupId = Preconditions.checkNotNull(groupId, "groupId is null");
this.streams = Preconditions.checkNotNull(streams, "streams is null");
+ this.properties = new HashMap<>();
+ Preconditions.checkState(!streams.isEmpty(), "streams is empty");
+ }
+
+ public GroupInfo(@JsonProperty("groupId") String groupId,
+ @JsonProperty("streams") List<StreamInfo> streams,
+ Map<String, String> properties) {
+ this.groupId = Preconditions.checkNotNull(groupId, "groupId is null");
+ this.streams = Preconditions.checkNotNull(streams, "streams is null");
+ this.properties = properties;
Preconditions.checkState(!streams.isEmpty(), "streams is empty");
}
}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/InlongMetric.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/InlongMetric.java
index 5080f7523..4afb46a89 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/InlongMetric.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/InlongMetric.java
@@ -39,4 +39,9 @@ public interface InlongMetric {
*/
String METRIC_VALUE_FORMAT = "%s&%s&%s";
+ /**
+ * The key of the InLong audit configuration; the value format is ip:port&ip:port
+ */
+ String AUDIT_KEY = "inlong.audit";
+
}
diff --git a/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/Constants.java
index bda4e7475..e5a755aa6 100644
--- a/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -18,6 +18,9 @@
package org.apache.inlong.sort.base;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
/**
* connector base option constant
*/
@@ -51,4 +54,24 @@ public final class Constants {
*/
public static final String DELIMITER = "&";
+ // audit ID: Sort received successfully
+ public static final Integer AUDIT_SORT_INPUT = 7;
+
+ // audit ID: Sort sent successfully
+ public static final Integer AUDIT_SORT_OUTPUT = 8;
+
+
+ public static final ConfigOption<String> INLONG_METRIC =
+ ConfigOptions.key("inlong.metric")
+ .stringType()
+ .defaultValue("")
+ .withDescription("INLONG GROUP ID + '&' + STREAM ID + '&' + NODE ID");
+
+
+ public static final ConfigOption<String> INLONG_AUDIT =
+ ConfigOptions.key("inlong.audit")
+ .stringType()
+ .defaultValue("")
+ .withDescription("INLONG AUDIT HOST + '&' + PORT");
+
}
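
To make the two value formats concrete: the INLONG_METRIC value is the three-part label assembled with METRIC_VALUE_FORMAT ("%s&%s&%s") and split back apart with DELIMITER ("&") by the deserialization schema further down. A self-contained illustration with made-up IDs:

    public class MetricLabelSketch {
        public static void main(String[] args) {
            // Assemble the label the way METRIC_VALUE_FORMAT defines it.
            String label = String.format("%s&%s&%s", "demo_group", "demo_stream", "node_1");
            // Split it back with DELIMITER, as open() does in the deserialization schema below.
            String[] parts = label.split("&");
            String inLongGroupId = parts[0];  // "demo_group"
            String inLongStreamId = parts[1]; // "demo_stream"
            String nodeId = parts[2];         // "node_1"
            System.out.println(inLongGroupId + " / " + inLongStreamId + " / " + nodeId);
        }
    }
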
diff --git a/inlong-sort/sort-connectors/kafka/pom.xml b/inlong-sort/sort-connectors/kafka/pom.xml
index dd3a71c7a..3b31a8c96 100644
--- a/inlong-sort/sort-connectors/kafka/pom.xml
+++ b/inlong-sort/sort-connectors/kafka/pom.xml
@@ -37,6 +37,17 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${flink.scala.binary.version}</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connector-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>audit-sdk</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
<build>
@@ -54,6 +65,7 @@
<configuration>
<artifactSet>
<includes>
+ <include>org.apache.inlong:*</include>
<include>org.apache.kafka:*</include>
<include>org.apache.flink:flink-connector-kafka_${scala.binary.version}</include>
</includes>
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
new file mode 100644
index 000000000..f09b16b5b
--- /dev/null
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kafka.table;
+
+import static org.apache.inlong.sort.base.Constants.DELIMITER;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.DeserializationException;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.sort.base.Constants;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+/**
+ * Deserialization schema for {@link KafkaDynamicSource}.
+ */
+class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<RowData> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final @Nullable DeserializationSchema<RowData> keyDeserialization;
+
+ private final DeserializationSchema<RowData> valueDeserialization;
+
+ private final boolean hasMetadata;
+
+ private final BufferingCollector keyCollector;
+
+ private final OutputProjectionCollector outputCollector;
+
+ private final TypeInformation<RowData> producedTypeInfo;
+
+ private final boolean upsertMode;
+
+ private final String inlongMetric;
+
+ private SourceMetricData metricData;
+
+ private String inLongGroupId;
+
+ private String auditHostAndPorts;
+
+ private String inLongStreamId;
+
+ private transient AuditImp auditImp;
+
+ DynamicKafkaDeserializationSchema(
+ int physicalArity,
+ @Nullable DeserializationSchema<RowData> keyDeserialization,
+ int[] keyProjection,
+ DeserializationSchema<RowData> valueDeserialization,
+ int[] valueProjection,
+ boolean hasMetadata,
+ MetadataConverter[] metadataConverters,
+ TypeInformation<RowData> producedTypeInfo,
+ boolean upsertMode,
+ String inLongMetric,
+ String auditHostAndPorts) {
+ if (upsertMode) {
+ Preconditions.checkArgument(
+ keyDeserialization != null && keyProjection.length > 0,
+ "Key must be set in upsert mode for deserialization schema.");
+ }
+ this.keyDeserialization = keyDeserialization;
+ this.valueDeserialization = valueDeserialization;
+ this.hasMetadata = hasMetadata;
+ this.keyCollector = new BufferingCollector();
+ this.outputCollector =
+ new OutputProjectionCollector(
+ physicalArity,
+ keyProjection,
+ valueProjection,
+ metadataConverters,
+ upsertMode);
+ this.producedTypeInfo = producedTypeInfo;
+ this.upsertMode = upsertMode;
+ this.inlongMetric = inLongMetric;
+ this.auditHostAndPorts = auditHostAndPorts;
+
+ }
+
+ @Override
+ public void open(DeserializationSchema.InitializationContext context) throws Exception {
+ if (keyDeserialization != null) {
+ keyDeserialization.open(context);
+ }
+ valueDeserialization.open(context);
+ if (inlongMetric != null && !inlongMetric.isEmpty()) {
+ String[] inLongMetricArray = inlongMetric.split(DELIMITER);
+ inLongGroupId = inLongMetricArray[0];
+ inLongStreamId = inLongMetricArray[1];
+ String nodeId = inLongMetricArray[2];
+ metricData = new SourceMetricData(context.getMetricGroup());
+ metricData.registerMetricsForNumBytesIn(inLongGroupId, inLongStreamId, nodeId, "numBytesIn");
+ metricData.registerMetricsForNumBytesInPerSecond(inLongGroupId, inLongStreamId,
+ nodeId, "numBytesInPerSecond");
+ metricData.registerMetricsForNumRecordsIn(inLongGroupId, inLongStreamId,
+ nodeId, "numRecordsIn");
+ metricData.registerMetricsForNumRecordsInPerSecond(inLongGroupId, inLongStreamId,
+ nodeId, "numRecordsInPerSecond");
+ }
+ if (auditHostAndPorts != null) {
+ AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER))));
+ auditImp = AuditImp.getInstance();
+ }
+ }
+
+ @Override
+ public boolean isEndOfStream(RowData nextElement) {
+ return false;
+ }
+
+ @Override
+ public RowData deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
+ throw new IllegalStateException("A collector is required for deserializing.");
+ }
+
+ @Override
+ public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData> collector)
+ throws Exception {
+ // shortcut in case no output projection is required
+ // and no Cartesian product with the keys is needed
+ if (keyDeserialization == null && !hasMetadata) {
+ valueDeserialization.deserialize(record.value(), collector);
+ // output metrics
+ outputMetrics(record);
+ return;
+ }
+
+ // buffer key(s)
+ if (keyDeserialization != null) {
+ keyDeserialization.deserialize(record.key(), keyCollector);
+ }
+
+ // project output while emitting values
+ outputCollector.inputRecord = record;
+ outputCollector.physicalKeyRows = keyCollector.buffer;
+ outputCollector.outputCollector = collector;
+
+ if (record.value() == null && upsertMode) {
+ // collect tombstone messages in upsert mode by hand
+ outputCollector.collect(null);
+ } else {
+ valueDeserialization.deserialize(record.value(), outputCollector);
+ // output metrics
+ outputMetrics(record);
+ }
+
+ keyCollector.buffer.clear();
+ }
+
+ private void outputMetrics(ConsumerRecord<byte[], byte[]> record) {
+ outputMetricForFlink(record);
+ outputMetricForAudit(record);
+ }
+
+ private void outputMetricForAudit(ConsumerRecord<byte[], byte[]> record) {
+ if (auditImp != null) {
+ auditImp.add(
+ Constants.AUDIT_SORT_INPUT,
+ inLongGroupId,
+ inLongStreamId,
+ System.currentTimeMillis(),
+ 1,
+ record.value().length);
+ }
+ }
+
+ private void outputMetricForFlink(ConsumerRecord<byte[], byte[]> record) {
+ if (metricData != null) {
+ metricData.getNumBytesIn().inc(record.value().length);
+ metricData.getNumRecordsIn().inc(1);
+ }
+ }
+
+ @Override
+ public TypeInformation<RowData> getProducedType() {
+ return producedTypeInfo;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ interface MetadataConverter extends Serializable {
+ Object read(ConsumerRecord<?, ?> record);
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ private static final class BufferingCollector implements Collector<RowData>, Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final List<RowData> buffer = new ArrayList<>();
+
+ @Override
+ public void collect(RowData record) {
+ buffer.add(record);
+ }
+
+ @Override
+ public void close() {
+ // nothing to do
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Emits a row with key, value, and metadata fields.
+ *
+ * <p>The collector is able to handle the following kinds of keys:
+ *
+ * <ul>
+ * <li>No key is used.
+ * <li>A key is used.
+ * <li>The deserialization schema emits multiple keys.
+ * <li>Keys and values have overlapping fields.
+ * <li>Keys are used and value is null.
+ * </ul>
+ */
+ private static final class OutputProjectionCollector
+ implements Collector<RowData>, Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final int physicalArity;
+
+ private final int[] keyProjection;
+
+ private final int[] valueProjection;
+
+ private final MetadataConverter[] metadataConverters;
+
+ private final boolean upsertMode;
+
+ private transient ConsumerRecord<?, ?> inputRecord;
+
+ private transient List<RowData> physicalKeyRows;
+
+ private transient Collector<RowData> outputCollector;
+
+ OutputProjectionCollector(
+ int physicalArity,
+ int[] keyProjection,
+ int[] valueProjection,
+ MetadataConverter[] metadataConverters,
+ boolean upsertMode) {
+ this.physicalArity = physicalArity;
+ this.keyProjection = keyProjection;
+ this.valueProjection = valueProjection;
+ this.metadataConverters = metadataConverters;
+ this.upsertMode = upsertMode;
+ }
+
+ @Override
+ public void collect(RowData physicalValueRow) {
+ // no key defined
+ if (keyProjection.length == 0) {
+ emitRow(null, (GenericRowData) physicalValueRow);
+ return;
+ }
+
+ // otherwise emit a value for each key
+ for (RowData physicalKeyRow : physicalKeyRows) {
+ emitRow((GenericRowData) physicalKeyRow, (GenericRowData) physicalValueRow);
+ }
+ }
+
+ @Override
+ public void close() {
+ // nothing to do
+ }
+
+ private void emitRow(
+ @Nullable GenericRowData physicalKeyRow,
+ @Nullable GenericRowData physicalValueRow) {
+ final RowKind rowKind;
+ if (physicalValueRow == null) {
+ if (upsertMode) {
+ rowKind = RowKind.DELETE;
+ } else {
+ throw new DeserializationException(
+ "Invalid null value received "
+ + "in non-upsert mode. Could not to "
+ + "set row kind for output record.");
+ }
+ } else {
+ rowKind = physicalValueRow.getRowKind();
+ }
+
+ final int metadataArity = metadataConverters.length;
+ final GenericRowData producedRow =
+ new GenericRowData(rowKind, physicalArity + metadataArity);
+
+ for (int keyPos = 0; keyPos < keyProjection.length; keyPos++) {
+ assert physicalKeyRow != null;
+ producedRow.setField(keyProjection[keyPos], physicalKeyRow.getField(keyPos));
+ }
+
+ if (physicalValueRow != null) {
+ for (int valuePos = 0; valuePos < valueProjection.length; valuePos++) {
+ producedRow.setField(
+ valueProjection[valuePos], physicalValueRow.getField(valuePos));
+ }
+ }
+
+ for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) {
+ producedRow.setField(
+ physicalArity + metadataPos,
+ metadataConverters[metadataPos].read(inputRecord));
+ }
+
+ outputCollector.collect(producedRow);
+ }
+ }
+}
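
The audit branch of open() above treats auditHostAndPorts analogously: a DELIMITER-joined "ip:port" list that becomes the proxy set handed to AuditImp.getInstance().setAuditProxy(...). A sketch of just that parsing step, with placeholder addresses:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class AuditProxySketch {
        public static void main(String[] args) {
            // "ip:port&ip:port" -> the set passed to AuditImp.getInstance().setAuditProxy(...).
            String auditHostAndPorts = "10.0.0.1:10081&10.0.0.2:10081"; // placeholder addresses
            Set<String> proxies = new HashSet<>(Arrays.asList(auditHostAndPorts.split("&")));
            System.out.println(proxies); // [10.0.0.1:10081, 10.0.0.2:10081]
        }
    }
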
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
new file mode 100644
index 000000000..d49af2d00
--- /dev/null
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
@@ -0,0 +1,551 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kafka.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.inlong.sort.kafka.table.DynamicKafkaDeserializationSchema.MetadataConverter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Header;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/**
+ * Dynamic Kafka table source that supports reading metadata from Kafka
+ * and reporting metrics.
+ */
+@Internal
+public class KafkaDynamicSource
+ implements ScanTableSource, SupportsReadingMetadata, SupportsWatermarkPushDown {
+
+ // --------------------------------------------------------------------------------------------
+ // Mutable attributes
+ // --------------------------------------------------------------------------------------------
+
+ /** Data type that describes the final output of the source. */
+ protected DataType producedDataType;
+
+ /** Metadata that is appended at the end of a physical source row. */
+ protected List<String> metadataKeys;
+
+ /** Watermark strategy that is used to generate per-partition watermark. */
+ protected @Nullable WatermarkStrategy<RowData> watermarkStrategy;
+
+ // --------------------------------------------------------------------------------------------
+ // Format attributes
+ // --------------------------------------------------------------------------------------------
+
+ private static final String VALUE_METADATA_PREFIX = "value.";
+
+ /** Data type to configure the formats. */
+ protected final DataType physicalDataType;
+
+ /** Optional format for decoding keys from Kafka. */
+ protected final @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat;
+
+ /** Format for decoding values from Kafka. */
+ protected final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat;
+
+ /** Indices that determine the key fields and the target position in the produced row. */
+ protected final int[] keyProjection;
+
+ /** Indices that determine the value fields and the target position in the produced row. */
+ protected final int[] valueProjection;
+
+ /** Prefix that needs to be removed from fields when constructing the physical data type. */
+ protected final @Nullable String keyPrefix;
+
+ // --------------------------------------------------------------------------------------------
+ // Kafka-specific attributes
+ // --------------------------------------------------------------------------------------------
+
+ /** The Kafka topics to consume. */
+ protected final List<String> topics;
+
+ /** The Kafka topic pattern to consume. */
+ protected final Pattern topicPattern;
+
+ /** Properties for the Kafka consumer. */
+ protected final Properties properties;
+
+ /**
+ * The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}).
+ */
+ protected final StartupMode startupMode;
+
+ /**
+ * Specific startup offsets; only relevant when startup mode is {@link
+ * StartupMode#SPECIFIC_OFFSETS}.
+ */
+ protected final Map<KafkaTopicPartition, Long> specificStartupOffsets;
+
+ /**
+ * The start timestamp to locate partition offsets; only relevant when startup mode is {@link
+ * StartupMode#TIMESTAMP}.
+ */
+ protected final long startupTimestampMillis;
+
+ /** Flag to determine source mode. In upsert mode, it will keep the tombstone message. */
+ protected final boolean upsertMode;
+
+ protected final String inLongMetric;
+
+ protected final String auditHostAndPorts;
+
+ public KafkaDynamicSource(
+ DataType physicalDataType,
+ @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
+ DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
+ int[] keyProjection,
+ int[] valueProjection,
+ @Nullable String keyPrefix,
+ @Nullable List<String> topics,
+ @Nullable Pattern topicPattern,
+ Properties properties,
+ StartupMode startupMode,
+ Map<KafkaTopicPartition, Long> specificStartupOffsets,
+ long startupTimestampMillis,
+ boolean upsertMode,
+ final String inLongMetric,
+ final String auditHostAndPorts) {
+ // Format attributes
+ this.physicalDataType =
+ Preconditions.checkNotNull(
+ physicalDataType, "Physical data type must not be null.");
+ this.keyDecodingFormat = keyDecodingFormat;
+ this.valueDecodingFormat =
+ Preconditions.checkNotNull(
+ valueDecodingFormat, "Value decoding format must not be null.");
+ this.keyProjection =
+ Preconditions.checkNotNull(keyProjection, "Key projection must not be null.");
+ this.valueProjection =
+ Preconditions.checkNotNull(valueProjection, "Value projection must not be null.");
+ this.keyPrefix = keyPrefix;
+ // Mutable attributes
+ this.producedDataType = physicalDataType;
+ this.metadataKeys = Collections.emptyList();
+ this.watermarkStrategy = null;
+ // Kafka-specific attributes
+ Preconditions.checkArgument(
+ (topics != null && topicPattern == null)
+ || (topics == null && topicPattern != null),
+ "Either Topic or Topic Pattern must be set for source.");
+ this.topics = topics;
+ this.topicPattern = topicPattern;
+ this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
+ this.startupMode =
+ Preconditions.checkNotNull(startupMode, "Startup mode must not be null.");
+ this.specificStartupOffsets =
+ Preconditions.checkNotNull(
+ specificStartupOffsets, "Specific offsets must not be null.");
+ this.startupTimestampMillis = startupTimestampMillis;
+ this.upsertMode = upsertMode;
+ this.inLongMetric = inLongMetric;
+ this.auditHostAndPorts = auditHostAndPorts;
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return valueDecodingFormat.getChangelogMode();
+ }
+
+ @Override
+ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
+ final DeserializationSchema<RowData> keyDeserialization =
+ createDeserialization(context, keyDecodingFormat, keyProjection, keyPrefix);
+
+ final DeserializationSchema<RowData> valueDeserialization =
+ createDeserialization(context, valueDecodingFormat, valueProjection, null);
+
+ final TypeInformation<RowData> producedTypeInfo =
+ context.createTypeInformation(producedDataType);
+
+ final FlinkKafkaConsumer<RowData> kafkaConsumer =
+ createKafkaConsumer(keyDeserialization, valueDeserialization,
+ producedTypeInfo, inLongMetric, auditHostAndPorts);
+
+ return SourceFunctionProvider.of(kafkaConsumer, false);
+ }
+
+ @Override
+ public Map<String, DataType> listReadableMetadata() {
+ final Map<String, DataType> metadataMap = new LinkedHashMap<>();
+
+ // according to convention, the order of the final row must be
+ // PHYSICAL + FORMAT METADATA + CONNECTOR METADATA
+ // where the format metadata has highest precedence
+
+ // add value format metadata with prefix
+ valueDecodingFormat
+ .listReadableMetadata()
+ .forEach((key, value) -> metadataMap.put(VALUE_METADATA_PREFIX + key, value));
+
+ // add connector metadata
+ Stream.of(ReadableMetadata.values())
+ .forEachOrdered(m -> metadataMap.putIfAbsent(m.key, m.dataType));
+
+ return metadataMap;
+ }
+
+ @Override
+ public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+ // separate connector and format metadata
+ final List<String> formatMetadataKeys =
+ metadataKeys.stream()
+ .filter(k -> k.startsWith(VALUE_METADATA_PREFIX))
+ .collect(Collectors.toList());
+ final List<String> connectorMetadataKeys = new ArrayList<>(metadataKeys);
+ connectorMetadataKeys.removeAll(formatMetadataKeys);
+
+ // push down format metadata
+ final Map<String, DataType> formatMetadata = valueDecodingFormat.listReadableMetadata();
+ if (formatMetadata.size() > 0) {
+ final List<String> requestedFormatMetadataKeys =
+ formatMetadataKeys.stream()
+ .map(k -> k.substring(VALUE_METADATA_PREFIX.length()))
+ .collect(Collectors.toList());
+ valueDecodingFormat.applyReadableMetadata(requestedFormatMetadataKeys);
+ }
+
+ this.metadataKeys = connectorMetadataKeys;
+ this.producedDataType = producedDataType;
+ }
+
+ @Override
+ public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
+ this.watermarkStrategy = watermarkStrategy;
+ }
+
+ @Override
+ public DynamicTableSource copy() {
+ final KafkaDynamicSource copy =
+ new KafkaDynamicSource(
+ physicalDataType,
+ keyDecodingFormat,
+ valueDecodingFormat,
+ keyProjection,
+ valueProjection,
+ keyPrefix,
+ topics,
+ topicPattern,
+ properties,
+ startupMode,
+ specificStartupOffsets,
+ startupTimestampMillis,
+ upsertMode, inLongMetric, auditHostAndPorts);
+ copy.producedDataType = producedDataType;
+ copy.metadataKeys = metadataKeys;
+ copy.watermarkStrategy = watermarkStrategy;
+ return copy;
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "Kafka table source";
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final KafkaDynamicSource that = (KafkaDynamicSource) o;
+ return Objects.equals(producedDataType, that.producedDataType)
+ && Objects.equals(metadataKeys, that.metadataKeys)
+ && Objects.equals(physicalDataType, that.physicalDataType)
+ && Objects.equals(keyDecodingFormat, that.keyDecodingFormat)
+ && Objects.equals(valueDecodingFormat, that.valueDecodingFormat)
+ && Arrays.equals(keyProjection, that.keyProjection)
+ && Arrays.equals(valueProjection, that.valueProjection)
+ && Objects.equals(keyPrefix, that.keyPrefix)
+ && Objects.equals(topics, that.topics)
+ && Objects.equals(String.valueOf(topicPattern), String.valueOf(that.topicPattern))
+ && Objects.equals(properties, that.properties)
+ && startupMode == that.startupMode
+ && Objects.equals(specificStartupOffsets, that.specificStartupOffsets)
+ && startupTimestampMillis == that.startupTimestampMillis
+ && Objects.equals(upsertMode, that.upsertMode)
+ && Objects.equals(watermarkStrategy, that.watermarkStrategy);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ producedDataType,
+ metadataKeys,
+ physicalDataType,
+ keyDecodingFormat,
+ valueDecodingFormat,
+ keyProjection,
+ valueProjection,
+ keyPrefix,
+ topics,
+ topicPattern,
+ properties,
+ startupMode,
+ specificStartupOffsets,
+ startupTimestampMillis,
+ upsertMode,
+ watermarkStrategy);
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ protected FlinkKafkaConsumer<RowData> createKafkaConsumer(
+ DeserializationSchema<RowData> keyDeserialization,
+ DeserializationSchema<RowData> valueDeserialization,
+ TypeInformation<RowData> producedTypeInfo,
+ String inlongMetric,
+ String auditHostAndPorts) {
+
+ final MetadataConverter[] metadataConverters =
+ metadataKeys.stream()
+ .map(
+ k ->
+ Stream.of(ReadableMetadata.values())
+ .filter(rm -> rm.key.equals(k))
+ .findFirst()
+ .orElseThrow(IllegalStateException::new))
+ .map(m -> m.converter)
+ .toArray(MetadataConverter[]::new);
+
+ // check if connector metadata is used at all
+ final boolean hasMetadata = metadataKeys.size() > 0;
+
+ // adjust physical arity with value format's metadata
+ final int adjustedPhysicalArity =
+ producedDataType.getChildren().size() - metadataKeys.size();
+
+ // adjust value format projection to include value format's metadata columns at the end
+ final int[] adjustedValueProjection =
+ IntStream.concat(
+ IntStream.of(valueProjection),
+ IntStream.range(
+ keyProjection.length + valueProjection.length,
+ adjustedPhysicalArity))
+ .toArray();
+
+ final KafkaDeserializationSchema<RowData> kafkaDeserializer =
+ new DynamicKafkaDeserializationSchema(
+ adjustedPhysicalArity,
+ keyDeserialization,
+ keyProjection,
+ valueDeserialization,
+ adjustedValueProjection,
+ hasMetadata,
+ metadataConverters,
+ producedTypeInfo,
+ upsertMode, inlongMetric, auditHostAndPorts);
+
+ final FlinkKafkaConsumer<RowData> kafkaConsumer;
+ if (topics != null) {
+ kafkaConsumer = new FlinkKafkaConsumer<>(topics, kafkaDeserializer, properties);
+ } else {
+ kafkaConsumer = new FlinkKafkaConsumer<>(topicPattern, kafkaDeserializer, properties);
+ }
+
+ switch (startupMode) {
+ case EARLIEST:
+ kafkaConsumer.setStartFromEarliest();
+ break;
+ case LATEST:
+ kafkaConsumer.setStartFromLatest();
+ break;
+ case GROUP_OFFSETS:
+ kafkaConsumer.setStartFromGroupOffsets();
+ break;
+ case SPECIFIC_OFFSETS:
+ kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets);
+ break;
+ case TIMESTAMP:
+ kafkaConsumer.setStartFromTimestamp(startupTimestampMillis);
+ break;
+ }
+
+ kafkaConsumer.setCommitOffsetsOnCheckpoints(properties.getProperty("group.id") != null);
+
+ if (watermarkStrategy != null) {
+ kafkaConsumer.assignTimestampsAndWatermarks(watermarkStrategy);
+ }
+ return kafkaConsumer;
+ }
+
+ private @Nullable DeserializationSchema<RowData> createDeserialization(
+ DynamicTableSource.Context context,
+ @Nullable DecodingFormat<DeserializationSchema<RowData>> format,
+ int[] projection,
+ @Nullable String prefix) {
+ if (format == null) {
+ return null;
+ }
+ DataType physicalFormatDataType =
+ DataTypeUtils.projectRow(this.physicalDataType, projection);
+ if (prefix != null) {
+ physicalFormatDataType = DataTypeUtils.stripRowPrefix(physicalFormatDataType, prefix);
+ }
+ return format.createRuntimeDecoder(context, physicalFormatDataType);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Metadata handling
+ // --------------------------------------------------------------------------------------------
+
+ enum ReadableMetadata {
+ TOPIC(
+ "topic",
+ DataTypes.STRING().notNull(),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(ConsumerRecord<?, ?> record) {
+ return StringData.fromString(record.topic());
+ }
+ }),
+
+ PARTITION(
+ "partition",
+ DataTypes.INT().notNull(),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(ConsumerRecord<?, ?> record) {
+ return record.partition();
+ }
+ }),
+
+ HEADERS(
+ "headers",
+ // key and value of the map are nullable to make handling easier in queries
+ DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.BYTES().nullable())
+ .notNull(),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(ConsumerRecord<?, ?> record) {
+ final Map<StringData, byte[]> map = new HashMap<>();
+ for (Header header : record.headers()) {
+ map.put(StringData.fromString(header.key()), header.value());
+ }
+ return new GenericMapData(map);
+ }
+ }),
+
+ LEADER_EPOCH(
+ "leader-epoch",
+ DataTypes.INT().nullable(),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(ConsumerRecord<?, ?> record) {
+ return record.leaderEpoch().orElse(null);
+ }
+ }),
+
+ OFFSET(
+ "offset",
+ DataTypes.BIGINT().notNull(),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(ConsumerRecord<?, ?> record) {
+ return record.offset();
+ }
+ }),
+
+ TIMESTAMP(
+ "timestamp",
+ DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(ConsumerRecord<?, ?> record) {
+ return TimestampData.fromEpochMillis(record.timestamp());
+ }
+ }),
+
+ TIMESTAMP_TYPE(
+ "timestamp-type",
+ DataTypes.STRING().notNull(),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(ConsumerRecord<?, ?> record) {
+ return StringData.fromString(record.timestampType().toString());
+ }
+ });
+
+ final String key;
+
+ final DataType dataType;
+
+ final MetadataConverter converter;
+
+ ReadableMetadata(String key, DataType dataType, MetadataConverter converter) {
+ this.key = key;
+ this.dataType = dataType;
+ this.converter = converter;
+ }
+ }
+}
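
The ReadableMetadata keys defined above ('topic', 'partition', 'headers', 'leader-epoch', 'offset', 'timestamp', 'timestamp-type') are what a table exposes through METADATA columns; Flink routes the requested keys through applyReadableMetadata and the per-key converters. A sketch of such a declaration, again with placeholder literals and a hypothetical connector identifier:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class KafkaMetadataColumnsSketch {
        public static void main(String[] args) {
            TableEnvironment tableEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inStreamingMode().build());
            // Each METADATA FROM key matches a ReadableMetadata entry above.
            tableEnv.executeSql(
                    "CREATE TABLE kafka_source_with_meta (\n"
                            + "  id BIGINT,\n"
                            + "  kafka_offset BIGINT METADATA FROM 'offset' VIRTUAL,\n"
                            + "  event_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL\n"
                            + ") WITH (\n"
                            + "  'connector' = 'kafka-inlong',\n" // hypothetical identifier
                            + "  'topic' = 'demo_topic',\n"
                            + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                            + "  'format' = 'json'\n"
                            + ")");
        }
    }
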
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
index 97ffe1ec4..c0e41f1b0 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
@@ -27,7 +27,6 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource;
import org.apache.flink.streaming.connectors.kafka.table.KafkaOptions;
import org.apache.flink.streaming.connectors.kafka.table.KafkaSinkSemantic;
import org.apache.flink.streaming.connectors.kafka.table.SinkBufferFlushMode;
@@ -87,6 +86,8 @@ import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.get
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.validateTableSinkOptions;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.validateTableSourceOptions;
import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
import static org.apache.inlong.sort.kafka.table.KafkaOptions.KAFKA_IGNORE_ALL_CHANGELOG;
/**
@@ -207,6 +208,8 @@ public class KafkaDynamicTableFactory
options.add(SINK_SEMANTIC);
options.add(SINK_PARALLELISM);
options.add(KAFKA_IGNORE_ALL_CHANGELOG);
+ options.add(INLONG_METRIC);
+ options.add(INLONG_AUDIT);
return options;
}
@@ -251,6 +254,10 @@ public class KafkaDynamicTableFactory
final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
+ final String inLongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null);
+
+ final String auditHostAndPorts = tableOptions.getOptional(INLONG_AUDIT).orElse(null);
+
return createKafkaTableSource(
physicalDataType,
keyDecodingFormat.orElse(null),
@@ -263,7 +270,9 @@ public class KafkaDynamicTableFactory
properties,
startupOptions.startupMode,
startupOptions.specificOffsets,
- startupOptions.startupTimestampMillis);
+ startupOptions.startupTimestampMillis,
+ inLongMetric,
+ auditHostAndPorts);
}
@Override
@@ -327,7 +336,9 @@ public class KafkaDynamicTableFactory
Properties properties,
StartupMode startupMode,
Map<KafkaTopicPartition, Long> specificStartupOffsets,
- long startupTimestampMillis) {
+ long startupTimestampMillis,
+ String inLongMetric,
+ String auditHostAndPorts) {
return new KafkaDynamicSource(
physicalDataType,
keyDecodingFormat,
@@ -341,7 +352,9 @@ public class KafkaDynamicTableFactory
startupMode,
specificStartupOffsets,
startupTimestampMillis,
- false);
+ false,
+ inLongMetric,
+ auditHostAndPorts);
}
protected KafkaDynamicSink createKafkaTableSink(
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
index d3003b1ad..9cb307f7e 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
@@ -186,6 +186,10 @@ public class FlinkSqlParser implements Parser {
properties.put(InlongMetric.METRIC_KEY,
String.format(InlongMetric.METRIC_VALUE_FORMAT, groupInfo.getGroupId(),
streamInfo.getStreamId(), node.getId()));
+ if (StringUtils.isNotEmpty(groupInfo.getProperties().get(InlongMetric.AUDIT_KEY))) {
+ properties.put(InlongMetric.AUDIT_KEY,
+ groupInfo.getProperties().get(InlongMetric.AUDIT_KEY));
+ }
});
}
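
Tying the protocol and parser changes together: a caller opts into audit reporting by passing the new properties map to GroupInfo, and FlinkSqlParser then copies InlongMetric.AUDIT_KEY into each node's options alongside the metric label. A sketch assuming the stream list is built elsewhere and is non-empty (GroupInfo enforces this with checkState); the address literal is a placeholder:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.inlong.sort.protocol.GroupInfo;
    import org.apache.inlong.sort.protocol.InlongMetric;
    import org.apache.inlong.sort.protocol.StreamInfo;

    public class GroupInfoAuditSketch {
        public static GroupInfo build(List<StreamInfo> streams) {
            Map<String, String> properties = new HashMap<>();
            properties.put(InlongMetric.AUDIT_KEY, "10.0.0.1:10081"); // placeholder address
            // The new three-argument constructor added in this commit.
            return new GroupInfo("demo_group", streams, properties);
        }
    }
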