Posted to commits@inlong.apache.org by zi...@apache.org on 2022/07/19 06:47:14 UTC
[inlong] branch master updated: [INLONG-5072][Sort] Add metric computing for MySQL, PostgreSQL, and HBase to support user metric queries by label (#5073)
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new fb4667f1a [INLONG-5072][Sort] Add metric computing for MySQL, PostgreSQL, and HBase to support user metric queries by label (#5073)
fb4667f1a is described below
commit fb4667f1a27855c9c617e317332aac7349fcecc5
Author: Xin Gong <ge...@gmail.com>
AuthorDate: Tue Jul 19 14:47:09 2022 +0800
[INLONG-5072][Sort] Add metric computing for MySQL, PostgreSQL, and HBase to support user metric queries by label (#5073)
---
inlong-sort/sort-connectors/hbase/pom.xml | 12 +-
.../sort/hbase/HBase2DynamicTableFactory.java | 140 +++++
.../inlong/sort/hbase/metric/MetricData.java | 112 ++++
.../inlong/sort/hbase/options/InLongOptions.java | 34 ++
.../sort/hbase/sink/HBaseDynamicTableSink.java | 117 ++++
.../inlong/sort/hbase/sink/HBaseSinkFunction.java | 325 +++++++++++
.../org.apache.flink.table.factories.Factory | 16 +
.../inlong/sort/cdc/mysql/source/MySqlSource.java | 25 +-
.../sort/cdc/mysql/source/MySqlSourceBuilder.java | 5 +
.../cdc/mysql/source/config/MySqlSourceConfig.java | 10 +-
.../source/config/MySqlSourceConfigFactory.java | 14 +-
.../mysql/source/config/MySqlSourceOptions.java | 6 +
.../source/metrics/MySqlSourceReaderMetrics.java | 54 ++
.../mysql/source/reader/MySqlRecordEmitter.java | 10 +-
.../mysql/table/MySqlTableInlongSourceFactory.java | 6 +-
.../sort/cdc/mysql/table/MySqlTableSource.java | 21 +-
.../DebeziumSourceFunction.java | 647 +++++++++++++++++++++
.../MetricData.java | 88 +++
.../PostgreSQLSource.java | 189 ++++++
.../cdc/postgres/table/PostgreSQLTableFactory.java | 174 ++++++
.../cdc/postgres/table/PostgreSQLTableSource.java | 250 ++++++++
.../org.apache.flink.table.factories.Factory | 33 ++
licenses/inlong-sort-connectors/LICENSE | 10 +
23 files changed, 2274 insertions(+), 24 deletions(-)
diff --git a/inlong-sort/sort-connectors/hbase/pom.xml b/inlong-sort/sort-connectors/hbase/pom.xml
index b2987f686..eeffccb40 100644
--- a/inlong-sort/sort-connectors/hbase/pom.xml
+++ b/inlong-sort/sort-connectors/hbase/pom.xml
@@ -46,6 +46,11 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
<dependencyManagement>
@@ -111,13 +116,6 @@
</excludes>
</artifactSet>
<filters>
- <filter>
- <artifact>org.apache.inlong:sort-connector-*</artifact>
- <includes>
- <include>org/apache/inlong/**</include>
- <include>META-INF/services/org.apache.flink.table.factories.Factory</include>
- </includes>
- </filter>
<filter>
<artifact>*:*</artifact>
<excludes>
diff --git a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java
new file mode 100644
index 000000000..e4f23bd35
--- /dev/null
+++ b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hbase;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.connector.hbase2.source.HBaseDynamicTableSource;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.inlong.sort.hbase.sink.HBaseDynamicTableSink;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.connector.hbase.options.HBaseOptions.LOOKUP_ASYNC;
+import static org.apache.flink.connector.hbase.options.HBaseOptions.LOOKUP_CACHE_MAX_ROWS;
+import static org.apache.flink.connector.hbase.options.HBaseOptions.LOOKUP_CACHE_TTL;
+import static org.apache.flink.connector.hbase.options.HBaseOptions.LOOKUP_MAX_RETRIES;
+import static org.apache.flink.connector.hbase.options.HBaseOptions.NULL_STRING_LITERAL;
+import static org.apache.flink.connector.hbase.options.HBaseOptions.PROPERTIES_PREFIX;
+import static org.apache.flink.connector.hbase.options.HBaseOptions.SINK_BUFFER_FLUSH_INTERVAL;
+import static org.apache.flink.connector.hbase.options.HBaseOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
+import static org.apache.flink.connector.hbase.options.HBaseOptions.SINK_BUFFER_FLUSH_MAX_SIZE;
+import static org.apache.flink.connector.hbase.options.HBaseOptions.TABLE_NAME;
+import static org.apache.flink.connector.hbase.options.HBaseOptions.ZOOKEEPER_QUORUM;
+import static org.apache.flink.connector.hbase.options.HBaseOptions.ZOOKEEPER_ZNODE_PARENT;
+import static org.apache.flink.connector.hbase.options.HBaseOptions.getHBaseConfiguration;
+import static org.apache.flink.connector.hbase.options.HBaseOptions.getHBaseLookupOptions;
+import static org.apache.flink.connector.hbase.options.HBaseOptions.getHBaseWriteOptions;
+import static org.apache.flink.connector.hbase.options.HBaseOptions.validatePrimaryKey;
+import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
+import static org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper;
+import static org.apache.inlong.sort.hbase.options.InLongOptions.INLONG_METRIC;
+
+/** HBase connector factory. */
+public class HBase2DynamicTableFactory
+ implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+ private static final String IDENTIFIER = "hbase-2.2-inlong";
+
+ @Override
+ public DynamicTableSource createDynamicTableSource(Context context) {
+ TableFactoryHelper helper = createTableFactoryHelper(this, context);
+ helper.validateExcept(PROPERTIES_PREFIX);
+
+ final ReadableConfig tableOptions = helper.getOptions();
+
+ TableSchema tableSchema = context.getCatalogTable().getSchema();
+ Map<String, String> options = context.getCatalogTable().getOptions();
+
+ validatePrimaryKey(tableSchema);
+
+ String tableName = tableOptions.get(TABLE_NAME);
+ Configuration hbaseConf = getHBaseConfiguration(options);
+ HBaseLookupOptions lookupOptions = getHBaseLookupOptions(tableOptions);
+ String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
+ HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(tableSchema);
+
+ return new HBaseDynamicTableSource(
+ hbaseConf, tableName, hbaseSchema, nullStringLiteral, lookupOptions);
+ }
+
+ @Override
+ public DynamicTableSink createDynamicTableSink(Context context) {
+ TableFactoryHelper helper = createTableFactoryHelper(this, context);
+ helper.validateExcept(PROPERTIES_PREFIX);
+
+ final ReadableConfig tableOptions = helper.getOptions();
+
+ TableSchema tableSchema = context.getCatalogTable().getSchema();
+ Map<String, String> options = context.getCatalogTable().getOptions();
+
+ validatePrimaryKey(tableSchema);
+
+ String tableName = tableOptions.get(TABLE_NAME);
+ Configuration hbaseConf = getHBaseConfiguration(options);
+ HBaseWriteOptions hBaseWriteOptions = getHBaseWriteOptions(tableOptions);
+ String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
+ HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(tableSchema);
+ String inLongMetric = tableOptions.get(INLONG_METRIC);
+
+ return new HBaseDynamicTableSink(
+ tableName, hbaseSchema, hbaseConf, hBaseWriteOptions, nullStringLiteral, inLongMetric);
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ Set<ConfigOption<?>> set = new HashSet<>();
+ set.add(TABLE_NAME);
+ return set;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ Set<ConfigOption<?>> set = new HashSet<>();
+ set.add(ZOOKEEPER_ZNODE_PARENT);
+ set.add(ZOOKEEPER_QUORUM);
+ set.add(NULL_STRING_LITERAL);
+ set.add(SINK_BUFFER_FLUSH_MAX_SIZE);
+ set.add(SINK_BUFFER_FLUSH_MAX_ROWS);
+ set.add(SINK_BUFFER_FLUSH_INTERVAL);
+ set.add(SINK_PARALLELISM);
+ set.add(LOOKUP_ASYNC);
+ set.add(LOOKUP_CACHE_MAX_ROWS);
+ set.add(LOOKUP_CACHE_TTL);
+ set.add(LOOKUP_MAX_RETRIES);
+ set.add(INLONG_METRIC);
+ return set;
+ }
+}
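For reference, the factory above is exercised through Flink SQL DDL. Below is a minimal sketch (not part of this commit), assuming a standard Table API setup; the table name, ZooKeeper quorum, column layout, and metric label are placeholder values:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class HBaseInlongSinkSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
            // 'connector' selects HBase2DynamicTableFactory via its IDENTIFIER;
            // 'inlong.metric' carries the groupId_streamId_nodeId label that the
            // sink uses to register its metrics.
            tEnv.executeSql(
                    "CREATE TABLE hbase_sink (\n"
                            + "  rowkey STRING,\n"
                            + "  cf ROW<q1 STRING>,\n"
                            + "  PRIMARY KEY (rowkey) NOT ENFORCED\n"
                            + ") WITH (\n"
                            + "  'connector' = 'hbase-2.2-inlong',\n"
                            + "  'table-name' = 'my_table',\n"
                            + "  'zookeeper.quorum' = 'localhost:2181',\n"
                            + "  'inlong.metric' = 'myGroup_myStream_myNode'\n"
                            + ")");
        }
    }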
diff --git a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/metric/MetricData.java b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/metric/MetricData.java
new file mode 100644
index 000000000..7bf05fad2
--- /dev/null
+++ b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/metric/MetricData.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hbase.metric;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.metrics.MetricGroup;
+
+/**
+ * A collection class for handling metrics
+ */
+public class MetricData {
+
+ private final MetricGroup metricGroup;
+
+ private Counter numRecordsOut;
+ private Counter numBytesOut;
+ private Counter dirtyRecords;
+ private Counter dirtyBytes;
+ private Meter numRecordsOutPerSecond;
+ private Meter numBytesOutPerSecond;
+ private static final Integer TIME_SPAN_IN_SECONDS = 60;
+ private static final String STREAM_ID = "streamId";
+ private static final String GROUP_ID = "groupId";
+ private static final String NODE_ID = "nodeId";
+
+ public MetricData(MetricGroup metricGroup) {
+ this.metricGroup = metricGroup;
+ }
+
+ public void registerMetricsForNumRecordsOut(String groupId, String streamId, String nodeId, String metricName) {
+ numRecordsOut =
+ metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId)
+ .counter(metricName);
+ }
+
+ public void registerMetricsForNumBytesOut(String groupId, String streamId, String nodeId, String metricName) {
+ numBytesOut =
+ metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId)
+ .counter(metricName);
+ }
+
+ public void registerMetricsForNumRecordsOutPerSecond(String groupId, String streamId, String nodeId,
+ String metricName) {
+ numRecordsOutPerSecond = metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID,
+ nodeId)
+ .meter(metricName, new MeterView(this.numRecordsOut, TIME_SPAN_IN_SECONDS));
+ }
+
+ public void registerMetricsForNumBytesOutPerSecond(String groupId, String streamId, String nodeId,
+ String metricName) {
+ numBytesOutPerSecond = metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId)
+ .addGroup(NODE_ID, nodeId)
+ .meter(metricName, new MeterView(this.numBytesOut, TIME_SPAN_IN_SECONDS));
+ }
+
+ public void registerMetricsForDirtyRecords(String groupId, String streamId, String nodeId,
+ String metricName) {
+ dirtyRecords = metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId)
+ .counter(metricName);
+ }
+
+ public void registerMetricsForDirtyBytes(String groupId, String streamId, String nodeId,
+ String metricName) {
+ dirtyBytes =
+ metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId)
+ .counter(metricName);
+
+ }
+
+ public Counter getNumRecordsOut() {
+ return numRecordsOut;
+ }
+
+ public Counter getNumBytesOut() {
+ return numBytesOut;
+ }
+
+ public Counter getDirtyRecords() {
+ return dirtyRecords;
+ }
+
+ public Counter getDirtyBytes() {
+ return dirtyBytes;
+ }
+
+ public Meter getNumRecordsOutPerSecond() {
+ return numRecordsOutPerSecond;
+ }
+
+ public Meter getNumBytesOutPerSecond() {
+ return numBytesOutPerSecond;
+ }
+
+}
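A short usage sketch of MetricData from inside a rich function, with placeholder label values. Note the registration order: each per-second meter is a MeterView over the matching counter, so the counter must be registered before the meter:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.apache.inlong.sort.hbase.metric.MetricData;

    public class MetricDataUsageSketch extends RichSinkFunction<String> {

        private transient MetricData metricData;

        @Override
        public void open(Configuration parameters) {
            metricData = new MetricData(getRuntimeContext().getMetricGroup());
            // Counter first, then the meter that wraps it.
            metricData.registerMetricsForNumRecordsOut("myGroup", "myStream", "myNode", "numRecordsOut");
            metricData.registerMetricsForNumRecordsOutPerSecond(
                    "myGroup", "myStream", "myNode", "numRecordsOutPerSecond");
        }

        @Override
        public void invoke(String value, Context context) {
            // Reporters see the nested groups as labels:
            // groupId=myGroup, streamId=myStream, nodeId=myNode.
            metricData.getNumRecordsOut().inc();
        }
    }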
diff --git a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/options/InLongOptions.java b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/options/InLongOptions.java
new file mode 100644
index 000000000..e7444cf9e
--- /dev/null
+++ b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/options/InLongOptions.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hbase.options;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/** InLong Options for HBase. */
+@Internal
+public class InLongOptions {
+
+ public static final ConfigOption<String> INLONG_METRIC =
+ ConfigOptions.key("inlong.metric")
+ .stringType()
+ .defaultValue("")
+ .withDescription("INLONG GROUP ID + '_' + STREAM ID + '_' + NODE ID");
+}
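The option value is positional, so the connectors parse it with a plain split; a minimal sketch of that contract (mirroring HBaseSinkFunction.open below), with a placeholder label:

    // The label is three positional parts joined by '_'. Implicitly, none of
    // the parts may itself contain an underscore, or the split mis-assigns them.
    String inLongMetric = "myGroup_myStream_myNode"; // placeholder value
    String[] parts = inLongMetric.split("_");
    String groupId = parts[0];   // "myGroup"
    String streamId = parts[1];  // "myStream"
    String nodeId = parts[2];    // "myNode"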
diff --git a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseDynamicTableSink.java b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseDynamicTableSink.java
new file mode 100644
index 000000000..76616befa
--- /dev/null
+++ b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseDynamicTableSink.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hbase.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
+import org.apache.flink.connector.hbase.sink.RowDataToMutationConverter;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+import org.apache.hadoop.conf.Configuration;
+
+/** HBase table sink implementation. */
+@Internal
+public class HBaseDynamicTableSink implements DynamicTableSink {
+
+ private final String tableName;
+ private final HBaseTableSchema hbaseTableSchema;
+ private final Configuration hbaseConf;
+ private final HBaseWriteOptions writeOptions;
+ private final String nullStringLiteral;
+ private final String inLongMetric;
+
+ public HBaseDynamicTableSink(
+ String tableName,
+ HBaseTableSchema hbaseTableSchema,
+ Configuration hbaseConf,
+ HBaseWriteOptions writeOptions,
+ String nullStringLiteral,
+ String inLongMetric) {
+ this.tableName = tableName;
+ this.hbaseTableSchema = hbaseTableSchema;
+ this.hbaseConf = hbaseConf;
+ this.writeOptions = writeOptions;
+ this.nullStringLiteral = nullStringLiteral;
+ this.inLongMetric = inLongMetric;
+ }
+
+ @Override
+ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+ HBaseSinkFunction<RowData> sinkFunction =
+ new HBaseSinkFunction<>(
+ tableName,
+ hbaseConf,
+ new RowDataToMutationConverter(hbaseTableSchema, nullStringLiteral),
+ writeOptions.getBufferFlushMaxSizeInBytes(),
+ writeOptions.getBufferFlushMaxRows(),
+ writeOptions.getBufferFlushIntervalMillis(),
+ inLongMetric);
+ return SinkFunctionProvider.of(sinkFunction, writeOptions.getParallelism());
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+ // UPSERT mode
+ ChangelogMode.Builder builder = ChangelogMode.newBuilder();
+ for (RowKind kind : requestedMode.getContainedKinds()) {
+ if (kind != RowKind.UPDATE_BEFORE) {
+ builder.addContainedKind(kind);
+ }
+ }
+ return builder.build();
+ }
+
+ @Override
+ public DynamicTableSink copy() {
+ return new HBaseDynamicTableSink(
+ tableName, hbaseTableSchema, hbaseConf, writeOptions, nullStringLiteral, inLongMetric);
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "HBase";
+ }
+
+ // -------------------------------------------------------------------------------------------
+
+ @VisibleForTesting
+ public HBaseTableSchema getHBaseTableSchema() {
+ return this.hbaseTableSchema;
+ }
+
+ @VisibleForTesting
+ public HBaseWriteOptions getWriteOptions() {
+ return writeOptions;
+ }
+
+ @VisibleForTesting
+ public Configuration getConfiguration() {
+ return this.hbaseConf;
+ }
+
+ @VisibleForTesting
+ public String getTableName() {
+ return this.tableName;
+ }
+}
diff --git a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
new file mode 100644
index 000000000..3309e5544
--- /dev/null
+++ b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hbase.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.hbase.sink.HBaseMutationConverter;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.StringUtils;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.BufferedMutatorParams;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.inlong.sort.hbase.metric.MetricData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * The sink function for HBase.
+ *
+ * <p>This class leverages {@link BufferedMutator} to buffer multiple {@link
+ * org.apache.hadoop.hbase.client.Mutation Mutations} before sending the requests to the
+ * cluster. The buffering strategy can be configured by {@code bufferFlushMaxSizeInBytes},
+ * {@code bufferFlushMaxMutations} and {@code bufferFlushIntervalMillis}.
+ */
+@Internal
+public class HBaseSinkFunction<T> extends RichSinkFunction<T>
+ implements CheckpointedFunction, BufferedMutator.ExceptionListener {
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOG = LoggerFactory.getLogger(HBaseSinkFunction.class);
+
+ private final String hTableName;
+ private final byte[] serializedConfig;
+
+ private final long bufferFlushMaxSizeInBytes;
+ private final long bufferFlushMaxMutations;
+ private final long bufferFlushIntervalMillis;
+ private final HBaseMutationConverter<T> mutationConverter;
+ private final String inLongMetric;
+
+ private transient Connection connection;
+ private transient BufferedMutator mutator;
+
+ private transient ScheduledExecutorService executor;
+ private transient ScheduledFuture scheduledFuture;
+ private transient AtomicLong numPendingRequests;
+ private transient RuntimeContext runtimeContext;
+
+ private transient volatile boolean closed = false;
+
+ private MetricData metricData;
+ private Long dataSize = 0L;
+ private Long rowSize = 0L;
+
+ /**
+ * This is set from inside the {@link BufferedMutator.ExceptionListener} if a {@link Throwable}
+ * was thrown.
+ *
+ * <p>Errors will be checked and rethrown before processing each input element, and when the
+ * sink is closed.
+ */
+ private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();
+
+ public HBaseSinkFunction(
+ String hTableName,
+ org.apache.hadoop.conf.Configuration conf,
+ HBaseMutationConverter<T> mutationConverter,
+ long bufferFlushMaxSizeInBytes,
+ long bufferFlushMaxMutations,
+ long bufferFlushIntervalMillis,
+ String inLongMetric) {
+ this.hTableName = hTableName;
+ // Configuration is not serializable
+ this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(conf);
+ this.mutationConverter = mutationConverter;
+ this.bufferFlushMaxSizeInBytes = bufferFlushMaxSizeInBytes;
+ this.bufferFlushMaxMutations = bufferFlushMaxMutations;
+ this.bufferFlushIntervalMillis = bufferFlushIntervalMillis;
+ this.inLongMetric = inLongMetric;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ LOG.info("start open ...");
+ org.apache.hadoop.conf.Configuration config = prepareRuntimeConfiguration();
+ try {
+ this.runtimeContext = getRuntimeContext();
+ metricData = new MetricData(runtimeContext.getMetricGroup());
+ if (inLongMetric != null && !inLongMetric.isEmpty()) {
+ String[] inLongMetricArray = inLongMetric.split("_");
+ String groupId = inLongMetricArray[0];
+ String streamId = inLongMetricArray[1];
+ String nodeId = inLongMetricArray[2];
+ metricData.registerMetricsForDirtyBytes(groupId, streamId, nodeId, "dirtyBytes");
+ metricData.registerMetricsForDirtyRecords(groupId, streamId, nodeId, "dirtyRecords");
+ metricData.registerMetricsForNumBytesOut(groupId, streamId, nodeId, "numBytesOut");
+ metricData.registerMetricsForNumRecordsOut(groupId, streamId, nodeId, "numRecordsOut");
+ metricData.registerMetricsForNumBytesOutPerSecond(groupId, streamId, nodeId, "numBytesOutPerSecond");
+ metricData.registerMetricsForNumRecordsOutPerSecond(groupId, streamId, nodeId,
+ "numRecordsOutPerSecond");
+ }
+ this.mutationConverter.open();
+ this.numPendingRequests = new AtomicLong(0);
+
+ if (null == connection) {
+ this.connection = ConnectionFactory.createConnection(config);
+ }
+ // create a parameter instance, set the table name and custom listener reference.
+ BufferedMutatorParams params =
+ new BufferedMutatorParams(TableName.valueOf(hTableName)).listener(this);
+ if (bufferFlushMaxSizeInBytes > 0) {
+ params.writeBufferSize(bufferFlushMaxSizeInBytes);
+ }
+ this.mutator = connection.getBufferedMutator(params);
+
+ if (bufferFlushIntervalMillis > 0 && bufferFlushMaxMutations != 1) {
+ this.executor =
+ Executors.newScheduledThreadPool(
+ 1, new ExecutorThreadFactory("hbase-upsert-sink-flusher"));
+ this.scheduledFuture =
+ this.executor.scheduleWithFixedDelay(
+ () -> {
+ if (closed) {
+ return;
+ }
+ try {
+ flush();
+ if (metricData.getNumRecordsOut() != null) {
+ metricData.getNumRecordsOut().inc(rowSize);
+ }
+ if (metricData.getNumBytesOut() != null) {
+ metricData.getNumBytesOut()
+ .inc(dataSize);
+ }
+ resetStateAfterFlush();
+ } catch (Exception e) {
+ if (metricData.getDirtyRecords() != null) {
+ metricData.getDirtyRecords().inc(rowSize);
+ }
+ if (metricData.getDirtyBytes() != null) {
+ metricData.getDirtyBytes().inc(dataSize);
+ }
+ resetStateAfterFlush();
+ // fail the sink and skip the rest of the items
+ // if the failure handler decides to throw an exception
+ failureThrowable.compareAndSet(null, e);
+ }
+ },
+ bufferFlushIntervalMillis,
+ bufferFlushIntervalMillis,
+ TimeUnit.MILLISECONDS);
+ }
+ } catch (TableNotFoundException tnfe) {
+ LOG.error("The table " + hTableName + " not found ", tnfe);
+ throw new RuntimeException("HBase table '" + hTableName + "' not found.", tnfe);
+ } catch (IOException ioe) {
+ LOG.error("Exception while creating connection to HBase.", ioe);
+ throw new RuntimeException("Cannot create connection to HBase.", ioe);
+ }
+ LOG.info("end open.");
+ }
+
+ private org.apache.hadoop.conf.Configuration prepareRuntimeConfiguration() throws IOException {
+ // Create the default configuration from the current runtime environment
+ // (`hbase-site.xml` on the classpath) first, and then overwrite it using the
+ // serialized configuration from the client-side environment
+ // (`hbase-site.xml` on the client classpath).
+ // User params from the client side have the highest priority.
+ org.apache.hadoop.conf.Configuration runtimeConfig =
+ HBaseConfigurationUtil.deserializeConfiguration(
+ serializedConfig, HBaseConfigurationUtil.getHBaseConfiguration());
+
+ // do validation: check key option(s) in final runtime configuration
+ if (StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get(HConstants.ZOOKEEPER_QUORUM))) {
+ LOG.error(
+ "Can not connect to HBase without {} configuration",
+ HConstants.ZOOKEEPER_QUORUM);
+ throw new IOException(
+ "Check HBase configuration failed, lost: '"
+ + HConstants.ZOOKEEPER_QUORUM
+ + "'!");
+ }
+
+ return runtimeConfig;
+ }
+
+ private void checkErrorAndRethrow() {
+ Throwable cause = failureThrowable.get();
+ if (cause != null) {
+ throw new RuntimeException("An error occurred in HBaseSink.", cause);
+ }
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void invoke(T value, Context context) throws Exception {
+ checkErrorAndRethrow();
+
+ mutator.mutate(mutationConverter.convertToMutation(value));
+ rowSize++;
+ dataSize = dataSize + value.toString().getBytes(StandardCharsets.UTF_8).length;
+ // flush when the number of buffered mutations reaches the configured max.
+ if (bufferFlushMaxMutations > 0
+ && numPendingRequests.incrementAndGet() >= bufferFlushMaxMutations) {
+ try {
+ flush();
+ if (metricData.getNumRecordsOut() != null) {
+ metricData.getNumRecordsOut().inc(rowSize);
+ }
+ if (metricData.getNumBytesOut() != null) {
+ metricData.getNumBytesOut()
+ .inc(dataSize);
+ }
+ resetStateAfterFlush();
+ } catch (Exception e) {
+ if (metricData.getDirtyRecords() != null) {
+ metricData.getDirtyRecords().inc(rowSize);
+ }
+ if (metricData.getDirtyBytes() != null) {
+ metricData.getDirtyBytes().inc(dataSize);
+ }
+ resetStateAfterFlush();
+ throw e;
+ }
+ }
+ }
+
+ private void resetStateAfterFlush() {
+ dataSize = 0L;
+ rowSize = 0L;
+ }
+
+ private void flush() throws IOException {
+ // BufferedMutator is thread-safe
+ mutator.flush();
+ numPendingRequests.set(0);
+ checkErrorAndRethrow();
+ }
+
+ @Override
+ public void close() throws Exception {
+ closed = true;
+
+ if (mutator != null) {
+ try {
+ mutator.close();
+ } catch (IOException e) {
+ LOG.warn("Exception occurs while closing HBase BufferedMutator.", e);
+ }
+ this.mutator = null;
+ }
+
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (IOException e) {
+ LOG.warn("Exception occurs while closing HBase Connection.", e);
+ }
+ this.connection = null;
+ }
+
+ if (scheduledFuture != null) {
+ scheduledFuture.cancel(false);
+ if (executor != null) {
+ executor.shutdownNow();
+ }
+ }
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ while (numPendingRequests.get() != 0) {
+ flush();
+ }
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ // nothing to do.
+ }
+
+ @Override
+ public void onException(RetriesExhaustedWithDetailsException exception, BufferedMutator mutator)
+ throws RetriesExhaustedWithDetailsException {
+ // fail the sink and skip the rest of the items
+ // if the failure handler decides to throw an exception
+ failureThrowable.compareAndSet(null, exception);
+ }
+}
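For jobs that wire the sink directly onto a DataStream rather than through SQL, a construction sketch based on the constructor above; the converter, thresholds, and label are placeholders. Flushing triggers on whichever condition fires first (buffered bytes, pending mutation count, or the timer), and a bufferFlushMaxMutations value of 1 disables the timer:

    import org.apache.flink.connector.hbase.sink.HBaseMutationConverter;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.inlong.sort.hbase.sink.HBaseSinkFunction;

    public class HBaseSinkFunctionSketch {

        public static HBaseSinkFunction<String> build() {
            // Trivial converter: the record becomes the row key and a single cell.
            HBaseMutationConverter<String> converter = new HBaseMutationConverter<String>() {
                @Override
                public void open() {
                }

                @Override
                public Mutation convertToMutation(String record) {
                    return new Put(Bytes.toBytes(record))
                            .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes(record));
                }
            };
            // Must carry hbase.zookeeper.quorum, or open() fails validation.
            Configuration conf = HBaseConfiguration.create();
            return new HBaseSinkFunction<>(
                    "my_table",                    // placeholder table name
                    conf,
                    converter,
                    2 * 1024 * 1024,               // flush at 2 MiB of buffered mutations
                    1000,                          // ... or 1000 pending mutations
                    5000,                          // ... or every 5 seconds
                    "myGroup_myStream_myNode");    // InLong metric label
        }
    }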
diff --git a/inlong-sort/sort-connectors/hbase/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-connectors/hbase/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 000000000..8f8cea02b
--- /dev/null
+++ b/inlong-sort/sort-connectors/hbase/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.hbase.HBase2DynamicTableFactory
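The service entry above is what makes the factory discoverable. Conceptually, Flink's FactoryUtil resolves 'connector' = 'hbase-2.2-inlong' along the lines of the simplified sketch below (the real lookup also validates options and distinguishes source and sink factories):

    import org.apache.flink.table.factories.Factory;

    import java.util.ServiceLoader;

    public class FactoryDiscoverySketch {
        public static Factory find(String connector) {
            // Flink scans every META-INF/services/org.apache.flink.table.factories.Factory
            // file on the classpath and matches factoryIdentifier() against the
            // 'connector' option from the DDL.
            for (Factory factory : ServiceLoader.load(Factory.class)) {
                if (connector.equals(factory.factoryIdentifier())) {
                    return factory; // e.g. HBase2DynamicTableFactory for "hbase-2.2-inlong"
                }
            }
            throw new IllegalArgumentException("No factory for connector: " + connector);
        }
    }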
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
index 125a816ab..3bdf92831 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.cdc.mysql.source;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -131,21 +132,31 @@ public class MySqlSource<T>
@Override
public SourceReader<T, MySqlSplit> createReader(SourceReaderContext readerContext)
throws Exception {
- // create source config for the given subtask (e.g. unique server id)
- MySqlSourceConfig sourceConfig =
- configFactory.createConfig(readerContext.getIndexOfSubtask());
- FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementsQueue =
- new FutureCompletingBlockingQueue<>();
-
final Method metricGroupMethod = readerContext.getClass().getMethod("metricGroup");
metricGroupMethod.setAccessible(true);
final MetricGroup metricGroup = (MetricGroup) metricGroupMethod.invoke(readerContext);
-
final MySqlSourceReaderMetrics sourceReaderMetrics =
new MySqlSourceReaderMetrics(metricGroup);
sourceReaderMetrics.registerMetrics();
MySqlSourceReaderContext mySqlSourceReaderContext =
new MySqlSourceReaderContext(readerContext);
+ // create source config for the given subtask (e.g. unique server id)
+ MySqlSourceConfig sourceConfig =
+ configFactory.createConfig(readerContext.getIndexOfSubtask());
+ String inlongMetric = sourceConfig.getInlongMetric();
+ if (StringUtils.isNotEmpty(inlongMetric)) {
+ String[] inlongMetricArray = inlongMetric.split("_");
+ String groupId = inlongMetricArray[0];
+ String streamId = inlongMetricArray[1];
+ String nodeId = inlongMetricArray[2];
+ sourceReaderMetrics.registerMetricsForNumBytesIn(groupId, streamId, nodeId, "numBytesIn");
+ sourceReaderMetrics.registerMetricsForNumRecordsIn(groupId, streamId, nodeId, "numRecordsIn");
+ sourceReaderMetrics.registerMetricsForNumBytesInPerSecond(groupId, streamId, nodeId, "numBytesInPerSecond");
+ sourceReaderMetrics.registerMetricsForNumRecordsInPerSecond(groupId, streamId, nodeId,
+ "numRecordsInPerSecond");
+ }
+ FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementsQueue =
+ new FutureCompletingBlockingQueue<>();
Supplier<MySqlSplitReader> splitReaderSupplier =
() ->
new MySqlSplitReader(
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSourceBuilder.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSourceBuilder.java
index 722262b6a..015dc111e 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSourceBuilder.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSourceBuilder.java
@@ -252,6 +252,11 @@ public class MySqlSourceBuilder<T> {
return this;
}
+ public MySqlSourceBuilder<T> inlongMetric(String inlongMetric) {
+ this.configFactory.inlongMetric(inlongMetric);
+ return this;
+ }
+
/**
* Build the {@link MySqlSource}.
*
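A hedged usage sketch of the new builder method, assuming the fork keeps the upstream builder entry point; the connection settings are placeholders, and JsonDebeziumDeserializationSchema stands in for whichever DebeziumDeserializationSchema implementation the job actually uses (its import path may differ in the InLong fork):

    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
    import org.apache.inlong.sort.cdc.mysql.source.MySqlSource;

    public class MySqlSourceBuilderSketch {
        public static MySqlSource<String> build() {
            return MySqlSource.<String>builder()
                    .hostname("localhost")           // placeholder connection settings
                    .port(3306)
                    .databaseList("mydb")
                    .tableList("mydb.orders")
                    .username("user")
                    .password("pass")
                    .deserializer(new JsonDebeziumDeserializationSchema())
                    .inlongMetric("myGroup_myStream_myNode") // groupId_streamId_nodeId label
                    .build();
        }
    }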
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfig.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfig.java
index 65ed4d740..2104e6f0b 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfig.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfig.java
@@ -67,6 +67,8 @@ public class MySqlSourceConfig implements Serializable {
private final Configuration dbzConfiguration;
private final MySqlConnectorConfig dbzMySqlConfig;
+ private final String inlongMetric;
+
MySqlSourceConfig(
String hostname,
int port,
@@ -88,7 +90,8 @@ public class MySqlSourceConfig implements Serializable {
boolean includeSchemaChanges,
boolean scanNewlyAddedTableEnabled,
Properties dbzProperties,
- Properties jdbcProperties) {
+ Properties jdbcProperties,
+ String inlongMetric) {
this.hostname = checkNotNull(hostname);
this.port = port;
this.username = checkNotNull(username);
@@ -112,6 +115,7 @@ public class MySqlSourceConfig implements Serializable {
this.dbzConfiguration = Configuration.from(dbzProperties);
this.dbzMySqlConfig = new MySqlConnectorConfig(dbzConfiguration);
this.jdbcProperties = jdbcProperties;
+ this.inlongMetric = inlongMetric;
}
public String getHostname() {
@@ -210,4 +214,8 @@ public class MySqlSourceConfig implements Serializable {
public Properties getJdbcProperties() {
return jdbcProperties;
}
+
+ public String getInlongMetric() {
+ return inlongMetric;
+ }
}
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfigFactory.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfigFactory.java
index 035269224..2bf6507ac 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfigFactory.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfigFactory.java
@@ -74,6 +74,13 @@ public class MySqlSourceConfigFactory implements Serializable {
private Duration heartbeatInterval = HEARTBEAT_INTERVAL.defaultValue();
private Properties dbzProperties;
+ private String inlongMetric;
+
+ public MySqlSourceConfigFactory inlongMetric(String inlongMetric) {
+ this.inlongMetric = inlongMetric;
+ return this;
+ }
+
public MySqlSourceConfigFactory hostname(String hostname) {
this.hostname = hostname;
return this;
@@ -334,6 +341,10 @@ public class MySqlSourceConfigFactory implements Serializable {
jdbcProperties = new Properties();
}
+ if (inlongMetric == null) {
+ inlongMetric = "";
+ }
+
return new MySqlSourceConfig(
hostname,
port,
@@ -355,6 +366,7 @@ public class MySqlSourceConfigFactory implements Serializable {
includeSchemaChanges,
scanNewlyAddedTableEnabled,
props,
- jdbcProperties);
+ jdbcProperties,
+ inlongMetric);
}
}
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
index 8a211cf19..f936ac8d1 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
@@ -30,6 +30,12 @@ import java.time.Duration;
*/
public class MySqlSourceOptions {
+ public static final ConfigOption<String> INLONG_METRIC =
+ ConfigOptions.key("inlong.metric")
+ .stringType()
+ .defaultValue("")
+ .withDescription("INLONG GROUP ID + '_' + STREAM ID + '_' + NODE ID");
+
public static final ConfigOption<String> HOSTNAME =
ConfigOptions.key("hostname")
.stringType()
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
index 1551975fd..fc375b1d0 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
@@ -18,7 +18,10 @@
package org.apache.inlong.sort.cdc.mysql.source.metrics;
+import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.inlong.sort.cdc.mysql.source.reader.MySqlSourceReader;
@@ -48,6 +51,15 @@ public class MySqlSourceReaderMetrics {
*/
private volatile long emitDelay = 0L;
+ private Counter numRecordsIn;
+ private Counter numBytesIn;
+ private Meter numRecordsInPerSecond;
+ private Meter numBytesInPerSecond;
+ private static final Integer TIME_SPAN_IN_SECONDS = 60;
+ private static final String STREAM_ID = "streamId";
+ private static final String GROUP_ID = "groupId";
+ private static final String NODE_ID = "nodeId";
+
public MySqlSourceReaderMetrics(MetricGroup metricGroup) {
this.metricGroup = metricGroup;
}
@@ -58,6 +70,32 @@ public class MySqlSourceReaderMetrics {
metricGroup.gauge("sourceIdleTime", (Gauge<Long>) this::getIdleTime);
}
+ public void registerMetricsForNumRecordsIn(String groupId, String streamId, String nodeId, String metricName) {
+ numRecordsIn =
+ metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId)
+ .counter(metricName);
+ }
+
+ public void registerMetricsForNumBytesIn(String groupId, String streamId, String nodeId, String metricName) {
+ numBytesIn =
+ metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId)
+ .counter(metricName);
+ }
+
+ public void registerMetricsForNumRecordsInPerSecond(String groupId, String streamId, String nodeId,
+ String metricName) {
+ numRecordsInPerSecond = metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID,
+ nodeId)
+ .meter(metricName, new MeterView(this.numRecordsIn, TIME_SPAN_IN_SECONDS));
+ }
+
+ public void registerMetricsForNumBytesInPerSecond(String groupId, String streamId, String nodeId,
+ String metricName) {
+ numBytesInPerSecond = metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId)
+ .addGroup(NODE_ID, nodeId)
+ .meter(metricName, new MeterView(this.numBytesIn, TIME_SPAN_IN_SECONDS));
+ }
+
public long getFetchDelay() {
return fetchDelay;
}
@@ -85,4 +123,20 @@ public class MySqlSourceReaderMetrics {
public void recordEmitDelay(long emitDelay) {
this.emitDelay = emitDelay;
}
+
+ public Counter getNumRecordsIn() {
+ return numRecordsIn;
+ }
+
+ public Counter getNumBytesIn() {
+ return numBytesIn;
+ }
+
+ public Meter getNumRecordsInPerSecond() {
+ return numRecordsInPerSecond;
+ }
+
+ public Meter getNumBytesInPerSecond() {
+ return numBytesInPerSecond;
+ }
}
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
index 68c561892..3cfceb567 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
@@ -39,6 +39,7 @@ import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.charset.StandardCharsets;
import java.util.Map;
import static org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getBinlogPosition;
@@ -144,6 +145,13 @@ public final class MySqlRecordEmitter<T>
new Collector<T>() {
@Override
public void collect(final T t) {
+ if (sourceReaderMetrics.getNumRecordsIn() != null) {
+ sourceReaderMetrics.getNumRecordsIn().inc(1L);
+ }
+ if (sourceReaderMetrics.getNumBytesIn() != null) {
+ sourceReaderMetrics.getNumBytesIn()
+ .inc(t.toString().getBytes(StandardCharsets.UTF_8).length);
+ }
output.collect(t);
}
@@ -169,7 +177,7 @@ public final class MySqlRecordEmitter<T>
}
private void emitElement(SourceRecord element, SourceOutput<T> output,
- TableChange tableSchema) throws Exception {
+ TableChange tableSchema) throws Exception {
outputCollector.output = output;
debeziumDeserializationSchema.deserialize(element, outputCollector, tableSchema);
}
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
index 20876ed98..3ffd679f7 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
@@ -46,6 +46,7 @@ import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.
import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.DATABASE_NAME;
import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.HEARTBEAT_INTERVAL;
import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.HOSTNAME;
+import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.INLONG_METRIC;
import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.MIGRATE_ALL;
import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.PASSWORD;
import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.PORT;
@@ -119,6 +120,7 @@ public class MySqlTableInlongSourceFactory implements DynamicTableSourceFactory
DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX, JdbcUrlUtils.PROPERTIES_PREFIX);
final ReadableConfig config = helper.getOptions();
+ final String inlongMetric = config.get(INLONG_METRIC);
final String hostname = config.get(HOSTNAME);
final String username = config.get(USERNAME);
final String password = config.get(PASSWORD);
@@ -183,7 +185,8 @@ public class MySqlTableInlongSourceFactory implements DynamicTableSourceFactory
scanNewlyAddedTableEnabled,
JdbcUrlUtils.getJdbcProperties(context.getCatalogTable().getOptions()),
heartbeatInterval,
- migrateAll);
+ migrateAll,
+ inlongMetric);
}
@Override
@@ -225,6 +228,7 @@ public class MySqlTableInlongSourceFactory implements DynamicTableSourceFactory
options.add(MIGRATE_ALL);
options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED);
options.add(HEARTBEAT_INTERVAL);
+ options.add(INLONG_METRIC);
return options;
}
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
index 8e0d76013..11be54d47 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
@@ -81,6 +81,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
private final Properties jdbcProperties;
private final Duration heartbeatInterval;
private final boolean migrateAll;
+ private final String inlongMetric;
// --------------------------------------------------------------------------------------------
// Mutable attributes
// --------------------------------------------------------------------------------------------
@@ -121,7 +122,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
boolean appendSource,
StartupOptions startupOptions,
Duration heartbeatInterval,
- boolean migrateAll) {
+ boolean migrateAll,
+ String inlongMetric) {
this(
physicalSchema,
port,
@@ -147,7 +149,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
false,
new Properties(),
heartbeatInterval,
- migrateAll);
+ migrateAll,
+ inlongMetric);
}
/**
@@ -178,7 +181,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
boolean scanNewlyAddedTableEnabled,
Properties jdbcProperties,
Duration heartbeatInterval,
- boolean migrateAll) {
+ boolean migrateAll,
+ String inlongMetric) {
this.physicalSchema = physicalSchema;
this.port = port;
this.hostname = checkNotNull(hostname);
@@ -207,6 +211,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
this.metadataKeys = Collections.emptyList();
this.heartbeatInterval = heartbeatInterval;
this.migrateAll = migrateAll;
+ this.inlongMetric = inlongMetric;
}
@Override
@@ -265,6 +270,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
.jdbcProperties(jdbcProperties)
.heartbeatInterval(heartbeatInterval)
+ .inlongMetric(inlongMetric)
.build();
return SourceProvider.of(parallelSource);
} else {
@@ -350,7 +356,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
scanNewlyAddedTableEnabled,
jdbcProperties,
heartbeatInterval,
- migrateAll);
+ migrateAll,
+ inlongMetric);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;
@@ -388,7 +395,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
&& Objects.equals(startupOptions, that.startupOptions)
&& Objects.equals(producedDataType, that.producedDataType)
&& Objects.equals(metadataKeys, that.metadataKeys)
- && Objects.equals(jdbcProperties, that.jdbcProperties);
+ && Objects.equals(jdbcProperties, that.jdbcProperties)
+ && Objects.equals(inlongMetric, that.inlongMetric);
}
@Override
@@ -417,7 +425,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
producedDataType,
metadataKeys,
scanNewlyAddedTableEnabled,
- jdbcProperties);
+ jdbcProperties,
+ inlongMetric);
}
@Override
diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
new file mode 100644
index 000000000..b2952d1d3
--- /dev/null
+++ b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
@@ -0,0 +1,647 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.postgres;
+
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import com.ververica.cdc.debezium.Validator;
+import com.ververica.cdc.debezium.internal.DebeziumChangeConsumer;
+import com.ververica.cdc.debezium.internal.DebeziumChangeFetcher;
+import com.ververica.cdc.debezium.internal.DebeziumOffset;
+import com.ververica.cdc.debezium.internal.DebeziumOffsetSerializer;
+import com.ververica.cdc.debezium.internal.FlinkDatabaseHistory;
+import com.ververica.cdc.debezium.internal.FlinkDatabaseSchemaHistory;
+import com.ververica.cdc.debezium.internal.FlinkOffsetBackingStore;
+import com.ververica.cdc.debezium.internal.Handover;
+import com.ververica.cdc.debezium.internal.SchemaRecord;
+import io.debezium.document.DocumentReader;
+import io.debezium.document.DocumentWriter;
+import io.debezium.embedded.Connect;
+import io.debezium.engine.DebeziumEngine;
+import io.debezium.engine.spi.OffsetCommitPolicy;
+import io.debezium.heartbeat.Heartbeat;
+import org.apache.commons.collections.map.LinkedMap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
+import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
+
+/**
+ * The {@link DebeziumSourceFunction} is a streaming data source that pulls captured change data
+ * from databases into Flink.
+ *
+ * <p>There are two workers during the runtime. One worker periodically pulls records from the
+ * database and pushes the records into the {@link Handover}. The other worker consumes the records
+ * from the {@link Handover} and converts the records into Flink-style data. The reason a
+ * single worker is not used is that Debezium behaves differently in the snapshot phase and
+ * the streaming phase.
+ *
+ * <p>Here we use the {@link Handover} as the buffer to submit data from the producer to the
+ * consumer. Because the two threads don't communicate with each other directly, the error reporting
+ * also relies on {@link Handover}. When the engine gets errors, the engine uses the {@link
+ * DebeziumEngine.CompletionCallback} to report errors to the {@link Handover} and wakes up the
+ * consumer to check the error. However, the source function just closes the engine and wakes up the
+ * producer if the error is from the Flink side.
+ *
+ * <p>If the execution is canceled or finishes (snapshot phase only), the exit logic is the
+ * same as the error-reporting logic.
+ *
+ * <p>The source function participates in checkpointing and guarantees that no data is lost during a
+ * failure, and that the computation processes elements "exactly once".
+ *
+ * <p>Note: currently, the source function can't run in multiple parallel instances.
+ *
+ * <p>Please refer to Debezium's documentation for the available configuration properties:
+ * https://debezium.io/documentation/reference/1.5/development/engine.html#engine-properties
+ */
+@PublicEvolving
+public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
+ implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<T> {
+
+ private static final long serialVersionUID = -5808108641062931623L;
+
+ protected static final Logger LOG = LoggerFactory.getLogger(DebeziumSourceFunction.class);
+
+ /**
+ * State name of the consumer's partition offset states.
+ */
+ public static final String OFFSETS_STATE_NAME = "offset-states";
+
+ /**
+ * State name of the consumer's history records state.
+ */
+ public static final String HISTORY_RECORDS_STATE_NAME = "history-records-states";
+
+ /**
+ * The maximum number of pending non-committed checkpoints to track, to avoid memory leaks.
+ */
+ public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
+
+ /**
+     * The configuration key that indicates whether the Debezium MySQL connector uses the legacy
+     * implementation or not.
+ */
+ public static final String LEGACY_IMPLEMENTATION_KEY = "internal.implementation";
+
+ /**
+     * The configuration value that represents the legacy implementation.
+ */
+ public static final String LEGACY_IMPLEMENTATION_VALUE = "legacy";
+
+ // ---------------------------------------------------------------------------------------
+ // Properties
+ // ---------------------------------------------------------------------------------------
+
+ /**
+ * The schema to convert from Debezium's messages into Flink's objects.
+ */
+ private final DebeziumDeserializationSchema<T> deserializer;
+
+ /**
+     * User-supplied properties for Kafka.
+ */
+ private final Properties properties;
+
+ /**
+     * The specific binlog offset to read from on the first startup.
+ */
+    @Nullable
+    private final DebeziumOffset specificOffset;
+
+ /**
+ * Data for pending but uncommitted offsets.
+ */
+ private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
+
+ /**
+ * Flag indicating whether the Debezium Engine is started.
+ */
+ private volatile boolean debeziumStarted = false;
+
+ /**
+     * Validator that checks whether the connected database satisfies the CDC connector's requirements.
+ */
+ private final Validator validator;
+
+ // ---------------------------------------------------------------------------------------
+ // State
+ // ---------------------------------------------------------------------------------------
+
+ /**
+ * The offsets to restore to, if the consumer restores state from a checkpoint.
+ *
+ * <p>This map will be populated by the {@link #initializeState(FunctionInitializationContext)}
+ * method.
+ *
+ * <p>Using a String because we are encoding the offset state in JSON bytes.
+ */
+ private transient volatile String restoredOffsetState;
+
+ /**
+ * Accessor for state in the operator state backend.
+ */
+ private transient ListState<byte[]> offsetState;
+
+ /**
+ * State to store the history records, i.e. schema changes.
+ *
+ * @see FlinkDatabaseHistory
+ * @see FlinkDatabaseSchemaHistory
+ */
+ private transient ListState<String> schemaRecordsState;
+
+ // ---------------------------------------------------------------------------------------
+ // Worker
+ // ---------------------------------------------------------------------------------------
+
+ private transient ExecutorService executor;
+ private transient DebeziumEngine<?> engine;
+ /**
+ * Unique name of this Debezium Engine instance across all the jobs. Currently we randomly
+ * generate a UUID for it. This is used for {@link FlinkDatabaseHistory}.
+ */
+ private transient String engineInstanceName;
+
+ /**
+ * Consume the events from the engine and commit the offset to the engine.
+ */
+ private transient DebeziumChangeConsumer changeConsumer;
+
+ /**
+ * The consumer to fetch records from {@link Handover}.
+ */
+ private transient DebeziumChangeFetcher<T> debeziumChangeFetcher;
+
+ /**
+     * Buffer the events from the source and record the errors from Debezium.
+ */
+ private transient Handover handover;
+
+ private String inlongMetric;
+
+ private MetricData metricData;
+
+ // ---------------------------------------------------------------------------------------
+
+ public DebeziumSourceFunction(
+ DebeziumDeserializationSchema<T> deserializer,
+ Properties properties,
+ @Nullable DebeziumOffset specificOffset,
+ Validator validator, String inlongMetric) {
+ this.deserializer = deserializer;
+ this.properties = properties;
+ this.specificOffset = specificOffset;
+ this.validator = validator;
+ this.inlongMetric = inlongMetric;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ validator.validate();
+ super.open(parameters);
+ ThreadFactory threadFactory =
+ new ThreadFactoryBuilder().setNameFormat("debezium-engine").build();
+ this.executor = Executors.newSingleThreadExecutor(threadFactory);
+ this.handover = new Handover();
+ this.changeConsumer = new DebeziumChangeConsumer(handover);
+ }
+
+ // ------------------------------------------------------------------------
+ // Checkpoint and restore
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ OperatorStateStore stateStore = context.getOperatorStateStore();
+ this.offsetState =
+ stateStore.getUnionListState(
+ new ListStateDescriptor<>(
+ OFFSETS_STATE_NAME,
+ PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO));
+ this.schemaRecordsState =
+ stateStore.getUnionListState(
+ new ListStateDescriptor<>(
+ HISTORY_RECORDS_STATE_NAME, BasicTypeInfo.STRING_TYPE_INFO));
+
+ if (context.isRestored()) {
+ restoreOffsetState();
+ restoreHistoryRecordsState();
+ } else {
+ if (specificOffset != null) {
+ byte[] serializedOffset =
+ DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset);
+ restoredOffsetState = new String(serializedOffset, StandardCharsets.UTF_8);
+ LOG.info(
+ "Consumer subtask {} starts to read from specified offset {}.",
+ getRuntimeContext().getIndexOfThisSubtask(),
+ restoredOffsetState);
+ } else {
+ LOG.info(
+ "Consumer subtask {} has no restore state.",
+ getRuntimeContext().getIndexOfThisSubtask());
+ }
+ }
+ }
+
+ private void restoreOffsetState() throws Exception {
+ for (byte[] serializedOffset : offsetState.get()) {
+ if (restoredOffsetState == null) {
+ restoredOffsetState = new String(serializedOffset, StandardCharsets.UTF_8);
+ } else {
+                throw new RuntimeException(
+                        "The Debezium source only supports a single task; "
+                                + "however, this state was restored from multiple tasks.");
+ }
+ }
+ LOG.info(
+ "Consumer subtask {} restored offset state: {}.",
+ getRuntimeContext().getIndexOfThisSubtask(),
+ restoredOffsetState);
+ }
+
+ private void restoreHistoryRecordsState() throws Exception {
+ DocumentReader reader = DocumentReader.defaultReader();
+ ConcurrentLinkedQueue<SchemaRecord> historyRecords = new ConcurrentLinkedQueue<>();
+ int recordsCount = 0;
+ boolean firstEntry = true;
+ for (String record : schemaRecordsState.get()) {
+ if (firstEntry) {
+ // we store the engine instance name in the first element
+ this.engineInstanceName = record;
+ firstEntry = false;
+ } else {
+ // Put the records into the state. The database history should read, reorganize and
+ // register the state.
+ historyRecords.add(new SchemaRecord(reader.read(record)));
+ recordsCount++;
+ }
+ }
+ if (engineInstanceName != null) {
+ registerHistory(engineInstanceName, historyRecords);
+ }
+ LOG.info(
+ "Consumer subtask {} restored history records state: {} with {} records.",
+ getRuntimeContext().getIndexOfThisSubtask(),
+ engineInstanceName,
+ recordsCount);
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
+ if (handover.hasError()) {
+ LOG.debug("snapshotState() called on closed source");
+ throw new FlinkRuntimeException(
+ "Call snapshotState() on closed source, checkpoint failed.");
+ } else {
+ snapshotOffsetState(functionSnapshotContext.getCheckpointId());
+ snapshotHistoryRecordsState();
+ }
+ }
+
+ private void snapshotOffsetState(long checkpointId) throws Exception {
+ offsetState.clear();
+
+ final DebeziumChangeFetcher<?> fetcher = this.debeziumChangeFetcher;
+
+ byte[] serializedOffset = null;
+ if (fetcher == null) {
+ // the fetcher has not yet been initialized, which means we need to return the
+ // originally restored offsets
+ if (restoredOffsetState != null) {
+ serializedOffset = restoredOffsetState.getBytes(StandardCharsets.UTF_8);
+ }
+ } else {
+ byte[] currentState = fetcher.snapshotCurrentState();
+ if (currentState == null && restoredOffsetState != null) {
+ // the fetcher has been initialized, but has not yet received any data,
+ // which means we need to return the originally restored offsets.
+ serializedOffset = restoredOffsetState.getBytes(StandardCharsets.UTF_8);
+ } else {
+ serializedOffset = currentState;
+ }
+ }
+
+ if (serializedOffset != null) {
+ offsetState.add(serializedOffset);
+ // the map cannot be asynchronously updated, because only one checkpoint call
+ // can happen on this function at a time: either snapshotState() or
+ // notifyCheckpointComplete()
+ pendingOffsetsToCommit.put(checkpointId, serializedOffset);
+ // truncate the map of pending offsets to commit, to prevent infinite growth
+ while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
+ pendingOffsetsToCommit.remove(0);
+ }
+ }
+ }
+
+ private void snapshotHistoryRecordsState() throws Exception {
+ schemaRecordsState.clear();
+
+ if (engineInstanceName != null) {
+ schemaRecordsState.add(engineInstanceName);
+ Collection<SchemaRecord> records = retrieveHistory(engineInstanceName);
+ DocumentWriter writer = DocumentWriter.defaultWriter();
+ for (SchemaRecord record : records) {
+ schemaRecordsState.add(writer.write(record.toDocument()));
+ }
+ }
+ }
+
+ @Override
+ public void run(SourceContext<T> sourceContext) throws Exception {
+
+ // initialize metrics
+ // make RuntimeContext#getMetricGroup compatible between Flink 1.13 and Flink 1.14
+ final Method getMetricGroupMethod =
+ getRuntimeContext().getClass().getMethod("getMetricGroup");
+ getMetricGroupMethod.setAccessible(true);
+ final MetricGroup metricGroup =
+ (MetricGroup) getMetricGroupMethod.invoke(getRuntimeContext());
+
+ metricGroup.gauge(
+ "currentFetchEventTimeLag",
+ (Gauge<Long>) () -> debeziumChangeFetcher.getFetchDelay());
+ metricGroup.gauge(
+ "currentEmitEventTimeLag",
+ (Gauge<Long>) () -> debeziumChangeFetcher.getEmitDelay());
+ metricGroup.gauge(
+ "sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime());
+ if (StringUtils.isNotEmpty(this.inlongMetric)) {
+ String[] inlongMetricArray = inlongMetric.split("_");
+ String groupId = inlongMetricArray[0];
+ String streamId = inlongMetricArray[1];
+ String nodeId = inlongMetricArray[2];
+ metricData = new MetricData(metricGroup);
+ metricData.registerMetricsForNumRecordsIn(groupId, streamId, nodeId, "numRecordsIn");
+ metricData.registerMetricsForNumBytesIn(groupId, streamId, nodeId, "numBytesIn");
+ metricData.registerMetricsForNumBytesInPerSecond(groupId, streamId, nodeId, "numBytesInPerSecond");
+ metricData.registerMetricsForNumRecordsInPerSecond(groupId, streamId, nodeId, "numRecordsInPerSecond");
+ }
+ properties.setProperty("name", "engine");
+ properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
+ if (restoredOffsetState != null) {
+ // restored from state
+ properties.setProperty(FlinkOffsetBackingStore.OFFSET_STATE_VALUE, restoredOffsetState);
+ }
+ // DO NOT include schema change, e.g. DDL
+ properties.setProperty("include.schema.changes", "false");
+ // disable the offset flush totally
+ properties.setProperty("offset.flush.interval.ms", String.valueOf(Long.MAX_VALUE));
+ // disable tombstones
+ properties.setProperty("tombstones.on.delete", "false");
+ if (engineInstanceName == null) {
+ // not restore from recovery
+ engineInstanceName = UUID.randomUUID().toString();
+ }
+ // history instance name to initialize FlinkDatabaseHistory
+ properties.setProperty(
+ FlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME, engineInstanceName);
+ // we have to use a persisted DatabaseHistory implementation, otherwise, recovery can't
+ // continue to read binlog
+ // see
+ // https://stackoverflow.com/questions/57147584/debezium-error-schema-isnt-know-to-this-connector
+ // and https://debezium.io/blog/2018/03/16/note-on-database-history-topic-configuration/
+ properties.setProperty("database.history", determineDatabase().getCanonicalName());
+
+ // we have to filter out the heartbeat events, otherwise the deserializer will fail
+ String dbzHeartbeatPrefix =
+ properties.getProperty(
+ Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(),
+ Heartbeat.HEARTBEAT_TOPICS_PREFIX.defaultValueAsString());
+ this.debeziumChangeFetcher =
+ new DebeziumChangeFetcher<>(
+ sourceContext,
+ new DebeziumDeserializationSchema<T>() {
+ @Override
+ public void deserialize(SourceRecord record, Collector<T> out) throws Exception {
+ if (metricData != null) {
+ metricData.getNumRecordsIn().inc(1L);
+ metricData.getNumBytesIn()
+ .inc(record.value().toString().getBytes(StandardCharsets.UTF_8).length);
+ }
+ deserializer.deserialize(record, out);
+ }
+
+ @Override
+ public TypeInformation<T> getProducedType() {
+ return deserializer.getProducedType();
+ }
+ },
+ restoredOffsetState == null, // DB snapshot phase if restore state is null
+ dbzHeartbeatPrefix,
+ handover);
+
+ // create the engine with this configuration ...
+ this.engine =
+ DebeziumEngine.create(Connect.class)
+ .using(properties)
+ .notifying(changeConsumer)
+ .using(OffsetCommitPolicy.always())
+ .using(
+ (success, message, error) -> {
+ if (success) {
+ // Close the handover and prepare to exit.
+ handover.close();
+ } else {
+ handover.reportError(error);
+ }
+ })
+ .build();
+
+ // run the engine asynchronously
+ executor.execute(engine);
+ debeziumStarted = true;
+
+ // start the real debezium consumer
+ debeziumChangeFetcher.runFetchLoop();
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ if (!debeziumStarted) {
+ LOG.debug("notifyCheckpointComplete() called when engine is not started.");
+ return;
+ }
+
+ final DebeziumChangeFetcher<T> fetcher = this.debeziumChangeFetcher;
+ if (fetcher == null) {
+ LOG.debug("notifyCheckpointComplete() called on uninitialized source");
+ return;
+ }
+
+ try {
+ final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
+ if (posInMap == -1) {
+ LOG.warn(
+ "Consumer subtask {} received confirmation for unknown checkpoint id {}",
+ getRuntimeContext().getIndexOfThisSubtask(),
+ checkpointId);
+ return;
+ }
+
+ byte[] serializedOffsets = (byte[]) pendingOffsetsToCommit.remove(posInMap);
+
+ // remove older checkpoints in map
+ for (int i = 0; i < posInMap; i++) {
+ pendingOffsetsToCommit.remove(0);
+ }
+
+ if (serializedOffsets == null || serializedOffsets.length == 0) {
+ LOG.debug(
+ "Consumer subtask {} has empty checkpoint state.",
+ getRuntimeContext().getIndexOfThisSubtask());
+ return;
+ }
+
+ DebeziumOffset offset =
+ DebeziumOffsetSerializer.INSTANCE.deserialize(serializedOffsets);
+ changeConsumer.commitOffset(offset);
+ } catch (Exception e) {
+ // ignore exception if we are no longer running
+ LOG.warn("Ignore error when committing offset to database.", e);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ // safely and gracefully stop the engine
+ shutdownEngine();
+ if (debeziumChangeFetcher != null) {
+ debeziumChangeFetcher.close();
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ cancel();
+
+ if (executor != null) {
+ executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+ }
+
+ super.close();
+ }
+
+ /**
+ * Safely and gracefully stop the Debezium engine.
+ */
+ private void shutdownEngine() {
+ try {
+ if (engine != null) {
+ engine.close();
+ }
+ } catch (IOException e) {
+ ExceptionUtils.rethrow(e);
+ } finally {
+ if (executor != null) {
+ executor.shutdownNow();
+ }
+
+ debeziumStarted = false;
+
+ if (handover != null) {
+ handover.close();
+ }
+ }
+ }
+
+ @Override
+ public TypeInformation<T> getProducedType() {
+ return deserializer.getProducedType();
+ }
+
+ @VisibleForTesting
+ public LinkedMap getPendingOffsetsToCommit() {
+ return pendingOffsetsToCommit;
+ }
+
+ @VisibleForTesting
+ public boolean getDebeziumStarted() {
+ return debeziumStarted;
+ }
+
+ private Class<?> determineDatabase() {
+ boolean isCompatibleWithLegacy =
+ FlinkDatabaseHistory.isCompatible(retrieveHistory(engineInstanceName));
+ if (LEGACY_IMPLEMENTATION_VALUE.equals(properties.get(LEGACY_IMPLEMENTATION_KEY))) {
+ // specifies the legacy implementation but the state may be incompatible
+ if (isCompatibleWithLegacy) {
+ return FlinkDatabaseHistory.class;
+ } else {
+ throw new IllegalStateException(
+ "The configured option 'debezium.internal.implementation' is 'legacy', but the state of "
+ + "source is incompatible with this implementation, you should remove the the option.");
+ }
+ } else if (FlinkDatabaseSchemaHistory.isCompatible(retrieveHistory(engineInstanceName))) {
+ // tries the non-legacy first
+ return FlinkDatabaseSchemaHistory.class;
+ } else if (isCompatibleWithLegacy) {
+ // fallback to legacy if possible
+ return FlinkDatabaseHistory.class;
+ } else {
+ // impossible
+ throw new IllegalStateException("Can't determine which DatabaseHistory to use.");
+ }
+ }
+
+ @VisibleForTesting
+ public String getEngineInstanceName() {
+ return engineInstanceName;
+ }
+
+ public MetricData getMetricData() {
+ return metricData;
+ }
+}
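
The two-worker handover pattern described in the class Javadoc above can be boiled
down to the following minimal sketch. This is an illustration only, not the Handover
class shipped with flink-cdc; all names here are hypothetical:

    // One-element buffer shared by the Debezium (producer) thread and the
    // Flink source (consumer) thread; errors travel through the same object.
    final class MiniHandover {
        private final Object lock = new Object();
        private Object element;      // the single buffered record
        private Throwable error;     // failure reported by either side

        // Producer side: blocks until the consumer has taken the previous
        // element (back-pressure) or a failure was reported.
        void produce(Object record) throws InterruptedException {
            synchronized (lock) {
                while (element != null && error == null) {
                    lock.wait();
                }
                element = record;
                lock.notifyAll();
            }
        }

        // Consumer side: blocks until an element or an error arrives, and
        // rethrows producer-side failures in the consumer thread.
        Object consume() throws Exception {
            synchronized (lock) {
                while (element == null) {
                    if (error != null) {
                        throw new Exception("handover closed exceptionally", error);
                    }
                    lock.wait();
                }
                Object record = element;
                element = null;
                lock.notifyAll();
                return record;
            }
        }

        // Engine CompletionCallback side: wakes the consumer so it can
        // observe the failure.
        void reportError(Throwable t) {
            synchronized (lock) {
                error = t;
                lock.notifyAll();
            }
        }
    }
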
diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/MetricData.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/MetricData.java
new file mode 100644
index 000000000..07b6f9044
--- /dev/null
+++ b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/MetricData.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.postgres;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.metrics.MetricGroup;
+
+/**
+ * A collection class for handling metrics
+ */
+public class MetricData {
+
+ private final MetricGroup metricGroup;
+
+ private Counter numRecordsIn;
+ private Counter numBytesIn;
+ private Meter numRecordsInPerSecond;
+ private Meter numBytesInPerSecond;
+    private static final Integer TIME_SPAN_IN_SECONDS = 60;
+    private static final String STREAM_ID = "streamId";
+    private static final String GROUP_ID = "groupId";
+    private static final String NODE_ID = "nodeId";
+
+ public MetricData(MetricGroup metricGroup) {
+ this.metricGroup = metricGroup;
+ }
+
+ public void registerMetricsForNumRecordsIn(String groupId, String streamId, String nodeId, String metricName) {
+ numRecordsIn =
+ metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId)
+ .counter(metricName);
+ }
+
+ public void registerMetricsForNumBytesIn(String groupId, String streamId, String nodeId, String metricName) {
+ numBytesIn =
+ metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId)
+ .counter(metricName);
+ }
+
+ public void registerMetricsForNumRecordsInPerSecond(String groupId, String streamId, String nodeId,
+ String metricName) {
+ numRecordsInPerSecond = metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID,
+ nodeId)
+ .meter(metricName, new MeterView(this.numRecordsIn, TIME_SPAN_IN_SECONDS));
+ }
+
+ public void registerMetricsForNumBytesInPerSecond(String groupId, String streamId, String nodeId,
+ String metricName) {
+ numBytesInPerSecond = metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId)
+ .addGroup(NODE_ID, nodeId)
+ .meter(metricName, new MeterView(this.numBytesIn, TIME_SPAN_IN_SECONDS));
+ }
+
+ public Counter getNumRecordsIn() {
+ return numRecordsIn;
+ }
+
+ public Counter getNumBytesIn() {
+ return numBytesIn;
+ }
+
+ public Meter getNumRecordsInPerSecond() {
+ return numRecordsInPerSecond;
+ }
+
+ public Meter getNumBytesInPerSecond() {
+ return numBytesInPerSecond;
+ }
+
+}
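
Note that the per-second meters are MeterViews over the counters, so the counter
registrations must run before the meter registrations (as run() above does). A usage
sketch, where the group/stream/node values and the record size are placeholders:

    MetricData metricData = new MetricData(getRuntimeContext().getMetricGroup());
    metricData.registerMetricsForNumRecordsIn("myGroup", "myStream", "myNode", "numRecordsIn");
    metricData.registerMetricsForNumBytesIn("myGroup", "myStream", "myNode", "numBytesIn");
    metricData.registerMetricsForNumRecordsInPerSecond("myGroup", "myStream", "myNode",
            "numRecordsInPerSecond");
    metricData.registerMetricsForNumBytesInPerSecond("myGroup", "myStream", "myNode",
            "numBytesInPerSecond");

    // Per record: updating a counter also feeds the corresponding MeterView,
    // which computes the rate over TIME_SPAN_IN_SECONDS (60s here).
    long recordSizeInBytes = 128L; // e.g. the serialized record size
    metricData.getNumRecordsIn().inc(1L);
    metricData.getNumBytesIn().inc(recordSizeInBytes);
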
diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/PostgreSQLSource.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/PostgreSQLSource.java
new file mode 100644
index 000000000..30d2268d6
--- /dev/null
+++ b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/PostgreSQLSource.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.postgres;
+
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import com.ververica.cdc.debezium.Validator;
+import io.debezium.connector.postgresql.PostgresConnector;
+
+import java.time.Duration;
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A builder to build a SourceFunction which can read a snapshot and then continue to consume the
+ * WAL (write-ahead log) of PostgreSQL.
+ */
+public class PostgreSQLSource {
+
+ private static final long DEFAULT_HEARTBEAT_MS = Duration.ofMinutes(5).toMillis();
+
+ public static <T> Builder<T> builder() {
+ return new Builder<>();
+ }
+
+ /** Builder class of {@link PostgreSQLSource}. */
+ public static class Builder<T> {
+
+ private String pluginName = "decoderbufs";
+ private String slotName = "flink";
+        private int port = 5432; // default PostgreSQL port
+ private String hostname;
+ private String database;
+ private String username;
+ private String password;
+ private String[] schemaList;
+ private String[] tableList;
+ private Properties dbzProperties;
+ private DebeziumDeserializationSchema<T> deserializer;
+ private String inlongMetric;
+
+ /**
+ * The name of the Postgres logical decoding plug-in installed on the server. Supported
+ * values are decoderbufs, wal2json, wal2json_rds, wal2json_streaming,
+ * wal2json_rds_streaming and pgoutput.
+ */
+ public Builder<T> decodingPluginName(String name) {
+ this.pluginName = name;
+ return this;
+ }
+
+ public Builder<T> hostname(String hostname) {
+ this.hostname = hostname;
+ return this;
+ }
+
+ /** Integer port number of the PostgreSQL database server. */
+ public Builder<T> port(int port) {
+ this.port = port;
+ return this;
+ }
+
+ /** The name of the PostgreSQL database from which to stream the changes. */
+ public Builder<T> database(String database) {
+ this.database = database;
+ return this;
+ }
+
+ /**
+ * An optional list of regular expressions that match schema names to be monitored; any
+ * schema name not included in the whitelist will be excluded from monitoring. By default
+ * all non-system schemas will be monitored.
+ */
+ public Builder<T> schemaList(String... schemaList) {
+ this.schemaList = schemaList;
+ return this;
+ }
+
+ /**
+ * An optional list of regular expressions that match fully-qualified table identifiers for
+ * tables to be monitored; any table not included in the whitelist will be excluded from
+ * monitoring. Each identifier is of the form schemaName.tableName. By default the connector
+ * will monitor every non-system table in each monitored schema.
+ */
+ public Builder<T> tableList(String... tableList) {
+ this.tableList = tableList;
+ return this;
+ }
+
+ /**
+         * Name of the PostgreSQL user to use when connecting to the PostgreSQL database server.
+ */
+ public Builder<T> username(String username) {
+ this.username = username;
+ return this;
+ }
+
+ /** Password to use when connecting to the PostgreSQL database server. */
+ public Builder<T> password(String password) {
+ this.password = password;
+ return this;
+ }
+
+ /**
+ * The name of the PostgreSQL logical decoding slot that was created for streaming changes
+ * from a particular plug-in for a particular database/schema. The server uses this slot to
+ * stream events to the connector that you are configuring. Default is "flink".
+ *
+ * <p>Slot names must conform to <a
+ * href="https://www.postgresql.org/docs/current/static/warm-standby.html#STREAMING-REPLICATION-SLOTS-MANIPULATION">PostgreSQL
+ * replication slot naming rules</a>, which state: "Each replication slot has a name, which
+ * can contain lower-case letters, numbers, and the underscore character."
+ */
+ public Builder<T> slotName(String slotName) {
+ this.slotName = slotName;
+ return this;
+ }
+
+ /** The Debezium Postgres connector properties. */
+ public Builder<T> debeziumProperties(Properties properties) {
+ this.dbzProperties = properties;
+ return this;
+ }
+
+ /**
+ * The deserializer used to convert from consumed {@link
+ * org.apache.kafka.connect.source.SourceRecord}.
+ */
+ public Builder<T> deserializer(DebeziumDeserializationSchema<T> deserializer) {
+ this.deserializer = deserializer;
+ return this;
+ }
+
+ public Builder<T> inlongMetric(String inlongMetric) {
+ this.inlongMetric = inlongMetric;
+ return this;
+ }
+
+ public DebeziumSourceFunction<T> build() {
+ Properties props = new Properties();
+ props.setProperty("connector.class", PostgresConnector.class.getCanonicalName());
+ props.setProperty("plugin.name", pluginName);
+            // Hard-code the server name, because we don't need to distinguish it. From the docs:
+ // Logical name that identifies and provides a namespace for the particular PostgreSQL
+ // database server/cluster being monitored. The logical name should be unique across
+ // all other connectors, since it is used as a prefix for all Kafka topic names coming
+ // from this connector. Only alphanumeric characters and underscores should be used.
+ props.setProperty("database.server.name", "postgres_cdc_source");
+ props.setProperty("database.hostname", checkNotNull(hostname));
+ props.setProperty("database.dbname", checkNotNull(database));
+ props.setProperty("database.user", checkNotNull(username));
+ props.setProperty("database.password", checkNotNull(password));
+ props.setProperty("database.port", String.valueOf(port));
+ props.setProperty("slot.name", slotName);
+ // we have to enable heartbeat for PG to make sure DebeziumChangeConsumer#handleBatch
+ // is invoked after job restart
+ props.setProperty("heartbeat.interval.ms", String.valueOf(DEFAULT_HEARTBEAT_MS));
+
+ if (schemaList != null) {
+ props.setProperty("schema.whitelist", String.join(",", schemaList));
+ }
+ if (tableList != null) {
+ props.setProperty("table.whitelist", String.join(",", tableList));
+ }
+
+ if (dbzProperties != null) {
+ props.putAll(dbzProperties);
+ }
+ return new DebeziumSourceFunction<>(
+ deserializer, props, null, Validator.getDefaultValidator(), inlongMetric);
+ }
+ }
+}
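
For reference, a minimal DataStream-level usage sketch of this builder. The connection
values and the metric label are placeholders, and JsonDebeziumDeserializationSchema is
the stock JSON deserializer from flink-cdc, assumed to be on the classpath:

    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.inlong.sort.cdc.postgres.DebeziumSourceFunction;
    import org.apache.inlong.sort.cdc.postgres.PostgreSQLSource;

    public class PostgresCdcDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Offsets are committed on checkpoint completion, so checkpointing must be enabled.
            env.enableCheckpointing(3000L);

            DebeziumSourceFunction<String> source =
                    PostgreSQLSource.<String>builder()
                            .hostname("localhost")
                            .port(5432)
                            .database("inventory")
                            .schemaList("public")
                            .tableList("public.orders")
                            .username("postgres")
                            .password("postgres")
                            .decodingPluginName("pgoutput")
                            .slotName("inlong_orders_slot")
                            .deserializer(new JsonDebeziumDeserializationSchema())
                            .inlongMetric("myGroup_myStream_myNode") // groupId_streamId_nodeId
                            .build();

            env.addSource(source).print();
            env.execute("postgres-cdc-inlong demo");
        }
    }
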
diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java
new file mode 100644
index 000000000..7826202ac
--- /dev/null
+++ b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.postgres.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static com.ververica.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX;
+import static com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
+
+/**
+ * Factory for creating a configured instance of
+ * {@link org.apache.inlong.sort.cdc.postgres.table.PostgreSQLTableSource}.
+ */
+public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
+
+ private static final String IDENTIFIER = "postgres-cdc-inlong";
+
+ public static final ConfigOption<String> INLONG_METRIC =
+ ConfigOptions.key("inlong.metric")
+ .stringType()
+ .defaultValue("")
+ .withDescription("INLONG GROUP ID + '_' + STREAM ID + '_' + NODE ID");
+
+ private static final ConfigOption<String> HOSTNAME =
+ ConfigOptions.key("hostname")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("IP address or hostname of the PostgreSQL database server.");
+
+ private static final ConfigOption<Integer> PORT =
+ ConfigOptions.key("port")
+ .intType()
+ .defaultValue(5432)
+ .withDescription("Integer port number of the PostgreSQL database server.");
+
+ private static final ConfigOption<String> USERNAME =
+ ConfigOptions.key("username")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Name of the PostgreSQL database to use when connecting to the PostgreSQL database server"
+ + ".");
+
+ private static final ConfigOption<String> PASSWORD =
+ ConfigOptions.key("password")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Password to use when connecting to the PostgreSQL database server.");
+
+ private static final ConfigOption<String> DATABASE_NAME =
+ ConfigOptions.key("database-name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Database name of the PostgreSQL server to monitor.");
+
+ private static final ConfigOption<String> SCHEMA_NAME =
+ ConfigOptions.key("schema-name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Schema name of the PostgreSQL database to monitor.");
+
+ private static final ConfigOption<String> TABLE_NAME =
+ ConfigOptions.key("table-name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Table name of the PostgreSQL database to monitor.");
+
+ private static final ConfigOption<String> DECODING_PLUGIN_NAME =
+ ConfigOptions.key("decoding.plugin.name")
+ .stringType()
+ .defaultValue("decoderbufs")
+ .withDescription(
+ "The name of the Postgres logical decoding plug-in installed on the server.\n"
+ + "Supported values are decoderbufs, wal2json, wal2json_rds, wal2json_streaming,\n"
+ + "wal2json_rds_streaming and pgoutput.");
+
+ private static final ConfigOption<String> SLOT_NAME =
+ ConfigOptions.key("slot.name")
+ .stringType()
+ .defaultValue("flink")
+ .withDescription(
+ "The name of the PostgreSQL logical decoding slot that was created for streaming changes "
+ + "from a particular plug-in for a particular database/schema. The server uses "
+ + "this slot "
+ + "to stream events to the connector that you are configuring. Default is "
+ + "\"flink\".");
+
+ @Override
+ public DynamicTableSource createDynamicTableSource(Context context) {
+ final FactoryUtil.TableFactoryHelper helper =
+ FactoryUtil.createTableFactoryHelper(this, context);
+ helper.validateExcept(DEBEZIUM_OPTIONS_PREFIX);
+
+ final ReadableConfig config = helper.getOptions();
+ String hostname = config.get(HOSTNAME);
+ String username = config.get(USERNAME);
+ String password = config.get(PASSWORD);
+ String databaseName = config.get(DATABASE_NAME);
+ String schemaName = config.get(SCHEMA_NAME);
+ String tableName = config.get(TABLE_NAME);
+ int port = config.get(PORT);
+ String pluginName = config.get(DECODING_PLUGIN_NAME);
+ String slotName = config.get(SLOT_NAME);
+ ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
+ String inlongMetric = config.get(INLONG_METRIC);
+
+ return new PostgreSQLTableSource(
+ physicalSchema,
+ port,
+ hostname,
+ databaseName,
+ schemaName,
+ tableName,
+ username,
+ password,
+ pluginName,
+ slotName,
+ getDebeziumProperties(context.getCatalogTable().getOptions()),
+ inlongMetric);
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(HOSTNAME);
+ options.add(USERNAME);
+ options.add(PASSWORD);
+ options.add(DATABASE_NAME);
+ options.add(SCHEMA_NAME);
+ options.add(TABLE_NAME);
+ return options;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(PORT);
+ options.add(DECODING_PLUGIN_NAME);
+ options.add(SLOT_NAME);
+ options.add(INLONG_METRIC);
+ return options;
+ }
+}
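
The options above map onto DDL as in the following sketch; the table and column names
are made up, and the 'inlong.metric' label follows the GROUP ID + '_' + STREAM ID +
'_' + NODE ID convention documented above:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class PostgresCdcSqlDemo {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inStreamingMode().build());
            // Register a CDC table backed by this factory's identifier.
            tEnv.executeSql(
                    "CREATE TABLE orders (\n"
                            + "  id INT,\n"
                            + "  amount DECIMAL(10, 2),\n"
                            + "  PRIMARY KEY (id) NOT ENFORCED\n"
                            + ") WITH (\n"
                            + "  'connector' = 'postgres-cdc-inlong',\n"
                            + "  'hostname' = 'localhost',\n"
                            + "  'port' = '5432',\n"
                            + "  'username' = 'postgres',\n"
                            + "  'password' = 'postgres',\n"
                            + "  'database-name' = 'inventory',\n"
                            + "  'schema-name' = 'public',\n"
                            + "  'table-name' = 'orders',\n"
                            + "  'decoding.plugin.name' = 'pgoutput',\n"
                            + "  'slot.name' = 'inlong_orders_slot',\n"
                            + "  'inlong.metric' = 'myGroup_myStream_myNode'\n"
                            + ")");
            tEnv.executeSql("SELECT * FROM orders").print();
        }
    }
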
diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableSource.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableSource.java
new file mode 100644
index 000000000..89769a509
--- /dev/null
+++ b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableSource.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.postgres.table;
+
+import com.ververica.cdc.connectors.postgres.table.PostgreSQLReadableMetadata;
+import com.ververica.cdc.connectors.postgres.table.PostgresValueValidator;
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import com.ververica.cdc.debezium.table.MetadataConverter;
+import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.cdc.postgres.DebeziumSourceFunction;
+import org.apache.inlong.sort.cdc.postgres.PostgreSQLSource;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link DynamicTableSource} that describes how to create a PostgreSQL source from a logical
+ * description.
+ */
+public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMetadata {
+
+ private final ResolvedSchema physicalSchema;
+ private final int port;
+ private final String hostname;
+ private final String database;
+ private final String schemaName;
+ private final String tableName;
+ private final String username;
+ private final String password;
+ private final String pluginName;
+ private final String slotName;
+ private final Properties dbzProperties;
+ private final String inlongMetric;
+
+ // --------------------------------------------------------------------------------------------
+ // Mutable attributes
+ // --------------------------------------------------------------------------------------------
+
+ /** Data type that describes the final output of the source. */
+ protected DataType producedDataType;
+
+ /** Metadata that is appended at the end of a physical source row. */
+ protected List<String> metadataKeys;
+
+ public PostgreSQLTableSource(
+ ResolvedSchema physicalSchema,
+ int port,
+ String hostname,
+ String database,
+ String schemaName,
+ String tableName,
+ String username,
+ String password,
+ String pluginName,
+ String slotName,
+ Properties dbzProperties,
+ String inlongMetric) {
+ this.physicalSchema = physicalSchema;
+ this.port = port;
+ this.hostname = checkNotNull(hostname);
+ this.database = checkNotNull(database);
+ this.schemaName = checkNotNull(schemaName);
+ this.tableName = checkNotNull(tableName);
+ this.username = checkNotNull(username);
+ this.password = checkNotNull(password);
+ this.pluginName = checkNotNull(pluginName);
+ this.slotName = slotName;
+ this.dbzProperties = dbzProperties;
+ this.producedDataType = physicalSchema.toPhysicalRowDataType();
+ this.metadataKeys = Collections.emptyList();
+ this.inlongMetric = inlongMetric;
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return ChangelogMode.newBuilder()
+ .addContainedKind(RowKind.INSERT)
+ .addContainedKind(RowKind.UPDATE_BEFORE)
+ .addContainedKind(RowKind.UPDATE_AFTER)
+ .addContainedKind(RowKind.DELETE)
+ .build();
+ }
+
+ @Override
+ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+ RowType physicalDataType =
+ (RowType) physicalSchema.toPhysicalRowDataType().getLogicalType();
+ MetadataConverter[] metadataConverters = getMetadataConverters();
+ TypeInformation<RowData> typeInfo = scanContext.createTypeInformation(producedDataType);
+
+ DebeziumDeserializationSchema<RowData> deserializer =
+ RowDataDebeziumDeserializeSchema.newBuilder()
+ .setPhysicalRowType(physicalDataType)
+ .setMetadataConverters(metadataConverters)
+ .setResultTypeInfo(typeInfo)
+ .setValueValidator(new PostgresValueValidator(schemaName, tableName))
+ .build();
+ DebeziumSourceFunction<RowData> sourceFunction =
+ PostgreSQLSource.<RowData>builder()
+ .hostname(hostname)
+ .port(port)
+ .database(database)
+ .schemaList(schemaName)
+ .tableList(schemaName + "." + tableName)
+ .username(username)
+ .password(password)
+ .decodingPluginName(pluginName)
+ .slotName(slotName)
+ .debeziumProperties(dbzProperties)
+ .deserializer(deserializer)
+ .inlongMetric(inlongMetric)
+ .build();
+ return SourceFunctionProvider.of(sourceFunction, false);
+ }
+
+ private MetadataConverter[] getMetadataConverters() {
+ if (metadataKeys.isEmpty()) {
+ return new MetadataConverter[0];
+ }
+
+ return metadataKeys.stream()
+ .map(
+ key ->
+ Stream.of(PostgreSQLReadableMetadata.values())
+ .filter(m -> m.getKey().equals(key))
+ .findFirst()
+ .orElseThrow(IllegalStateException::new))
+ .map(PostgreSQLReadableMetadata::getConverter)
+ .toArray(MetadataConverter[]::new);
+ }
+
+ @Override
+ public DynamicTableSource copy() {
+ PostgreSQLTableSource source =
+ new PostgreSQLTableSource(
+ physicalSchema,
+ port,
+ hostname,
+ database,
+ schemaName,
+ tableName,
+ username,
+ password,
+ pluginName,
+ slotName,
+ dbzProperties,
+ inlongMetric);
+ source.metadataKeys = metadataKeys;
+ source.producedDataType = producedDataType;
+ return source;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PostgreSQLTableSource that = (PostgreSQLTableSource) o;
+ return port == that.port
+ && Objects.equals(physicalSchema, that.physicalSchema)
+ && Objects.equals(hostname, that.hostname)
+ && Objects.equals(database, that.database)
+ && Objects.equals(schemaName, that.schemaName)
+ && Objects.equals(tableName, that.tableName)
+ && Objects.equals(username, that.username)
+ && Objects.equals(password, that.password)
+ && Objects.equals(pluginName, that.pluginName)
+ && Objects.equals(slotName, that.slotName)
+ && Objects.equals(dbzProperties, that.dbzProperties)
+ && Objects.equals(producedDataType, that.producedDataType)
+ && Objects.equals(metadataKeys, that.metadataKeys)
+ && Objects.equals(inlongMetric, that.inlongMetric);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ physicalSchema,
+ port,
+ hostname,
+ database,
+ schemaName,
+ tableName,
+ username,
+ password,
+ pluginName,
+ slotName,
+ dbzProperties,
+ producedDataType,
+ metadataKeys,
+ inlongMetric);
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "PostgreSQL-CDC";
+ }
+
+ @Override
+ public Map<String, DataType> listReadableMetadata() {
+ return Stream.of(PostgreSQLReadableMetadata.values())
+ .collect(
+ Collectors.toMap(
+ PostgreSQLReadableMetadata::getKey,
+ PostgreSQLReadableMetadata::getDataType));
+ }
+
+ @Override
+ public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+ this.metadataKeys = metadataKeys;
+ this.producedDataType = producedDataType;
+ }
+}
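
Because the source implements SupportsReadingMetadata, DDL can expose the keys from
PostgreSQLReadableMetadata as METADATA columns. A fragment sketch, assuming the keys
exposed by flink-cdc 2.2.x ('table_name', 'schema_name', 'database_name', 'op_ts');
verify the key names against the flink-cdc version actually on the classpath:

    // Metadata columns are resolved through listReadableMetadata() and
    // applyReadableMetadata() above.
    String ddl =
            "CREATE TABLE orders_with_meta (\n"
                    + "  origin_table STRING METADATA FROM 'table_name' VIRTUAL,\n"
                    + "  origin_schema STRING METADATA FROM 'schema_name' VIRTUAL,\n"
                    + "  origin_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,\n"
                    + "  id INT,\n"
                    + "  amount DECIMAL(10, 2)\n"
                    + ") WITH (\n"
                    + "  'connector' = 'postgres-cdc-inlong'\n"
                    + "  /* ... connection options as in the factory above ... */\n"
                    + ")";
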
diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-connectors/postgres-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 000000000..a82c5ed53
--- /dev/null
+++ b/inlong-sort/sort-connectors/postgres-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.inlong.sort.cdc.postgres.table.PostgreSQLTableFactory
\ No newline at end of file
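
Flink discovers the factory through the JDK ServiceLoader mechanism, keyed by the
file above. A minimal lookup sketch (the identifier matches the 'connector' value
used in DDL):

    import java.util.ServiceLoader;
    import org.apache.flink.table.factories.Factory;

    public class FactoryLookupDemo {
        public static void main(String[] args) {
            // Iterate all Factory implementations registered via
            // META-INF/services/org.apache.flink.table.factories.Factory and
            // find the one whose identifier matches the connector name.
            for (Factory factory : ServiceLoader.load(Factory.class)) {
                if ("postgres-cdc-inlong".equals(factory.factoryIdentifier())) {
                    System.out.println("Found: " + factory.getClass().getName());
                }
            }
        }
    }
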
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index b926a91f8..1a7b4a44a 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -432,6 +432,10 @@
inlong-sort/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/internal/DebeziumChangeConsumer.java
inlong-sort/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/internal/DebeziumOffsetSerializer.java
inlong-sort/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/history/FlinkJsonTableChangeSerializer.java
+ inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/PostgreSQLSource.java
+ inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java
+ inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableSource.java
+ inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
Source : flink-cdc-connectors 2.2.1 (Please note that the software have been modified.)
License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
@@ -476,6 +480,12 @@
source : flink-connector-elasticsearch 1.13.2 (Please note that the software have been modified.)
License : https://github.com/apache/flink/blob/release-1.13.2-rc2/LICENSE
+ 1.3.5 inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java
+ inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
+ inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseDynamicTableSink.java
+ Source : flink-connector-hbase-2.2 1.13.5 (Please note that the software have been modified.)
+ License : https://github.com/apache/flink/blob/master/LICENSE
+
=======================================================================
Apache InLong Subcomponents: