You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/04/12 12:37:37 UTC

[shardingsphere] branch master updated: Fix openGauss logical replication slot creation (#16773)

This is an automated email from the ASF dual-hosted git repository.

zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b117204690 Fix openGauss logical replication slot creation (#16773)
5b117204690 is described below

commit 5b1172046909b8323c04d99d927ef7f8b5a2853d
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Tue Apr 12 20:37:24 2022 +0800

    Fix openGauss logical replication slot creation (#16773)
---
 .../ingest/OpenGaussPositionInitializer.java       | 73 ++++++++++++++++++++-
 .../opengauss/ingest/OpenGaussWalDumper.java       | 46 +++++---------
 .../ingest/wal/OpenGaussLogicalReplication.java    | 74 ----------------------
 .../postgresql/ingest/PostgreSQLWalDumper.java     |  1 +
 .../postgresql/ingest/wal/LogicalReplication.java  |  1 +
 5 files changed, 88 insertions(+), 107 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
index 12d2aa93e76..086dd9e7def 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
@@ -17,13 +17,14 @@
 
 package org.apache.shardingsphere.data.pipeline.opengauss.ingest;
 
-import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.OpenGaussLogicalReplication;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.OpenGaussLogSequenceNumber;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
 import org.opengauss.replication.LogSequenceNumber;
 
 import javax.sql.DataSource;
+import java.sql.CallableStatement;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -32,12 +33,20 @@ import java.sql.SQLException;
 /**
  * OpenGauss wal position initializer.
  */
+// TODO reuse PostgreSQLPositionInitializer
+@Slf4j
 public final class OpenGaussPositionInitializer implements PositionInitializer {
     
+    private static final String SLOT_NAME_PREFIX = "sharding_scaling";
+    
+    private static final String DECODE_PLUGIN = "mppdb_decoding";
+    
+    private static final String DUPLICATE_OBJECT_ERROR_CODE = "42710";
+    
     @Override
     public WalPosition init(final DataSource dataSource) throws SQLException {
         try (Connection connection = dataSource.getConnection()) {
-            // TODO createSlotIfNotExists
+            createSlotIfNotExist(connection);
             return getWalPosition(connection);
         }
     }
@@ -47,6 +56,41 @@ public final class OpenGaussPositionInitializer implements PositionInitializer {
         return new WalPosition(new OpenGaussLogSequenceNumber(LogSequenceNumber.valueOf(Long.parseLong(data))));
     }
     
+    /**
+     * Create logical replication slot if it does not exist.
+     *
+     * @param connection connection
+     * @throws SQLException SQL exception
+     */
+    private void createSlotIfNotExist(final Connection connection) throws SQLException {
+        String slotName = getUniqueSlotName(connection);
+        if (!isSlotExist(connection, slotName)) {
+            createSlotBySQL(connection);
+        }
+    }
+    
+    private boolean isSlotExist(final Connection connection, final String slotName) throws SQLException {
+        String sql = "SELECT slot_name FROM pg_replication_slots WHERE slot_name=? AND plugin=?";
+        try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
+            preparedStatement.setString(1, slotName);
+            preparedStatement.setString(2, DECODE_PLUGIN);
+            try (ResultSet resultSet = preparedStatement.executeQuery()) {
+                return resultSet.next();
+            }
+        }
+    }
+    
+    private void createSlotBySQL(final Connection connection) throws SQLException {
+        String sql = String.format("SELECT * FROM pg_create_logical_replication_slot('%s', '%s')", getUniqueSlotName(connection), DECODE_PLUGIN);
+        try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
+            preparedStatement.execute();
+        } catch (final SQLException ex) {
+            if (!DUPLICATE_OBJECT_ERROR_CODE.equals(ex.getSQLState())) {
+                throw ex;
+            }
+        }
+    }
+    
     private WalPosition getWalPosition(final Connection connection) throws SQLException {
         try (PreparedStatement preparedStatement = connection.prepareStatement("SELECT PG_CURRENT_XLOG_LOCATION()");
              ResultSet resultSet = preparedStatement.executeQuery()) {
@@ -58,10 +102,33 @@ public final class OpenGaussPositionInitializer implements PositionInitializer {
     @Override
     public void destroy(final DataSource dataSource) throws SQLException {
         try (Connection connection = dataSource.getConnection()) {
-            OpenGaussLogicalReplication.dropSlot(connection);
+            dropSlotIfExist(connection);
         }
     }
     
+    private void dropSlotIfExist(final Connection connection) throws SQLException {
+        String slotName = getUniqueSlotName(connection);
+        if (!isSlotExist(connection, slotName)) {
+            log.info("dropSlotIfExist, slot not exist, ignore, slotName={}", slotName);
+            return;
+        }
+        String sql = String.format("select * from pg_drop_replication_slot('%s')", slotName);
+        try (CallableStatement callableStatement = connection.prepareCall(sql)) {
+            callableStatement.execute();
+        }
+    }
+    
+    /**
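+     * Get the unique slot name for the given connection.
+     *
+     * @param connection connection whose catalog name is appended to the slot name prefix
+     * @return the slot name, formatted as the slot name prefix, an underscore, and the connection's catalog
+     * @throws SQLException if getting the catalog from the connection fails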
+     */
+    public static String getUniqueSlotName(final Connection connection) throws SQLException {
+        return String.format("%s_%s", SLOT_NAME_PREFIX, connection.getCatalog());
+    }
+    
     @Override
     public String getType() {
         return "openGauss";
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
index d6e7b2721f2..e5ee5899712 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
@@ -23,7 +23,6 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.Standa
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineDataSourceCreatorFactory;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractIncrementalDumper;
 import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
@@ -39,9 +38,7 @@ import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.Abstr
 import org.opengauss.jdbc.PgConnection;
 import org.opengauss.replication.PGReplicationStream;
 
-import javax.sql.DataSource;
 import java.nio.ByteBuffer;
-import java.sql.Connection;
 import java.sql.SQLException;
 
 /**
@@ -58,8 +55,6 @@ public final class OpenGaussWalDumper extends AbstractIncrementalDumper<WalPosit
     
     private final WalEventConverter walEventConverter;
     
-    private String slotName = OpenGaussLogicalReplication.SLOT_NAME_PREFIX;
-    
     private final PipelineChannel channel;
     
     public OpenGaussWalDumper(final DumperConfiguration dumperConfig, final IngestPosition<WalPosition> position,
@@ -79,29 +74,11 @@ public final class OpenGaussWalDumper extends AbstractIncrementalDumper<WalPosit
         dump();
     }
     
-    private PgConnection getReplicationConn() throws SQLException {
-        return logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) dumperConfig.getDataSourceConfig()).unwrap(PgConnection.class);
-    }
-    
-    private MppdbDecodingPlugin initReplication() {
-        try {
-            DataSource pipelineDataSource = PipelineDataSourceCreatorFactory.getInstance(
-                    dumperConfig.getDataSourceConfig().getType()).createPipelineDataSource(dumperConfig.getDataSourceConfig().getDataSourceConfiguration());
-            try (Connection connection = pipelineDataSource.getConnection()) {
-                slotName = OpenGaussLogicalReplication.getUniqueSlotName(connection);
-                OpenGaussLogicalReplication.createIfNotExists(connection);
-                return new MppdbDecodingPlugin(new OpenGaussTimestampUtils(connection.unwrap(PgConnection.class).getTimestampUtils()));
-            }
-        } catch (final SQLException ex) {
-            log.warn("Create replication slot failed!");
-        }
-        return null;
-    }
-    
     private void dump() {
-        DecodingPlugin decodingPlugin = initReplication();
-        try (PgConnection connection = getReplicationConn()) {
-            PGReplicationStream stream = logicalReplication.createReplicationStream(connection, walPosition.getLogSequenceNumber(), slotName);
+        PGReplicationStream stream = null;
+        try (PgConnection connection = getReplicationConnectionUnwrap()) {
+            stream = logicalReplication.createReplicationStream(connection, walPosition.getLogSequenceNumber(), OpenGaussPositionInitializer.getUniqueSlotName(connection));
+            DecodingPlugin decodingPlugin = new MppdbDecodingPlugin(new OpenGaussTimestampUtils(connection.getTimestampUtils()));
             while (isRunning()) {
                 ByteBuffer message = stream.readPending();
                 if (null == message) {
@@ -113,13 +90,22 @@ public final class OpenGaussWalDumper extends AbstractIncrementalDumper<WalPosit
                 pushRecord(record);
             }
         } catch (final SQLException ex) {
-            if (ex.getMessage().contains("is already active")) {
-                return;
-            }
             throw new IngestException(ex);
+        } finally {
+            if (null != stream) {
+                try {
+                    stream.close();
+                } catch (final SQLException ex) {
+                    log.error("Close PGReplicationStream failed", ex);
+                }
+            }
         }
     }
     
+    private PgConnection getReplicationConnectionUnwrap() throws SQLException {
+        return logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) dumperConfig.getDataSourceConfig()).unwrap(PgConnection.class);
+    }
+    
     private void pushRecord(final Record record) {
         channel.pushRecord(record);
     }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussLogicalReplication.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussLogicalReplication.java
index 9f20968ad19..419b048a683 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussLogicalReplication.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussLogicalReplication.java
@@ -25,13 +25,9 @@ import org.opengauss.PGProperty;
 import org.opengauss.jdbc.PgConnection;
 import org.opengauss.replication.LogSequenceNumber;
 import org.opengauss.replication.PGReplicationStream;
-import org.opengauss.util.PSQLException;
 
-import java.sql.CallableStatement;
 import java.sql.Connection;
 import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Properties;
 
@@ -41,13 +37,6 @@ import java.util.Properties;
 @Slf4j
 public final class OpenGaussLogicalReplication {
     
-    // TODO it should be private
-    public static final String SLOT_NAME_PREFIX = "sharding_scaling";
-    
-    public static final String DECODE_PLUGIN = "mppdb_decoding";
-    
-    public static final String DUPLICATE_OBJECT_ERROR_CODE = "42710";
-    
     /**
      * Create connection.
      *
@@ -85,67 +74,4 @@ public final class OpenGaussLogicalReplication {
                 .withStartPosition((LogSequenceNumber) startPosition.get())
                 .start();
     }
-    
-    /**
-     * Create slots (drop existed slot before create).
-     *
-     * @param connection connection
-     * @throws SQLException SQL exception
-     */
-    public static void createIfNotExists(final Connection connection) throws SQLException {
-        String slotName = getUniqueSlotName(connection);
-        if (!isSlotNameExist(connection, slotName)) {
-            createSlotBySQL(connection);
-        }
-    }
-    
-    /**
-     * Drop replication slot by connection.
-     *
-     * @param connection connection
-     * @throws SQLException drop SQL with error
-     */
-    public static void dropSlot(final Connection connection) throws SQLException {
-        String slotName = getUniqueSlotName(connection);
-        if (!isSlotNameExist(connection, slotName)) {
-            log.info("dropSlot, slot not exist, ignore, slotName={}", slotName);
-            return;
-        }
-        String sql = String.format("select * from pg_drop_replication_slot('%s')", slotName);
-        try (CallableStatement callableStatement = connection.prepareCall(sql)) {
-            callableStatement.execute();
-        }
-    }
-    
-    private static boolean isSlotNameExist(final Connection connection, final String slotName) throws SQLException {
-        String sql = "select * from pg_replication_slots where slot_name=?";
-        try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
-            preparedStatement.setString(1, slotName);
-            try (ResultSet resultSet = preparedStatement.executeQuery()) {
-                return resultSet.next();
-            }
-        }
-    }
-    
-    private static void createSlotBySQL(final Connection connection) throws SQLException {
-        String sql = String.format("SELECT * FROM pg_create_logical_replication_slot('%s', '%s')", getUniqueSlotName(connection), DECODE_PLUGIN);
-        try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
-            preparedStatement.execute();
-        } catch (final PSQLException ex) {
-            if (!DUPLICATE_OBJECT_ERROR_CODE.equals(ex.getSQLState())) {
-                throw ex;
-            }
-        }
-    }
-    
-    /**
-     * Get the unique slot name by connection.
-     *
-     * @param connection connection
-     * @return the unique name by connection
-     * @throws SQLException failed when getCatalog
-     */
-    public static String getUniqueSlotName(final Connection connection) throws SQLException {
-        return String.format("%s_%s", SLOT_NAME_PREFIX, connection.getCatalog());
-    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
index b725bf2cb2d..0f65746e28c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
@@ -76,6 +76,7 @@ public final class PostgreSQLWalDumper extends AbstractIncrementalDumper<WalPosi
     }
     
     private void dump() {
+        // TODO use unified PgConnection
         try (Connection connection = logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) dumperConfig.getDataSourceConfig());
              PGReplicationStream stream = logicalReplication.createReplicationStream(connection, PostgreSQLPositionInitializer.getUniqueSlotName(connection), walPosition.getLogSequenceNumber())) {
             PostgreSQLTimestampUtils utils = new PostgreSQLTimestampUtils(connection.unwrap(PgConnection.class).getTimestampUtils());
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/LogicalReplication.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/LogicalReplication.java
index 654bc7afb74..810549f9aa3 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/LogicalReplication.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/LogicalReplication.java
@@ -33,6 +33,7 @@ import java.util.Properties;
 /**
  * PostgreSQL logical replication.
  */
+// TODO add prefix PostgreSQL
 public final class LogicalReplication {
     
     /**