You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/08/04 09:19:22 UTC
[inlong] branch master updated: [INLONG-5262][Sort] Add metric report for Pulsar source (#5276)
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new aa2f8f26e [INLONG-5262][Sort] Add metric report for Pulsar source (#5276)
aa2f8f26e is described below
commit aa2f8f26ed3a8cc3ad736b87e66b58e9417de7cc
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Thu Aug 4 17:19:18 2022 +0800
[INLONG-5262][Sort] Add metric report for Pulsar source (#5276)
---
inlong-sort/sort-connectors/pulsar/pom.xml | 6 +
.../apache/inlong/sort/pulsar/table/Constants.java | 32 ++
.../table/DynamicPulsarDeserializationSchema.java | 352 +++++++++++++++
.../pulsar/table/PulsarDynamicTableFactory.java | 16 +-
.../pulsar/table/PulsarDynamicTableSource.java | 493 +++++++++++++++++++++
.../table/UpsertPulsarDynamicTableFactory.java | 7 +-
.../org.apache.flink.table.factories.Factory | 3 +-
7 files changed, 902 insertions(+), 7 deletions(-)
diff --git a/inlong-sort/sort-connectors/pulsar/pom.xml b/inlong-sort/sort-connectors/pulsar/pom.xml
index 9c3b24be5..395dcd0a3 100644
--- a/inlong-sort/sort-connectors/pulsar/pom.xml
+++ b/inlong-sort/sort-connectors/pulsar/pom.xml
@@ -54,6 +54,11 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connector-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<build>
@@ -73,6 +78,7 @@
<promoteTransitiveDependencies>true</promoteTransitiveDependencies>
<artifactSet>
<includes>
+ <include>org.apache.inlong:*</include>
<include>io.streamnative.connectors:pulsar-flink-connector-origin*</include>
<include>io.streamnative.connectors:flink-protobuf</include>
<include>org.apache.pulsar:*</include>
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/Constants.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/Constants.java
new file mode 100644
index 000000000..18fd19955
--- /dev/null
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/Constants.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/**
+ * InLong-specific table options understood by the Pulsar connector.
+ */
+public final class Constants {
+
+    /**
+     * Identifies the InLong node that source metrics are reported for, written as
+     * group id, stream id and node id joined by '&amp;'. The default (empty string) makes the
+     * Pulsar deserialization schema skip metric registration.
+     */
+    public static final ConfigOption<String> INLONG_METRIC =
+            ConfigOptions.key("inlong.metric")
+                    .stringType()
+                    .defaultValue("")
+                    .withDescription("INLONG GROUP ID + '&' + STREAM ID + '&' + NODE ID");
+
+    private Constants() {
+        // holder for option constants only; not meant to be instantiated
+    }
+}
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
new file mode 100644
index 000000000..7f6b1fb95
--- /dev/null
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.table;
+
+import static org.apache.inlong.sort.base.Constants.DELIMITER;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_PER_SECOND;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.pulsar.table.PulsarDynamicTableSource;
+import org.apache.flink.streaming.util.serialization.FlinkSchema;
+import org.apache.flink.streaming.util.serialization.PulsarDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.ThreadSafeDeserializationSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.DeserializationException;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Schema;
+
+/**
+ * A specific {@link PulsarDeserializationSchema} for {@link PulsarDynamicTableSource}.
+ *
+ * <p>Besides projecting key, value and connector-metadata fields into the produced row, this schema
+ * reports InLong source metrics (records in, bytes in and their per-second rates) when a non-empty
+ * {@code inlongMetric} identifier ({@code groupId DELIMITER streamId DELIMITER nodeId}) is configured.
+ */
+class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Number of {@code DELIMITER}-separated parts required in the InLong metric identifier. */
+    private static final int INLONG_METRIC_PARTS = 3;
+
+    /** Per-thread collector backing the single-record {@link #deserialize(Message)} variant. */
+    private static final ThreadLocal<SimpleCollector<RowData>> tlsCollector =
+            ThreadLocal.withInitial(SimpleCollector::new);
+
+    @Nullable
+    private final DeserializationSchema<RowData> keyDeserialization;
+
+    private final DeserializationSchema<RowData> valueDeserialization;
+
+    private final boolean hasMetadata;
+
+    private final OutputProjectionCollector outputCollector;
+
+    private final TypeInformation<RowData> producedTypeInfo;
+
+    private final boolean upsertMode;
+
+    /** InLong metric identifier; null or empty disables metric reporting. */
+    private final String inlongMetric;
+
+    /** Created in {@link #open}; stays null while metrics are disabled. Never shipped with the schema. */
+    private transient SourceMetricData sourceMetricData;
+
+    DynamicPulsarDeserializationSchema(
+            int physicalArity,
+            @Nullable DeserializationSchema<RowData> keyDeserialization,
+            int[] keyProjection,
+            DeserializationSchema<RowData> valueDeserialization,
+            int[] valueProjection,
+            boolean hasMetadata,
+            MetadataConverter[] metadataConverters,
+            TypeInformation<RowData> producedTypeInfo,
+            boolean upsertMode,
+            String inlongMetric) {
+        if (upsertMode) {
+            Preconditions.checkArgument(
+                    keyDeserialization != null && keyProjection.length > 0,
+                    "Key must be set in upsert mode for deserialization schema.");
+        }
+        this.keyDeserialization = ThreadSafeDeserializationSchema.of(keyDeserialization);
+        this.valueDeserialization = ThreadSafeDeserializationSchema.of(valueDeserialization);
+        this.hasMetadata = hasMetadata;
+        this.outputCollector = new OutputProjectionCollector(
+                physicalArity,
+                keyProjection,
+                valueProjection,
+                metadataConverters,
+                upsertMode);
+        this.producedTypeInfo = producedTypeInfo;
+        this.upsertMode = upsertMode;
+        this.inlongMetric = inlongMetric;
+    }
+
+    @Override
+    public void open(DeserializationSchema.InitializationContext context) throws Exception {
+        if (keyDeserialization != null) {
+            keyDeserialization.open(context);
+        }
+        valueDeserialization.open(context);
+
+        if (inlongMetric != null && !inlongMetric.isEmpty()) {
+            registerMetrics(context);
+        }
+    }
+
+    /**
+     * Parses {@link #inlongMetric} and registers the InLong source metrics on the context's metric group.
+     * The field {@link #sourceMetricData} is only published once every metric has been registered.
+     *
+     * @throws IllegalArgumentException if the identifier lacks group, stream or node id
+     */
+    private void registerMetrics(DeserializationSchema.InitializationContext context) {
+        final String[] parts = inlongMetric.split(DELIMITER);
+        Preconditions.checkArgument(
+                parts.length >= INLONG_METRIC_PARTS,
+                "Invalid inlong.metric '%s': expected groupId%sstreamId%snodeId.",
+                inlongMetric, DELIMITER, DELIMITER);
+        final String groupId = parts[0];
+        final String streamId = parts[1];
+        final String nodeId = parts[2];
+
+        final SourceMetricData metricData = new SourceMetricData(context.getMetricGroup());
+        metricData.registerMetricsForNumBytesIn(groupId, streamId, nodeId, NUM_BYTES_IN);
+        metricData.registerMetricsForNumBytesInPerSecond(groupId, streamId, nodeId, NUM_BYTES_IN_PER_SECOND);
+        metricData.registerMetricsForNumRecordsIn(groupId, streamId, nodeId, NUM_RECORDS_IN);
+        metricData.registerMetricsForNumRecordsInPerSecond(groupId, streamId, nodeId, NUM_RECORDS_IN_PER_SECOND);
+        sourceMetricData = metricData;
+    }
+
+    @Override
+    public boolean isEndOfStream(RowData nextElement) {
+        return false;
+    }
+
+    @Override
+    public RowData deserialize(Message<RowData> message) throws IOException {
+        final SimpleCollector<RowData> collector = tlsCollector.get();
+        deserialize(message, collector);
+        return collector.takeRecord();
+    }
+
+    @Override
+    public void deserialize(Message<RowData> message, Collector<RowData> collector) throws IOException {
+        // read the payload once; it may be null (see the tombstone check below)
+        final byte[] data = message.getData();
+
+        // shortcut in case no output projection is required,
+        // also not for a cartesian product with the keys
+        if (keyDeserialization == null && !hasMetadata) {
+            valueDeserialization.deserialize(data, collector);
+            reportRecordIn(data);
+            return;
+        }
+        BufferingCollector keyCollector = new BufferingCollector();
+
+        // buffer key(s)
+        if (keyDeserialization != null) {
+            keyDeserialization.deserialize(message.getKeyBytes(), keyCollector);
+        }
+
+        // project output while emitting values
+        outputCollector.inputMessage = message;
+        outputCollector.physicalKeyRows = keyCollector.buffer;
+        outputCollector.outputCollector = collector;
+        if ((data == null || data.length == 0) && upsertMode) {
+            // collect tombstone messages in upsert mode by hand
+            outputCollector.collect(null);
+        } else {
+            valueDeserialization.deserialize(data, outputCollector);
+            reportRecordIn(data);
+        }
+
+        keyCollector.buffer.clear();
+    }
+
+    /**
+     * Counts one consumed message and its payload size when metrics are enabled.
+     * A null payload is counted as zero bytes instead of failing with a NullPointerException.
+     */
+    private void reportRecordIn(@Nullable byte[] data) {
+        if (sourceMetricData == null) {
+            return;
+        }
+        sourceMetricData.getNumRecordsIn().inc(1L);
+        sourceMetricData.getNumBytesIn().inc(data == null ? 0L : data.length);
+    }
+
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return producedTypeInfo;
+    }
+
+    @Override
+    public Schema<RowData> getSchema() {
+        return new FlinkSchema<>(Schema.BYTES.getSchemaInfo(), null, valueDeserialization);
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    interface MetadataConverter extends Serializable {
+        Object read(Message<?> message);
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    /** Collects every emitted key row into a list. */
+    private static final class BufferingCollector implements Collector<RowData>, Serializable {
+
+        private static final long serialVersionUID = 1L;
+
+        private final List<RowData> buffer = new ArrayList<>();
+
+        @Override
+        public void collect(RowData record) {
+            buffer.add(record);
+        }
+
+        @Override
+        public void close() {
+            // nothing to do
+        }
+    }
+
+    /** Keeps only the most recently collected record. */
+    private static class SimpleCollector<T> implements Collector<T> {
+        private T record;
+
+        @Override
+        public void collect(T record) {
+            this.record = record;
+        }
+
+        @Override
+        public void close() {
+            // nothing to do
+        }
+
+        /** Returns the last collected record (or null) and clears it. */
+        private T takeRecord() {
+            T result = record;
+            reset();
+            return result;
+        }
+
+        private void reset() {
+            record = null;
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Emits a row with key, value, and metadata fields.
+     *
+     * <p>The collector is able to handle the following kinds of keys:
+     * <ul>
+     *     <li>No key is used.
+     *     <li>A key is used.
+     *     <li>The deserialization schema emits multiple keys.
+     *     <li>Keys and values have overlapping fields.
+     *     <li>Keys are used and value is null.
+     * </ul>
+     */
+    private static final class OutputProjectionCollector implements Collector<RowData>, Serializable {
+
+        private static final long serialVersionUID = 1L;
+
+        private final int physicalArity;
+
+        private final int[] keyProjection;
+
+        private final int[] valueProjection;
+
+        private final MetadataConverter[] metadataConverters;
+
+        private final boolean upsertMode;
+
+        private transient Message<?> inputMessage;
+
+        private transient List<RowData> physicalKeyRows;
+
+        private transient Collector<RowData> outputCollector;
+
+        OutputProjectionCollector(
+                int physicalArity,
+                int[] keyProjection,
+                int[] valueProjection,
+                MetadataConverter[] metadataConverters,
+                boolean upsertMode) {
+            this.physicalArity = physicalArity;
+            this.keyProjection = keyProjection;
+            this.valueProjection = valueProjection;
+            this.metadataConverters = metadataConverters;
+            this.upsertMode = upsertMode;
+        }
+
+        @Override
+        public void collect(RowData physicalValueRow) {
+            // no key defined
+            if (keyProjection.length == 0) {
+                emitRow(null, (GenericRowData) physicalValueRow);
+                return;
+            }
+
+            // otherwise emit a value for each key
+            for (RowData physicalKeyRow : physicalKeyRows) {
+                emitRow((GenericRowData) physicalKeyRow, (GenericRowData) physicalValueRow);
+            }
+        }
+
+        @Override
+        public void close() {
+            // nothing to do
+        }
+
+        private void emitRow(@Nullable GenericRowData physicalKeyRow, @Nullable GenericRowData physicalValueRow) {
+            final RowKind rowKind;
+            if (physicalValueRow == null) {
+                if (upsertMode) {
+                    rowKind = RowKind.DELETE;
+                } else {
+                    throw new DeserializationException(
+                            "Invalid null value received in non-upsert mode. "
+                                    + "Could not to set row kind for output record.");
+                }
+            } else {
+                rowKind = physicalValueRow.getRowKind();
+            }
+
+            final int metadataArity = metadataConverters.length;
+            final GenericRowData producedRow = new GenericRowData(
+                    rowKind,
+                    physicalArity + metadataArity);
+
+            if (physicalValueRow != null) {
+                for (int valuePos = 0; valuePos < valueProjection.length; valuePos++) {
+                    producedRow.setField(valueProjection[valuePos], physicalValueRow.getField(valuePos));
+                }
+            }
+
+            for (int keyPos = 0; keyPos < keyProjection.length; keyPos++) {
+                assert physicalKeyRow != null;
+                producedRow.setField(keyProjection[keyPos], physicalKeyRow.getField(keyPos));
+            }
+
+            for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) {
+                producedRow.setField(physicalArity + metadataPos, metadataConverters[metadataPos].read(inputMessage));
+            }
+
+            outputCollector.collect(producedRow);
+        }
+    }
+}
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
index 47d90f375..321d3f830 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.streaming.connectors.pulsar.table.PulsarDynamicTableSource;
import org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogTable;
@@ -78,6 +77,7 @@ import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOpti
import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.validateTableSourceOptions;
import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
+import static org.apache.inlong.sort.pulsar.table.Constants.INLONG_METRIC;
/**
* Copy from io.streamnative.connectors:pulsar-flink-connector_2.11:1.13.6.1-rc9
@@ -272,6 +272,9 @@ public class PulsarDynamicTableFactory implements
String adminUrl = tableOptions.get(ADMIN_URL);
String serviceUrl = tableOptions.get(SERVICE_URL);
+
+ String inlongMetric = tableOptions.get(INLONG_METRIC);
+
return createPulsarTableSource(
physicalDataType,
keyDecodingFormat.orElse(null),
@@ -284,7 +287,8 @@ public class PulsarDynamicTableFactory implements
serviceUrl,
adminUrl,
properties,
- startupOptions);
+ startupOptions,
+ inlongMetric);
}
@Override
@@ -322,6 +326,8 @@ public class PulsarDynamicTableFactory implements
options.add(SINK_MESSAGE_ROUTER);
options.add(SINK_PARALLELISM);
options.add(PROPERTIES);
+ options.add(INLONG_METRIC);
+
return options;
}
@@ -351,7 +357,8 @@ public class PulsarDynamicTableFactory implements
String serviceUrl,
String adminUrl,
Properties properties,
- PulsarTableOptions.StartupOptions startupOptions) {
+ PulsarTableOptions.StartupOptions startupOptions,
+ String inLongMetric) {
return new PulsarDynamicTableSource(
physicalDataType,
keyDecodingFormat,
@@ -365,6 +372,7 @@ public class PulsarDynamicTableFactory implements
adminUrl,
properties,
startupOptions,
- false);
+ false,
+ inLongMetric);
}
}
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java
new file mode 100644
index 000000000..6f75555a2
--- /dev/null
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java
@@ -0,0 +1,493 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.table;
+
+import static org.apache.flink.table.descriptors.PulsarValidator.CONNECTOR_STARTUP_MODE_VALUE_EARLIEST;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource;
+import org.apache.flink.streaming.connectors.pulsar.config.StartupMode;
+import org.apache.flink.streaming.connectors.pulsar.internal.PulsarClientUtils;
+import org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions;
+import org.apache.flink.streaming.util.serialization.PulsarDeserializationSchema;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
+
+/**
+ * pulsar dynamic table source.
+ */
+public class PulsarDynamicTableSource implements ScanTableSource, SupportsReadingMetadata, SupportsWatermarkPushDown {
+
+    // --------------------------------------------------------------------------------------------
+    // Mutable attributes
+    // --------------------------------------------------------------------------------------------
+
+    /** Data type that describes the final output of the source. */
+    protected DataType producedDataType;
+
+    /** Metadata that is appended at the end of a physical source row. */
+    protected List<String> metadataKeys;
+
+    /** Watermark strategy that is used to generate per-partition watermark. */
+    protected @Nullable WatermarkStrategy<RowData> watermarkStrategy;
+
+    // --------------------------------------------------------------------------------------------
+    // Format attributes
+    // --------------------------------------------------------------------------------------------
+
+    private static final String VALUE_METADATA_PREFIX = "value.";
+
+    /** Data type to configure the formats. */
+    protected final DataType physicalDataType;
+
+    /** Optional format for decoding keys from Pulsar. */
+    protected final @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat;
+
+    /** Format for decoding values from Pulsar. */
+    protected final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat;
+
+    /** Indices that determine the key fields and the target position in the produced row. */
+    protected final int[] keyProjection;
+
+    /** Indices that determine the value fields and the target position in the produced row. */
+    protected final int[] valueProjection;
+
+    /** Prefix that needs to be removed from fields when constructing the physical data type. */
+    @Nullable
+    protected final String keyPrefix;
+
+    // --------------------------------------------------------------------------------------------
+    // Pulsar-specific attributes
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * The Pulsar topics to consume; mutually exclusive with {@link #topicPattern}.
+     */
+    protected final List<String> topics;
+
+    /**
+     * The pattern of Pulsar topics to consume; mutually exclusive with {@link #topics}.
+     */
+    protected final String topicPattern;
+
+    /**
+     * The Pulsar service URL, used to build the client configuration.
+     */
+    protected final String serviceUrl;
+
+    /**
+     * The Pulsar admin URL, passed to {@link FlinkPulsarSource}.
+     */
+    protected final String adminUrl;
+
+    /**
+     * Properties for the Pulsar consumer.
+     */
+    protected final Properties properties;
+
+    /**
+     * The startup mode for the contained consumer, see {@link StartupMode}.
+     */
+    protected final PulsarTableOptions.StartupOptions startupOptions;
+
+    /**
+     * The default value when startup timestamp is not used.
+     */
+    private static final long DEFAULT_STARTUP_TIMESTAMP_MILLIS = 0L;
+
+    /** Flag to determine source mode. In upsert mode, it will keep the tombstone message. */
+    protected final boolean upsertMode;
+
+    /**
+     * InLong metric identifier (group id, stream id and node id joined by the InLong delimiter),
+     * forwarded to the deserialization schema; an empty value disables metric reporting.
+     */
+    protected String inLongMetric;
+
+    /**
+     * Creates a Pulsar table source.
+     *
+     * <p>Note that {@code properties} is modified in place: the topic selection is written into it.
+     *
+     * @param physicalDataType data type of the physical columns; must not be null
+     * @param keyDecodingFormat optional format for decoding message keys
+     * @param valueDecodingFormat format for decoding message values; must not be null
+     * @param keyProjection target positions of the key fields in the produced row; must not be null
+     * @param valueProjection target positions of the value fields in the produced row; must not be null
+     * @param keyPrefix optional prefix removed from key fields
+     * @param topics topics to consume; exactly one of {@code topics} and {@code topicPattern} must be set
+     * @param topicPattern topic pattern to consume
+     * @param serviceUrl Pulsar service URL
+     * @param adminUrl Pulsar admin URL
+     * @param properties Pulsar client/consumer properties; must not be null
+     * @param startupOptions where the consumer starts reading
+     * @param upsertMode whether tombstone messages are kept (emitted as DELETE rows)
+     * @param inlongMetric InLong metric identifier; null or empty disables metric reporting
+     */
+    public PulsarDynamicTableSource(
+            DataType physicalDataType,
+            @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
+            DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
+            int[] keyProjection,
+            int[] valueProjection,
+            @Nullable String keyPrefix,
+            List<String> topics,
+            String topicPattern,
+            String serviceUrl,
+            String adminUrl,
+            Properties properties,
+            PulsarTableOptions.StartupOptions startupOptions,
+            boolean upsertMode,
+            String inlongMetric) {
+        // Validate the properties before setTopicInfo dereferences and mutates them.
+        this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
+        setTopicInfo(properties, topics, topicPattern);
+
+        // Format attributes
+        this.physicalDataType = Preconditions.checkNotNull(physicalDataType, "Physical data type must not be null.");
+        this.keyDecodingFormat = keyDecodingFormat;
+        this.valueDecodingFormat = Preconditions.checkNotNull(
+                valueDecodingFormat, "Value decoding format must not be null.");
+        this.keyProjection = Preconditions.checkNotNull(keyProjection, "Key projection must not be null.");
+        this.valueProjection = Preconditions.checkNotNull(valueProjection, "Value projection must not be null.");
+        this.keyPrefix = keyPrefix;
+        // Mutable attributes
+        this.producedDataType = physicalDataType;
+        this.metadataKeys = new ArrayList<>();
+        this.watermarkStrategy = null;
+        // Pulsar-specific attributes
+        Preconditions.checkArgument((topics != null && topicPattern == null)
+                        || (topics == null && topicPattern != null),
+                "Either Topic or Topic Pattern must be set for source.");
+        this.topics = topics;
+        this.topicPattern = topicPattern;
+        this.adminUrl = adminUrl;
+        this.serviceUrl = serviceUrl;
+        this.startupOptions = startupOptions;
+        this.upsertMode = upsertMode;
+        this.inLongMetric = inlongMetric;
+    }
+
+    /**
+     * Writes the topic selection into {@code properties} so that only one of the keys
+     * {@code topicspattern}, {@code topics} or {@code topic} remains. An existing value for the chosen
+     * key is kept ({@code putIfAbsent}); the other two keys are removed.
+     *
+     * @throws IllegalArgumentException if neither a non-blank topic pattern nor at least one topic is given
+     */
+    private void setTopicInfo(Properties properties, List<String> topics, String topicPattern) {
+        if (StringUtils.isNotBlank(topicPattern)) {
+            properties.putIfAbsent("topicspattern", topicPattern);
+            properties.remove("topic");
+            properties.remove("topics");
+        } else if (topics != null && topics.size() > 1) {
+            properties.putIfAbsent("topics", StringUtils.join(topics, ","));
+            properties.remove("topicspattern");
+            properties.remove("topic");
+        } else if (topics != null && topics.size() == 1) {
+            properties.putIfAbsent("topic", StringUtils.join(topics, ","));
+            properties.remove("topicspattern");
+            properties.remove("topics");
+        } else {
+            // previously reported "Use `topics` instead of `topic`...", which does not describe this case
+            throw new IllegalArgumentException(
+                    "Either a topic pattern or at least one topic must be set for the Pulsar source.");
+        }
+    }
+
+    /**
+     * Reports the changelog mode of the value decoding format.
+     */
+    @Override
+    public ChangelogMode getChangelogMode() {
+        final DecodingFormat<DeserializationSchema<RowData>> format = valueDecodingFormat;
+        return format.getChangelogMode();
+    }
+
+    /**
+     * Builds the {@link FlinkPulsarSource} backing this table: key/value decoders, client configuration,
+     * optional watermark strategy and startup position.
+     */
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
+        final DeserializationSchema<RowData> keyDeserializer =
+                createDeserialization(context, keyDecodingFormat, keyProjection, keyPrefix);
+        final DeserializationSchema<RowData> valueDeserializer =
+                createDeserialization(context, valueDecodingFormat, valueProjection, "");
+        final PulsarDeserializationSchema<RowData> pulsarSchema = createPulsarDeserialization(
+                keyDeserializer,
+                valueDeserializer,
+                context.createTypeInformation(producedDataType));
+
+        final ClientConfigurationData clientConf = PulsarClientUtils.newClientConf(serviceUrl, properties);
+        final FlinkPulsarSource<RowData> pulsarSource =
+                new FlinkPulsarSource<>(adminUrl, clientConf, pulsarSchema, properties);
+
+        if (watermarkStrategy != null) {
+            pulsarSource.assignTimestampsAndWatermarks(watermarkStrategy);
+        }
+        applyStartupPosition(pulsarSource);
+
+        return SourceFunctionProvider.of(pulsarSource, false);
+    }
+
+    /** Configures where {@code pulsarSource} starts reading; modes not listed here leave it unchanged. */
+    private void applyStartupPosition(FlinkPulsarSource<RowData> pulsarSource) {
+        switch (startupOptions.startupMode) {
+            case EARLIEST:
+                pulsarSource.setStartFromEarliest();
+                break;
+            case LATEST:
+                pulsarSource.setStartFromLatest();
+                break;
+            case SPECIFIC_OFFSETS:
+                pulsarSource.setStartFromSpecificOffsets(startupOptions.specificOffsets);
+                break;
+            case EXTERNAL_SUBSCRIPTION:
+                final MessageId startPosition =
+                        CONNECTOR_STARTUP_MODE_VALUE_EARLIEST.equals(startupOptions.externalSubStartOffset)
+                                ? MessageId.earliest
+                                : MessageId.latest;
+                pulsarSource.setStartFromSubscription(startupOptions.externalSubscriptionName, startPosition);
+                break;
+            default:
+                break;
+        }
+    }
+
+    /**
+     * Wraps the key/value decoders into a {@link DynamicPulsarDeserializationSchema} that appends the
+     * requested connector metadata columns after the physical (and value-format metadata) columns.
+     *
+     * @throws IllegalStateException if a requested metadata key is not a known {@code ReadableMetadata}
+     */
+    private PulsarDeserializationSchema<RowData> createPulsarDeserialization(
+            DeserializationSchema<RowData> keyDeserialization,
+            DeserializationSchema<RowData> valueDeserialization,
+            TypeInformation<RowData> producedTypeInfo) {
+        final int metadataCount = metadataKeys.size();
+
+        // resolve each requested connector metadata key to its converter, in request order
+        final DynamicPulsarDeserializationSchema.MetadataConverter[] converters =
+                new DynamicPulsarDeserializationSchema.MetadataConverter[metadataCount];
+        int slot = 0;
+        for (String requestedKey : metadataKeys) {
+            final ReadableMetadata match = Arrays.stream(ReadableMetadata.values())
+                    .filter(candidate -> candidate.key.equals(requestedKey))
+                    .findFirst()
+                    .orElseThrow(IllegalStateException::new);
+            converters[slot++] = match.converter;
+        }
+
+        // physical arity still includes the value format's metadata columns
+        final int physicalArity = producedDataType.getChildren().size() - metadataCount;
+
+        // value projection, followed by the positions of the value format's metadata columns
+        final int formatMetadataStart = keyProjection.length + valueProjection.length;
+        final int formatMetadataCount = Math.max(0, physicalArity - formatMetadataStart);
+        final int[] valueTargets = Arrays.copyOf(valueProjection, valueProjection.length + formatMetadataCount);
+        for (int offset = 0; offset < formatMetadataCount; offset++) {
+            valueTargets[valueProjection.length + offset] = formatMetadataStart + offset;
+        }
+
+        return new DynamicPulsarDeserializationSchema(
+                physicalArity,
+                keyDeserialization,
+                keyProjection,
+                valueDeserialization,
+                valueTargets,
+                !metadataKeys.isEmpty(),
+                converters,
+                producedTypeInfo,
+                upsertMode,
+                inLongMetric);
+    }
+
+    /**
+     * Creates a copy of this source with the same constructor arguments, including {@link #upsertMode}
+     * and {@link #inLongMetric}, plus the current mutable attributes. Previously {@code upsertMode}
+     * was hard-coded to {@code false}, so a planner copy of an upsert source lost tombstone handling.
+     */
+    @Override
+    public DynamicTableSource copy() {
+        final PulsarDynamicTableSource copy = new PulsarDynamicTableSource(
+                physicalDataType,
+                keyDecodingFormat,
+                valueDecodingFormat,
+                keyProjection,
+                valueProjection,
+                keyPrefix,
+                topics,
+                topicPattern,
+                serviceUrl,
+                adminUrl,
+                properties,
+                startupOptions,
+                upsertMode,
+                inLongMetric);
+        copy.producedDataType = producedDataType;
+        copy.metadataKeys = metadataKeys;
+        copy.watermarkStrategy = watermarkStrategy;
+        return copy;
+    }
+
+    /** Returns a short, human-readable description of this source. */
+    @Override
+    public String asSummaryString() {
+        final String summary = "Pulsar universal table source";
+        return summary;
+    }
+
+    /**
+     * Builds a client configuration in which only the service URL is set.
+     * NOTE(review): no caller is visible in this file; getScanRuntimeProvider uses
+     * {@code PulsarClientUtils.newClientConf} instead. Consider removing this helper.
+     */
+    private static ClientConfigurationData newClientConf(String serviceUrl) {
+        final ClientConfigurationData configuration = new ClientConfigurationData();
+        configuration.setServiceUrl(serviceUrl);
+        return configuration;
+    }
+
+    /**
+     * Two sources are equal when all configuration, including the InLong metric identifier, matches.
+     * Properties are compared by content.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof PulsarDynamicTableSource)) {
+            return false;
+        }
+        PulsarDynamicTableSource that = (PulsarDynamicTableSource) o;
+        return upsertMode == that.upsertMode
+                && Objects.equals(producedDataType, that.producedDataType)
+                && Objects.equals(metadataKeys, that.metadataKeys)
+                && Objects.equals(watermarkStrategy, that.watermarkStrategy)
+                && Objects.equals(physicalDataType, that.physicalDataType)
+                && Objects.equals(keyDecodingFormat, that.keyDecodingFormat)
+                && Objects.equals(valueDecodingFormat, that.valueDecodingFormat)
+                && Arrays.equals(keyProjection, that.keyProjection)
+                && Arrays.equals(valueProjection, that.valueProjection)
+                && Objects.equals(keyPrefix, that.keyPrefix)
+                && Objects.equals(topics, that.topics)
+                && Objects.equals(topicPattern, that.topicPattern)
+                && Objects.equals(serviceUrl, that.serviceUrl)
+                && Objects.equals(adminUrl, that.adminUrl)
+                && Objects.equals(new HashMap<>(properties), new HashMap<>(that.properties))
+                && Objects.equals(startupOptions, that.startupOptions)
+                && Objects.equals(inLongMetric, that.inLongMetric);
+    }
+
+    /** Hashes the same fields that {@link #equals(Object)} compares. */
+    @Override
+    public int hashCode() {
+        int result =
+                Objects.hash(producedDataType, metadataKeys, watermarkStrategy, physicalDataType, keyDecodingFormat,
+                        valueDecodingFormat, keyPrefix, topics, topicPattern, serviceUrl, adminUrl, properties,
+                        startupOptions,
+                        upsertMode,
+                        inLongMetric);
+        result = 31 * result + Arrays.hashCode(keyProjection);
+        result = 31 * result + Arrays.hashCode(valueProjection);
+        return result;
+    }
+
+    /**
+     * Lists the metadata columns this source can expose, in insertion order.
+     *
+     * <p>Value-format metadata comes first, each key prefixed with {@code VALUE_METADATA_PREFIX}.
+     * Connector metadata from {@link ReadableMetadata} follows in declaration order. A connector key
+     * is only added when no entry with that key exists yet, so format metadata wins on collisions.
+     */
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        final Map<String, DataType> readable = new LinkedHashMap<>();
+
+        // Convention: the final row is PHYSICAL + FORMAT METADATA + CONNECTOR METADATA,
+        // and format metadata has the highest precedence, so it is inserted first.
+        final Map<String, DataType> formatMetadata = valueDecodingFormat.listReadableMetadata();
+        for (Map.Entry<String, DataType> entry : formatMetadata.entrySet()) {
+            readable.put(VALUE_METADATA_PREFIX + entry.getKey(), entry.getValue());
+        }
+
+        // Connector metadata never overwrites an entry that is already present.
+        Stream.of(ReadableMetadata.values())
+                .forEachOrdered(metadata -> readable.putIfAbsent(metadata.key, metadata.dataType));
+        return readable;
+    }
+
+    /**
+     * Accepts the metadata keys requested for this source together with the produced data type.
+     *
+     * <p>Keys starting with {@code VALUE_METADATA_PREFIX} belong to the value format. They are
+     * forwarded to it with the prefix removed, but only if that format lists any readable metadata
+     * at all. All other keys are kept as connector metadata keys.
+     *
+     * @param metadataKeys requested metadata keys, format and connector keys mixed
+     * @param producedDataType the data type this source has to produce; stored as given
+     */
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+        // true -> value-format metadata (prefixed), false -> connector metadata; order is preserved
+        final Map<Boolean, List<String>> keysByKind = metadataKeys.stream()
+                .collect(Collectors.partitioningBy(key -> key.startsWith(VALUE_METADATA_PREFIX)));
+        final List<String> formatKeys = keysByKind.get(true);
+        final List<String> connectorKeys = new ArrayList<>(keysByKind.get(false));
+
+        // push the unprefixed format keys down only when the value format exposes metadata
+        if (!valueDecodingFormat.listReadableMetadata().isEmpty()) {
+            final List<String> unprefixedKeys = new ArrayList<>(formatKeys.size());
+            for (String key : formatKeys) {
+                unprefixedKeys.add(key.substring(VALUE_METADATA_PREFIX.length()));
+            }
+            valueDecodingFormat.applyReadableMetadata(unprefixedKeys);
+        }
+
+        this.metadataKeys = connectorKeys;
+        this.producedDataType = producedDataType;
+    }
+
+    /**
+     * Creates the runtime decoder for one part (key or value) of a message.
+     *
+     * @param context context passed through to {@code createRuntimeDecoder}
+     * @param format the decoding format, or {@code null} if this part is not decoded
+     * @param projection indices of the physical columns handled by this format
+     * @param prefix optional field-name prefix stripped from the projected row type
+     * @return the decoder, or {@code null} when {@code format} is {@code null}
+     */
+    private @Nullable DeserializationSchema<RowData> createDeserialization(
+            Context context,
+            @Nullable DecodingFormat<DeserializationSchema<RowData>> format,
+            int[] projection,
+            @Nullable String prefix) {
+        if (format == null) {
+            return null;
+        }
+        final DataType projectedType = DataTypeUtils.projectRow(this.physicalDataType, projection);
+        final DataType formatType = (prefix == null)
+                ? projectedType
+                : DataTypeUtils.stripRowPrefix(projectedType, prefix);
+        return format.createRuntimeDecoder(context, formatType);
+    }
+
+    /**
+     * Remembers the given watermark strategy, replacing any strategy applied before.
+     *
+     * @param strategy the watermark strategy to use for this source
+     */
+    @Override
+    public void applyWatermark(WatermarkStrategy<RowData> strategy) {
+        watermarkStrategy = strategy;
+    }
+
+ // --------------------------------------------------------------------------------------------
+ // Metadata handling
+ // --------------------------------------------------------------------------------------------
+
+    /**
+     * Connector metadata columns that this source can expose.
+     *
+     * <p>Each constant pairs a metadata key with its SQL data type and a converter that extracts
+     * the value from a Pulsar {@link Message}. Declaration order matters, because
+     * {@code listReadableMetadata()} appends the constants in this order.
+     */
+    enum ReadableMetadata {
+
+        /** Name of the topic the message was read from. */
+        TOPIC(
+                "topic",
+                DataTypes.STRING().notNull(),
+                message -> StringData.fromString(message.getTopicName())),
+
+        /** Message id, serialized with {@code MessageId#toByteArray()}. */
+        MESSAGE_ID(
+                "messageId",
+                DataTypes.BYTES().notNull(),
+                message -> message.getMessageId().toByteArray()),
+
+        /** Sequence id of the message. */
+        SEQUENCE_ID(
+                "sequenceId",
+                DataTypes.BIGINT().notNull(),
+                Message::getSequenceId),
+
+        /** Publish time, interpreted as epoch milliseconds. */
+        PUBLISH_TIME(
+                "publishTime",
+                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(),
+                message -> TimestampData.fromEpochMillis(message.getPublishTime())),
+
+        /**
+         * Event time, interpreted as epoch milliseconds.
+         * NOTE(review): the Pulsar client documents getEventTime() as returning 0 when no event
+         * time was set, which would surface here as the epoch rather than NULL — confirm intent.
+         */
+        EVENT_TIME(
+                "eventTime",
+                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(),
+                message -> TimestampData.fromEpochMillis(message.getEventTime())),
+
+        /** User-defined message properties as a string-to-string map. */
+        PROPERTIES(
+                "properties",
+                // key and value of the map are nullable to make handling easier in queries
+                DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.STRING().nullable()).notNull(),
+                message -> {
+                    final Map<StringData, StringData> converted = new HashMap<>();
+                    message.getProperties().forEach((name, value) ->
+                            converted.put(StringData.fromString(name), StringData.fromString(value)));
+                    return new GenericMapData(converted);
+                });
+
+        /** Metadata key as used in {@code METADATA FROM '...'} column declarations. */
+        final String key;
+
+        /** SQL data type of the metadata column. */
+        final DataType dataType;
+
+        /** Extracts the column value from a Pulsar message. */
+        final DynamicPulsarDeserializationSchema.MetadataConverter converter;
+
+        ReadableMetadata(String metadataKey, DataType metadataType,
+                DynamicPulsarDeserializationSchema.MetadataConverter metadataConverter) {
+            this.key = metadataKey;
+            this.dataType = metadataType;
+            this.converter = metadataConverter;
+        }
+    }
+}
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java
index b8a41360e..bf539d703 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.connectors.pulsar.config.StartupMode;
-import org.apache.flink.streaming.connectors.pulsar.table.PulsarDynamicTableSource;
import org.apache.flink.streaming.connectors.pulsar.table.PulsarSinkSemantic;
import org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions;
import org.apache.flink.streaming.connectors.pulsar.util.KeyHashMessageRouterImpl;
@@ -68,6 +67,7 @@ import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOpti
import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.createValueFormatProjection;
import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.getPulsarProperties;
import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
+import static org.apache.inlong.sort.pulsar.table.Constants.INLONG_METRIC;
/**
* Upsert-Pulsar factory.
@@ -150,6 +150,7 @@ public class UpsertPulsarDynamicTableFactory implements DynamicTableSourceFactor
options.add(VALUE_FIELDS_INCLUDE);
options.add(FactoryUtil.SINK_PARALLELISM);
options.add(PROPERTIES);
+ options.add(INLONG_METRIC);
return options;
}
@@ -185,6 +186,8 @@ public class UpsertPulsarDynamicTableFactory implements DynamicTableSourceFactor
String serverUrl = tableOptions.get(SERVICE_URL);
List<String> topics = tableOptions.get(TOPIC);
String topicPattern = tableOptions.get(TOPIC_PATTERN);
+ String inlongMetric = tableOptions.get(INLONG_METRIC);
+
return new PulsarDynamicTableSource(
schema.toPhysicalRowDataType(),
keyDecodingFormat,
@@ -198,7 +201,7 @@ public class UpsertPulsarDynamicTableFactory implements DynamicTableSourceFactor
adminUrl,
properties,
startupOptions,
- true);
+ true, inlongMetric);
}
@Override
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-connectors/pulsar/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 8def7c890..ece6f3022 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/inlong-sort/sort-connectors/pulsar/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -14,4 +14,5 @@
# limitations under the License.
org.apache.inlong.sort.formats.inlongmsg.InLongMsgFormatFactory
-org.apache.inlong.sort.pulsar.table.PulsarDynamicTableFactory
\ No newline at end of file
+org.apache.inlong.sort.pulsar.table.PulsarDynamicTableFactory
+org.apache.inlong.sort.pulsar.table.UpsertPulsarDynamicTableFactory
\ No newline at end of file