You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/01/27 04:12:07 UTC
[shardingsphere] branch master updated: Refactor PipelineSQLBuilder as SPI (#15094)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 8759591 Refactor PipelineSQLBuilder as SPI (#15094)
8759591 is described below
commit 8759591d7655bc84d10050176c3e54122e16a8f3
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Thu Jan 27 12:11:10 2022 +0800
Refactor PipelineSQLBuilder as SPI (#15094)
* Move and rename PipelineSQLBuilderFactory
* Refactor PipelineSQLBuilder as SPI
* Unit test
* Rename PipelineSQLBuilderFactory newInstance to getSQLBuilder
---
.../consistency/DataConsistencyCheckerImpl.java | 4 ++--
.../DataMatchSingleTableDataCalculator.java | 4 ++--
.../sqlbuilder/AbstractPipelineSQLBuilder.java | 11 ++++++++--
.../sqlbuilder/PipelineSQLBuilderFactory.java} | 24 +++++++++-------------
.../rulealtered/prepare/InventoryTaskSplitter.java | 4 ++--
.../job/environment/ScalingEnvironmentManager.java | 4 ++--
.../scaling/core/spi/ScalingEntry.java | 8 --------
.../data/pipeline/mysql/MySQLScalingEntry.java | 6 ------
.../CRC32MatchMySQLSingleTableDataCalculator.java | 7 ++++---
.../check/datasource/MySQLDataSourceChecker.java | 7 ++++---
.../datasource/MySQLDataSourcePreparer.java | 6 +++---
.../mysql/sqlbuilder/MySQLPipelineSQLBuilder.java | 8 ++++++++
...ipeline.mysql.ingest.column.value.ValueHandler} | 0
...ata.pipeline.spi.sqlbuilder.PipelineSQLBuilder} | 6 +-----
.../pipeline/opengauss/OpenGaussScalingEntry.java | 6 ------
.../sqlbuilder/OpenGaussPipelineSQLBuilder.java | 10 ++++++++-
...ata.pipeline.spi.sqlbuilder.PipelineSQLBuilder} | 6 +-----
.../postgresql/PostgreSQLScalingEntry.java | 6 ------
.../datasource/PostgreSQLDataSourceChecker.java | 5 ++---
.../sqlbuilder/PostgreSQLPipelineSQLBuilder.java | 8 ++++++++
...ata.pipeline.spi.sqlbuilder.PipelineSQLBuilder} | 6 +-----
.../PostgreSQLPipelineSQLBuilderTest.java | 5 ++---
.../spi/sqlbuilder/PipelineSQLBuilder.java | 15 +++++++++-----
.../core/fixture/FixturePipelineSQLBuilder.java | 8 ++++++++
.../pipeline/core/fixture/FixtureScalingEntry.java | 6 ------
.../spi/sqlbuilder/PipelineSQLBuilderTest.java | 2 +-
...data.pipeline.spi.sqlbuilder.PipelineSQLBuilder | 6 +-----
27 files changed, 90 insertions(+), 98 deletions(-)
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
index abe863f..9a5693c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
@@ -31,6 +31,7 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineDataConsistencyCheckFailedException;
+import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCheckAlgorithm;
@@ -42,7 +43,6 @@ import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFact
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.schema.model.TableMetaData;
import org.apache.shardingsphere.mode.manager.ContextManager;
-import org.apache.shardingsphere.scaling.core.job.sqlbuilder.ScalingSQLBuilderFactory;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -132,7 +132,7 @@ public final class DataConsistencyCheckerImpl implements DataConsistencyChecker
private long count(final DataSource dataSource, final String table, final DatabaseType databaseType) {
try (Connection connection = dataSource.getConnection();
- PreparedStatement preparedStatement = connection.prepareStatement(ScalingSQLBuilderFactory.newInstance(databaseType.getName()).buildCountSQL(table));
+ PreparedStatement preparedStatement = connection.prepareStatement(PipelineSQLBuilderFactory.getSQLBuilder(databaseType.getName()).buildCountSQL(table));
ResultSet resultSet = preparedStatement.executeQuery()) {
resultSet.next();
return resultSet.getLong(1);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/DataMatchSingleTableDataCalculator.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/DataMatchSingleTableDataCalculator.java
index 8c810f5..e34bad5 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/DataMatchSingleTableDataCalculator.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/DataMatchSingleTableDataCalculator.java
@@ -27,9 +27,9 @@ import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataCalculateParameter;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineDataConsistencyCheckFailedException;
+import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
-import org.apache.shardingsphere.scaling.core.job.sqlbuilder.ScalingSQLBuilderFactory;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -82,7 +82,7 @@ public final class DataMatchSingleTableDataCalculator extends AbstractStreamingS
@Override
protected Optional<Object> calculateChunk(final DataCalculateParameter dataCalculateParameter) {
String logicTableName = dataCalculateParameter.getLogicTableName();
- PipelineSQLBuilder sqlBuilder = ScalingSQLBuilderFactory.newInstance(dataCalculateParameter.getDatabaseType());
+ PipelineSQLBuilder sqlBuilder = PipelineSQLBuilderFactory.getSQLBuilder(dataCalculateParameter.getDatabaseType());
String uniqueKey = dataCalculateParameter.getUniqueKey();
CalculatedResult previousCalculatedResult = (CalculatedResult) dataCalculateParameter.getPreviousCalculatedResult();
Number startUniqueKeyValue = null != previousCalculatedResult ? previousCalculatedResult.getMaxUniqueKeyValue() : -1;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
index af4957f..35f230d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.data.pipeline.core.sqlbuilder;
import com.google.common.base.Preconditions;
import lombok.AccessLevel;
import lombok.Getter;
-import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
@@ -28,6 +27,7 @@ import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -37,7 +37,6 @@ import java.util.concurrent.ConcurrentMap;
/**
* Abstract pipeline SQL builder.
*/
-@RequiredArgsConstructor
public abstract class AbstractPipelineSQLBuilder implements PipelineSQLBuilder {
private static final String INSERT_SQL_CACHE_KEY_PREFIX = "INSERT_";
@@ -51,6 +50,14 @@ public abstract class AbstractPipelineSQLBuilder implements PipelineSQLBuilder {
@Getter(AccessLevel.PROTECTED)
private final Map<String, Set<String>> shardingColumnsMap;
+ public AbstractPipelineSQLBuilder() {
+ shardingColumnsMap = Collections.emptyMap();
+ }
+
+ public AbstractPipelineSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
+ this.shardingColumnsMap = shardingColumnsMap;
+ }
+
/**
* Get left identifier quote string.
*
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/sqlbuilder/ScalingSQLBuilderFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLBuilderFactory.java
similarity index 60%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/sqlbuilder/ScalingSQLBuilderFactory.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLBuilderFactory.java
index fb7796e..e3d9494 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/sqlbuilder/ScalingSQLBuilderFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLBuilderFactory.java
@@ -15,33 +15,29 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.core.job.sqlbuilder;
+package org.apache.shardingsphere.data.pipeline.core.sqlbuilder;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
-import lombok.SneakyThrows;
import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
-import org.apache.shardingsphere.scaling.core.spi.ScalingEntryLoader;
-
-import java.util.HashMap;
-import java.util.Map;
+import org.apache.shardingsphere.spi.exception.ServiceProviderNotFoundException;
+import org.apache.shardingsphere.spi.singleton.TypedSingletonSPIHolder;
/**
- * Scaling SQL builder factory.
+ * Pipeline SQL builder factory.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class ScalingSQLBuilderFactory {
+public final class PipelineSQLBuilderFactory {
+
+ private static final TypedSingletonSPIHolder<PipelineSQLBuilder> SQL_BUILDER_SPI_HOLDER = new TypedSingletonSPIHolder<>(PipelineSQLBuilder.class, false);
/**
- * New instance of SQL builder.
+ * Get SQL builder instance.
*
* @param databaseType database type
* @return SQL builder
*/
- @SneakyThrows(ReflectiveOperationException.class)
- public static PipelineSQLBuilder newInstance(final String databaseType) {
- ScalingEntry scalingEntry = ScalingEntryLoader.getInstance(databaseType);
- return scalingEntry.getSQLBuilderClass().getConstructor(Map.class).newInstance(new HashMap<>());
+ public static PipelineSQLBuilder getSQLBuilder(final String databaseType) {
+ return SQL_BUILDER_SPI_HOLDER.get(databaseType).orElseThrow(() -> new ServiceProviderNotFoundException(PipelineSQLBuilder.class, databaseType));
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
index 31500fa..5a4c81a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
@@ -31,6 +31,7 @@ import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineMetaDataManager;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
+import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
@@ -41,7 +42,6 @@ import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.metadata.schema.model.TableMetaData;
import org.apache.shardingsphere.infra.yaml.config.pojo.rulealtered.YamlOnRuleAlteredActionConfiguration.YamlInputConfiguration;
import org.apache.shardingsphere.infra.yaml.config.swapper.rulealtered.OnRuleAlteredActionConfigurationYamlSwapper.InputConfigurationSwapper;
-import org.apache.shardingsphere.scaling.core.job.sqlbuilder.ScalingSQLBuilderFactory;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -180,7 +180,7 @@ public final class InventoryTaskSplitter {
private Collection<IngestPosition<?>> getPositionByPrimaryKeyRange(final RuleAlteredJobContext jobContext, final DataSource dataSource, final InventoryDumperConfiguration dumperConfig) {
Collection<IngestPosition<?>> result = new ArrayList<>();
JobConfiguration jobConfig = jobContext.getJobConfig();
- String sql = ScalingSQLBuilderFactory.newInstance(jobConfig.getHandleConfig().getSourceDatabaseType())
+ String sql = PipelineSQLBuilderFactory.getSQLBuilder(jobConfig.getHandleConfig().getSourceDatabaseType())
.buildSplitByPrimaryKeyRangeSQL(dumperConfig.getTableName(), dumperConfig.getPrimaryKey());
try (Connection connection = dataSource.getConnection();
PreparedStatement ps = connection.prepareStatement(sql)) {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
index b923d09..70d8d14 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
@@ -21,8 +21,8 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
+import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
-import org.apache.shardingsphere.scaling.core.job.sqlbuilder.ScalingSQLBuilderFactory;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -49,7 +49,7 @@ public final class ScalingEnvironmentManager {
try (PipelineDataSourceWrapper dataSource = dataSourceFactory.newInstance(PipelineDataSourceConfigurationFactory.newInstance(target.getType(), target.getParameter()));
Connection connection = dataSource.getConnection()) {
for (String each : tables) {
- String sql = ScalingSQLBuilderFactory.newInstance(jobContext.getJobConfig().getHandleConfig().getTargetDatabaseType()).buildTruncateSQL(each);
+ String sql = PipelineSQLBuilderFactory.getSQLBuilder(jobContext.getJobConfig().getHandleConfig().getTargetDatabaseType()).buildTruncateSQL(each);
try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.execute();
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
index 8474eb2..06b7d9f 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
@@ -21,7 +21,6 @@ import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
import org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
-import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeAwareSPI;
import org.apache.shardingsphere.scaling.core.job.check.EnvironmentChecker;
@@ -65,11 +64,4 @@ public interface ScalingEntry extends DatabaseTypeAwareSPI {
* @return environment checker type
*/
Class<? extends EnvironmentChecker> getEnvironmentCheckerClass();
-
- /**
- * Get SQL builder class.
- *
- * @return SQL builder type
- */
- Class<? extends PipelineSQLBuilder> getSQLBuilderClass();
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLScalingEntry.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLScalingEntry.java
index c4302bb..6bb7609 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLScalingEntry.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLScalingEntry.java
@@ -21,7 +21,6 @@ import org.apache.shardingsphere.data.pipeline.mysql.importer.MySQLImporter;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLIncrementalDumper;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLInventoryDumper;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLPositionInitializer;
-import org.apache.shardingsphere.data.pipeline.mysql.sqlbuilder.MySQLPipelineSQLBuilder;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
/**
@@ -55,11 +54,6 @@ public final class MySQLScalingEntry implements ScalingEntry {
}
@Override
- public Class<MySQLPipelineSQLBuilder> getSQLBuilderClass() {
- return MySQLPipelineSQLBuilder.class;
- }
-
- @Override
public String getDatabaseType() {
return "MySQL";
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/check/consistency/CRC32MatchMySQLSingleTableDataCalculator.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/check/consistency/CRC32MatchMySQLSingleTableDataCalculator.java
index 1d20de5..eaad0c9 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/check/consistency/CRC32MatchMySQLSingleTableDataCalculator.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/check/consistency/CRC32MatchMySQLSingleTableDataCalculator.java
@@ -21,6 +21,7 @@ import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataCalcula
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineDataConsistencyCheckFailedException;
import org.apache.shardingsphere.data.pipeline.core.spi.check.consistency.AbstractSingleTableDataCalculator;
import org.apache.shardingsphere.data.pipeline.core.spi.check.consistency.CRC32MatchDataConsistencyCheckAlgorithm;
+import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
import org.apache.shardingsphere.data.pipeline.mysql.sqlbuilder.MySQLPipelineSQLBuilder;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
@@ -31,7 +32,6 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;
@@ -42,6 +42,8 @@ public final class CRC32MatchMySQLSingleTableDataCalculator extends AbstractSing
private static final Collection<String> DATABASE_TYPES = Collections.singletonList(new MySQLDatabaseType().getName());
+ private static final MySQLPipelineSQLBuilder SQL_BUILDER = (MySQLPipelineSQLBuilder) PipelineSQLBuilderFactory.getSQLBuilder("MySQL");
+
@Override
public String getAlgorithmType() {
return CRC32MatchDataConsistencyCheckAlgorithm.TYPE;
@@ -55,9 +57,8 @@ public final class CRC32MatchMySQLSingleTableDataCalculator extends AbstractSing
@Override
public Iterable<Object> calculate(final DataCalculateParameter dataCalculateParameter) {
String logicTableName = dataCalculateParameter.getLogicTableName();
- MySQLPipelineSQLBuilder scalingSQLBuilder = new MySQLPipelineSQLBuilder(new HashMap<>());
List<Long> result = dataCalculateParameter.getColumnNames().stream().map(each -> {
- String sql = scalingSQLBuilder.buildSumCrc32SQL(logicTableName, each);
+ String sql = SQL_BUILDER.buildSumCrc32SQL(logicTableName, each);
try {
return sumCrc32(dataCalculateParameter.getDataSource(), sql);
} catch (final SQLException ex) {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/check/datasource/MySQLDataSourceChecker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/check/datasource/MySQLDataSourceChecker.java
index 1be1c85..2316a1c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/check/datasource/MySQLDataSourceChecker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/check/datasource/MySQLDataSourceChecker.java
@@ -19,7 +19,8 @@ package org.apache.shardingsphere.data.pipeline.mysql.check.datasource;
import org.apache.shardingsphere.data.pipeline.core.check.datasource.AbstractDataSourceChecker;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
-import org.apache.shardingsphere.data.pipeline.mysql.sqlbuilder.MySQLPipelineSQLBuilder;
+import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
+import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -113,7 +114,7 @@ public final class MySQLDataSourceChecker extends AbstractDataSourceChecker {
}
@Override
- protected MySQLPipelineSQLBuilder getSQLBuilder() {
- return new MySQLPipelineSQLBuilder(new HashMap<>());
+ protected PipelineSQLBuilder getSQLBuilder() {
+ return PipelineSQLBuilderFactory.getSQLBuilder("MySQL");
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
index 43b13be..20667db 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
import org.apache.shardingsphere.data.pipeline.api.prepare.datasource.PrepareTargetTablesParameter;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.AbstractDataSourcePreparer;
+import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
import org.apache.shardingsphere.data.pipeline.mysql.sqlbuilder.MySQLPipelineSQLBuilder;
import java.sql.Connection;
@@ -31,7 +32,6 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collection;
-import java.util.Collections;
import java.util.stream.Collectors;
/**
@@ -40,7 +40,7 @@ import java.util.stream.Collectors;
@Slf4j
public final class MySQLDataSourcePreparer extends AbstractDataSourcePreparer {
- private final MySQLPipelineSQLBuilder scalingSQLBuilder = new MySQLPipelineSQLBuilder(Collections.emptyMap());
+ private static final MySQLPipelineSQLBuilder SQL_BUILDER = (MySQLPipelineSQLBuilder) PipelineSQLBuilderFactory.getSQLBuilder("MySQL");
@Override
public void prepareTargetTables(final PrepareTargetTablesParameter parameter) {
@@ -62,7 +62,7 @@ public final class MySQLDataSourcePreparer extends AbstractDataSourcePreparer {
}
private String getCreateTableSQL(final Connection sourceConnection, final String logicTableName) throws SQLException {
- String showCreateTableSQL = "SHOW CREATE TABLE " + scalingSQLBuilder.quote(logicTableName);
+ String showCreateTableSQL = "SHOW CREATE TABLE " + SQL_BUILDER.quote(logicTableName);
try (Statement statement = sourceConnection.createStatement(); ResultSet resultSet = statement.executeQuery(showCreateTableSQL)) {
if (!resultSet.next()) {
throw new PipelineJobPrepareFailedException("show create table has no result, sql: " + showCreateTableSQL);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
index 3a33557..80d2272 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
@@ -29,6 +29,9 @@ import java.util.Set;
*/
public final class MySQLPipelineSQLBuilder extends AbstractPipelineSQLBuilder {
+ public MySQLPipelineSQLBuilder() {
+ }
+
public MySQLPipelineSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
super(shardingColumnsMap);
}
@@ -77,4 +80,9 @@ public final class MySQLPipelineSQLBuilder extends AbstractPipelineSQLBuilder {
public String buildSumCrc32SQL(final String tableName, final String column) {
return String.format("SELECT SUM(CRC32(%s)) AS checksum FROM %s", quote(column), quote(tableName));
}
+
+ @Override
+ public String getType() {
+ return "MySQL";
+ }
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.cdc.mysql.org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.ValueHandler b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.ValueHandler
similarity index 100%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.cdc.mysql.org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.ValueHandler
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.ValueHandler
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.cdc.mysql.org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.ValueHandler b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
similarity index 63%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.cdc.mysql.org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.ValueHandler
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
index c775757..8fae833 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.cdc.mysql.org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.ValueHandler
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
@@ -15,8 +15,4 @@
# limitations under the License.
#
-org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.UnsignedTinyintHandler
-org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.UnsignedSmallintHandler
-org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.UnsignedMediumintHandler
-org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.UnsignedIntHandler
-org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.UnsignedBigintHandler
+org.apache.shardingsphere.data.pipeline.mysql.sqlbuilder.MySQLPipelineSQLBuilder
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussScalingEntry.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussScalingEntry.java
index e7ce005..76eab6e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussScalingEntry.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussScalingEntry.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.data.pipeline.opengauss;
import org.apache.shardingsphere.data.pipeline.opengauss.importer.OpenGaussImporter;
import org.apache.shardingsphere.data.pipeline.opengauss.ingest.OpenGaussPositionInitializer;
import org.apache.shardingsphere.data.pipeline.opengauss.ingest.OpenGaussWalDumper;
-import org.apache.shardingsphere.data.pipeline.opengauss.sqlbuilder.OpenGaussPipelineSQLBuilder;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLInventoryDumper;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
@@ -55,11 +54,6 @@ public final class OpenGaussScalingEntry implements ScalingEntry {
}
@Override
- public Class<OpenGaussPipelineSQLBuilder> getSQLBuilderClass() {
- return OpenGaussPipelineSQLBuilder.class;
- }
-
- @Override
public String getDatabaseType() {
return "openGauss";
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
index 95f333c..1989ff9 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
@@ -32,7 +32,10 @@ import java.util.Set;
* OpenGauss pipeline SQL builder.
*/
public final class OpenGaussPipelineSQLBuilder extends AbstractPipelineSQLBuilder {
-
+
+ public OpenGaussPipelineSQLBuilder() {
+ }
+
public OpenGaussPipelineSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
super(shardingColumnsMap);
}
@@ -68,4 +71,9 @@ public final class OpenGaussPipelineSQLBuilder extends AbstractPipelineSQLBuilde
// there need return ON DUPLICATE KEY UPDATE NOTHING after support this syntax.
return "";
}
+
+ @Override
+ public String getType() {
+ return "openGauss";
+ }
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.cdc.mysql.org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.ValueHandler b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline [...]
similarity index 63%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.cdc.mysql.org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.ValueHandler
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
index c775757..3698bcc 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.cdc.mysql.org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.ValueHandler
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
@@ -15,8 +15,4 @@
# limitations under the License.
#
-org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.UnsignedTinyintHandler
-org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.UnsignedSmallintHandler
-org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.UnsignedMediumintHandler
-org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.UnsignedIntHandler
-org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.UnsignedBigintHandler
+org.apache.shardingsphere.data.pipeline.opengauss.sqlbuilder.OpenGaussPipelineSQLBuilder
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLScalingEntry.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLScalingEntry.java
index c609192..2467f57 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLScalingEntry.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLScalingEntry.java
@@ -21,7 +21,6 @@ import org.apache.shardingsphere.data.pipeline.postgresql.importer.PostgreSQLImp
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLInventoryDumper;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLPositionInitializer;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLWalDumper;
-import org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder.PostgreSQLPipelineSQLBuilder;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
/**
@@ -55,11 +54,6 @@ public final class PostgreSQLScalingEntry implements ScalingEntry {
}
@Override
- public Class<PostgreSQLPipelineSQLBuilder> getSQLBuilderClass() {
- return PostgreSQLPipelineSQLBuilder.class;
- }
-
- @Override
public String getDatabaseType() {
return "PostgreSQL";
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/check/datasource/PostgreSQLDataSourceChecker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/check/datasource/PostgreSQLDataSourceChecker.java
index b015728..beb09f6 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/check/datasource/PostgreSQLDataSourceChecker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/check/datasource/PostgreSQLDataSourceChecker.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.data.pipeline.postgresql.check.datasource;
import org.apache.shardingsphere.data.pipeline.core.check.datasource.AbstractDataSourceChecker;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
-import org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder.PostgreSQLPipelineSQLBuilder;
+import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
import javax.sql.DataSource;
@@ -28,7 +28,6 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collection;
-import java.util.HashMap;
/**
* PostgreSQL Data source checker.
@@ -68,6 +67,6 @@ public final class PostgreSQLDataSourceChecker extends AbstractDataSourceChecker
@Override
protected PipelineSQLBuilder getSQLBuilder() {
- return new PostgreSQLPipelineSQLBuilder(new HashMap<>());
+ return PipelineSQLBuilderFactory.getSQLBuilder("PostgreSQL");
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
index 29715bd..c4d0851 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
@@ -30,6 +30,9 @@ import java.util.Set;
*/
public final class PostgreSQLPipelineSQLBuilder extends AbstractPipelineSQLBuilder {
+ public PostgreSQLPipelineSQLBuilder() {
+ }
+
public PostgreSQLPipelineSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
super(shardingColumnsMap);
}
@@ -58,4 +61,9 @@ public final class PostgreSQLPipelineSQLBuilder extends AbstractPipelineSQLBuild
result.append(") DO NOTHING");
return result.toString();
}
+
+ @Override
+ public String getType() {
+ return "PostgreSQL";
+ }
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.cdc.mysql.org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.ValueHandler b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipelin [...]
similarity index 63%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.cdc.mysql.org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.ValueHandler
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
index c775757..fe4298e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.cdc.mysql.org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.ValueHandler
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
@@ -15,8 +15,4 @@
# limitations under the License.
#
-org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.UnsignedTinyintHandler
-org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.UnsignedSmallintHandler
-org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.UnsignedMediumintHandler
-org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.UnsignedIntHandler
-org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.UnsignedBigintHandler
+org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder.PostgreSQLPipelineSQLBuilder
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
index e875008..5c01ac5 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
@@ -19,13 +19,12 @@ package org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
import org.junit.Test;
import org.postgresql.replication.LogSequenceNumber;
-import java.util.Collections;
-
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
@@ -33,7 +32,7 @@ public final class PostgreSQLPipelineSQLBuilderTest {
@Test
public void assertBuildInsertSQL() {
- String actual = new PostgreSQLPipelineSQLBuilder(Collections.emptyMap()).buildInsertSQL(mockDataRecord());
+ String actual = PipelineSQLBuilderFactory.getSQLBuilder("PostgreSQL").buildInsertSQL(mockDataRecord());
assertThat(actual, is("INSERT INTO \"t_order\"(\"id\",\"name\") VALUES(?,?) ON CONFLICT (id) DO NOTHING"));
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
index b889b94..13a6f1a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
@@ -19,17 +19,21 @@ package org.apache.shardingsphere.data.pipeline.spi.sqlbuilder;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import org.apache.shardingsphere.spi.singleton.SingletonSPI;
+import org.apache.shardingsphere.spi.typed.TypedSPI;
import java.util.Collection;
import java.util.List;
/**
* Pipeline SQL builder.
+ * It's singleton when it's used as SPI, else not.
*/
-public interface PipelineSQLBuilder {
+public interface PipelineSQLBuilder extends TypedSPI, SingletonSPI {
/**
* Build insert SQL.
+ * Used in {@linkplain org.apache.shardingsphere.data.pipeline.spi.importer.Importer}.
*
* @param dataRecord data record
* @return insert SQL
@@ -46,11 +50,12 @@ public interface PipelineSQLBuilder {
String buildUpdateSQL(DataRecord dataRecord, Collection<Column> conditionColumns);
/**
- * Extract need updated columns.
+ * Extract updated columns.
+ * Used in {@linkplain org.apache.shardingsphere.data.pipeline.spi.importer.Importer}.
*
- * @param columns the input columns
- * @param record the input datarecord
- * @return the filtered columns.
+ * @param columns columns
+ * @param record data record
+ * @return filtered columns
*/
List<Column> extractUpdatedColumns(Collection<Column> columns, DataRecord record);
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineSQLBuilder.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineSQLBuilder.java
index 990d834..70fd920 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineSQLBuilder.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineSQLBuilder.java
@@ -24,6 +24,9 @@ import java.util.Set;
public final class FixturePipelineSQLBuilder extends AbstractPipelineSQLBuilder {
+ public FixturePipelineSQLBuilder() {
+ }
+
public FixturePipelineSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
super(shardingColumnsMap);
}
@@ -37,4 +40,9 @@ public final class FixturePipelineSQLBuilder extends AbstractPipelineSQLBuilder
protected String getRightIdentifierQuoteString() {
return "`";
}
+
+ @Override
+ public String getType() {
+ return "H2";
+ }
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureScalingEntry.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureScalingEntry.java
index f8de7d3..454616e 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureScalingEntry.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureScalingEntry.java
@@ -21,7 +21,6 @@ import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
import org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
-import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
import org.apache.shardingsphere.scaling.core.job.check.EnvironmentChecker;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
@@ -53,11 +52,6 @@ public final class FixtureScalingEntry implements ScalingEntry {
}
@Override
- public Class<? extends PipelineSQLBuilder> getSQLBuilderClass() {
- return FixturePipelineSQLBuilder.class;
- }
-
- @Override
public String getDatabaseType() {
return "H2";
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilderTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilderTest.java
index 86ab11f..e93f4d2 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilderTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilderTest.java
@@ -32,7 +32,7 @@ import static org.junit.Assert.assertThat;
public final class PipelineSQLBuilderTest {
- private final PipelineSQLBuilder pipelineSQLBuilder = new FixturePipelineSQLBuilder(Collections.emptyMap());
+ private final PipelineSQLBuilder pipelineSQLBuilder = new FixturePipelineSQLBuilder();
@Test
public void assertBuildInsertSQL() {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.cdc.mysql.org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.ValueHandler b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
similarity index 63%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.cdc.mysql.org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.ValueHandler
rename to shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
index c775757..3d09e4d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.cdc.mysql.org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.ValueHandler
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
@@ -15,8 +15,4 @@
# limitations under the License.
#
-org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.UnsignedTinyintHandler
-org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.UnsignedSmallintHandler
-org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.UnsignedMediumintHandler
-org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.UnsignedIntHandler
-org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.UnsignedBigintHandler
+org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineSQLBuilder