You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/04/12 12:37:37 UTC
[shardingsphere] branch master updated: Fix openGauss logical replication slot creation (#16773)
This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 5b117204690 Fix openGauss logical replication slot creation (#16773)
5b117204690 is described below
commit 5b1172046909b8323c04d99d927ef7f8b5a2853d
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Tue Apr 12 20:37:24 2022 +0800
Fix openGauss logical replication slot creation (#16773)
---
.../ingest/OpenGaussPositionInitializer.java | 73 ++++++++++++++++++++-
.../opengauss/ingest/OpenGaussWalDumper.java | 46 +++++---------
.../ingest/wal/OpenGaussLogicalReplication.java | 74 ----------------------
.../postgresql/ingest/PostgreSQLWalDumper.java | 1 +
.../postgresql/ingest/wal/LogicalReplication.java | 1 +
5 files changed, 88 insertions(+), 107 deletions(-)
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
index 12d2aa93e76..086dd9e7def 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
@@ -17,13 +17,14 @@
package org.apache.shardingsphere.data.pipeline.opengauss.ingest;
-import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.OpenGaussLogicalReplication;
+import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.OpenGaussLogSequenceNumber;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
import org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
import org.opengauss.replication.LogSequenceNumber;
import javax.sql.DataSource;
+import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -32,12 +33,20 @@ import java.sql.SQLException;
/**
* OpenGauss wal position initializer.
*/
+// TODO reuse PostgreSQLPositionInitializer
+@Slf4j
public final class OpenGaussPositionInitializer implements PositionInitializer {
+ private static final String SLOT_NAME_PREFIX = "sharding_scaling";
+
+ private static final String DECODE_PLUGIN = "mppdb_decoding";
+
+ private static final String DUPLICATE_OBJECT_ERROR_CODE = "42710";
+
@Override
public WalPosition init(final DataSource dataSource) throws SQLException {
try (Connection connection = dataSource.getConnection()) {
- // TODO createSlotIfNotExists
+ createSlotIfNotExist(connection);
return getWalPosition(connection);
}
}
@@ -47,6 +56,41 @@ public final class OpenGaussPositionInitializer implements PositionInitializer {
return new WalPosition(new OpenGaussLogSequenceNumber(LogSequenceNumber.valueOf(Long.parseLong(data))));
}
+ /**
+ * Create logical replication slot if it does not exist.
+ *
+ * @param connection connection
+ * @throws SQLException SQL exception
+ */
+ private void createSlotIfNotExist(final Connection connection) throws SQLException {
+ String slotName = getUniqueSlotName(connection);
+ if (!isSlotExist(connection, slotName)) {
+ createSlotBySQL(connection);
+ }
+ }
+
+ private boolean isSlotExist(final Connection connection, final String slotName) throws SQLException {
+ String sql = "SELECT slot_name FROM pg_replication_slots WHERE slot_name=? AND plugin=?";
+ try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
+ preparedStatement.setString(1, slotName);
+ preparedStatement.setString(2, DECODE_PLUGIN);
+ try (ResultSet resultSet = preparedStatement.executeQuery()) {
+ return resultSet.next();
+ }
+ }
+ }
+
+ private void createSlotBySQL(final Connection connection) throws SQLException {
+ String sql = String.format("SELECT * FROM pg_create_logical_replication_slot('%s', '%s')", getUniqueSlotName(connection), DECODE_PLUGIN);
+ try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
+ preparedStatement.execute();
+ } catch (final SQLException ex) {
+ if (!DUPLICATE_OBJECT_ERROR_CODE.equals(ex.getSQLState())) {
+ throw ex;
+ }
+ }
+ }
+
private WalPosition getWalPosition(final Connection connection) throws SQLException {
try (PreparedStatement preparedStatement = connection.prepareStatement("SELECT PG_CURRENT_XLOG_LOCATION()");
ResultSet resultSet = preparedStatement.executeQuery()) {
@@ -58,10 +102,33 @@ public final class OpenGaussPositionInitializer implements PositionInitializer {
@Override
public void destroy(final DataSource dataSource) throws SQLException {
try (Connection connection = dataSource.getConnection()) {
- OpenGaussLogicalReplication.dropSlot(connection);
+ dropSlotIfExist(connection);
}
}
+ private void dropSlotIfExist(final Connection connection) throws SQLException {
+ String slotName = getUniqueSlotName(connection);
+ if (!isSlotExist(connection, slotName)) {
+ log.info("dropSlotIfExist, slot not exist, ignore, slotName={}", slotName);
+ return;
+ }
+ String sql = String.format("select * from pg_drop_replication_slot('%s')", slotName);
+ try (CallableStatement callableStatement = connection.prepareCall(sql)) {
+ callableStatement.execute();
+ }
+ }
+
+ /**
+ * Get the unique slot name by connection.
+ *
+ * @param connection connection
+ * @return the unique name by connection
+ * @throws SQLException failed when getCatalog
+ */
+ public static String getUniqueSlotName(final Connection connection) throws SQLException {
+ return String.format("%s_%s", SLOT_NAME_PREFIX, connection.getCatalog());
+ }
+
@Override
public String getType() {
return "openGauss";
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
index d6e7b2721f2..e5ee5899712 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
@@ -23,7 +23,6 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.Standa
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineDataSourceCreatorFactory;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractIncrementalDumper;
import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
@@ -39,9 +38,7 @@ import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.Abstr
import org.opengauss.jdbc.PgConnection;
import org.opengauss.replication.PGReplicationStream;
-import javax.sql.DataSource;
import java.nio.ByteBuffer;
-import java.sql.Connection;
import java.sql.SQLException;
/**
@@ -58,8 +55,6 @@ public final class OpenGaussWalDumper extends AbstractIncrementalDumper<WalPosit
private final WalEventConverter walEventConverter;
- private String slotName = OpenGaussLogicalReplication.SLOT_NAME_PREFIX;
-
private final PipelineChannel channel;
public OpenGaussWalDumper(final DumperConfiguration dumperConfig, final IngestPosition<WalPosition> position,
@@ -79,29 +74,11 @@ public final class OpenGaussWalDumper extends AbstractIncrementalDumper<WalPosit
dump();
}
- private PgConnection getReplicationConn() throws SQLException {
- return logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) dumperConfig.getDataSourceConfig()).unwrap(PgConnection.class);
- }
-
- private MppdbDecodingPlugin initReplication() {
- try {
- DataSource pipelineDataSource = PipelineDataSourceCreatorFactory.getInstance(
- dumperConfig.getDataSourceConfig().getType()).createPipelineDataSource(dumperConfig.getDataSourceConfig().getDataSourceConfiguration());
- try (Connection connection = pipelineDataSource.getConnection()) {
- slotName = OpenGaussLogicalReplication.getUniqueSlotName(connection);
- OpenGaussLogicalReplication.createIfNotExists(connection);
- return new MppdbDecodingPlugin(new OpenGaussTimestampUtils(connection.unwrap(PgConnection.class).getTimestampUtils()));
- }
- } catch (final SQLException ex) {
- log.warn("Create replication slot failed!");
- }
- return null;
- }
-
private void dump() {
- DecodingPlugin decodingPlugin = initReplication();
- try (PgConnection connection = getReplicationConn()) {
- PGReplicationStream stream = logicalReplication.createReplicationStream(connection, walPosition.getLogSequenceNumber(), slotName);
+ PGReplicationStream stream = null;
+ try (PgConnection connection = getReplicationConnectionUnwrap()) {
+ stream = logicalReplication.createReplicationStream(connection, walPosition.getLogSequenceNumber(), OpenGaussPositionInitializer.getUniqueSlotName(connection));
+ DecodingPlugin decodingPlugin = new MppdbDecodingPlugin(new OpenGaussTimestampUtils(connection.getTimestampUtils()));
while (isRunning()) {
ByteBuffer message = stream.readPending();
if (null == message) {
@@ -113,13 +90,22 @@ public final class OpenGaussWalDumper extends AbstractIncrementalDumper<WalPosit
pushRecord(record);
}
} catch (final SQLException ex) {
- if (ex.getMessage().contains("is already active")) {
- return;
- }
throw new IngestException(ex);
+ } finally {
+ if (null != stream) {
+ try {
+ stream.close();
+ } catch (final SQLException ex) {
+ log.error("Close PGReplicationStream failed", ex);
+ }
+ }
}
}
+ private PgConnection getReplicationConnectionUnwrap() throws SQLException {
+ return logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) dumperConfig.getDataSourceConfig()).unwrap(PgConnection.class);
+ }
+
private void pushRecord(final Record record) {
channel.pushRecord(record);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussLogicalReplication.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussLogicalReplication.java
index 9f20968ad19..419b048a683 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussLogicalReplication.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussLogicalReplication.java
@@ -25,13 +25,9 @@ import org.opengauss.PGProperty;
import org.opengauss.jdbc.PgConnection;
import org.opengauss.replication.LogSequenceNumber;
import org.opengauss.replication.PGReplicationStream;
-import org.opengauss.util.PSQLException;
-import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Properties;
@@ -41,13 +37,6 @@ import java.util.Properties;
@Slf4j
public final class OpenGaussLogicalReplication {
- // TODO it should be private
- public static final String SLOT_NAME_PREFIX = "sharding_scaling";
-
- public static final String DECODE_PLUGIN = "mppdb_decoding";
-
- public static final String DUPLICATE_OBJECT_ERROR_CODE = "42710";
-
/**
* Create connection.
*
@@ -85,67 +74,4 @@ public final class OpenGaussLogicalReplication {
.withStartPosition((LogSequenceNumber) startPosition.get())
.start();
}
-
- /**
- * Create slots (drop existed slot before create).
- *
- * @param connection connection
- * @throws SQLException SQL exception
- */
- public static void createIfNotExists(final Connection connection) throws SQLException {
- String slotName = getUniqueSlotName(connection);
- if (!isSlotNameExist(connection, slotName)) {
- createSlotBySQL(connection);
- }
- }
-
- /**
- * Drop replication slot by connection.
- *
- * @param connection connection
- * @throws SQLException drop SQL with error
- */
- public static void dropSlot(final Connection connection) throws SQLException {
- String slotName = getUniqueSlotName(connection);
- if (!isSlotNameExist(connection, slotName)) {
- log.info("dropSlot, slot not exist, ignore, slotName={}", slotName);
- return;
- }
- String sql = String.format("select * from pg_drop_replication_slot('%s')", slotName);
- try (CallableStatement callableStatement = connection.prepareCall(sql)) {
- callableStatement.execute();
- }
- }
-
- private static boolean isSlotNameExist(final Connection connection, final String slotName) throws SQLException {
- String sql = "select * from pg_replication_slots where slot_name=?";
- try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
- preparedStatement.setString(1, slotName);
- try (ResultSet resultSet = preparedStatement.executeQuery()) {
- return resultSet.next();
- }
- }
- }
-
- private static void createSlotBySQL(final Connection connection) throws SQLException {
- String sql = String.format("SELECT * FROM pg_create_logical_replication_slot('%s', '%s')", getUniqueSlotName(connection), DECODE_PLUGIN);
- try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
- preparedStatement.execute();
- } catch (final PSQLException ex) {
- if (!DUPLICATE_OBJECT_ERROR_CODE.equals(ex.getSQLState())) {
- throw ex;
- }
- }
- }
-
- /**
- * Get the unique slot name by connection.
- *
- * @param connection connection
- * @return the unique name by connection
- * @throws SQLException failed when getCatalog
- */
- public static String getUniqueSlotName(final Connection connection) throws SQLException {
- return String.format("%s_%s", SLOT_NAME_PREFIX, connection.getCatalog());
- }
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
index b725bf2cb2d..0f65746e28c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
@@ -76,6 +76,7 @@ public final class PostgreSQLWalDumper extends AbstractIncrementalDumper<WalPosi
}
private void dump() {
+ // TODO use unified PgConnection
try (Connection connection = logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) dumperConfig.getDataSourceConfig());
PGReplicationStream stream = logicalReplication.createReplicationStream(connection, PostgreSQLPositionInitializer.getUniqueSlotName(connection), walPosition.getLogSequenceNumber())) {
PostgreSQLTimestampUtils utils = new PostgreSQLTimestampUtils(connection.unwrap(PgConnection.class).getTimestampUtils());
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/LogicalReplication.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/LogicalReplication.java
index 654bc7afb74..810549f9aa3 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/LogicalReplication.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/LogicalReplication.java
@@ -33,6 +33,7 @@ import java.util.Properties;
/**
* PostgreSQL logical replication.
*/
+// TODO add prefix PostgreSQL
public final class LogicalReplication {
/**