You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/10/21 02:59:55 UTC

[shardingsphere] branch master updated: Add inventory records count at progress (#21664)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 453f2ffd5f3 Add inventory records count at progress (#21664)
453f2ffd5f3 is described below

commit 453f2ffd5f3af9c1ec446dc4ccb28371daf62801
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Fri Oct 21 10:59:48 2022 +0800

    Add inventory records count at progress (#21664)
---
 .../InventoryIncrementalJobItemProgress.java       |  2 ++
 .../impl/InventoryIncrementalJobItemAPIImpl.java   |  1 +
 .../InventoryIncrementalJobItemContext.java        | 14 ++++++++++++++
 .../YamlInventoryIncrementalJobItemProgress.java   |  2 ++
 ...InventoryIncrementalJobItemProgressSwapper.java |  2 ++
 .../core/prepare/InventoryTaskSplitter.java        | 22 ++++++++++++++++++++--
 .../sqlbuilder/AbstractPipelineSQLBuilder.java     |  2 +-
 .../migration/MigrationJobItemContext.java         |  8 ++++++++
 8 files changed, 50 insertions(+), 3 deletions(-)

diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgress.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgress.java
index e529214631f..9cc4284a747 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgress.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgress.java
@@ -42,5 +42,7 @@ public final class InventoryIncrementalJobItemProgress implements PipelineJobIte
     
     private long processedRecordsCount;
     
+    private long inventoryRecordsCount;
+    
     private String errorMessage;
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobItemAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobItemAPIImpl.java
index e1750c66025..01eda47a28c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobItemAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobItemAPIImpl.java
@@ -56,6 +56,7 @@ public final class InventoryIncrementalJobItemAPIImpl implements PipelineJobItem
         jobItemProgress.setIncremental(getIncrementalTasksProgress(context.getIncrementalTasks()));
         jobItemProgress.setInventory(getInventoryTasksProgress(context.getInventoryTasks()));
         jobItemProgress.setProcessedRecordsCount(context.getProcessedRecordsCount());
+        jobItemProgress.setInventoryRecordsCount(context.getInventoryRecordsCount());
         String value = YamlEngine.marshal(SWAPPER.swapToYamlConfiguration(jobItemProgress));
         PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem(), value);
     }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
index 046aa1d254b..104d4e12a52 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
@@ -76,4 +76,18 @@ public interface InventoryIncrementalJobItemContext extends PipelineJobItemConte
      * @return processed record count.
      */
     long getProcessedRecordsCount();
+    
+    /**
+     * Init inventory records count.
+     *
+     * @param recordsCount records count
+     */
+    void initInventoryRecordsCount(long recordsCount);
+    
+    /**
+     * Get inventory records count.
+     *
+     * @return inventory records count
+     */
+    long getInventoryRecordsCount();
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgress.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgress.java
index 956a90506a6..d9936fff379 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgress.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgress.java
@@ -39,4 +39,6 @@ public final class YamlInventoryIncrementalJobItemProgress implements YamlConfig
     private YamlJobItemIncrementalTasksProgress incremental;
     
     private long processedRecordsCount;
+    
+    private long inventoryRecordsCount;
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
index 1e31d99d7c7..3dbb18a6077 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
@@ -39,6 +39,7 @@ public final class YamlInventoryIncrementalJobItemProgressSwapper implements Yam
         result.setInventory(inventoryTasksProgressSwapper.swapToYaml(progress.getInventory()));
         result.setIncremental(incrementalTasksProgressSwapper.swapToYaml(progress.getIncremental()));
         result.setProcessedRecordsCount(progress.getProcessedRecordsCount());
+        result.setInventoryRecordsCount(progress.getInventoryRecordsCount());
         return result;
     }
     
@@ -51,6 +52,7 @@ public final class YamlInventoryIncrementalJobItemProgressSwapper implements Yam
         result.setInventory(inventoryTasksProgressSwapper.swapToObject(yamlProgress.getInventory()));
         result.setIncremental(incrementalTasksProgressSwapper.swapToObject(yamlProgress.getSourceDatabaseType(), yamlProgress.getIncremental()));
         result.setProcessedRecordsCount(yamlProgress.getProcessedRecordsCount());
+        result.setInventoryRecordsCount(yamlProgress.getInventoryRecordsCount());
         return result;
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index 9cfcd247ca2..40c81189147 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -149,7 +149,7 @@ public final class InventoryTaskSplitter {
             return getPositionByIntegerPrimaryKeyRange(jobItemContext, dataSource, dumperConfig);
         }
         if (PipelineJdbcUtils.isStringColumn(uniqueKeyDataType)) {
-            return getPositionByStringPrimaryKeyRange();
+            return getPositionByStringPrimaryKeyRange(jobItemContext, dataSource, dumperConfig);
         }
         throw new SplitPipelineJobByRangeException(dumperConfig.getActualTableName(), "primary key is not integer or string type");
     }
