You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by az...@apache.org on 2023/02/21 03:46:44 UTC
[shardingsphere] branch master updated: Support branch database type in pipeline job (#24264)
This is an automated email from the ASF dual-hosted git repository.
azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new beaf2594998 Support branch database type in pipeline job (#24264)
beaf2594998 is described below
commit beaf25949986e9926a2330c888f1f471b0abc5c1
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Tue Feb 21 11:46:34 2023 +0800
Support branch database type in pipeline job (#24264)
* Remove DefaultPipelineSQLBuilder
* Update 01-initdb.sql
* Add PipelineTypedSPILoader
* Replace TypedSPILoader.findService with PipelineTypedSPILoader.findDatabaseTypedService
* Replace TypedSPILoader.getService with PipelineTypedSPILoader.getDatabaseTypedService
* Check sharding rule exists in metadata before migration
* Improve PipelineTypedSPILoader to get default service
* Remove DefaultPositionInitializer
* Revert "Improve PipelineTypedSPILoader to get default service"
This reverts commit cbbbb9dc8133c004c2f44f9d23a4bc20cb706052.
* Refactor DefaultColumnValueReader to BasicColumnValueReader
* Improve PipelineTypedSPILoader
* Make remaining snippets compatible with branch database
* Make unit test compatible with branch database
---
...rdingSpherePipelineDataSourceConfiguration.java | 4 +-
.../StandardPipelineDataSourceConfiguration.java | 4 +-
.../datasource/JdbcQueryPropertiesExtension.java | 2 +-
.../pipeline/util/spi/PipelineTypedSPILoader.java | 64 ++++++++++++++++++++++
...RC32MatchDataConsistencyCalculateAlgorithm.java | 9 +--
...DataMatchDataConsistencyCalculateAlgorithm.java | 7 ++-
.../datasource/AbstractDataSourceChecker.java | 4 +-
.../H2JdbcQueryPropertiesExtension.java} | 30 +++++-----
.../pipeline/core/importer/DataSourceImporter.java | 4 +-
...alueReader.java => BasicColumnValueReader.java} | 13 +++--
.../core/ingest/dumper/InventoryDumper.java | 7 ++-
...YamlJobItemIncrementalTasksProgressSwapper.java | 4 +-
.../metadata/generator/PipelineDDLGenerator.java | 4 +-
.../core/prepare/InventoryTaskSplitter.java | 6 +-
.../core/prepare/PipelineJobPreparerUtils.java | 23 +++++---
.../datasource/AbstractDataSourcePreparer.java | 4 +-
.../data/pipeline/core/task/IncrementalTask.java | 3 +-
.../data/pipeline/core/util/DatabaseTypeUtil.java | 52 ++++++++++++++++++
.../pipeline/core/util/JDBCStreamQueryUtil.java | 23 ++++++--
...ne.spi.datasource.JdbcQueryPropertiesExtension} | 2 +-
...ipeline.spi.ingest.position.PositionInitializer | 18 ------
...data.pipeline.spi.sqlbuilder.PipelineSQLBuilder | 18 ------
.../pipeline/core/util/DatabaseTypeUtilTest.java} | 36 +++++-------
.../migration/api/impl/MigrationJobAPI.java | 11 +++-
.../migration/prepare/MigrationJobPreparer.java | 4 +-
.../src/test/resources/env/mysql/01-initdb.sql | 5 ++
.../test/resources/env/postgresql/01-initdb.sql | 7 +++
.../core/util/spi/PipelineTypedSPILoaderTest.java | 62 +++++++++++++++++++++
28 files changed, 306 insertions(+), 124 deletions(-)
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
index 5ffc2c94e0d..23f115a4629 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
@@ -25,10 +25,10 @@ import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtension;
+import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
import org.apache.shardingsphere.infra.database.metadata.url.JdbcUrlAppender;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
@@ -77,7 +77,7 @@ public final class ShardingSpherePipelineDataSourceConfiguration implements Pipe
}
private void appendJdbcQueryProperties(final String databaseType) {
- Optional<JdbcQueryPropertiesExtension> extension = TypedSPILoader.findService(JdbcQueryPropertiesExtension.class, databaseType);
+ Optional<JdbcQueryPropertiesExtension> extension = PipelineTypedSPILoader.findDatabaseTypedService(JdbcQueryPropertiesExtension.class, databaseType);
if (!extension.isPresent()) {
return;
}
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
index 4fbaeabca1e..712dd2a2ea2 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
@@ -22,11 +22,11 @@ import lombok.Getter;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlJdbcConfiguration;
import org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtension;
+import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
import org.apache.shardingsphere.infra.database.metadata.url.JdbcUrlAppender;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
@@ -98,7 +98,7 @@ public final class StandardPipelineDataSourceConfiguration implements PipelineDa
}
private void appendJdbcQueryProperties(final String databaseType, final Map<String, Object> yamlConfig) {
- Optional<JdbcQueryPropertiesExtension> extension = TypedSPILoader.findService(JdbcQueryPropertiesExtension.class, databaseType);
+ Optional<JdbcQueryPropertiesExtension> extension = PipelineTypedSPILoader.findDatabaseTypedService(JdbcQueryPropertiesExtension.class, databaseType);
if (!extension.isPresent()) {
return;
}
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/datasource/JdbcQueryPropertiesExtension.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/datasource/JdbcQueryPropertiesExtension.java
index 6697b100158..0704e3595ef 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/datasource/JdbcQueryPropertiesExtension.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/datasource/JdbcQueryPropertiesExtension.java
@@ -31,7 +31,7 @@ public interface JdbcQueryPropertiesExtension extends TypedSPI {
/**
* Extend query properties.
*
- * @return JDBC query properties for extension
+ * @return JDBC query properties for extension. Must NOT be null
*/
Properties extendQueryProperties();
}
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/util/spi/PipelineTypedSPILoader.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/util/spi/PipelineTypedSPILoader.java
new file mode 100644
index 00000000000..3235905cbae
--- /dev/null
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/util/spi/PipelineTypedSPILoader.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.util.spi;
+
+import org.apache.shardingsphere.infra.database.type.BranchDatabaseType;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.util.spi.exception.ServiceProviderNotFoundServerException;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
+
+import java.util.Optional;
+
+/**
+ * Pipeline typed SPI loader that falls back to the trunk database type when a branch database type has no service of its own.
+ */
+public final class PipelineTypedSPILoader {
+
+ /**
+ * Find database typed service, trying the given database type first and then, for a branch database type, its trunk database type.
+ *
+ * @param spiClass typed SPI class
+ * @param databaseType database type
+ * @param <T> SPI class type
+ * @return service, or empty if none is found for the given type or, for a branch database type, its trunk type
+ */
+ public static <T extends TypedSPI> Optional<T> findDatabaseTypedService(final Class<T> spiClass, final String databaseType) {
+ Optional<T> result = TypedSPILoader.findService(spiClass, databaseType);
+ if (result.isPresent()) {
+ return result;
+ }
+ Optional<DatabaseType> type = TypedSPILoader.findService(DatabaseType.class, databaseType);
+ if (type.isPresent() && type.get() instanceof BranchDatabaseType) {
+ return TypedSPILoader.findService(spiClass, ((BranchDatabaseType) type.get()).getTrunkDatabaseType().getType());
+ }
+ return result;
+ }
+
+ /**
+ * Get database typed service, or throw {@link ServiceProviderNotFoundServerException} if {@link #findDatabaseTypedService(Class, String)} finds none.
+ *
+ * @param spiClass typed SPI class
+ * @param databaseType database type
+ * @param <T> SPI class type
+ * @return service
+ */
+ public static <T extends TypedSPI> T getDatabaseTypedService(final Class<T> spiClass, final String databaseType) {
+ return findDatabaseTypedService(spiClass, databaseType).orElseThrow(() -> new ServiceProviderNotFoundServerException(spiClass));
+ }
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
index cc872eda2d0..4a9f08a9e9b 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
@@ -24,11 +24,12 @@ import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
import org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedCRC32DataConsistencyCalculateAlgorithmException;
+import org.apache.shardingsphere.data.pipeline.core.util.DatabaseTypeUtil;
import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.infra.util.spi.annotation.SPIDescription;
+import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
+import org.apache.shardingsphere.infra.util.spi.annotation.SPIDescription;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -47,11 +48,11 @@ import java.util.stream.Collectors;
@Slf4j
public final class CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractDataConsistencyCalculateAlgorithm {
- private static final Collection<String> SUPPORTED_DATABASE_TYPES = Collections.singletonList(new MySQLDatabaseType().getType());
+ private static final Collection<String> SUPPORTED_DATABASE_TYPES = DatabaseTypeUtil.getTrunkAndBranchDatabaseTypes(Collections.singleton(new MySQLDatabaseType().getType()));
@Override
public Iterable<DataConsistencyCalculatedResult> calculate(final DataConsistencyCalculateParameter param) {
- PipelineSQLBuilder sqlBuilder = TypedSPILoader.getService(PipelineSQLBuilder.class, param.getDatabaseType());
+ PipelineSQLBuilder sqlBuilder = PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, param.getDatabaseType());
List<CalculatedItem> calculatedItems = param.getColumnNames().stream().map(each -> calculateCRC32(sqlBuilder, param, each)).collect(Collectors.toList());
return Collections.singletonList(new CalculatedResult(calculatedItems.get(0).getRecordsCount(), calculatedItems.stream().map(CalculatedItem::getCrc32).collect(Collectors.toList())));
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
index c7206ac7a30..482beb4b306 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
@@ -30,10 +30,12 @@ import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist
import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCheckUtils;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.BasicColumnValueReader;
import org.apache.shardingsphere.data.pipeline.core.util.CloseUtil;
import org.apache.shardingsphere.data.pipeline.core.util.JDBCStreamQueryUtil;
import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader;
import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
+import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
@@ -91,7 +93,8 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
try {
Collection<Collection<Object>> records = new LinkedList<>();
Object maxUniqueKeyValue = null;
- ColumnValueReader columnValueReader = TypedSPILoader.getService(ColumnValueReader.class, param.getDatabaseType());
+ ColumnValueReader columnValueReader = PipelineTypedSPILoader.findDatabaseTypedService(ColumnValueReader.class, param.getDatabaseType())
+ .orElseGet(() -> new BasicColumnValueReader(param.getDatabaseType()));
ResultSet resultSet = calculationContext.getResultSet();
while (resultSet.next()) {
if (isCanceling()) {
@@ -171,7 +174,7 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
}
private String getQuerySQL(final DataConsistencyCalculateParameter param) {
- PipelineSQLBuilder sqlBuilder = TypedSPILoader.getService(PipelineSQLBuilder.class, param.getDatabaseType());
+ PipelineSQLBuilder sqlBuilder = PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, param.getDatabaseType());
String logicTableName = param.getLogicTableName();
String schemaName = param.getSchemaName();
String uniqueKey = param.getUniqueKey().getName();
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/datasource/AbstractDataSourceChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/datasource/AbstractDataSourceChecker.java
index 418cfc74c82..636bfba2bab 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/datasource/AbstractDataSourceChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/datasource/AbstractDataSourceChecker.java
@@ -23,7 +23,7 @@ import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWith
import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
import org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceChecker;
import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
+import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -65,7 +65,7 @@ public abstract class AbstractDataSourceChecker implements DataSourceChecker {
}
private boolean checkEmpty(final DataSource dataSource, final String schemaName, final String tableName) throws SQLException {
- String sql = TypedSPILoader.getService(PipelineSQLBuilder.class, getDatabaseType()).buildCheckEmptySQL(schemaName, tableName);
+ String sql = PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, getDatabaseType()).buildCheckEmptySQL(schemaName, tableName);
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/DefaultPipelineSQLBuilder.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/H2JdbcQueryPropertiesExtension.java
similarity index 59%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/DefaultPipelineSQLBuilder.java
rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/H2JdbcQueryPropertiesExtension.java
index 78730503675..ba14a9ded9f 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/DefaultPipelineSQLBuilder.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/H2JdbcQueryPropertiesExtension.java
@@ -15,30 +15,28 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.sqlbuilder;
+package org.apache.shardingsphere.data.pipeline.core.datasource;
+
+import org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtension;
+
+import java.util.Properties;
/**
- * Default pipeline SQL builder.
+ * H2 JDBC query properties extension.
+ *
+ * <p>H2 is a branch database of MySQL, but its JDBC URL isn't compatible with MySQL's.</p>
*/
-public final class DefaultPipelineSQLBuilder extends AbstractPipelineSQLBuilder {
-
- @Override
- public boolean isDefault() {
- return true;
- }
+public final class H2JdbcQueryPropertiesExtension implements JdbcQueryPropertiesExtension {
- @Override
- protected boolean isKeyword(final String item) {
- return false;
- }
+ private final Properties queryProps = new Properties();
@Override
- protected String getLeftIdentifierQuoteString() {
- return "";
+ public Properties extendQueryProperties() {
+ return queryProps;
}
@Override
- protected String getRightIdentifierQuoteString() {
- return "";
+ public String getType() {
+ return "H2";
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
index 6874778e804..db2da8610d2 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
@@ -41,8 +41,8 @@ import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
+import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -86,7 +86,7 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
rateLimitAlgorithm = importerConfig.getRateLimitAlgorithm();
this.dataSourceManager = (PipelineDataSourceManager) importerConnector.getConnector();
this.channel = channel;
- pipelineSqlBuilder = TypedSPILoader.getService(PipelineSQLBuilder.class, importerConfig.getDataSourceConfig().getDatabaseType().getType());
+ pipelineSqlBuilder = PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, importerConfig.getDataSourceConfig().getDatabaseType().getType());
this.jobProgressListener = jobProgressListener;
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/DefaultColumnValueReader.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/BasicColumnValueReader.java
similarity index 81%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/DefaultColumnValueReader.java
rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/BasicColumnValueReader.java
index 37c03115492..17f312fdffd 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/DefaultColumnValueReader.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/BasicColumnValueReader.java
@@ -17,14 +17,19 @@
package org.apache.shardingsphere.data.pipeline.core.ingest.dumper;
+import lombok.RequiredArgsConstructor;
+
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
/**
- * Default column value reader.
+ * Basic column value reader.
*/
-public final class DefaultColumnValueReader extends AbstractColumnValueReader {
+@RequiredArgsConstructor
+public final class BasicColumnValueReader extends AbstractColumnValueReader {
+
+ private final String databaseType;
@Override
protected Object doReadValue(final ResultSet resultSet, final ResultSetMetaData metaData, final int columnIndex) throws SQLException {
@@ -32,7 +37,7 @@ public final class DefaultColumnValueReader extends AbstractColumnValueReader {
}
@Override
- public boolean isDefault() {
- return true;
+ public String getType() {
+ return databaseType;
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
index 395dc244917..277ab2084c2 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
@@ -44,11 +44,11 @@ import org.apache.shardingsphere.data.pipeline.core.util.JDBCStreamQueryUtil;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader;
import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
+import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -85,8 +85,9 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
this.dumperConfig = dumperConfig;
this.channel = channel;
this.dataSource = dataSource;
- sqlBuilder = TypedSPILoader.getService(PipelineSQLBuilder.class, dumperConfig.getDataSourceConfig().getDatabaseType().getType());
- columnValueReader = TypedSPILoader.getService(ColumnValueReader.class, dumperConfig.getDataSourceConfig().getDatabaseType().getType());
+ String databaseType = dumperConfig.getDataSourceConfig().getDatabaseType().getType();
+ sqlBuilder = PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, databaseType);
+ columnValueReader = PipelineTypedSPILoader.findDatabaseTypedService(ColumnValueReader.class, databaseType).orElseGet(() -> new BasicColumnValueReader(databaseType));
this.metaDataLoader = metaDataLoader;
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java
index 26536ec0e51..e2c5b84ebe1 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.job.progress.yaml;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
import org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
+import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
/**
* YAML job item incremental tasks progress swapper.
@@ -60,7 +60,7 @@ public final class YamlJobItemIncrementalTasksProgressSwapper {
}
IncrementalTaskProgress taskProgress = new IncrementalTaskProgress();
// TODO consider to remove parameter databaseType
- PositionInitializer positionInitializer = TypedSPILoader.getService(PositionInitializer.class, databaseType);
+ PositionInitializer positionInitializer = PipelineTypedSPILoader.getDatabaseTypedService(PositionInitializer.class, databaseType);
taskProgress.setPosition(positionInitializer.init(yamlProgress.getPosition()));
taskProgress.setIncrementalTaskDelay(yamlProgress.getDelay());
return new JobItemIncrementalTasksProgress(taskProgress);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
index c7a103d356c..36f1ab11531 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.core.metadata.generator;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.spi.ddlgenerator.CreateTableSQLGenerator;
+import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
import org.apache.shardingsphere.infra.binder.QueryContext;
import org.apache.shardingsphere.infra.binder.SQLStatementContextFactory;
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
@@ -32,7 +33,6 @@ import org.apache.shardingsphere.infra.binder.type.TableAvailable;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.metadata.database.schema.util.IndexMetaDataUtil;
import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.sql.parser.sql.common.segment.SQLSegment;
import org.apache.shardingsphere.sql.parser.sql.common.segment.ddl.constraint.ConstraintSegment;
import org.apache.shardingsphere.sql.parser.sql.common.segment.ddl.index.IndexSegment;
@@ -75,7 +75,7 @@ public final class PipelineDDLGenerator {
final String schemaName, final String sourceTableName, final String targetTableName, final ShardingSphereSQLParserEngine parserEngine) throws SQLException {
long startTimeMillis = System.currentTimeMillis();
StringBuilder result = new StringBuilder();
- for (String each : TypedSPILoader.getService(CreateTableSQLGenerator.class, databaseType.getType()).generate(sourceDataSource, schemaName, sourceTableName)) {
+ for (String each : PipelineTypedSPILoader.getDatabaseTypedService(CreateTableSQLGenerator.class, databaseType.getType()).generate(sourceDataSource, schemaName, sourceTableName)) {
Optional<String> queryContext = decorate(databaseType, sourceDataSource, schemaName, targetTableName, parserEngine, each);
queryContext.ifPresent(optional -> result.append(optional).append(DELIMITER).append(System.lineSeparator()));
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index b1c8433e99e..090e131a527 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -43,7 +43,7 @@ import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
+import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -172,7 +172,7 @@ public final class InventoryTaskSplitter {
String schemaName = dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName()));
String actualTableName = dumperConfig.getActualTableName();
// TODO with a large amount of data, count the full table will have performance problem
- String sql = TypedSPILoader.getService(PipelineSQLBuilder.class, jobConfig.getSourceDatabaseType()).buildCountSQL(schemaName, actualTableName);
+ String sql = PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, jobConfig.getSourceDatabaseType()).buildCountSQL(schemaName, actualTableName);
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
@@ -191,7 +191,7 @@ public final class InventoryTaskSplitter {
Collection<IngestPosition<?>> result = new LinkedList<>();
PipelineJobConfiguration jobConfig = jobItemContext.getJobConfig();
String uniqueKey = dumperConfig.getUniqueKeyColumns().get(0).getName();
- String sql = TypedSPILoader.getService(PipelineSQLBuilder.class, jobConfig.getSourceDatabaseType())
+ String sql = PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, jobConfig.getSourceDatabaseType())
.buildSplitByPrimaryKeyRangeSQL(dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName())), dumperConfig.getActualTableName(), uniqueKey);
int shardingSize = jobItemContext.getJobProcessContext().getPipelineProcessConfig().getRead().getShardingSize();
try (
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
index 4fc429ffe49..a0dabdec187 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
@@ -35,13 +35,14 @@ import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTa
import org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceChecker;
import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator;
import org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
+import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
+import org.apache.shardingsphere.infra.database.type.BranchDatabaseType;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCreator;
import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
import org.apache.shardingsphere.parser.rule.SQLParserRule;
@@ -63,7 +64,7 @@ public final class PipelineJobPreparerUtils {
* @return true if supported, otherwise false
*/
public static boolean isIncrementalSupported(final String databaseType) {
- return TypedSPILoader.contains(IncrementalDumperCreator.class, databaseType);
+ return PipelineTypedSPILoader.findDatabaseTypedService(IncrementalDumperCreator.class, databaseType).isPresent();
}
/**
@@ -74,7 +75,7 @@ public final class PipelineJobPreparerUtils {
* @throws SQLException if prepare target schema fail
*/
public static void prepareTargetSchema(final String databaseType, final PrepareTargetSchemasParameter prepareTargetSchemasParam) throws SQLException {
- Optional<DataSourcePreparer> dataSourcePreparer = TypedSPILoader.findService(DataSourcePreparer.class, databaseType);
+ Optional<DataSourcePreparer> dataSourcePreparer = PipelineTypedSPILoader.findDatabaseTypedService(DataSourcePreparer.class, databaseType);
if (!dataSourcePreparer.isPresent()) {
log.info("dataSourcePreparer null, ignore prepare target");
return;
@@ -91,7 +92,11 @@ public final class PipelineJobPreparerUtils {
public static ShardingSphereSQLParserEngine getSQLParserEngine(final String targetDatabaseName) {
ShardingSphereMetaData metaData = PipelineContext.getContextManager().getMetaDataContexts().getMetaData();
ShardingSphereDatabase database = metaData.getDatabase(targetDatabaseName);
- return metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class).getSQLParserEngine(database.getProtocolType().getType());
+ DatabaseType databaseType = database.getProtocolType();
+ if (databaseType instanceof BranchDatabaseType) {
+ databaseType = ((BranchDatabaseType) databaseType).getTrunkDatabaseType();
+ }
+ return metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class).getSQLParserEngine(databaseType.getType());
}
/**
@@ -102,7 +107,7 @@ public final class PipelineJobPreparerUtils {
* @throws SQLException SQL exception
*/
public static void prepareTargetTables(final String databaseType, final PrepareTargetTablesParameter prepareTargetTablesParam) throws SQLException {
- Optional<DataSourcePreparer> dataSourcePreparer = TypedSPILoader.findService(DataSourcePreparer.class, databaseType);
+ Optional<DataSourcePreparer> dataSourcePreparer = PipelineTypedSPILoader.findDatabaseTypedService(DataSourcePreparer.class, databaseType);
if (!dataSourcePreparer.isPresent()) {
log.info("dataSourcePreparer null, ignore prepare target");
return;
@@ -131,7 +136,7 @@ public final class PipelineJobPreparerUtils {
}
String databaseType = dumperConfig.getDataSourceConfig().getDatabaseType().getType();
DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
- return TypedSPILoader.getService(PositionInitializer.class, databaseType).init(dataSource, dumperConfig.getJobId());
+ return PipelineTypedSPILoader.getDatabaseTypedService(PositionInitializer.class, databaseType).init(dataSource, dumperConfig.getJobId());
}
/**
@@ -144,7 +149,7 @@ public final class PipelineJobPreparerUtils {
if (dataSources.isEmpty()) {
return;
}
- DataSourceChecker dataSourceChecker = TypedSPILoader.findService(DataSourceChecker.class, databaseType).orElseGet(() -> new BasicDataSourceChecker(databaseType));
+ DataSourceChecker dataSourceChecker = PipelineTypedSPILoader.findDatabaseTypedService(DataSourceChecker.class, databaseType).orElseGet(() -> new BasicDataSourceChecker(databaseType));
dataSourceChecker.checkConnection(dataSources);
dataSourceChecker.checkPrivilege(dataSources);
dataSourceChecker.checkVariable(dataSources);
@@ -158,7 +163,7 @@ public final class PipelineJobPreparerUtils {
* @param targetDataSources target data sources
*/
public static void checkTargetDataSource(final String databaseType, final ImporterConfiguration importerConfig, final Collection<? extends DataSource> targetDataSources) {
- DataSourceChecker dataSourceChecker = TypedSPILoader.findService(DataSourceChecker.class, databaseType).orElseGet(() -> new BasicDataSourceChecker(databaseType));
+ DataSourceChecker dataSourceChecker = PipelineTypedSPILoader.findDatabaseTypedService(DataSourceChecker.class, databaseType).orElseGet(() -> new BasicDataSourceChecker(databaseType));
if (null == targetDataSources || targetDataSources.isEmpty()) {
log.info("target data source is empty, skip check");
return;
@@ -176,7 +181,7 @@ public final class PipelineJobPreparerUtils {
*/
public static void destroyPosition(final String jobId, final PipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException {
DatabaseType databaseType = pipelineDataSourceConfig.getDatabaseType();
- PositionInitializer positionInitializer = TypedSPILoader.getService(PositionInitializer.class, databaseType.getType());
+ PositionInitializer positionInitializer = PipelineTypedSPILoader.getDatabaseTypedService(PositionInitializer.class, databaseType.getType());
final long startTimeMillis = System.currentTimeMillis();
log.info("Cleanup database type:{}, data source type:{}", databaseType.getType(), pipelineDataSourceConfig.getType());
if (pipelineDataSourceConfig instanceof ShardingSpherePipelineDataSourceConfiguration) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
index 5e87e2630a1..f29b894c9e7 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
@@ -25,10 +25,10 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLGenerator;
import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
+import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -57,7 +57,7 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
}
CreateTableConfiguration createTableConfig = param.getCreateTableConfig();
String defaultSchema = DatabaseTypeEngine.getDefaultSchemaName(targetDatabaseType).orElse(null);
- PipelineSQLBuilder sqlBuilder = TypedSPILoader.getService(PipelineSQLBuilder.class, targetDatabaseType.getType());
+ PipelineSQLBuilder sqlBuilder = PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, targetDatabaseType.getType());
Collection<String> createdSchemaNames = new HashSet<>();
for (CreateTableEntry each : createTableConfig.getCreateTableEntries()) {
String targetSchemaName = each.getTargetName().getSchemaName().getOriginal();
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index 97474b8fb6a..b279c1704fe 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -40,6 +40,7 @@ import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterType;
import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator;
+import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
import java.util.Collection;
@@ -78,7 +79,7 @@ public final class IncrementalTask implements PipelineTask, AutoCloseable {
IngestPosition<?> position = dumperConfig.getPosition();
taskProgress = createIncrementalTaskProgress(position, jobItemContext.getInitProgress());
channel = createChannel(concurrency, pipelineChannelCreator, taskProgress);
- dumper = TypedSPILoader.getService(
+ dumper = PipelineTypedSPILoader.getDatabaseTypedService(
IncrementalDumperCreator.class, dumperConfig.getDataSourceConfig().getDatabaseType().getType()).createIncrementalDumper(dumperConfig, position, channel, sourceMetaDataLoader);
importers = createImporters(concurrency, importerConfig, importerConnector, channel, jobItemContext);
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/DatabaseTypeUtil.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/DatabaseTypeUtil.java
new file mode 100644
index 00000000000..e04578920e7
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/DatabaseTypeUtil.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.util;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.infra.database.type.BranchDatabaseType;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.Set;
+
+/**
+ * Database type util.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class DatabaseTypeUtil {
+
+ /**
+ * Get trunk and branch database types.
+ *
+ * @param trunkDatabaseTypes trunk database types
+ * @return database types
+ */
+ public static Collection<String> getTrunkAndBranchDatabaseTypes(final Set<String> trunkDatabaseTypes) {
+ Collection<String> result = new LinkedList<>();
+ for (DatabaseType each : ShardingSphereServiceLoader.getServiceInstances(DatabaseType.class)) {
+ if (trunkDatabaseTypes.contains(each.getType())
+ || (each instanceof BranchDatabaseType && trunkDatabaseTypes.contains(((BranchDatabaseType) each).getTrunkDatabaseType().getType()))) {
+ result.add(each.getType());
+ }
+ }
+ return result;
+ }
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/JDBCStreamQueryUtil.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/JDBCStreamQueryUtil.java
index 36dc07c879d..56299033424 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/JDBCStreamQueryUtil.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/JDBCStreamQueryUtil.java
@@ -18,7 +18,9 @@
package org.apache.shardingsphere.data.pipeline.core.util;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.infra.database.type.BranchDatabaseType;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.H2DatabaseType;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
@@ -45,24 +47,35 @@ public final class JDBCStreamQueryUtil {
*/
public static PreparedStatement generateStreamQueryPreparedStatement(final DatabaseType databaseType, final Connection connection, final String sql) throws SQLException {
if (databaseType instanceof MySQLDatabaseType) {
- return generateMySQLStreamQueryPreparedStatement(connection, sql);
+ return generateForMySQL(connection, sql);
}
if (databaseType instanceof PostgreSQLDatabaseType || databaseType instanceof OpenGaussDatabaseType) {
- return generatePostgreSQLStreamQueryPreparedStatement(connection, sql);
+ return generateForPostgreSQL(connection, sql);
+ }
+ if (databaseType instanceof H2DatabaseType) {
+ return generateByDefault(connection, sql);
+ }
+ if (databaseType instanceof BranchDatabaseType) {
+ return generateStreamQueryPreparedStatement(((BranchDatabaseType) databaseType).getTrunkDatabaseType(), connection, sql);
}
log.warn("not support {} streaming query now, pay attention to memory usage", databaseType.getType());
- return connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ return generateByDefault(connection, sql);
}
- private static PreparedStatement generateMySQLStreamQueryPreparedStatement(final Connection connection, final String sql) throws SQLException {
+ // TODO Consider use SPI
+ private static PreparedStatement generateForMySQL(final Connection connection, final String sql) throws SQLException {
PreparedStatement result = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
result.setFetchSize(Integer.MIN_VALUE);
return result;
}
- private static PreparedStatement generatePostgreSQLStreamQueryPreparedStatement(final Connection connection, final String sql) throws SQLException {
+ private static PreparedStatement generateForPostgreSQL(final Connection connection, final String sql) throws SQLException {
PreparedStatement result = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.CLOSE_CURSORS_AT_COMMIT);
connection.setAutoCommit(false);
return result;
}
+
+ private static PreparedStatement generateByDefault(final Connection connection, final String sql) throws SQLException {
+ return connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ }
}
diff --git a/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader b/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtension
similarity index 90%
rename from kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader
rename to kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtension
index 3ec8c078164..b185951d3aa 100644
--- a/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader
+++ b/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtension
@@ -15,4 +15,4 @@
# limitations under the License.
#
-org.apache.shardingsphere.data.pipeline.core.ingest.dumper.DefaultColumnValueReader
+org.apache.shardingsphere.data.pipeline.core.datasource.H2JdbcQueryPropertiesExtension
diff --git a/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer b/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer
deleted file mode 100644
index 7cc67f99c2e..00000000000
--- a/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.data.pipeline.core.ingest.position.DefaultPositionInitializer
diff --git a/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder b/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
deleted file mode 100644
index 3ca94cb5097..00000000000
--- a/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.data.pipeline.core.sqlbuilder.DefaultPipelineSQLBuilder
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/DefaultPositionInitializer.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/DatabaseTypeUtilTest.java
similarity index 52%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/DefaultPositionInitializer.java
rename to kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/DatabaseTypeUtilTest.java
index 393ac30e83a..977d1716c42 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/DefaultPositionInitializer.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/DatabaseTypeUtilTest.java
@@ -15,30 +15,24 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.ingest.position;
+package org.apache.shardingsphere.data.pipeline.core.util;
-import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.junit.Test;
-import javax.sql.DataSource;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
-/**
- * Default position initializer.
- */
-public final class DefaultPositionInitializer implements PositionInitializer {
-
- @Override
- public IngestPosition<?> init(final DataSource dataSource, final String slotNameSuffix) {
- return null;
- }
-
- @Override
- public IngestPosition<?> init(final String data) {
- return null;
- }
+import static org.junit.Assert.assertTrue;
+
+public final class DatabaseTypeUtilTest {
- @Override
- public boolean isDefault() {
- return true;
+ @Test
+ public void assertGetBranchDatabaseTypes() {
+ Set<String> trunkDatabaseTypes = Collections.singleton(new MySQLDatabaseType().getType());
+ Collection<String> actual = DatabaseTypeUtil.getTrunkAndBranchDatabaseTypes(trunkDatabaseTypes);
+ assertTrue("MySQL not present", actual.contains("MySQL"));
+ assertTrue("MariaDB not present", actual.contains("MariaDB"));
}
}
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 7f914dee3c8..4361e59e62e 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -70,6 +70,7 @@ import org.apache.shardingsphere.data.pipeline.scenario.migration.context.Migrat
import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
import org.apache.shardingsphere.data.pipeline.spi.job.JobTypeFactory;
import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
+import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -88,6 +89,8 @@ import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
import org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;
+import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
+import org.apache.shardingsphere.sharding.exception.metadata.ShardingRuleNotFoundException;
import javax.sql.DataSource;
import java.nio.charset.StandardCharsets;
@@ -199,7 +202,7 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
private CreateTableConfiguration buildCreateTableConfiguration(final MigrationJobConfiguration jobConfig) {
String sourceSchemaName = jobConfig.getSourceSchemaName();
- String targetSchemaName = TypedSPILoader.getService(DatabaseType.class, jobConfig.getTargetDatabaseType()).isSchemaAvailable() ? sourceSchemaName : null;
+ String targetSchemaName = PipelineTypedSPILoader.getDatabaseTypedService(DatabaseType.class, jobConfig.getTargetDatabaseType()).isSchemaAvailable() ? sourceSchemaName : null;
CreateTableEntry createTableEntry = new CreateTableEntry(
jobConfig.getSource(), new SchemaTableName(new SchemaName(sourceSchemaName), new TableName(jobConfig.getSourceTableName())),
jobConfig.getTarget(), new SchemaTableName(new SchemaName(targetSchemaName), new TableName(jobConfig.getTargetTableName())));
@@ -307,7 +310,7 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
String targetTableName = jobConfig.getTargetTableName();
// TODO use jobConfig.targetSchemaName
String targetSchemaName = jobConfig.getSourceSchemaName();
- PipelineSQLBuilder pipelineSQLBuilder = TypedSPILoader.getService(PipelineSQLBuilder.class, jobConfig.getTargetDatabaseType());
+ PipelineSQLBuilder pipelineSQLBuilder = PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, jobConfig.getTargetDatabaseType());
try (
PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(jobConfig.getTarget());
Connection connection = dataSource.getConnection()) {
@@ -447,6 +450,10 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
}
private YamlRootConfiguration getYamlRootConfiguration(final String databaseName, final Map<String, Map<String, Object>> yamlDataSources, final Collection<RuleConfiguration> rules) {
+ // TODO Check rules empty. Sharding rule could be empty
+ if (rules.stream().noneMatch(each -> each instanceof ShardingRuleConfiguration)) {
+ throw new ShardingRuleNotFoundException();
+ }
YamlRootConfiguration result = new YamlRootConfiguration();
result.setDatabaseName(databaseName);
result.setDataSources(yamlDataSources);
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index 1cb2c4114e3..0460ac79997 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -40,11 +40,11 @@ import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.Migra
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
+import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.lock.LockContext;
import org.apache.shardingsphere.infra.lock.LockDefinition;
import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.mode.lock.GlobalLockDefinition;
import java.sql.SQLException;
@@ -140,7 +140,7 @@ public final class MigrationJobPreparer {
CreateTableConfiguration createTableConfig = jobItemContext.getTaskConfig().getCreateTableConfig();
PipelineDataSourceManager dataSourceManager = (PipelineDataSourceManager) jobItemContext.getImporterConnector().getConnector();
PrepareTargetSchemasParameter prepareTargetSchemasParam = new PrepareTargetSchemasParameter(
- TypedSPILoader.getService(DatabaseType.class, targetDatabaseType), createTableConfig, dataSourceManager);
+ PipelineTypedSPILoader.getDatabaseTypedService(DatabaseType.class, targetDatabaseType), createTableConfig, dataSourceManager);
PipelineJobPreparerUtils.prepareTargetSchema(targetDatabaseType, prepareTargetSchemasParam);
ShardingSphereSQLParserEngine sqlParserEngine = PipelineJobPreparerUtils.getSQLParserEngine(jobConfig.getTargetDatabaseName());
PipelineJobPreparerUtils.prepareTargetTables(targetDatabaseType, new PrepareTargetTablesParameter(createTableConfig, dataSourceManager, sqlParserEngine));
diff --git a/test/e2e/pipeline/src/test/resources/env/mysql/01-initdb.sql b/test/e2e/pipeline/src/test/resources/env/mysql/01-initdb.sql
index 540d9d65ff7..4cb199d48f9 100644
--- a/test/e2e/pipeline/src/test/resources/env/mysql/01-initdb.sql
+++ b/test/e2e/pipeline/src/test/resources/env/mysql/01-initdb.sql
@@ -17,6 +17,11 @@
REVOKE ALL PRIVILEGES ON *.* FROM 'test_user'@'%';
+DROP DATABASE IF EXISTS pipeline_it_0;
+DROP DATABASE IF EXISTS pipeline_it_1;
+DROP DATABASE IF EXISTS pipeline_it_2;
+DROP DATABASE IF EXISTS pipeline_it_3;
+DROP DATABASE IF EXISTS pipeline_it_4;
CREATE DATABASE pipeline_it_0;
CREATE DATABASE pipeline_it_1;
CREATE DATABASE pipeline_it_2;
diff --git a/test/e2e/pipeline/src/test/resources/env/postgresql/01-initdb.sql b/test/e2e/pipeline/src/test/resources/env/postgresql/01-initdb.sql
index 0ca196ab765..75035c5463e 100644
--- a/test/e2e/pipeline/src/test/resources/env/postgresql/01-initdb.sql
+++ b/test/e2e/pipeline/src/test/resources/env/postgresql/01-initdb.sql
@@ -16,11 +16,18 @@
ALTER USER test_user NOSUPERUSER;
ALTER USER test_user REPLICATION;
+
+DROP DATABASE IF EXISTS pipeline_it_0;
+DROP DATABASE IF EXISTS pipeline_it_1;
+DROP DATABASE IF EXISTS pipeline_it_2;
+DROP DATABASE IF EXISTS pipeline_it_3;
+DROP DATABASE IF EXISTS pipeline_it_4;
CREATE DATABASE pipeline_it_0;
CREATE DATABASE pipeline_it_1;
CREATE DATABASE pipeline_it_2;
CREATE DATABASE pipeline_it_3;
CREATE DATABASE pipeline_it_4;
+
-- TODO remove unnecessary permissions
GRANT CREATE, CONNECT ON DATABASE pipeline_it_0 TO test_user;
GRANT CREATE, CONNECT ON DATABASE pipeline_it_1 TO test_user;
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/spi/PipelineTypedSPILoaderTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/spi/PipelineTypedSPILoaderTest.java
new file mode 100644
index 00000000000..0d94a5a70a6
--- /dev/null
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/spi/PipelineTypedSPILoaderTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.test.it.data.pipeline.core.util.spi;
+
+import org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLColumnValueReader;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader;
+import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
+import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
+import org.junit.Test;
+
+import java.util.Optional;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public final class PipelineTypedSPILoaderTest {
+
+ @Test
+ public void assertFindDatabaseTypedService() {
+ Optional<PipelineSQLBuilder> actual = PipelineTypedSPILoader.findDatabaseTypedService(PipelineSQLBuilder.class, "MariaDB");
+ assertTrue(actual.isPresent());
+ assertThat(actual.get().getType(), is("MySQL"));
+ }
+
+ @Test
+ public void assertGetPipelineSQLBuilder() {
+ PipelineSQLBuilder actual = PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, "MariaDB");
+ assertNotNull(actual);
+ assertThat(actual.getType(), is("MySQL"));
+ }
+
+ @Test
+ public void assertFindColumnValueReaderByUnknown() {
+ Optional<ColumnValueReader> actual = PipelineTypedSPILoader.findDatabaseTypedService(ColumnValueReader.class, "Unknown");
+ assertFalse(actual.isPresent());
+ }
+
+ @Test
+ public void assertGetColumnValueReaderByBranchDB() {
+ ColumnValueReader actual = PipelineTypedSPILoader.getDatabaseTypedService(ColumnValueReader.class, "MariaDB");
+ assertNotNull(actual);
+ assertThat(actual.getClass().getName(), is(MySQLColumnValueReader.class.getName()));
+ }
+}