You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/10/22 04:22:58 UTC

[shardingsphere] branch master updated: Revise inventory records count and improve init incremental task progress (#21689)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 520cc64cc8c Revise inventory records count and improve init incremental task progress (#21689)
520cc64cc8c is described below

commit 520cc64cc8c2e880e911df9694b169227695c384
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Sat Oct 22 12:22:48 2022 +0800

    Revise inventory records count and improve init incremental task progress (#21689)
---
 .../InventoryIncrementalJobItemContext.java        |   4 +-
 .../core/prepare/InventoryTaskSplitter.java        |   4 +-
 .../data/pipeline/core/task/IncrementalTask.java   |  13 ++-
 .../migration/MigrationJobItemContext.java         |  13 ++-
 .../api/impl/GovernanceRepositoryAPIImplTest.java  |   6 +-
 .../FixtureInventoryIncrementalJobItemContext.java | 120 +++++++++++++++++++++
 .../FixturePipelineJobProgressListener.java        |  28 -----
 .../core/importer/DefaultImporterTest.java         |   4 +-
 .../core/importer/ImporterCreatorFactoryTest.java  |   6 +-
 .../pipeline/core/task/IncrementalTaskTest.java    |   4 +-
 .../data/pipeline/core/task/InventoryTaskTest.java |   6 +-
 11 files changed, 155 insertions(+), 53 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
index 104d4e12a52..bc18c086eb8 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
@@ -78,11 +78,11 @@ public interface InventoryIncrementalJobItemContext extends PipelineJobItemConte
     long getProcessedRecordsCount();
     
     /**
-     * Init inventory records count.
+     * Update inventory records count.
      *
      * @param recordsCount records count
      */
-    void initInventoryRecordsCount(long recordsCount);
+    void updateInventoryRecordsCount(long recordsCount);
     
     /**
      * Get inventory records count.
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index 40c81189147..e17251f5040 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -185,7 +185,7 @@ public final class InventoryTaskSplitter {
                     beginId = endId + 1;
                 }
             }
-            jobItemContext.initInventoryRecordsCount(recordsCount);
+            jobItemContext.updateInventoryRecordsCount(recordsCount);
             // fix empty table missing inventory task
             if (result.isEmpty()) {
                 result.add(new IntegerPrimaryKeyPosition(0, 0));
@@ -207,7 +207,7 @@ public final class InventoryTaskSplitter {
                 PreparedStatement ps = connection.prepareStatement(sql)) {
             try (ResultSet rs = ps.executeQuery()) {
                 rs.next();
-                jobItemContext.initInventoryRecordsCount(rs.getLong(1));
+                jobItemContext.updateInventoryRecordsCount(rs.getLong(1));
             }
         } catch (final SQLException ex) {
             throw new SplitPipelineJobByUniqueKeyException(dumperConfig.getActualTableName(), dumperConfig.getUniqueKey(), ex);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index 8b435383e69..98de46f4a8a 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -29,9 +29,11 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
 import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
+import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreatorFactory;
@@ -66,20 +68,23 @@ public final class IncrementalTask implements PipelineTask, AutoCloseable {
     public IncrementalTask(final int concurrency, final DumperConfiguration dumperConfig, final ImporterConfiguration importerConfig,
                            final PipelineChannelCreator pipelineChannelCreator, final PipelineDataSourceManager dataSourceManager,
                            final PipelineTableMetaDataLoader sourceMetaDataLoader, final ExecuteEngine incrementalExecuteEngine,
-                           final PipelineJobProgressListener jobProgressListener) {
+                           final InventoryIncrementalJobItemContext jobItemContext) {
         taskId = dumperConfig.getDataSourceName();
         this.incrementalExecuteEngine = incrementalExecuteEngine;
         IngestPosition<?> position = dumperConfig.getPosition();
-        taskProgress = createIncrementalTaskProgress(position);
+        taskProgress = createIncrementalTaskProgress(position, jobItemContext.getInitProgress());
         channel = createChannel(concurrency, pipelineChannelCreator, taskProgress);
         dumper = IncrementalDumperCreatorFactory.getInstance(dumperConfig.getDataSourceConfig().getDatabaseType().getType()).createIncrementalDumper(dumperConfig, position, channel,
                 sourceMetaDataLoader);
-        importers = createImporters(concurrency, importerConfig, dataSourceManager, channel, jobProgressListener);
+        importers = createImporters(concurrency, importerConfig, dataSourceManager, channel, jobItemContext);
     }
     
-    private IncrementalTaskProgress createIncrementalTaskProgress(final IngestPosition<?> position) {
+    private IncrementalTaskProgress createIncrementalTaskProgress(final IngestPosition<?> position, final InventoryIncrementalJobItemProgress jobItemProgress) {
         IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress();
         incrementalTaskProgress.setPosition(position);
+        if (null != jobItemProgress) {
+            incrementalTaskProgress.setIncrementalTaskDelay(jobItemProgress.getIncremental().getIncrementalTaskProgress().getIncrementalTaskDelay());
+        }
         return incrementalTaskProgress;
     }
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
index adcdd6761d1..53b6e94236b 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
@@ -66,7 +66,7 @@ public final class MigrationJobItemContext implements InventoryIncrementalJobIte
     
     private final AtomicLong processedRecordsCount = new AtomicLong(0);
     
-    private volatile long inventoryRecordsCount;
+    private final AtomicLong inventoryRecordsCount = new AtomicLong(0);
     
     private final MigrationJobConfiguration jobConfig;
     
@@ -99,7 +99,7 @@ public final class MigrationJobItemContext implements InventoryIncrementalJobIte
         this.initProgress = initProgress;
         if (null != initProgress) {
             processedRecordsCount.set(initProgress.getProcessedRecordsCount());
-            inventoryRecordsCount = initProgress.getInventoryRecordsCount();
+            inventoryRecordsCount.set(initProgress.getInventoryRecordsCount());
         }
         this.jobProcessContext = jobProcessContext;
         this.taskConfig = taskConfig;
@@ -143,7 +143,12 @@ public final class MigrationJobItemContext implements InventoryIncrementalJobIte
     }
     
     @Override
-    public void initInventoryRecordsCount(final long recordsCount) {
-        inventoryRecordsCount = recordsCount;
+    public void updateInventoryRecordsCount(final long recordsCount) {
+        inventoryRecordsCount.addAndGet(recordsCount);
+    }
+    
+    @Override
+    public long getInventoryRecordsCount() {
+        return inventoryRecordsCount.get();
     }
 }
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
index c6f6d088a41..f743bddf34f 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
@@ -29,7 +29,7 @@ import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
 import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
+import org.apache.shardingsphere.data.pipeline.core.fixture.FixtureInventoryIncrementalJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
@@ -151,7 +151,7 @@ public final class GovernanceRepositoryAPIImplTest {
         PipelineDataSourceWrapper dataSource = mock(PipelineDataSourceWrapper.class);
         PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(dataSource);
         return new InventoryTask(dumperConfig, taskConfig.getImporterConfig(), PipelineContextUtil.getPipelineChannelCreator(), new DefaultPipelineDataSourceManager(),
-                dataSource, metaDataLoader, PipelineContextUtil.getExecuteEngine(), PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobProgressListener());
+                dataSource, metaDataLoader, PipelineContextUtil.getExecuteEngine(), PipelineContextUtil.getExecuteEngine(), new FixtureInventoryIncrementalJobItemContext());
     }
     
     private IncrementalTask mockIncrementalTask(final MigrationTaskConfiguration taskConfig) {
@@ -159,6 +159,6 @@ public final class GovernanceRepositoryAPIImplTest {
         dumperConfig.setPosition(new PlaceholderPosition());
         PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
         return new IncrementalTask(3, dumperConfig, taskConfig.getImporterConfig(), PipelineContextUtil.getPipelineChannelCreator(), new DefaultPipelineDataSourceManager(),
-                metaDataLoader, PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobProgressListener());
+                metaDataLoader, PipelineContextUtil.getExecuteEngine(), new FixtureInventoryIncrementalJobItemContext());
     }
 }
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryIncrementalJobItemContext.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryIncrementalJobItemContext.java
new file mode 100644
index 00000000000..ed58fe4951e
--- /dev/null
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryIncrementalJobItemContext.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.fixture;
+
+import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
+import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
+import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
+import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+
+import java.util.Collection;
+
+public final class FixtureInventoryIncrementalJobItemContext implements InventoryIncrementalJobItemContext {
+    
+    @Override
+    public void onProgressUpdated(final PipelineJobProgressUpdatedParameter parameter) {
+    }
+    
+    @Override
+    public InventoryIncrementalProcessContext getJobProcessContext() {
+        return null;
+    }
+    
+    @Override
+    public Collection<InventoryTask> getInventoryTasks() {
+        return null;
+    }
+    
+    @Override
+    public Collection<IncrementalTask> getIncrementalTasks() {
+        return null;
+    }
+    
+    @Override
+    public InventoryIncrementalJobItemProgress getInitProgress() {
+        return null;
+    }
+    
+    @Override
+    public PipelineTableMetaDataLoader getSourceMetaDataLoader() {
+        return null;
+    }
+    
+    @Override
+    public PipelineDataSourceManager getDataSourceManager() {
+        return null;
+    }
+    
+    @Override
+    public long getProcessedRecordsCount() {
+        return 0;
+    }
+    
+    @Override
+    public void updateInventoryRecordsCount(final long recordsCount) {
+    }
+    
+    @Override
+    public long getInventoryRecordsCount() {
+        return 0;
+    }
+    
+    @Override
+    public String getJobId() {
+        return null;
+    }
+    
+    @Override
+    public int getShardingItem() {
+        return 0;
+    }
+    
+    @Override
+    public String getDataSourceName() {
+        return null;
+    }
+    
+    @Override
+    public JobStatus getStatus() {
+        return null;
+    }
+    
+    @Override
+    public void setStatus(final JobStatus status) {
+    }
+    
+    @Override
+    public PipelineJobConfiguration getJobConfig() {
+        return null;
+    }
+    
+    @Override
+    public void setStopping(final boolean stopping) {
+    }
+    
+    @Override
+    public boolean isStopping() {
+        return false;
+    }
+}
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineJobProgressListener.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineJobProgressListener.java
deleted file mode 100644
index e70bcbf50d6..00000000000
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineJobProgressListener.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.fixture;
-
-import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
-
-public final class FixturePipelineJobProgressListener implements PipelineJobProgressListener {
-    
-    @Override
-    public void onProgressUpdated(final PipelineJobProgressUpdatedParameter parameter) {
-    }
-}
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporterTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporterTest.java
index c81823ee2a8..fef64e346f0 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporterTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporterTest.java
@@ -30,7 +30,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
-import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
+import org.apache.shardingsphere.data.pipeline.core.fixture.FixtureInventoryIncrementalJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
 import org.junit.Before;
 import org.junit.Test;
@@ -82,7 +82,7 @@ public final class DefaultImporterTest {
     
     @Before
     public void setUp() throws SQLException {
-        jdbcImporter = new DefaultImporter(mockImporterConfiguration(), dataSourceManager, channel, new FixturePipelineJobProgressListener());
+        jdbcImporter = new DefaultImporter(mockImporterConfiguration(), dataSourceManager, channel, new FixtureInventoryIncrementalJobItemContext());
         when(dataSourceManager.getDataSource(dataSourceConfig)).thenReturn(dataSource);
         when(dataSource.getConnection()).thenReturn(connection);
     }
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterCreatorFactoryTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterCreatorFactoryTest.java
index 4dcb7fe1393..9287c6d9875 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterCreatorFactoryTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterCreatorFactoryTest.java
@@ -26,7 +26,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChanne
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.core.fixture.FixtureImporter;
 import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
+import org.apache.shardingsphere.data.pipeline.core.fixture.FixtureInventoryIncrementalJobItemContext;
 import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreatorFactory;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
 import org.junit.Test;
@@ -52,14 +52,14 @@ public final class ImporterCreatorFactoryTest {
     @Test
     public void assertCreateImporter() {
         for (String each : Arrays.asList("MySQL", "PostgreSQL", "openGauss")) {
-            Importer actual = ImporterCreatorFactory.getInstance(each).createImporter(createImporterConfiguration(each), dataSourceManager, channel, new FixturePipelineJobProgressListener());
+            Importer actual = ImporterCreatorFactory.getInstance(each).createImporter(createImporterConfiguration(each), dataSourceManager, channel, new FixtureInventoryIncrementalJobItemContext());
             assertThat(actual, instanceOf(DefaultImporter.class));
         }
     }
     
     @Test
     public void assertCreateImporterForH2() {
-        Importer actual = ImporterCreatorFactory.getInstance("H2").createImporter(createImporterConfiguration("H2"), dataSourceManager, channel, new FixturePipelineJobProgressListener());
+        Importer actual = ImporterCreatorFactory.getInstance("H2").createImporter(createImporterConfiguration("H2"), dataSourceManager, channel, new FixtureInventoryIncrementalJobItemContext());
         assertThat(actual, instanceOf(FixtureImporter.class));
     }
     
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
index a14a62d02af..f9448af70c5 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
@@ -21,7 +21,7 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
+import org.apache.shardingsphere.data.pipeline.core.fixture.FixtureInventoryIncrementalJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
@@ -56,7 +56,7 @@ public final class IncrementalTaskTest {
         PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
         incrementalTask = new IncrementalTask(3, taskConfig.getDumperConfig(), taskConfig.getImporterConfig(),
                 PipelineContextUtil.getPipelineChannelCreator(), new DefaultPipelineDataSourceManager(), metaDataLoader,
-                PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobProgressListener());
+                PipelineContextUtil.getExecuteEngine(), new FixtureInventoryIncrementalJobItemContext());
     }
     
     @Test
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
index 3a2772be8eb..0b277d23713 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
@@ -24,7 +24,7 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
 import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
+import org.apache.shardingsphere.data.pipeline.core.fixture.FixtureInventoryIncrementalJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
@@ -74,7 +74,7 @@ public final class InventoryTaskTest {
         try (
                 InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
                         PipelineContextUtil.getPipelineChannelCreator(), DATA_SOURCE_MANAGER, dataSource,
-                        metaDataLoader, PipelineContextUtil.getExecuteEngine(), PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobProgressListener())) {
+                        metaDataLoader, PipelineContextUtil.getExecuteEngine(), PipelineContextUtil.getExecuteEngine(), new FixtureInventoryIncrementalJobItemContext())) {
             inventoryTask.start().get(10, TimeUnit.SECONDS);
         }
     }
@@ -89,7 +89,7 @@ public final class InventoryTaskTest {
         try (
                 InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
                         PipelineContextUtil.getPipelineChannelCreator(), new DefaultPipelineDataSourceManager(), dataSource,
-                        metaDataLoader, PipelineContextUtil.getExecuteEngine(), PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobProgressListener())) {
+                        metaDataLoader, PipelineContextUtil.getExecuteEngine(), PipelineContextUtil.getExecuteEngine(), new FixtureInventoryIncrementalJobItemContext())) {
             inventoryTask.start().get(10, TimeUnit.SECONDS);
             assertThat(inventoryTask.getTaskProgress().getPosition(), instanceOf(IntegerPrimaryKeyPosition.class));
         }