You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by tu...@apache.org on 2022/09/06 06:33:42 UTC
[shardingsphere] branch master updated: Add CreateTableConfiguration; Refactor DataSourcePreparer and job preparer (#20824)
This is an automated email from the ASF dual-hosted git repository.
tuichenchuxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 50ae15890a0 Add CreateTableConfiguration; Refactor DataSourcePreparer and job preparer (#20824)
50ae15890a0 is described below
commit 50ae15890a00d89f3799a63279f6ceeb95d032b0
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Tue Sep 6 14:33:33 2022 +0800
Add CreateTableConfiguration; Refactor DataSourcePreparer and job preparer (#20824)
* Add PipelineTaskConfiguration
* Rename TaskConfiguration to MigrationTaskConfiguration
* Decouple InventoryTaskSplitter from MigrationTaskConfiguration
* Add CreateTableConfiguration; Refactor DataSourcePreparer method parameters and job preparer
* Unit test
---
.../infra/database/type/DatabaseTypeEngine.java | 10 +++
.../api/config/CreateTableConfiguration.java} | 38 +++++-----
...uration.java => PipelineTaskConfiguration.java} | 16 +---
.../{TableName.java => IdentifierName.java} | 17 ++---
.../SchemaName.java} | 22 +++---
.../SchemaTableName.java} | 16 ++--
.../data/pipeline/api/metadata/TableName.java | 40 +---------
.../data/pipeline/core/api/PipelineJobAPI.java | 4 +-
.../DefaultPipelineDataSourceManager.java | 1 +
.../metadata/generator/PipelineDDLGenerator.java | 44 +++++------
.../core/prepare/InventoryTaskSplitter.java | 20 ++---
.../core/prepare/PipelineJobPreparerUtils.java | 20 ++++-
.../datasource/AbstractDataSourcePreparer.java | 86 +++++++++-------------
.../prepare/datasource/DataSourcePreparer.java | 3 +-
.../datasource/PrepareTargetSchemasParameter.java | 13 +---
.../datasource/PrepareTargetTablesParameter.java | 33 +--------
.../pipeline/scenario/migration/MigrationJob.java | 3 +-
.../scenario/migration/MigrationJobAPI.java | 4 +
.../scenario/migration/MigrationJobAPIImpl.java | 42 +++++++----
.../migration/MigrationJobItemContext.java | 5 +-
.../scenario/migration/MigrationJobPreparer.java | 45 +++--------
.../migration/MigrationTaskConfiguration.java} | 11 ++-
.../core/fixture/MigrationJobAPIFixture.java | 4 +-
.../datasource/MySQLDataSourcePreparer.java | 13 ++--
.../datasource/MySQLDataSourcePreparerTest.java | 1 -
.../datasource/OpenGaussDataSourcePreparer.java | 17 ++---
.../datasource/PostgreSQLDataSourcePreparer.java | 17 ++---
.../api/impl/GovernanceRepositoryAPIImplTest.java | 8 +-
.../core/prepare/InventoryTaskSplitterTest.java | 9 ++-
.../pipeline/core/task/IncrementalTaskTest.java | 4 +-
.../data/pipeline/core/task/InventoryTaskTest.java | 4 +-
.../pipeline/core/util/PipelineContextUtil.java | 4 +-
32 files changed, 252 insertions(+), 322 deletions(-)
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeEngine.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeEngine.java
index 08872c7124a..8cf56eb08d0 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeEngine.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeEngine.java
@@ -154,4 +154,14 @@ public final class DatabaseTypeEngine {
public static String getDefaultSchemaName(final DatabaseType databaseType, final String databaseName) {
return databaseType instanceof SchemaSupportedDatabaseType ? ((SchemaSupportedDatabaseType) databaseType).getDefaultSchema() : databaseName.toLowerCase();
}
+
+ /**
+ * Get default schema name.
+ *
+ * @param databaseType database type
+ * @return default schema name
+ */
+ public static Optional<String> getDefaultSchemaName(final DatabaseType databaseType) {
+ return databaseType instanceof SchemaSupportedDatabaseType ? Optional.of(((SchemaSupportedDatabaseType) databaseType).getDefaultSchema()) : Optional.empty();
+ }
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/CreateTableConfiguration.java
similarity index 55%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/CreateTableConfiguration.java
index df94812d018..fd69f7be447 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/CreateTableConfiguration.java
@@ -15,33 +15,37 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
+package org.apache.shardingsphere.data.pipeline.api.config;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
-import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
+import lombok.ToString;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaTableName;
-import java.util.List;
+import java.util.Collection;
/**
- * Prepare target schemas parameter.
+ * Create table configuration.
*/
@RequiredArgsConstructor
@Getter
-public final class PrepareTargetSchemasParameter {
+@ToString
+public final class CreateTableConfiguration {
- private final List<String> logicTableNames;
+ private final Collection<CreateTableEntry> createTableEntries;
- private final DatabaseType targetDatabaseType;
-
- private final String databaseName;
-
- private final PipelineDataSourceConfiguration dataSourceConfig;
-
- private final PipelineDataSourceManager dataSourceManager;
-
- private final TableNameSchemaNameMapping tableNameSchemaNameMapping;
+ @RequiredArgsConstructor
+ @Getter
+ @ToString(exclude = {"sourceDataSourceConfig", "targetDataSourceConfig"})
+ public static final class CreateTableEntry {
+
+ private final PipelineDataSourceConfiguration sourceDataSourceConfig;
+
+ private final SchemaTableName sourceName;
+
+ private final PipelineDataSourceConfiguration targetDataSourceConfig;
+
+ private final SchemaTableName targetName;
+ }
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TaskConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/PipelineTaskConfiguration.java
similarity index 68%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TaskConfiguration.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/PipelineTaskConfiguration.java
index 67c7187ce24..29af6e31597 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TaskConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/PipelineTaskConfiguration.java
@@ -17,20 +17,8 @@
package org.apache.shardingsphere.data.pipeline.api.config;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
-
/**
- * Task configuration.
+ * Pipeline task configuration.
*/
-@Getter
-@RequiredArgsConstructor
-@ToString
-public final class TaskConfiguration {
-
- private final DumperConfiguration dumperConfig;
-
- private final ImporterConfiguration importerConfig;
+public interface PipelineTaskConfiguration {
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/TableName.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/IdentifierName.java
similarity index 81%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/TableName.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/IdentifierName.java
index e84c4847807..f67c76ab003 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/TableName.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/IdentifierName.java
@@ -18,27 +18,24 @@
package org.apache.shardingsphere.data.pipeline.api.metadata;
import lombok.Getter;
-import lombok.NonNull;
import java.util.Objects;
/**
- * Table name.
- * <p>It might be logic table name or actual table name.</p>
+ * Identifier name.
+ * <p>It might be schema name or table name, etc.</p>
* <p>It's case-insensitive.</p>
*/
@Getter
-public class TableName {
+public class IdentifierName {
- @NonNull
private final String original;
- @NonNull
private final String lowercase;
- public TableName(final String tableName) {
- this.original = tableName;
- this.lowercase = tableName.toLowerCase();
+ public IdentifierName(final String identifierName) {
+ this.original = identifierName;
+ this.lowercase = null != identifierName ? identifierName.toLowerCase() : null;
}
// TODO table name case-sensitive for some database
@@ -50,7 +47,7 @@ public class TableName {
if (o == null || getClass() != o.getClass()) {
return false;
}
- final TableName tableName = (TableName) o;
+ final IdentifierName tableName = (IdentifierName) o;
return lowercase.equals(tableName.lowercase);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TaskConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/SchemaName.java
similarity index 64%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TaskConfiguration.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/SchemaName.java
index 67c7187ce24..a9195f2020a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TaskConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/SchemaName.java
@@ -15,22 +15,18 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.config;
+package org.apache.shardingsphere.data.pipeline.api.metadata;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import javax.annotation.Nullable;
/**
- * Task configuration.
+ * Schema name.
+ * <p>It might be null.</p>
+ * <p>It's case-insensitive.</p>
*/
-@Getter
-@RequiredArgsConstructor
-@ToString
-public final class TaskConfiguration {
+public class SchemaName extends IdentifierName {
- private final DumperConfiguration dumperConfig;
-
- private final ImporterConfiguration importerConfig;
+ public SchemaName(@Nullable final String schemaName) {
+ super(schemaName);
+ }
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TaskConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/SchemaTableName.java
similarity index 75%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TaskConfiguration.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/SchemaTableName.java
index 67c7187ce24..f3f7dc7fbd4 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TaskConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/SchemaTableName.java
@@ -15,22 +15,24 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.config;
+package org.apache.shardingsphere.data.pipeline.api.metadata;
import lombok.Getter;
+import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
/**
- * Task configuration.
+ * Schema name and table name.
*/
-@Getter
@RequiredArgsConstructor
+@Getter
@ToString
-public final class TaskConfiguration {
+public class SchemaTableName {
- private final DumperConfiguration dumperConfig;
+ @NonNull
+ private final SchemaName schemaName;
- private final ImporterConfiguration importerConfig;
+ @NonNull
+ private final TableName tableName;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/TableName.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/TableName.java
index e84c4847807..b48332797dd 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/TableName.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/TableName.java
@@ -17,50 +17,16 @@
package org.apache.shardingsphere.data.pipeline.api.metadata;
-import lombok.Getter;
import lombok.NonNull;
-import java.util.Objects;
-
/**
* Table name.
* <p>It might be logic table name or actual table name.</p>
* <p>It's case-insensitive.</p>
*/
-@Getter
-public class TableName {
-
- @NonNull
- private final String original;
-
- @NonNull
- private final String lowercase;
-
- public TableName(final String tableName) {
- this.original = tableName;
- this.lowercase = tableName.toLowerCase();
- }
-
- // TODO table name case-sensitive for some database
- @Override
- public boolean equals(final Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- final TableName tableName = (TableName) o;
- return lowercase.equals(tableName.lowercase);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(lowercase);
- }
+public class TableName extends IdentifierName {
- @Override
- public String toString() {
- return original;
+ public TableName(@NonNull final String tableName) {
+ super(tableName);
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
index e083d70efcc..06b4c336435 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.core.api;
import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPI;
-import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.PipelineTaskConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext;
@@ -58,7 +58,7 @@ public interface PipelineJobAPI extends PipelineJobPublicAPI, PipelineJobItemAPI
* @param pipelineProcessConfig pipeline process configuration
* @return task configuration
*/
- TaskConfiguration buildTaskConfiguration(PipelineJobConfiguration pipelineJobConfig, int jobShardingItem, PipelineProcessConfiguration pipelineProcessConfig);
+ PipelineTaskConfiguration buildTaskConfiguration(PipelineJobConfiguration pipelineJobConfig, int jobShardingItem, PipelineProcessConfiguration pipelineProcessConfig);
/**
* Build pipeline process context.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DefaultPipelineDataSourceManager.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DefaultPipelineDataSourceManager.java
index 66ad7498d3c..ce2a9cbbf33 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DefaultPipelineDataSourceManager.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DefaultPipelineDataSourceManager.java
@@ -56,6 +56,7 @@ public final class DefaultPipelineDataSourceManager implements PipelineDataSourc
}
}
+ // TODO monitor each DataSource close
/**
* Close, close cached data source.
*/
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
index ecff9462e74..1df7d29a532 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
@@ -64,24 +64,24 @@ public final class PipelineDDLGenerator {
* @param databaseType database type
* @param sourceDataSource source data source
* @param schemaName schema name
- * @param logicTableName table name
- * @param actualTableName actual table name
+ * @param sourceTableName source table name
+ * @param targetTableName target table name
* @param parserEngine parser engine
- * @return DDL
+ * @return DDL SQL
* @throws SQLException SQL exception
*/
public String generateLogicDDL(final DatabaseType databaseType, final DataSource sourceDataSource,
- final String schemaName, final String logicTableName, final String actualTableName, final ShardingSphereSQLParserEngine parserEngine) throws SQLException {
- log.info("generateLogicDDLSQL, databaseType={}, schemaName={}, tableName={}", databaseType.getType(), schemaName, logicTableName);
+ final String schemaName, final String sourceTableName, final String targetTableName, final ShardingSphereSQLParserEngine parserEngine) throws SQLException {
+ log.info("generateLogicDDLSQL, databaseType={}, schemaName={}, sourceTableName={}, targetTableName={}", databaseType.getType(), schemaName, sourceTableName, targetTableName);
StringBuilder result = new StringBuilder();
- for (String each : CreateTableSQLGeneratorFactory.getInstance(databaseType).generate(sourceDataSource, schemaName, actualTableName)) {
- Optional<String> queryContext = decorate(databaseType, sourceDataSource, schemaName, logicTableName, parserEngine, each);
+ for (String each : CreateTableSQLGeneratorFactory.getInstance(databaseType).generate(sourceDataSource, schemaName, sourceTableName)) {
+ Optional<String> queryContext = decorate(databaseType, sourceDataSource, schemaName, targetTableName, parserEngine, each);
queryContext.ifPresent(ddlSQL -> result.append(ddlSQL).append(DELIMITER).append(System.lineSeparator()));
}
return result.toString();
}
- private Optional<String> decorate(final DatabaseType databaseType, final DataSource dataSource, final String schemaName, final String logicTableName,
+ private Optional<String> decorate(final DatabaseType databaseType, final DataSource dataSource, final String schemaName, final String targetTableName,
final ShardingSphereSQLParserEngine parserEngine, final String sql) throws SQLException {
if (sql.trim().isEmpty()) {
return Optional.empty();
@@ -90,7 +90,7 @@ public final class PipelineDDLGenerator {
try (Connection connection = dataSource.getConnection()) {
databaseName = connection.getCatalog();
}
- String result = decorateActualSQL(databaseName, logicTableName, parserEngine, sql.trim());
+ String result = decorateActualSQL(databaseName, targetTableName, parserEngine, sql.trim());
// TODO remove it after set search_path is supported.
if ("openGauss".equals(databaseType.getType())) {
return decorateOpenGauss(databaseName, schemaName, result, parserEngine);
@@ -98,24 +98,24 @@ public final class PipelineDDLGenerator {
return Optional.of(result);
}
- private String decorateActualSQL(final String databaseName, final String logicTableName, final ShardingSphereSQLParserEngine parserEngine, final String sql) {
+ private String decorateActualSQL(final String databaseName, final String targetTableName, final ShardingSphereSQLParserEngine parserEngine, final String sql) {
QueryContext queryContext = getQueryContext(databaseName, parserEngine, sql);
SQLStatementContext<?> sqlStatementContext = queryContext.getSqlStatementContext();
Map<SQLSegment, String> replaceMap = new TreeMap<>(Comparator.comparing(SQLSegment::getStartIndex));
if (sqlStatementContext instanceof CreateTableStatementContext) {
- appendFromIndexAndConstraint(replaceMap, logicTableName, sqlStatementContext);
- appendFromTable(replaceMap, logicTableName, (TableAvailable) sqlStatementContext);
+ appendFromIndexAndConstraint(replaceMap, targetTableName, sqlStatementContext);
+ appendFromTable(replaceMap, targetTableName, (TableAvailable) sqlStatementContext);
}
if (sqlStatementContext instanceof CommentStatementContext) {
- appendFromTable(replaceMap, logicTableName, (TableAvailable) sqlStatementContext);
+ appendFromTable(replaceMap, targetTableName, (TableAvailable) sqlStatementContext);
}
if (sqlStatementContext instanceof CreateIndexStatementContext) {
- appendFromTable(replaceMap, logicTableName, (TableAvailable) sqlStatementContext);
- appendFromIndexAndConstraint(replaceMap, logicTableName, sqlStatementContext);
+ appendFromTable(replaceMap, targetTableName, (TableAvailable) sqlStatementContext);
+ appendFromIndexAndConstraint(replaceMap, targetTableName, sqlStatementContext);
}
if (sqlStatementContext instanceof AlterTableStatementContext) {
- appendFromIndexAndConstraint(replaceMap, logicTableName, sqlStatementContext);
- appendFromTable(replaceMap, logicTableName, (TableAvailable) sqlStatementContext);
+ appendFromIndexAndConstraint(replaceMap, targetTableName, sqlStatementContext);
+ appendFromTable(replaceMap, targetTableName, (TableAvailable) sqlStatementContext);
}
return doDecorateActualTable(replaceMap, sql);
}
@@ -125,12 +125,12 @@ public final class PipelineDDLGenerator {
return new QueryContext(sqlStatementContext, sql, Collections.emptyList());
}
- private void appendFromIndexAndConstraint(final Map<SQLSegment, String> replaceMap, final String logicTableName, final SQLStatementContext<?> sqlStatementContext) {
+ private void appendFromIndexAndConstraint(final Map<SQLSegment, String> replaceMap, final String targetTableName, final SQLStatementContext<?> sqlStatementContext) {
if (!(sqlStatementContext instanceof TableAvailable) || ((TableAvailable) sqlStatementContext).getTablesContext().getTables().isEmpty()) {
return;
}
TableNameSegment tableNameSegment = ((TableAvailable) sqlStatementContext).getTablesContext().getTables().iterator().next().getTableName();
- if (!tableNameSegment.getIdentifier().getValue().equals(logicTableName)) {
+ if (!tableNameSegment.getIdentifier().getValue().equals(targetTableName)) {
if (sqlStatementContext instanceof IndexAvailable) {
for (IndexSegment each : ((IndexAvailable) sqlStatementContext).getIndexes()) {
String logicIndexName = IndexMetaDataUtil.getLogicIndexName(each.getIndexName().getIdentifier().getValue(), tableNameSegment.getIdentifier().getValue());
@@ -146,10 +146,10 @@ public final class PipelineDDLGenerator {
}
}
- private void appendFromTable(final Map<SQLSegment, String> replaceMap, final String logicTableName, final TableAvailable sqlStatementContext) {
+ private void appendFromTable(final Map<SQLSegment, String> replaceMap, final String targetTableName, final TableAvailable sqlStatementContext) {
for (SimpleTableSegment each : sqlStatementContext.getAllTables()) {
- if (!logicTableName.equals(each.getTableName().getIdentifier().getValue())) {
- replaceMap.put(each.getTableName(), logicTableName);
+ if (!targetTableName.equals(each.getTableName().getIdentifier().getValue())) {
+ replaceMap.put(each.getTableName(), targetTableName);
}
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index f2e3a02064b..e10415451c0 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.data.pipeline.core.prepare;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
@@ -66,17 +66,19 @@ import java.util.List;
@RequiredArgsConstructor
public final class InventoryTaskSplitter {
- private final PipelineTableMetaDataLoader metaDataLoader;
+ private final PipelineDataSourceWrapper sourceDataSource;
- private final PipelineDataSourceManager dataSourceManager;
+ private final DumperConfiguration dumperConfig;
- private final ExecuteEngine importerExecuteEngine;
+ private final ImporterConfiguration importerConfig;
- private final PipelineDataSourceWrapper sourceDataSource;
+ private final InventoryIncrementalJobItemProgress initProgress;
- private final TaskConfiguration taskConfig;
+ private final PipelineTableMetaDataLoader metaDataLoader;
- private final InventoryIncrementalJobItemProgress initProgress;
+ private final PipelineDataSourceManager dataSourceManager;
+
+ private final ExecuteEngine importerExecuteEngine;
/**
* Split inventory data to multi-tasks.
@@ -88,8 +90,8 @@ public final class InventoryTaskSplitter {
List<InventoryTask> result = new LinkedList<>();
PipelineChannelCreator pipelineChannelCreator = jobItemContext.getJobProcessContext().getPipelineChannelCreator();
DefaultPipelineJobProgressListener jobProgressListener = new DefaultPipelineJobProgressListener(jobItemContext.getJobId(), jobItemContext.getShardingItem());
- for (InventoryDumperConfiguration each : splitDumperConfig(jobItemContext, taskConfig.getDumperConfig())) {
- result.add(new InventoryTask(each, taskConfig.getImporterConfig(), pipelineChannelCreator, dataSourceManager, sourceDataSource, metaDataLoader, importerExecuteEngine,
+ for (InventoryDumperConfiguration each : splitDumperConfig(jobItemContext, dumperConfig)) {
+ result.add(new InventoryTask(each, importerConfig, pipelineChannelCreator, dataSourceManager, sourceDataSource, metaDataLoader, importerExecuteEngine,
jobProgressListener));
}
return result;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
index 096b8851510..0a080946ad9 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
@@ -28,6 +28,7 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.Standa
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
import org.apache.shardingsphere.data.pipeline.core.check.datasource.DataSourceCheckerFactory;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.PositionInitializerFactory;
import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparer;
import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparerFactory;
@@ -38,7 +39,11 @@ import org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionIniti
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCreator;
import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
+import org.apache.shardingsphere.parser.rule.SQLParserRule;
import javax.sql.DataSource;
import java.sql.SQLException;
@@ -72,8 +77,9 @@ public final class PipelineJobPreparerUtils {
*
* @param databaseType database type
* @param prepareTargetSchemasParameter prepare target schemas parameter
+ * @throws SQLException if preparing the target schema fails
*/
- public static void prepareTargetSchema(final String databaseType, final PrepareTargetSchemasParameter prepareTargetSchemasParameter) {
+ public static void prepareTargetSchema(final String databaseType, final PrepareTargetSchemasParameter prepareTargetSchemasParameter) throws SQLException {
Optional<DataSourcePreparer> dataSourcePreparer = DataSourcePreparerFactory.getInstance(databaseType);
if (!dataSourcePreparer.isPresent()) {
log.info("dataSourcePreparer null, ignore prepare target");
@@ -82,6 +88,18 @@ public final class PipelineJobPreparerUtils {
dataSourcePreparer.get().prepareTargetSchemas(prepareTargetSchemasParameter);
}
+ /**
+ * Get SQL parser engine for the protocol type of the target database.
+ *
+ * @param targetDatabaseName name of the target database, looked up in the meta data held by {@link PipelineContext}
+ * @return SQL parser engine obtained from the global {@link SQLParserRule}
+ */
+ public static ShardingSphereSQLParserEngine getSQLParserEngine(final String targetDatabaseName) {
+ ShardingSphereMetaData metaData = PipelineContext.getContextManager().getMetaDataContexts().getMetaData();
+ ShardingSphereDatabase database = metaData.getDatabases().get(targetDatabaseName);
+ return metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class).getSQLParserEngine(database.getProtocolType().getType());
+ }
+
/**
* Prepare target tables.
*
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
index c0172f9c51f..9dcf18e2a8d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
@@ -17,29 +17,24 @@
package org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
-import com.google.common.base.Preconditions;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
+import org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration.CreateTableEntry;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
-import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
import org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLGenerator;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
+import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
-import java.util.Collections;
import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
@@ -56,43 +51,38 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
private static final String[] IGNORE_EXCEPTION_MESSAGE = {"multiple primary keys for table", "already exists"};
@Override
- public void prepareTargetSchemas(final PrepareTargetSchemasParameter parameter) {
- Set<String> schemaNames = getSchemaNames(parameter);
- String defaultSchema = DatabaseTypeEngine.getDefaultSchemaName(parameter.getTargetDatabaseType(), parameter.getDatabaseName());
- log.info("prepareTargetSchemas, schemaNames={}, defaultSchema={}", schemaNames, defaultSchema);
- PipelineSQLBuilder pipelineSQLBuilder = PipelineSQLBuilderFactory.getInstance(parameter.getTargetDatabaseType().getType());
- try (Connection targetConnection = getCachedDataSource(parameter.getDataSourceConfig(), parameter.getDataSourceManager()).getConnection()) {
- for (String each : schemaNames) {
- if (each.equalsIgnoreCase(defaultSchema)) {
- continue;
- }
- String sql = pipelineSQLBuilder.buildCreateSchemaSQL(each);
+ public void prepareTargetSchemas(final PrepareTargetSchemasParameter parameter) throws SQLException {
+ DatabaseType targetDatabaseType = parameter.getTargetDatabaseType();
+ if (!targetDatabaseType.isSchemaAvailable()) {
+ log.info("prepareTargetSchemas, target database does not support schema, ignore, targetDatabaseType={}", targetDatabaseType);
+ return;
+ }
+ CreateTableConfiguration createTableConfig = parameter.getCreateTableConfig();
+ String defaultSchema = DatabaseTypeEngine.getDefaultSchemaName(targetDatabaseType).orElse(null);
+ PipelineSQLBuilder pipelineSQLBuilder = PipelineSQLBuilderFactory.getInstance(targetDatabaseType.getType());
+ Set<String> createdSchemaNames = new HashSet<>();
+ for (CreateTableEntry each : createTableConfig.getCreateTableEntries()) {
+ String targetSchemaName = each.getTargetName().getSchemaName().getOriginal();
+ if (null == targetSchemaName) {
+ continue;
+ }
+ if (targetSchemaName.equalsIgnoreCase(defaultSchema)) {
+ continue;
+ }
+ if (createdSchemaNames.contains(targetSchemaName)) {
+ continue;
+ }
+ try (Connection targetConnection = getCachedDataSource(each.getTargetDataSourceConfig(), parameter.getDataSourceManager()).getConnection()) {
+ String sql = pipelineSQLBuilder.buildCreateSchemaSQL(targetSchemaName);
log.info("prepareTargetSchemas, sql={}", sql);
try (Statement statement = targetConnection.createStatement()) {
statement.execute(sql);
+ createdSchemaNames.add(targetSchemaName);
} catch (final SQLException ignored) {
}
}
- } catch (final SQLException ex) {
- throw new PipelineJobPrepareFailedException("Can not get connection.", ex);
}
- }
-
- private Set<String> getSchemaNames(final PrepareTargetSchemasParameter parameter) {
- Set<String> result = new HashSet<>();
- for (String each : parameter.getLogicTableNames()) {
- String schemaName = parameter.getTableNameSchemaNameMapping().getSchemaName(each);
- if (null == schemaName) {
- throw new PipelineJobPrepareFailedException("Can not get schemaName by logic table name " + each);
- }
- result.add(schemaName);
- }
- return result;
- }
-
- // TODO the invocation is disabled for now, it might be used again for next new feature
- protected final PipelineDataSourceWrapper getSourceCachedDataSource(final MigrationJobConfiguration jobConfig, final PipelineDataSourceManager dataSourceManager) {
- return dataSourceManager.getDataSource(PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter()));
+ log.info("prepareTargetSchemas, createdSchemaNames={}, defaultSchema={}", createdSchemaNames, defaultSchema);
}
protected final PipelineDataSourceWrapper getCachedDataSource(final PipelineDataSourceConfiguration dataSourceConfig, final PipelineDataSourceManager dataSourceManager) {
@@ -121,18 +111,14 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
return PATTERN_CREATE_TABLE.matcher(createTableSQL).replaceFirst("CREATE TABLE IF NOT EXISTS ");
}
- protected final List<String> listCreateLogicalTableSQL(final PrepareTargetTablesParameter parameter) throws SQLException {
+ protected final String getCreateTargetTableSQL(final CreateTableEntry createTableEntry, final PipelineDataSourceManager dataSourceManager,
+ final ShardingSphereSQLParserEngine sqlParserEngine) throws SQLException {
+ DatabaseType databaseType = createTableEntry.getSourceDataSourceConfig().getDatabaseType();
+ DataSource sourceDataSource = dataSourceManager.getDataSource(createTableEntry.getSourceDataSourceConfig());
+ String schemaName = createTableEntry.getSourceName().getSchemaName().getOriginal();
+ String sourceTableName = createTableEntry.getSourceName().getTableName().getOriginal();
+ String targetTableName = createTableEntry.getTargetName().getTableName().getOriginal();
PipelineDDLGenerator generator = new PipelineDDLGenerator();
- List<String> result = new LinkedList<>();
- for (JobDataNodeEntry each : parameter.getTablesFirstDataNodes().getEntries()) {
- String dataSourceName = each.getDataNodes().get(0).getDataSourceName();
- DataSource dataSource = parameter.getSourceDataSourceMap().get(dataSourceName);
- DatabaseType databaseType = DatabaseTypeEngine.getDatabaseType(Collections.singletonList(dataSource));
- String schemaName = parameter.getTableNameSchemaNameMapping().getSchemaName(each.getLogicTableName());
- String actualTableName = each.getDataNodes().get(0).getTableName();
- Preconditions.checkNotNull(actualTableName, "Could not get actualTableName, nodeEntry={}", each);
- result.add(generator.generateLogicDDL(databaseType, dataSource, schemaName, each.getLogicTableName(), actualTableName, parameter.getSqlParserEngine()));
- }
- return result;
+ return generator.generateLogicDDL(databaseType, sourceDataSource, schemaName, sourceTableName, targetTableName, sqlParserEngine);
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/DataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/DataSourcePreparer.java
index e4ef48ab384..501765108aa 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/DataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/DataSourcePreparer.java
@@ -32,8 +32,9 @@ public interface DataSourcePreparer extends TypedSPI {
* Prepare target schemas.
*
* @param parameter prepare target schemas parameter
+ * @throws SQLException if preparing the target schemas fails
*/
- void prepareTargetSchemas(PrepareTargetSchemasParameter parameter);
+ void prepareTargetSchemas(PrepareTargetSchemasParameter parameter) throws SQLException;
/**
* Prepare target tables.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java
index df94812d018..6f70d1c8b49 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java
@@ -19,13 +19,10 @@ package org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
+import org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import java.util.List;
-
/**
* Prepare target schemas parameter.
*/
@@ -33,15 +30,9 @@ import java.util.List;
@Getter
public final class PrepareTargetSchemasParameter {
- private final List<String> logicTableNames;
-
private final DatabaseType targetDatabaseType;
- private final String databaseName;
-
- private final PipelineDataSourceConfiguration dataSourceConfig;
+ private final CreateTableConfiguration createTableConfig;
private final PipelineDataSourceManager dataSourceManager;
-
- private final TableNameSchemaNameMapping tableNameSchemaNameMapping;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
index c9f1e8fc764..65fec5e55ef 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
@@ -18,46 +18,21 @@
package org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
import lombok.Getter;
-import lombok.NonNull;
-import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
-import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
-import javax.sql.DataSource;
-import java.util.Map;
-
/**
* Prepare target tables parameter.
*/
+@RequiredArgsConstructor
@Getter
public final class PrepareTargetTablesParameter {
- private final String databaseName;
-
- private final JobDataNodeLine tablesFirstDataNodes;
-
- private final PipelineDataSourceConfiguration targetDataSourceConfig;
-
- private final Map<String, DataSource> sourceDataSourceMap;
+ private final CreateTableConfiguration createTableConfig;
private final PipelineDataSourceManager dataSourceManager;
- private final TableNameSchemaNameMapping tableNameSchemaNameMapping;
-
private final ShardingSphereSQLParserEngine sqlParserEngine;
-
- public PrepareTargetTablesParameter(@NonNull final String databaseName, @NonNull final PipelineDataSourceConfiguration targetDataSourceConfig,
- @NonNull final Map<String, DataSource> sourceDataSourceMap, @NonNull final PipelineDataSourceManager dataSourceManager,
- @NonNull final JobDataNodeLine tablesFirstDataNodes, final TableNameSchemaNameMapping tableNameSchemaNameMapping,
- @NonNull final ShardingSphereSQLParserEngine sqlParserEngine) {
- this.databaseName = databaseName;
- this.targetDataSourceConfig = targetDataSourceConfig;
- this.sourceDataSourceMap = sourceDataSourceMap;
- this.tablesFirstDataNodes = tablesFirstDataNodes;
- this.dataSourceManager = dataSourceManager;
- this.tableNameSchemaNameMapping = tableNameSchemaNameMapping;
- this.sqlParserEngine = sqlParserEngine;
- }
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 2dd261b8420..e1737a49eef 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -19,7 +19,6 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
@@ -68,7 +67,7 @@ public final class MigrationJob extends AbstractPipelineJob implements SimpleJob
MigrationJobConfiguration jobConfig = YamlMigrationJobConfigurationSwapper.swapToObject(shardingContext.getJobParameter());
InventoryIncrementalJobItemProgress initProgress = jobAPI.getJobItemProgress(shardingContext.getJobName(), shardingItem);
MigrationProcessContext jobProcessContext = jobAPI.buildPipelineProcessContext(jobConfig);
- TaskConfiguration taskConfig = jobAPI.buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
+ MigrationTaskConfiguration taskConfig = jobAPI.buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
MigrationJobItemContext jobItemContext = new MigrationJobItemContext(jobConfig, shardingItem, initProgress, jobProcessContext, taskConfig, dataSourceManager);
if (getTasksRunnerMap().containsKey(shardingItem)) {
log.warn("tasksRunnerMap contains shardingItem {}, ignore", shardingItem);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
index 1a3d902efe2..383cf7e9581 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
@@ -23,6 +23,7 @@ import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfig
import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
+import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
@@ -37,6 +38,9 @@ public interface MigrationJobAPI extends PipelineJobAPI, MigrationJobPublicAPI,
@Override
MigrationJobConfiguration getJobConfiguration(String jobId);
+ @Override
+ MigrationTaskConfiguration buildTaskConfiguration(PipelineJobConfiguration pipelineJobConfig, int jobShardingItem, PipelineProcessConfiguration pipelineProcessConfig);
+
@Override
MigrationProcessContext buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index bd898dc5d9a..4be428d86ed 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -23,9 +23,10 @@ import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration.CreateTableEntry;
import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
-import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
@@ -48,6 +49,9 @@ import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.TableName;
import org.apache.shardingsphere.data.pipeline.api.pojo.CreateMigrationJobParameter;
import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.MigrationJobInfo;
@@ -191,22 +195,32 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
}
@Override
- public TaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
+ public MigrationTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) pipelineJobConfig;
Map<ActualTableName, LogicTableName> tableNameMap = new LinkedHashMap<>();
tableNameMap.put(new ActualTableName(jobConfig.getSourceTableName()), new LogicTableName(jobConfig.getTargetTableName()));
Map<LogicTableName, String> tableNameSchemaMap = TableNameSchemaNameMapping.convert(jobConfig.getSourceSchemaName(), Collections.singletonList(jobConfig.getTargetTableName()));
TableNameSchemaNameMapping tableNameSchemaNameMapping = new TableNameSchemaNameMapping(tableNameSchemaMap);
- DumperConfiguration dumperConfig = createDumperConfiguration(jobConfig.getJobId(), jobConfig.getSourceResourceName(), jobConfig.getSource(), tableNameMap, tableNameSchemaNameMapping);
+ CreateTableConfiguration createTableConfig = buildCreateTableConfiguration(jobConfig);
+ DumperConfiguration dumperConfig = buildDumperConfiguration(jobConfig.getJobId(), jobConfig.getSourceResourceName(), jobConfig.getSource(), tableNameMap, tableNameSchemaNameMapping);
// TODO now shardingColumnsMap always empty,
- ImporterConfiguration importerConfig = createImporterConfiguration(jobConfig, pipelineProcessConfig, Collections.emptyMap(), tableNameSchemaNameMapping);
- TaskConfiguration result = new TaskConfiguration(dumperConfig, importerConfig);
- log.info("createTaskConfiguration, sourceResourceName={}, result={}", jobConfig.getSourceResourceName(), result);
+ ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, pipelineProcessConfig, Collections.emptyMap(), tableNameSchemaNameMapping);
+ MigrationTaskConfiguration result = new MigrationTaskConfiguration(createTableConfig, dumperConfig, importerConfig);
+ log.info("buildTaskConfiguration, sourceResourceName={}, result={}", jobConfig.getSourceResourceName(), result);
return result;
}
- private static DumperConfiguration createDumperConfiguration(final String jobId, final String dataSourceName, final PipelineDataSourceConfiguration sourceDataSource,
- final Map<ActualTableName, LogicTableName> tableNameMap, final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
+ private CreateTableConfiguration buildCreateTableConfiguration(final MigrationJobConfiguration jobConfig) {
+ String sourceSchemaName = jobConfig.getSourceSchemaName();
+ String targetSchemaName = DatabaseTypeFactory.getInstance(jobConfig.getTargetDatabaseType()).isSchemaAvailable() ? sourceSchemaName : null;
+ CreateTableEntry createTableEntry = new CreateTableEntry(
+ jobConfig.getSource(), new SchemaTableName(new SchemaName(sourceSchemaName), new TableName(jobConfig.getSourceTableName())),
+ jobConfig.getTarget(), new SchemaTableName(new SchemaName(targetSchemaName), new TableName(jobConfig.getTargetTableName())));
+ return new CreateTableConfiguration(Collections.singletonList(createTableEntry));
+ }
+
+ private DumperConfiguration buildDumperConfiguration(final String jobId, final String dataSourceName, final PipelineDataSourceConfiguration sourceDataSource,
+ final Map<ActualTableName, LogicTableName> tableNameMap, final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
DumperConfiguration result = new DumperConfiguration();
result.setJobId(jobId);
result.setDataSourceName(dataSourceName);
@@ -216,18 +230,17 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
return result;
}
- private static ImporterConfiguration createImporterConfiguration(final MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig,
- final Map<LogicTableName, Set<String>> shardingColumnsMap, final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
- PipelineDataSourceConfiguration dataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(), jobConfig.getTarget().getParameter());
+ private ImporterConfiguration buildImporterConfiguration(final MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig,
+ final Map<LogicTableName, Set<String>> shardingColumnsMap, final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
int retryTimes = jobConfig.getRetryTimes();
int concurrency = jobConfig.getConcurrency();
PipelineProcessContext migrationProcessContext = new MigrationProcessContext(jobConfig.getJobId(), pipelineProcessConfig);
- return new ImporterConfiguration(dataSourceConfig, unmodifiable(shardingColumnsMap), tableNameSchemaNameMapping, batchSize, migrationProcessContext.getWriteRateLimitAlgorithm(),
+ return new ImporterConfiguration(jobConfig.getTarget(), unmodifiable(shardingColumnsMap), tableNameSchemaNameMapping, batchSize, migrationProcessContext.getWriteRateLimitAlgorithm(),
retryTimes, concurrency);
}
- private static Map<LogicTableName, Set<String>> unmodifiable(final Map<LogicTableName, Set<String>> shardingColumnsMap) {
+ private Map<LogicTableName, Set<String>> unmodifiable(final Map<LogicTableName, Set<String>> shardingColumnsMap) {
Map<LogicTableName, Set<String>> result = new HashMap<>(shardingColumnsMap.size());
for (Entry<LogicTableName, Set<String>> entry : shardingColumnsMap.entrySet()) {
result.put(entry.getKey(), Collections.unmodifiableSet(entry.getValue()));
@@ -343,10 +356,9 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
String targetTableName = jobConfig.getTargetTableName();
// TODO use jobConfig.targetSchemaName
String targetSchemaName = jobConfig.getSourceSchemaName();
- PipelineDataSourceConfiguration target = jobConfig.getTarget();
PipelineSQLBuilder pipelineSQLBuilder = PipelineSQLBuilderFactory.getInstance(jobConfig.getTargetDatabaseType());
try (
- PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(PipelineDataSourceConfigurationFactory.newInstance(target.getType(), target.getParameter()));
+ PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(jobConfig.getTarget());
Connection connection = dataSource.getConnection()) {
String sql = pipelineSQLBuilder.buildTruncateSQL(targetSchemaName, targetTableName);
log.info("cleanTempTableOnRollback, targetSchemaName={}, targetTableName={}, sql={}", targetSchemaName, targetTableName, sql);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
index 9a72f14c16c..2904932b9e0 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
@@ -23,7 +23,6 @@ import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.LazyInitializer;
-import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
@@ -56,7 +55,7 @@ public final class MigrationJobItemContext implements InventoryIncrementalJobIte
private final InventoryIncrementalJobItemProgress initProgress;
- private final TaskConfiguration taskConfig;
+ private final MigrationTaskConfiguration taskConfig;
private final Collection<InventoryTask> inventoryTasks = new LinkedList<>();
@@ -85,7 +84,7 @@ public final class MigrationJobItemContext implements InventoryIncrementalJobIte
};
public MigrationJobItemContext(final MigrationJobConfiguration jobConfig, final int shardingItem, final InventoryIncrementalJobItemProgress initProgress,
- final MigrationProcessContext jobProcessContext, final TaskConfiguration taskConfig, final PipelineDataSourceManager dataSourceManager) {
+ final MigrationProcessContext jobProcessContext, final MigrationTaskConfiguration taskConfig, final PipelineDataSourceManager dataSourceManager) {
this.jobConfig = jobConfig;
jobId = jobConfig.getJobId();
this.shardingItem = shardingItem;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
index c02cd51ea0f..63a94777d93 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
@@ -18,11 +18,8 @@
package org.apache.shardingsphere.data.pipeline.scenario.migration;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
-import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
@@ -43,17 +40,11 @@ import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChanne
import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
import org.apache.shardingsphere.infra.lock.LockContext;
import org.apache.shardingsphere.infra.lock.LockDefinition;
-import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
import org.apache.shardingsphere.mode.lock.ExclusiveLockDefinition;
-import org.apache.shardingsphere.parser.rule.SQLParserRule;
-import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
/**
* Migration job preparer.
@@ -141,41 +132,27 @@ public final class MigrationJobPreparer {
private void prepareTarget(final MigrationJobItemContext jobItemContext) throws SQLException {
MigrationJobConfiguration jobConfig = jobItemContext.getJobConfig();
- TableNameSchemaNameMapping tableNameSchemaNameMapping = jobItemContext.getTaskConfig().getDumperConfig().getTableNameSchemaNameMapping();
String targetDatabaseType = jobConfig.getTargetDatabaseType();
- if (isTargetSchemaAvailable(jobConfig) && StringUtils.isNotBlank(jobConfig.getSourceSchemaName())) {
- PrepareTargetSchemasParameter prepareTargetSchemasParameter = new PrepareTargetSchemasParameter(Collections.singletonList(jobConfig.getTargetTableName()),
- DatabaseTypeFactory.getInstance(targetDatabaseType), jobConfig.getTargetDatabaseName(),
- jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig(), jobItemContext.getDataSourceManager(), tableNameSchemaNameMapping);
- PipelineJobPreparerUtils.prepareTargetSchema(targetDatabaseType, prepareTargetSchemasParameter);
- }
- ShardingSphereMetaData metaData = PipelineContext.getContextManager().getMetaDataContexts().getMetaData();
- ShardingSphereDatabase sphereDatabase = metaData.getDatabases().get(jobConfig.getTargetDatabaseName());
- ShardingSphereSQLParserEngine sqlParserEngine = metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class).getSQLParserEngine(sphereDatabase.getProtocolType().getType());
- JobDataNodeLine jobDataNodeLine = JobDataNodeLine.unmarshal(jobConfig.getTablesFirstDataNodes());
- PipelineDataSourceWrapper dataSource = jobItemContext.getDataSourceManager().getDataSource(jobItemContext.getTaskConfig().getDumperConfig().getDataSourceConfig());
- Map<String, DataSource> sourceDataSourceMap = new HashMap<>(1, 1.0F);
- sourceDataSourceMap.put(jobConfig.getSourceResourceName(), dataSource);
- PrepareTargetTablesParameter prepareTargetTablesParameter = new PrepareTargetTablesParameter(jobConfig.getTargetDatabaseName(),
- jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig(), sourceDataSourceMap, jobItemContext.getDataSourceManager(),
- jobDataNodeLine, tableNameSchemaNameMapping, sqlParserEngine);
+ CreateTableConfiguration createTableConfig = jobItemContext.getTaskConfig().getCreateTableConfig();
+ PrepareTargetSchemasParameter prepareTargetSchemasParameter = new PrepareTargetSchemasParameter(
+ DatabaseTypeFactory.getInstance(targetDatabaseType), createTableConfig, jobItemContext.getDataSourceManager());
+ PipelineJobPreparerUtils.prepareTargetSchema(targetDatabaseType, prepareTargetSchemasParameter);
+ ShardingSphereSQLParserEngine sqlParserEngine = PipelineJobPreparerUtils.getSQLParserEngine(jobConfig.getTargetDatabaseName());
+ PrepareTargetTablesParameter prepareTargetTablesParameter = new PrepareTargetTablesParameter(createTableConfig, jobItemContext.getDataSourceManager(), sqlParserEngine);
PipelineJobPreparerUtils.prepareTargetTables(targetDatabaseType, prepareTargetTablesParameter);
}
- private boolean isTargetSchemaAvailable(final MigrationJobConfiguration jobConfig) {
- return DatabaseTypeFactory.getInstance(jobConfig.getTargetDatabaseType()).isSchemaAvailable();
- }
-
private void initInventoryTasks(final MigrationJobItemContext jobItemContext) {
- InventoryTaskSplitter inventoryTaskSplitter = new InventoryTaskSplitter(jobItemContext.getSourceMetaDataLoader(), jobItemContext.getDataSourceManager(),
- jobItemContext.getJobProcessContext().getImporterExecuteEngine(), jobItemContext.getSourceDataSource(), jobItemContext.getTaskConfig(), jobItemContext.getInitProgress());
+ InventoryTaskSplitter inventoryTaskSplitter = new InventoryTaskSplitter(
+ jobItemContext.getSourceDataSource(), jobItemContext.getTaskConfig().getDumperConfig(), jobItemContext.getTaskConfig().getImporterConfig(), jobItemContext.getInitProgress(),
+ jobItemContext.getSourceMetaDataLoader(), jobItemContext.getDataSourceManager(), jobItemContext.getJobProcessContext().getImporterExecuteEngine());
jobItemContext.getInventoryTasks().addAll(inventoryTaskSplitter.splitInventoryData(jobItemContext));
}
private void initIncrementalTasks(final MigrationJobItemContext jobItemContext) throws SQLException {
PipelineChannelCreator pipelineChannelCreator = jobItemContext.getJobProcessContext().getPipelineChannelCreator();
ExecuteEngine incrementalDumperExecuteEngine = jobItemContext.getJobProcessContext().getIncrementalDumperExecuteEngine();
- TaskConfiguration taskConfig = jobItemContext.getTaskConfig();
+ MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
PipelineDataSourceManager dataSourceManager = jobItemContext.getDataSourceManager();
JobItemIncrementalTasksProgress initIncremental = jobItemContext.getInitProgress() == null ? null : jobItemContext.getInitProgress().getIncremental();
taskConfig.getDumperConfig().setPosition(PipelineJobPreparerUtils.getIncrementalPosition(initIncremental, taskConfig.getDumperConfig(), dataSourceManager));
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TaskConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationTaskConfiguration.java
similarity index 69%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TaskConfiguration.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationTaskConfiguration.java
index 67c7187ce24..52672769aab 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TaskConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationTaskConfiguration.java
@@ -15,20 +15,25 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.config;
+package org.apache.shardingsphere.data.pipeline.scenario.migration;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
+import org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.PipelineTaskConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
/**
- * Task configuration.
+ * Migration task configuration.
*/
@Getter
@RequiredArgsConstructor
@ToString
-public final class TaskConfiguration {
+public final class MigrationTaskConfiguration implements PipelineTaskConfiguration {
+
+ private final CreateTableConfiguration createTableConfig;
private final DumperConfiguration dumperConfig;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
index d68a2eb44f0..9608049f057 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.data.pipeline.core.fixture;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
-import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
@@ -31,6 +30,7 @@ import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgo
import org.apache.shardingsphere.data.pipeline.api.pojo.MigrationJobInfo;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationProcessContext;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationTaskConfiguration;
import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
@@ -153,7 +153,7 @@ public final class MigrationJobAPIFixture implements MigrationJobAPI {
}
@Override
- public TaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration jobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
+ public MigrationTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration jobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
return null;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
index ceccccf1742..9b7f0d2cde0 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
@@ -18,8 +18,8 @@
package org.apache.shardingsphere.data.pipeline.mysql.prepare.datasource;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration.CreateTableEntry;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.AbstractDataSourcePreparer;
import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
@@ -33,14 +33,13 @@ import java.sql.SQLException;
public final class MySQLDataSourcePreparer extends AbstractDataSourcePreparer {
@Override
- public void prepareTargetTables(final PrepareTargetTablesParameter parameter) {
+ public void prepareTargetTables(final PrepareTargetTablesParameter parameter) throws SQLException {
PipelineDataSourceManager dataSourceManager = parameter.getDataSourceManager();
- try (Connection targetConnection = getCachedDataSource(parameter.getTargetDataSourceConfig(), dataSourceManager).getConnection()) {
- for (String each : listCreateLogicalTableSQL(parameter)) {
- executeTargetTableSQL(targetConnection, addIfNotExistsForCreateTableSQL(each));
+ for (CreateTableEntry each : parameter.getCreateTableConfig().getCreateTableEntries()) {
+ String createTargetTableSQL = getCreateTargetTableSQL(each, dataSourceManager, parameter.getSqlParserEngine());
+ try (Connection targetConnection = getCachedDataSource(each.getTargetDataSourceConfig(), dataSourceManager).getConnection()) {
+ executeTargetTableSQL(targetConnection, addIfNotExistsForCreateTableSQL(createTargetTableSQL));
}
- } catch (final SQLException ex) {
- throw new PipelineJobPrepareFailedException("prepare target tables failed.", ex);
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
index 0112176b321..7884a10dbb7 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
@@ -83,7 +83,6 @@ public final class MySQLDataSourcePreparerTest {
when(jobConfig.getTarget()).thenReturn(targetPipelineDataSourceConfig);
when(jobConfig.getTarget().getType()).thenReturn("ShardingSphereJDBC");
when(jobConfig.getTarget().getParameter()).thenReturn("target");
- when(prepareTargetTablesParameter.getDatabaseName()).thenReturn("test_db");
}
@Test
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
index d58133902b0..78b6fef6559 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
@@ -20,13 +20,13 @@ package org.apache.shardingsphere.data.pipeline.opengauss.prepare.datasource;
import com.google.common.base.Splitter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
-import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
+import org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration.CreateTableEntry;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.AbstractDataSourcePreparer;
import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
import java.sql.Connection;
import java.sql.SQLException;
-import java.util.List;
import java.util.stream.Collectors;
/**
@@ -37,15 +37,14 @@ public final class OpenGaussDataSourcePreparer extends AbstractDataSourcePrepare
@Override
public void prepareTargetTables(final PrepareTargetTablesParameter parameter) throws SQLException {
- List<String> createLogicalTableSQLs = listCreateLogicalTableSQL(parameter);
- try (Connection targetConnection = getCachedDataSource(parameter.getTargetDataSourceConfig(), parameter.getDataSourceManager()).getConnection()) {
- for (String createLogicalTableSQL : createLogicalTableSQLs) {
- for (String each : Splitter.on(";").splitToList(createLogicalTableSQL).stream().filter(StringUtils::isNotBlank).collect(Collectors.toList())) {
- executeTargetTableSQL(targetConnection, addIfNotExistsForCreateTableSQL(each));
+ PipelineDataSourceManager dataSourceManager = parameter.getDataSourceManager();
+ for (CreateTableEntry each : parameter.getCreateTableConfig().getCreateTableEntries()) {
+ String createTargetTableSQL = getCreateTargetTableSQL(each, dataSourceManager, parameter.getSqlParserEngine());
+ try (Connection targetConnection = getCachedDataSource(each.getTargetDataSourceConfig(), dataSourceManager).getConnection()) {
+ for (String sql : Splitter.on(";").splitToList(createTargetTableSQL).stream().filter(StringUtils::isNotBlank).collect(Collectors.toList())) {
+ executeTargetTableSQL(targetConnection, sql);
}
}
- } catch (final SQLException ex) {
- throw new PipelineJobPrepareFailedException("prepare target tables failed.", ex);
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
index 8f75e968183..aa30e1c380c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
@@ -20,13 +20,13 @@ package org.apache.shardingsphere.data.pipeline.postgresql.prepare.datasource;
import com.google.common.base.Splitter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
-import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
+import org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration.CreateTableEntry;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.AbstractDataSourcePreparer;
import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
import java.sql.Connection;
import java.sql.SQLException;
-import java.util.List;
import java.util.stream.Collectors;
/**
@@ -37,15 +37,14 @@ public final class PostgreSQLDataSourcePreparer extends AbstractDataSourcePrepar
@Override
public void prepareTargetTables(final PrepareTargetTablesParameter parameter) throws SQLException {
- List<String> createLogicalTableSQLs = listCreateLogicalTableSQL(parameter);
- try (Connection targetConnection = getCachedDataSource(parameter.getTargetDataSourceConfig(), parameter.getDataSourceManager()).getConnection()) {
- for (String createLogicalTableSQL : createLogicalTableSQLs) {
- for (String each : Splitter.on(";").splitToList(createLogicalTableSQL).stream().filter(StringUtils::isNotBlank).collect(Collectors.toList())) {
- executeTargetTableSQL(targetConnection, each);
+ PipelineDataSourceManager dataSourceManager = parameter.getDataSourceManager();
+ for (CreateTableEntry each : parameter.getCreateTableConfig().getCreateTableEntries()) {
+ String createTargetTableSQL = getCreateTargetTableSQL(each, dataSourceManager, parameter.getSqlParserEngine());
+ try (Connection targetConnection = getCachedDataSource(each.getTargetDataSourceConfig(), dataSourceManager).getConnection()) {
+ for (String sql : Splitter.on(";").splitToList(createTargetTableSQL).stream().filter(StringUtils::isNotBlank).collect(Collectors.toList())) {
+ executeTargetTableSQL(targetConnection, sql);
}
}
- } catch (final SQLException ex) {
- throw new PipelineJobPrepareFailedException("prepare target tables failed.", ex);
}
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
index f64fde18e31..9141e76717b 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.api.impl;
-import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
@@ -34,6 +33,7 @@ import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationTaskConfiguration;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
import org.junit.BeforeClass;
@@ -127,13 +127,13 @@ public final class GovernanceRepositoryAPIImplTest {
private MigrationJobItemContext mockJobItemContext() {
MigrationJobItemContext result = PipelineContextUtil.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration());
- TaskConfiguration taskConfig = result.getTaskConfig();
+ MigrationTaskConfiguration taskConfig = result.getTaskConfig();
result.getInventoryTasks().add(mockInventoryTask(taskConfig));
result.getIncrementalTasks().add(mockIncrementalTask(taskConfig));
return result;
}
- private InventoryTask mockInventoryTask(final TaskConfiguration taskConfig) {
+ private InventoryTask mockInventoryTask(final MigrationTaskConfiguration taskConfig) {
InventoryDumperConfiguration dumperConfig = new InventoryDumperConfiguration(taskConfig.getDumperConfig());
dumperConfig.setPosition(new PlaceholderPosition());
dumperConfig.setActualTableName("t_order");
@@ -147,7 +147,7 @@ public final class GovernanceRepositoryAPIImplTest {
new DefaultPipelineDataSourceManager(), dataSource, metaDataLoader, PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobProgressListener());
}
- private IncrementalTask mockIncrementalTask(final TaskConfiguration taskConfig) {
+ private IncrementalTask mockIncrementalTask(final MigrationTaskConfiguration taskConfig) {
DumperConfiguration dumperConfig = taskConfig.getDumperConfig();
dumperConfig.setPosition(new PlaceholderPosition());
PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
index 27f35e6442b..55337499309 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.core.prepare;
-import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
@@ -27,6 +26,7 @@ import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationTaskConfiguration;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -45,7 +45,7 @@ public final class InventoryTaskSplitterTest {
private MigrationJobItemContext jobItemContext;
- private TaskConfiguration taskConfig;
+ private MigrationTaskConfiguration taskConfig;
private PipelineDataSourceManager dataSourceManager;
@@ -59,8 +59,9 @@ public final class InventoryTaskSplitterTest {
@Before
public void setUp() {
initJobItemContext();
- inventoryTaskSplitter = new InventoryTaskSplitter(jobItemContext.getSourceMetaDataLoader(), jobItemContext.getDataSourceManager(),
- jobItemContext.getJobProcessContext().getImporterExecuteEngine(), jobItemContext.getSourceDataSource(), jobItemContext.getTaskConfig(), jobItemContext.getInitProgress());
+ inventoryTaskSplitter = new InventoryTaskSplitter(
+ jobItemContext.getSourceDataSource(), jobItemContext.getTaskConfig().getDumperConfig(), jobItemContext.getTaskConfig().getImporterConfig(), jobItemContext.getInitProgress(),
+ jobItemContext.getSourceMetaDataLoader(), jobItemContext.getDataSourceManager(), jobItemContext.getJobProcessContext().getImporterExecuteEngine());
}
private void initJobItemContext() {
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
index c0097e72f04..076766b8a6a 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.core.task;
-import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
@@ -26,6 +25,7 @@ import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobPr
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationTaskConfiguration;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -47,7 +47,7 @@ public final class IncrementalTaskTest {
@Before
public void setUp() {
- TaskConfiguration taskConfig = PipelineContextUtil.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration()).getTaskConfig();
+ MigrationTaskConfiguration taskConfig = PipelineContextUtil.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration()).getTaskConfig();
taskConfig.getDumperConfig().setPosition(new PlaceholderPosition());
PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
incrementalTask = new IncrementalTask(3, taskConfig.getDumperConfig(), taskConfig.getImporterConfig(),
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
index 03096ef1cad..7f9f60caec0 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.core.task;
-import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
@@ -30,6 +29,7 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestExcep
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationTaskConfiguration;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -47,7 +47,7 @@ public final class InventoryTaskTest {
private static final PipelineDataSourceManager DATA_SOURCE_MANAGER = new DefaultPipelineDataSourceManager();
- private TaskConfiguration taskConfig;
+ private MigrationTaskConfiguration taskConfig;
@BeforeClass
public static void beforeClass() {
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
index 56409e6fbd1..d98bc93950c 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.data.pipeline.core.util;
import lombok.SneakyThrows;
import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.LazyInitializer;
-import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
@@ -32,6 +31,7 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.Memory
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIImpl;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationProcessContext;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationTaskConfiguration;
import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
import org.apache.shardingsphere.driver.jdbc.core.datasource.ShardingSphereDataSource;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
@@ -159,7 +159,7 @@ public final class PipelineContextUtil {
PipelineProcessConfiguration processConfig = mockPipelineProcessConfiguration();
MigrationProcessContext processContext = new MigrationProcessContext(jobConfig.getJobId(), processConfig);
int jobShardingItem = 0;
- TaskConfiguration taskConfig = new MigrationJobAPIImpl().buildTaskConfiguration(jobConfig, jobShardingItem, processConfig);
+ MigrationTaskConfiguration taskConfig = new MigrationJobAPIImpl().buildTaskConfiguration(jobConfig, jobShardingItem, processConfig);
return new MigrationJobItemContext(jobConfig, jobShardingItem, null, processContext, taskConfig, new DefaultPipelineDataSourceManager());
}