You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by su...@apache.org on 2022/10/03 12:49:38 UTC

[shardingsphere] branch master updated: Refactor AbstractIncrementalDumper (#21329)

This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new c07a25c1d17 Refactor AbstractIncrementalDumper (#21329)
c07a25c1d17 is described below

commit c07a25c1d1796c1cc2e57d5bdd155d9cf4fcb107
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Mon Oct 3 20:49:29 2022 +0800

    Refactor AbstractIncrementalDumper (#21329)
    
    * Refactor AbstractIncrementalDumper
    
    * Refactor AbstractDataSourcePreparer
---
 .../prepare/datasource/AbstractDataSourcePreparer.java  |  5 +++--
 .../pipeline/mysql/ingest/MySQLIncrementalDumper.java   | 13 +++++--------
 .../mysql/ingest/MySQLIncrementalDumperTest.java        |  4 ++--
 .../pipeline/opengauss/ingest/OpenGaussWalDumper.java   | 17 +++++++++--------
 .../pipeline/postgresql/ingest/PostgreSQLWalDumper.java | 17 +++++++++--------
 5 files changed, 28 insertions(+), 28 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
index 32baf21e13d..13a134fcd9b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
@@ -52,7 +52,7 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
     private static final String[] IGNORE_EXCEPTION_MESSAGE = {"multiple primary keys for table", "already exists"};
     
     @Override
-    public void prepareTargetSchemas(final PrepareTargetSchemasParameter parameter) throws SQLException {
+    public void prepareTargetSchemas(final PrepareTargetSchemasParameter parameter) {
         DatabaseType targetDatabaseType = parameter.getTargetDatabaseType();
         if (!targetDatabaseType.isSchemaAvailable()) {
             log.info("prepareTargetSchemas, target database does not support schema, ignore, targetDatabaseType={}", targetDatabaseType);
@@ -76,12 +76,13 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
         log.info("prepareTargetSchemas, createdSchemaNames={}, defaultSchema={}", createdSchemaNames, defaultSchema);
     }
     
-    private void executeCreateSchema(final PipelineDataSourceManager dataSourceManager, final PipelineDataSourceConfiguration targetDataSourceConfig, final String sql) throws SQLException {
+    private void executeCreateSchema(final PipelineDataSourceManager dataSourceManager, final PipelineDataSourceConfiguration targetDataSourceConfig, final String sql) {
         log.info("prepareTargetSchemas, sql={}", sql);
         try (Connection connection = getCachedDataSource(dataSourceManager, targetDataSourceConfig).getConnection()) {
             try (Statement statement = connection.createStatement()) {
                 statement.execute(sql);
             }
+        } catch (final SQLException ignored) {
         }
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 4e896f3c504..9d3cee95a05 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -54,7 +54,6 @@ import java.io.Serializable;
 import java.security.SecureRandom;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.Random;
 
 /**
  * MySQL incremental dumper.
@@ -62,13 +61,11 @@ import java.util.Random;
 @Slf4j
 public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<BinlogPosition> {
     
-    private final BinlogPosition binlogPosition;
-    
     private final DumperConfiguration dumperConfig;
     
-    private final PipelineTableMetaDataLoader metaDataLoader;
+    private final BinlogPosition binlogPosition;
     
-    private final Random random = new SecureRandom();
+    private final PipelineTableMetaDataLoader metaDataLoader;
     
     private final PipelineChannel channel;
     
@@ -79,15 +76,15 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
     public MySQLIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<BinlogPosition> binlogPosition,
                                   final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
         super(dumperConfig, binlogPosition, channel, metaDataLoader);
-        this.binlogPosition = (BinlogPosition) binlogPosition;
-        this.dumperConfig = dumperConfig;
         Preconditions.checkArgument(dumperConfig.getDataSourceConfig() instanceof StandardPipelineDataSourceConfiguration, "MySQLBinlogDumper only support StandardPipelineDataSourceConfiguration");
+        this.dumperConfig = dumperConfig;
+        this.binlogPosition = (BinlogPosition) binlogPosition;
         this.channel = channel;
         this.metaDataLoader = metaDataLoader;
         YamlJdbcConfiguration jdbcConfig = ((StandardPipelineDataSourceConfiguration) dumperConfig.getDataSourceConfig()).getJdbcConfig();
         log.info("incremental dump, jdbcUrl={}", jdbcConfig.getJdbcUrl());
         DataSourceMetaData metaData = DatabaseTypeFactory.getInstance("MySQL").getDataSourceMetaData(jdbcConfig.getJdbcUrl(), null);
-        client = new MySQLClient(new ConnectInfo(random.nextInt(), metaData.getHostname(), metaData.getPort(), jdbcConfig.getUsername(), jdbcConfig.getPassword()));
+        client = new MySQLClient(new ConnectInfo(new SecureRandom().nextInt(), metaData.getHostname(), metaData.getPort(), jdbcConfig.getUsername(), jdbcConfig.getPassword()));
         catalog = metaData.getCatalog();
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index 63813bd2520..a2fadd35565 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -68,12 +68,12 @@ import static org.mockito.Mockito.when;
 @RunWith(MockitoJUnitRunner.class)
 public final class MySQLIncrementalDumperTest {
     
+    private final PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
+    
     private MySQLIncrementalDumper incrementalDumper;
     
     private MultiplexMemoryPipelineChannel channel;
     
-    private final PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
-    
     @Mock
     private PipelineTableMetaData pipelineTableMetaData;
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
index 55abb75930a..7660c49d3ca 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
@@ -35,6 +35,7 @@ import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalEventCon
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.DecodingPlugin;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWalEvent;
+import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
 import org.opengauss.jdbc.PgConnection;
 import org.opengauss.replication.PGReplicationStream;
@@ -48,26 +49,26 @@ import java.sql.SQLException;
 @Slf4j
 public final class OpenGaussWalDumper extends AbstractIncrementalDumper<WalPosition> {
     
-    private final WalPosition walPosition;
-    
     private final DumperConfiguration dumperConfig;
     
-    private final OpenGaussLogicalReplication logicalReplication = new OpenGaussLogicalReplication();
+    private final WalPosition walPosition;
+    
+    private final PipelineChannel channel;
     
     private final WalEventConverter walEventConverter;
     
-    private final PipelineChannel channel;
+    private final OpenGaussLogicalReplication logicalReplication;
     
     public OpenGaussWalDumper(final DumperConfiguration dumperConfig, final IngestPosition<WalPosition> position,
                               final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
         super(dumperConfig, position, channel, metaDataLoader);
-        walPosition = (WalPosition) position;
-        if (!StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass())) {
-            throw new UnsupportedSQLOperationException("PostgreSQLWalDumper only support PipelineDataSourceConfiguration");
-        }
+        ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass()),
+                () -> new UnsupportedSQLOperationException("PostgreSQLWalDumper only support PipelineDataSourceConfiguration"));
         this.dumperConfig = dumperConfig;
+        walPosition = (WalPosition) position;
         this.channel = channel;
         walEventConverter = new WalEventConverter(dumperConfig, metaDataLoader);
+        logicalReplication = new OpenGaussLogicalReplication();
     }
     
     @Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
index d5f3ef5ae11..ab97b500973 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
@@ -35,6 +35,7 @@ import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.Post
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLTimestampUtils;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.TestDecodingPlugin;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWalEvent;
+import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
 import org.postgresql.jdbc.PgConnection;
 import org.postgresql.replication.PGReplicationStream;
@@ -49,26 +50,26 @@ import java.sql.SQLException;
 @Slf4j
 public final class PostgreSQLWalDumper extends AbstractIncrementalDumper<WalPosition> {
     
-    private final WalPosition walPosition;
-    
     private final DumperConfiguration dumperConfig;
     
-    private final LogicalReplication logicalReplication = new LogicalReplication();
+    private final WalPosition walPosition;
+    
+    private final PipelineChannel channel;
     
     private final WalEventConverter walEventConverter;
     
-    private final PipelineChannel channel;
+    private final LogicalReplication logicalReplication;
     
     public PostgreSQLWalDumper(final DumperConfiguration dumperConfig, final IngestPosition<WalPosition> position,
                                final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
         super(dumperConfig, position, channel, metaDataLoader);
-        walPosition = (WalPosition) position;
-        if (!StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass())) {
-            throw new UnsupportedSQLOperationException("PostgreSQLWalDumper only support PipelineDataSourceConfiguration");
-        }
+        ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass()),
+                () -> new UnsupportedSQLOperationException("PostgreSQLWalDumper only support PipelineDataSourceConfiguration"));
         this.dumperConfig = dumperConfig;
+        walPosition = (WalPosition) position;
         this.channel = channel;
         walEventConverter = new WalEventConverter(dumperConfig, metaDataLoader);
+        logicalReplication = new LogicalReplication();
     }
     
     @Override