You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/04/13 09:45:18 UTC

[shardingsphere] branch master updated: Refactor PostgreSQLPositionInitializer code style (#16786)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 05dc0a81390 Refactor PostgreSQLPositionInitializer code style (#16786)
05dc0a81390 is described below

commit 05dc0a81390fe27ea9c11130d6ead152c9d1cee6
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Wed Apr 13 17:45:13 2022 +0800

    Refactor PostgreSQLPositionInitializer code style (#16786)
---
 .../ingest/PostgreSQLPositionInitializer.java      | 54 +++++++++++-----------
 1 file changed, 27 insertions(+), 27 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java
index 14b2bfa838c..606198c049e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java
@@ -22,7 +22,6 @@ import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
 import org.postgresql.replication.LogSequenceNumber;
-import org.postgresql.util.PSQLException;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -36,17 +35,16 @@ import java.sql.SQLException;
 @Slf4j
 public final class PostgreSQLPositionInitializer implements PositionInitializer {
     
-    // TODO it should be private; _PREFIX;
-    public static final String SLOT_NAME = "sharding_scaling";
+    private static final String SLOT_NAME_PREFIX = "sharding_scaling";
     
-    public static final String DECODE_PLUGIN = "test_decoding";
+    private static final String DECODE_PLUGIN = "test_decoding";
     
-    public static final String DUPLICATE_OBJECT_ERROR_CODE = "42710";
+    private static final String DUPLICATE_OBJECT_ERROR_CODE = "42710";
     
     @Override
     public WalPosition init(final DataSource dataSource) throws SQLException {
         try (Connection connection = dataSource.getConnection()) {
-            createIfNotExists(connection);
+            createSlotIfNotExist(connection, getUniqueSlotName(connection));
             return getWalPosition(connection);
         }
     }
@@ -56,24 +54,25 @@ public final class PostgreSQLPositionInitializer implements PositionInitializer
         return new WalPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(Long.parseLong(data))));
     }
     
-    private void createIfNotExists(final Connection connection) throws SQLException {
-        if (checkSlotExistsOrNot(connection)) {
-            log.info("replication slot already exist, slot name: {}", SLOT_NAME);
+    private void createSlotIfNotExist(final Connection connection, final String slotName) throws SQLException {
+        if (isSlotExisting(connection, slotName)) {
+            log.info("createSlotIfNotExist, slot exist, slotName={}", slotName);
             return;
         }
-        try (PreparedStatement ps = connection.prepareStatement(String.format("SELECT * FROM pg_create_logical_replication_slot('%s', '%s')", getUniqueSlotName(connection), DECODE_PLUGIN))) {
-            ps.execute();
-        } catch (final PSQLException ex) {
+        String createSlotSQL = String.format("SELECT * FROM pg_create_logical_replication_slot('%s', '%s')", slotName, DECODE_PLUGIN);
+        try (PreparedStatement preparedStatement = connection.prepareStatement(createSlotSQL)) {
+            preparedStatement.execute();
+        } catch (final SQLException ex) {
             if (!DUPLICATE_OBJECT_ERROR_CODE.equals(ex.getSQLState())) {
                 throw ex;
             }
         }
     }
     
-    private boolean checkSlotExistsOrNot(final Connection connection) throws SQLException {
+    private boolean isSlotExisting(final Connection connection, final String slotName) throws SQLException {
         String checkSlotSQL = "SELECT slot_name FROM pg_replication_slots WHERE slot_name=? AND plugin=?";
         try (PreparedStatement preparedStatement = connection.prepareStatement(checkSlotSQL)) {
-            preparedStatement.setString(1, getUniqueSlotName(connection));
+            preparedStatement.setString(1, slotName);
             preparedStatement.setString(2, DECODE_PLUGIN);
             try (ResultSet resultSet = preparedStatement.executeQuery()) {
                 return resultSet.next();
@@ -82,39 +81,40 @@ public final class PostgreSQLPositionInitializer implements PositionInitializer
     }
     
     private WalPosition getWalPosition(final Connection connection) throws SQLException {
-        try (PreparedStatement ps = connection.prepareStatement(getSql(connection));
-             ResultSet rs = ps.executeQuery()) {
-            rs.next();
-            return new WalPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(rs.getString(1))));
+        try (PreparedStatement preparedStatement = connection.prepareStatement(getLogSequenceNumberSQL(connection));
+             ResultSet resultSet = preparedStatement.executeQuery()) {
+            resultSet.next();
+            return new WalPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(resultSet.getString(1))));
         }
     }
     
-    private String getSql(final Connection connection) throws SQLException {
+    private String getLogSequenceNumberSQL(final Connection connection) throws SQLException {
         if (9 == connection.getMetaData().getDatabaseMajorVersion() && 6 <= connection.getMetaData().getDatabaseMinorVersion()) {
             return "SELECT PG_CURRENT_XLOG_LOCATION()";
         }
         if (10 <= connection.getMetaData().getDatabaseMajorVersion()) {
             return "SELECT PG_CURRENT_WAL_LSN()";
         }
-        throw new RuntimeException("Not support PostgreSQL version:" + connection.getMetaData().getDatabaseProductVersion());
+        throw new RuntimeException("Unsupported PostgreSQL version: " + connection.getMetaData().getDatabaseProductVersion());
     }
     
     @Override
     public void destroy(final DataSource dataSource) throws SQLException {
         try (Connection connection = dataSource.getConnection()) {
-            dropSlotIfExists(connection);
+            dropSlotIfExist(connection);
         }
     }
     
-    private void dropSlotIfExists(final Connection connection) throws SQLException {
-        if (!checkSlotExistsOrNot(connection)) {
-            log.info("drop, slot not exist, slot name: {}", SLOT_NAME);
+    private void dropSlotIfExist(final Connection connection) throws SQLException {
+        String slotName = getUniqueSlotName(connection);
+        if (!isSlotExisting(connection, slotName)) {
+            log.info("dropSlotIfExist, slot not exist, slotName={}", slotName);
             return;
         }
-        log.info("drop, slot exist, slot name: {}", SLOT_NAME);
+        log.info("dropSlotIfExist, slot exist, slotName={}", slotName);
         String dropSlotSQL = "SELECT pg_drop_replication_slot(?)";
         try (PreparedStatement preparedStatement = connection.prepareStatement(dropSlotSQL)) {
-            preparedStatement.setString(1, getUniqueSlotName(connection));
+            preparedStatement.setString(1, slotName);
             preparedStatement.execute();
         }
     }
@@ -127,7 +127,7 @@ public final class PostgreSQLPositionInitializer implements PositionInitializer
      * @throws SQLException failed when getCatalog
      */
     public static String getUniqueSlotName(final Connection connection) throws SQLException {
-        return String.format("%s_%s", SLOT_NAME, connection.getCatalog());
+        return String.format("%s_%s", SLOT_NAME_PREFIX, connection.getCatalog());
     }
     
     @Override