You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/06/01 09:30:01 UTC

[shardingsphere] branch master updated: Fix replication slot not remove when drop database at openGauss (#25983)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 5cf4ae2f0da Fix replication slot not remove when drop database at openGauss (#25983)
5cf4ae2f0da is described below

commit 5cf4ae2f0daee22e746261242b10adb8e26525c5
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Thu Jun 1 17:29:54 2023 +0800

    Fix replication slot not remove when drop database at openGauss (#25983)
    
    * Fix slot not being removed when dropping a database on openGauss
    
    * Reduce data replay after reconnecting
    
    * Make sure the job is stopped before dropping
    
    * Use AtomicReference
---
 .../ingest/OpenGaussPositionInitializer.java       | 26 +++++++++++------
 .../opengauss/ingest/OpenGaussWALDumper.java       |  9 ++++--
 .../postgresql/ingest/PostgreSQLWALDumper.java     |  8 ++++--
 .../ingest/pojo/ReplicationSlotInfo.java           | 33 ++++++++++++++++++++++
 .../migration/api/impl/MigrationJobAPI.java        |  2 +-
 5 files changed, 63 insertions(+), 15 deletions(-)

diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
index 8e618a48929..396722399d0 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.opengauss.ingest;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.OpenGaussLogSequenceNumber;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.pojo.ReplicationSlotInfo;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
 import org.opengauss.replication.LogSequenceNumber;
@@ -30,6 +31,7 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.Optional;
 
 /**
  * OpenGauss WAL position initializer.
@@ -66,18 +68,27 @@ public final class OpenGaussPositionInitializer implements PositionInitializer {
      */
     private void createSlotIfNotExist(final Connection connection, final String slotNameSuffix) throws SQLException {
         String slotName = getUniqueSlotName(connection, slotNameSuffix);
-        if (!isSlotExist(connection, slotName)) {
+        Optional<ReplicationSlotInfo> slotInfo = getSlotInfo(connection, slotName);
+        if (!slotInfo.isPresent()) {
+            createSlotBySQL(connection, slotName);
+            return;
+        }
+        if (null == slotInfo.get().getDatabaseName()) {
+            dropSlotIfExist(connection, slotName);
             createSlotBySQL(connection, slotName);
         }
     }
     
-    private boolean isSlotExist(final Connection connection, final String slotName) throws SQLException {
-        String sql = "SELECT slot_name FROM pg_replication_slots WHERE slot_name=? AND plugin=?";
+    private Optional<ReplicationSlotInfo> getSlotInfo(final Connection connection, final String slotName) throws SQLException {
+        String sql = "SELECT slot_name, database FROM pg_replication_slots WHERE slot_name=? AND plugin=?";
         try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
             preparedStatement.setString(1, slotName);
             preparedStatement.setString(2, DECODE_PLUGIN);
             try (ResultSet resultSet = preparedStatement.executeQuery()) {
-                return resultSet.next();
+                if (!resultSet.next()) {
+                    return Optional.empty();
+                }
+                return Optional.of(new ReplicationSlotInfo(resultSet.getString(1), resultSet.getString(2)));
             }
         }
     }
@@ -105,13 +116,12 @@ public final class OpenGaussPositionInitializer implements PositionInitializer {
     @Override
     public void destroy(final DataSource dataSource, final String slotNameSuffix) throws SQLException {
         try (Connection connection = dataSource.getConnection()) {
-            dropSlotIfExist(connection, slotNameSuffix);
+            dropSlotIfExist(connection, getUniqueSlotName(connection, slotNameSuffix));
         }
     }
     
-    private void dropSlotIfExist(final Connection connection, final String slotNameSuffix) throws SQLException {
-        String slotName = getUniqueSlotName(connection, slotNameSuffix);
-        if (!isSlotExist(connection, slotName)) {
+    private void dropSlotIfExist(final Connection connection, final String slotName) throws SQLException {
+        if (!getSlotInfo(connection, slotName).isPresent()) {
             log.info("dropSlotIfExist, slot not exist, ignore, slotName={}", slotName);
             return;
         }
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
index 5fa19fe7c10..362c79d12ca 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
@@ -50,6 +50,7 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * WAL dumper of openGauss.
@@ -59,7 +60,7 @@ public final class OpenGaussWALDumper extends AbstractLifecycleExecutor implemen
     
     private final DumperConfiguration dumperConfig;
     
-    private final WALPosition walPosition;
+    private final AtomicReference<WALPosition> walPosition;
     
     private final PipelineChannel channel;
     
@@ -76,7 +77,7 @@ public final class OpenGaussWALDumper extends AbstractLifecycleExecutor implemen
         ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass()),
                 () -> new UnsupportedSQLOperationException("PostgreSQLWALDumper only support PipelineDataSourceConfiguration"));
         this.dumperConfig = dumperConfig;
-        walPosition = (WALPosition) position;
+        walPosition = new AtomicReference<>((WALPosition) position);
         this.channel = channel;
         walEventConverter = new WALEventConverter(dumperConfig, metaDataLoader);
         logicalReplication = new OpenGaussLogicalReplication();
@@ -108,7 +109,8 @@ public final class OpenGaussWALDumper extends AbstractLifecycleExecutor implemen
     private void dump() throws SQLException {
         PGReplicationStream stream = null;
         try (PgConnection connection = getReplicationConnectionUnwrap()) {
-            stream = logicalReplication.createReplicationStream(connection, walPosition.getLogSequenceNumber(), OpenGaussPositionInitializer.getUniqueSlotName(connection, dumperConfig.getJobId()));
+            stream = logicalReplication.createReplicationStream(connection, walPosition.get().getLogSequenceNumber(),
+                    OpenGaussPositionInitializer.getUniqueSlotName(connection, dumperConfig.getJobId()));
             DecodingPlugin decodingPlugin = new MppdbDecodingPlugin(new OpenGaussTimestampUtils(connection.getTimestampUtils()), decodeWithTX);
             while (isRunning()) {
                 ByteBuffer message = stream.readPending();
@@ -122,6 +124,7 @@ public final class OpenGaussWALDumper extends AbstractLifecycleExecutor implemen
                 } else {
                     processEventIgnoreTX(event);
                 }
+                walPosition.set(new WALPosition(event.getLogSequenceNumber()));
             }
         } finally {
             if (null != stream) {
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
index 8f19265d9b0..4c710b64740 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
@@ -52,6 +52,7 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * PostgreSQL WAL dumper.
@@ -61,7 +62,7 @@ public final class PostgreSQLWALDumper extends AbstractLifecycleExecutor impleme
     
     private final DumperConfiguration dumperConfig;
     
-    private final WALPosition walPosition;
+    private final AtomicReference<WALPosition> walPosition;
     
     private final PipelineChannel channel;
     
@@ -78,7 +79,7 @@ public final class PostgreSQLWALDumper extends AbstractLifecycleExecutor impleme
         ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass()),
                 () -> new UnsupportedSQLOperationException("PostgreSQLWALDumper only support PipelineDataSourceConfiguration"));
         this.dumperConfig = dumperConfig;
-        walPosition = (WALPosition) position;
+        walPosition = new AtomicReference<>((WALPosition) position);
         this.channel = channel;
         walEventConverter = new WALEventConverter(dumperConfig, metaDataLoader);
         logicalReplication = new PostgreSQLLogicalReplication();
@@ -112,7 +113,7 @@ public final class PostgreSQLWALDumper extends AbstractLifecycleExecutor impleme
         try (
                 Connection connection = logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) dumperConfig.getDataSourceConfig());
                 PGReplicationStream stream = logicalReplication.createReplicationStream(connection, PostgreSQLPositionInitializer.getUniqueSlotName(connection, dumperConfig.getJobId()),
-                        walPosition.getLogSequenceNumber())) {
+                        walPosition.get().getLogSequenceNumber())) {
             PostgreSQLTimestampUtils utils = new PostgreSQLTimestampUtils(connection.unwrap(PgConnection.class).getTimestampUtils());
             DecodingPlugin decodingPlugin = new TestDecodingPlugin(utils);
             while (isRunning()) {
@@ -127,6 +128,7 @@ public final class PostgreSQLWALDumper extends AbstractLifecycleExecutor impleme
                 } else {
                     processEventIgnoreTX(event);
                 }
+                walPosition.set(new WALPosition(event.getLogSequenceNumber()));
             }
         }
     }
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/pojo/ReplicationSlotInfo.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/pojo/ReplicationSlotInfo.java
new file mode 100644
index 00000000000..3b14d003269
--- /dev/null
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/pojo/ReplicationSlotInfo.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.postgresql.ingest.pojo;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+/**
+ * Replication slot info.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class ReplicationSlotInfo {
+    
+    private final String slotName;
+    
+    private final String databaseName;
+}
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 8776d101328..c22b44b41b5 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -375,8 +375,8 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
     @Override
     public void rollback(final String jobId) throws SQLException {
         final long startTimeMillis = System.currentTimeMillis();
-        dropCheckJobs(jobId);
         stop(jobId);
+        dropCheckJobs(jobId);
         cleanTempTableOnRollback(jobId);
         dropJob(jobId);
         log.info("Rollback job {} cost {} ms", jobId, System.currentTimeMillis() - startTimeMillis);