You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/07/19 06:47:14 UTC

[inlong] branch master updated: [INLONG-5072][Sort] Add metric computing of MySQL and PostgreSQL and HBase for user query metric by label (#5073)

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new fb4667f1a [INLONG-5072][Sort] Add metric computing of MySQL and PostgreSQL and HBase for user query metric by label (#5073)
fb4667f1a is described below

commit fb4667f1a27855c9c617e317332aac7349fcecc5
Author: Xin Gong <ge...@gmail.com>
AuthorDate: Tue Jul 19 14:47:09 2022 +0800

    [INLONG-5072][Sort] Add metric computing of MySQL and PostgreSQL and HBase for user query metric by label (#5073)
---
 inlong-sort/sort-connectors/hbase/pom.xml          |  12 +-
 .../sort/hbase/HBase2DynamicTableFactory.java      | 140 +++++
 .../inlong/sort/hbase/metric/MetricData.java       | 112 ++++
 .../inlong/sort/hbase/options/InLongOptions.java   |  34 ++
 .../sort/hbase/sink/HBaseDynamicTableSink.java     | 117 ++++
 .../inlong/sort/hbase/sink/HBaseSinkFunction.java  | 325 +++++++++++
 .../org.apache.flink.table.factories.Factory       |  16 +
 .../inlong/sort/cdc/mysql/source/MySqlSource.java  |  25 +-
 .../sort/cdc/mysql/source/MySqlSourceBuilder.java  |   5 +
 .../cdc/mysql/source/config/MySqlSourceConfig.java |  10 +-
 .../source/config/MySqlSourceConfigFactory.java    |  14 +-
 .../mysql/source/config/MySqlSourceOptions.java    |   6 +
 .../source/metrics/MySqlSourceReaderMetrics.java   |  54 ++
 .../mysql/source/reader/MySqlRecordEmitter.java    |  10 +-
 .../mysql/table/MySqlTableInlongSourceFactory.java |   6 +-
 .../sort/cdc/mysql/table/MySqlTableSource.java     |  21 +-
 .../DebeziumSourceFunction.java                    | 647 +++++++++++++++++++++
 .../MetricData.java                                |  88 +++
 .../PostgreSQLSource.java                          | 189 ++++++
 .../cdc/postgres/table/PostgreSQLTableFactory.java | 174 ++++++
 .../cdc/postgres/table/PostgreSQLTableSource.java  | 250 ++++++++
 .../org.apache.flink.table.factories.Factory       |  33 ++
 licenses/inlong-sort-connectors/LICENSE            |  10 +
 23 files changed, 2274 insertions(+), 24 deletions(-)

diff --git a/inlong-sort/sort-connectors/hbase/pom.xml b/inlong-sort/sort-connectors/hbase/pom.xml
index b2987f686..eeffccb40 100644
--- a/inlong-sort/sort-connectors/hbase/pom.xml
+++ b/inlong-sort/sort-connectors/hbase/pom.xml
@@ -46,6 +46,11 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <scope>provided</scope>
+        </dependency>
     </dependencies>
 
     <dependencyManagement>
@@ -111,13 +116,6 @@
                                 </excludes>
                             </artifactSet>
                             <filters>
-                                <filter>
-                                    <artifact>org.apache.inlong:sort-connector-*</artifact>
-                                    <includes>
-                                        <include>org/apache/inlong/**</include>
-                                        <include>META-INF/services/org.apache.flink.table.factories.Factory</include>
-                                    </includes>
-                                </filter>
                                 <filter>
                                     <artifact>*:*</artifact>
                                     <excludes>
diff --git a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java
new file mode 100644
index 000000000..e4f23bd35
--- /dev/null
+++ b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hbase;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.connector.hbase2.source.HBaseDynamicTableSource;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.inlong.sort.hbase.sink.HBaseDynamicTableSink;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.connector.hbase.options.HBaseOptions.LOOKUP_ASYNC;
+import static org.apache.flink.connector.hbase.options.HBaseOptions.LOOKUP_CACHE_MAX_ROWS;
+import static org.apache.flink.connector.hbase.options.HBaseOptions.LOOKUP_CACHE_TTL;
+import static org.apache.flink.connector.hbase.options.HBaseOptions.LOOKUP_MAX_RETRIES;
+import static org.apache.flink.connector.hbase.options.HBaseOptions.NULL_STRING_LITERAL;
+import static org.apache.flink.connector.hbase.options.HBaseOptions.PROPERTIES_PREFIX;
+import static org.apache.flink.connector.hbase.options.HBaseOptions.SINK_BUFFER_FLUSH_INTERVAL;
+import static org.apache.flink.connector.hbase.options.HBaseOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
+import static org.apache.flink.connector.hbase.options.HBaseOptions.SINK_BUFFER_FLUSH_MAX_SIZE;
+import static org.apache.flink.connector.hbase.options.HBaseOptions.TABLE_NAME;
+import static org.apache.flink.connector.hbase.options.HBaseOptions.ZOOKEEPER_QUORUM;
+import static org.apache.flink.connector.hbase.options.HBaseOptions.ZOOKEEPER_ZNODE_PARENT;
+import static org.apache.flink.connector.hbase.options.HBaseOptions.getHBaseConfiguration;
+import static org.apache.flink.connector.hbase.options.HBaseOptions.getHBaseLookupOptions;
+import static org.apache.flink.connector.hbase.options.HBaseOptions.getHBaseWriteOptions;
+import static org.apache.flink.connector.hbase.options.HBaseOptions.validatePrimaryKey;
+import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
+import static org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper;
+import static org.apache.inlong.sort.hbase.options.InLongOptions.INLONG_METRIC;
+
+/**
+ * Dynamic table factory registered under the {@code hbase-2.2-inlong} identifier.
+ *
+ * <p>It builds both the HBase table source and the HBase table sink. The sink additionally
+ * receives the value of the {@code INLONG_METRIC} option.
+ */
+public class HBase2DynamicTableFactory
+        implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+    private static final String IDENTIFIER = "hbase-2.2-inlong";
+
+    /**
+     * Creates the table source. Options are validated first (keys under
+     * {@code PROPERTIES_PREFIX} are exempt), then the primary key of the schema is checked.
+     */
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        final TableFactoryHelper factoryHelper = createTableFactoryHelper(this, context);
+        factoryHelper.validateExcept(PROPERTIES_PREFIX);
+        final ReadableConfig config = factoryHelper.getOptions();
+
+        final TableSchema schema = context.getCatalogTable().getSchema();
+        final Map<String, String> rawOptions = context.getCatalogTable().getOptions();
+        validatePrimaryKey(schema);
+
+        final String hTableName = config.get(TABLE_NAME);
+        final Configuration hadoopConf = getHBaseConfiguration(rawOptions);
+        final HBaseLookupOptions lookup = getHBaseLookupOptions(config);
+        final String nullLiteral = config.get(NULL_STRING_LITERAL);
+        final HBaseTableSchema hSchema = HBaseTableSchema.fromTableSchema(schema);
+        return new HBaseDynamicTableSource(hadoopConf, hTableName, hSchema, nullLiteral, lookup);
+    }
+
+    /**
+     * Creates the table sink. Validation mirrors {@link #createDynamicTableSource(Context)}; the
+     * value of {@code INLONG_METRIC} is read from the table options and handed to the sink.
+     */
+    @Override
+    public DynamicTableSink createDynamicTableSink(Context context) {
+        final TableFactoryHelper factoryHelper = createTableFactoryHelper(this, context);
+        factoryHelper.validateExcept(PROPERTIES_PREFIX);
+        final ReadableConfig config = factoryHelper.getOptions();
+
+        final TableSchema schema = context.getCatalogTable().getSchema();
+        final Map<String, String> rawOptions = context.getCatalogTable().getOptions();
+        validatePrimaryKey(schema);
+
+        final String hTableName = config.get(TABLE_NAME);
+        final Configuration hadoopConf = getHBaseConfiguration(rawOptions);
+        final HBaseWriteOptions write = getHBaseWriteOptions(config);
+        final String nullLiteral = config.get(NULL_STRING_LITERAL);
+        final HBaseTableSchema hSchema = HBaseTableSchema.fromTableSchema(schema);
+        final String metricLabel = config.get(INLONG_METRIC);
+        return new HBaseDynamicTableSink(
+                hTableName, hSchema, hadoopConf, write, nullLiteral, metricLabel);
+    }
+
+    /** Returns the identifier this factory is looked up by. */
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    /** Only the table name is mandatory. */
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        final Set<ConfigOption<?>> required = new HashSet<>();
+        required.add(TABLE_NAME);
+        return required;
+    }
+
+    /** Lists every option that may be supplied in addition to the required ones. */
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        final ConfigOption<?>[] supported = {
+            ZOOKEEPER_ZNODE_PARENT,
+            ZOOKEEPER_QUORUM,
+            NULL_STRING_LITERAL,
+            SINK_BUFFER_FLUSH_MAX_SIZE,
+            SINK_BUFFER_FLUSH_MAX_ROWS,
+            SINK_BUFFER_FLUSH_INTERVAL,
+            SINK_PARALLELISM,
+            LOOKUP_ASYNC,
+            LOOKUP_CACHE_MAX_ROWS,
+            LOOKUP_CACHE_TTL,
+            LOOKUP_MAX_RETRIES,
+            INLONG_METRIC
+        };
+        final Set<ConfigOption<?>> optional = new HashSet<>();
+        for (ConfigOption<?> option : supported) {
+            optional.add(option);
+        }
+        return optional;
+    }
+}
diff --git a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/metric/MetricData.java b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/metric/MetricData.java
new file mode 100644
index 000000000..7bf05fad2
--- /dev/null
+++ b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/metric/MetricData.java
@@ -0,0 +1,112 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.inlong.sort.hbase.metric;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.metrics.MetricGroup;
+
+/**
+ * Holder for the counters and meters reported by a connector.
+ *
+ * <p>Each metric is registered under a {@code groupId / streamId / nodeId} sub-group of the
+ * {@link MetricGroup} passed to the constructor. A metric field stays {@code null} until its
+ * {@code registerMetricsFor...} method has been called, so callers must null-check the getters.
+ *
+ * <p>The two per-second meters are built on the counter field as it is at registration time, so
+ * the matching counter has to be registered before its meter.
+ */
+public class MetricData {
+
+    /** Time span, in seconds, passed to every {@link MeterView} created here. */
+    private static final int TIME_SPAN_IN_SECONDS = 60;
+    private static final String STREAM_ID = "streamId";
+    private static final String GROUP_ID = "groupId";
+    private static final String NODE_ID = "nodeId";
+
+    private final MetricGroup metricGroup;
+
+    private Counter numRecordsOut;
+    private Counter numBytesOut;
+    private Counter dirtyRecords;
+    private Counter dirtyBytes;
+    private Meter numRecordsOutPerSecond;
+    private Meter numBytesOutPerSecond;
+
+    /**
+     * @param metricGroup parent group under which all metrics of this instance are registered
+     */
+    public MetricData(MetricGroup metricGroup) {
+        this.metricGroup = metricGroup;
+    }
+
+    /** Registers the counter returned by {@link #getNumRecordsOut()}. */
+    public void registerMetricsForNumRecordsOut(String groupId, String streamId, String nodeId, String metricName) {
+        numRecordsOut = labelledGroup(groupId, streamId, nodeId).counter(metricName);
+    }
+
+    /** Registers the counter returned by {@link #getNumBytesOut()}. */
+    public void registerMetricsForNumBytesOut(String groupId, String streamId, String nodeId, String metricName) {
+        numBytesOut = labelledGroup(groupId, streamId, nodeId).counter(metricName);
+    }
+
+    /**
+     * Registers the meter returned by {@link #getNumRecordsOutPerSecond()}. It wraps the current
+     * {@code numRecordsOut} counter, so call {@link #registerMetricsForNumRecordsOut} first.
+     */
+    public void registerMetricsForNumRecordsOutPerSecond(String groupId, String streamId, String nodeId,
+            String metricName) {
+        numRecordsOutPerSecond = labelledGroup(groupId, streamId, nodeId)
+                .meter(metricName, new MeterView(this.numRecordsOut, TIME_SPAN_IN_SECONDS));
+    }
+
+    /**
+     * Registers the meter returned by {@link #getNumBytesOutPerSecond()}. It wraps the current
+     * {@code numBytesOut} counter, so call {@link #registerMetricsForNumBytesOut} first.
+     */
+    public void registerMetricsForNumBytesOutPerSecond(String groupId, String streamId, String nodeId,
+            String metricName) {
+        numBytesOutPerSecond = labelledGroup(groupId, streamId, nodeId)
+                .meter(metricName, new MeterView(this.numBytesOut, TIME_SPAN_IN_SECONDS));
+    }
+
+    /** Registers the counter returned by {@link #getDirtyRecords()}. */
+    public void registerMetricsForDirtyRecords(String groupId, String streamId, String nodeId,
+            String metricName) {
+        dirtyRecords = labelledGroup(groupId, streamId, nodeId).counter(metricName);
+    }
+
+    /** Registers the counter returned by {@link #getDirtyBytes()}. */
+    public void registerMetricsForDirtyBytes(String groupId, String streamId, String nodeId,
+            String metricName) {
+        dirtyBytes = labelledGroup(groupId, streamId, nodeId).counter(metricName);
+    }
+
+    /** @return the records-out counter, or {@code null} if it was never registered */
+    public Counter getNumRecordsOut() {
+        return numRecordsOut;
+    }
+
+    /** @return the bytes-out counter, or {@code null} if it was never registered */
+    public Counter getNumBytesOut() {
+        return numBytesOut;
+    }
+
+    /** @return the dirty-records counter, or {@code null} if it was never registered */
+    public Counter getDirtyRecords() {
+        return dirtyRecords;
+    }
+
+    /** @return the dirty-bytes counter, or {@code null} if it was never registered */
+    public Counter getDirtyBytes() {
+        return dirtyBytes;
+    }
+
+    /** @return the records-per-second meter, or {@code null} if it was never registered */
+    public Meter getNumRecordsOutPerSecond() {
+        return numRecordsOutPerSecond;
+    }
+
+    /** @return the bytes-per-second meter, or {@code null} if it was never registered */
+    public Meter getNumBytesOutPerSecond() {
+        return numBytesOutPerSecond;
+    }
+
+    /** Resolves the {@code groupId / streamId / nodeId} sub-group every metric is attached to. */
+    private MetricGroup labelledGroup(String groupId, String streamId, String nodeId) {
+        return metricGroup
+                .addGroup(GROUP_ID, groupId)
+                .addGroup(STREAM_ID, streamId)
+                .addGroup(NODE_ID, nodeId);
+    }
+
+}
diff --git a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/options/InLongOptions.java b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/options/InLongOptions.java
new file mode 100644
index 000000000..e7444cf9e
--- /dev/null
+++ b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/options/InLongOptions.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hbase.options;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/**
+ * InLong-specific options for the HBase connector.
+ *
+ * <p>This class only holds option constants and is not meant to be instantiated.
+ */
+@Internal
+public class InLongOptions {
+
+    /**
+     * Label used for metric reporting. As stated in the option description, the value is the
+     * group id, stream id and node id joined by {@code '_'}. The default is the empty string.
+     */
+    public static final ConfigOption<String> INLONG_METRIC =
+            ConfigOptions.key("inlong.metric")
+                    .stringType()
+                    .defaultValue("")
+                    .withDescription("INLONG GROUP ID + '_' + STREAM ID + '_' + NODE ID");
+
+    private InLongOptions() {
+        // constants holder, no instances
+    }
+}
diff --git a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseDynamicTableSink.java b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseDynamicTableSink.java
new file mode 100644
index 000000000..76616befa
--- /dev/null
+++ b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseDynamicTableSink.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hbase.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
+import org.apache.flink.connector.hbase.sink.RowDataToMutationConverter;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * {@link DynamicTableSink} that writes rows to an HBase table through {@link HBaseSinkFunction}
+ * and forwards the InLong metric label to that function.
+ */
+@Internal
+public class HBaseDynamicTableSink implements DynamicTableSink {
+
+    private final String tableName;
+    private final HBaseTableSchema hbaseTableSchema;
+    private final Configuration hbaseConf;
+    private final HBaseWriteOptions writeOptions;
+    private final String nullStringLiteral;
+    private final String inLongMetric;
+
+    /**
+     * @param tableName name of the target HBase table
+     * @param hbaseTableSchema schema used to convert rows into mutations
+     * @param hbaseConf HBase client configuration
+     * @param writeOptions buffering and parallelism settings of the sink
+     * @param nullStringLiteral literal handed to the row-to-mutation converter
+     * @param inLongMetric metric label passed on to the sink function
+     */
+    public HBaseDynamicTableSink(
+            String tableName,
+            HBaseTableSchema hbaseTableSchema,
+            Configuration hbaseConf,
+            HBaseWriteOptions writeOptions,
+            String nullStringLiteral,
+            String inLongMetric) {
+        this.tableName = tableName;
+        this.hbaseTableSchema = hbaseTableSchema;
+        this.hbaseConf = hbaseConf;
+        this.writeOptions = writeOptions;
+        this.nullStringLiteral = nullStringLiteral;
+        this.inLongMetric = inLongMetric;
+    }
+
+    /** Wraps a freshly configured {@link HBaseSinkFunction} with the configured parallelism. */
+    @Override
+    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+        final RowDataToMutationConverter converter =
+                new RowDataToMutationConverter(hbaseTableSchema, nullStringLiteral);
+        final HBaseSinkFunction<RowData> hbaseSink =
+                new HBaseSinkFunction<>(
+                        tableName,
+                        hbaseConf,
+                        converter,
+                        writeOptions.getBufferFlushMaxSizeInBytes(),
+                        writeOptions.getBufferFlushMaxRows(),
+                        writeOptions.getBufferFlushIntervalMillis(),
+                        inLongMetric);
+        return SinkFunctionProvider.of(hbaseSink, writeOptions.getParallelism());
+    }
+
+    /** Accepts every requested row kind except {@code UPDATE_BEFORE} (upsert semantics). */
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+        final ChangelogMode.Builder upsertMode = ChangelogMode.newBuilder();
+        requestedMode.getContainedKinds().stream()
+                .filter(rowKind -> rowKind != RowKind.UPDATE_BEFORE)
+                .forEach(upsertMode::addContainedKind);
+        return upsertMode.build();
+    }
+
+    /** Returns a new sink built from the same field values. */
+    @Override
+    public DynamicTableSink copy() {
+        return new HBaseDynamicTableSink(
+                tableName, hbaseTableSchema, hbaseConf, writeOptions, nullStringLiteral, inLongMetric);
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "HBase";
+    }
+
+    // -------------------------------------------------------------------------------------------
+    // Accessors for tests
+    // -------------------------------------------------------------------------------------------
+
+    @VisibleForTesting
+    public HBaseTableSchema getHBaseTableSchema() {
+        return hbaseTableSchema;
+    }
+
+    @VisibleForTesting
+    public HBaseWriteOptions getWriteOptions() {
+        return writeOptions;
+    }
+
+    @VisibleForTesting
+    public Configuration getConfiguration() {
+        return hbaseConf;
+    }
+
+    @VisibleForTesting
+    public String getTableName() {
+        return tableName;
+    }
+}
diff --git a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
new file mode 100644
index 000000000..3309e5544
--- /dev/null
+++ b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hbase.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.hbase.sink.HBaseMutationConverter;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.StringUtils;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.BufferedMutatorParams;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.inlong.sort.hbase.metric.MetricData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * The sink function for HBase.
+ *
+ * <p>This class leverages {@link BufferedMutator} to buffer multiple {@link
+ * org.apache.hadoop.hbase.client.Mutation Mutations} before sending the requests to cluster. The
+ * buffering strategy can be configured by {@code bufferFlushMaxSizeInBytes}, {@code
+ * bufferFlushMaxMutations} and {@code bufferFlushIntervalMillis}.
+ */
+@Internal
+public class HBaseSinkFunction<T> extends RichSinkFunction<T>
+        implements CheckpointedFunction, BufferedMutator.ExceptionListener {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseSinkFunction.class);
+
+    private final String hTableName;
+    private final byte[] serializedConfig;
+
+    private final long bufferFlushMaxSizeInBytes;
+    private final long bufferFlushMaxMutations;
+    private final long bufferFlushIntervalMillis;
+    private final HBaseMutationConverter<T> mutationConverter;
+    private final String inLongMetric;
+
+    private transient Connection connection;
+    private transient BufferedMutator mutator;
+
+    private transient ScheduledExecutorService executor;
+    private transient ScheduledFuture scheduledFuture;
+    private transient AtomicLong numPendingRequests;
+    private transient RuntimeContext runtimeContext;
+
+    private transient volatile boolean closed = false;
+
+    private MetricData metricData;
+    private Long dataSize = 0L;
+    private Long rowSize = 0L;
+
+    /**
+     * This is set from inside the {@link BufferedMutator.ExceptionListener} if a {@link Throwable}
+     * was thrown.
+     *
+     * <p>Errors will be checked and rethrown before processing each input element, and when the
+     * sink is closed.
+     */
+    private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();
+
+    /**
+     * Creates the sink function.
+     *
+     * @param hTableName name of the HBase table to write to
+     * @param conf Hadoop/HBase configuration; stored in serialized form because the
+     *     configuration object itself is not serializable
+     * @param mutationConverter converter from input elements to HBase mutations
+     * @param bufferFlushMaxSizeInBytes write buffer size; only applied when greater than zero
+     * @param bufferFlushMaxMutations mutation-count setting of the buffer
+     * @param bufferFlushIntervalMillis period of the background flush; only scheduled when
+     *     greater than zero
+     * @param inLongMetric metric label from which the metric group names are derived
+     */
+    public HBaseSinkFunction(
+            String hTableName,
+            org.apache.hadoop.conf.Configuration conf,
+            HBaseMutationConverter<T> mutationConverter,
+            long bufferFlushMaxSizeInBytes,
+            long bufferFlushMaxMutations,
+            long bufferFlushIntervalMillis,
+            String inLongMetric) {
+        // Keep only the serialized bytes of the configuration.
+        this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(conf);
+
+        this.hTableName = hTableName;
+        this.mutationConverter = mutationConverter;
+        this.inLongMetric = inLongMetric;
+
+        this.bufferFlushMaxSizeInBytes = bufferFlushMaxSizeInBytes;
+        this.bufferFlushMaxMutations = bufferFlushMaxMutations;
+        this.bufferFlushIntervalMillis = bufferFlushIntervalMillis;
+    }
+
+    /**
+     * Prepares the sink for writing.
+     *
+     * <p>The steps are: build the runtime HBase configuration, register the InLong metrics when a
+     * metric label is configured, open the mutation converter, create the connection and the
+     * {@link BufferedMutator}, and finally schedule the periodic background flush when a flush
+     * interval is set and the mutation limit is not {@code 1}.
+     *
+     * @param parameters Flink configuration handed in by the runtime (not read here)
+     * @throws IllegalArgumentException if the metric label does not contain at least three
+     *     {@code '_'}-separated parts
+     * @throws RuntimeException if the table is missing or the connection cannot be created
+     */
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        LOG.info("start open ...");
+        org.apache.hadoop.conf.Configuration config = prepareRuntimeConfiguration();
+        try {
+            this.runtimeContext = getRuntimeContext();
+            metricData = new MetricData(runtimeContext.getMetricGroup());
+            if (inLongMetric != null && !inLongMetric.isEmpty()) {
+                String[] inLongMetricArray = inLongMetric.split("_");
+                // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
+                if (inLongMetricArray.length < 3) {
+                    throw new IllegalArgumentException(
+                            "inLongMetric must have the form groupId_streamId_nodeId, but was '"
+                                    + inLongMetric + "'");
+                }
+                String groupId = inLongMetricArray[0];
+                String streamId = inLongMetricArray[1];
+                String nodeId = inLongMetricArray[2];
+                metricData.registerMetricsForDirtyBytes(groupId, streamId, nodeId, "dirtyBytes");
+                metricData.registerMetricsForDirtyRecords(groupId, streamId, nodeId, "dirtyRecords");
+                // The counters must exist before the per-second meters that wrap them.
+                metricData.registerMetricsForNumBytesOut(groupId, streamId, nodeId, "numBytesOut");
+                metricData.registerMetricsForNumRecordsOut(groupId, streamId, nodeId, "numRecordsOut");
+                metricData.registerMetricsForNumBytesOutPerSecond(groupId, streamId, nodeId, "numBytesOutPerSecond");
+                metricData.registerMetricsForNumRecordsOutPerSecond(groupId, streamId, nodeId,
+                        "numRecordsOutPerSecond");
+            }
+            this.mutationConverter.open();
+            this.numPendingRequests = new AtomicLong(0);
+
+            if (null == connection) {
+                this.connection = ConnectionFactory.createConnection(config);
+            }
+            // create a parameter instance, set the table name and custom listener reference.
+            BufferedMutatorParams params =
+                    new BufferedMutatorParams(TableName.valueOf(hTableName)).listener(this);
+            if (bufferFlushMaxSizeInBytes > 0) {
+                params.writeBufferSize(bufferFlushMaxSizeInBytes);
+            }
+            this.mutator = connection.getBufferedMutator(params);
+
+            if (bufferFlushIntervalMillis > 0 && bufferFlushMaxMutations != 1) {
+                this.executor =
+                        Executors.newScheduledThreadPool(
+                                1, new ExecutorThreadFactory("hbase-upsert-sink-flusher"));
+                this.scheduledFuture =
+                        this.executor.scheduleWithFixedDelay(
+                                () -> {
+                                    if (closed) {
+                                        return;
+                                    }
+                                    try {
+                                        flush();
+                                        if (metricData.getNumRecordsOut() != null) {
+                                            metricData.getNumRecordsOut().inc(rowSize);
+                                        }
+                                        // Guard the counter that is actually incremented.
+                                        if (metricData.getNumBytesOut() != null) {
+                                            metricData.getNumBytesOut()
+                                                    .inc(dataSize);
+                                        }
+                                        resetStateAfterFlush();
+                                    } catch (Exception e) {
+                                        if (metricData.getDirtyRecords() != null) {
+                                            metricData.getDirtyRecords().inc(rowSize);
+                                        }
+                                        if (metricData.getDirtyBytes() != null) {
+                                            metricData.getDirtyBytes().inc(dataSize);
+                                        }
+                                        resetStateAfterFlush();
+                                        // fail the sink and skip the rest of the items
+                                        // if the failure handler decides to throw an exception
+                                        failureThrowable.compareAndSet(null, e);
+                                    }
+                                },
+                                bufferFlushIntervalMillis,
+                                bufferFlushIntervalMillis,
+                                TimeUnit.MILLISECONDS);
+            }
+        } catch (TableNotFoundException tnfe) {
+            LOG.error("The table " + hTableName + " not found ", tnfe);
+            throw new RuntimeException("HBase table '" + hTableName + "' not found.", tnfe);
+        } catch (IOException ioe) {
+            LOG.error("Exception while creating connection to HBase.", ioe);
+            throw new RuntimeException("Cannot create connection to HBase.", ioe);
+        }
+        LOG.info("end open.");
+    }
+
+    /**
+     * Builds the HBase configuration used by this sink at runtime and validates it.
+     *
+     * <p>The default configuration of the current runtime environment
+     * ({@code hbase-site.xml} on the classpath) is created first and then overwritten
+     * with the serialized client-side configuration, so user parameters from the
+     * client side have the highest priority.
+     *
+     * @return the merged runtime configuration
+     * @throws IOException if the merged configuration has no usable
+     *         {@code HConstants.ZOOKEEPER_QUORUM} value
+     */
+    private org.apache.hadoop.conf.Configuration prepareRuntimeConfiguration() throws IOException {
+        final org.apache.hadoop.conf.Configuration environmentConfig =
+                HBaseConfigurationUtil.getHBaseConfiguration();
+        final org.apache.hadoop.conf.Configuration mergedConfig =
+                HBaseConfigurationUtil.deserializeConfiguration(serializedConfig, environmentConfig);
+
+        // validation: the key option must be present in the final runtime configuration
+        final String zookeeperQuorum = mergedConfig.get(HConstants.ZOOKEEPER_QUORUM);
+        if (!StringUtils.isNullOrWhitespaceOnly(zookeeperQuorum)) {
+            return mergedConfig;
+        }
+
+        LOG.error("Can not connect to HBase without {} configuration", HConstants.ZOOKEEPER_QUORUM);
+        throw new IOException(
+                "Check HBase configuration failed, lost: '" + HConstants.ZOOKEEPER_QUORUM + "'!");
+    }
+
+    /**
+     * Rethrows the failure stored in {@code failureThrowable}, if there is one,
+     * wrapped in a {@link RuntimeException}; does nothing otherwise.
+     */
+    private void checkErrorAndRethrow() {
+        final Throwable recordedFailure = failureThrowable.get();
+        if (recordedFailure == null) {
+            return;
+        }
+        throw new RuntimeException("An error occurred in HBaseSink.", recordedFailure);
+    }
+
+    /**
+     * Buffers one record as an HBase mutation and flushes once the configured
+     * number of pending mutations is reached.
+     *
+     * <p>Every buffered record adds 1 to {@code rowSize} and the UTF-8 length of
+     * {@code value.toString()} to {@code dataSize}. After a successful flush these
+     * totals are added to the out metrics; after a failed flush they are added to
+     * the dirty metrics. In both cases the totals are reset afterwards.
+     *
+     * @param value the record to write
+     * @param context the sink context (not used by this method)
+     * @throws Exception if a failure was recorded earlier in {@code failureThrowable},
+     *         or if buffering or flushing the mutation fails
+     */
+    @SuppressWarnings("rawtypes")
+    @Override
+    public void invoke(T value, Context context) throws Exception {
+        checkErrorAndRethrow();
+
+        mutator.mutate(mutationConverter.convertToMutation(value));
+        rowSize++;
+        dataSize = dataSize + value.toString().getBytes(StandardCharsets.UTF_8).length;
+        // flush when the buffer number of mutations greater than the configured max size.
+        if (bufferFlushMaxMutations > 0
+                && numPendingRequests.incrementAndGet() >= bufferFlushMaxMutations) {
+            try {
+                flush();
+                if (metricData.getNumRecordsOut() != null) {
+                    metricData.getNumRecordsOut().inc(rowSize);
+                }
+                // check the bytes counter itself: each metric is optional on its own,
+                // so the records counter says nothing about whether this one exists
+                if (metricData.getNumBytesOut() != null) {
+                    metricData.getNumBytesOut()
+                            .inc(dataSize);
+                }
+                resetStateAfterFlush();
+            } catch (Exception e) {
+                if (metricData.getDirtyRecords() != null) {
+                    metricData.getDirtyRecords().inc(rowSize);
+                }
+                if (metricData.getDirtyBytes() != null) {
+                    metricData.getDirtyBytes().inc(dataSize);
+                }
+                resetStateAfterFlush();
+                throw e;
+            }
+        }
+    }
+
+    /**
+     * Clears the per-flush totals ({@code rowSize} and {@code dataSize}) so the
+     * next batch starts counting from zero.
+     */
+    private void resetStateAfterFlush() {
+        rowSize = 0L;
+        dataSize = 0L;
+    }
+
+    /**
+     * Flushes the buffered mutations, resets the pending-request counter to zero
+     * and then rethrows any failure recorded in {@code failureThrowable}.
+     *
+     * @throws IOException if {@code mutator.flush()} fails
+     */
+    private void flush() throws IOException {
+        // BufferedMutator is thread-safe
+        mutator.flush();
+        numPendingRequests.set(0);
+        // surface a failure recorded elsewhere (see onException) as a RuntimeException
+        checkErrorAndRethrow();
+    }
+
+    /**
+     * Releases the resources held by this sink: the buffered mutator, the HBase
+     * connection and, when a periodic flush was scheduled, its future and executor.
+     *
+     * <p>An {@link IOException} raised while closing the mutator or the connection
+     * is logged as a warning and not propagated.
+     *
+     * <p>NOTE(review): the periodic flush task is cancelled only after the mutator
+     * has been closed and set to {@code null}; a task that already passed its
+     * {@code closed} check could still reach {@code flush()} - confirm this is acceptable.
+     */
+    @Override
+    public void close() throws Exception {
+        // makes the periodic flush task return without flushing on its next run
+        closed = true;
+
+        if (mutator != null) {
+            try {
+                mutator.close();
+            } catch (IOException e) {
+                LOG.warn("Exception occurs while closing HBase BufferedMutator.", e);
+            }
+            this.mutator = null;
+        }
+
+        if (connection != null) {
+            try {
+                connection.close();
+            } catch (IOException e) {
+                LOG.warn("Exception occurs while closing HBase Connection.", e);
+            }
+            this.connection = null;
+        }
+
+        // the executor is only shut down when a periodic flush had been scheduled
+        if (scheduledFuture != null) {
+            scheduledFuture.cancel(false);
+            if (executor != null) {
+                executor.shutdownNow();
+            }
+        }
+    }
+
+    /**
+     * Flushes repeatedly until no pending requests remain, so that everything
+     * buffered so far has been handed to HBase when the snapshot is taken.
+     *
+     * <p>This path does not touch the metrics: {@code rowSize} and {@code dataSize}
+     * are neither reported nor reset here.
+     *
+     * @param context the snapshot context (not used by this method)
+     * @throws Exception if a flush fails
+     */
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+        while (numPendingRequests.get() != 0) {
+            flush();
+        }
+    }
+
+    /**
+     * Intentionally empty: this sink restores nothing on initialization.
+     *
+     * @param context the initialization context (not used)
+     */
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        // nothing to do.
+    }
+
+    /**
+     * Records the given exception in {@code failureThrowable} so that a later
+     * {@code checkErrorAndRethrow()} call can surface it.
+     *
+     * <p>Only the first failure is kept: {@code compareAndSet(null, ...)} leaves an
+     * already recorded throwable untouched. The method body itself does not throw.
+     *
+     * @param exception the exception to record
+     * @param mutator the mutator passed to this callback (not used here)
+     */
+    @Override
+    public void onException(RetriesExhaustedWithDetailsException exception, BufferedMutator mutator)
+            throws RetriesExhaustedWithDetailsException {
+        // fail the sink and skip the rest of the items
+        // if the failure handler decides to throw an exception
+        failureThrowable.compareAndSet(null, exception);
+    }
+}
diff --git a/inlong-sort/sort-connectors/hbase/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-connectors/hbase/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 000000000..8f8cea02b
--- /dev/null
+++ b/inlong-sort/sort-connectors/hbase/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.hbase.HBase2DynamicTableFactory
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
index 125a816ab..3bdf92831 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.cdc.mysql.source;
 
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.TableId;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -131,21 +132,31 @@ public class MySqlSource<T>
     @Override
     public SourceReader<T, MySqlSplit> createReader(SourceReaderContext readerContext)
             throws Exception {
-        // create source config for the given subtask (e.g. unique server id)
-        MySqlSourceConfig sourceConfig =
-                configFactory.createConfig(readerContext.getIndexOfSubtask());
-        FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementsQueue =
-                new FutureCompletingBlockingQueue<>();
-
         final Method metricGroupMethod = readerContext.getClass().getMethod("metricGroup");
         metricGroupMethod.setAccessible(true);
         final MetricGroup metricGroup = (MetricGroup) metricGroupMethod.invoke(readerContext);
-
         final MySqlSourceReaderMetrics sourceReaderMetrics =
                 new MySqlSourceReaderMetrics(metricGroup);
         sourceReaderMetrics.registerMetrics();
         MySqlSourceReaderContext mySqlSourceReaderContext =
                 new MySqlSourceReaderContext(readerContext);
+        // create source config for the given subtask (e.g. unique server id)
+        MySqlSourceConfig sourceConfig =
+                configFactory.createConfig(readerContext.getIndexOfSubtask());
+        String inlongMetric = sourceConfig.getInlongMetric();
+        if (StringUtils.isNotEmpty(inlongMetric)) {
+            String[] inlongMetricArray = inlongMetric.split("_");
+            String groupId = inlongMetricArray[0];
+            String streamId = inlongMetricArray[1];
+            String nodeId = inlongMetricArray[2];
+            sourceReaderMetrics.registerMetricsForNumBytesIn(groupId, streamId, nodeId, "numBytesIn");
+            sourceReaderMetrics.registerMetricsForNumRecordsIn(groupId, streamId, nodeId, "numRecordsIn");
+            sourceReaderMetrics.registerMetricsForNumBytesInPerSecond(groupId, streamId, nodeId, "numBytesInPerSecond");
+            sourceReaderMetrics.registerMetricsForNumRecordsInPerSecond(groupId, streamId, nodeId,
+                    "numRecordsInPerSecond");
+        }
+        FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementsQueue =
+                new FutureCompletingBlockingQueue<>();
         Supplier<MySqlSplitReader> splitReaderSupplier =
                 () ->
                         new MySqlSplitReader(
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSourceBuilder.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSourceBuilder.java
index 722262b6a..015dc111e 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSourceBuilder.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSourceBuilder.java
@@ -252,6 +252,11 @@ public class MySqlSourceBuilder<T> {
         return this;
     }
 
+    /**
+     * Sets the InLong metric label by forwarding it to the config factory.
+     *
+     * @param inlongMetric the InLong metric label
+     * @return this builder, for call chaining
+     */
+    public MySqlSourceBuilder<T> inlongMetric(String inlongMetric) {
+        this.configFactory.inlongMetric(inlongMetric);
+        return this;
+    }
+
     /**
      * Build the {@link MySqlSource}.
      *
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfig.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfig.java
index 65ed4d740..2104e6f0b 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfig.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfig.java
@@ -67,6 +67,8 @@ public class MySqlSourceConfig implements Serializable {
     private final Configuration dbzConfiguration;
     private final MySqlConnectorConfig dbzMySqlConfig;
 
+    private final String inlongMetric;
+
     MySqlSourceConfig(
             String hostname,
             int port,
@@ -88,7 +90,8 @@ public class MySqlSourceConfig implements Serializable {
             boolean includeSchemaChanges,
             boolean scanNewlyAddedTableEnabled,
             Properties dbzProperties,
-            Properties jdbcProperties) {
+            Properties jdbcProperties,
+            String inlongMetric) {
         this.hostname = checkNotNull(hostname);
         this.port = port;
         this.username = checkNotNull(username);
@@ -112,6 +115,7 @@ public class MySqlSourceConfig implements Serializable {
         this.dbzConfiguration = Configuration.from(dbzProperties);
         this.dbzMySqlConfig = new MySqlConnectorConfig(dbzConfiguration);
         this.jdbcProperties = jdbcProperties;
+        this.inlongMetric = inlongMetric;
     }
 
     public String getHostname() {
@@ -210,4 +214,8 @@ public class MySqlSourceConfig implements Serializable {
     public Properties getJdbcProperties() {
         return jdbcProperties;
     }
+
+    /**
+     * Returns the InLong metric label this config was created with.
+     *
+     * @return the InLong metric label
+     */
+    public String getInlongMetric() {
+        return inlongMetric;
+    }
 }
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfigFactory.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfigFactory.java
index 035269224..2bf6507ac 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfigFactory.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfigFactory.java
@@ -74,6 +74,13 @@ public class MySqlSourceConfigFactory implements Serializable {
     private Duration heartbeatInterval = HEARTBEAT_INTERVAL.defaultValue();
     private Properties dbzProperties;
 
+    private String inlongMetric;
+
+    /**
+     * Sets the InLong metric label to pass on to the created source config.
+     *
+     * @param inlongMetric the InLong metric label
+     * @return this factory, for call chaining
+     */
+    public MySqlSourceConfigFactory inlongMetric(String inlongMetric) {
+        this.inlongMetric = inlongMetric;
+        return this;
+    }
+
     public MySqlSourceConfigFactory hostname(String hostname) {
         this.hostname = hostname;
         return this;
@@ -334,6 +341,10 @@ public class MySqlSourceConfigFactory implements Serializable {
             jdbcProperties = new Properties();
         }
 
+        if (inlongMetric == null) {
+            inlongMetric = "";
+        }
+
         return new MySqlSourceConfig(
                 hostname,
                 port,
@@ -355,6 +366,7 @@ public class MySqlSourceConfigFactory implements Serializable {
                 includeSchemaChanges,
                 scanNewlyAddedTableEnabled,
                 props,
-                jdbcProperties);
+                jdbcProperties,
+                inlongMetric);
     }
 }
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
index 8a211cf19..f936ac8d1 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
@@ -30,6 +30,12 @@ import java.time.Duration;
  */
 public class MySqlSourceOptions {
 
+    /**
+     * Option {@code inlong.metric}: the InLong metric label. According to the option
+     * description it is the group id, stream id and node id joined with {@code '_'}.
+     * Defaults to an empty string.
+     */
+    public static final ConfigOption<String> INLONG_METRIC =
+            ConfigOptions.key("inlong.metric")
+                    .stringType()
+                    .defaultValue("")
+                    .withDescription("INLONG GROUP ID + '_' + STREAM ID + '_' + NODE ID");
+
     public static final ConfigOption<String> HOSTNAME =
             ConfigOptions.key("hostname")
                     .stringType()
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
index 1551975fd..fc375b1d0 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
@@ -18,7 +18,10 @@
 
 package org.apache.inlong.sort.cdc.mysql.source.metrics;
 
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.inlong.sort.cdc.mysql.source.reader.MySqlSourceReader;
 
@@ -48,6 +51,15 @@ public class MySqlSourceReaderMetrics {
      */
     private volatile long emitDelay = 0L;
 
+    // InLong metrics; each stays null until its registerMetricsFor* method is called.
+    private Counter numRecordsIn;
+    private Counter numBytesIn;
+    private Meter numRecordsInPerSecond;
+    private Meter numBytesInPerSecond;
+    // Time span, in seconds, passed to the MeterView instances backing the per-second meters.
+    private static final int TIME_SPAN_IN_SECONDS = 60;
+    // Keys of the nested metric groups that carry the InLong labels.
+    private static final String STREAM_ID = "streamId";
+    private static final String GROUP_ID = "groupId";
+    private static final String NODE_ID = "nodeId";
+
+
     public MySqlSourceReaderMetrics(MetricGroup metricGroup) {
         this.metricGroup = metricGroup;
     }
@@ -58,6 +70,32 @@ public class MySqlSourceReaderMetrics {
         metricGroup.gauge("sourceIdleTime", (Gauge<Long>) this::getIdleTime);
     }
 
+    /**
+     * Registers the records-in counter under nested groupId / streamId / nodeId groups.
+     *
+     * @param groupId value of the group id label
+     * @param streamId value of the stream id label
+     * @param nodeId value of the node id label
+     * @param metricName name under which the counter is registered
+     */
+    public void registerMetricsForNumRecordsIn(String groupId, String streamId, String nodeId, String metricName) {
+        final MetricGroup labeledGroup = metricGroup
+                .addGroup(GROUP_ID, groupId)
+                .addGroup(STREAM_ID, streamId)
+                .addGroup(NODE_ID, nodeId);
+        numRecordsIn = labeledGroup.counter(metricName);
+    }
+
+    /**
+     * Registers the bytes-in counter under nested groupId / streamId / nodeId groups.
+     *
+     * @param groupId value of the group id label
+     * @param streamId value of the stream id label
+     * @param nodeId value of the node id label
+     * @param metricName name under which the counter is registered
+     */
+    public void registerMetricsForNumBytesIn(String groupId, String streamId, String nodeId, String metricName) {
+        final MetricGroup labeledGroup = metricGroup
+                .addGroup(GROUP_ID, groupId)
+                .addGroup(STREAM_ID, streamId)
+                .addGroup(NODE_ID, nodeId);
+        numBytesIn = labeledGroup.counter(metricName);
+    }
+
+    /**
+     * Registers the records-in rate meter under nested groupId / streamId / nodeId
+     * groups. The meter is a {@link MeterView} over the current {@code numRecordsIn}
+     * counter, so that counter has to be registered before this method is called.
+     *
+     * @param groupId value of the group id label
+     * @param streamId value of the stream id label
+     * @param nodeId value of the node id label
+     * @param metricName name under which the meter is registered
+     */
+    public void registerMetricsForNumRecordsInPerSecond(String groupId, String streamId, String nodeId,
+            String metricName) {
+        final MetricGroup labeledGroup = metricGroup
+                .addGroup(GROUP_ID, groupId)
+                .addGroup(STREAM_ID, streamId)
+                .addGroup(NODE_ID, nodeId);
+        final MeterView recordsRate = new MeterView(this.numRecordsIn, TIME_SPAN_IN_SECONDS);
+        numRecordsInPerSecond = labeledGroup.meter(metricName, recordsRate);
+    }
+
+    /**
+     * Registers the bytes-in rate meter under nested groupId / streamId / nodeId
+     * groups. The meter is a {@link MeterView} over the current {@code numBytesIn}
+     * counter, so that counter has to be registered before this method is called.
+     *
+     * @param groupId value of the group id label
+     * @param streamId value of the stream id label
+     * @param nodeId value of the node id label
+     * @param metricName name under which the meter is registered
+     */
+    public void registerMetricsForNumBytesInPerSecond(String groupId, String streamId, String nodeId,
+            String metricName) {
+        final MetricGroup labeledGroup = metricGroup
+                .addGroup(GROUP_ID, groupId)
+                .addGroup(STREAM_ID, streamId)
+                .addGroup(NODE_ID, nodeId);
+        final MeterView bytesRate = new MeterView(this.numBytesIn, TIME_SPAN_IN_SECONDS);
+        numBytesInPerSecond = labeledGroup.meter(metricName, bytesRate);
+    }
+
     public long getFetchDelay() {
         return fetchDelay;
     }
@@ -85,4 +123,20 @@ public class MySqlSourceReaderMetrics {
     public void recordEmitDelay(long emitDelay) {
         this.emitDelay = emitDelay;
     }
+
+    /**
+     * Returns the records-in counter.
+     *
+     * @return the counter, or {@code null} if it has not been registered
+     */
+    public Counter getNumRecordsIn() {
+        return numRecordsIn;
+    }
+
+    /**
+     * Returns the bytes-in counter.
+     *
+     * @return the counter, or {@code null} if it has not been registered
+     */
+    public Counter getNumBytesIn() {
+        return numBytesIn;
+    }
+
+    /**
+     * Returns the records-in rate meter.
+     *
+     * @return the meter, or {@code null} if it has not been registered
+     */
+    public Meter getNumRecordsInPerSecond() {
+        return numRecordsInPerSecond;
+    }
+
+    /**
+     * Returns the bytes-in rate meter.
+     *
+     * @return the meter, or {@code null} if it has not been registered
+     */
+    public Meter getNumBytesInPerSecond() {
+        return numBytesInPerSecond;
+    }
 }
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
index 68c561892..3cfceb567 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
@@ -39,6 +39,7 @@ import org.apache.kafka.connect.source.SourceRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.charset.StandardCharsets;
 import java.util.Map;
 
 import static org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getBinlogPosition;
@@ -144,6 +145,13 @@ public final class MySqlRecordEmitter<T>
                     new Collector<T>() {
                         @Override
                         public void collect(final T t) {
+                            // The InLong counters are optional, so each one is null-checked
+                            // before use. One record is counted per collected element.
+                            if (sourceReaderMetrics.getNumRecordsIn() != null) {
+                                sourceReaderMetrics.getNumRecordsIn().inc(1L);
+                            }
+                            // The byte count is the UTF-8 length of the element's toString() form.
+                            if (sourceReaderMetrics.getNumBytesIn() != null) {
+                                sourceReaderMetrics.getNumBytesIn()
+                                        .inc(t.toString().getBytes(StandardCharsets.UTF_8).length);
+                            }
                             output.collect(t);
                         }
 
@@ -169,7 +177,7 @@ public final class MySqlRecordEmitter<T>
     }
 
+    /**
+     * Deserializes one source record and emits the result to the given output.
+     *
+     * <p>The shared {@code outputCollector} is pointed at {@code output} before the
+     * deserialization schema is invoked with the record and the table schema.
+     *
+     * @param element the source record to deserialize
+     * @param output the output that receives the deserialized element(s)
+     * @param tableSchema the table schema handed to the deserialization schema
+     * @throws Exception if deserialization fails
+     */
     private void emitElement(SourceRecord element, SourceOutput<T> output,
-                             TableChange tableSchema) throws Exception {
+            TableChange tableSchema) throws Exception {
         outputCollector.output = output;
         debeziumDeserializationSchema.deserialize(element, outputCollector, tableSchema);
     }
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
index 20876ed98..3ffd679f7 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
@@ -46,6 +46,7 @@ import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.
 import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.DATABASE_NAME;
 import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.HEARTBEAT_INTERVAL;
 import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.HOSTNAME;
+import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.INLONG_METRIC;
 import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.MIGRATE_ALL;
 import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.PASSWORD;
 import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.PORT;
@@ -119,6 +120,7 @@ public class MySqlTableInlongSourceFactory implements DynamicTableSourceFactory
                 DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX, JdbcUrlUtils.PROPERTIES_PREFIX);
 
         final ReadableConfig config = helper.getOptions();
+        final String inlongMetric = config.get(INLONG_METRIC);
         final String hostname = config.get(HOSTNAME);
         final String username = config.get(USERNAME);
         final String password = config.get(PASSWORD);
@@ -183,7 +185,8 @@ public class MySqlTableInlongSourceFactory implements DynamicTableSourceFactory
                 scanNewlyAddedTableEnabled,
                 JdbcUrlUtils.getJdbcProperties(context.getCatalogTable().getOptions()),
                 heartbeatInterval,
-                migrateAll);
+                migrateAll,
+                inlongMetric);
     }
 
     @Override
