You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain text copy.
Posted to notifications@shardingsphere.apache.org by su...@apache.org on 2023/05/14 04:19:27 UTC

[shardingsphere] branch master updated: Fix sonar issue of IngestPosition (#25652)

This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ac71877cbc Fix sonar issue of IngestPosition (#25652)
0ac71877cbc is described below

commit 0ac71877cbc6268a72d8759fbcad3758f80949d4
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Sun May 14 12:19:20 2023 +0800

    Fix sonar issue of IngestPosition (#25652)
    
    * Fix sonar issue of IngestPosition
    
    * Fix sonar issue of IngestPosition
    
    * Fix sonar issue of TestDecodingPlugin
    
    * Fix sonar issue of TrimFunctionConverter
    
    * Fix sonar issue of InventoryDumper
---
 .../EspressoInlineExpressionParserTest.java        |  5 ++--
 .../api/config/ingest/DumperConfiguration.java     |  2 +-
 .../api/ingest/position/IngestPosition.java        |  4 +--
 .../pipeline/api/ingest/record/DataRecord.java     |  2 +-
 .../pipeline/api/ingest/record/FinishedRecord.java |  2 +-
 .../api/ingest/record/PlaceholderRecord.java       |  2 +-
 .../data/pipeline/api/ingest/record/Record.java    |  2 +-
 .../ingest/dumper/IncrementalDumperCreator.java    |  6 ++---
 .../spi/ingest/position/PositionInitializer.java   |  4 +--
 .../api/ingest/position/FinishedPosition.java      |  2 +-
 .../ingest/position/IntegerPrimaryKeyPosition.java |  2 +-
 .../api/ingest/position/NoUniqueKeyPosition.java   |  2 +-
 .../api/ingest/position/PlaceholderPosition.java   |  2 +-
 .../ingest/position/PrimaryKeyPositionFactory.java |  4 +--
 .../ingest/position/StringPrimaryKeyPosition.java  |  2 +-
 .../ingest/position/UnsupportedKeyPosition.java    |  2 +-
 .../progress/JobItemIncrementalTasksProgress.java  |  2 +-
 .../progress/JobItemInventoryTasksProgress.java    |  8 +++---
 .../api/task/progress/IncrementalTaskProgress.java |  8 +++---
 .../api/task/progress/InventoryTaskProgress.java   |  2 +-
 .../core/ingest/dumper/InventoryDumper.java        | 13 +++++-----
 .../YamlJobItemInventoryTasksProgressSwapper.java  |  4 +--
 .../core/prepare/InventoryTaskSplitter.java        | 29 +++++++++++-----------
 .../core/prepare/PipelineJobPreparerUtils.java     |  6 ++---
 .../data/pipeline/core/task/IncrementalTask.java   |  4 +--
 .../data/pipeline/core/task/InventoryTask.java     |  2 +-
 .../InventoryIncrementalJobItemProgressTest.java   |  4 +--
 .../memory/MultiplexMemoryPipelineChannelTest.java |  2 +-
 .../mysql/MySQLIncrementalDumperCreator.java       |  5 ++--
 .../mysql/ingest/MySQLIncrementalDumper.java       |  2 +-
 .../mysql/ingest/binlog/BinlogPosition.java        |  2 +-
 .../OpenGaussIncrementalDumperCreator.java         |  5 ++--
 .../opengauss/ingest/OpenGaussWALDumper.java       |  2 +-
 .../PostgreSQLIncrementalDumperCreator.java        |  5 ++--
 .../postgresql/ingest/PostgreSQLWALDumper.java     |  2 +-
 .../postgresql/ingest/wal/WALPosition.java         |  2 +-
 .../ingest/wal/decode/TestDecodingPlugin.java      |  2 +-
 .../expression/impl/TrimFunctionConverter.java     |  4 +--
 .../fixture/FixtureIncrementalDumperCreator.java   |  5 ++--
 39 files changed, 77 insertions(+), 88 deletions(-)

diff --git a/infra/expr/espresso/src/test/java/org/apache/shardingsphere/infra/expr/espresso/EspressoInlineExpressionParserTest.java b/infra/expr/espresso/src/test/java/org/apache/shardingsphere/infra/expr/espresso/EspressoInlineExpressionParserTest.java
index b043b68b01e..b389c5a2f2d 100644
--- a/infra/expr/espresso/src/test/java/org/apache/shardingsphere/infra/expr/espresso/EspressoInlineExpressionParserTest.java
+++ b/infra/expr/espresso/src/test/java/org/apache/shardingsphere/infra/expr/espresso/EspressoInlineExpressionParserTest.java
@@ -119,9 +119,8 @@ class EspressoInlineExpressionParserTest {
     }
     
     /*
-     * TODO
-     * This method needs to avoid returning a groovy.lang.Closure class instance, and instead return the result of `Closure#call`.
-     * Because `org.graalvm.polyglot.Value#as` does not allow this type to be returned from the guest JVM.
+     * TODO This method needs to avoid returning a groovy.lang.Closure class instance, and instead return the result of `Closure#call`. Because `org.graalvm.polyglot.Value#as` does not allow this type
+     * to be returned from the guest JVM.
      */
     @Test
     @Disabled("See java doc")
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
index 8cb49bfd012..37c2033f6e2 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
@@ -50,7 +50,7 @@ public class DumperConfiguration {
     
     private PipelineDataSourceConfiguration dataSourceConfig;
     
-    private IngestPosition<?> position;
+    private IngestPosition position;
     
     private Map<ActualTableName, LogicTableName> tableNameMap;
     
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/IngestPosition.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/IngestPosition.java
index 78b08bd4fb6..dc22ab9573f 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/IngestPosition.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/IngestPosition.java
@@ -19,8 +19,6 @@ package org.apache.shardingsphere.data.pipeline.api.ingest.position;
 
 /**
  * Ingest position.
- * 
- * @param <T> type of position
  */
-public interface IngestPosition<T> {
+public interface IngestPosition {
 }
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java
index 956887c95f7..6b60eef04e7 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java
@@ -49,7 +49,7 @@ public final class DataRecord extends Record {
     
     private Long csn;
     
-    public DataRecord(final IngestPosition<?> position, final int columnCount) {
+    public DataRecord(final IngestPosition position, final int columnCount) {
         super(position);
         columns = new ArrayList<>(columnCount);
     }
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/FinishedRecord.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/FinishedRecord.java
index ad3add841d5..b69d32243a3 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/FinishedRecord.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/FinishedRecord.java
@@ -26,7 +26,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPositio
 @ToString(callSuper = true)
 public final class FinishedRecord extends Record {
     
-    public FinishedRecord(final IngestPosition<?> position) {
+    public FinishedRecord(final IngestPosition position) {
         super(position);
     }
 }
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/PlaceholderRecord.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/PlaceholderRecord.java
index 227c51b56dd..4157d1f8b12 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/PlaceholderRecord.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/PlaceholderRecord.java
@@ -26,7 +26,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPositio
 @ToString(callSuper = true)
 public final class PlaceholderRecord extends Record {
     
-    public PlaceholderRecord(final IngestPosition<?> position) {
+    public PlaceholderRecord(final IngestPosition position) {
         super(position);
     }
 }
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/Record.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/Record.java
index 403548a0515..18cafe5b540 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/Record.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/Record.java
@@ -33,7 +33,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPositio
 @ToString
 public abstract class Record {
     
-    private final IngestPosition<?> position;
+    private final IngestPosition position;
     
     private long commitTime;
 }
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreator.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreator.java
index 79bf3682195..343dace6f6c 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreator.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreator.java
@@ -27,11 +27,9 @@ import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
 
 /**
  * Incremental dumper creator.
- * 
- * @param <T> type of ingest position
  */
 @SingletonSPI
-public interface IncrementalDumperCreator<T> extends TypedSPI {
+public interface IncrementalDumperCreator extends TypedSPI {
     
     /**
      * Create incremental dumper.
@@ -42,5 +40,5 @@ public interface IncrementalDumperCreator<T> extends TypedSPI {
      * @param metaDataLoader meta data loader
      * @return incremental dumper
      */
-    IncrementalDumper createIncrementalDumper(DumperConfiguration dumperConfig, IngestPosition<T> position, PipelineChannel channel, PipelineTableMetaDataLoader metaDataLoader);
+    IncrementalDumper createIncrementalDumper(DumperConfiguration dumperConfig, IngestPosition position, PipelineChannel channel, PipelineTableMetaDataLoader metaDataLoader);
 }
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/position/PositionInitializer.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/position/PositionInitializer.java
index 9ec5cf0d1c9..d038bbd4ac3 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/position/PositionInitializer.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/position/PositionInitializer.java
@@ -38,7 +38,7 @@ public interface PositionInitializer extends TypedSPI {
      * @return position
      * @throws SQLException SQL exception
      */
-    IngestPosition<?> init(DataSource dataSource, String slotNameSuffix) throws SQLException;
+    IngestPosition init(DataSource dataSource, String slotNameSuffix) throws SQLException;
     
     /**
      * Init position by string data.
@@ -46,7 +46,7 @@ public interface PositionInitializer extends TypedSPI {
      * @param data string data
      * @return position
      */
-    IngestPosition<?> init(String data);
+    IngestPosition init(String data);
     
     /**
      * Clean up by data source if necessary.
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/FinishedPosition.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/FinishedPosition.java
index ae01c7ee857..b449623426f 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/FinishedPosition.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/FinishedPosition.java
@@ -20,5 +20,5 @@ package org.apache.shardingsphere.data.pipeline.api.ingest.position;
 /**
  * Finished inventory position.
  */
-public final class FinishedPosition implements IngestPosition<FinishedPosition> {
+public final class FinishedPosition implements IngestPosition {
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/IntegerPrimaryKeyPosition.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/IntegerPrimaryKeyPosition.java
index 8fcb8d06719..6a345251061 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/IntegerPrimaryKeyPosition.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/IntegerPrimaryKeyPosition.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.api.ingest.position;
 /**
  * Integer primary key position.
  */
-public final class IntegerPrimaryKeyPosition extends PrimaryKeyPosition<Long> implements IngestPosition<IntegerPrimaryKeyPosition> {
+public final class IntegerPrimaryKeyPosition extends PrimaryKeyPosition<Long> implements IngestPosition {
     
     private final long beginValue;
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/NoUniqueKeyPosition.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/NoUniqueKeyPosition.java
index cabfabac26a..c5e76650aee 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/NoUniqueKeyPosition.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/NoUniqueKeyPosition.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.api.ingest.position;
 /**
  * No unique key position.
  */
-public final class NoUniqueKeyPosition extends PrimaryKeyPosition<Void> implements IngestPosition<NoUniqueKeyPosition> {
+public final class NoUniqueKeyPosition extends PrimaryKeyPosition<Void> implements IngestPosition {
     
     @Override
     public Void getBeginValue() {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/PlaceholderPosition.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/PlaceholderPosition.java
index 3f999ac03ba..d6ee7df02e4 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/PlaceholderPosition.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/PlaceholderPosition.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.api.ingest.position;
 /**
  * Placeholder position.
  */
-public final class PlaceholderPosition implements IngestPosition<PlaceholderPosition> {
+public final class PlaceholderPosition implements IngestPosition {
     
     @Override
     public String toString() {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/PrimaryKeyPositionFactory.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/PrimaryKeyPositionFactory.java
index d42a293eb72..76129b458f0 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/PrimaryKeyPositionFactory.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/PrimaryKeyPositionFactory.java
@@ -37,7 +37,7 @@ public final class PrimaryKeyPositionFactory {
      * @return primary key position
      * @throws IllegalArgumentException illegal argument exception
      */
-    public static IngestPosition<?> newInstance(final String data) {
+    public static IngestPosition newInstance(final String data) {
         List<String> parts = Splitter.on(',').splitToList(data);
         Preconditions.checkArgument(3 == parts.size(), "Unknown primary key position: " + data);
         Preconditions.checkArgument(1 == parts.get(0).length(), "Invalid primary key position type: " + parts.get(0));
@@ -65,7 +65,7 @@ public final class PrimaryKeyPositionFactory {
      * @param endValue end value
      * @return ingest position
      */
-    public static IngestPosition<?> newInstance(final Object beginValue, final Object endValue) {
+    public static IngestPosition newInstance(final Object beginValue, final Object endValue) {
         if (beginValue instanceof Number) {
             return new IntegerPrimaryKeyPosition(((Number) beginValue).longValue(), null != endValue ? ((Number) endValue).longValue() : Long.MAX_VALUE);
         }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/StringPrimaryKeyPosition.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/StringPrimaryKeyPosition.java
index c3505b66902..758aa62abca 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/StringPrimaryKeyPosition.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/StringPrimaryKeyPosition.java
@@ -25,7 +25,7 @@ import lombok.RequiredArgsConstructor;
  */
 @RequiredArgsConstructor
 @Getter
-public final class StringPrimaryKeyPosition extends PrimaryKeyPosition<String> implements IngestPosition<StringPrimaryKeyPosition> {
+public final class StringPrimaryKeyPosition extends PrimaryKeyPosition<String> implements IngestPosition {
     
     private final String beginValue;
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/UnsupportedKeyPosition.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/UnsupportedKeyPosition.java
index 0798ba7c6c9..a5ff5a84d9c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/UnsupportedKeyPosition.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/UnsupportedKeyPosition.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.api.ingest.position;
 /**
  * Unsupported key position.
  */
-public final class UnsupportedKeyPosition extends PrimaryKeyPosition<Void> implements IngestPosition<UnsupportedKeyPosition> {
+public final class UnsupportedKeyPosition extends PrimaryKeyPosition<Void> implements IngestPosition {
     
     @Override
     public Void getBeginValue() {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobItemIncrementalTasksProgress.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobItemIncrementalTasksProgress.java
index ef63298a022..44091e1b96c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobItemIncrementalTasksProgress.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobItemIncrementalTasksProgress.java
@@ -38,7 +38,7 @@ public final class JobItemIncrementalTasksProgress {
      *
      * @return incremental position
      */
-    public Optional<IngestPosition<?>> getIncrementalPosition() {
+    public Optional<IngestPosition> getIncrementalPosition() {
         return null == incrementalTaskProgress ? Optional.empty() : Optional.of(incrementalTaskProgress.getPosition());
     }
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobItemInventoryTasksProgress.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobItemInventoryTasksProgress.java
index 7265108bc9a..656f92c4a4a 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobItemInventoryTasksProgress.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobItemInventoryTasksProgress.java
@@ -33,7 +33,7 @@ import java.util.stream.Collectors;
 @Getter
 public final class JobItemInventoryTasksProgress {
     
-    private final Map<String, InventoryTaskProgress> inventoryTaskProgressMap;
+    private final Map<String, InventoryTaskProgress> progresses;
     
     /**
      * Get inventory position.
@@ -41,10 +41,8 @@ public final class JobItemInventoryTasksProgress {
      * @param tableName table name
      * @return inventory position
      */
-    public Map<String, IngestPosition<?>> getInventoryPosition(final String tableName) {
+    public Map<String, IngestPosition> getInventoryPosition(final String tableName) {
         Pattern pattern = Pattern.compile(String.format("%s(#\\d+)?", tableName));
-        return inventoryTaskProgressMap.entrySet().stream()
-                .filter(entry -> pattern.matcher(entry.getKey()).find())
-                .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getPosition()));
+        return progresses.entrySet().stream().filter(entry -> pattern.matcher(entry.getKey()).find()).collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getPosition()));
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/IncrementalTaskProgress.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/IncrementalTaskProgress.java
index 167c862835d..d50e5755150 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/IncrementalTaskProgress.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/IncrementalTaskProgress.java
@@ -26,11 +26,11 @@ import java.util.concurrent.atomic.AtomicReference;
  */
 public final class IncrementalTaskProgress implements TaskProgress {
     
-    private final AtomicReference<IngestPosition<?>> position = new AtomicReference<>();
+    private final AtomicReference<IngestPosition> position = new AtomicReference<>();
     
     private final AtomicReference<IncrementalTaskDelay> incrementalTaskDelay = new AtomicReference<>();
     
-    public IncrementalTaskProgress(final IngestPosition<?> position) {
+    public IncrementalTaskProgress(final IngestPosition position) {
         this.position.set(position);
         incrementalTaskDelay.set(new IncrementalTaskDelay());
     }
@@ -40,7 +40,7 @@ public final class IncrementalTaskProgress implements TaskProgress {
      * 
      * @return position
      */
-    public IngestPosition<?> getPosition() {
+    public IngestPosition getPosition() {
         return position.get();
     }
     
@@ -49,7 +49,7 @@ public final class IncrementalTaskProgress implements TaskProgress {
      * 
      * @param position position
      */
-    public void setPosition(final IngestPosition<?> position) {
+    public void setPosition(final IngestPosition position) {
         this.position.set(position);
     }
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/InventoryTaskProgress.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/InventoryTaskProgress.java
index e234f4dc9a3..83fb6708f6b 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/InventoryTaskProgress.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/InventoryTaskProgress.java
@@ -30,5 +30,5 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPositio
 @ToString
 public final class InventoryTaskProgress implements TaskProgress {
     
-    private final IngestPosition<?> position;
+    private final IngestPosition position;
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
index 5f49977c691..8dfc0048cd4 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
@@ -61,6 +61,7 @@ import java.sql.Statement;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Inventory dumper.
@@ -81,7 +82,7 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
     
     private final PipelineTableMetaDataLoader metaDataLoader;
     
-    private volatile Statement dumpStatement;
+    private final AtomicReference<Statement> dumpStatement = new AtomicReference<>();
     
     public InventoryDumper(final InventoryDumperConfiguration dumperConfig, final PipelineChannel channel, final DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) {
         this.dumperConfig = dumperConfig;
@@ -95,7 +96,7 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
     
     @Override
     protected void runBlocking() {
-        IngestPosition<?> position = dumperConfig.getPosition();
+        IngestPosition position = dumperConfig.getPosition();
         if (position instanceof FinishedPosition) {
             log.info("Ignored because of already finished.");
             return;
@@ -118,7 +119,7 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
             connection.setTransactionIsolation(dumperConfig.getTransactionIsolation());
         }
         try (PreparedStatement preparedStatement = JDBCStreamQueryUtils.generateStreamQueryPreparedStatement(databaseType, connection, buildInventoryDumpSQL())) {
-            dumpStatement = preparedStatement;
+            dumpStatement.set(preparedStatement);
             if (!(databaseType instanceof MySQLDatabaseType)) {
                 preparedStatement.setFetchSize(batchSize);
             }
@@ -138,7 +139,7 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
                         rateLimitAlgorithm.intercept(JobOperationType.SELECT, 1);
                     }
                 }
-                dumpStatement = null;
+                dumpStatement.set(null);
                 log.info("Inventory dump done, rowCount={}", rowCount);
             }
         }
@@ -205,7 +206,7 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
         return result;
     }
     
-    private IngestPosition<?> newPosition(final ResultSet resultSet) throws SQLException {
+    private IngestPosition newPosition(final ResultSet resultSet) throws SQLException {
         return dumperConfig.hasUniqueKey()
                 ? PrimaryKeyPositionFactory.newInstance(resultSet.getObject(dumperConfig.getUniqueKeyColumns().get(0).getName()), ((PrimaryKeyPosition<?>) dumperConfig.getPosition()).getEndValue())
                 : new PlaceholderPosition();
@@ -213,6 +214,6 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
     
     @Override
     protected void doStop() throws SQLException {
-        cancelStatement(dumpStatement);
+        cancelStatement(dumpStatement.get());
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemInventoryTasksProgressSwapper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemInventoryTasksProgressSwapper.java
index 6b3af441973..e1e7026dc16 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemInventoryTasksProgressSwapper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemInventoryTasksProgressSwapper.java
@@ -53,13 +53,13 @@ public final class YamlJobItemInventoryTasksProgressSwapper {
     }
     
     private String[] getFinished(final JobItemInventoryTasksProgress progress) {
-        return progress.getInventoryTaskProgressMap().entrySet().stream()
+        return progress.getProgresses().entrySet().stream()
                 .filter(entry -> entry.getValue().getPosition() instanceof FinishedPosition)
                 .map(Entry::getKey).toArray(String[]::new);
     }
     
     private Map<String, String> getUnfinished(final JobItemInventoryTasksProgress progress) {
-        return progress.getInventoryTaskProgressMap().entrySet().stream()
+        return progress.getProgresses().entrySet().stream()
                 .filter(entry -> !(entry.getValue().getPosition() instanceof FinishedPosition))
                 .collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getPosition().toString()));
     }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index 5c0e99da4b0..c6b67dd0720 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -124,9 +124,9 @@ public final class InventoryTaskSplitter {
         PipelineReadConfiguration readConfig = jobProcessContext.getPipelineProcessConfig().getRead();
         int batchSize = readConfig.getBatchSize();
         JobRateLimitAlgorithm rateLimitAlgorithm = jobProcessContext.getReadRateLimitAlgorithm();
-        Collection<IngestPosition<?>> inventoryPositions = getInventoryPositions(jobItemContext, dumperConfig, dataSource);
+        Collection<IngestPosition> inventoryPositions = getInventoryPositions(jobItemContext, dumperConfig, dataSource);
         int i = 0;
-        for (IngestPosition<?> each : inventoryPositions) {
+        for (IngestPosition each : inventoryPositions) {
             InventoryDumperConfiguration splitDumperConfig = new InventoryDumperConfiguration(dumperConfig);
             splitDumperConfig.setPosition(each);
             splitDumperConfig.setShardingItem(i++);
@@ -140,12 +140,12 @@ public final class InventoryTaskSplitter {
         return result;
     }
     
-    private Collection<IngestPosition<?>> getInventoryPositions(final InventoryIncrementalJobItemContext jobItemContext, final InventoryDumperConfiguration dumperConfig,
-                                                                final DataSource dataSource) {
+    private Collection<IngestPosition> getInventoryPositions(final InventoryIncrementalJobItemContext jobItemContext, final InventoryDumperConfiguration dumperConfig,
+                                                             final DataSource dataSource) {
         InventoryIncrementalJobItemProgress initProgress = jobItemContext.getInitProgress();
         if (null != initProgress) {
             // Do NOT filter FinishedPosition here, since whole inventory tasks are required in job progress when persisting to register center.
-            Collection<IngestPosition<?>> result = initProgress.getInventory().getInventoryPosition(dumperConfig.getActualTableName()).values();
+            Collection<IngestPosition> result = initProgress.getInventory().getInventoryPosition(dumperConfig.getActualTableName()).values();
             if (!result.isEmpty()) {
                 return result;
             }
@@ -166,8 +166,8 @@ public final class InventoryTaskSplitter {
         return getUnsupportedPosition(jobItemContext, dataSource, dumperConfig);
     }
     
-    private Collection<IngestPosition<?>> getPositionWithoutUniqueKey(final InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource,
-                                                                      final InventoryDumperConfiguration dumperConfig) {
+    private Collection<IngestPosition> getPositionWithoutUniqueKey(final InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource,
+                                                                   final InventoryDumperConfiguration dumperConfig) {
         long tableRecordsCount = getTableRecordsCount(jobItemContext, dataSource, dumperConfig);
         jobItemContext.updateInventoryRecordsCount(tableRecordsCount);
         return Collections.singletonList(new NoUniqueKeyPosition());
@@ -221,9 +221,9 @@ public final class InventoryTaskSplitter {
         return result;
     }
     
-    private Collection<IngestPosition<?>> getPositionByIntegerUniqueKeyRange(final InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource,
-                                                                             final InventoryDumperConfiguration dumperConfig) {
-        Collection<IngestPosition<?>> result = new LinkedList<>();
+    private Collection<IngestPosition> getPositionByIntegerUniqueKeyRange(final InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource,
+                                                                          final InventoryDumperConfiguration dumperConfig) {
+        Collection<IngestPosition> result = new LinkedList<>();
         PipelineJobConfiguration jobConfig = jobItemContext.getJobConfig();
         String uniqueKey = dumperConfig.getUniqueKeyColumns().get(0).getName();
         String sql = PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, jobConfig.getSourceDatabaseType())
@@ -262,17 +262,16 @@ public final class InventoryTaskSplitter {
         return result;
     }
     
-    private Collection<IngestPosition<?>> getPositionByStringUniqueKeyRange(final InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource,
-                                                                            final InventoryDumperConfiguration dumperConfig) {
+    private Collection<IngestPosition> getPositionByStringUniqueKeyRange(final InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource,
+                                                                         final InventoryDumperConfiguration dumperConfig) {
         long tableRecordsCount = getTableRecordsCount(jobItemContext, dataSource, dumperConfig);
         jobItemContext.updateInventoryRecordsCount(tableRecordsCount);
-        Collection<IngestPosition<?>> result = new LinkedList<>();
+        Collection<IngestPosition> result = new LinkedList<>();
         result.add(new StringPrimaryKeyPosition(null, null));
         return result;
     }
     
-    private Collection<IngestPosition<?>> getUnsupportedPosition(final InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource,
-                                                                 final InventoryDumperConfiguration dumperConfig) {
+    private Collection<IngestPosition> getUnsupportedPosition(final InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource, final InventoryDumperConfiguration dumperConfig) {
         long tableRecordsCount = getTableRecordsCount(jobItemContext, dataSource, dumperConfig);
         jobItemContext.updateInventoryRecordsCount(tableRecordsCount);
         return Collections.singletonList(new UnsupportedKeyPosition());
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
index 11e11634862..e1f096c6056 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
@@ -133,10 +133,10 @@ public final class PipelineJobPreparerUtils {
      * @return ingest position
      * @throws SQLException sql exception
      */
-    public static IngestPosition<?> getIncrementalPosition(final JobItemIncrementalTasksProgress initIncremental, final DumperConfiguration dumperConfig,
-                                                           final PipelineDataSourceManager dataSourceManager) throws SQLException {
+    public static IngestPosition getIncrementalPosition(final JobItemIncrementalTasksProgress initIncremental, final DumperConfiguration dumperConfig,
+                                                        final PipelineDataSourceManager dataSourceManager) throws SQLException {
         if (null != initIncremental) {
-            Optional<IngestPosition<?>> position = initIncremental.getIncrementalPosition();
+            Optional<IngestPosition> position = initIncremental.getIncrementalPosition();
             if (position.isPresent()) {
                 return position.get();
             }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index 530c401da71..b081785bd40 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -77,7 +77,7 @@ public final class IncrementalTask implements PipelineTask, AutoCloseable {
                            final InventoryIncrementalJobItemContext jobItemContext) {
         taskId = dumperConfig.getDataSourceName();
         this.incrementalExecuteEngine = incrementalExecuteEngine;
-        IngestPosition<?> position = dumperConfig.getPosition();
+        IngestPosition position = dumperConfig.getPosition();
         taskProgress = createIncrementalTaskProgress(position, jobItemContext.getInitProgress());
         channel = createChannel(concurrency, pipelineChannelCreator, taskProgress);
         dumper = PipelineTypedSPILoader.getDatabaseTypedService(
@@ -85,7 +85,7 @@ public final class IncrementalTask implements PipelineTask, AutoCloseable {
         importers = createImporters(concurrency, importerConfig, importerConnector, channel, jobItemContext);
     }
     
-    private IncrementalTaskProgress createIncrementalTaskProgress(final IngestPosition<?> position, final InventoryIncrementalJobItemProgress jobItemProgress) {
+    private IncrementalTaskProgress createIncrementalTaskProgress(final IngestPosition position, final InventoryIncrementalJobItemProgress jobItemProgress) {
         IncrementalTaskProgress result = new IncrementalTaskProgress(position);
         if (null != jobItemProgress && null != jobItemProgress.getIncremental()) {
             Optional.ofNullable(jobItemProgress.getIncremental().getIncrementalTaskProgress())
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index 35007836e44..7aa477675a6 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -66,7 +66,7 @@ public final class InventoryTask implements PipelineTask, AutoCloseable {
     
     private final Importer importer;
     
-    private final AtomicReference<IngestPosition<?>> position;
+    private final AtomicReference<IngestPosition> position;
     
     public InventoryTask(final InventoryDumperConfiguration inventoryDumperConfig, final ImporterConfiguration importerConfig,
                          final PipelineChannelCreator pipelineChannelCreator, final ImporterConnector importerConnector,
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgressTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgressTest.java
index 9ac0737d651..bdb27dcdb04 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgressTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgressTest.java
@@ -45,14 +45,14 @@ class InventoryIncrementalJobItemProgressTest {
         InventoryIncrementalJobItemProgress actual = getJobItemProgress(ConfigurationFileUtils.readFile("job-progress.yaml"));
         assertThat(actual.getStatus(), is(JobStatus.RUNNING));
         assertThat(actual.getSourceDatabaseType(), is("H2"));
-        assertThat(actual.getInventory().getInventoryTaskProgressMap().size(), is(4));
+        assertThat(actual.getInventory().getProgresses().size(), is(4));
         assertNotNull(actual.getIncremental().getIncrementalTaskProgress());
     }
     
     @Test
     void assertGetIncrementalPosition() {
         InventoryIncrementalJobItemProgress actual = getJobItemProgress(ConfigurationFileUtils.readFile("job-progress.yaml"));
-        Optional<IngestPosition<?>> position = actual.getIncremental().getIncrementalPosition();
+        Optional<IngestPosition> position = actual.getIncremental().getIncrementalPosition();
         assertTrue(position.isPresent());
         assertThat(position.get(), instanceOf(PlaceholderPosition.class));
     }
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannelTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannelTest.java
index ea055f92df7..75099e77fb7 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannelTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannelTest.java
@@ -110,7 +110,7 @@ class MultiplexMemoryPipelineChannelTest {
     
     @RequiredArgsConstructor
     @Getter
-    private static final class IntPosition implements IngestPosition<IntPosition> {
+    private static final class IntPosition implements IngestPosition {
         
         private final int id;
     }
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLIncrementalDumperCreator.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLIncrementalDumperCreator.java
index 076c700b23b..3026800d198 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLIncrementalDumperCreator.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLIncrementalDumperCreator.java
@@ -23,16 +23,15 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDump
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLIncrementalDumper;
-import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator;
 
 /**
  * MySql incremental dumper creator.
  */
-public final class MySQLIncrementalDumperCreator implements IncrementalDumperCreator<BinlogPosition> {
+public final class MySQLIncrementalDumperCreator implements IncrementalDumperCreator {
     
     @Override
-    public IncrementalDumper createIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<BinlogPosition> position,
+    public IncrementalDumper createIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition position,
                                                      final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
         return new MySQLIncrementalDumper(dumperConfig, position, channel, metaDataLoader);
     }
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index e4b5886ea9a..0dcd5656302 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -78,7 +78,7 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
     
     private final String catalog;
     
-    public MySQLIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<BinlogPosition> binlogPosition,
+    public MySQLIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition binlogPosition,
                                   final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
         Preconditions.checkArgument(dumperConfig.getDataSourceConfig() instanceof StandardPipelineDataSourceConfiguration, "MySQLBinlogDumper only support StandardPipelineDataSourceConfiguration");
         this.dumperConfig = dumperConfig;
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/BinlogPosition.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/BinlogPosition.java
index e9f414ca4dd..2709a02a64e 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/BinlogPosition.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/BinlogPosition.java
@@ -26,7 +26,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPositio
  */
 @RequiredArgsConstructor
 @Getter
-public final class BinlogPosition implements IngestPosition<BinlogPosition> {
+public final class BinlogPosition implements IngestPosition {
     
     private final String filename;
     
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussIncrementalDumperCreator.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussIncrementalDumperCreator.java
index 0381f69b1a3..3f4b84805c7 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussIncrementalDumperCreator.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussIncrementalDumperCreator.java
@@ -23,16 +23,15 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDump
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.opengauss.ingest.OpenGaussWALDumper;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator;
 
 /**
  * OpenGauss incremental dumper creator.
  */
-public final class OpenGaussIncrementalDumperCreator implements IncrementalDumperCreator<WALPosition> {
+public final class OpenGaussIncrementalDumperCreator implements IncrementalDumperCreator {
     
     @Override
-    public IncrementalDumper createIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<WALPosition> position,
+    public IncrementalDumper createIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition position,
                                                      final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
         return new OpenGaussWALDumper(dumperConfig, position, channel, metaDataLoader);
     }
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
index b3c5fc1157e..52f5f86b081 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
@@ -66,7 +66,7 @@ public final class OpenGaussWALDumper extends AbstractLifecycleExecutor implemen
     
     private final List<AbstractRowEvent> rowEvents = new LinkedList<>();
     
-    public OpenGaussWALDumper(final DumperConfiguration dumperConfig, final IngestPosition<WALPosition> position,
+    public OpenGaussWALDumper(final DumperConfiguration dumperConfig, final IngestPosition position,
                               final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
         ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass()),
                 () -> new UnsupportedSQLOperationException("PostgreSQLWALDumper only support PipelineDataSourceConfiguration"));
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLIncrementalDumperCreator.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLIncrementalDumperCreator.java
index 5b329a1a419..03f75393fb1 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLIncrementalDumperCreator.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLIncrementalDumperCreator.java
@@ -23,16 +23,15 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDump
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLWALDumper;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator;
 
 /**
  * PostgreSQL incremental dumper creator.
  */
-public final class PostgreSQLIncrementalDumperCreator implements IncrementalDumperCreator<WALPosition> {
+public final class PostgreSQLIncrementalDumperCreator implements IncrementalDumperCreator {
     
     @Override
-    public IncrementalDumper createIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<WALPosition> position,
+    public IncrementalDumper createIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition position,
                                                      final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
         return new PostgreSQLWALDumper(dumperConfig, position, channel, metaDataLoader);
     }
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
index 7313acac241..88d8b70caa5 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
@@ -58,7 +58,7 @@ public final class PostgreSQLWALDumper extends AbstractLifecycleExecutor impleme
     
     private final PostgreSQLLogicalReplication logicalReplication;
     
-    public PostgreSQLWALDumper(final DumperConfiguration dumperConfig, final IngestPosition<WALPosition> position,
+    public PostgreSQLWALDumper(final DumperConfiguration dumperConfig, final IngestPosition position,
                                final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
         ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass()),
                 () -> new UnsupportedSQLOperationException("PostgreSQLWALDumper only support PipelineDataSourceConfiguration"));
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALPosition.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALPosition.java
index 9138ee014ad..b1e004de9ed 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALPosition.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALPosition.java
@@ -27,7 +27,7 @@ import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.Base
  */
 @RequiredArgsConstructor
 @Getter
-public final class WALPosition implements IngestPosition<WALPosition> {
+public final class WALPosition implements IngestPosition {
     
     private final BaseLogSequenceNumber logSequenceNumber;
     
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
index c2fc628fd54..330c60e7a05 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
@@ -149,7 +149,7 @@ public final class TestDecodingPlugin implements DecodingPlugin {
     
     private Object readColumnData(final ByteBuffer data, final String columnType) {
         data.mark();
-        if ('n' == data.get() && data.remaining() >= 3 && 'u' == data.get() && 'l' == data.get() && 'l' == data.get()) {
+        if ('n' == data.get() && data.remaining() >= 3 && 'u' == data.get() && 'l' == data.get()) {
             if (data.hasRemaining()) {
                 data.get();
             }
diff --git a/kernel/sql-federation/optimizer/src/main/java/org/apache/shardingsphere/sqlfederation/optimizer/converter/segment/expression/impl/TrimFunctionConverter.java b/kernel/sql-federation/optimizer/src/main/java/org/apache/shardingsphere/sqlfederation/optimizer/converter/segment/expression/impl/TrimFunctionConverter.java
index 3867d29db9a..5cb9d30e831 100644
--- a/kernel/sql-federation/optimizer/src/main/java/org/apache/shardingsphere/sqlfederation/optimizer/converter/segment/expression/impl/TrimFunctionConverter.java
+++ b/kernel/sql-federation/optimizer/src/main/java/org/apache/shardingsphere/sqlfederation/optimizer/converter/segment/expression/impl/TrimFunctionConverter.java
@@ -46,10 +46,10 @@ public final class TrimFunctionConverter extends FunctionConverter {
         SqlIdentifier functionName = new SqlIdentifier(segment.getFunctionName(), SqlParserPos.ZERO);
         List<SqlOperator> functions = new LinkedList<>();
         SqlStdOperatorTable.instance().lookupOperatorOverloads(functionName, null, SqlSyntax.FUNCTION, functions, SqlNameMatchers.withCaseSensitive(false));
-        return Optional.of(new SqlBasicCall(functions.iterator().next(), getFunctionParameters(segment.getParameters()), SqlParserPos.ZERO));
+        return Optional.of(new SqlBasicCall(functions.iterator().next(), getTrimFunctionParameters(segment.getParameters()), SqlParserPos.ZERO));
     }
     
-    private List<SqlNode> getFunctionParameters(final Collection<ExpressionSegment> sqlSegments) {
+    private List<SqlNode> getTrimFunctionParameters(final Collection<ExpressionSegment> sqlSegments) {
         List<SqlNode> result = new LinkedList<>();
         if (1 == sqlSegments.size()) {
             result.add(Flag.BOTH.symbol(SqlParserPos.ZERO));
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureIncrementalDumperCreator.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureIncrementalDumperCreator.java
index 350f6e2b4a1..fd9441b1b56 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureIncrementalDumperCreator.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureIncrementalDumperCreator.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.test.it.data.pipeline.core.fixture;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
-import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator;
@@ -28,10 +27,10 @@ import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDump
 /**
  * Fixture incremental dumper creator.
  */
-public final class FixtureIncrementalDumperCreator implements IncrementalDumperCreator<FinishedPosition> {
+public final class FixtureIncrementalDumperCreator implements IncrementalDumperCreator {
     
     @Override
-    public IncrementalDumper createIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<FinishedPosition> position,
+    public IncrementalDumper createIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition position,
                                                      final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
         return new FixtureIncrementalDumper();
     }