You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/04/13 09:45:18 UTC
[shardingsphere] branch master updated: Refactor PostgreSQLPositionInitializer code style (#16786)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 05dc0a81390 Refactor PostgreSQLPositionInitializer code style (#16786)
05dc0a81390 is described below
commit 05dc0a81390fe27ea9c11130d6ead152c9d1cee6
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Wed Apr 13 17:45:13 2022 +0800
Refactor PostgreSQLPositionInitializer code style (#16786)
---
.../ingest/PostgreSQLPositionInitializer.java | 54 +++++++++++-----------
1 file changed, 27 insertions(+), 27 deletions(-)
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java
index 14b2bfa838c..606198c049e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java
@@ -22,7 +22,6 @@ import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
import org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
import org.postgresql.replication.LogSequenceNumber;
-import org.postgresql.util.PSQLException;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -36,17 +35,16 @@ import java.sql.SQLException;
@Slf4j
public final class PostgreSQLPositionInitializer implements PositionInitializer {
- // TODO it should be private; _PREFIX;
- public static final String SLOT_NAME = "sharding_scaling";
+ private static final String SLOT_NAME_PREFIX = "sharding_scaling";
- public static final String DECODE_PLUGIN = "test_decoding";
+ private static final String DECODE_PLUGIN = "test_decoding";
- public static final String DUPLICATE_OBJECT_ERROR_CODE = "42710";
+ private static final String DUPLICATE_OBJECT_ERROR_CODE = "42710";
@Override
public WalPosition init(final DataSource dataSource) throws SQLException {
try (Connection connection = dataSource.getConnection()) {
- createIfNotExists(connection);
+ createSlotIfNotExist(connection, getUniqueSlotName(connection));
return getWalPosition(connection);
}
}
@@ -56,24 +54,25 @@ public final class PostgreSQLPositionInitializer implements PositionInitializer
return new WalPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(Long.parseLong(data))));
}
- private void createIfNotExists(final Connection connection) throws SQLException {
- if (checkSlotExistsOrNot(connection)) {
- log.info("replication slot already exist, slot name: {}", SLOT_NAME);
+ private void createSlotIfNotExist(final Connection connection, final String slotName) throws SQLException {
+ if (isSlotExisting(connection, slotName)) {
+ log.info("createSlotIfNotExist, slot exist, slotName={}", slotName);
return;
}
- try (PreparedStatement ps = connection.prepareStatement(String.format("SELECT * FROM pg_create_logical_replication_slot('%s', '%s')", getUniqueSlotName(connection), DECODE_PLUGIN))) {
- ps.execute();
- } catch (final PSQLException ex) {
+ String createSlotSQL = String.format("SELECT * FROM pg_create_logical_replication_slot('%s', '%s')", slotName, DECODE_PLUGIN);
+ try (PreparedStatement preparedStatement = connection.prepareStatement(createSlotSQL)) {
+ preparedStatement.execute();
+ } catch (final SQLException ex) {
if (!DUPLICATE_OBJECT_ERROR_CODE.equals(ex.getSQLState())) {
throw ex;
}
}
}
- private boolean checkSlotExistsOrNot(final Connection connection) throws SQLException {
+ private boolean isSlotExisting(final Connection connection, final String slotName) throws SQLException {
String checkSlotSQL = "SELECT slot_name FROM pg_replication_slots WHERE slot_name=? AND plugin=?";
try (PreparedStatement preparedStatement = connection.prepareStatement(checkSlotSQL)) {
- preparedStatement.setString(1, getUniqueSlotName(connection));
+ preparedStatement.setString(1, slotName);
preparedStatement.setString(2, DECODE_PLUGIN);
try (ResultSet resultSet = preparedStatement.executeQuery()) {
return resultSet.next();
@@ -82,39 +81,40 @@ public final class PostgreSQLPositionInitializer implements PositionInitializer
}
private WalPosition getWalPosition(final Connection connection) throws SQLException {
- try (PreparedStatement ps = connection.prepareStatement(getSql(connection));
- ResultSet rs = ps.executeQuery()) {
- rs.next();
- return new WalPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(rs.getString(1))));
+ try (PreparedStatement preparedStatement = connection.prepareStatement(getLogSequenceNumberSQL(connection));
+ ResultSet resultSet = preparedStatement.executeQuery()) {
+ resultSet.next();
+ return new WalPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(resultSet.getString(1))));
}
}
- private String getSql(final Connection connection) throws SQLException {
+ private String getLogSequenceNumberSQL(final Connection connection) throws SQLException {
if (9 == connection.getMetaData().getDatabaseMajorVersion() && 6 <= connection.getMetaData().getDatabaseMinorVersion()) {
return "SELECT PG_CURRENT_XLOG_LOCATION()";
}
if (10 <= connection.getMetaData().getDatabaseMajorVersion()) {
return "SELECT PG_CURRENT_WAL_LSN()";
}
- throw new RuntimeException("Not support PostgreSQL version:" + connection.getMetaData().getDatabaseProductVersion());
+ throw new RuntimeException("Unsupported PostgreSQL version: " + connection.getMetaData().getDatabaseProductVersion());
}
@Override
public void destroy(final DataSource dataSource) throws SQLException {
try (Connection connection = dataSource.getConnection()) {
- dropSlotIfExists(connection);
+ dropSlotIfExist(connection);
}
}
- private void dropSlotIfExists(final Connection connection) throws SQLException {
- if (!checkSlotExistsOrNot(connection)) {
- log.info("drop, slot not exist, slot name: {}", SLOT_NAME);
+ private void dropSlotIfExist(final Connection connection) throws SQLException {
+ String slotName = getUniqueSlotName(connection);
+ if (!isSlotExisting(connection, slotName)) {
+ log.info("dropSlotIfExist, slot not exist, slotName={}", slotName);
return;
}
- log.info("drop, slot exist, slot name: {}", SLOT_NAME);
+ log.info("dropSlotIfExist, slot exist, slotName={}", slotName);
String dropSlotSQL = "SELECT pg_drop_replication_slot(?)";
try (PreparedStatement preparedStatement = connection.prepareStatement(dropSlotSQL)) {
- preparedStatement.setString(1, getUniqueSlotName(connection));
+ preparedStatement.setString(1, slotName);
preparedStatement.execute();
}
}
@@ -127,7 +127,7 @@ public final class PostgreSQLPositionInitializer implements PositionInitializer
* @throws SQLException failed when getCatalog
*/
public static String getUniqueSlotName(final Connection connection) throws SQLException {
- return String.format("%s_%s", SLOT_NAME, connection.getCatalog());
+ return String.format("%s_%s", SLOT_NAME_PREFIX, connection.getCatalog());
}
@Override