Posted to commits@seatunnel.apache.org by wa...@apache.org on 2024/02/03 08:31:04 UTC

(seatunnel) branch dev updated: [Feature][Connector]update pgsql-cdc publication for add table (#6309)

This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 2ad7d65236 [Feature][Connector]update pgsql-cdc publication for add table (#6309)
2ad7d65236 is described below

commit 2ad7d6523667fbe0e8054adfdd198a7535fe619e
Author: 老王 <58...@users.noreply.github.com>
AuthorDate: Sat Feb 3 16:30:59 2024 +0800

    [Feature][Connector]update pgsql-cdc publication for add table (#6309)
---
 .licenserc.yaml                                    |   1 +
 LICENSE                                            |   1 +
 .../postgresql/PostgresConnectorConfig.java        |   2 +-
 .../connection/PostgresReplicationConnection.java  | 921 +++++++++++++++++++++
 .../seatunnel/cdc/postgres/PostgresCDCIT.java      |   2 +-
 5 files changed, 925 insertions(+), 2 deletions(-)
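
Before this change the connector always created the publication with FOR ALL TABLES; it now
defaults to a filtered publication and, when the publication already exists, appends any newly
configured tables to it. The statements built in initPublication() take the following forms
(the publication and table names below are placeholders):

    CREATE PUBLICATION seatunnel_pub FOR ALL TABLES;           -- autocreate mode ALL_TABLES
    CREATE PUBLICATION seatunnel_pub FOR TABLE "public"."t1";  -- mode FILTERED, publication missing
    ALTER PUBLICATION seatunnel_pub ADD TABLE "public"."t2";   -- mode FILTERED, publication exists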

diff --git a/.licenserc.yaml b/.licenserc.yaml
index ea6178c366..a851d42f38 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -49,5 +49,6 @@ header:
     - 'seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java'
     - 'seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/AbstractSeaTunnelMessageTask.java'
     - 'seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/PassiveCompletableFuture.java'
+    - 'seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java'
 
   comment: on-failure
diff --git a/LICENSE b/LICENSE
index adabba50de..f3c1d4c3c3 100644
--- a/LICENSE
+++ b/LICENSE
@@ -220,6 +220,7 @@ seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apac
 seatunnel-connectors-v2/connector-cdc/connector-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium from https://github.com/ververica/flink-cdc-connectors
 seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java   from https://github.com/debezium/debezium
 seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb from https://github.com/ververica/flink-cdc-connectors
+seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java     from https://github.com/debezium/debezium
 generate_client_protocol.sh                                                                                                                 from https://github.com/hazelcast/hazelcast
 seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java                          from https://github.com/hazelcast/hazelcast
 seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/AbstractSeaTunnelMessageTask.java   from https://github.com/hazelcast/hazelcast
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java
index 92462d0ba2..57de61b472 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java
@@ -738,7 +738,7 @@ public class PostgresConnectorConfig extends RelationalDatabaseConnectorConfig {
     public static final Field PUBLICATION_AUTOCREATE_MODE =
             Field.create("publication.autocreate.mode")
                     .withDisplayName("Publication Auto Create Mode")
-                    .withEnum(AutoCreateMode.class, AutoCreateMode.ALL_TABLES)
+                    .withEnum(AutoCreateMode.class, AutoCreateMode.FILTERED)
                     .withWidth(Width.MEDIUM)
                     .withImportance(Importance.MEDIUM)
                     .withDescription(
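
The default autocreate mode thus moves from ALL_TABLES to FILTERED, so a publication created by
the connector covers only the configured tables rather than every table in the database. A
hypothetical Debezium-style configuration sketch (the option names follow the fields in this
diff; the table names are placeholders):

    publication.autocreate.mode = filtered
    table.include.list = public.orders,public.customers

With FILTERED, tables newly added to table.include.list are appended to an existing publication
at startup, which is what the new PostgresReplicationConnection below implements.
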
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java
new file mode 100644
index 0000000000..46538c86e7
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java
@@ -0,0 +1,921 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+
+package io.debezium.connector.postgresql.connection;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.kafka.connect.errors.ConnectException;
+
+import org.postgresql.core.BaseConnection;
+import org.postgresql.core.ServerVersion;
+import org.postgresql.replication.PGReplicationStream;
+import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
+import org.postgresql.util.PSQLException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.debezium.DebeziumException;
+import io.debezium.config.Configuration;
+import io.debezium.connector.postgresql.PostgresConnectorConfig;
+import io.debezium.connector.postgresql.PostgresSchema;
+import io.debezium.connector.postgresql.TypeRegistry;
+import io.debezium.connector.postgresql.spi.SlotCreationResult;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.jdbc.JdbcConnectionException;
+import io.debezium.relational.RelationalTableFilters;
+import io.debezium.relational.TableId;
+import io.debezium.util.Clock;
+import io.debezium.util.Metronome;
+
+import java.nio.ByteBuffer;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static java.lang.Math.toIntExact;
+
+/**
+ * Implementation of a {@link ReplicationConnection} for PostgreSQL. Note that replication
+ * connections in PostgreSQL cannot execute regular statements, only a limited set of
+ * replication-related commands.
+ *
+ * @author Horia Chiorean (hchiorea@redhat.com)
+ */
+public class PostgresReplicationConnection extends JdbcConnection implements ReplicationConnection {
+
+    private static final String TABLE_INCLUDE_LIST = "table.include.list";
+    private static final Logger LOGGER = LoggerFactory.getLogger(PostgresReplicationConnection.class);
+
+    private final String slotName;
+    private final String publicationName;
+    private final RelationalTableFilters tableFilter;
+    private final PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode;
+    private final PostgresConnectorConfig.LogicalDecoder plugin;
+    private final boolean dropSlotOnClose;
+    private final PostgresConnectorConfig originalConfig;
+    private final Duration statusUpdateInterval;
+    private final MessageDecoder messageDecoder;
+    private final TypeRegistry typeRegistry;
+    private final Properties streamParams;
+
+    private Lsn defaultStartingPos;
+    private SlotCreationResult slotCreationInfo;
+    private boolean hasInitedSlot;
+
+    /**
+     * Creates a new replication connection with the given params.
+     *
+     * @param config the JDBC configuration for the connection; may not be null
+     * @param slotName the name of the DB slot for logical replication; may not be null
+     * @param publicationName the name of the DB publication for logical replication; may not be
+     *     null
+     * @param tableFilter the filter selecting the tables of the DB publication to capture for
+     *     logical replication; may not be null
+     * @param publicationAutocreateMode the mode for publication autocreation; may not be null
+     * @param plugin decoder matching the server side plug-in used for streaming changes; may not be
+     *     null
+     * @param dropSlotOnClose whether the replication slot should be dropped once the connection is
+     *     closed
+     * @param doSnapshot whether the connector is performing a snapshot
+     * @param statusUpdateInterval the interval at which the replication connection should
+     *     periodically send status updates to the server
+     * @param typeRegistry registry with PostgreSQL types
+     * @param streamParams additional parameters to pass to the replication stream
+     * @param schema the schema; must not be null
+     */
+    private PostgresReplicationConnection(
+            PostgresConnectorConfig config,
+            String slotName,
+            String publicationName,
+            RelationalTableFilters tableFilter,
+            PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode,
+            PostgresConnectorConfig.LogicalDecoder plugin,
+            boolean dropSlotOnClose,
+            boolean doSnapshot,
+            Duration statusUpdateInterval,
+            TypeRegistry typeRegistry,
+            Properties streamParams,
+            PostgresSchema schema) {
+        super(
+                config.getJdbcConfig(),
+                PostgresConnection.FACTORY,
+                null,
+                PostgresReplicationConnection::defaultSettings);
+
+        this.originalConfig = config;
+        this.slotName = slotName;
+        this.publicationName = publicationName;
+        this.tableFilter = tableFilter;
+        this.publicationAutocreateMode = publicationAutocreateMode;
+        this.plugin = plugin;
+        this.dropSlotOnClose = dropSlotOnClose;
+        this.statusUpdateInterval = statusUpdateInterval;
+        this.messageDecoder = plugin.messageDecoder(new MessageDecoderContext(config, schema));
+        this.typeRegistry = typeRegistry;
+        this.streamParams = streamParams;
+        this.slotCreationInfo = null;
+        this.hasInitedSlot = false;
+    }
+
+    private ServerInfo.ReplicationSlot getSlotInfo() throws SQLException, InterruptedException {
+        try (PostgresConnection connection =
+                new PostgresConnection(originalConfig.getJdbcConfig())) {
+            return connection.readReplicationSlotInfo(slotName, plugin.getPostgresPluginName());
+        }
+    }
+
+    protected void initPublication() {
+        String createPublicationStmt;
+        String tableFilterString = null;
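+        // Determine which configured tables are not yet part of the publication; these are
+        // the candidates for the CREATE/ALTER PUBLICATION statements below.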
+        String tableListStr = this.originalConfig.getConfig().asMap().get(TABLE_INCLUDE_LIST);
+        final List<String> configTableList =
+                tableListStr == null
+                        ? new ArrayList<>()
+                        : new ArrayList<>(Arrays.asList(tableListStr.split(",")));
+        configTableList.removeAll(this.getPublicationExistedTableList());
+        if (PostgresConnectorConfig.LogicalDecoder.PGOUTPUT.equals(plugin)) {
+            LOGGER.info("Initializing PgOutput logical decoder publication");
+            try {
+                String selectPublication =
+                        String.format(
+                                "SELECT COUNT(1) FROM pg_publication WHERE pubname = '%s'",
+                                publicationName);
+                try (Statement stmt = pgConnection().createStatement();
+                        ResultSet rs = stmt.executeQuery(selectPublication)) {
+                    if (rs.next()) {
+                        long count = rs.getLong(1);
+                        if (count == 0L) {
+                            LOGGER.info(
+                                    "Creating new publication '{}' for plugin '{}'",
+                                    publicationName,
+                                    plugin);
+                            switch (publicationAutocreateMode) {
+                                case DISABLED:
+                                    throw new ConnectException(
+                                            "Publication autocreation is disabled, please create one and restart the connector.");
+                                case ALL_TABLES:
+                                    createPublicationStmt =
+                                            String.format(
+                                                    "CREATE PUBLICATION %s FOR ALL TABLES;",
+                                                    publicationName);
+                                    LOGGER.info(
+                                            "Creating Publication with statement '{}'",
+                                            createPublicationStmt);
+                                    // Publication doesn't exist, create it.
+                                    stmt.execute(createPublicationStmt);
+                                    break;
+                                case FILTERED:
+                                    try {
+                                        Set<TableId> tablesToCapture =
+                                                determineCapturedTables(configTableList);
+                                        tableFilterString =
+                                                tablesToCapture.stream()
+                                                        .map(TableId::toDoubleQuotedString)
+                                                        .collect(Collectors.joining(", "));
+                                        if (tableFilterString.isEmpty()) {
+                                            throw new DebeziumException(
+                                                    String.format(
+                                                            "No table filters found for filtered publication %s",
+                                                            publicationName));
+                                        }
+                                        createPublicationStmt =
+                                                String.format(
+                                                        "CREATE PUBLICATION %s FOR TABLE %s;",
+                                                        publicationName, tableFilterString);
+                                        LOGGER.info(
+                                                "Creating Publication with statement '{}'",
+                                                createPublicationStmt);
+                                        // Publication doesn't exist, create it but restrict to the
+                                        // tableFilter.
+                                        stmt.execute(createPublicationStmt);
+                                    } catch (Exception e) {
+                                        throw new ConnectException(
+                                                String.format(
+                                                        "Unable to create filtered publication %s for %s",
+                                                        publicationName, tableFilterString),
+                                                e);
+                                    }
+                                    break;
+                            }
+                        } else {
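+                            // The publication already exists; if the include list still
+                            // contains tables the publication does not cover, extend it
+                            // in place instead of recreating it.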
+                            if (CollectionUtils.isNotEmpty(configTableList)) {
+                                switch (publicationAutocreateMode) {
+                                    case DISABLED:
+                                        throw new ConnectException(
+                                                "Publication autocreation is disabled, please create one and restart the connector.");
+                                    case ALL_TABLES:
+                                        break;
+                                    case FILTERED:
+                                        try {
+                                            Set<TableId> tablesToCapture =
+                                                    determineCapturedTables(configTableList);
+                                            tableFilterString =
+                                                    tablesToCapture.stream()
+                                                            .map(TableId::toDoubleQuotedString)
+                                                            .collect(Collectors.joining(", "));
+                                            if (tableFilterString.isEmpty()) {
+                                                throw new DebeziumException(
+                                                        String.format(
+                                                                "No table filters found for filtered publication %s",
+                                                                publicationName));
+                                            }
+                                            createPublicationStmt =
+                                                    String.format(
+                                                            "ALTER PUBLICATION %s ADD TABLE %s;",
+                                                            publicationName, tableFilterString);
+                                            LOGGER.info(
+                                                    "Updating publication with statement '{}'",
+                                                    createPublicationStmt);
+                                            // Publication already exists; add the newly
+                                            // configured tables to it.
+                                            stmt.execute(createPublicationStmt);
+                                        } catch (Exception e) {
+                                            throw new ConnectException(
+                                                    String.format(
+                                                            "Unable to create filtered publication %s for %s",
+                                                            publicationName, tableFilterString),
+                                                    e);
+                                        }
+                                        break;
+                                }
+                            }
+                        }
+                    }
+                }
+            } catch (SQLException e) {
+                throw new JdbcConnectionException(e);
+            }
+
+            // This is what ties the publication definition to the replication stream
+            streamParams.put("proto_version", 1);
+            streamParams.put("publication_names", publicationName);
+        }
+    }
+
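+    /**
+     * Returns the schema-qualified names ("schema.table") of the tables already covered by the
+     * publication, as reported by pg_publication_tables.
+     */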
+    private List<String> getPublicationExistedTableList() {
+        List<String> tableList = new ArrayList<>();
+        try {
+            String selectPublication =
+                    String.format(
+                            "SELECT schemaname, tablename FROM pg_publication_tables WHERE pubname = '%s'",
+                            publicationName);
+            try (Statement stmt = pgConnection().createStatement();
+                    ResultSet rs = stmt.executeQuery(selectPublication)) {
+                while (rs.next()) {
+                    String schemaName = rs.getString("schemaname");
+                    String tableName = rs.getString("tablename");
+                    tableList.add(schemaName + "." + tableName);
+                }
+            }
+        } catch (SQLException e) {
+            throw new JdbcConnectionException(e);
+        }
+        return tableList;
+    }
+
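+    /** Converts "schema.table" entries from the include list into {@link TableId}s. */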
+    private Set<TableId> determineCapturedTables(List<String> tableList) {
+        Set<TableId> tableIdSet = new HashSet<>();
+        if (CollectionUtils.isEmpty(tableList)) {
+            return tableIdSet;
+        }
+        tableList.forEach(
+                tableNameStr -> {
+                    String[] parts = tableNameStr.split("\\.");
+                    tableIdSet.add(new TableId("", parts[0], parts[1]));
+                });
+        return tableIdSet;
+    }
+
+    protected void initReplicationSlot() throws SQLException, InterruptedException {
+        ServerInfo.ReplicationSlot slotInfo = getSlotInfo();
+
+        boolean shouldCreateSlot = ServerInfo.ReplicationSlot.INVALID == slotInfo;
+        try {
+            // there's no info for this plugin and slot so create a new slot
+            if (shouldCreateSlot) {
+                this.createReplicationSlot();
+            }
+
+            // A replication connection does not support parsing of SQL statements, so we need
+            // to create the connection without executing on-connect statements - see the JDBC
+            // option preferQueryMode=simple.
+            pgConnection();
+            final String identifySystemStatement = "IDENTIFY_SYSTEM";
+            LOGGER.debug(
+                    "running '{}' to validate replication connection", identifySystemStatement);
+            final Lsn xlogStart =
+                    queryAndMap(
+                            identifySystemStatement,
+                            rs -> {
+                                if (!rs.next()) {
+                                    throw new IllegalStateException(
+                                            "The DB connection is not a valid replication connection");
+                                }
+                                String xlogpos = rs.getString("xlogpos");
+                                LOGGER.debug("received latest xlogpos '{}'", xlogpos);
+                                return Lsn.valueOf(xlogpos);
+                            });
+
+            if (slotCreationInfo != null) {
+                this.defaultStartingPos = slotCreationInfo.startLsn();
+            } else if (shouldCreateSlot || !slotInfo.hasValidFlushedLsn()) {
+                // this is a new slot or we weren't able to read a valid flush LSN pos, so we always
+                // start from the xlog pos that was reported
+                this.defaultStartingPos = xlogStart;
+            } else {
+                Lsn latestFlushedLsn = slotInfo.latestFlushedLsn();
+                this.defaultStartingPos =
+                        latestFlushedLsn.compareTo(xlogStart) < 0 ? latestFlushedLsn : xlogStart;
+                if (LOGGER.isDebugEnabled()) {
+                    LOGGER.debug("found previous flushed LSN '{}'", latestFlushedLsn);
+                }
+            }
+            hasInitedSlot = true;
+        } catch (SQLException e) {
+            throw new JdbcConnectionException(e);
+        }
+    }
+
+    // Temporary replication slots are a feature introduced in PostgreSQL 10
+    private boolean useTemporarySlot() throws SQLException {
+        // Temporary replication slots cannot be used here, because the connection may be
+        // restarted while locating the WAL position, which would drop the slot.
+        // return dropSlotOnClose && pgConnection().haveMinimumServerVersion(ServerVersion.v10);
+        return false;
+    }
+
+    /**
+     * Creating a replication connection and starting to stream involves a few steps:
+     *
+     * <ol>
+     *   <li>create the connection and ensure that the slot exists and isn't currently being used
+     *   <li>query for our potential start position in the slot (LSN)
+     *   <li>try to start streaming; depending on our options (such as in wal2json) this may fail,
+     *       which can result in the connection being killed, so the process must be started over
+     *       if we are using a temporary slot
+     *   <li>actually start the streamer
+     * </ol>
+     *
+     * <p>This method takes care of all of these steps and queries for a default starting position.
+     * If you know where you are starting from, call {@link #startStreaming(Lsn,
+     * WalPositionLocator)} directly; this method delegates to it.
+     *
+     * @return the newly created replication stream
+     * @throws SQLException if anything fails on the database side
+     * @throws InterruptedException if the thread is interrupted while initializing the slot
+     */
+    @Override
+    public ReplicationStream startStreaming(WalPositionLocator walPosition)
+            throws SQLException, InterruptedException {
+        return startStreaming(null, walPosition);
+    }
+
+    @Override
+    public ReplicationStream startStreaming(Lsn offset, WalPositionLocator walPosition)
+            throws SQLException, InterruptedException {
+        initConnection();
+
+        connect();
+        if (offset == null || !offset.isValid()) {
+            offset = defaultStartingPos;
+        }
+        Lsn lsn = offset;
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("starting streaming from LSN '{}'", lsn);
+        }
+
+        int maxRetries = config().getInteger(PostgresConnectorConfig.MAX_RETRIES);
+        int delay = config().getInteger(PostgresConnectorConfig.RETRY_DELAY_MS);
+        int tryCount = 0;
+        while (true) {
+            try {
+                return createReplicationStream(lsn, walPosition);
+            } catch (Exception e) {
+                String message = "Failed to start replication stream at " + lsn;
+                if (++tryCount > maxRetries) {
+                    if (e.getMessage().matches(".*replication slot .* is active.*")) {
+                        message +=
+                                "; when setting up multiple connectors for the same database host, please make sure to use a distinct replication slot name for each.";
+                    }
+                    throw new DebeziumException(message, e);
+                } else {
+                    LOGGER.warn(
+                            message + ", waiting for {} ms and retrying, attempt number {} over {}",
+                            delay,
+                            tryCount,
+                            maxRetries);
+                    final Metronome metronome =
+                            Metronome.sleeper(Duration.ofMillis(delay), Clock.SYSTEM);
+                    metronome.pause();
+                }
+            }
+        }
+    }
+
+    @Override
+    public void initConnection() throws SQLException, InterruptedException {
+        // See https://www.postgresql.org/docs/current/logical-replication-quick-setup.html
+        // For pgoutput specifically, the publication must be created before the slot.
+        initPublication();
+        if (!hasInitedSlot) {
+            initReplicationSlot();
+        }
+    }
+
+    @Override
+    public Optional<SlotCreationResult> createReplicationSlot() throws SQLException {
+        // Note that some of these options are only supported in Postgres 9.4+. Additionally,
+        // the options are not yet exposed by the JDBC API wrapper, so we issue the command
+        // ourselves; eventually this should be moved back to the JDBC API.
+        // see https://github.com/pgjdbc/pgjdbc/issues/1305
+
+        LOGGER.debug("Creating new replication slot '{}' for plugin '{}'", slotName, plugin);
+        String tempPart = "";
+        // Exported snapshots are supported in Postgres 9.4+
+        boolean canExportSnapshot = pgConnection().haveMinimumServerVersion(ServerVersion.v9_4);
+        if (dropSlotOnClose && !canExportSnapshot) {
+            LOGGER.warn(
+                    "A slot marked as temporary or with an exported snapshot was requested, "
+                            + "but this is not supported by this version of Postgres, ignoring!");
+        }
+        if (useTemporarySlot()) {
+            tempPart = "TEMPORARY";
+        }
+
+        // See https://www.postgresql.org/docs/current/logical-replication-quick-setup.html
+        // For pgoutput specifically, the publication must be created prior to the slot.
+        initPublication();
+
+        try (Statement stmt = pgConnection().createStatement()) {
+            String createCommand =
+                    String.format(
+                            "CREATE_REPLICATION_SLOT %s %s LOGICAL %s",
+                            slotName, tempPart, plugin.getPostgresPluginName());
+            LOGGER.info("Creating replication slot with command {}", createCommand);
+            stmt.execute(createCommand);
+            // when we are in Postgres 9.4+, we can parse the slot creation info,
+            // otherwise, it returns nothing
+            if (canExportSnapshot) {
+                this.slotCreationInfo = parseSlotCreation(stmt.getResultSet());
+            }
+
+            return Optional.ofNullable(slotCreationInfo);
+        }
+    }
+
+    protected BaseConnection pgConnection() throws SQLException {
+        return (BaseConnection) connection(false);
+    }
+
+    private SlotCreationResult parseSlotCreation(ResultSet rs) {
+        try {
+            if (rs.next()) {
+                String slotName = rs.getString("slot_name");
+                String startPoint = rs.getString("consistent_point");
+                String snapName = rs.getString("snapshot_name");
+                String pluginName = rs.getString("output_plugin");
+
+                return new SlotCreationResult(slotName, startPoint, snapName, pluginName);
+            } else {
+                throw new ConnectException("No replication slot found");
+            }
+        } catch (SQLException ex) {
+            throw new ConnectException("Unable to parse create_replication_slot response", ex);
+        }
+    }
+
+    private ReplicationStream createReplicationStream(
+            final Lsn startLsn, WalPositionLocator walPosition)
+            throws SQLException, InterruptedException {
+        PGReplicationStream s;
+
+        try {
+            try {
+                s =
+                        startPgReplicationStream(
+                                startLsn,
+                                plugin.forceRds()
+                                        ? messageDecoder::optionsWithoutMetadata
+                                        : messageDecoder::optionsWithMetadata);
+                messageDecoder.setContainsMetadata(!plugin.forceRds());
+            } catch (PSQLException e) {
+                LOGGER.debug(
+                        "Could not register for streaming, retrying without optional options", e);
+
+                // re-init the slot after a failed start of slot, as this
+                // may have closed the slot
+                if (useTemporarySlot()) {
+                    initReplicationSlot();
+                }
+
+                s =
+                        startPgReplicationStream(
+                                startLsn,
+                                plugin.forceRds()
+                                        ? messageDecoder::optionsWithoutMetadata
+                                        : messageDecoder::optionsWithMetadata);
+                messageDecoder.setContainsMetadata(!plugin.forceRds());
+            }
+        } catch (PSQLException e) {
+            if (e.getMessage().matches("(?s)ERROR: option .* is unknown.*")) {
+                // It is possible we are connecting to an old wal2json plug-in
+                LOGGER.warn(
+                        "Could not register for streaming with metadata in messages, falling back to messages without metadata");
+
+                // re-init the slot after a failed start of slot, as this
+                // may have closed the slot
+                if (useTemporarySlot()) {
+                    initReplicationSlot();
+                }
+
+                s = startPgReplicationStream(startLsn, messageDecoder::optionsWithoutMetadata);
+                messageDecoder.setContainsMetadata(false);
+            } else if (e.getMessage()
+                    .matches("(?s)ERROR: requested WAL segment .* has already been removed.*")) {
+                LOGGER.error("Cannot rewind to last processed WAL position", e);
+                throw new ConnectException(
+                        "The offset to start reading from has been removed from the database write-ahead log. Create a new snapshot and consider setting the PostgreSQL parameter wal_keep_segments = 0.");
+            } else {
+                throw e;
+            }
+        }
+
+        final PGReplicationStream stream = s;
+
+        return new ReplicationStream() {
+
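+            // Warning processing is amortized: connection warnings are polled only once every
+            // CHECK_WARNINGS_AFTER_COUNT reads, or when forced on close.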
+            private static final int CHECK_WARNINGS_AFTER_COUNT = 100;
+            private int warningCheckCounter = CHECK_WARNINGS_AFTER_COUNT;
+            private ExecutorService keepAliveExecutor = null;
+            private AtomicBoolean keepAliveRunning;
+            private final Metronome metronome =
+                    Metronome.sleeper(statusUpdateInterval, Clock.SYSTEM);
+
+            // make sure this is volatile since multiple threads may be interested in this value
+            private volatile Lsn lastReceivedLsn;
+
+            @Override
+            public void read(ReplicationMessageProcessor processor)
+                    throws SQLException, InterruptedException {
+                processWarnings(false);
+                ByteBuffer read = stream.read();
+                final Lsn lastReceiveLsn = Lsn.valueOf(stream.getLastReceiveLSN());
+                LOGGER.trace(
+                        "Streaming requested from LSN {}, received LSN {}",
+                        startLsn,
+                        lastReceiveLsn);
+                if (messageDecoder.shouldMessageBeSkipped(
+                        read, lastReceiveLsn, startLsn, walPosition)) {
+                    return;
+                }
+                deserializeMessages(read, processor);
+            }
+
+            @Override
+            public boolean readPending(ReplicationMessageProcessor processor)
+                    throws SQLException, InterruptedException {
+                processWarnings(false);
+                ByteBuffer read = stream.readPending();
+                final Lsn lastReceiveLsn = Lsn.valueOf(stream.getLastReceiveLSN());
+                LOGGER.trace(
+                        "Streaming requested from LSN {}, received LSN {}",
+                        startLsn,
+                        lastReceiveLsn);
+
+                if (read == null) {
+                    return false;
+                }
+
+                if (messageDecoder.shouldMessageBeSkipped(
+                        read, lastReceiveLsn, startLsn, walPosition)) {
+                    return true;
+                }
+
+                deserializeMessages(read, processor);
+
+                return true;
+            }
+
+            private void deserializeMessages(
+                    ByteBuffer buffer, ReplicationMessageProcessor processor)
+                    throws SQLException, InterruptedException {
+                lastReceivedLsn = Lsn.valueOf(stream.getLastReceiveLSN());
+                LOGGER.trace("Received message at LSN {}", lastReceivedLsn);
+                messageDecoder.processMessage(buffer, processor, typeRegistry);
+            }
+
+            @Override
+            public void close() throws SQLException {
+                processWarnings(true);
+                stream.close();
+            }
+
+            @Override
+            public void flushLsn(Lsn lsn) throws SQLException {
+                doFlushLsn(lsn);
+            }
+
+            private void doFlushLsn(Lsn lsn) throws SQLException {
+                stream.setFlushedLSN(lsn.asLogSequenceNumber());
+                stream.setAppliedLSN(lsn.asLogSequenceNumber());
+
+                stream.forceUpdateStatus();
+            }
+
+            @Override
+            public Lsn lastReceivedLsn() {
+                return lastReceivedLsn;
+            }
+
+            @Override
+            public void startKeepAlive(ExecutorService service) {
+                if (keepAliveExecutor == null) {
+                    keepAliveExecutor = service;
+                    keepAliveRunning = new AtomicBoolean(true);
+                    keepAliveExecutor.submit(
+                            () -> {
+                                while (keepAliveRunning.get()) {
+                                    try {
+                                        LOGGER.trace(
+                                                "Forcing status update with replication stream");
+                                        stream.forceUpdateStatus();
+                                        metronome.pause();
+                                    } catch (Exception exp) {
+                                        throw new RuntimeException(
+                                                "received unexpected exception will perform keep alive",
+                                                exp);
+                                    }
+                                }
+                            });
+                }
+            }
+
+            @Override
+            public void stopKeepAlive() {
+                if (keepAliveExecutor != null) {
+                    keepAliveRunning.set(false);
+                    keepAliveExecutor.shutdownNow();
+                    keepAliveExecutor = null;
+                }
+            }
+
+            private void processWarnings(final boolean forced) throws SQLException {
+                if (--warningCheckCounter == 0 || forced) {
+                    warningCheckCounter = CHECK_WARNINGS_AFTER_COUNT;
+                    for (SQLWarning w = connection().getWarnings();
+                            w != null;
+                            w = w.getNextWarning()) {
+                        LOGGER.debug(
+                                "Server-side message: '{}', state = {}, code = {}",
+                                w.getMessage(),
+                                w.getSQLState(),
+                                w.getErrorCode());
+                    }
+                    connection().clearWarnings();
+                }
+            }
+
+            @Override
+            public Lsn startLsn() {
+                return startLsn;
+            }
+        };
+    }
+
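+    /**
+     * Opens a logical replication stream on the slot, starting at the given LSN and applying the
+     * plugin-specific stream options via the supplied configurator.
+     */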
+    private PGReplicationStream startPgReplicationStream(
+            final Lsn lsn,
+            Function<ChainedLogicalStreamBuilder, ChainedLogicalStreamBuilder> configurator)
+            throws SQLException {
+        assert lsn != null;
+        ChainedLogicalStreamBuilder streamBuilder =
+                pgConnection()
+                        .getReplicationAPI()
+                        .replicationStream()
+                        .logical()
+                        .withSlotName(slotName)
+                        .withStartPosition(lsn.asLogSequenceNumber())
+                        .withSlotOptions(streamParams);
+        streamBuilder = configurator.apply(streamBuilder);
+
+        if (statusUpdateInterval != null && statusUpdateInterval.toMillis() > 0) {
+            streamBuilder.withStatusInterval(
+                    toIntExact(statusUpdateInterval.toMillis()), TimeUnit.MILLISECONDS);
+        }
+
+        PGReplicationStream stream = streamBuilder.start();
+
+        // TODO DBZ-508 get rid of this
+        // Needed by tests when connections are opened and closed in a fast sequence
+        try {
+            Thread.sleep(10);
+        } catch (Exception e) {
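+            // best-effort pause; an interruption here can safely be ignored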
+        }
+        stream.forceUpdateStatus();
+        return stream;
+    }
+
+    @Override
+    public synchronized void close() {
+        close(true);
+    }
+
+    public synchronized void close(boolean dropSlot) {
+        try {
+            LOGGER.debug("Closing message decoder");
+            messageDecoder.close();
+        } catch (Throwable e) {
+            LOGGER.error("Unexpected error while closing message decoder", e);
+        }
+
+        try {
+            LOGGER.debug("Closing replication connection");
+            super.close();
+        } catch (Throwable e) {
+            LOGGER.error("Unexpected error while closing Postgres connection", e);
+        }
+        if (dropSlotOnClose && dropSlot) {
+            // we're dropping the replication slot via a regular - i.e. not a replication -
+            // connection
+            try (PostgresConnection connection =
+                    new PostgresConnection(originalConfig.getJdbcConfig())) {
+                connection.dropReplicationSlot(slotName);
+            } catch (Throwable e) {
+                LOGGER.error("Unexpected error while dropping replication slot", e);
+            }
+        }
+    }
+
+    @Override
+    public void reconnect() throws SQLException {
+        close(false);
+        // Don't re-execute initial commands on reconnection
+        connection(false);
+    }
+
+    protected static void defaultSettings(Configuration.Builder builder) {
+        // first copy the parent's default settings...
+        PostgresConnection.defaultSettings(builder);
+        // then set some additional replication specific settings
+        builder.with("replication", "database")
+                .with(
+                        "preferQueryMode",
+                        "simple"); // replication protocol only supports simple query mode
+    }
+
+    protected static class ReplicationConnectionBuilder implements Builder {
+
+        private final PostgresConnectorConfig config;
+        private String slotName = DEFAULT_SLOT_NAME;
+        private String publicationName = DEFAULT_PUBLICATION_NAME;
+        private RelationalTableFilters tableFilter;
+        private PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode =
+                PostgresConnectorConfig.AutoCreateMode.FILTERED;
+        private PostgresConnectorConfig.LogicalDecoder plugin =
+                PostgresConnectorConfig.LogicalDecoder.DECODERBUFS;
+        private PostgresConnectorConfig.TruncateHandlingMode truncateHandlingMode;
+        private boolean dropSlotOnClose = DEFAULT_DROP_SLOT_ON_CLOSE;
+        private Duration statusUpdateIntervalVal;
+        private boolean doSnapshot;
+        private TypeRegistry typeRegistry;
+        private PostgresSchema schema;
+        private Properties slotStreamParams = new Properties();
+
+        protected ReplicationConnectionBuilder(PostgresConnectorConfig config) {
+            assert config != null;
+            this.config = config;
+        }
+
+        @Override
+        public ReplicationConnectionBuilder withSlot(final String slotName) {
+            assert slotName != null;
+            this.slotName = slotName;
+            return this;
+        }
+
+        @Override
+        public Builder withPublication(String publicationName) {
+            assert publicationName != null;
+            this.publicationName = publicationName;
+            return this;
+        }
+
+        @Override
+        public Builder withTableFilter(RelationalTableFilters tableFilter) {
+            assert tableFilter != null;
+            this.tableFilter = tableFilter;
+            return this;
+        }
+
+        @Override
+        public Builder withPublicationAutocreateMode(
+                PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode) {
+            assert publicationAutocreateMode != null;
+            this.publicationAutocreateMode = publicationAutocreateMode;
+            return this;
+        }
+
+        @Override
+        public ReplicationConnectionBuilder withPlugin(
+                final PostgresConnectorConfig.LogicalDecoder plugin) {
+            assert plugin != null;
+            this.plugin = plugin;
+            return this;
+        }
+
+        @Override
+        public Builder withTruncateHandlingMode(
+                PostgresConnectorConfig.TruncateHandlingMode truncateHandlingMode) {
+            assert truncateHandlingMode != null;
+            this.truncateHandlingMode = truncateHandlingMode;
+            return this;
+        }
+
+        @Override
+        public ReplicationConnectionBuilder dropSlotOnClose(final boolean dropSlotOnClose) {
+            this.dropSlotOnClose = dropSlotOnClose;
+            return this;
+        }
+
+        @Override
+        public ReplicationConnectionBuilder streamParams(final String slotStreamParams) {
+            if (slotStreamParams != null && !slotStreamParams.isEmpty()) {
+                this.slotStreamParams = new Properties();
+                String[] paramsWithValues = slotStreamParams.split(";");
+                for (String paramsWithValue : paramsWithValues) {
+                    String[] paramAndValue = paramsWithValue.split("=");
+                    if (paramAndValue.length == 2) {
+                        this.slotStreamParams.setProperty(paramAndValue[0], paramAndValue[1]);
+                    } else {
+                        LOGGER.warn(
+                                "The following STREAM_PARAMS value is invalid: {}",
+                                paramsWithValue);
+                    }
+                }
+            }
+            return this;
+        }
+
+        @Override
+        public ReplicationConnectionBuilder statusUpdateInterval(
+                final Duration statusUpdateInterval) {
+            this.statusUpdateIntervalVal = statusUpdateInterval;
+            return this;
+        }
+
+        @Override
+        public Builder doSnapshot(boolean doSnapshot) {
+            this.doSnapshot = doSnapshot;
+            return this;
+        }
+
+        @Override
+        public ReplicationConnection build() {
+            assert plugin != null : "Decoding plugin name is not set";
+            return new PostgresReplicationConnection(
+                    config,
+                    slotName,
+                    publicationName,
+                    tableFilter,
+                    publicationAutocreateMode,
+                    plugin,
+                    dropSlotOnClose,
+                    doSnapshot,
+                    statusUpdateIntervalVal,
+                    typeRegistry,
+                    slotStreamParams,
+                    schema);
+        }
+
+        @Override
+        public Builder withTypeRegistry(TypeRegistry typeRegistry) {
+            this.typeRegistry = typeRegistry;
+            return this;
+        }
+
+        @Override
+        public Builder withSchema(PostgresSchema schema) {
+            this.schema = schema;
+            return this;
+        }
+    }
+}
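
For orientation, replication connections are assembled through the builder at the bottom of the
new file. A minimal sketch of how the pieces fit together, assuming package-private access and a
prepared PostgresConnectorConfig; the slot and publication names are placeholders:

    ReplicationConnection replicationConnection =
            new PostgresReplicationConnection.ReplicationConnectionBuilder(connectorConfig)
                    .withSlot("seatunnel_slot")
                    .withPublication("seatunnel_pub")
                    .withTableFilter(connectorConfig.getTableFilters())
                    .withPlugin(PostgresConnectorConfig.LogicalDecoder.PGOUTPUT)
                    .dropSlotOnClose(false)
                    .statusUpdateInterval(Duration.ofSeconds(10))
                    .withTypeRegistry(typeRegistry)
                    .withSchema(schema)
                    .build();
    // initConnection() creates or updates the publication before initializing the slot
    replicationConnection.initConnection();
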
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
index 88aca48e1e..0a4be385da 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
@@ -94,7 +94,7 @@ public class PostgresCDCIT extends TestSuiteBase implements TestResource {
     // use newer version of postgresql image to support pgoutput plugin
     // when testing postgres 13, only 13-alpine supports both amd64 and arm64
     protected static final DockerImageName PG_IMAGE =
-            DockerImageName.parse("debezium/postgres:9.6").asCompatibleSubstituteFor("postgres");
+            DockerImageName.parse("debezium/postgres:11").asCompatibleSubstituteFor("postgres");
 
     public static final PostgreSQLContainer<?> POSTGRES_CONTAINER =
             new PostgreSQLContainer<>(PG_IMAGE)
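
The image bump in the e2e test follows from the publication work above: the pgoutput plugin only
exists in PostgreSQL 10+, so the 9.6 image cannot exercise the new publication handling. As a
sanity check against the test container (the debezium/postgres images are expected to ship with
logical decoding preconfigured):

    SHOW wal_level;   -- must return 'logical' for pgoutput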