You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/02/20 13:32:58 UTC
[shardingsphere] branch master updated: Fix multi unique key migration ignore inventory task when the first type is number (#24263)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 575d72ca783 Fix multi unique key migration ignore inventory task when the first type is number (#24263)
575d72ca783 is described below
commit 575d72ca78386882e41b487e997b0f8f4d0c7a33
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Mon Feb 20 21:32:48 2023 +0800
Fix multi unique key migration ignore inventory task when the first type is number (#24263)
* Fix multi unique key migration ignoring the inventory task when the first unique key column is a number type.
* Fix ci
* Merge method
---
.../ingest/position/PrimaryKeyPositionFactory.java | 2 +-
.../core/ingest/dumper/InventoryDumper.java | 9 +++-----
.../primarykey/IndexesMigrationE2EIT.java | 25 ++++++++++++++--------
3 files changed, 20 insertions(+), 16 deletions(-)
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/PrimaryKeyPositionFactory.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/PrimaryKeyPositionFactory.java
index cbbf3cd6faf..6e8bbabf9bb 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/PrimaryKeyPositionFactory.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/PrimaryKeyPositionFactory.java
@@ -63,7 +63,7 @@ public final class PrimaryKeyPositionFactory {
*/
public static IngestPosition<?> newInstance(final Object beginValue, final Object endValue) {
if (beginValue instanceof Number) {
- return new IntegerPrimaryKeyPosition(((Number) beginValue).longValue(), ((Number) endValue).longValue());
+ return new IntegerPrimaryKeyPosition(((Number) beginValue).longValue(), null != endValue ? ((Number) endValue).longValue() : Long.MAX_VALUE);
}
if (beginValue instanceof CharSequence) {
return new StringPrimaryKeyPosition(beginValue.toString(), null != endValue ? endValue.toString() : null);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
index 08f43c64f5b..395dc244917 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
@@ -140,12 +140,9 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
if (!dumperConfig.hasUniqueKey()) {
return sqlBuilder.buildNoUniqueKeyInventoryDumpSQL(schemaName, dumperConfig.getActualTableName());
}
+ PrimaryKeyPosition<?> position = (PrimaryKeyPosition<?>) dumperConfig.getPosition();
PipelineColumnMetaData firstColumn = dumperConfig.getUniqueKeyColumns().get(0);
- if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType())) {
- return sqlBuilder.buildDivisibleInventoryDumpSQL(schemaName, dumperConfig.getActualTableName(), firstColumn.getName());
- }
- if (PipelineJdbcUtils.isStringColumn(firstColumn.getDataType())) {
- PrimaryKeyPosition<?> position = (PrimaryKeyPosition<?>) dumperConfig.getPosition();
+ if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) || PipelineJdbcUtils.isStringColumn(firstColumn.getDataType())) {
if (null != position.getBeginValue() && null != position.getEndValue()) {
return sqlBuilder.buildDivisibleInventoryDumpSQL(schemaName, dumperConfig.getActualTableName(), firstColumn.getName());
}
@@ -162,7 +159,7 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
}
PipelineColumnMetaData firstColumn = dumperConfig.getUniqueKeyColumns().get(0);
PrimaryKeyPosition<?> position = (PrimaryKeyPosition<?>) dumperConfig.getPosition();
- if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType())) {
+ if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) && null != position.getBeginValue() && null != position.getEndValue()) {
preparedStatement.setObject(1, position.getBeginValue());
preparedStatement.setObject(2, position.getEndValue());
return;
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index a1324bfb6a7..9750328847a 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.primary
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
import org.apache.shardingsphere.sharding.algorithm.keygen.UUIDKeyGenerateAlgorithm;
import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
import org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.PipelineBaseE2EIT;
@@ -33,6 +34,7 @@ import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import java.sql.Connection;
+import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collection;
import java.util.LinkedList;
@@ -84,7 +86,7 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
} else {
return;
}
- assertMigrationSuccess(sql, consistencyCheckAlgorithmType);
+ assertMigrationSuccess(sql, new UUIDKeyGenerateAlgorithm(), consistencyCheckAlgorithmType);
}
@Test
@@ -97,7 +99,7 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
} else {
return;
}
- assertMigrationSuccess(sql, consistencyCheckAlgorithmType);
+ assertMigrationSuccess(sql, new UUIDKeyGenerateAlgorithm(), consistencyCheckAlgorithmType);
}
@Test
@@ -105,12 +107,12 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
String sql;
String consistencyCheckAlgorithmType;
if (getDatabaseType() instanceof MySQLDatabaseType) {
- sql = "CREATE TABLE `%s` (`order_id` VARCHAR(64) NOT NULL, `user_id` INT NOT NULL, `status` varchar(255), UNIQUE KEY (`order_id`,`user_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
+ sql = "CREATE TABLE `%s` (`order_id` BIGINT NOT NULL, `user_id` INT NOT NULL, `status` varchar(255), UNIQUE KEY (`order_id`,`user_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
consistencyCheckAlgorithmType = "DATA_MATCH";
} else {
return;
}
- assertMigrationSuccess(sql, consistencyCheckAlgorithmType);
+ assertMigrationSuccess(sql, new SnowflakeKeyGenerateAlgorithm(), consistencyCheckAlgorithmType);
}
@Test
@@ -124,14 +126,13 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
} else {
return;
}
- assertMigrationSuccess(sql, consistencyCheckAlgorithmType);
+ assertMigrationSuccess(sql, new UUIDKeyGenerateAlgorithm(), consistencyCheckAlgorithmType);
}
- private void assertMigrationSuccess(final String sqlPattern, final String consistencyCheckAlgorithmType) throws SQLException, InterruptedException {
+ private void assertMigrationSuccess(final String sqlPattern, final KeyGenerateAlgorithm generateAlgorithm, final String consistencyCheckAlgorithmType) throws SQLException, InterruptedException {
initEnvironment(getDatabaseType(), new MigrationJobType());
sourceExecuteWithLog(String.format(sqlPattern, getSourceTableOrderName()));
try (Connection connection = getSourceDataSource().getConnection()) {
- KeyGenerateAlgorithm generateAlgorithm = new UUIDKeyGenerateAlgorithm();
PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, generateAlgorithm, getSourceTableOrderName(), PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT);
}
addMigrationProcessConfig();
@@ -141,8 +142,14 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
startMigration(getSourceTableOrderName(), getTargetTableOrderName());
String jobId = listJobId().get(0);
waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", jobId));
- sourceExecuteWithLog("INSERT INTO t_order (order_id, user_id, status) VALUES ('a1', 1, 'OK')");
- assertProxyOrderRecordExist("t_order", "a1");
+ Comparable<?> primaryKey = generateAlgorithm.generateKey();
+ try (PreparedStatement preparedStatement = getSourceDataSource().getConnection().prepareStatement("INSERT INTO t_order (order_id,user_id,status) VALUES (?,?,?)")) {
+ preparedStatement.setObject(1, primaryKey);
+ preparedStatement.setObject(2, 1);
+ preparedStatement.setObject(3, "OK");
+ preparedStatement.execute();
+ }
+ assertProxyOrderRecordExist("t_order", primaryKey);
waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
assertCheckMigrationSuccess(jobId, consistencyCheckAlgorithmType);
commitMigrationByJobId(jobId);