You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/09/13 12:22:36 UTC
[shardingsphere] branch master updated: Support unique key table check migration (#20944)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new bc83e6e0d97 Support unique key table check migration (#20944)
bc83e6e0d97 is described below
commit bc83e6e0d97c13abdb449c3b992be7fd8d6abaf6
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Tue Sep 13 20:22:28 2022 +0800
Support unique key table check migration (#20944)
* Support unique key table check migration
* persist unique key in job configuration
* Fix ci error
* Fix codestyle
* Fix codestyle
---
.../api/config/job/MigrationJobConfiguration.java | 3 +
.../job/yaml/YamlMigrationJobConfiguration.java | 3 +
.../yaml/YamlMigrationJobConfigurationSwapper.java | 6 +-
.../metadata/yaml/YamlPipelineColumnMetaData.java | 40 +++++++
.../yaml/YamlPipelineColumnMetaDataSwapper.java | 51 ++++++++
.../core/prepare/InventoryTaskSplitter.java | 66 ++---------
.../core/util/PipelineTableMetaDataUtil.java | 129 +++++++++++++++++++++
.../migration/MigrationDataConsistencyChecker.java | 6 +-
.../scenario/migration/MigrationJobAPIImpl.java | 6 +
.../scenario/migration/MigrationJobPreparer.java | 8 +-
.../cases/migration/AbstractMigrationITCase.java | 4 +-
.../migration/general/MySQLMigrationGeneralIT.java | 4 +-
.../general/PostgreSQLMigrationGeneralIT.java | 4 +-
.../primarykey/TextPrimaryKeyMigrationIT.java | 13 ++-
.../env/scenario/primary_key/unique_key/mysql.xml | 28 +++++
.../core/api/impl/MigrationJobAPIImplTest.java | 27 ++++-
.../core/prepare/InventoryTaskSplitterTest.java | 29 ++++-
17 files changed, 350 insertions(+), 77 deletions(-)
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/MigrationJobConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/MigrationJobConfiguration.java
index 57be24501b7..cb19ff443e6 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/MigrationJobConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/MigrationJobConfiguration.java
@@ -22,6 +22,7 @@ import lombok.RequiredArgsConstructor;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
import java.util.List;
@@ -67,6 +68,8 @@ public final class MigrationJobConfiguration implements PipelineJobConfiguration
private final List<String> jobShardingDataNodes;
+ private final PipelineColumnMetaData uniqueKeyColumn;
+
private final int concurrency;
private final int retryTimes;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfiguration.java
index 917b9f9367e..34d13bc8d97 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfiguration.java
@@ -23,6 +23,7 @@ import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.metadata.yaml.YamlPipelineColumnMetaData;
import java.util.List;
@@ -66,6 +67,8 @@ public final class YamlMigrationJobConfiguration implements YamlPipelineJobConfi
private List<String> jobShardingDataNodes;
+ private YamlPipelineColumnMetaData uniqueKeyColumn;
+
private int concurrency = 3;
private int retryTimes = 3;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfigurationSwapper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfigurationSwapper.java
index 93a1ed0edf8..1fd2e5bf86b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfigurationSwapper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfigurationSwapper.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.api.config.job.yaml;
import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.api.metadata.yaml.YamlPipelineColumnMetaDataSwapper;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
@@ -32,6 +33,8 @@ public final class YamlMigrationJobConfigurationSwapper implements YamlConfigura
private final YamlPipelineDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlPipelineDataSourceConfigurationSwapper();
+ private final YamlPipelineColumnMetaDataSwapper pipelineColumnMetaDataSwapper = new YamlPipelineColumnMetaDataSwapper();
+
@Override
public YamlMigrationJobConfiguration swapToYamlConfiguration(final MigrationJobConfiguration data) {
YamlMigrationJobConfiguration result = new YamlMigrationJobConfiguration();
@@ -47,6 +50,7 @@ public final class YamlMigrationJobConfigurationSwapper implements YamlConfigura
result.setTarget(dataSourceConfigSwapper.swapToYamlConfiguration(data.getTarget()));
result.setTablesFirstDataNodes(data.getTablesFirstDataNodes());
result.setJobShardingDataNodes(data.getJobShardingDataNodes());
+ result.setUniqueKeyColumn(pipelineColumnMetaDataSwapper.swapToYamlConfiguration(data.getUniqueKeyColumn()));
result.setConcurrency(data.getConcurrency());
result.setRetryTimes(data.getRetryTimes());
return result;
@@ -59,7 +63,7 @@ public final class YamlMigrationJobConfigurationSwapper implements YamlConfigura
yamlConfig.getSourceDatabaseType(), yamlConfig.getTargetDatabaseType(),
yamlConfig.getSourceTableName(), yamlConfig.getTargetTableName(),
dataSourceConfigSwapper.swapToObject(yamlConfig.getSource()), dataSourceConfigSwapper.swapToObject(yamlConfig.getTarget()),
- yamlConfig.getTablesFirstDataNodes(), yamlConfig.getJobShardingDataNodes(),
+ yamlConfig.getTablesFirstDataNodes(), yamlConfig.getJobShardingDataNodes(), pipelineColumnMetaDataSwapper.swapToObject(yamlConfig.getUniqueKeyColumn()),
yamlConfig.getConcurrency(), yamlConfig.getRetryTimes());
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/yaml/YamlPipelineColumnMetaData.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/yaml/YamlPipelineColumnMetaData.java
new file mode 100644
index 00000000000..260e7cf4786
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/yaml/YamlPipelineColumnMetaData.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.metadata.yaml;
+
+import lombok.Data;
+import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
+
+/**
+ * Yaml pipeline column meta data.
+ */
+@Data
+public final class YamlPipelineColumnMetaData implements YamlConfiguration {
+
+ private int ordinalPosition;
+
+ private String name;
+
+ private int dataType;
+
+ private String dataTypeName;
+
+ private boolean nullable;
+
+ private boolean primaryKey;
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/yaml/YamlPipelineColumnMetaDataSwapper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/yaml/YamlPipelineColumnMetaDataSwapper.java
new file mode 100644
index 00000000000..8d6064c0684
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/yaml/YamlPipelineColumnMetaDataSwapper.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.metadata.yaml;
+
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
+
+/**
+ * Yaml pipeline column meta data swapper.
+ */
+public final class YamlPipelineColumnMetaDataSwapper implements YamlConfigurationSwapper<YamlPipelineColumnMetaData, PipelineColumnMetaData> {
+
+ @Override
+ public YamlPipelineColumnMetaData swapToYamlConfiguration(final PipelineColumnMetaData data) {
+ if (null == data) {
+ return null;
+ }
+ YamlPipelineColumnMetaData result = new YamlPipelineColumnMetaData();
+ result.setName(data.getName());
+ result.setDataType(data.getDataType());
+ result.setDataTypeName(data.getDataTypeName());
+ result.setNullable(data.isNullable());
+ result.setPrimaryKey(data.isPrimaryKey());
+ result.setOrdinalPosition(data.getOrdinalPosition());
+ return result;
+ }
+
+ @Override
+ public PipelineColumnMetaData swapToObject(final YamlPipelineColumnMetaData yamlConfig) {
+ if (null == yamlConfig) {
+ return null;
+ }
+ return new PipelineColumnMetaData(yamlConfig.getOrdinalPosition(), yamlConfig.getName(), yamlConfig.getDataType(), yamlConfig.getDataTypeName(), yamlConfig.isNullable(),
+ yamlConfig.isPrimaryKey());
+ }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index 04856d1f356..7cc8a52f8ec 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.data.pipeline.core.prepare;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
@@ -28,15 +27,11 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
-import org.apache.shardingsphere.data.pipeline.api.ingest.position.PrimaryKeyPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.StringPrimaryKeyPosition;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
-import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineIndexMetaData;
-import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationException;
@@ -68,7 +63,7 @@ public final class InventoryTaskSplitter {
private final PipelineDataSourceWrapper sourceDataSource;
- private final DumperConfiguration dumperConfig;
+ private final InventoryDumperConfiguration dumperConfig;
private final ImporterConfiguration importerConfig;
@@ -97,15 +92,15 @@ public final class InventoryTaskSplitter {
return result;
}
- private Collection<InventoryDumperConfiguration> splitDumperConfig(final InventoryIncrementalJobItemContext jobItemContext, final DumperConfiguration dumperConfig) {
+ private Collection<InventoryDumperConfiguration> splitDumperConfig(final InventoryIncrementalJobItemContext jobItemContext, final InventoryDumperConfiguration dumperConfig) {
Collection<InventoryDumperConfiguration> result = new LinkedList<>();
for (InventoryDumperConfiguration each : splitByTable(dumperConfig)) {
- result.addAll(splitByPrimaryKey(each, jobItemContext, sourceDataSource, metaDataLoader));
+ result.addAll(splitByPrimaryKey(each, jobItemContext, sourceDataSource));
}
return result;
}
- private Collection<InventoryDumperConfiguration> splitByTable(final DumperConfiguration dumperConfig) {
+ private Collection<InventoryDumperConfiguration> splitByTable(final InventoryDumperConfiguration dumperConfig) {
Collection<InventoryDumperConfiguration> result = new LinkedList<>();
dumperConfig.getTableNameMap().forEach((key, value) -> {
InventoryDumperConfiguration inventoryDumperConfig = new InventoryDumperConfiguration(dumperConfig);
@@ -113,19 +108,21 @@ public final class InventoryTaskSplitter {
inventoryDumperConfig.setActualTableName(key.getOriginal());
inventoryDumperConfig.setLogicTableName(value.getOriginal());
inventoryDumperConfig.setPosition(new PlaceholderPosition());
+ inventoryDumperConfig.setUniqueKey(dumperConfig.getUniqueKey());
+ inventoryDumperConfig.setUniqueKeyDataType(dumperConfig.getUniqueKeyDataType());
result.add(inventoryDumperConfig);
});
return result;
}
private Collection<InventoryDumperConfiguration> splitByPrimaryKey(final InventoryDumperConfiguration dumperConfig, final InventoryIncrementalJobItemContext jobItemContext,
- final DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) {
+ final DataSource dataSource) {
Collection<InventoryDumperConfiguration> result = new LinkedList<>();
InventoryIncrementalProcessContext jobProcessContext = jobItemContext.getJobProcessContext();
PipelineReadConfiguration readConfig = jobProcessContext.getPipelineProcessConfig().getRead();
int batchSize = readConfig.getBatchSize();
JobRateLimitAlgorithm rateLimitAlgorithm = jobProcessContext.getReadRateLimitAlgorithm();
- Collection<IngestPosition<?>> inventoryPositions = getInventoryPositions(jobItemContext, dumperConfig, dataSource, metaDataLoader);
+ Collection<IngestPosition<?>> inventoryPositions = getInventoryPositions(jobItemContext, dumperConfig, dataSource);
int i = 0;
for (IngestPosition<?> inventoryPosition : inventoryPositions) {
InventoryDumperConfiguration splitDumperConfig = new InventoryDumperConfiguration(dumperConfig);
@@ -143,58 +140,19 @@ public final class InventoryTaskSplitter {
}
private Collection<IngestPosition<?>> getInventoryPositions(final InventoryIncrementalJobItemContext jobItemContext, final InventoryDumperConfiguration dumperConfig,
- final DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) {
- String schemaName = dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName()));
- String actualTableName = dumperConfig.getActualTableName();
- PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(schemaName, actualTableName);
- PipelineColumnMetaData uniqueKeyColumn = mustGetAnAppropriateUniqueKeyColumn(tableMetaData, actualTableName);
+ final DataSource dataSource) {
if (null != initProgress && initProgress.getStatus() != JobStatus.PREPARING_FAILURE) {
- Collection<IngestPosition<?>> result = initProgress.getInventory().getInventoryPosition(dumperConfig.getActualTableName()).values();
- for (IngestPosition<?> each : result) {
- if (each instanceof PrimaryKeyPosition) {
- dumperConfig.setUniqueKey(uniqueKeyColumn.getName());
- dumperConfig.setUniqueKeyDataType(uniqueKeyColumn.getDataType());
- break;
- }
- }
// Do NOT filter FinishedPosition here, since whole inventory tasks are required in job progress when persisting to register center.
- return result;
+ return initProgress.getInventory().getInventoryPosition(dumperConfig.getActualTableName()).values();
}
- dumperConfig.setUniqueKey(uniqueKeyColumn.getName());
- int uniqueKeyDataType = uniqueKeyColumn.getDataType();
- dumperConfig.setUniqueKeyDataType(uniqueKeyDataType);
+ int uniqueKeyDataType = dumperConfig.getUniqueKeyDataType();
if (PipelineJdbcUtils.isIntegerColumn(uniqueKeyDataType)) {
return getPositionByIntegerPrimaryKeyRange(jobItemContext, dataSource, dumperConfig);
} else if (PipelineJdbcUtils.isStringColumn(uniqueKeyDataType)) {
return getPositionByStringPrimaryKeyRange();
} else {
- throw new PipelineJobCreationException(String.format("Can not split range for table %s, reason: primary key is not integer or string type", actualTableName));
- }
- }
-
- private PipelineColumnMetaData mustGetAnAppropriateUniqueKeyColumn(final PipelineTableMetaData tableMetaData, final String tableName) {
- if (null == tableMetaData) {
- throw new PipelineJobCreationException(String.format("Can not split range for table %s, reason: can not get table metadata ", tableName));
- }
- List<String> primaryKeys = tableMetaData.getPrimaryKeyColumns();
- if (primaryKeys.size() > 1) {
- throw new PipelineJobCreationException(String.format("Can not split range for table %s, reason: primary key is union primary", tableName));
- }
- if (1 == primaryKeys.size()) {
- return tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0));
- }
- Collection<PipelineIndexMetaData> uniqueIndexes = tableMetaData.getUniqueIndexes();
- if (uniqueIndexes.isEmpty()) {
- throw new PipelineJobCreationException(String.format("Can not split range for table %s, reason: no primary key or unique index", tableName));
- }
- if (1 == uniqueIndexes.size() && 1 == uniqueIndexes.iterator().next().getColumns().size()) {
- PipelineColumnMetaData column = uniqueIndexes.iterator().next().getColumns().get(0);
- if (!column.isNullable()) {
- return column;
- }
+ throw new PipelineJobCreationException(String.format("Can not split range for table %s, reason: primary key is not integer or string type", dumperConfig.getActualTableName()));
}
- throw new PipelineJobCreationException(
- String.format("Can not split range for table %s, reason: table contains multiple unique index or unique index contains nullable/multiple column(s)", tableName));
}
private Collection<IngestPosition<?>> getPositionByIntegerPrimaryKeyRange(final InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource,
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineTableMetaDataUtil.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineTableMetaDataUtil.java
new file mode 100644
index 00000000000..f6d04b192e5
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineTableMetaDataUtil.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.util;
+
+import lombok.SneakyThrows;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineIndexMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
+import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationException;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
+
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Pipeline table meta data util.
+ */
+public final class PipelineTableMetaDataUtil {
+
+ /**
+ * Get pipeline table meta data.
+ *
+ * @param schemaName schema name
+ * @param tableName table name
+ * @param dataSourceConfig source configuration
+ * @param loader pipeline table meta data loader* @return pipeline table meta data
+ * @return pipeline table meta data
+ */
+ @SneakyThrows(SQLException.class)
+ public static PipelineTableMetaData getPipelineTableMetaData(final String schemaName, final String tableName, final StandardPipelineDataSourceConfiguration dataSourceConfig,
+ final PipelineTableMetaDataLoader loader) {
+ try (PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(dataSourceConfig)) {
+ return getPipelineTableMetaData(schemaName, tableName, dataSource, loader);
+ }
+ }
+
+ /**
+ * Get pipeline table meta data.
+ *
+ * @param schemaName schema name
+ * @param tableName table name
+ * @param dataSource data source
+ * @param loader pipeline table meta data loader
+ * @return pipeline table meta data.
+ */
+ public static PipelineTableMetaData getPipelineTableMetaData(final String schemaName, final String tableName, final PipelineDataSourceWrapper dataSource,
+ final PipelineTableMetaDataLoader loader) {
+ if (null == loader) {
+ return new StandardPipelineTableMetaDataLoader(dataSource).getTableMetaData(schemaName, tableName);
+ } else {
+ return loader.getTableMetaData(schemaName, tableName);
+ }
+ }
+
+ /**
+ * Get unique key column, if primary key exists, return primary key, otherwise return the first unique key.
+ *
+ * @param schemaName schema name
+ * @param tableName table name
+ * @param dataSourceConfig data source config
+ * @param loader pipeline table meta data loader
+ * @return pipeline column meta data.
+ */
+ public static PipelineColumnMetaData getUniqueKeyColumn(final String schemaName, final String tableName, final StandardPipelineDataSourceConfiguration dataSourceConfig,
+ final StandardPipelineTableMetaDataLoader loader) {
+ PipelineTableMetaData pipelineTableMetaData = getPipelineTableMetaData(schemaName, tableName, dataSourceConfig, loader);
+ return mustGetAnAppropriateUniqueKeyColumn(pipelineTableMetaData, tableName);
+ }
+
+ /**
+ * Get unique key column, if primary key exists, return primary key, otherwise return the first unique key.
+ *
+ * @param schemaName schema name
+ * @param tableName table name
+ * @param dataSource data source
+ * @param loader pipeline table meta data loader
+ * @return pipeline column meta data.
+ */
+ public static PipelineColumnMetaData getUniqueKeyColumn(final String schemaName, final String tableName, final PipelineDataSourceWrapper dataSource,
+ final StandardPipelineTableMetaDataLoader loader) {
+ PipelineTableMetaData pipelineTableMetaData = getPipelineTableMetaData(schemaName, tableName, dataSource, loader);
+ return mustGetAnAppropriateUniqueKeyColumn(pipelineTableMetaData, tableName);
+ }
+
+ private static PipelineColumnMetaData mustGetAnAppropriateUniqueKeyColumn(final PipelineTableMetaData tableMetaData, final String tableName) {
+ if (null == tableMetaData) {
+ throw new PipelineJobCreationException(String.format("Can not split range for table %s, reason: can not get table metadata ", tableName));
+ }
+ List<String> primaryKeys = tableMetaData.getPrimaryKeyColumns();
+ if (primaryKeys.size() > 1) {
+ throw new PipelineJobCreationException(String.format("Can not split range for table %s, reason: primary key is union primary", tableName));
+ }
+ if (1 == primaryKeys.size()) {
+ return tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0));
+ }
+ Collection<PipelineIndexMetaData> uniqueIndexes = tableMetaData.getUniqueIndexes();
+ if (uniqueIndexes.isEmpty()) {
+ throw new PipelineJobCreationException(String.format("Can not split range for table %s, reason: no primary key or unique index", tableName));
+ }
+ if (1 == uniqueIndexes.size() && 1 == uniqueIndexes.iterator().next().getColumns().size()) {
+ PipelineColumnMetaData column = uniqueIndexes.iterator().next().getColumns().get(0);
+ if (!column.isNullable()) {
+ return column;
+ }
+ }
+ throw new PipelineJobCreationException(
+ String.format("Can not split range for table %s, reason: table contains multiple unique index or unique index contains nullable/multiple column(s)", tableName));
+ }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
index bbd61d295e6..c6061c1eaf4 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
@@ -32,8 +32,8 @@ import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumn
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineDataConsistencyCheckFailedException;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineTableMetaDataUtil;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
@@ -174,12 +174,12 @@ public final class MigrationDataConsistencyChecker {
String sourceDatabaseType = sourceDataSourceConfig.getDatabaseType().getType();
String targetDatabaseType = targetDataSourceConfig.getDatabaseType().getType();
for (String each : Collections.singletonList(sourceTableName)) {
- PipelineTableMetaData tableMetaData = new StandardPipelineTableMetaDataLoader(sourceDataSource).getTableMetaData(tableNameSchemaNameMapping.getSchemaName(each), each);
+ PipelineTableMetaData tableMetaData = PipelineTableMetaDataUtil.getPipelineTableMetaData(tableNameSchemaNameMapping.getSchemaName(each), each, sourceDataSource, null);
if (null == tableMetaData) {
throw new PipelineDataConsistencyCheckFailedException("Can not get metadata for table " + each);
}
Collection<String> columnNames = tableMetaData.getColumnNames();
- PipelineColumnMetaData uniqueKey = tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0));
+ PipelineColumnMetaData uniqueKey = jobConfig.getUniqueKeyColumn();
DataConsistencyCalculateParameter sourceParameter = buildParameter(sourceDataSource, tableNameSchemaNameMapping, each, columnNames, sourceDatabaseType, targetDatabaseType, uniqueKey);
DataConsistencyCalculateParameter targetParameter = buildParameter(targetDataSource, tableNameSchemaNameMapping, targetTableName, columnNames, targetDatabaseType, sourceDatabaseType,
uniqueKey);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index 2fa0d392489..e68b75c7a1f 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -51,6 +51,8 @@ import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaName;
import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.TableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.yaml.YamlPipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.yaml.YamlPipelineColumnMetaDataSwapper;
import org.apache.shardingsphere.data.pipeline.api.pojo.CreateMigrationJobParameter;
import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.MigrationJobInfo;
@@ -67,6 +69,7 @@ import org.apache.shardingsphere.data.pipeline.core.exception.connection.AddMigr
import org.apache.shardingsphere.data.pipeline.core.exception.connection.DropMigrationSourceResourceException;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineSchemaTableUtil;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineTableMetaDataUtil;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
@@ -465,6 +468,9 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
result.setTargetDatabaseType(targetPipelineDataSource.getDatabaseType().getType());
result.setTargetDatabaseName(targetDatabaseName);
result.setTargetTableName(parameter.getTargetTableName());
+ YamlPipelineColumnMetaData uniqueKeyColumn = new YamlPipelineColumnMetaDataSwapper().swapToYamlConfiguration(PipelineTableMetaDataUtil.getUniqueKeyColumn(sourceSchemaName,
+ parameter.getSourceTableName(), sourceDataSourceConfig, null));
+ result.setUniqueKeyColumn(uniqueKeyColumn);
extendYamlJobConfiguration(result);
MigrationJobConfiguration jobConfiguration = new YamlMigrationJobConfigurationSwapper().swapToObject(result);
start(jobConfiguration);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
index e17a19ff197..99693e5e20a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
@@ -26,6 +27,7 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineIgnoredException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobPrepareFailedException;
@@ -143,8 +145,12 @@ public final class MigrationJobPreparer {
}
private void initInventoryTasks(final MigrationJobItemContext jobItemContext) {
+ InventoryDumperConfiguration inventoryDumperConfig = new InventoryDumperConfiguration(jobItemContext.getTaskConfig().getDumperConfig());
+ PipelineColumnMetaData uniqueKeyColumn = jobItemContext.getJobConfig().getUniqueKeyColumn();
+ inventoryDumperConfig.setUniqueKey(uniqueKeyColumn.getName());
+ inventoryDumperConfig.setUniqueKeyDataType(uniqueKeyColumn.getDataType());
InventoryTaskSplitter inventoryTaskSplitter = new InventoryTaskSplitter(
- jobItemContext.getSourceDataSource(), jobItemContext.getTaskConfig().getDumperConfig(), jobItemContext.getTaskConfig().getImporterConfig(), jobItemContext.getInitProgress(),
+ jobItemContext.getSourceDataSource(), inventoryDumperConfig, jobItemContext.getTaskConfig().getImporterConfig(), jobItemContext.getInitProgress(),
jobItemContext.getSourceMetaDataLoader(), jobItemContext.getDataSourceManager(), jobItemContext.getJobProcessContext().getImporterExecuteEngine());
jobItemContext.getInventoryTasks().addAll(inventoryTaskSplitter.splitInventoryData(jobItemContext));
}
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
index 05d525474fe..fcdd0f9fd72 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
@@ -159,7 +159,7 @@ public abstract class AbstractMigrationITCase extends BaseITCase {
return jobList.stream().filter(a -> a.get("tables").toString().equals(tableName)).findFirst().orElseThrow(() -> new RuntimeException("not find " + tableName + " table")).get("id").toString();
}
- protected void assertCheckMigrationSuccess(final String jobId) {
+ protected void assertCheckMigrationSuccess(final String jobId, final String algorithmType) {
for (int i = 0; i < 5; i++) {
if (checkJobIncrementTaskFinished(jobId)) {
break;
@@ -168,7 +168,7 @@ public abstract class AbstractMigrationITCase extends BaseITCase {
}
boolean secondCheckJobResult = checkJobIncrementTaskFinished(jobId);
log.info("second check job result: {}", secondCheckJobResult);
- List<Map<String, Object>> checkJobResults = queryForListWithLog(String.format("CHECK MIGRATION '%s' BY TYPE (NAME='DATA_MATCH')", jobId));
+ List<Map<String, Object>> checkJobResults = queryForListWithLog(String.format("CHECK MIGRATION '%s' BY TYPE (NAME='%s')", jobId, algorithmType));
log.info("check job results: {}", checkJobResults);
for (Map<String, Object> entry : checkJobResults) {
assertTrue(Boolean.parseBoolean(entry.get("records_content_matched").toString()));
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
index 3cfcd013495..7c3889a7827 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
@@ -42,7 +42,7 @@ import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
/**
- * General scaling test case, includes multiple cases.
+ * General migration test case, includes multiple cases.
*/
@Slf4j
@RunWith(Parameterized.class)
@@ -103,7 +103,7 @@ public final class MySQLMigrationGeneralIT extends AbstractMigrationITCase {
private void assertMigrationSuccessById(final String jobId) throws SQLException, InterruptedException {
waitJobFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
- assertCheckMigrationSuccess(jobId);
+ assertCheckMigrationSuccess(jobId, "DATA_MATCH");
stopMigrationByJobId(jobId);
}
}
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
index 81b5bed2ba9..e181a40838c 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
@@ -111,7 +111,7 @@ public final class PostgreSQLMigrationGeneralIT extends AbstractMigrationITCase
sourceExecuteWithLog(String.format("INSERT INTO %s.t_order_copy (order_id,user_id,status) VALUES (%s, %s, '%s')", SCHEMA_NAME, KEY_GENERATE_ALGORITHM.generateKey(),
1, "afterStop"));
startMigrationByJobId(jobId);
- assertCheckMigrationSuccess(jobId);
+ assertCheckMigrationSuccess(jobId, "DATA_MATCH");
stopMigrationByJobId(jobId);
}
@@ -119,7 +119,7 @@ public final class PostgreSQLMigrationGeneralIT extends AbstractMigrationITCase
startMigrationOrderItem(true);
String jobId = getJobIdByTableName("t_order_item");
waitJobFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
- assertCheckMigrationSuccess(jobId);
+ assertCheckMigrationSuccess(jobId, "DATA_MATCH");
stopMigrationByJobId(jobId);
}
}
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java
index d9d681514d2..d180deae6a8 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java
@@ -25,6 +25,7 @@ import org.apache.shardingsphere.integration.data.pipeline.cases.migration.Abstr
import org.apache.shardingsphere.integration.data.pipeline.env.enums.ITEnvTypeEnum;
import org.apache.shardingsphere.integration.data.pipeline.framework.param.ScalingParameterized;
import org.apache.shardingsphere.sharding.algorithm.keygen.UUIDKeyGenerateAlgorithm;
+import org.apache.shardingsphere.test.integration.env.container.atomic.util.DatabaseTypeUtil;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -38,7 +39,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
-import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
@RunWith(Parameterized.class)
@@ -58,6 +59,7 @@ public class TextPrimaryKeyMigrationIT extends AbstractMigrationITCase {
}
for (String version : ENV.listDatabaseDockerImageNames(new MySQLDatabaseType())) {
result.add(new ScalingParameterized(new MySQLDatabaseType(), version, "env/scenario/primary_key/text_primary_key/mysql.xml"));
+ result.add(new ScalingParameterized(new MySQLDatabaseType(), version, "env/scenario/primary_key/unique_key/mysql.xml"));
}
for (String version : ENV.listDatabaseDockerImageNames(new PostgreSQLDatabaseType())) {
result.add(new ScalingParameterized(new PostgreSQLDatabaseType(), version, "env/scenario/primary_key/text_primary_key/postgresql.xml"));
@@ -79,8 +81,14 @@ public class TextPrimaryKeyMigrationIT extends AbstractMigrationITCase {
startMigrationOrder();
String jobId = listJobId().get(0);
waitJobFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+ sourceExecuteWithLog(String.format("INSERT INTO t_order (order_id,user_id,status) VALUES (%s, %s, '%s')", "1000000000", 1, "afterStop"));
+ // TODO The ordering of primary or unique keys for text types is different, may cause check failed, need fix
+ if (DatabaseTypeUtil.isMySQL(getDatabaseType())) {
+ assertCheckMigrationSuccess(jobId, "CRC32_MATCH");
+ } else {
+ assertCheckMigrationSuccess(jobId, "DATA_MATCH");
+ }
stopMigrationByJobId(jobId);
- assertCheckMigrationSuccess(jobId);
if (ENV.getItEnvType() == ITEnvTypeEnum.DOCKER) {
commitMigrationByJobId(jobId);
List<String> lastJobIds = listJobId();
@@ -100,5 +108,6 @@ public class TextPrimaryKeyMigrationIT extends AbstractMigrationITCase {
}
preparedStatement.executeBatch();
}
+ log.info("init data succeed");
}
}
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/primary_key/unique_key/mysql.xml b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/primary_key/unique_key/mysql.xml
new file mode 100644
index 00000000000..5f8a288d79a
--- /dev/null
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/primary_key/unique_key/mysql.xml
@@ -0,0 +1,28 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<command>
+ <create-table-order>
+ CREATE TABLE `t_order` (
+ `order_id` varchar(255) NOT NULL,
+ `user_id` INT NOT NULL,
+ `status` varchar(255) NULL,
+ `t_unsigned_int` int UNSIGNED NULL,
+ CONSTRAINT unique_id UNIQUE (order_id),
+ INDEX ( `user_id` )
+ ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
+ </create-table-order>
+</command>
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
index 1b17ff77540..edd701f1189 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
@@ -28,6 +28,7 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.api.pojo.CreateMigrationJobParameter;
import org.apache.shardingsphere.data.pipeline.api.pojo.MigrationJobInfo;
import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
@@ -35,9 +36,11 @@ import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineDataSourceCreatorFactory;
import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
+import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
+import org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCreator;
import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.junit.AfterClass;
@@ -77,8 +80,7 @@ public final class MigrationJobAPIImplTest {
PipelineContextUtil.mockModeConfigAndContextManager();
jobAPI = MigrationJobAPIFactory.getInstance();
Map<String, Object> props = new HashMap<>();
- // TODO if resource availability is checked, then it should not work
- props.put("jdbcUrl", "jdbc:mysql://localhost:3306/test");
+ props.put("jdbcUrl", "jdbc:h2:mem:test_ds_0;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL");
props.put("username", "root");
props.put("password", "root");
Map<String, DataSourceProperties> expect = new LinkedHashMap<>(1, 1);
@@ -151,8 +153,10 @@ public final class MigrationJobAPIImplTest {
}
@Test
- public void assertDataConsistencyCheck() {
- Optional<String> jobId = jobAPI.start(JobConfigurationBuilder.createJobConfiguration());
+ public void assertDataConsistencyCheck() throws NoSuchFieldException, IllegalAccessException {
+ MigrationJobConfiguration jobConfiguration = JobConfigurationBuilder.createJobConfiguration();
+ ReflectionUtil.setFieldValue(jobConfiguration, "uniqueKeyColumn", new PipelineColumnMetaData(1, "order_id", 4, "", false, true));
+ Optional<String> jobId = jobAPI.start(jobConfiguration);
assertTrue(jobId.isPresent());
MigrationJobConfiguration jobConfig = jobAPI.getJobConfiguration(jobId.get());
if (null == jobConfig.getSource()) {
@@ -266,7 +270,8 @@ public final class MigrationJobAPIImplTest {
}
@Test
- public void assertCreateJobConfig() {
+ public void assertCreateJobConfig() throws SQLException {
+ initIntPrimaryEnvironment();
CreateMigrationJobParameter parameter = new CreateMigrationJobParameter("ds_0", null, "t_order", "logic_db", "t_order");
String jobId = jobAPI.createJobAndStart(parameter);
MigrationJobConfiguration jobConfig = jobAPI.getJobConfiguration(jobId);
@@ -276,6 +281,18 @@ public final class MigrationJobAPIImplTest {
assertThat(jobConfig.getTargetTableName(), is("t_order"));
}
+ private void initIntPrimaryEnvironment() throws SQLException {
+ Map<String, DataSourceProperties> metaDataDataSource = new PipelineDataSourcePersistService().load(JobType.MIGRATION);
+ DataSourceProperties dataSourceProperties = metaDataDataSource.get("ds_0");
+ DataSource dataSource = DataSourcePoolCreator.create(dataSourceProperties);
+ try (
+ Connection connection = dataSource.getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("DROP TABLE IF EXISTS t_order");
+ statement.execute("CREATE TABLE t_order (order_id INT PRIMARY KEY, user_id int(10))");
+ }
+ }
+
@Test
public void assertShowMigrationSourceResources() {
Collection<Collection<Object>> actual = jobAPI.listMigrationSourceResources();
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
index 28a1b00a4af..06d4a52e159 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
@@ -18,13 +18,18 @@
package org.apache.shardingsphere.data.pipeline.core.prepare;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationException;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineTableMetaDataUtil;
+import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationTaskConfiguration;
import org.junit.After;
@@ -36,9 +41,11 @@ import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
+import java.sql.Types;
import java.util.List;
import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
public final class InventoryTaskSplitterTest {
@@ -57,15 +64,19 @@ public final class InventoryTaskSplitterTest {
}
@Before
- public void setUp() {
+ public void setUp() throws NoSuchFieldException, IllegalAccessException {
initJobItemContext();
+ InventoryDumperConfiguration dumperConfig = new InventoryDumperConfiguration(jobItemContext.getTaskConfig().getDumperConfig());
+ dumperConfig.setUniqueKeyDataType(Types.INTEGER);
+ dumperConfig.setUniqueKey("order_id");
inventoryTaskSplitter = new InventoryTaskSplitter(
- jobItemContext.getSourceDataSource(), jobItemContext.getTaskConfig().getDumperConfig(), jobItemContext.getTaskConfig().getImporterConfig(), jobItemContext.getInitProgress(),
+ jobItemContext.getSourceDataSource(), dumperConfig, jobItemContext.getTaskConfig().getImporterConfig(), jobItemContext.getInitProgress(),
jobItemContext.getSourceMetaDataLoader(), jobItemContext.getDataSourceManager(), jobItemContext.getJobProcessContext().getImporterExecuteEngine());
}
- private void initJobItemContext() {
+ private void initJobItemContext() throws NoSuchFieldException, IllegalAccessException {
MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
+ ReflectionUtil.setFieldValue(jobConfig, "uniqueKeyColumn", new PipelineColumnMetaData(1, "order_id", 4, "", false, true));
jobItemContext = PipelineContextUtil.mockMigrationJobItemContext(jobConfig);
dataSourceManager = jobItemContext.getDataSourceManager();
taskConfig = jobItemContext.getTaskConfig();
@@ -110,14 +121,22 @@ public final class InventoryTaskSplitterTest {
}
@Test(expected = PipelineJobCreationException.class)
- public void assertSplitInventoryDataWithUnionPrimary() throws SQLException {
+ public void assertSplitInventoryDataWithIllegalKeyDataType() throws SQLException, NoSuchFieldException, IllegalAccessException {
initUnionPrimaryEnvironment(taskConfig.getDumperConfig());
+ InventoryDumperConfiguration dumperConfig = ReflectionUtil.getFieldValue(inventoryTaskSplitter, "dumperConfig", InventoryDumperConfiguration.class);
+ assertNotNull(dumperConfig);
+ dumperConfig.setUniqueKey("order_id,user_id");
+ dumperConfig.setUniqueKeyDataType(Integer.MIN_VALUE);
inventoryTaskSplitter.splitInventoryData(jobItemContext);
}
@Test(expected = PipelineJobCreationException.class)
- public void assertSplitInventoryDataWithoutPrimaryAndUniqueIndex() throws SQLException {
+ public void assertSplitInventoryDataWithoutPrimaryAndUniqueIndex() throws SQLException, NoSuchFieldException, IllegalAccessException {
initNoPrimaryEnvironment(taskConfig.getDumperConfig());
+ try (PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(taskConfig.getDumperConfig().getDataSourceConfig())) {
+ PipelineColumnMetaData uniqueKeyColumn = PipelineTableMetaDataUtil.getUniqueKeyColumn(null, "t_order", dataSource, null);
+ ReflectionUtil.setFieldValue(jobItemContext.getJobConfig(), "uniqueKeyColumn", uniqueKeyColumn);
+ }
inventoryTaskSplitter.splitInventoryData(jobItemContext);
}