@@ -166,6 +166,7 @@ public final class InventoryTaskSplitter {
                 PreparedStatement ps = connection.prepareStatement(sql)) {
             // TODO query minimum value less than 0
             long beginId = 0;
+            long recordsCount = 0;
             for (int i = 0; i < Integer.MAX_VALUE; i++) {
                 ps.setLong(1, beginId);
                 ps.setLong(2, shardingSize);
@@ -175,6 +176,7 @@ public final class InventoryTaskSplitter {
                         break;
                     }
                     long endId = rs.getLong(1);
+                    recordsCount += rs.getLong(2);
                     if (0 == endId) {
                         log.info("getPositionByPrimaryKeyRange, endId is 0, break, tableName={}, primaryKey={}, beginId={}", dumperConfig.getActualTableName(), dumperConfig.getUniqueKey(), beginId);
                         break;
@@ -183,6 +185,7 @@ public final class InventoryTaskSplitter {
                     beginId = endId + 1;
                 }
             }
+            jobItemContext.initInventoryRecordsCount(recordsCount);
             // fix empty table missing inventory task
             if (result.isEmpty()) {
                 result.add(new IntegerPrimaryKeyPosition(0, 0));
@@ -193,7 +196,22 @@ public final class InventoryTaskSplitter {
         return result;
     }
     
-    private Collection<IngestPosition<?>> getPositionByStringPrimaryKeyRange() {
+    private Collection<IngestPosition<?>> getPositionByStringPrimaryKeyRange(final InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource,
+                                                                             final InventoryDumperConfiguration dumperConfig) {
+        PipelineJobConfiguration jobConfig = jobItemContext.getJobConfig();
+        String schemaName = dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName()));
+        String actualTableName = dumperConfig.getActualTableName();
+        String sql = PipelineSQLBuilderFactory.getInstance(jobConfig.getSourceDatabaseType()).buildCountSQL(schemaName, actualTableName);
+        try (
+                Connection connection = dataSource.getConnection();
+                PreparedStatement ps = connection.prepareStatement(sql)) {
+            try (ResultSet rs = ps.executeQuery()) {
+                rs.next();
+                jobItemContext.initInventoryRecordsCount(rs.getLong(1));
+            }
+        } catch (final SQLException ex) {
+            throw new SplitPipelineJobByUniqueKeyException(dumperConfig.getActualTableName(), dumperConfig.getUniqueKey(), ex);
+        }
         Collection<IngestPosition<?>> result = new LinkedList<>();
         result.add(new StringPrimaryKeyPosition("!", "~"));
         return result;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
index d65408f499f..75bd659b63c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
@@ -181,7 +181,7 @@ public abstract class AbstractPipelineSQLBuilder implements PipelineSQLBuilder {
     @Override
     public String buildSplitByPrimaryKeyRangeSQL(final String schemaName, final String tableName, final String primaryKey) {
         String quotedUniqueKey = quote(primaryKey);
-        return String.format("SELECT MAX(%s) FROM (SELECT %s FROM %s WHERE %s>=? ORDER BY %s LIMIT ?) t",
+        return String.format("SELECT MAX(%s),COUNT(*) FROM (SELECT %s FROM %s WHERE %s>=? ORDER BY %s LIMIT ?) t",
                 quotedUniqueKey, quotedUniqueKey, getQualifiedTableName(schemaName, tableName), quotedUniqueKey, quotedUniqueKey);
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
index c5a5f2301ba..adcdd6761d1 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
@@ -66,6 +66,8 @@ public final class MigrationJobItemContext implements InventoryIncrementalJobIte
     
     private final AtomicLong processedRecordsCount = new AtomicLong(0);
     
+    private volatile long inventoryRecordsCount;
+    
     private final MigrationJobConfiguration jobConfig;
     
     private final MigrationProcessContext jobProcessContext;
@@ -97,6 +99,7 @@ public final class MigrationJobItemContext implements InventoryIncrementalJobIte
         this.initProgress = initProgress;
         if (null != initProgress) {
             processedRecordsCount.set(initProgress.getProcessedRecordsCount());
+            inventoryRecordsCount = initProgress.getInventoryRecordsCount();
         }
         this.jobProcessContext = jobProcessContext;
         this.taskConfig = taskConfig;
@@ -138,4 +141,9 @@ public final class MigrationJobItemContext implements InventoryIncrementalJobIte
     public long getProcessedRecordsCount() {
         return processedRecordsCount.get();
     }
+    
+    @Override
+    public void initInventoryRecordsCount(final long recordsCount) {
+        inventoryRecordsCount = recordsCount;
+    }
 }