You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/10/21 02:59:55 UTC
[shardingsphere] branch master updated: Add inventory records count at progress (#21664)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 453f2ffd5f3 Add inventory records count at progress (#21664)
453f2ffd5f3 is described below
commit 453f2ffd5f3af9c1ec446dc4ccb28371daf62801
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Fri Oct 21 10:59:48 2022 +0800
Add inventory records count at progress (#21664)
---
.../InventoryIncrementalJobItemProgress.java | 2 ++
.../impl/InventoryIncrementalJobItemAPIImpl.java | 1 +
.../InventoryIncrementalJobItemContext.java | 14 ++++++++++++++
.../YamlInventoryIncrementalJobItemProgress.java | 2 ++
...InventoryIncrementalJobItemProgressSwapper.java | 2 ++
.../core/prepare/InventoryTaskSplitter.java | 22 ++++++++++++++++++++--
.../sqlbuilder/AbstractPipelineSQLBuilder.java | 2 +-
.../migration/MigrationJobItemContext.java | 8 ++++++++
8 files changed, 50 insertions(+), 3 deletions(-)
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgress.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgress.java
index e529214631f..9cc4284a747 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgress.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgress.java
@@ -42,5 +42,7 @@ public final class InventoryIncrementalJobItemProgress implements PipelineJobIte
private long processedRecordsCount;
+ private long inventoryRecordsCount;
+
private String errorMessage;
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobItemAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobItemAPIImpl.java
index e1750c66025..01eda47a28c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobItemAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobItemAPIImpl.java
@@ -56,6 +56,7 @@ public final class InventoryIncrementalJobItemAPIImpl implements PipelineJobItem
jobItemProgress.setIncremental(getIncrementalTasksProgress(context.getIncrementalTasks()));
jobItemProgress.setInventory(getInventoryTasksProgress(context.getInventoryTasks()));
jobItemProgress.setProcessedRecordsCount(context.getProcessedRecordsCount());
+ jobItemProgress.setInventoryRecordsCount(context.getInventoryRecordsCount());
String value = YamlEngine.marshal(SWAPPER.swapToYamlConfiguration(jobItemProgress));
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem(), value);
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
index 046aa1d254b..104d4e12a52 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
@@ -76,4 +76,18 @@ public interface InventoryIncrementalJobItemContext extends PipelineJobItemConte
* @return processed record count.
*/
long getProcessedRecordsCount();
+
+ /**
+ * Init inventory records count.
+ *
+ * @param recordsCount records count
+ */
+ void initInventoryRecordsCount(long recordsCount);
+
+ /**
+ * Get inventory records count.
+ *
+ * @return inventory records count
+ */
+ long getInventoryRecordsCount();
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgress.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgress.java
index 956a90506a6..d9936fff379 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgress.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgress.java
@@ -39,4 +39,6 @@ public final class YamlInventoryIncrementalJobItemProgress implements YamlConfig
private YamlJobItemIncrementalTasksProgress incremental;
private long processedRecordsCount;
+
+ private long inventoryRecordsCount;
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
index 1e31d99d7c7..3dbb18a6077 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
@@ -39,6 +39,7 @@ public final class YamlInventoryIncrementalJobItemProgressSwapper implements Yam
result.setInventory(inventoryTasksProgressSwapper.swapToYaml(progress.getInventory()));
result.setIncremental(incrementalTasksProgressSwapper.swapToYaml(progress.getIncremental()));
result.setProcessedRecordsCount(progress.getProcessedRecordsCount());
+ result.setInventoryRecordsCount(progress.getInventoryRecordsCount());
return result;
}
@@ -51,6 +52,7 @@ public final class YamlInventoryIncrementalJobItemProgressSwapper implements Yam
result.setInventory(inventoryTasksProgressSwapper.swapToObject(yamlProgress.getInventory()));
result.setIncremental(incrementalTasksProgressSwapper.swapToObject(yamlProgress.getSourceDatabaseType(), yamlProgress.getIncremental()));
result.setProcessedRecordsCount(yamlProgress.getProcessedRecordsCount());
+ result.setInventoryRecordsCount(yamlProgress.getInventoryRecordsCount());
return result;
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index 9cfcd247ca2..40c81189147 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -149,7 +149,7 @@ public final class InventoryTaskSplitter {
return getPositionByIntegerPrimaryKeyRange(jobItemContext, dataSource, dumperConfig);
}
if (PipelineJdbcUtils.isStringColumn(uniqueKeyDataType)) {
- return getPositionByStringPrimaryKeyRange();
+ return getPositionByStringPrimaryKeyRange(jobItemContext, dataSource, dumperConfig);
}
throw new SplitPipelineJobByRangeException(dumperConfig.getActualTableName(), "primary key is not integer or string type");
}
@@ -166,6 +166,7 @@ public final class InventoryTaskSplitter {
PreparedStatement ps = connection.prepareStatement(sql)) {
// TODO query minimum value less than 0
long beginId = 0;
+ long recordsCount = 0;
for (int i = 0; i < Integer.MAX_VALUE; i++) {
ps.setLong(1, beginId);
ps.setLong(2, shardingSize);
@@ -175,6 +176,7 @@ public final class InventoryTaskSplitter {
break;
}
long endId = rs.getLong(1);
+ recordsCount += rs.getLong(2);
if (0 == endId) {
log.info("getPositionByPrimaryKeyRange, endId is 0, break, tableName={}, primaryKey={}, beginId={}", dumperConfig.getActualTableName(), dumperConfig.getUniqueKey(), beginId);
break;
@@ -183,6 +185,7 @@ public final class InventoryTaskSplitter {
beginId = endId + 1;
}
}
+ jobItemContext.initInventoryRecordsCount(recordsCount);
// fix empty table missing inventory task
if (result.isEmpty()) {
result.add(new IntegerPrimaryKeyPosition(0, 0));
@@ -193,7 +196,22 @@ public final class InventoryTaskSplitter {
return result;
}
- private Collection<IngestPosition<?>> getPositionByStringPrimaryKeyRange() {
+ private Collection<IngestPosition<?>> getPositionByStringPrimaryKeyRange(final InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource,
+ final InventoryDumperConfiguration dumperConfig) {
+ PipelineJobConfiguration jobConfig = jobItemContext.getJobConfig();
+ String schemaName = dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName()));
+ String actualTableName = dumperConfig.getActualTableName();
+ String sql = PipelineSQLBuilderFactory.getInstance(jobConfig.getSourceDatabaseType()).buildCountSQL(schemaName, actualTableName);
+ try (
+ Connection connection = dataSource.getConnection();
+ PreparedStatement ps = connection.prepareStatement(sql)) {
+ try (ResultSet rs = ps.executeQuery()) {
+ rs.next();
+ jobItemContext.initInventoryRecordsCount(rs.getLong(1));
+ }
+ } catch (final SQLException ex) {
+ throw new SplitPipelineJobByUniqueKeyException(dumperConfig.getActualTableName(), dumperConfig.getUniqueKey(), ex);
+ }
Collection<IngestPosition<?>> result = new LinkedList<>();
result.add(new StringPrimaryKeyPosition("!", "~"));
return result;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
index d65408f499f..75bd659b63c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
@@ -181,7 +181,7 @@ public abstract class AbstractPipelineSQLBuilder implements PipelineSQLBuilder {
@Override
public String buildSplitByPrimaryKeyRangeSQL(final String schemaName, final String tableName, final String primaryKey) {
String quotedUniqueKey = quote(primaryKey);
- return String.format("SELECT MAX(%s) FROM (SELECT %s FROM %s WHERE %s>=? ORDER BY %s LIMIT ?) t",
+ return String.format("SELECT MAX(%s),COUNT(*) FROM (SELECT %s FROM %s WHERE %s>=? ORDER BY %s LIMIT ?) t",
quotedUniqueKey, quotedUniqueKey, getQualifiedTableName(schemaName, tableName), quotedUniqueKey, quotedUniqueKey);
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
index c5a5f2301ba..adcdd6761d1 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
@@ -66,6 +66,8 @@ public final class MigrationJobItemContext implements InventoryIncrementalJobIte
private final AtomicLong processedRecordsCount = new AtomicLong(0);
+ private volatile long inventoryRecordsCount;
+
private final MigrationJobConfiguration jobConfig;
private final MigrationProcessContext jobProcessContext;
@@ -97,6 +99,7 @@ public final class MigrationJobItemContext implements InventoryIncrementalJobIte
this.initProgress = initProgress;
if (null != initProgress) {
processedRecordsCount.set(initProgress.getProcessedRecordsCount());
+ inventoryRecordsCount = initProgress.getInventoryRecordsCount();
}
this.jobProcessContext = jobProcessContext;
this.taskConfig = taskConfig;
@@ -138,4 +141,9 @@ public final class MigrationJobItemContext implements InventoryIncrementalJobIte
public long getProcessedRecordsCount() {
return processedRecordsCount.get();
}
+
+ @Override
+ public void initInventoryRecordsCount(final long recordsCount) {
+ inventoryRecordsCount = recordsCount;
+ }
}