You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink target is not preserved in this plain-text copy).
Posted to notifications@shardingsphere.apache.org by su...@apache.org on 2022/10/03 12:49:38 UTC
[shardingsphere] branch master updated: Refactor AbstractIncrementalDumper (#21329)
This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new c07a25c1d17 Refactor AbstractIncrementalDumper (#21329)
c07a25c1d17 is described below
commit c07a25c1d1796c1cc2e57d5bdd155d9cf4fcb107
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Mon Oct 3 20:49:29 2022 +0800
Refactor AbstractIncrementalDumper (#21329)
* Refactor AbstractIncrementalDumper
* Refactor AbstractDataSourcePreparer
---
.../prepare/datasource/AbstractDataSourcePreparer.java | 5 +++--
.../pipeline/mysql/ingest/MySQLIncrementalDumper.java | 13 +++++--------
.../mysql/ingest/MySQLIncrementalDumperTest.java | 4 ++--
.../pipeline/opengauss/ingest/OpenGaussWalDumper.java | 17 +++++++++--------
.../pipeline/postgresql/ingest/PostgreSQLWalDumper.java | 17 +++++++++--------
5 files changed, 28 insertions(+), 28 deletions(-)
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
index 32baf21e13d..13a134fcd9b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
@@ -52,7 +52,7 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
private static final String[] IGNORE_EXCEPTION_MESSAGE = {"multiple primary keys for table", "already exists"};
@Override
- public void prepareTargetSchemas(final PrepareTargetSchemasParameter parameter) throws SQLException {
+ public void prepareTargetSchemas(final PrepareTargetSchemasParameter parameter) {
DatabaseType targetDatabaseType = parameter.getTargetDatabaseType();
if (!targetDatabaseType.isSchemaAvailable()) {
log.info("prepareTargetSchemas, target database does not support schema, ignore, targetDatabaseType={}", targetDatabaseType);
@@ -76,12 +76,13 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
log.info("prepareTargetSchemas, createdSchemaNames={}, defaultSchema={}", createdSchemaNames, defaultSchema);
}
- private void executeCreateSchema(final PipelineDataSourceManager dataSourceManager, final PipelineDataSourceConfiguration targetDataSourceConfig, final String sql) throws SQLException {
+ private void executeCreateSchema(final PipelineDataSourceManager dataSourceManager, final PipelineDataSourceConfiguration targetDataSourceConfig, final String sql) {
log.info("prepareTargetSchemas, sql={}", sql);
try (Connection connection = getCachedDataSource(dataSourceManager, targetDataSourceConfig).getConnection()) {
try (Statement statement = connection.createStatement()) {
statement.execute(sql);
}
+ } catch (final SQLException ignored) {
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 4e896f3c504..9d3cee95a05 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -54,7 +54,6 @@ import java.io.Serializable;
import java.security.SecureRandom;
import java.util.Objects;
import java.util.Optional;
-import java.util.Random;
/**
* MySQL incremental dumper.
@@ -62,13 +61,11 @@ import java.util.Random;
@Slf4j
public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<BinlogPosition> {
- private final BinlogPosition binlogPosition;
-
private final DumperConfiguration dumperConfig;
- private final PipelineTableMetaDataLoader metaDataLoader;
+ private final BinlogPosition binlogPosition;
- private final Random random = new SecureRandom();
+ private final PipelineTableMetaDataLoader metaDataLoader;
private final PipelineChannel channel;
@@ -79,15 +76,15 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
public MySQLIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<BinlogPosition> binlogPosition,
final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
super(dumperConfig, binlogPosition, channel, metaDataLoader);
- this.binlogPosition = (BinlogPosition) binlogPosition;
- this.dumperConfig = dumperConfig;
Preconditions.checkArgument(dumperConfig.getDataSourceConfig() instanceof StandardPipelineDataSourceConfiguration, "MySQLBinlogDumper only support StandardPipelineDataSourceConfiguration");
+ this.dumperConfig = dumperConfig;
+ this.binlogPosition = (BinlogPosition) binlogPosition;
this.channel = channel;
this.metaDataLoader = metaDataLoader;
YamlJdbcConfiguration jdbcConfig = ((StandardPipelineDataSourceConfiguration) dumperConfig.getDataSourceConfig()).getJdbcConfig();
log.info("incremental dump, jdbcUrl={}", jdbcConfig.getJdbcUrl());
DataSourceMetaData metaData = DatabaseTypeFactory.getInstance("MySQL").getDataSourceMetaData(jdbcConfig.getJdbcUrl(), null);
- client = new MySQLClient(new ConnectInfo(random.nextInt(), metaData.getHostname(), metaData.getPort(), jdbcConfig.getUsername(), jdbcConfig.getPassword()));
+ client = new MySQLClient(new ConnectInfo(new SecureRandom().nextInt(), metaData.getHostname(), metaData.getPort(), jdbcConfig.getUsername(), jdbcConfig.getPassword()));
catalog = metaData.getCatalog();
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index 63813bd2520..a2fadd35565 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -68,12 +68,12 @@ import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public final class MySQLIncrementalDumperTest {
+ private final PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
+
private MySQLIncrementalDumper incrementalDumper;
private MultiplexMemoryPipelineChannel channel;
- private final PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
-
@Mock
private PipelineTableMetaData pipelineTableMetaData;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
index 55abb75930a..7660c49d3ca 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
@@ -35,6 +35,7 @@ import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalEventCon
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.DecodingPlugin;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWalEvent;
+import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
import org.opengauss.jdbc.PgConnection;
import org.opengauss.replication.PGReplicationStream;
@@ -48,26 +49,26 @@ import java.sql.SQLException;
@Slf4j
public final class OpenGaussWalDumper extends AbstractIncrementalDumper<WalPosition> {
- private final WalPosition walPosition;
-
private final DumperConfiguration dumperConfig;
- private final OpenGaussLogicalReplication logicalReplication = new OpenGaussLogicalReplication();
+ private final WalPosition walPosition;
+
+ private final PipelineChannel channel;
private final WalEventConverter walEventConverter;
- private final PipelineChannel channel;
+ private final OpenGaussLogicalReplication logicalReplication;
public OpenGaussWalDumper(final DumperConfiguration dumperConfig, final IngestPosition<WalPosition> position,
final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
super(dumperConfig, position, channel, metaDataLoader);
- walPosition = (WalPosition) position;
- if (!StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass())) {
- throw new UnsupportedSQLOperationException("PostgreSQLWalDumper only support PipelineDataSourceConfiguration");
- }
+ ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass()),
+ () -> new UnsupportedSQLOperationException("PostgreSQLWalDumper only support PipelineDataSourceConfiguration"));
this.dumperConfig = dumperConfig;
+ walPosition = (WalPosition) position;
this.channel = channel;
walEventConverter = new WalEventConverter(dumperConfig, metaDataLoader);
+ logicalReplication = new OpenGaussLogicalReplication();
}
@Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
index d5f3ef5ae11..ab97b500973 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
@@ -35,6 +35,7 @@ import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.Post
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLTimestampUtils;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.TestDecodingPlugin;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWalEvent;
+import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
import org.postgresql.jdbc.PgConnection;
import org.postgresql.replication.PGReplicationStream;
@@ -49,26 +50,26 @@ import java.sql.SQLException;
@Slf4j
public final class PostgreSQLWalDumper extends AbstractIncrementalDumper<WalPosition> {
- private final WalPosition walPosition;
-
private final DumperConfiguration dumperConfig;
- private final LogicalReplication logicalReplication = new LogicalReplication();
+ private final WalPosition walPosition;
+
+ private final PipelineChannel channel;
private final WalEventConverter walEventConverter;
- private final PipelineChannel channel;
+ private final LogicalReplication logicalReplication;
public PostgreSQLWalDumper(final DumperConfiguration dumperConfig, final IngestPosition<WalPosition> position,
final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
super(dumperConfig, position, channel, metaDataLoader);
- walPosition = (WalPosition) position;
- if (!StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass())) {
- throw new UnsupportedSQLOperationException("PostgreSQLWalDumper only support PipelineDataSourceConfiguration");
- }
+ ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass()),
+ () -> new UnsupportedSQLOperationException("PostgreSQLWalDumper only support PipelineDataSourceConfiguration"));
this.dumperConfig = dumperConfig;
+ walPosition = (WalPosition) position;
this.channel = channel;
walEventConverter = new WalEventConverter(dumperConfig, metaDataLoader);
+ logicalReplication = new LogicalReplication();
}
@Override