@@ -225,6 +228,7 @@ public class MySqlTableInlongSourceFactory implements DynamicTableSourceFactory
         options.add(MIGRATE_ALL);
         options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED);
         options.add(HEARTBEAT_INTERVAL);
+        options.add(INLONG_METRIC);
         return options;
     }
 
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
index 8e0d76013..11be54d47 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
@@ -81,6 +81,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
     private final Properties jdbcProperties;
     private final Duration heartbeatInterval;
     private final boolean migrateAll;
+    private final String inlongMetric;
     // --------------------------------------------------------------------------------------------
     // Mutable attributes
     // --------------------------------------------------------------------------------------------
@@ -121,7 +122,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
             boolean appendSource,
             StartupOptions startupOptions,
             Duration heartbeatInterval,
-            boolean migrateAll) {
+            boolean migrateAll,
+            String inlongMetric) {
         this(
                 physicalSchema,
                 port,
@@ -147,7 +149,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
                 false,
                 new Properties(),
                 heartbeatInterval,
-                migrateAll);
+                migrateAll,
+                inlongMetric);
     }
 
     /**
@@ -178,7 +181,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
             boolean scanNewlyAddedTableEnabled,
             Properties jdbcProperties,
             Duration heartbeatInterval,
-            boolean migrateAll) {
+            boolean migrateAll,
+            String inlongMetric) {
         this.physicalSchema = physicalSchema;
         this.port = port;
         this.hostname = checkNotNull(hostname);
@@ -207,6 +211,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
         this.metadataKeys = Collections.emptyList();
         this.heartbeatInterval = heartbeatInterval;
         this.migrateAll = migrateAll;
+        this.inlongMetric = inlongMetric;
     }
 
     @Override
@@ -265,6 +270,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
                             .scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
                             .jdbcProperties(jdbcProperties)
                             .heartbeatInterval(heartbeatInterval)
+                            .inlongMetric(inlongMetric)
                             .build();
             return SourceProvider.of(parallelSource);
         } else {
@@ -350,7 +356,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
                         scanNewlyAddedTableEnabled,
                         jdbcProperties,
                         heartbeatInterval,
-                        migrateAll);
+                        migrateAll,
+                        inlongMetric);
         source.metadataKeys = metadataKeys;
         source.producedDataType = producedDataType;
         return source;
@@ -388,7 +395,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
                 && Objects.equals(startupOptions, that.startupOptions)
                 && Objects.equals(producedDataType, that.producedDataType)
                 && Objects.equals(metadataKeys, that.metadataKeys)
-                && Objects.equals(jdbcProperties, that.jdbcProperties);
+                && Objects.equals(jdbcProperties, that.jdbcProperties)
+                && Objects.equals(inlongMetric, that.inlongMetric);
     }
 
     @Override
@@ -417,7 +425,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
                 producedDataType,
                 metadataKeys,
                 scanNewlyAddedTableEnabled,
-                jdbcProperties);
+                jdbcProperties,
+                inlongMetric);
     }
 
     @Override
diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
new file mode 100644
index 000000000..b2952d1d3
--- /dev/null
+++ b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
@@ -0,0 +1,647 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.postgres;
+
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import com.ververica.cdc.debezium.Validator;
+import com.ververica.cdc.debezium.internal.DebeziumChangeConsumer;
+import com.ververica.cdc.debezium.internal.DebeziumChangeFetcher;
+import com.ververica.cdc.debezium.internal.DebeziumOffset;
+import com.ververica.cdc.debezium.internal.DebeziumOffsetSerializer;
+import com.ververica.cdc.debezium.internal.FlinkDatabaseHistory;
+import com.ververica.cdc.debezium.internal.FlinkDatabaseSchemaHistory;
+import com.ververica.cdc.debezium.internal.FlinkOffsetBackingStore;
+import com.ververica.cdc.debezium.internal.Handover;
+import com.ververica.cdc.debezium.internal.SchemaRecord;
+import io.debezium.document.DocumentReader;
+import io.debezium.document.DocumentWriter;
+import io.debezium.embedded.Connect;
+import io.debezium.engine.DebeziumEngine;
+import io.debezium.engine.spi.OffsetCommitPolicy;
+import io.debezium.heartbeat.Heartbeat;
+import org.apache.commons.collections.map.LinkedMap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
+import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
+
+/**
+ * The {@link DebeziumSourceFunction} is a streaming data source that pulls captured change data
+ * from databases into Flink.
+ *
+ * <p>There are two workers during the runtime. One worker periodically pulls records from the
+ * database and pushes the records into the {@link Handover}. The other worker consumes the records
+ * from the {@link Handover} and converts them into Flink-style data. A single worker is not
+ * used because Debezium behaves differently in the snapshot phase and in the streaming
+ * phase.
+ *
+ * <p>Here we use the {@link Handover} as the buffer to submit data from the producer to the
+ * consumer. Because the two threads don't communicate to each other directly, the error reporting
+ * also relies on {@link Handover}. When the engine gets errors, the engine uses the {@link
+ * DebeziumEngine.CompletionCallback} to report errors to the {@link Handover} and wakes up the
+ * consumer to check the error. However, the source function just closes the engine and wakes up the
+ * producer if the error is from the Flink side.
+ *
+ * <p>If the execution is canceled or finishes (snapshot-only mode), the exit logic is the same as
+ * the logic used for error reporting.
+ *
+ * <p>The source function participates in checkpointing and guarantees that no data is lost during a
+ * failure, and that the computation processes elements "exactly once".
+ *
+ * <p>Note: currently, the source function can't run in multiple parallel instances.
+ *
+ * <p>Please refer to Debezium's documentation for the available configuration properties:
+ * https://debezium.io/documentation/reference/1.5/development/engine.html#engine-properties
+ */
+@PublicEvolving
+public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
+        implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<T> {
+
+    private static final long serialVersionUID = -5808108641062931623L;
+
+    protected static final Logger LOG = LoggerFactory.getLogger(DebeziumSourceFunction.class);
+
+    /**
+     * State name of the consumer's partition offset states.
+     */
+    public static final String OFFSETS_STATE_NAME = "offset-states";
+
+    /**
+     * State name of the consumer's history records state.
+     */
+    public static final String HISTORY_RECORDS_STATE_NAME = "history-records-states";
+
+    /**
+     * The maximum number of pending non-committed checkpoints to track, to avoid memory leaks.
+     */
+    public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
+
+    /**
+     * The configuration key that indicates whether the Debezium MySQL Connector uses the legacy
+     * implementation.
+     */
+    public static final String LEGACY_IMPLEMENTATION_KEY = "internal.implementation";
+
+    /**
+     * The configuration value represents legacy implementation.
+     */
+    public static final String LEGACY_IMPLEMENTATION_VALUE = "legacy";
+
+    // ---------------------------------------------------------------------------------------
+    // Properties
+    // ---------------------------------------------------------------------------------------
+
+    /**
+     * The schema to convert from Debezium's messages into Flink's objects.
+     */
+    private final DebeziumDeserializationSchema<T> deserializer;
+
+    /**
+     * User-supplied properties passed to the Debezium engine.
+     */
+    private final Properties properties;
+
+    /**
+     * The specific offset to start reading from on the first startup.
+     */
+    private final @Nullable
+    DebeziumOffset specificOffset;
+
+    /**
+     * Data for pending but uncommitted offsets.
+     */
+    private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
+
+    /**
+     * Flag indicating whether the Debezium Engine is started.
+     */
+    private volatile boolean debeziumStarted = false;
+
+    /**
+     * Validator to validate the connected database satisfies the cdc connector's requirements.
+     */
+    private final Validator validator;
+
+    // ---------------------------------------------------------------------------------------
+    // State
+    // ---------------------------------------------------------------------------------------
+
+    /**
+     * The offsets to restore to, if the consumer restores state from a checkpoint.
+     *
+     * <p>This map will be populated by the {@link #initializeState(FunctionInitializationContext)}
+     * method.
+     *
+     * <p>Using a String because we are encoding the offset state in JSON bytes.
+     */
+    private transient volatile String restoredOffsetState;
+
+    /**
+     * Accessor for state in the operator state backend.
+     */
+    private transient ListState<byte[]> offsetState;
+
+    /**
+     * State to store the history records, i.e. schema changes.
+     *
+     * @see FlinkDatabaseHistory
+     * @see FlinkDatabaseSchemaHistory
+     */
+    private transient ListState<String> schemaRecordsState;
+
+    // ---------------------------------------------------------------------------------------
+    // Worker
+    // ---------------------------------------------------------------------------------------
+
+    private transient ExecutorService executor;
+    private transient DebeziumEngine<?> engine;
+    /**
+     * Unique name of this Debezium Engine instance across all the jobs. Currently we randomly
+     * generate a UUID for it. This is used for {@link FlinkDatabaseHistory}.
+     */
+    private transient String engineInstanceName;
+
+    /**
+     * Consume the events from the engine and commit the offset to the engine.
+     */
+    private transient DebeziumChangeConsumer changeConsumer;
+
+    /**
+     * The consumer to fetch records from {@link Handover}.
+     */
+    private transient DebeziumChangeFetcher<T> debeziumChangeFetcher;
+
+    /**
+     * Buffer the events from the source and record the errors from the debezium.
+     */
+    private transient Handover handover;
+
+    private String inlongMetric;
+
+    private MetricData metricData;
+
+    // ---------------------------------------------------------------------------------------
+
+    /**
+     * Creates the source function; no resources are allocated until {@link #open(Configuration)}.
+     *
+     * @param deserializer converts Debezium records into the produced type
+     * @param properties properties handed to the Debezium engine in {@link #run(SourceContext)}
+     * @param specificOffset offset to start from when the job is not restored from state, may be null
+     * @param validator validated at the beginning of {@link #open(Configuration)}
+     * @param inlongMetric metric label; when not empty, {@link #run(SourceContext)} splits it on
+     *         "_" and uses the first three parts as groupId, streamId and nodeId
+     */
+    public DebeziumSourceFunction(
+            DebeziumDeserializationSchema<T> deserializer,
+            Properties properties,
+            @Nullable DebeziumOffset specificOffset,
+            Validator validator, String inlongMetric) {
+        this.deserializer = deserializer;
+        this.properties = properties;
+        this.specificOffset = specificOffset;
+        this.validator = validator;
+        this.inlongMetric = inlongMetric;
+    }
+
+    /**
+     * Runs the validator, then creates the single "debezium-engine" thread executor, the
+     * {@link Handover} buffer and the change consumer that writes into that buffer.
+     */
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        validator.validate();
+        super.open(parameters);
+
+        final ThreadFactory engineThreads =
+                new ThreadFactoryBuilder().setNameFormat("debezium-engine").build();
+        this.executor = Executors.newSingleThreadExecutor(engineThreads);
+
+        // the consumer and the fetcher created in run() share this buffer
+        final Handover eventBuffer = new Handover();
+        this.handover = eventBuffer;
+        this.changeConsumer = new DebeziumChangeConsumer(eventBuffer);
+    }
+
+    // ------------------------------------------------------------------------
+    //  Checkpoint and restore
+    // ------------------------------------------------------------------------
+
+    /**
+     * Obtains the union list states for offsets and history records, then either restores both
+     * from the checkpoint or, on a fresh start, seeds the offset from {@code specificOffset}
+     * when one was supplied.
+     */
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        final OperatorStateStore operatorState = context.getOperatorStateStore();
+        final ListStateDescriptor<byte[]> offsetDescriptor =
+                new ListStateDescriptor<>(
+                        OFFSETS_STATE_NAME, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
+        final ListStateDescriptor<String> historyDescriptor =
+                new ListStateDescriptor<>(HISTORY_RECORDS_STATE_NAME, BasicTypeInfo.STRING_TYPE_INFO);
+        this.offsetState = operatorState.getUnionListState(offsetDescriptor);
+        this.schemaRecordsState = operatorState.getUnionListState(historyDescriptor);
+
+        if (context.isRestored()) {
+            restoreOffsetState();
+            restoreHistoryRecordsState();
+            return;
+        }
+
+        if (specificOffset == null) {
+            LOG.info(
+                    "Consumer subtask {} has no restore state.",
+                    getRuntimeContext().getIndexOfThisSubtask());
+            return;
+        }
+
+        // fresh start with a user-specified offset: keep it in the same JSON form as restored state
+        final byte[] offsetBytes = DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset);
+        restoredOffsetState = new String(offsetBytes, StandardCharsets.UTF_8);
+        LOG.info(
+                "Consumer subtask {} starts to read from specified offset {}.",
+                getRuntimeContext().getIndexOfThisSubtask(),
+                restoredOffsetState);
+    }
+
+    /**
+     * Reads the single serialized offset from {@code offsetState} into
+     * {@code restoredOffsetState}; a second entry is rejected with a {@link RuntimeException}.
+     */
+    private void restoreOffsetState() throws Exception {
+        for (byte[] offsetBytes : offsetState.get()) {
+            if (restoredOffsetState != null) {
+                // an offset was already taken, so the state holds more than one entry
+                throw new RuntimeException(
+                        "Debezium Source only support single task, "
+                                + "however, this is restored from multiple tasks.");
+            }
+            restoredOffsetState = new String(offsetBytes, StandardCharsets.UTF_8);
+        }
+        LOG.info(
+                "Consumer subtask {} restored offset state: {}.",
+                getRuntimeContext().getIndexOfThisSubtask(),
+                restoredOffsetState);
+    }
+
+    /**
+     * Rebuilds the engine instance name and the schema history from {@code schemaRecordsState}.
+     * The first state element is the engine instance name, every following element is one
+     * serialized history record; the records are registered under that name when it is present.
+     */
+    private void restoreHistoryRecordsState() throws Exception {
+        final DocumentReader documentReader = DocumentReader.defaultReader();
+        final ConcurrentLinkedQueue<SchemaRecord> restoredRecords = new ConcurrentLinkedQueue<>();
+        int restoredCount = 0;
+        boolean instanceNameRead = false;
+        for (String entry : schemaRecordsState.get()) {
+            if (instanceNameRead) {
+                // Put the records into the state. The database history should read, reorganize
+                // and register the state.
+                restoredRecords.add(new SchemaRecord(documentReader.read(entry)));
+                restoredCount++;
+                continue;
+            }
+            // we store the engine instance name in the first element
+            this.engineInstanceName = entry;
+            instanceNameRead = true;
+        }
+        if (engineInstanceName != null) {
+            registerHistory(engineInstanceName, restoredRecords);
+        }
+        LOG.info(
+                "Consumer subtask {} restored history records state: {} with {} records.",
+                getRuntimeContext().getIndexOfThisSubtask(),
+                engineInstanceName,
+                restoredCount);
+    }
+
+    /**
+     * Snapshots the offset and the history records, or fails the checkpoint with a
+     * {@link FlinkRuntimeException} when the handover already carries an error.
+     */
+    @Override
+    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
+        if (handover.hasError()) {
+            LOG.debug("snapshotState() called on closed source");
+            throw new FlinkRuntimeException(
+                    "Call snapshotState() on closed source, checkpoint failed.");
+        }
+        snapshotOffsetState(functionSnapshotContext.getCheckpointId());
+        snapshotHistoryRecordsState();
+    }
+
+    /**
+     * Replaces the content of {@code offsetState} with the offset to persist for this checkpoint
+     * and remembers it in {@code pendingOffsetsToCommit} under the checkpoint id.
+     *
+     * <p>The offset is taken from the fetcher when it has one; otherwise the restored offset (if
+     * any) is written again. Nothing is stored when no offset is available.
+     *
+     * @param checkpointId id of the checkpoint being taken, used as key of the pending-offset map
+     */
+    private void snapshotOffsetState(long checkpointId) throws Exception {
+        offsetState.clear();
+
+        // read the field once so the null check and the call below see the same instance
+        final DebeziumChangeFetcher<?> fetcher = this.debeziumChangeFetcher;
+
+        byte[] serializedOffset = null;
+        if (fetcher == null) {
+            // the fetcher has not yet been initialized, which means we need to return the
+            // originally restored offsets
+            if (restoredOffsetState != null) {
+                serializedOffset = restoredOffsetState.getBytes(StandardCharsets.UTF_8);
+            }
+        } else {
+            byte[] currentState = fetcher.snapshotCurrentState();
+            if (currentState == null && restoredOffsetState != null) {
+                // the fetcher has been initialized, but has not yet received any data,
+                // which means we need to return the originally restored offsets.
+                serializedOffset = restoredOffsetState.getBytes(StandardCharsets.UTF_8);
+            } else {
+                serializedOffset = currentState;
+            }
+        }
+
+        if (serializedOffset != null) {
+            offsetState.add(serializedOffset);
+            // the map cannot be asynchronously updated, because only one checkpoint call
+            // can happen on this function at a time: either snapshotState() or
+            // notifyCheckpointComplete()
+            pendingOffsetsToCommit.put(checkpointId, serializedOffset);
+            // truncate the map of pending offsets to commit, to prevent infinite growth
+            while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
+                // index-based removal: drops the entry at position 0 of the map
+                pendingOffsetsToCommit.remove(0);
+            }
+        }
+    }
+
+    /**
+     * Rewrites {@code schemaRecordsState}: the engine instance name first, followed by every
+     * history record retrieved for that name. The state stays empty while no name is set.
+     */
+    private void snapshotHistoryRecordsState() throws Exception {
+        schemaRecordsState.clear();
+
+        if (engineInstanceName == null) {
+            return;
+        }
+
+        schemaRecordsState.add(engineInstanceName);
+        final Collection<SchemaRecord> historyRecords = retrieveHistory(engineInstanceName);
+        final DocumentWriter documentWriter = DocumentWriter.defaultWriter();
+        for (SchemaRecord schemaRecord : historyRecords) {
+            schemaRecordsState.add(documentWriter.write(schemaRecord.toDocument()));
+        }
+    }
+
+    /**
+     * Registers the metrics, configures and starts the Debezium engine on the executor, and then
+     * blocks in the fetch loop that forwards records to {@code sourceContext}.
+     *
+     * <p>When an inlong metric label is configured it must contain at least three "_"-separated
+     * parts (groupId, streamId, nodeId); record and byte counters plus their per-second meters
+     * are then registered and updated for every record handed to the deserializer.
+     *
+     * @param sourceContext the context records are emitted to
+     * @throws IllegalArgumentException if the inlong metric label has fewer than three parts
+     * @throws Exception if metric lookup, engine creation or the fetch loop fails
+     */
+    @Override
+    public void run(SourceContext<T> sourceContext) throws Exception {
+
+        // initialize metrics
+        // make RuntimeContext#getMetricGroup compatible between Flink 1.13 and Flink 1.14
+        final Method getMetricGroupMethod =
+                getRuntimeContext().getClass().getMethod("getMetricGroup");
+        getMetricGroupMethod.setAccessible(true);
+        final MetricGroup metricGroup =
+                (MetricGroup) getMetricGroupMethod.invoke(getRuntimeContext());
+
+        metricGroup.gauge(
+                "currentFetchEventTimeLag",
+                (Gauge<Long>) () -> debeziumChangeFetcher.getFetchDelay());
+        metricGroup.gauge(
+                "currentEmitEventTimeLag",
+                (Gauge<Long>) () -> debeziumChangeFetcher.getEmitDelay());
+        metricGroup.gauge(
+                "sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime());
+        if (StringUtils.isNotEmpty(this.inlongMetric)) {
+            String[] inlongMetricArray = inlongMetric.split("_");
+            // fail with a descriptive message instead of an ArrayIndexOutOfBoundsException
+            if (inlongMetricArray.length < 3) {
+                throw new IllegalArgumentException(
+                        "Invalid inlong metric label '" + inlongMetric
+                                + "', expected format: groupId_streamId_nodeId");
+            }
+            String groupId = inlongMetricArray[0];
+            String streamId = inlongMetricArray[1];
+            String nodeId = inlongMetricArray[2];
+            metricData = new MetricData(metricGroup);
+            // the counters must be registered before the meters that are derived from them
+            metricData.registerMetricsForNumRecordsIn(groupId, streamId, nodeId, "numRecordsIn");
+            metricData.registerMetricsForNumBytesIn(groupId, streamId, nodeId, "numBytesIn");
+            metricData.registerMetricsForNumBytesInPerSecond(groupId, streamId, nodeId, "numBytesInPerSecond");
+            metricData.registerMetricsForNumRecordsInPerSecond(groupId, streamId, nodeId, "numRecordsInPerSecond");
+        }
+        properties.setProperty("name", "engine");
+        properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
+        if (restoredOffsetState != null) {
+            // restored from state
+            properties.setProperty(FlinkOffsetBackingStore.OFFSET_STATE_VALUE, restoredOffsetState);
+        }
+        // DO NOT include schema change, e.g. DDL
+        properties.setProperty("include.schema.changes", "false");
+        // disable the offset flush totally
+        properties.setProperty("offset.flush.interval.ms", String.valueOf(Long.MAX_VALUE));
+        // disable tombstones
+        properties.setProperty("tombstones.on.delete", "false");
+        if (engineInstanceName == null) {
+            // not restore from recovery
+            engineInstanceName = UUID.randomUUID().toString();
+        }
+        // history instance name to initialize FlinkDatabaseHistory
+        properties.setProperty(
+                FlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME, engineInstanceName);
+        // we have to use a persisted DatabaseHistory implementation, otherwise, recovery can't
+        // continue to read binlog
+        // see
+        // https://stackoverflow.com/questions/57147584/debezium-error-schema-isnt-know-to-this-connector
+        // and https://debezium.io/blog/2018/03/16/note-on-database-history-topic-configuration/
+        properties.setProperty("database.history", determineDatabase().getCanonicalName());
+
+        // we have to filter out the heartbeat events, otherwise the deserializer will fail
+        String dbzHeartbeatPrefix =
+                properties.getProperty(
+                        Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(),
+                        Heartbeat.HEARTBEAT_TOPICS_PREFIX.defaultValueAsString());
+        this.debeziumChangeFetcher =
+                new DebeziumChangeFetcher<>(
+                        sourceContext,
+                        // wrap the user deserializer so that every record is counted first
+                        new DebeziumDeserializationSchema<T>() {
+                            @Override
+                            public void deserialize(SourceRecord record, Collector<T> out) throws Exception {
+                                if (metricData != null) {
+                                    metricData.getNumRecordsIn().inc(1L);
+                                    // records without a value contribute no bytes
+                                    final Object value = record.value();
+                                    if (value != null) {
+                                        metricData.getNumBytesIn()
+                                                .inc(value.toString().getBytes(StandardCharsets.UTF_8).length);
+                                    }
+                                }
+                                deserializer.deserialize(record, out);
+                            }
+
+                            @Override
+                            public TypeInformation<T> getProducedType() {
+                                return deserializer.getProducedType();
+                            }
+                        },
+                        restoredOffsetState == null, // DB snapshot phase if restore state is null
+                        dbzHeartbeatPrefix,
+                        handover);
+
+        // create the engine with this configuration ...
+        this.engine =
+                DebeziumEngine.create(Connect.class)
+                        .using(properties)
+                        .notifying(changeConsumer)
+                        .using(OffsetCommitPolicy.always())
+                        .using(
+                                (success, message, error) -> {
+                                    if (success) {
+                                        // Close the handover and prepare to exit.
+                                        handover.close();
+                                    } else {
+                                        handover.reportError(error);
+                                    }
+                                })
+                        .build();
+
+        // run the engine asynchronously
+        executor.execute(engine);
+        debeziumStarted = true;
+
+        // start the real debezium consumer
+        debeziumChangeFetcher.runFetchLoop();
+    }
+
+    /**
+     * Commits the offset that was recorded for the completed checkpoint to the change consumer
+     * and drops the pending entries of that checkpoint and of all older ones.
+     *
+     * <p>The call is a no-op while the engine is not started, while the fetcher is not created,
+     * for unknown checkpoint ids and for empty offsets. Any exception is logged and ignored.
+     *
+     * @param checkpointId id of the checkpoint that completed
+     */
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
+        if (!debeziumStarted) {
+            LOG.debug("notifyCheckpointComplete() called when engine is not started.");
+            return;
+        }
+
+        final DebeziumChangeFetcher<T> fetcher = this.debeziumChangeFetcher;
+        if (fetcher == null) {
+            LOG.debug("notifyCheckpointComplete() called on uninitialized source");
+            return;
+        }
+
+        try {
+            // position of this checkpoint among the pending ones, -1 when it is not tracked
+            final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
+            if (posInMap == -1) {
+                LOG.warn(
+                        "Consumer subtask {} received confirmation for unknown checkpoint id {}",
+                        getRuntimeContext().getIndexOfThisSubtask(),
+                        checkpointId);
+                return;
+            }
+
+            // index-based removal; the returned value is the offset stored for this checkpoint
+            byte[] serializedOffsets = (byte[]) pendingOffsetsToCommit.remove(posInMap);
+
+            // remove older checkpoints in map
+            for (int i = 0; i < posInMap; i++) {
+                pendingOffsetsToCommit.remove(0);
+            }
+
+            if (serializedOffsets == null || serializedOffsets.length == 0) {
+                LOG.debug(
+                        "Consumer subtask {} has empty checkpoint state.",
+                        getRuntimeContext().getIndexOfThisSubtask());
+                return;
+            }
+
+            DebeziumOffset offset =
+                    DebeziumOffsetSerializer.INSTANCE.deserialize(serializedOffsets);
+            changeConsumer.commitOffset(offset);
+        } catch (Exception e) {
+            // ignore exception if we are no longer running
+            LOG.warn("Ignore error when committing offset to database.", e);
+        }
+    }
+
+    /**
+     * Stops the engine via {@link #shutdownEngine()} and then closes the fetcher if one exists.
+     */
+    @Override
+    public void cancel() {
+        // safely and gracefully stop the engine
+        shutdownEngine();
+        if (debeziumChangeFetcher != null) {
+            debeziumChangeFetcher.close();
+        }
+    }
+
+    /**
+     * Cancels the source, waits for the engine executor to terminate and then closes the
+     * parent function.
+     */
+    @Override
+    public void close() throws Exception {
+        cancel();
+
+        if (executor != null) {
+            // the timeout is Long.MAX_VALUE seconds, i.e. the wait is effectively unbounded
+            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+        }
+
+        super.close();
+    }
+
+    /**
+     * Safely and gracefully stop the Debezium engine.
+     *
+     * <p>The engine is closed first; an {@link IOException} from that call is rethrown through
+     * {@link ExceptionUtils#rethrow(Throwable)}. Regardless of the outcome the executor is shut
+     * down, the started flag is reset and the handover is closed.
+     */
+    private void shutdownEngine() {
+        try {
+            if (engine != null) {
+                engine.close();
+            }
+        } catch (IOException e) {
+            ExceptionUtils.rethrow(e);
+        } finally {
+            // cleanup below runs even when closing the engine failed
+            if (executor != null) {
+                executor.shutdownNow();
+            }
+
+            debeziumStarted = false;
+
+            if (handover != null) {
+                handover.close();
+            }
+        }
+    }
+
+    /**
+     * Returns the produced type reported by the wrapped deserializer.
+     */
+    @Override
+    public TypeInformation<T> getProducedType() {
+        return deserializer.getProducedType();
+    }
+
+    /**
+     * Returns the live map of checkpoint ids to serialized offsets that are not committed yet.
+     */
+    @VisibleForTesting
+    public LinkedMap getPendingOffsetsToCommit() {
+        return pendingOffsetsToCommit;
+    }
+
+    /**
+     * Returns whether the engine has been submitted to the executor and not been shut down since.
+     */
+    @VisibleForTesting
+    public boolean getDebeziumStarted() {
+        return debeziumStarted;
+    }
+
+    /**
+     * Chooses the database history implementation for the engine.
+     *
+     * <p>When the legacy implementation is configured it is used if the registered history is
+     * compatible with it. Otherwise the non-legacy implementation is preferred and the legacy
+     * one serves as fallback.
+     *
+     * @return {@link FlinkDatabaseHistory} or {@link FlinkDatabaseSchemaHistory}
+     * @throws IllegalStateException if the legacy implementation is configured but incompatible
+     *         with the state, or if neither implementation is compatible
+     */
+    private Class<?> determineDatabase() {
+        // look the history up once and reuse it for both compatibility checks
+        final Collection<SchemaRecord> history = retrieveHistory(engineInstanceName);
+        boolean isCompatibleWithLegacy = FlinkDatabaseHistory.isCompatible(history);
+        if (LEGACY_IMPLEMENTATION_VALUE.equals(properties.get(LEGACY_IMPLEMENTATION_KEY))) {
+            // specifies the legacy implementation but the state may be incompatible
+            if (isCompatibleWithLegacy) {
+                return FlinkDatabaseHistory.class;
+            } else {
+                throw new IllegalStateException(
+                        "The configured option 'debezium.internal.implementation' is 'legacy', but the state of "
+                                + "source is incompatible with this implementation, you should remove the option.");
+            }
+        } else if (FlinkDatabaseSchemaHistory.isCompatible(history)) {
+            // tries the non-legacy first
+            return FlinkDatabaseSchemaHistory.class;
+        } else if (isCompatibleWithLegacy) {
+            // fallback to legacy if possible
+            return FlinkDatabaseHistory.class;
+        } else {
+            // impossible
+            throw new IllegalStateException("Can't determine which DatabaseHistory to use.");
+        }
+    }
+
+    /**
+     * Returns the engine instance name; null until it is restored from state or generated in
+     * {@link #run(SourceContext)}.
+     */
+    @VisibleForTesting
+    public String getEngineInstanceName() {
+        return engineInstanceName;
+    }
+
+    /**
+     * Returns the inlong metric holder; null unless {@link #run(SourceContext)} created it for a
+     * non-empty inlong metric label.
+     */
+    public MetricData getMetricData() {
+        return metricData;
+    }
+}
diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/MetricData.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/MetricData.java
new file mode 100644
index 000000000..07b6f9044
--- /dev/null
+++ b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/MetricData.java
@@ -0,0 +1,88 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.postgres;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.metrics.MetricGroup;
+
+/**
+ * A collection class for handling metrics.
+ *
+ * <p>All metrics are registered below nested groups keyed by groupId, streamId and nodeId. The
+ * per-second meters are derived from the counters, so each counter must be registered before
+ * its meter.
+ */
+public class MetricData {
+
+    /** Time span in seconds passed to the {@link MeterView} instances. */
+    private static final int TIME_SPAN_IN_SECONDS = 60;
+    private static final String STREAM_ID = "streamId";
+    private static final String GROUP_ID = "groupId";
+    private static final String NODE_ID = "nodeId";
+
+    private final MetricGroup metricGroup;
+
+    private Counter numRecordsIn;
+    private Counter numBytesIn;
+    private Meter numRecordsInPerSecond;
+    private Meter numBytesInPerSecond;
+
+    /**
+     * @param metricGroup parent group under which all metrics are registered
+     */
+    public MetricData(MetricGroup metricGroup) {
+        this.metricGroup = metricGroup;
+    }
+
+    /**
+     * Registers the record counter under the given labels.
+     */
+    public void registerMetricsForNumRecordsIn(String groupId, String streamId, String nodeId, String metricName) {
+        numRecordsIn = labeledGroup(groupId, streamId, nodeId).counter(metricName);
+    }
+
+    /**
+     * Registers the byte counter under the given labels.
+     */
+    public void registerMetricsForNumBytesIn(String groupId, String streamId, String nodeId, String metricName) {
+        numBytesIn = labeledGroup(groupId, streamId, nodeId).counter(metricName);
+    }
+
+    /**
+     * Registers the records-per-second meter, which is a view over the record counter.
+     *
+     * @throws IllegalStateException if the record counter has not been registered yet
+     */
+    public void registerMetricsForNumRecordsInPerSecond(String groupId, String streamId, String nodeId,
+            String metricName) {
+        // a MeterView over a null counter would only fail later, so reject it right away
+        if (numRecordsIn == null) {
+            throw new IllegalStateException(
+                    "The record counter must be registered before the meter '" + metricName + "'.");
+        }
+        numRecordsInPerSecond = labeledGroup(groupId, streamId, nodeId)
+                .meter(metricName, new MeterView(this.numRecordsIn, TIME_SPAN_IN_SECONDS));
+    }
+
+    /**
+     * Registers the bytes-per-second meter, which is a view over the byte counter.
+     *
+     * @throws IllegalStateException if the byte counter has not been registered yet
+     */
+    public void registerMetricsForNumBytesInPerSecond(String groupId, String streamId, String nodeId,
+            String metricName) {
+        // a MeterView over a null counter would only fail later, so reject it right away
+        if (numBytesIn == null) {
+            throw new IllegalStateException(
+                    "The byte counter must be registered before the meter '" + metricName + "'.");
+        }
+        numBytesInPerSecond = labeledGroup(groupId, streamId, nodeId)
+                .meter(metricName, new MeterView(this.numBytesIn, TIME_SPAN_IN_SECONDS));
+    }
+
+    /** Returns the groupId/streamId/nodeId sub-group that all metrics are attached to. */
+    private MetricGroup labeledGroup(String groupId, String streamId, String nodeId) {
+        return metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId);
+    }
+
+    public Counter getNumRecordsIn() {
+        return numRecordsIn;
+    }
+
+    public Counter getNumBytesIn() {
+        return numBytesIn;
+    }
+
+    public Meter getNumRecordsInPerSecond() {
+        return numRecordsInPerSecond;
+    }
+
+    public Meter getNumBytesInPerSecond() {
+        return numBytesInPerSecond;
+    }
+
+}
diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/PostgreSQLSource.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/PostgreSQLSource.java
new file mode 100644
index 000000000..30d2268d6
--- /dev/null
+++ b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/PostgreSQLSource.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.postgres;
+
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import com.ververica.cdc.debezium.Validator;
+import io.debezium.connector.postgresql.PostgresConnector;
+
+import java.time.Duration;
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A builder to build a SourceFunction which can read a snapshot and then continue to consume
+ * change events from the logical decoding slot of a PostgreSQL database.
+ */
+public class PostgreSQLSource {
+
+    /** Heartbeat interval handed to Debezium via {@code heartbeat.interval.ms}. */
+    private static final long DEFAULT_HEARTBEAT_MS = Duration.ofMinutes(5).toMillis();
+
+    /** Creates a new {@link Builder} initialised with the default plug-in, slot name and port. */
+    public static <T> Builder<T> builder() {
+        return new Builder<>();
+    }
+
+    /** Builder class of {@link PostgreSQLSource}. */
+    public static class Builder<T> {
+
+        private String pluginName = "decoderbufs";
+        private String slotName = "flink";
+        private int port = 5432; // default 5432 port
+        private String hostname;
+        private String database;
+        private String username;
+        private String password;
+        private String[] schemaList;
+        private String[] tableList;
+        private Properties dbzProperties;
+        private DebeziumDeserializationSchema<T> deserializer;
+        private String inlongMetric;
+
+        /**
+         * The name of the Postgres logical decoding plug-in installed on the server. Supported
+         * values are decoderbufs, wal2json, wal2json_rds, wal2json_streaming,
+         * wal2json_rds_streaming and pgoutput.
+         */
+        public Builder<T> decodingPluginName(String name) {
+            this.pluginName = name;
+            return this;
+        }
+
+        /** IP address or hostname of the PostgreSQL database server. Required. */
+        public Builder<T> hostname(String hostname) {
+            this.hostname = hostname;
+            return this;
+        }
+
+        /** Integer port number of the PostgreSQL database server. */
+        public Builder<T> port(int port) {
+            this.port = port;
+            return this;
+        }
+
+        /** The name of the PostgreSQL database from which to stream the changes. Required. */
+        public Builder<T> database(String database) {
+            this.database = database;
+            return this;
+        }
+
+        /**
+         * An optional list of regular expressions that match schema names to be monitored; any
+         * schema name not included in the whitelist will be excluded from monitoring. By default
+         * all non-system schemas will be monitored.
+         */
+        public Builder<T> schemaList(String... schemaList) {
+            this.schemaList = schemaList;
+            return this;
+        }
+
+        /**
+         * An optional list of regular expressions that match fully-qualified table identifiers for
+         * tables to be monitored; any table not included in the whitelist will be excluded from
+         * monitoring. Each identifier is of the form schemaName.tableName. By default the connector
+         * will monitor every non-system table in each monitored schema.
+         */
+        public Builder<T> tableList(String... tableList) {
+            this.tableList = tableList;
+            return this;
+        }
+
+        /**
+         * Name of the PostgreSQL user to use when connecting to the PostgreSQL database server.
+         * Required.
+         */
+        public Builder<T> username(String username) {
+            this.username = username;
+            return this;
+        }
+
+        /** Password to use when connecting to the PostgreSQL database server. Required. */
+        public Builder<T> password(String password) {
+            this.password = password;
+            return this;
+        }
+
+        /**
+         * The name of the PostgreSQL logical decoding slot that was created for streaming changes
+         * from a particular plug-in for a particular database/schema. The server uses this slot to
+         * stream events to the connector that you are configuring. Default is "flink".
+         *
+         * <p>Slot names must conform to <a
+         * href="https://www.postgresql.org/docs/current/static/warm-standby.html#STREAMING-REPLICATION-SLOTS-MANIPULATION">PostgreSQL
+         * replication slot naming rules</a>, which state: "Each replication slot has a name, which
+         * can contain lower-case letters, numbers, and the underscore character."
+         */
+        public Builder<T> slotName(String slotName) {
+            this.slotName = slotName;
+            return this;
+        }
+
+        /**
+         * The Debezium Postgres connector properties. They are applied last in {@link #build()},
+         * so an entry here overrides the value derived from the dedicated builder methods.
+         */
+        public Builder<T> debeziumProperties(Properties properties) {
+            this.dbzProperties = properties;
+            return this;
+        }
+
+        /**
+         * The deserializer used to convert from consumed {@link
+         * org.apache.kafka.connect.source.SourceRecord}. Required.
+         */
+        public Builder<T> deserializer(DebeziumDeserializationSchema<T> deserializer) {
+            this.deserializer = deserializer;
+            return this;
+        }
+
+        /** InLong metric label; handed unchanged to the created {@link DebeziumSourceFunction}. */
+        public Builder<T> inlongMetric(String inlongMetric) {
+            this.inlongMetric = inlongMetric;
+            return this;
+        }
+
+        /**
+         * Assembles the Debezium configuration and creates the source function.
+         *
+         * @return a {@link DebeziumSourceFunction} configured for the PostgreSQL connector
+         * @throws NullPointerException if the hostname, database, username, password, decoding
+         *     plug-in name, slot name or deserializer has not been set
+         */
+        public DebeziumSourceFunction<T> build() {
+            // Validate everything up front so a missing option is reported by name instead of
+            // surfacing as an anonymous NullPointerException (or only later at job start).
+            checkNotNull(pluginName, "decodingPluginName must not be null");
+            checkNotNull(slotName, "slotName must not be null");
+            checkNotNull(hostname, "hostname must not be null");
+            checkNotNull(database, "database must not be null");
+            checkNotNull(username, "username must not be null");
+            checkNotNull(password, "password must not be null");
+            checkNotNull(deserializer, "deserializer must not be null");
+
+            Properties props = new Properties();
+            props.setProperty("connector.class", PostgresConnector.class.getCanonicalName());
+            props.setProperty("plugin.name", pluginName);
+            // hard code server name, because we don't need to distinguish it, docs:
+            // Logical name that identifies and provides a namespace for the particular PostgreSQL
+            // database server/cluster being monitored. The logical name should be unique across
+            // all other connectors, since it is used as a prefix for all Kafka topic names coming
+            // from this connector. Only alphanumeric characters and underscores should be used.
+            props.setProperty("database.server.name", "postgres_cdc_source");
+            props.setProperty("database.hostname", hostname);
+            props.setProperty("database.dbname", database);
+            props.setProperty("database.user", username);
+            props.setProperty("database.password", password);
+            props.setProperty("database.port", String.valueOf(port));
+            props.setProperty("slot.name", slotName);
+            // we have to enable heartbeat for PG to make sure DebeziumChangeConsumer#handleBatch
+            // is invoked after job restart
+            props.setProperty("heartbeat.interval.ms", String.valueOf(DEFAULT_HEARTBEAT_MS));
+
+            if (schemaList != null) {
+                props.setProperty("schema.whitelist", String.join(",", schemaList));
+            }
+            if (tableList != null) {
+                props.setProperty("table.whitelist", String.join(",", tableList));
+            }
+
+            // User-supplied Debezium properties go in last so they win over the defaults above.
+            if (dbzProperties != null) {
+                props.putAll(dbzProperties);
+            }
+            return new DebeziumSourceFunction<>(
+                    deserializer, props, null, Validator.getDefaultValidator(), inlongMetric);
+        }
+    }
+}
diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java
new file mode 100644
index 000000000..7826202ac
--- /dev/null
+++ b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.postgres.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static com.ververica.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX;
+import static com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
+
+/**
+ * Factory for creating configured instances of {@link PostgreSQLTableSource}, which additionally
+ * carries the {@code inlong.metric} option.
+ */
+public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
+
+    private static final String IDENTIFIER = "postgres-cdc-inlong";
+
+    public static final ConfigOption<String> INLONG_METRIC =
+            ConfigOptions.key("inlong.metric")
+                    .stringType()
+                    .defaultValue("")
+                    .withDescription("INLONG GROUP ID + '_' + STREAM ID + '_' + NODE ID");
+
+    private static final ConfigOption<String> HOSTNAME =
+            ConfigOptions.key("hostname")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("IP address or hostname of the PostgreSQL database server.");
+
+    private static final ConfigOption<Integer> PORT =
+            ConfigOptions.key("port")
+                    .intType()
+                    .defaultValue(5432)
+                    .withDescription("Integer port number of the PostgreSQL database server.");
+
+    private static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Name of the PostgreSQL database to use when connecting to the PostgreSQL database server"
+                                    + ".");
+
+    private static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Password to use when connecting to the PostgreSQL database server.");
+
+    private static final ConfigOption<String> DATABASE_NAME =
+            ConfigOptions.key("database-name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Database name of the PostgreSQL server to monitor.");
+
+    private static final ConfigOption<String> SCHEMA_NAME =
+            ConfigOptions.key("schema-name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Schema name of the PostgreSQL database to monitor.");
+
+    private static final ConfigOption<String> TABLE_NAME =
+            ConfigOptions.key("table-name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Table name of the PostgreSQL database to monitor.");
+
+    private static final ConfigOption<String> DECODING_PLUGIN_NAME =
+            ConfigOptions.key("decoding.plugin.name")
+                    .stringType()
+                    .defaultValue("decoderbufs")
+                    .withDescription(
+                            "The name of the Postgres logical decoding plug-in installed on the server.\n"
+                                    + "Supported values are decoderbufs, wal2json, wal2json_rds, wal2json_streaming,\n"
+                                    + "wal2json_rds_streaming and pgoutput.");
+
+    private static final ConfigOption<String> SLOT_NAME =
+            ConfigOptions.key("slot.name")
+                    .stringType()
+                    .defaultValue("flink")
+                    .withDescription(
+                            "The name of the PostgreSQL logical decoding slot that was created for streaming changes "
+                                    + "from a particular plug-in for a particular database/schema. The server uses "
+                                    + "this slot "
+                                    + "to stream events to the connector that you are configuring. Default is "
+                                    + "\"flink\".");
+
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        final FactoryUtil.TableFactoryHelper helper =
+                FactoryUtil.createTableFactoryHelper(this, context);
+        helper.validateExcept(DEBEZIUM_OPTIONS_PREFIX);
+
+        final ReadableConfig config = helper.getOptions();
+        String hostname = config.get(HOSTNAME);
+        String username = config.get(USERNAME);
+        String password = config.get(PASSWORD);
+        String databaseName = config.get(DATABASE_NAME);
+        String schemaName = config.get(SCHEMA_NAME);
+        String tableName = config.get(TABLE_NAME);
+        int port = config.get(PORT);
+        String pluginName = config.get(DECODING_PLUGIN_NAME);
+        String slotName = config.get(SLOT_NAME);
+        ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
+        String inlongMetric = config.get(INLONG_METRIC);
+
+        return new PostgreSQLTableSource(
+                physicalSchema,
+                port,
+                hostname,
+                databaseName,
+                schemaName,
+                tableName,
+                username,
+                password,
+                pluginName,
+                slotName,
+                getDebeziumProperties(context.getCatalogTable().getOptions()),
+                inlongMetric);
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(HOSTNAME);
+        options.add(USERNAME);
+        options.add(PASSWORD);
+        options.add(DATABASE_NAME);
+        options.add(SCHEMA_NAME);
+        options.add(TABLE_NAME);
+        return options;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(PORT);
+        options.add(DECODING_PLUGIN_NAME);
+        options.add(SLOT_NAME);
+        options.add(INLONG_METRIC);
+        return options;
+    }
+}
diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableSource.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableSource.java
new file mode 100644
index 000000000..89769a509
--- /dev/null
+++ b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableSource.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.postgres.table;
+
+import com.ververica.cdc.connectors.postgres.table.PostgreSQLReadableMetadata;
+import com.ververica.cdc.connectors.postgres.table.PostgresValueValidator;
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import com.ververica.cdc.debezium.table.MetadataConverter;
+import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.cdc.postgres.DebeziumSourceFunction;
+import org.apache.inlong.sort.cdc.postgres.PostgreSQLSource;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link DynamicTableSource} that describes how to create a PostgreSQL source from a logical
+ * description.
+ */
+public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMetadata {
+
+    private final ResolvedSchema physicalSchema;
+    private final int port;
+    private final String hostname;
+    private final String database;
+    private final String schemaName;
+    private final String tableName;
+    private final String username;
+    private final String password;
+    private final String pluginName;
+    private final String slotName;
+    private final Properties dbzProperties;
+    private final String inlongMetric;
+
+    // --------------------------------------------------------------------------------------------
+    // Mutable attributes
+    // --------------------------------------------------------------------------------------------
+
+    /** Data type that describes the final output of the source. */
+    protected DataType producedDataType;
+
+    /** Metadata that is appended at the end of a physical source row. */
+    protected List<String> metadataKeys;
+
+    /**
+     * Creates a table source with no metadata columns; the produced type is the physical row type
+     * of {@code physicalSchema} until {@link #applyReadableMetadata(List, DataType)} is called.
+     *
+     * @throws NullPointerException if hostname, database, schemaName, tableName, username,
+     *     password or pluginName is {@code null}
+     */
+    public PostgreSQLTableSource(
+            ResolvedSchema physicalSchema,
+            int port,
+            String hostname,
+            String database,
+            String schemaName,
+            String tableName,
+            String username,
+            String password,
+            String pluginName,
+            String slotName,
+            Properties dbzProperties,
+            String inlongMetric) {
+        this.physicalSchema = physicalSchema;
+        this.port = port;
+        // Name the offending argument so a misconfiguration is diagnosable from the message.
+        this.hostname = checkNotNull(hostname, "hostname must not be null");
+        this.database = checkNotNull(database, "database must not be null");
+        this.schemaName = checkNotNull(schemaName, "schemaName must not be null");
+        this.tableName = checkNotNull(tableName, "tableName must not be null");
+        this.username = checkNotNull(username, "username must not be null");
+        this.password = checkNotNull(password, "password must not be null");
+        this.pluginName = checkNotNull(pluginName, "pluginName must not be null");
+        this.slotName = slotName;
+        this.dbzProperties = dbzProperties;
+        this.producedDataType = physicalSchema.toPhysicalRowDataType();
+        this.metadataKeys = Collections.emptyList();
+        this.inlongMetric = inlongMetric;
+    }
+
+    /** Declares that the source emits INSERT, UPDATE_BEFORE, UPDATE_AFTER and DELETE rows. */
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return ChangelogMode.newBuilder()
+                .addContainedKind(RowKind.INSERT)
+                .addContainedKind(RowKind.UPDATE_BEFORE)
+                .addContainedKind(RowKind.UPDATE_AFTER)
+                .addContainedKind(RowKind.DELETE)
+                .build();
+    }
+
+    /**
+     * Builds the row deserializer and the {@link DebeziumSourceFunction} for the configured
+     * schema/table and wraps it in a {@link SourceFunctionProvider}.
+     */
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+        RowType physicalDataType =
+                (RowType) physicalSchema.toPhysicalRowDataType().getLogicalType();
+        MetadataConverter[] metadataConverters = getMetadataConverters();
+        TypeInformation<RowData> typeInfo = scanContext.createTypeInformation(producedDataType);
+
+        DebeziumDeserializationSchema<RowData> deserializer =
+                RowDataDebeziumDeserializeSchema.newBuilder()
+                        .setPhysicalRowType(physicalDataType)
+                        .setMetadataConverters(metadataConverters)
+                        .setResultTypeInfo(typeInfo)
+                        .setValueValidator(new PostgresValueValidator(schemaName, tableName))
+                        .build();
+        DebeziumSourceFunction<RowData> sourceFunction =
+                PostgreSQLSource.<RowData>builder()
+                        .hostname(hostname)
+                        .port(port)
+                        .database(database)
+                        .schemaList(schemaName)
+                        // Debezium expects fully-qualified identifiers: schemaName.tableName
+                        .tableList(schemaName + "." + tableName)
+                        .username(username)
+                        .password(password)
+                        .decodingPluginName(pluginName)
+                        .slotName(slotName)
+                        .debeziumProperties(dbzProperties)
+                        .deserializer(deserializer)
+                        .inlongMetric(inlongMetric)
+                        .build();
+        // second argument is the "bounded" flag: a CDC stream never ends
+        return SourceFunctionProvider.of(sourceFunction, false);
+    }
+
+    /**
+     * Resolves each requested metadata key to its converter, in the order of {@code metadataKeys}.
+     *
+     * @throws IllegalStateException if a key matches no {@link PostgreSQLReadableMetadata} entry
+     */
+    private MetadataConverter[] getMetadataConverters() {
+        if (metadataKeys.isEmpty()) {
+            return new MetadataConverter[0];
+        }
+
+        return metadataKeys.stream()
+                .map(
+                        key ->
+                                Stream.of(PostgreSQLReadableMetadata.values())
+                                        .filter(m -> m.getKey().equals(key))
+                                        .findFirst()
+                                        .orElseThrow(
+                                                () ->
+                                                        new IllegalStateException(
+                                                                "Unsupported metadata key: " + key)))
+                .map(PostgreSQLReadableMetadata::getConverter)
+                .toArray(MetadataConverter[]::new);
+    }
+
+    /** Creates a copy that carries over the applied metadata keys and produced data type. */
+    @Override
+    public DynamicTableSource copy() {
+        PostgreSQLTableSource source =
+                new PostgreSQLTableSource(
+                        physicalSchema,
+                        port,
+                        hostname,
+                        database,
+                        schemaName,
+                        tableName,
+                        username,
+                        password,
+                        pluginName,
+                        slotName,
+                        dbzProperties,
+                        inlongMetric);
+        source.metadataKeys = metadataKeys;
+        source.producedDataType = producedDataType;
+        return source;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        PostgreSQLTableSource that = (PostgreSQLTableSource) o;
+        return port == that.port
+                && Objects.equals(physicalSchema, that.physicalSchema)
+                && Objects.equals(hostname, that.hostname)
+                && Objects.equals(database, that.database)
+                && Objects.equals(schemaName, that.schemaName)
+                && Objects.equals(tableName, that.tableName)
+                && Objects.equals(username, that.username)
+                && Objects.equals(password, that.password)
+                && Objects.equals(pluginName, that.pluginName)
+                && Objects.equals(slotName, that.slotName)
+                && Objects.equals(dbzProperties, that.dbzProperties)
+                && Objects.equals(producedDataType, that.producedDataType)
+                && Objects.equals(metadataKeys, that.metadataKeys)
+                && Objects.equals(inlongMetric, that.inlongMetric);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                physicalSchema,
+                port,
+                hostname,
+                database,
+                schemaName,
+                tableName,
+                username,
+                password,
+                pluginName,
+                slotName,
+                dbzProperties,
+                producedDataType,
+                metadataKeys,
+                inlongMetric);
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "PostgreSQL-CDC";
+    }
+
+    /** Lists every {@link PostgreSQLReadableMetadata} key together with its data type. */
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        return Stream.of(PostgreSQLReadableMetadata.values())
+                .collect(
+                        Collectors.toMap(
+                                PostgreSQLReadableMetadata::getKey,
+                                PostgreSQLReadableMetadata::getDataType));
+    }
+
+    /** Records the metadata keys selected by the planner and the resulting produced data type. */
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+        this.metadataKeys = metadataKeys;
+        this.producedDataType = producedDataType;
+    }
+}
diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-connectors/postgres-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 000000000..a82c5ed53
--- /dev/null
+++ b/inlong-sort/sort-connectors/postgres-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.cdc.postgres.table.PostgreSQLTableFactory
\ No newline at end of file
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index b926a91f8..1a7b4a44a 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -432,6 +432,10 @@
       inlong-sort/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/internal/DebeziumChangeConsumer.java
       inlong-sort/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/internal/DebeziumOffsetSerializer.java
       inlong-sort/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/history/FlinkJsonTableChangeSerializer.java
+      inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/PostgreSQLSource.java
+      inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java
+      inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableSource.java
+      inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
  Source  : flink-cdc-connectors 2.2.1 (Please note that the software have been modified.)
  License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
  
@@ -476,6 +480,12 @@
   source  : flink-connector-elasticsearch 1.13.2 (Please note that the software have been modified.)
   License : https://github.com/apache/flink/blob/release-1.13.2-rc2/LICENSE
 
+ 1.3.5 inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java
+       inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
+       inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseDynamicTableSink.java
+  Source  : flink-connector-hbase-2.2 1.13.5 (Please note that the software have been modified.)
+  License : https://github.com/apache/flink/blob/master/LICENSE
+
 =======================================================================
 Apache InLong Subcomponents: