You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/06/01 09:30:01 UTC
[shardingsphere] branch master updated: Fix replication slot not remove when drop database at openGauss (#25983)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 5cf4ae2f0da Fix replication slot not remove when drop database at openGauss (#25983)
5cf4ae2f0da is described below
commit 5cf4ae2f0daee22e746261242b10adb8e26525c5
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Thu Jun 1 17:29:54 2023 +0800
Fix replication slot not remove when drop database at openGauss (#25983)
* Fix slot not being removed when dropping a database on openGauss
* Reduce data replay after reconnect
* Make sure the job is stopped before dropping
* Use AtomicReference
---
.../ingest/OpenGaussPositionInitializer.java | 26 +++++++++++------
.../opengauss/ingest/OpenGaussWALDumper.java | 9 ++++--
.../postgresql/ingest/PostgreSQLWALDumper.java | 8 ++++--
.../ingest/pojo/ReplicationSlotInfo.java | 33 ++++++++++++++++++++++
.../migration/api/impl/MigrationJobAPI.java | 2 +-
5 files changed, 63 insertions(+), 15 deletions(-)
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
index 8e618a48929..396722399d0 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.opengauss.ingest;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.OpenGaussLogSequenceNumber;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.pojo.ReplicationSlotInfo;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
import org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
import org.opengauss.replication.LogSequenceNumber;
@@ -30,6 +31,7 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.util.Optional;
/**
* OpenGauss WAL position initializer.
@@ -66,18 +68,27 @@ public final class OpenGaussPositionInitializer implements PositionInitializer {
*/
private void createSlotIfNotExist(final Connection connection, final String slotNameSuffix) throws SQLException {
String slotName = getUniqueSlotName(connection, slotNameSuffix);
- if (!isSlotExist(connection, slotName)) {
+ Optional<ReplicationSlotInfo> slotInfo = getSlotInfo(connection, slotName);
+ if (!slotInfo.isPresent()) {
+ createSlotBySQL(connection, slotName);
+ return;
+ }
+ if (null == slotInfo.get().getDatabaseName()) {
+ dropSlotIfExist(connection, slotName);
createSlotBySQL(connection, slotName);
}
}
- private boolean isSlotExist(final Connection connection, final String slotName) throws SQLException {
- String sql = "SELECT slot_name FROM pg_replication_slots WHERE slot_name=? AND plugin=?";
+ private Optional<ReplicationSlotInfo> getSlotInfo(final Connection connection, final String slotName) throws SQLException {
+ String sql = "SELECT slot_name, database FROM pg_replication_slots WHERE slot_name=? AND plugin=?";
try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setString(1, slotName);
preparedStatement.setString(2, DECODE_PLUGIN);
try (ResultSet resultSet = preparedStatement.executeQuery()) {
- return resultSet.next();
+ if (!resultSet.next()) {
+ return Optional.empty();
+ }
+ return Optional.of(new ReplicationSlotInfo(resultSet.getString(1), resultSet.getString(2)));
}
}
}
@@ -105,13 +116,12 @@ public final class OpenGaussPositionInitializer implements PositionInitializer {
@Override
public void destroy(final DataSource dataSource, final String slotNameSuffix) throws SQLException {
try (Connection connection = dataSource.getConnection()) {
- dropSlotIfExist(connection, slotNameSuffix);
+ dropSlotIfExist(connection, getUniqueSlotName(connection, slotNameSuffix));
}
}
- private void dropSlotIfExist(final Connection connection, final String slotNameSuffix) throws SQLException {
- String slotName = getUniqueSlotName(connection, slotNameSuffix);
- if (!isSlotExist(connection, slotName)) {
+ private void dropSlotIfExist(final Connection connection, final String slotName) throws SQLException {
+ if (!getSlotInfo(connection, slotName).isPresent()) {
log.info("dropSlotIfExist, slot not exist, ignore, slotName={}", slotName);
return;
}
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
index 5fa19fe7c10..362c79d12ca 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
@@ -50,6 +50,7 @@ import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
/**
* WAL dumper of openGauss.
@@ -59,7 +60,7 @@ public final class OpenGaussWALDumper extends AbstractLifecycleExecutor implemen
private final DumperConfiguration dumperConfig;
- private final WALPosition walPosition;
+ private final AtomicReference<WALPosition> walPosition;
private final PipelineChannel channel;
@@ -76,7 +77,7 @@ public final class OpenGaussWALDumper extends AbstractLifecycleExecutor implemen
ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass()),
() -> new UnsupportedSQLOperationException("PostgreSQLWALDumper only support PipelineDataSourceConfiguration"));
this.dumperConfig = dumperConfig;
- walPosition = (WALPosition) position;
+ walPosition = new AtomicReference<>((WALPosition) position);
this.channel = channel;
walEventConverter = new WALEventConverter(dumperConfig, metaDataLoader);
logicalReplication = new OpenGaussLogicalReplication();
@@ -108,7 +109,8 @@ public final class OpenGaussWALDumper extends AbstractLifecycleExecutor implemen
private void dump() throws SQLException {
PGReplicationStream stream = null;
try (PgConnection connection = getReplicationConnectionUnwrap()) {
- stream = logicalReplication.createReplicationStream(connection, walPosition.getLogSequenceNumber(), OpenGaussPositionInitializer.getUniqueSlotName(connection, dumperConfig.getJobId()));
+ stream = logicalReplication.createReplicationStream(connection, walPosition.get().getLogSequenceNumber(),
+ OpenGaussPositionInitializer.getUniqueSlotName(connection, dumperConfig.getJobId()));
DecodingPlugin decodingPlugin = new MppdbDecodingPlugin(new OpenGaussTimestampUtils(connection.getTimestampUtils()), decodeWithTX);
while (isRunning()) {
ByteBuffer message = stream.readPending();
@@ -122,6 +124,7 @@ public final class OpenGaussWALDumper extends AbstractLifecycleExecutor implemen
} else {
processEventIgnoreTX(event);
}
+ walPosition.set(new WALPosition(event.getLogSequenceNumber()));
}
} finally {
if (null != stream) {
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
index 8f19265d9b0..4c710b64740 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
@@ -52,6 +52,7 @@ import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
/**
* PostgreSQL WAL dumper.
@@ -61,7 +62,7 @@ public final class PostgreSQLWALDumper extends AbstractLifecycleExecutor impleme
private final DumperConfiguration dumperConfig;
- private final WALPosition walPosition;
+ private final AtomicReference<WALPosition> walPosition;
private final PipelineChannel channel;
@@ -78,7 +79,7 @@ public final class PostgreSQLWALDumper extends AbstractLifecycleExecutor impleme
ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass()),
() -> new UnsupportedSQLOperationException("PostgreSQLWALDumper only support PipelineDataSourceConfiguration"));
this.dumperConfig = dumperConfig;
- walPosition = (WALPosition) position;
+ walPosition = new AtomicReference<>((WALPosition) position);
this.channel = channel;
walEventConverter = new WALEventConverter(dumperConfig, metaDataLoader);
logicalReplication = new PostgreSQLLogicalReplication();
@@ -112,7 +113,7 @@ public final class PostgreSQLWALDumper extends AbstractLifecycleExecutor impleme
try (
Connection connection = logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) dumperConfig.getDataSourceConfig());
PGReplicationStream stream = logicalReplication.createReplicationStream(connection, PostgreSQLPositionInitializer.getUniqueSlotName(connection, dumperConfig.getJobId()),
- walPosition.getLogSequenceNumber())) {
+ walPosition.get().getLogSequenceNumber())) {
PostgreSQLTimestampUtils utils = new PostgreSQLTimestampUtils(connection.unwrap(PgConnection.class).getTimestampUtils());
DecodingPlugin decodingPlugin = new TestDecodingPlugin(utils);
while (isRunning()) {
@@ -127,6 +128,7 @@ public final class PostgreSQLWALDumper extends AbstractLifecycleExecutor impleme
} else {
processEventIgnoreTX(event);
}
+ walPosition.set(new WALPosition(event.getLogSequenceNumber()));
}
}
}
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/pojo/ReplicationSlotInfo.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/pojo/ReplicationSlotInfo.java
new file mode 100644
index 00000000000..3b14d003269
--- /dev/null
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/pojo/ReplicationSlotInfo.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.postgresql.ingest.pojo;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+/**
+ * Replication slot info.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class ReplicationSlotInfo {
+
+ private final String slotName;
+
+ private final String databaseName;
+}
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 8776d101328..c22b44b41b5 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -375,8 +375,8 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
@Override
public void rollback(final String jobId) throws SQLException {
final long startTimeMillis = System.currentTimeMillis();
- dropCheckJobs(jobId);
stop(jobId);
+ dropCheckJobs(jobId);
cleanTempTableOnRollback(jobId);
dropJob(jobId);
log.info("Rollback job {} cost {} ms", jobId, System.currentTimeMillis() - startTimeMillis);