You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/02/20 13:32:58 UTC

[shardingsphere] branch master updated: Fix multi unique key migration ignore inventory task when the first type is number (#24263)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 575d72ca783 Fix multi unique key migration ignore inventory task when the first type is number (#24263)
575d72ca783 is described below

commit 575d72ca78386882e41b487e997b0f8f4d0c7a33
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Mon Feb 20 21:32:48 2023 +0800

    Fix multi unique key migration ignore inventory task when the first type is number (#24263)
    
    * Fix multi unique key migration ignore inventory task when the first is number type.
    
    * Fix ci
    
    * Merge method
---
 .../ingest/position/PrimaryKeyPositionFactory.java |  2 +-
 .../core/ingest/dumper/InventoryDumper.java        |  9 +++-----
 .../primarykey/IndexesMigrationE2EIT.java          | 25 ++++++++++++++--------
 3 files changed, 20 insertions(+), 16 deletions(-)

diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/PrimaryKeyPositionFactory.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/PrimaryKeyPositionFactory.java
index cbbf3cd6faf..6e8bbabf9bb 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/PrimaryKeyPositionFactory.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/PrimaryKeyPositionFactory.java
@@ -63,7 +63,7 @@ public final class PrimaryKeyPositionFactory {
      */
     public static IngestPosition<?> newInstance(final Object beginValue, final Object endValue) {
         if (beginValue instanceof Number) {
-            return new IntegerPrimaryKeyPosition(((Number) beginValue).longValue(), ((Number) endValue).longValue());
+            return new IntegerPrimaryKeyPosition(((Number) beginValue).longValue(), null != endValue ? ((Number) endValue).longValue() : Long.MAX_VALUE);
         }
         if (beginValue instanceof CharSequence) {
             return new StringPrimaryKeyPosition(beginValue.toString(), null != endValue ? endValue.toString() : null);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
index 08f43c64f5b..395dc244917 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
@@ -140,12 +140,9 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
         if (!dumperConfig.hasUniqueKey()) {
             return sqlBuilder.buildNoUniqueKeyInventoryDumpSQL(schemaName, dumperConfig.getActualTableName());
         }
+        PrimaryKeyPosition<?> position = (PrimaryKeyPosition<?>) dumperConfig.getPosition();
         PipelineColumnMetaData firstColumn = dumperConfig.getUniqueKeyColumns().get(0);
-        if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType())) {
-            return sqlBuilder.buildDivisibleInventoryDumpSQL(schemaName, dumperConfig.getActualTableName(), firstColumn.getName());
-        }
-        if (PipelineJdbcUtils.isStringColumn(firstColumn.getDataType())) {
-            PrimaryKeyPosition<?> position = (PrimaryKeyPosition<?>) dumperConfig.getPosition();
+        if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) || PipelineJdbcUtils.isStringColumn(firstColumn.getDataType())) {
             if (null != position.getBeginValue() && null != position.getEndValue()) {
                 return sqlBuilder.buildDivisibleInventoryDumpSQL(schemaName, dumperConfig.getActualTableName(), firstColumn.getName());
             }
@@ -162,7 +159,7 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
         }
         PipelineColumnMetaData firstColumn = dumperConfig.getUniqueKeyColumns().get(0);
         PrimaryKeyPosition<?> position = (PrimaryKeyPosition<?>) dumperConfig.getPosition();
-        if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType())) {
+        if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) && null != position.getBeginValue() && null != position.getEndValue()) {
             preparedStatement.setObject(1, position.getBeginValue());
             preparedStatement.setObject(2, position.getEndValue());
             return;
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index a1324bfb6a7..9750328847a 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.primary
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
 import org.apache.shardingsphere.sharding.algorithm.keygen.UUIDKeyGenerateAlgorithm;
 import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.PipelineBaseE2EIT;
@@ -33,6 +34,7 @@ import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
 import java.sql.Connection;
+import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.Collection;
 import java.util.LinkedList;
@@ -84,7 +86,7 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
         } else {
             return;
         }
-        assertMigrationSuccess(sql, consistencyCheckAlgorithmType);
+        assertMigrationSuccess(sql, new UUIDKeyGenerateAlgorithm(), consistencyCheckAlgorithmType);
     }
     
     @Test
@@ -97,7 +99,7 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
         } else {
             return;
         }
-        assertMigrationSuccess(sql, consistencyCheckAlgorithmType);
+        assertMigrationSuccess(sql, new UUIDKeyGenerateAlgorithm(), consistencyCheckAlgorithmType);
     }
     
     @Test
@@ -105,12 +107,12 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
         String sql;
         String consistencyCheckAlgorithmType;
         if (getDatabaseType() instanceof MySQLDatabaseType) {
-            sql = "CREATE TABLE `%s` (`order_id` VARCHAR(64) NOT NULL, `user_id` INT NOT NULL, `status` varchar(255), UNIQUE KEY (`order_id`,`user_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
+            sql = "CREATE TABLE `%s` (`order_id` BIGINT NOT NULL, `user_id` INT NOT NULL, `status` varchar(255), UNIQUE KEY (`order_id`,`user_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
             consistencyCheckAlgorithmType = "DATA_MATCH";
         } else {
             return;
         }
-        assertMigrationSuccess(sql, consistencyCheckAlgorithmType);
+        assertMigrationSuccess(sql, new SnowflakeKeyGenerateAlgorithm(), consistencyCheckAlgorithmType);
     }
     
     @Test
@@ -124,14 +126,13 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
         } else {
             return;
         }
-        assertMigrationSuccess(sql, consistencyCheckAlgorithmType);
+        assertMigrationSuccess(sql, new UUIDKeyGenerateAlgorithm(), consistencyCheckAlgorithmType);
     }
     
-    private void assertMigrationSuccess(final String sqlPattern, final String consistencyCheckAlgorithmType) throws SQLException, InterruptedException {
+    private void assertMigrationSuccess(final String sqlPattern, final KeyGenerateAlgorithm generateAlgorithm, final String consistencyCheckAlgorithmType) throws SQLException, InterruptedException {
         initEnvironment(getDatabaseType(), new MigrationJobType());
         sourceExecuteWithLog(String.format(sqlPattern, getSourceTableOrderName()));
         try (Connection connection = getSourceDataSource().getConnection()) {
-            KeyGenerateAlgorithm generateAlgorithm = new UUIDKeyGenerateAlgorithm();
             PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, generateAlgorithm, getSourceTableOrderName(), PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT);
         }
         addMigrationProcessConfig();
@@ -141,8 +142,14 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
         startMigration(getSourceTableOrderName(), getTargetTableOrderName());
         String jobId = listJobId().get(0);
         waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", jobId));
-        sourceExecuteWithLog("INSERT INTO t_order (order_id, user_id, status) VALUES ('a1', 1, 'OK')");
-        assertProxyOrderRecordExist("t_order", "a1");
+        Comparable<?> primaryKey = generateAlgorithm.generateKey();
+        try (PreparedStatement preparedStatement = getSourceDataSource().getConnection().prepareStatement("INSERT INTO t_order (order_id,user_id,status) VALUES (?,?,?)")) {
+            preparedStatement.setObject(1, primaryKey);
+            preparedStatement.setObject(2, 1);
+            preparedStatement.setObject(3, "OK");
+            preparedStatement.execute();
+        }
+        assertProxyOrderRecordExist("t_order", primaryKey);
         waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
         assertCheckMigrationSuccess(jobId, consistencyCheckAlgorithmType);
         commitMigrationByJobId(jobId);