You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/10/22 04:22:58 UTC
[shardingsphere] branch master updated: Revise inventory records count and improve init incremental task progress (#21689)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 520cc64cc8c Revise inventory records count and improve init incremental task progress (#21689)
520cc64cc8c is described below
commit 520cc64cc8c2e880e911df9694b169227695c384
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Sat Oct 22 12:22:48 2022 +0800
Revise inventory records count and improve init incremental task progress (#21689)
---
.../InventoryIncrementalJobItemContext.java | 4 +-
.../core/prepare/InventoryTaskSplitter.java | 4 +-
.../data/pipeline/core/task/IncrementalTask.java | 13 ++-
.../migration/MigrationJobItemContext.java | 13 ++-
.../api/impl/GovernanceRepositoryAPIImplTest.java | 6 +-
.../FixtureInventoryIncrementalJobItemContext.java | 120 +++++++++++++++++++++
.../FixturePipelineJobProgressListener.java | 28 -----
.../core/importer/DefaultImporterTest.java | 4 +-
.../core/importer/ImporterCreatorFactoryTest.java | 6 +-
.../pipeline/core/task/IncrementalTaskTest.java | 4 +-
.../data/pipeline/core/task/InventoryTaskTest.java | 6 +-
11 files changed, 155 insertions(+), 53 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
index 104d4e12a52..bc18c086eb8 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
@@ -78,11 +78,11 @@ public interface InventoryIncrementalJobItemContext extends PipelineJobItemConte
long getProcessedRecordsCount();
/**
- * Init inventory records count.
+ * Update inventory records count.
*
* @param recordsCount records count
*/
- void initInventoryRecordsCount(long recordsCount);
+ void updateInventoryRecordsCount(long recordsCount);
/**
* Get inventory records count.
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index 40c81189147..e17251f5040 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -185,7 +185,7 @@ public final class InventoryTaskSplitter {
beginId = endId + 1;
}
}
- jobItemContext.initInventoryRecordsCount(recordsCount);
+ jobItemContext.updateInventoryRecordsCount(recordsCount);
// fix empty table missing inventory task
if (result.isEmpty()) {
result.add(new IntegerPrimaryKeyPosition(0, 0));
@@ -207,7 +207,7 @@ public final class InventoryTaskSplitter {
PreparedStatement ps = connection.prepareStatement(sql)) {
try (ResultSet rs = ps.executeQuery()) {
rs.next();
- jobItemContext.initInventoryRecordsCount(rs.getLong(1));
+ jobItemContext.updateInventoryRecordsCount(rs.getLong(1));
}
} catch (final SQLException ex) {
throw new SplitPipelineJobByUniqueKeyException(dumperConfig.getActualTableName(), dumperConfig.getUniqueKey(), ex);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index 8b435383e69..98de46f4a8a 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -29,9 +29,11 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
+import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreatorFactory;
@@ -66,20 +68,23 @@ public final class IncrementalTask implements PipelineTask, AutoCloseable {
public IncrementalTask(final int concurrency, final DumperConfiguration dumperConfig, final ImporterConfiguration importerConfig,
final PipelineChannelCreator pipelineChannelCreator, final PipelineDataSourceManager dataSourceManager,
final PipelineTableMetaDataLoader sourceMetaDataLoader, final ExecuteEngine incrementalExecuteEngine,
- final PipelineJobProgressListener jobProgressListener) {
+ final InventoryIncrementalJobItemContext jobItemContext) {
taskId = dumperConfig.getDataSourceName();
this.incrementalExecuteEngine = incrementalExecuteEngine;
IngestPosition<?> position = dumperConfig.getPosition();
- taskProgress = createIncrementalTaskProgress(position);
+ taskProgress = createIncrementalTaskProgress(position, jobItemContext.getInitProgress());
channel = createChannel(concurrency, pipelineChannelCreator, taskProgress);
dumper = IncrementalDumperCreatorFactory.getInstance(dumperConfig.getDataSourceConfig().getDatabaseType().getType()).createIncrementalDumper(dumperConfig, position, channel,
sourceMetaDataLoader);
- importers = createImporters(concurrency, importerConfig, dataSourceManager, channel, jobProgressListener);
+ importers = createImporters(concurrency, importerConfig, dataSourceManager, channel, jobItemContext);
}
- private IncrementalTaskProgress createIncrementalTaskProgress(final IngestPosition<?> position) {
+ private IncrementalTaskProgress createIncrementalTaskProgress(final IngestPosition<?> position, final InventoryIncrementalJobItemProgress jobItemProgress) {
IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress();
incrementalTaskProgress.setPosition(position);
+ if (null != jobItemProgress) {
+ incrementalTaskProgress.setIncrementalTaskDelay(jobItemProgress.getIncremental().getIncrementalTaskProgress().getIncrementalTaskDelay());
+ }
return incrementalTaskProgress;
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
index adcdd6761d1..53b6e94236b 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
@@ -66,7 +66,7 @@ public final class MigrationJobItemContext implements InventoryIncrementalJobIte
private final AtomicLong processedRecordsCount = new AtomicLong(0);
- private volatile long inventoryRecordsCount;
+ private final AtomicLong inventoryRecordsCount = new AtomicLong(0);
private final MigrationJobConfiguration jobConfig;
@@ -99,7 +99,7 @@ public final class MigrationJobItemContext implements InventoryIncrementalJobIte
this.initProgress = initProgress;
if (null != initProgress) {
processedRecordsCount.set(initProgress.getProcessedRecordsCount());
- inventoryRecordsCount = initProgress.getInventoryRecordsCount();
+ inventoryRecordsCount.set(initProgress.getInventoryRecordsCount());
}
this.jobProcessContext = jobProcessContext;
this.taskConfig = taskConfig;
@@ -143,7 +143,12 @@ public final class MigrationJobItemContext implements InventoryIncrementalJobIte
}
@Override
- public void initInventoryRecordsCount(final long recordsCount) {
- inventoryRecordsCount = recordsCount;
+ public void updateInventoryRecordsCount(final long recordsCount) {
+ inventoryRecordsCount.addAndGet(recordsCount);
+ }
+
+ @Override
+ public long getInventoryRecordsCount() {
+ return inventoryRecordsCount.get();
}
}
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
index c6f6d088a41..f743bddf34f 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
@@ -29,7 +29,7 @@ import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
+import org.apache.shardingsphere.data.pipeline.core.fixture.FixtureInventoryIncrementalJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
@@ -151,7 +151,7 @@ public final class GovernanceRepositoryAPIImplTest {
PipelineDataSourceWrapper dataSource = mock(PipelineDataSourceWrapper.class);
PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(dataSource);
return new InventoryTask(dumperConfig, taskConfig.getImporterConfig(), PipelineContextUtil.getPipelineChannelCreator(), new DefaultPipelineDataSourceManager(),
- dataSource, metaDataLoader, PipelineContextUtil.getExecuteEngine(), PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobProgressListener());
+ dataSource, metaDataLoader, PipelineContextUtil.getExecuteEngine(), PipelineContextUtil.getExecuteEngine(), new FixtureInventoryIncrementalJobItemContext());
}
private IncrementalTask mockIncrementalTask(final MigrationTaskConfiguration taskConfig) {
@@ -159,6 +159,6 @@ public final class GovernanceRepositoryAPIImplTest {
dumperConfig.setPosition(new PlaceholderPosition());
PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
return new IncrementalTask(3, dumperConfig, taskConfig.getImporterConfig(), PipelineContextUtil.getPipelineChannelCreator(), new DefaultPipelineDataSourceManager(),
- metaDataLoader, PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobProgressListener());
+ metaDataLoader, PipelineContextUtil.getExecuteEngine(), new FixtureInventoryIncrementalJobItemContext());
}
}
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryIncrementalJobItemContext.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryIncrementalJobItemContext.java
new file mode 100644
index 00000000000..ed58fe4951e
--- /dev/null
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryIncrementalJobItemContext.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.fixture;
+
+import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
+import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
+import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
+import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+
+import java.util.Collection;
+
+public final class FixtureInventoryIncrementalJobItemContext implements InventoryIncrementalJobItemContext {
+
+ @Override
+ public void onProgressUpdated(final PipelineJobProgressUpdatedParameter parameter) {
+ }
+
+ @Override
+ public InventoryIncrementalProcessContext getJobProcessContext() {
+ return null;
+ }
+
+ @Override
+ public Collection<InventoryTask> getInventoryTasks() {
+ return null;
+ }
+
+ @Override
+ public Collection<IncrementalTask> getIncrementalTasks() {
+ return null;
+ }
+
+ @Override
+ public InventoryIncrementalJobItemProgress getInitProgress() {
+ return null;
+ }
+
+ @Override
+ public PipelineTableMetaDataLoader getSourceMetaDataLoader() {
+ return null;
+ }
+
+ @Override
+ public PipelineDataSourceManager getDataSourceManager() {
+ return null;
+ }
+
+ @Override
+ public long getProcessedRecordsCount() {
+ return 0;
+ }
+
+ @Override
+ public void updateInventoryRecordsCount(final long recordsCount) {
+ }
+
+ @Override
+ public long getInventoryRecordsCount() {
+ return 0;
+ }
+
+ @Override
+ public String getJobId() {
+ return null;
+ }
+
+ @Override
+ public int getShardingItem() {
+ return 0;
+ }
+
+ @Override
+ public String getDataSourceName() {
+ return null;
+ }
+
+ @Override
+ public JobStatus getStatus() {
+ return null;
+ }
+
+ @Override
+ public void setStatus(final JobStatus status) {
+ }
+
+ @Override
+ public PipelineJobConfiguration getJobConfig() {
+ return null;
+ }
+
+ @Override
+ public void setStopping(final boolean stopping) {
+ }
+
+ @Override
+ public boolean isStopping() {
+ return false;
+ }
+}
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineJobProgressListener.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineJobProgressListener.java
deleted file mode 100644
index e70bcbf50d6..00000000000
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineJobProgressListener.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.fixture;
-
-import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
-
-public final class FixturePipelineJobProgressListener implements PipelineJobProgressListener {
-
- @Override
- public void onProgressUpdated(final PipelineJobProgressUpdatedParameter parameter) {
- }
-}
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporterTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporterTest.java
index c81823ee2a8..fef64e346f0 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporterTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporterTest.java
@@ -30,7 +30,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
-import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
+import org.apache.shardingsphere.data.pipeline.core.fixture.FixtureInventoryIncrementalJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
import org.junit.Before;
import org.junit.Test;
@@ -82,7 +82,7 @@ public final class DefaultImporterTest {
@Before
public void setUp() throws SQLException {
- jdbcImporter = new DefaultImporter(mockImporterConfiguration(), dataSourceManager, channel, new FixturePipelineJobProgressListener());
+ jdbcImporter = new DefaultImporter(mockImporterConfiguration(), dataSourceManager, channel, new FixtureInventoryIncrementalJobItemContext());
when(dataSourceManager.getDataSource(dataSourceConfig)).thenReturn(dataSource);
when(dataSource.getConnection()).thenReturn(connection);
}
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterCreatorFactoryTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterCreatorFactoryTest.java
index 4dcb7fe1393..9287c6d9875 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterCreatorFactoryTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterCreatorFactoryTest.java
@@ -26,7 +26,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChanne
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.core.fixture.FixtureImporter;
import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
+import org.apache.shardingsphere.data.pipeline.core.fixture.FixtureInventoryIncrementalJobItemContext;
import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreatorFactory;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
import org.junit.Test;
@@ -52,14 +52,14 @@ public final class ImporterCreatorFactoryTest {
@Test
public void assertCreateImporter() {
for (String each : Arrays.asList("MySQL", "PostgreSQL", "openGauss")) {
- Importer actual = ImporterCreatorFactory.getInstance(each).createImporter(createImporterConfiguration(each), dataSourceManager, channel, new FixturePipelineJobProgressListener());
+ Importer actual = ImporterCreatorFactory.getInstance(each).createImporter(createImporterConfiguration(each), dataSourceManager, channel, new FixtureInventoryIncrementalJobItemContext());
assertThat(actual, instanceOf(DefaultImporter.class));
}
}
@Test
public void assertCreateImporterForH2() {
- Importer actual = ImporterCreatorFactory.getInstance("H2").createImporter(createImporterConfiguration("H2"), dataSourceManager, channel, new FixturePipelineJobProgressListener());
+ Importer actual = ImporterCreatorFactory.getInstance("H2").createImporter(createImporterConfiguration("H2"), dataSourceManager, channel, new FixtureInventoryIncrementalJobItemContext());
assertThat(actual, instanceOf(FixtureImporter.class));
}
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
index a14a62d02af..f9448af70c5 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
@@ -21,7 +21,7 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
+import org.apache.shardingsphere.data.pipeline.core.fixture.FixtureInventoryIncrementalJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
@@ -56,7 +56,7 @@ public final class IncrementalTaskTest {
PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
incrementalTask = new IncrementalTask(3, taskConfig.getDumperConfig(), taskConfig.getImporterConfig(),
PipelineContextUtil.getPipelineChannelCreator(), new DefaultPipelineDataSourceManager(), metaDataLoader,
- PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobProgressListener());
+ PipelineContextUtil.getExecuteEngine(), new FixtureInventoryIncrementalJobItemContext());
}
@Test
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
index 3a2772be8eb..0b277d23713 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
@@ -24,7 +24,7 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
+import org.apache.shardingsphere.data.pipeline.core.fixture.FixtureInventoryIncrementalJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
@@ -74,7 +74,7 @@ public final class InventoryTaskTest {
try (
InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
PipelineContextUtil.getPipelineChannelCreator(), DATA_SOURCE_MANAGER, dataSource,
- metaDataLoader, PipelineContextUtil.getExecuteEngine(), PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobProgressListener())) {
+ metaDataLoader, PipelineContextUtil.getExecuteEngine(), PipelineContextUtil.getExecuteEngine(), new FixtureInventoryIncrementalJobItemContext())) {
inventoryTask.start().get(10, TimeUnit.SECONDS);
}
}
@@ -89,7 +89,7 @@ public final class InventoryTaskTest {
try (
InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
PipelineContextUtil.getPipelineChannelCreator(), new DefaultPipelineDataSourceManager(), dataSource,
- metaDataLoader, PipelineContextUtil.getExecuteEngine(), PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobProgressListener())) {
+ metaDataLoader, PipelineContextUtil.getExecuteEngine(), PipelineContextUtil.getExecuteEngine(), new FixtureInventoryIncrementalJobItemContext())) {
inventoryTask.start().get(10, TimeUnit.SECONDS);
assertThat(inventoryTask.getTaskProgress().getPosition(), instanceOf(IntegerPrimaryKeyPosition.class));
}