You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/09/16 12:18:17 UTC
[shardingsphere] branch master updated: Show migration status add processed_records_count column (#21019)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 902cff301f0 Show migration status add processed_records_count column (#21019)
902cff301f0 is described below
commit 902cff301f0a6ea122a33dae0616c906e27f7193
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Fri Sep 16 20:18:00 2022 +0800
Show migration status add processed_records_count column (#21019)
* Delete listener
* Show migration status add processed records column
* Improve name
* Fix ci
---
.../ShowMigrationJobStatusQueryResultSet.java | 4 ++-
.../InventoryIncrementalJobItemProgress.java | 2 ++
.../listener/PipelineJobProgressListener.java | 4 ++-
...va => PipelineJobProgressUpdatedParameter.java} | 16 +++++----
.../impl/InventoryIncrementalJobItemAPIImpl.java | 1 +
.../InventoryIncrementalJobItemContext.java | 10 +++++-
.../pipeline/core/importer/DefaultImporter.java | 18 ++++++----
.../DefaultPipelineJobProgressListener.java | 38 ----------------------
.../YamlInventoryIncrementalJobItemProgress.java | 2 ++
...InventoryIncrementalJobItemProgressSwapper.java | 2 ++
.../core/prepare/InventoryTaskSplitter.java | 4 +--
.../migration/MigrationJobItemContext.java | 20 ++++++++++++
.../scenario/migration/MigrationJobPreparer.java | 4 +--
.../data/pipeline/cases/base/BaseITCase.java | 6 ++--
.../migration/general/MySQLMigrationGeneralIT.java | 7 +++-
.../FixturePipelineJobProgressListener.java | 3 +-
16 files changed, 78 insertions(+), 63 deletions(-)
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java
index 4256fc45793..78eb9d3f80d 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java
@@ -51,6 +51,7 @@ public final class ShowMigrationJobStatusQueryResultSet implements DatabaseDistS
result.add(entry.getValue().getDataSourceName());
result.add(entry.getValue().getStatus());
result.add(entry.getValue().isActive() ? Boolean.TRUE.toString() : Boolean.FALSE.toString());
+ result.add(entry.getValue().getProcessedRecordsCount());
result.add(entry.getValue().getInventory().getInventoryFinishedPercentage());
long latestActiveTimeMillis = entry.getValue().getIncremental().getIncrementalLatestActiveTimeMillis();
result.add(latestActiveTimeMillis > 0 ? TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis - latestActiveTimeMillis) : 0);
@@ -60,6 +61,7 @@ public final class ShowMigrationJobStatusQueryResultSet implements DatabaseDistS
result.add("");
result.add("");
result.add("");
+ result.add("");
}
return result;
}).collect(Collectors.toList()).iterator();
@@ -67,7 +69,7 @@ public final class ShowMigrationJobStatusQueryResultSet implements DatabaseDistS
@Override
public Collection<String> getColumnNames() {
- return Arrays.asList("item", "data_source", "status", "active", "inventory_finished_percentage", "incremental_idle_seconds");
+ return Arrays.asList("item", "data_source", "status", "active", "processed_records_count", "inventory_finished_percentage", "incremental_idle_seconds");
}
@Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgress.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgress.java
index 4f22da7dfaf..7d08abc8788 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgress.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgress.java
@@ -39,4 +39,6 @@ public final class InventoryIncrementalJobItemProgress implements PipelineJobIte
private JobItemInventoryTasksProgress inventory;
private JobItemIncrementalTasksProgress incremental;
+
+ private long processedRecordsCount;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/listener/PipelineJobProgressListener.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/listener/PipelineJobProgressListener.java
index 4ca18491ef8..b8fadbde343 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/listener/PipelineJobProgressListener.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/listener/PipelineJobProgressListener.java
@@ -24,6 +24,8 @@ public interface PipelineJobProgressListener {
/**
* Emit on progress updated.
+ *
+ * @param parameter process update parameter
*/
- void onProgressUpdated();
+ void onProgressUpdated(PipelineJobProgressUpdatedParameter parameter);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/listener/PipelineJobProgressListener.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/listener/PipelineJobProgressUpdatedParameter.java
similarity index 75%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/listener/PipelineJobProgressListener.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/listener/PipelineJobProgressUpdatedParameter.java
index 4ca18491ef8..1a21a14bb7e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/listener/PipelineJobProgressListener.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/listener/PipelineJobProgressUpdatedParameter.java
@@ -17,13 +17,17 @@
package org.apache.shardingsphere.data.pipeline.api.job.progress.listener;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
/**
- * Pipeline job progress listener.
+ * Pipeline job process update parameter.
*/
-public interface PipelineJobProgressListener {
+@RequiredArgsConstructor
+@Getter
+public final class PipelineJobProgressUpdatedParameter {
+
+ private final int insertedRecordsCount;
- /**
- * Emit on progress updated.
- */
- void onProgressUpdated();
+ private final int deletedRecordsCount;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobItemAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobItemAPIImpl.java
index ec1302c76df..766fdc24245 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobItemAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobItemAPIImpl.java
@@ -55,6 +55,7 @@ public final class InventoryIncrementalJobItemAPIImpl implements PipelineJobItem
jobItemProgress.setDataSourceName(jobItemContext.getDataSourceName());
jobItemProgress.setIncremental(getIncrementalTasksProgress(context.getIncrementalTasks()));
jobItemProgress.setInventory(getInventoryTasksProgress(context.getInventoryTasks()));
+ jobItemProgress.setProcessedRecordsCount(context.getProcessedRecordsCount());
String value = YamlEngine.marshal(SWAPPER.swapToYamlConfiguration(jobItemProgress));
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem(), value);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
index 36a153ff08d..7d6a930d646 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.data.pipeline.core.context;
import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
@@ -26,7 +27,7 @@ import java.util.Collection;
/**
* Inventory incremental job item context.
*/
-public interface InventoryIncrementalJobItemContext extends PipelineJobItemContext {
+public interface InventoryIncrementalJobItemContext extends PipelineJobItemContext, PipelineJobProgressListener {
@Override
InventoryIncrementalProcessContext getJobProcessContext();
@@ -44,4 +45,11 @@ public interface InventoryIncrementalJobItemContext extends PipelineJobItemConte
* @return incremental tasks
*/
Collection<IncrementalTask> getIncrementalTasks();
+
+ /**
+ * Get processed record count.
+ *
+ * @return processed record count.
+ */
+ long getProcessedRecordsCount();
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
index 558f73aabd8..d5b34f86e22 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
@@ -31,6 +31,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.GroupedDataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineImporterJobWriteException;
@@ -97,9 +98,9 @@ public final class DefaultImporter extends AbstractLifecycleExecutor implements
if (null != records && !records.isEmpty()) {
round++;
rowCount += records.size();
- flush(dataSourceManager.getDataSource(importerConfig.getDataSourceConfig()), records);
+ PipelineJobProgressUpdatedParameter updatedParameter = flush(dataSourceManager.getDataSource(importerConfig.getDataSourceConfig()), records);
channel.ack(records);
- jobProgressListener.onProgressUpdated();
+ jobProgressListener.onProgressUpdated(updatedParameter);
if (0 == round % 50) {
log.info("importer write, round={}, rowCount={}", round, rowCount);
}
@@ -113,13 +114,18 @@ public final class DefaultImporter extends AbstractLifecycleExecutor implements
log.info("importer write done, rowCount={}, finishedByBreak={}", rowCount, finishedByBreak);
}
- private void flush(final DataSource dataSource, final List<Record> buffer) {
- List<GroupedDataRecord> groupedDataRecords = MERGER.group(buffer.stream().filter(each -> each instanceof DataRecord).map(each -> (DataRecord) each).collect(Collectors.toList()));
- groupedDataRecords.forEach(each -> {
+ private PipelineJobProgressUpdatedParameter flush(final DataSource dataSource, final List<Record> buffer) {
+ List<GroupedDataRecord> result = MERGER.group(buffer.stream().filter(each -> each instanceof DataRecord).map(each -> (DataRecord) each).collect(Collectors.toList()));
+ int insertRecordNumber = 0;
+ int deleteRecordNumber = 0;
+ for (GroupedDataRecord each : result) {
+ deleteRecordNumber += null != each.getDeleteDataRecords() ? each.getDeleteDataRecords().size() : 0;
flushInternal(dataSource, each.getDeleteDataRecords());
+ insertRecordNumber += null != each.getInsertDataRecords() ? each.getInsertDataRecords().size() : 0;
flushInternal(dataSource, each.getInsertDataRecords());
flushInternal(dataSource, each.getUpdateDataRecords());
- });
+ }
+ return new PipelineJobProgressUpdatedParameter(insertRecordNumber, deleteRecordNumber);
}
private void flushInternal(final DataSource dataSource, final List<DataRecord> buffer) {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/listener/DefaultPipelineJobProgressListener.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/listener/DefaultPipelineJobProgressListener.java
deleted file mode 100644
index 97b55c4e48a..00000000000
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/listener/DefaultPipelineJobProgressListener.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.job.progress.listener;
-
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
-
-/**
- * Default pipeline job progress listener implementation.
- */
-@RequiredArgsConstructor
-public final class DefaultPipelineJobProgressListener implements PipelineJobProgressListener {
-
- private final String jobId;
-
- private final int shardingItem;
-
- @Override
- public void onProgressUpdated() {
- PipelineJobProgressPersistService.notifyPersist(jobId, shardingItem);
- }
-}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgress.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgress.java
index de052fe82ef..956a90506a6 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgress.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgress.java
@@ -37,4 +37,6 @@ public final class YamlInventoryIncrementalJobItemProgress implements YamlConfig
private YamlJobItemInventoryTasksProgress inventory;
private YamlJobItemIncrementalTasksProgress incremental;
+
+ private long processedRecordsCount;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
index 6624e8bd750..a7e89170ef3 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
@@ -38,6 +38,7 @@ public final class YamlInventoryIncrementalJobItemProgressSwapper implements Yam
result.setDataSourceName(progress.getDataSourceName());
result.setInventory(INVENTORY_PROGRESS_SWAPPER.swapToYaml(progress.getInventory()));
result.setIncremental(INCREMENTAL_PROGRESS_SWAPPER.swapToYaml(progress.getIncremental()));
+ result.setProcessedRecordsCount(progress.getProcessedRecordsCount());
return result;
}
@@ -49,6 +50,7 @@ public final class YamlInventoryIncrementalJobItemProgressSwapper implements Yam
result.setDataSourceName(yamlProgress.getDataSourceName());
result.setInventory(INVENTORY_PROGRESS_SWAPPER.swapToObject(yamlProgress.getInventory()));
result.setIncremental(INCREMENTAL_PROGRESS_SWAPPER.swapToObject(yamlProgress.getSourceDatabaseType(), yamlProgress.getIncremental()));
+ result.setProcessedRecordsCount(yamlProgress.getProcessedRecordsCount());
return result;
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index e116ec9e290..23ce1729373 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -37,7 +37,6 @@ import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncremental
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobPrepareFailedException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobException;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.DefaultPipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
@@ -84,10 +83,9 @@ public final class InventoryTaskSplitter {
public List<InventoryTask> splitInventoryData(final InventoryIncrementalJobItemContext jobItemContext) {
List<InventoryTask> result = new LinkedList<>();
PipelineChannelCreator pipelineChannelCreator = jobItemContext.getJobProcessContext().getPipelineChannelCreator();
- DefaultPipelineJobProgressListener jobProgressListener = new DefaultPipelineJobProgressListener(jobItemContext.getJobId(), jobItemContext.getShardingItem());
for (InventoryDumperConfiguration each : splitDumperConfig(jobItemContext, dumperConfig)) {
result.add(new InventoryTask(each, importerConfig, pipelineChannelCreator, dataSourceManager, sourceDataSource, metaDataLoader, importerExecuteEngine,
- jobProgressListener));
+ jobItemContext));
}
return result;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
index ca19765cf46..80240a5bd97 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
@@ -28,14 +28,17 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import java.util.Collection;
import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicLong;
/**
* Migration job item context.
@@ -63,6 +66,8 @@ public final class MigrationJobItemContext implements InventoryIncrementalJobIte
private final Collection<IncrementalTask> incrementalTasks = new LinkedList<>();
+ private final AtomicLong processedRecordsCount = new AtomicLong(0);
+
private final MigrationJobConfiguration jobConfig;
private final MigrationProcessContext jobProcessContext;
@@ -92,6 +97,9 @@ public final class MigrationJobItemContext implements InventoryIncrementalJobIte
this.shardingItem = shardingItem;
this.dataSourceName = taskConfig.getDataSourceName();
this.initProgress = initProgress;
+ if (null != initProgress) {
+ processedRecordsCount.set(initProgress.getProcessedRecordsCount());
+ }
this.jobProcessContext = jobProcessContext;
this.taskConfig = taskConfig;
this.dataSourceManager = dataSourceManager;
@@ -125,4 +133,16 @@ public final class MigrationJobItemContext implements InventoryIncrementalJobIte
public boolean isSourceTargetDatabaseTheSame() {
return jobConfig.getSourceDatabaseType().equalsIgnoreCase(jobConfig.getTargetDatabaseType());
}
+
+ @Override
+ public void onProgressUpdated(final PipelineJobProgressUpdatedParameter parameter) {
+ int needAddNumber = parameter.getInsertedRecordsCount() - parameter.getDeletedRecordsCount();
+ processedRecordsCount.addAndGet(needAddNumber);
+ PipelineJobProgressPersistService.notifyPersist(jobId, shardingItem);
+ }
+
+ @Override
+ public long getProcessedRecordsCount() {
+ return processedRecordsCount.get();
+ }
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
index daa6ace0229..48071fee418 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
@@ -32,7 +32,6 @@ import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobPrepareFailedException;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.DefaultPipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.core.prepare.InventoryTaskSplitter;
import org.apache.shardingsphere.data.pipeline.core.prepare.PipelineJobPreparerUtils;
import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetSchemasParameter;
@@ -163,9 +162,8 @@ public final class MigrationJobPreparer {
JobItemIncrementalTasksProgress initIncremental = null == jobItemContext.getInitProgress() ? null : jobItemContext.getInitProgress().getIncremental();
taskConfig.getDumperConfig().setPosition(PipelineJobPreparerUtils.getIncrementalPosition(initIncremental, taskConfig.getDumperConfig(), dataSourceManager));
PipelineTableMetaDataLoader sourceMetaDataLoader = jobItemContext.getSourceMetaDataLoader();
- DefaultPipelineJobProgressListener jobProgressListener = new DefaultPipelineJobProgressListener(jobItemContext.getJobId(), jobItemContext.getShardingItem());
IncrementalTask incrementalTask = new IncrementalTask(taskConfig.getImporterConfig().getConcurrency(),
- taskConfig.getDumperConfig(), taskConfig.getImporterConfig(), pipelineChannelCreator, dataSourceManager, sourceMetaDataLoader, incrementalDumperExecuteEngine, jobProgressListener);
+ taskConfig.getDumperConfig(), taskConfig.getImporterConfig(), pipelineChannelCreator, dataSourceManager, sourceMetaDataLoader, incrementalDumperExecuteEngine, jobItemContext);
jobItemContext.getIncrementalTasks().add(incrementalTask);
}
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index 75cb988b2e7..2105107d595 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -51,6 +51,7 @@ import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -264,7 +265,7 @@ public abstract class BaseITCase {
getIncreaseTaskThread().start();
}
- protected void waitJobFinished(final String distSQL) throws InterruptedException {
+ protected List<Map<String, Object>> waitJobFinished(final String distSQL) throws InterruptedException {
if (null != getIncreaseTaskThread()) {
TimeUnit.SECONDS.timedJoin(getIncreaseTaskThread(), 60);
}
@@ -276,12 +277,13 @@ public abstract class BaseITCase {
assertFalse(CollectionUtils.containsAny(actualStatus, Arrays.asList(JobStatus.PREPARING_FAILURE.name(), JobStatus.EXECUTE_INVENTORY_TASK_FAILURE.name(),
JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE.name())));
if (actualStatus.size() == 1 && actualStatus.contains(JobStatus.EXECUTE_INCREMENTAL_TASK.name())) {
- break;
+ return listJobStatus;
} else if (actualStatus.size() >= 1 && actualStatus.containsAll(new HashSet<>(Arrays.asList("", JobStatus.EXECUTE_INCREMENTAL_TASK.name())))) {
log.warn("one of the shardingItem was not started correctly");
}
ThreadUtil.sleep(3, TimeUnit.SECONDS);
}
+ return Collections.emptyList();
}
protected void assertGreaterThanOrderTableInitRows(final int tableInitRows, final String schema) throws SQLException {
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
index 2a9610924d8..5f6c60c08f5 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
@@ -38,9 +38,11 @@ import java.time.LocalDateTime;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
/**
* General migration test case, includes multiple cases.
@@ -105,7 +107,10 @@ public final class MySQLMigrationGeneralIT extends AbstractMigrationITCase {
}
private void assertMigrationSuccessById(final String jobId) throws SQLException, InterruptedException {
- waitJobFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+ List<Map<String, Object>> jobStatus = waitJobFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+ for (Map<String, Object> each : jobStatus) {
+ assertTrue(Integer.parseInt(each.get("processed_records_count").toString()) > 0);
+ }
assertCheckMigrationSuccess(jobId, "DATA_MATCH");
stopMigrationByJobId(jobId);
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineJobProgressListener.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineJobProgressListener.java
index 8a2a7a11702..e70bcbf50d6 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineJobProgressListener.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineJobProgressListener.java
@@ -17,11 +17,12 @@
package org.apache.shardingsphere.data.pipeline.core.fixture;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
public final class FixturePipelineJobProgressListener implements PipelineJobProgressListener {
@Override
- public void onProgressUpdated() {
+ public void onProgressUpdated(final PipelineJobProgressUpdatedParameter parameter) {
}
}