You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/10/29 12:24:25 UTC

[shardingsphere] branch master updated: Refactor pipeline and improve code style part 1 (#21834)

This is an automated email from the ASF dual-hosted git repository.

zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new a0d4a596731 Refactor pipeline and improve code style part 1 (#21834)
a0d4a596731 is described below

commit a0d4a59673146b8d904d4c6e3ffa1350d13484bb
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Sat Oct 29 20:24:18 2022 +0800

    Refactor pipeline and improve code style part 1 (#21834)
    
    * Rename Wal to WAL in class name
    
    * Add PostgreSQL prefix for LogicalReplication
    
    * Improve code style; Add TODO
---
 .../PipelineChangedJobConfigurationProcessor.java  |  1 +
 .../datasource/AbstractDataSourcePreparer.java     |  3 ++-
 .../migration/MigrationDataConsistencyChecker.java |  2 ++
 .../mysql/ingest/GlobalTableMapEventMapping.java   | 12 +++++-----
 .../OpenGaussIncrementalDumperCreator.java         | 10 ++++----
 .../ingest/OpenGaussPositionInitializer.java       | 14 +++++------
 ...GaussWalDumper.java => OpenGaussWALDumper.java} | 22 ++++++++---------
 .../ingest/wal/decode/MppdbDecodingPlugin.java     |  6 ++---
 .../PostgreSQLIncrementalDumperCreator.java        | 10 ++++----
 .../ingest/PostgreSQLPositionInitializer.java      | 14 +++++------
 ...eSQLWalDumper.java => PostgreSQLWALDumper.java} | 28 +++++++++++-----------
 ...tion.java => PostgreSQLLogicalReplication.java} |  3 +--
 ...lEventConverter.java => WALEventConverter.java} | 22 ++++++++---------
 .../wal/{WalPosition.java => WALPosition.java}     |  6 ++---
 .../ingest/wal/decode/DecodingPlugin.java          |  4 ++--
 .../ingest/wal/decode/TestDecodingPlugin.java      |  6 ++---
 .../ingest/wal/event/AbstractRowEvent.java         |  2 +-
 ...AbstractWalEvent.java => AbstractWALEvent.java} |  4 ++--
 .../ingest/wal/event/PlaceholderEvent.java         |  2 +-
 .../ingest/PostgreSQLPositionInitializerTest.java  |  6 ++---
 ...umperTest.java => PostgreSQLWALDumperTest.java} | 16 ++++++-------
 ....java => PostgreSQLLogicalReplicationTest.java} |  6 ++---
 ...nverterTest.java => WALEventConverterTest.java} |  6 ++---
 .../{WalPositionTest.java => WALPositionTest.java} |  8 +++----
 .../ingest/wal/decode/TestDecodingPluginTest.java  |  4 ++--
 .../PostgreSQLPipelineSQLBuilderTest.java          |  4 ++--
 .../IncrementalDumperCreatorFactoryTest.java       |  8 +++----
 27 files changed, 116 insertions(+), 113 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessor.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessor.java
index ea8edfdbb57..88913b25b89 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessor.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessor.java
@@ -32,5 +32,6 @@ public interface PipelineChangedJobConfigurationProcessor extends TypedSPI {
      * @param eventType event type
      * @param jobConfigPOJO job configuration pojo
      */
+    // TODO replace JobConfigurationPOJO to JobConfiguration
     void process(DataChangedEvent.Type eventType, JobConfigurationPOJO jobConfigPOJO);
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
index 3efa3e040ca..fc1b948aa6b 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
@@ -49,6 +49,7 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
     
     private static final Pattern PATTERN_CREATE_TABLE = Pattern.compile("CREATE\\s+TABLE\\s+", Pattern.CASE_INSENSITIVE);
     
+    // TODO it's just used for openGauss
     private static final String[] IGNORE_EXCEPTION_MESSAGE = {"multiple primary keys for table", "already exists"};
     
     @Override
@@ -83,7 +84,7 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
                 statement.execute(sql);
             }
         } catch (final SQLException ex) {
-            log.error("create schema failed, {}", ex.getMessage());
+            log.warn("create schema failed, error: {}", ex.getMessage());
         }
     }
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
index fc4e47e466e..76378438632 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
@@ -84,6 +84,7 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
                 PipelineDataSourceWrapper sourceDataSource = PipelineDataSourceFactory.newInstance(jobConfig.getSource());
                 PipelineDataSourceWrapper targetDataSource = PipelineDataSourceFactory.newInstance(jobConfig.getTarget())) {
             String jobId = jobConfig.getJobId();
+            // TODO simplify code
             InventoryIncrementalJobPublicAPI inventoryIncrementalJobPublicAPI = PipelineJobPublicAPIFactory.getInventoryIncrementalJobPublicAPI(PipelineJobIdUtils.parseJobType(jobId).getTypeName());
             Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = inventoryIncrementalJobPublicAPI.getJobProgress(jobId);
             long recordsCount = jobProgress.values().stream().filter(Objects::nonNull).mapToLong(InventoryIncrementalJobItemProgress::getProcessedRecordsCount).sum();
@@ -94,6 +95,7 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
             SingleTableInventoryDataConsistencyChecker singleTableInventoryChecker = new SingleTableInventoryDataConsistencyChecker(jobId, sourceDataSource, targetDataSource,
                     sourceTable, targetTable, jobConfig.getUniqueKeyColumn(), metaDataLoader, readRateLimitAlgorithm, checkJobItemContext);
             result.put(sourceTable.getTableName().getOriginal(), singleTableInventoryChecker.check(calculateAlgorithm));
+            // TODO make sure checkEndTimeMillis will be set
             checkJobItemContext.setCheckEndTimeMillis(System.currentTimeMillis());
         } catch (final SQLException ex) {
             throw new SQLWrapperException(ex);
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/GlobalTableMapEventMapping.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/GlobalTableMapEventMapping.java
index 92dd053e5d2..f97208a422b 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/GlobalTableMapEventMapping.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/GlobalTableMapEventMapping.java
@@ -24,19 +24,19 @@ import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Global table map event mapping.
- * // TODO still save at memory, if restart the Proxy, the data will be lost.
  */
-public class GlobalTableMapEventMapping {
+public final class GlobalTableMapEventMapping {
     
+    // TODO Still save in memory, if proxy restart, the data will be lost.
     private static final Map<String, Map<Long, MySQLBinlogTableMapEventPacket>> TABLE_MAP_EVENT_MAPPING = new ConcurrentHashMap<>();
     
     /**
-     * Get table map event map by database url.
+     * Get table map event map by database URL.
      *
-     * @param databaseUrl database url
+     * @param databaseURL database URL
      * @return table map event map
      */
-    public static Map<Long, MySQLBinlogTableMapEventPacket> getTableMapEventMap(final String databaseUrl) {
-        return TABLE_MAP_EVENT_MAPPING.computeIfAbsent(databaseUrl, k -> new ConcurrentHashMap<>());
+    public static Map<Long, MySQLBinlogTableMapEventPacket> getTableMapEventMap(final String databaseURL) {
+        return TABLE_MAP_EVENT_MAPPING.computeIfAbsent(databaseURL, k -> new ConcurrentHashMap<>());
     }
 }
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussIncrementalDumperCreator.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussIncrementalDumperCreator.java
index 4083a4cd690..0381f69b1a3 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussIncrementalDumperCreator.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussIncrementalDumperCreator.java
@@ -22,19 +22,19 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChanne
 import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.opengauss.ingest.OpenGaussWalDumper;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
+import org.apache.shardingsphere.data.pipeline.opengauss.ingest.OpenGaussWALDumper;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator;
 
 /**
  * OpenGauss incremental dumper creator.
  */
-public final class OpenGaussIncrementalDumperCreator implements IncrementalDumperCreator<WalPosition> {
+public final class OpenGaussIncrementalDumperCreator implements IncrementalDumperCreator<WALPosition> {
     
     @Override
-    public IncrementalDumper createIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<WalPosition> position,
+    public IncrementalDumper createIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<WALPosition> position,
                                                      final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
-        return new OpenGaussWalDumper(dumperConfig, position, channel, metaDataLoader);
+        return new OpenGaussWALDumper(dumperConfig, position, channel, metaDataLoader);
     }
     
     @Override
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
index 1b2542df1d5..c2298259124 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.opengauss.ingest;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.OpenGaussLogSequenceNumber;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
 import org.opengauss.replication.LogSequenceNumber;
 
@@ -32,7 +32,7 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 
 /**
- * OpenGauss wal position initializer.
+ * OpenGauss WAL position initializer.
  */
 // TODO reuse PostgreSQLPositionInitializer
 @Slf4j
@@ -45,7 +45,7 @@ public final class OpenGaussPositionInitializer implements PositionInitializer {
     private static final String DUPLICATE_OBJECT_ERROR_CODE = "42710";
     
     @Override
-    public WalPosition init(final DataSource dataSource, final String slotNameSuffix) throws SQLException {
+    public WALPosition init(final DataSource dataSource, final String slotNameSuffix) throws SQLException {
         try (Connection connection = dataSource.getConnection()) {
             createSlotIfNotExist(connection, slotNameSuffix);
             return getWalPosition(connection);
@@ -53,8 +53,8 @@ public final class OpenGaussPositionInitializer implements PositionInitializer {
     }
     
     @Override
-    public WalPosition init(final String data) {
-        return new WalPosition(new OpenGaussLogSequenceNumber(LogSequenceNumber.valueOf(Long.parseLong(data))));
+    public WALPosition init(final String data) {
+        return new WALPosition(new OpenGaussLogSequenceNumber(LogSequenceNumber.valueOf(Long.parseLong(data))));
     }
     
     /**
@@ -92,12 +92,12 @@ public final class OpenGaussPositionInitializer implements PositionInitializer {
         }
     }
     
-    private WalPosition getWalPosition(final Connection connection) throws SQLException {
+    private WALPosition getWalPosition(final Connection connection) throws SQLException {
         try (
                 PreparedStatement preparedStatement = connection.prepareStatement("SELECT PG_CURRENT_XLOG_LOCATION()");
                 ResultSet resultSet = preparedStatement.executeQuery()) {
             resultSet.next();
-            return new WalPosition(new OpenGaussLogSequenceNumber(LogSequenceNumber.valueOf(resultSet.getString(1))));
+            return new WALPosition(new OpenGaussLogSequenceNumber(LogSequenceNumber.valueOf(resultSet.getString(1))));
         }
     }
     
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
similarity index 89%
rename from kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
rename to kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
index 2f95e72aa3d..cd0c36f55af 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
@@ -30,10 +30,10 @@ import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.OpenGaussLog
 import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.MppdbDecodingPlugin;
 import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.OpenGaussLogSequenceNumber;
 import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.OpenGaussTimestampUtils;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalEventConverter;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALEventConverter;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.DecodingPlugin;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWalEvent;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWALEvent;
 import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
 import org.opengauss.jdbc.PgConnection;
@@ -45,26 +45,26 @@ import java.sql.SQLException;
 /**
  * WAL dumper of openGauss.
  */
-public final class OpenGaussWalDumper extends AbstractLifecycleExecutor implements IncrementalDumper {
+public final class OpenGaussWALDumper extends AbstractLifecycleExecutor implements IncrementalDumper {
     
     private final DumperConfiguration dumperConfig;
     
-    private final WalPosition walPosition;
+    private final WALPosition walPosition;
     
     private final PipelineChannel channel;
     
-    private final WalEventConverter walEventConverter;
+    private final WALEventConverter walEventConverter;
     
     private final OpenGaussLogicalReplication logicalReplication;
     
-    public OpenGaussWalDumper(final DumperConfiguration dumperConfig, final IngestPosition<WalPosition> position,
+    public OpenGaussWALDumper(final DumperConfiguration dumperConfig, final IngestPosition<WALPosition> position,
                               final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
         ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass()),
-                () -> new UnsupportedSQLOperationException("PostgreSQLWalDumper only support PipelineDataSourceConfiguration"));
+                () -> new UnsupportedSQLOperationException("PostgreSQLWALDumper only support PipelineDataSourceConfiguration"));
         this.dumperConfig = dumperConfig;
-        walPosition = (WalPosition) position;
+        walPosition = (WALPosition) position;
         this.channel = channel;
-        walEventConverter = new WalEventConverter(dumperConfig, metaDataLoader);
+        walEventConverter = new WALEventConverter(dumperConfig, metaDataLoader);
         logicalReplication = new OpenGaussLogicalReplication();
     }
     
@@ -80,7 +80,7 @@ public final class OpenGaussWalDumper extends AbstractLifecycleExecutor implemen
                     ThreadUtil.sleep(10L);
                     continue;
                 }
-                AbstractWalEvent event = decodingPlugin.decode(message, new OpenGaussLogSequenceNumber(stream.getLastReceiveLSN()));
+                AbstractWALEvent event = decodingPlugin.decode(message, new OpenGaussLogSequenceNumber(stream.getLastReceiveLSN()));
                 channel.pushRecord(walEventConverter.convert(event));
             }
         } catch (final SQLException ex) {
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java
index 4577c287bc9..68d81a2817f 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java
@@ -29,7 +29,7 @@ import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.Base
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.DecodingException;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.DecodingPlugin;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWalEvent;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWALEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.DeleteRowEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.PlaceholderEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.UpdateRowEvent;
@@ -60,8 +60,8 @@ public final class MppdbDecodingPlugin implements DecodingPlugin {
     private final BaseTimestampUtils timestampUtils;
     
     @Override
-    public AbstractWalEvent decode(final ByteBuffer data, final BaseLogSequenceNumber logSequenceNumber) {
-        AbstractWalEvent result;
+    public AbstractWALEvent decode(final ByteBuffer data, final BaseLogSequenceNumber logSequenceNumber) {
+        AbstractWALEvent result;
         char eventType = readOneChar(data);
         result = '{' == eventType ? readTableEvent(readMppData(data)) : new PlaceholderEvent();
         result.setLogSequenceNumber(logSequenceNumber);
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLIncrementalDumperCreator.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLIncrementalDumperCreator.java
index 4b067556abc..5b329a1a419 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLIncrementalDumperCreator.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLIncrementalDumperCreator.java
@@ -22,19 +22,19 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChanne
 import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLWalDumper;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLWALDumper;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator;
 
 /**
  * PostgreSQL incremental dumper creator.
  */
-public final class PostgreSQLIncrementalDumperCreator implements IncrementalDumperCreator<WalPosition> {
+public final class PostgreSQLIncrementalDumperCreator implements IncrementalDumperCreator<WALPosition> {
     
     @Override
-    public IncrementalDumper createIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<WalPosition> position,
+    public IncrementalDumper createIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<WALPosition> position,
                                                      final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
-        return new PostgreSQLWalDumper(dumperConfig, position, channel, metaDataLoader);
+        return new PostgreSQLWALDumper(dumperConfig, position, channel, metaDataLoader);
     }
     
     @Override
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java
index 2eed41e36a5..27c774612e4 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
 import org.postgresql.replication.LogSequenceNumber;
@@ -31,7 +31,7 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 
 /**
- * PostgreSQL wal position initializer.
+ * PostgreSQL WAL position initializer.
  */
 @Slf4j
 public final class PostgreSQLPositionInitializer implements PositionInitializer {
@@ -43,7 +43,7 @@ public final class PostgreSQLPositionInitializer implements PositionInitializer
     private static final String DUPLICATE_OBJECT_ERROR_CODE = "42710";
     
     @Override
-    public WalPosition init(final DataSource dataSource, final String slotNameSuffix) throws SQLException {
+    public WALPosition init(final DataSource dataSource, final String slotNameSuffix) throws SQLException {
         try (Connection connection = dataSource.getConnection()) {
             createSlotIfNotExist(connection, getUniqueSlotName(connection, slotNameSuffix));
             return getWalPosition(connection);
@@ -51,8 +51,8 @@ public final class PostgreSQLPositionInitializer implements PositionInitializer
     }
     
     @Override
-    public WalPosition init(final String data) {
-        return new WalPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(Long.parseLong(data))));
+    public WALPosition init(final String data) {
+        return new WALPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(Long.parseLong(data))));
     }
     
     private void createSlotIfNotExist(final Connection connection, final String slotName) throws SQLException {
@@ -81,12 +81,12 @@ public final class PostgreSQLPositionInitializer implements PositionInitializer
         }
     }
     
-    private WalPosition getWalPosition(final Connection connection) throws SQLException {
+    private WALPosition getWalPosition(final Connection connection) throws SQLException {
         try (
                 PreparedStatement preparedStatement = connection.prepareStatement(getLogSequenceNumberSQL(connection));
                 ResultSet resultSet = preparedStatement.executeQuery()) {
             resultSet.next();
-            return new WalPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(resultSet.getString(1))));
+            return new WALPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(resultSet.getString(1))));
         }
     }
     
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
similarity index 85%
rename from kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
rename to kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
index 39751a70b96..d2ca2e23cc9 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
@@ -26,14 +26,14 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPositio
 import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
 import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.LogicalReplication;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalEventConverter;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.PostgreSQLLogicalReplication;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALEventConverter;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.DecodingPlugin;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLTimestampUtils;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.TestDecodingPlugin;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWalEvent;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWALEvent;
 import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
 import org.postgresql.jdbc.PgConnection;
@@ -46,27 +46,27 @@ import java.sql.SQLException;
 /**
  * PostgreSQL WAL dumper.
  */
-public final class PostgreSQLWalDumper extends AbstractLifecycleExecutor implements IncrementalDumper {
+public final class PostgreSQLWALDumper extends AbstractLifecycleExecutor implements IncrementalDumper {
     
     private final DumperConfiguration dumperConfig;
     
-    private final WalPosition walPosition;
+    private final WALPosition walPosition;
     
     private final PipelineChannel channel;
     
-    private final WalEventConverter walEventConverter;
+    private final WALEventConverter walEventConverter;
     
-    private final LogicalReplication logicalReplication;
+    private final PostgreSQLLogicalReplication logicalReplication;
     
-    public PostgreSQLWalDumper(final DumperConfiguration dumperConfig, final IngestPosition<WalPosition> position,
+    public PostgreSQLWALDumper(final DumperConfiguration dumperConfig, final IngestPosition<WALPosition> position,
                                final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
         ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass()),
-                () -> new UnsupportedSQLOperationException("PostgreSQLWalDumper only support PipelineDataSourceConfiguration"));
+                () -> new UnsupportedSQLOperationException("PostgreSQLWALDumper only support PipelineDataSourceConfiguration"));
         this.dumperConfig = dumperConfig;
-        walPosition = (WalPosition) position;
+        walPosition = (WALPosition) position;
         this.channel = channel;
-        walEventConverter = new WalEventConverter(dumperConfig, metaDataLoader);
-        logicalReplication = new LogicalReplication();
+        walEventConverter = new WALEventConverter(dumperConfig, metaDataLoader);
+        logicalReplication = new PostgreSQLLogicalReplication();
     }
     
     @Override
@@ -84,7 +84,7 @@ public final class PostgreSQLWalDumper extends AbstractLifecycleExecutor impleme
                     ThreadUtil.sleep(10L);
                     continue;
                 }
-                AbstractWalEvent event = decodingPlugin.decode(message, new PostgreSQLLogSequenceNumber(stream.getLastReceiveLSN()));
+                AbstractWALEvent event = decodingPlugin.decode(message, new PostgreSQLLogSequenceNumber(stream.getLastReceiveLSN()));
                 channel.pushRecord(walEventConverter.convert(event));
             }
         } catch (final SQLException ex) {
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/LogicalReplication.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/PostgreSQLLogicalReplication.java
similarity index 97%
rename from kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/LogicalReplication.java
rename to kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/PostgreSQLLogicalReplication.java
index ebfe4814d39..d3912732efa 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/LogicalReplication.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/PostgreSQLLogicalReplication.java
@@ -33,8 +33,7 @@ import java.util.Properties;
 /**
  * PostgreSQL logical replication.
  */
-// TODO add prefix PostgreSQL
-public final class LogicalReplication {
+public final class PostgreSQLLogicalReplication {
     
     /**
      * Create connection.
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
similarity index 91%
rename from kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java
rename to kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
index baa73c8fa25..694de8ecf75 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
@@ -27,7 +27,7 @@ import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTable
 import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
 import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWalEvent;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWALEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.DeleteRowEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.PlaceholderEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.UpdateRowEvent;
@@ -37,26 +37,26 @@ import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.
 import java.util.List;
 
 /**
- * Convert wal event to {@code Record}.
+ * WAL event converter.
  */
-public final class WalEventConverter {
+public final class WALEventConverter {
     
     private final DumperConfiguration dumperConfig;
     
     private final PipelineTableMetaDataLoader metaDataLoader;
     
-    public WalEventConverter(final DumperConfiguration dumperConfig, final PipelineTableMetaDataLoader metaDataLoader) {
+    public WALEventConverter(final DumperConfiguration dumperConfig, final PipelineTableMetaDataLoader metaDataLoader) {
         this.dumperConfig = dumperConfig;
         this.metaDataLoader = metaDataLoader;
     }
     
     /**
-     * Convert wal event to {@code Record}.
+     * Convert WAL event to {@code Record}.
      *
      * @param event of wal
      * @return record
      */
-    public Record convert(final AbstractWalEvent event) {
+    public Record convert(final AbstractWALEvent event) {
         if (filter(event)) {
             return createPlaceholderRecord(event);
         }
@@ -75,7 +75,7 @@ public final class WalEventConverter {
         throw new UnsupportedSQLOperationException("");
     }
     
-    private boolean filter(final AbstractWalEvent event) {
+    private boolean filter(final AbstractWALEvent event) {
         if (isRowEvent(event)) {
             AbstractRowEvent rowEvent = (AbstractRowEvent) event;
             return !dumperConfig.containsTable(rowEvent.getTableName());
@@ -83,12 +83,12 @@ public final class WalEventConverter {
         return false;
     }
     
-    private boolean isRowEvent(final AbstractWalEvent event) {
+    private boolean isRowEvent(final AbstractWALEvent event) {
         return event instanceof WriteRowEvent || event instanceof UpdateRowEvent || event instanceof DeleteRowEvent;
     }
     
-    private PlaceholderRecord createPlaceholderRecord(final AbstractWalEvent event) {
-        return new PlaceholderRecord(new WalPosition(event.getLogSequenceNumber()));
+    private PlaceholderRecord createPlaceholderRecord(final AbstractWALEvent event) {
+        return new PlaceholderRecord(new WALPosition(event.getLogSequenceNumber()));
     }
     
     private DataRecord handleWriteRowsEvent(final WriteRowEvent writeRowEvent) {
@@ -122,7 +122,7 @@ public final class WalEventConverter {
     }
     
     private DataRecord createDataRecord(final AbstractRowEvent rowsEvent, final int columnCount) {
-        DataRecord result = new DataRecord(new WalPosition(rowsEvent.getLogSequenceNumber()), columnCount);
+        DataRecord result = new DataRecord(new WALPosition(rowsEvent.getLogSequenceNumber()), columnCount);
         result.setTableName(dumperConfig.getLogicTableName(rowsEvent.getTableName()).getLowercase());
         return result;
     }
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalPosition.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALPosition.java
similarity index 91%
rename from kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalPosition.java
rename to kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALPosition.java
index 15631f121ff..9c50e41ac8d 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalPosition.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALPosition.java
@@ -23,16 +23,16 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPositio
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.BaseLogSequenceNumber;
 
 /**
- * PostgreSQL wal position.
+ * WAL position.
  */
 @RequiredArgsConstructor
 @Getter
-public final class WalPosition implements IngestPosition<WalPosition> {
+public final class WALPosition implements IngestPosition<WALPosition> {
     
     private final BaseLogSequenceNumber logSequenceNumber;
     
     @Override
-    public int compareTo(final WalPosition position) {
+    public int compareTo(final WALPosition position) {
         return null == position ? 1 : Long.compare(logSequenceNumber.asLong(), position.logSequenceNumber.asLong());
     }
     
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/DecodingPlugin.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/DecodingPlugin.java
index 5e3823e2c7e..29833310e25 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/DecodingPlugin.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/DecodingPlugin.java
@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode;
 
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWalEvent;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWALEvent;
 
 import java.nio.ByteBuffer;
 
@@ -33,5 +33,5 @@ public interface DecodingPlugin {
      * @param logSequenceNumber log sequence number
      * @return WAL event
      */
-    AbstractWalEvent decode(ByteBuffer data, BaseLogSequenceNumber logSequenceNumber);
+    AbstractWALEvent decode(ByteBuffer data, BaseLogSequenceNumber logSequenceNumber);
 }
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
index 3d543016aa6..99e632831fe 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
@@ -22,7 +22,7 @@ import lombok.AllArgsConstructor;
 import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWalEvent;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWALEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.DeleteRowEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.PlaceholderEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.UpdateRowEvent;
@@ -44,8 +44,8 @@ public final class TestDecodingPlugin implements DecodingPlugin {
     private final BaseTimestampUtils timestampUtils;
     
     @Override
-    public AbstractWalEvent decode(final ByteBuffer data, final BaseLogSequenceNumber logSequenceNumber) {
-        AbstractWalEvent result = "table".equals(readEventType(data)) ? readTableEvent(data) : new PlaceholderEvent();
+    public AbstractWALEvent decode(final ByteBuffer data, final BaseLogSequenceNumber logSequenceNumber) {
+        AbstractWALEvent result = "table".equals(readEventType(data)) ? readTableEvent(data) : new PlaceholderEvent();
         result.setLogSequenceNumber(logSequenceNumber);
         return result;
     }
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractRowEvent.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractRowEvent.java
index 6635ca9f79a..a88bd48de11 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractRowEvent.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractRowEvent.java
@@ -27,7 +27,7 @@ import lombok.ToString;
 @Getter
 @Setter
 @ToString(callSuper = true)
-public abstract class AbstractRowEvent extends AbstractWalEvent {
+public abstract class AbstractRowEvent extends AbstractWALEvent {
     
     private String databaseName;
     
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractWalEvent.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractWALEvent.java
similarity index 94%
rename from kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractWalEvent.java
rename to kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractWALEvent.java
index aa4033ddee0..abe21d0705e 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractWalEvent.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractWALEvent.java
@@ -23,12 +23,12 @@ import lombok.ToString;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.BaseLogSequenceNumber;
 
 /**
- * Abstract wal event.
+ * Abstract WAL event.
  */
 @Getter
 @Setter
 @ToString
-public abstract class AbstractWalEvent {
+public abstract class AbstractWALEvent {
     
     private BaseLogSequenceNumber logSequenceNumber;
 }
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/PlaceholderEvent.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/PlaceholderEvent.java
index e996ba4a676..c7e3c04b7a5 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/PlaceholderEvent.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/PlaceholderEvent.java
@@ -23,5 +23,5 @@ import lombok.ToString;
  * Placeholder event.
  */
 @ToString(callSuper = true)
-public final class PlaceholderEvent extends AbstractWalEvent {
+public final class PlaceholderEvent extends AbstractWALEvent {
 }
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializerTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializerTest.java
index 5f34293f175..d51b39f9310 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializerTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializerTest.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
 
 import lombok.SneakyThrows;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -73,7 +73,7 @@ public final class PostgreSQLPositionInitializerTest {
         mockSlotExistsOrNot(false);
         when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(9);
         when(databaseMetaData.getDatabaseMinorVersion()).thenReturn(6);
-        WalPosition actual = new PostgreSQLPositionInitializer().init(dataSource, "");
+        WALPosition actual = new PostgreSQLPositionInitializer().init(dataSource, "");
         assertThat(actual.getLogSequenceNumber().get(), is(LogSequenceNumber.valueOf(POSTGRESQL_96_LSN)));
     }
     
@@ -81,7 +81,7 @@ public final class PostgreSQLPositionInitializerTest {
     public void assertGetCurrentPositionOnPostgreSQL10() throws SQLException {
         mockSlotExistsOrNot(false);
         when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(10);
-        WalPosition actual = new PostgreSQLPositionInitializer().init(dataSource, "");
+        WALPosition actual = new PostgreSQLPositionInitializer().init(dataSource, "");
         assertThat(actual.getLogSequenceNumber().get(), is(LogSequenceNumber.valueOf(POSTGRESQL_10_LSN)));
     }
     
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
similarity index 94%
rename from kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
rename to kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
index fe46eb21779..14e65b96d4d 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
@@ -30,8 +30,8 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.Multip
 import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.LogicalReplication;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.PostgreSQLLogicalReplication;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
 import org.junit.After;
 import org.junit.Before;
@@ -59,10 +59,10 @@ import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
-public final class PostgreSQLWalDumperTest {
+public final class PostgreSQLWALDumperTest {
     
     @Mock
-    private LogicalReplication logicalReplication;
+    private PostgreSQLLogicalReplication logicalReplication;
     
     @Mock
     private PgConnection pgConnection;
@@ -70,11 +70,11 @@ public final class PostgreSQLWalDumperTest {
     @Mock
     private PGReplicationStream pgReplicationStream;
     
-    private WalPosition position;
+    private WALPosition position;
     
     private DumperConfiguration dumperConfig;
     
-    private PostgreSQLWalDumper walDumper;
+    private PostgreSQLWALDumper walDumper;
     
     private MultiplexMemoryPipelineChannel channel;
     
@@ -82,11 +82,11 @@ public final class PostgreSQLWalDumperTest {
     
     @Before
     public void setUp() {
-        position = new WalPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L)));
+        position = new WALPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L)));
         channel = new MultiplexMemoryPipelineChannel();
         dumperConfig = mockDumperConfiguration();
         PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()));
-        walDumper = new PostgreSQLWalDumper(dumperConfig, position, channel, metaDataLoader);
+        walDumper = new PostgreSQLWALDumper(dumperConfig, position, channel, metaDataLoader);
         
     }
     
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/LogicalReplicationTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/PostgreSQLLogicalReplicationTest.java
similarity index 95%
rename from kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/LogicalReplicationTest.java
rename to kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/PostgreSQLLogicalReplicationTest.java
index deee71f5ae2..ae2292a62cc 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/LogicalReplicationTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/PostgreSQLLogicalReplicationTest.java
@@ -43,7 +43,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
-public final class LogicalReplicationTest {
+public final class PostgreSQLLogicalReplicationTest {
     
     @Mock
     private PgConnection connection;
@@ -57,11 +57,11 @@ public final class LogicalReplicationTest {
     @Mock
     private ChainedLogicalStreamBuilder chainedLogicalStreamBuilder;
     
-    private LogicalReplication logicalReplication;
+    private PostgreSQLLogicalReplication logicalReplication;
     
     @Before
     public void setUp() {
-        logicalReplication = new LogicalReplication();
+        logicalReplication = new PostgreSQLLogicalReplication();
     }
     
     @Test
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
similarity index 97%
rename from kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java
rename to kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
index 28774a1aae5..f4bdcda550d 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
@@ -51,16 +51,16 @@ import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
-public final class WalEventConverterTest {
+public final class WALEventConverterTest {
     
-    private WalEventConverter walEventConverter;
+    private WALEventConverter walEventConverter;
     
     private final PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
     
     @Before
     public void setUp() {
         DumperConfiguration dumperConfig = mockDumperConfiguration();
-        walEventConverter = new WalEventConverter(dumperConfig, new StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig())));
+        walEventConverter = new WALEventConverter(dumperConfig, new StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig())));
         initTableData(dumperConfig);
     }
     
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalPositionTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALPositionTest.java
similarity index 86%
rename from kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalPositionTest.java
rename to kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALPositionTest.java
index bebe87fba3d..52d1c9cb431 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalPositionTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALPositionTest.java
@@ -24,17 +24,17 @@ import org.postgresql.replication.LogSequenceNumber;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
-public final class WalPositionTest {
+public final class WALPositionTest {
     
     @Test
     public void assertCompareTo() {
-        WalPosition walPosition = new WalPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L)));
+        WALPosition walPosition = new WALPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L)));
         assertThat(walPosition.compareTo(null), is(1));
-        assertThat(walPosition.compareTo(new WalPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L)))), is(0));
+        assertThat(walPosition.compareTo(new WALPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L)))), is(0));
     }
     
     @Test
     public void assertToString() {
-        assertThat(new WalPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L))).toString(), is("100"));
+        assertThat(new WALPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L))).toString(), is("100"));
     }
 }
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPluginTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPluginTest.java
index 26ecf903e82..5d07bdcd4c8 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPluginTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPluginTest.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode;
 
 import lombok.SneakyThrows;
 import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWalEvent;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWALEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.DeleteRowEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.PlaceholderEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.UpdateRowEvent;
@@ -104,7 +104,7 @@ public final class TestDecodingPluginTest {
     @Test
     public void assertDecodeInsertWithNullValue() {
         ByteBuffer data = ByteBuffer.wrap("table public.test: INSERT: id[integer]:123 col0[integer]:null col1[character varying]:null col2[character varying]:'nonnull'".getBytes());
-        AbstractWalEvent actual = new TestDecodingPlugin(null).decode(data, logSequenceNumber);
+        AbstractWALEvent actual = new TestDecodingPlugin(null).decode(data, logSequenceNumber);
         assertThat(actual, instanceOf(WriteRowEvent.class));
         WriteRowEvent actualWriteRowEvent = (WriteRowEvent) actual;
         assertThat(actualWriteRowEvent.getAfterRow().get(0), is(123));
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
index f9aa95661d4..bf02a8e84fe 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
@@ -21,7 +21,7 @@ import com.google.common.collect.ImmutableMap;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
 import org.junit.Test;
 import org.postgresql.replication.LogSequenceNumber;
@@ -49,7 +49,7 @@ public final class PostgreSQLPipelineSQLBuilderTest {
     }
     
     private DataRecord mockDataRecord() {
-        DataRecord result = new DataRecord(new WalPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L))), 2);
+        DataRecord result = new DataRecord(new WALPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L))), 2);
         result.setTableName("t_order");
         result.addColumn(new Column("order_id", 1, true, true));
         result.addColumn(new Column("user_id", 2, true, false));
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/dumper/IncrementalDumperCreatorFactoryTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/dumper/IncrementalDumperCreatorFactoryTest.java
index aa426fb655f..368513c9c86 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/dumper/IncrementalDumperCreatorFactoryTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/dumper/IncrementalDumperCreatorFactoryTest.java
@@ -28,8 +28,8 @@ import org.apache.shardingsphere.data.pipeline.core.fixture.FixtureIncrementalDu
 import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.SimpleMemoryPipelineChannel;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLIncrementalDumper;
-import org.apache.shardingsphere.data.pipeline.opengauss.ingest.OpenGaussWalDumper;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLWalDumper;
+import org.apache.shardingsphere.data.pipeline.opengauss.ingest.OpenGaussWALDumper;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLWALDumper;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreatorFactory;
 import org.junit.Test;
 import org.mockito.Mock;
@@ -53,13 +53,13 @@ public final class IncrementalDumperCreatorFactoryTest {
     @Test
     public void assertIncrementalDumperCreatorForPostgreSQL() {
         IncrementalDumper actual = createIncrementalDumper("PostgreSQL");
-        assertThat(actual, instanceOf(PostgreSQLWalDumper.class));
+        assertThat(actual, instanceOf(PostgreSQLWALDumper.class));
     }
     
     @Test
     public void assertIncrementalDumperCreatorForOpenGauss() {
         IncrementalDumper actual = createIncrementalDumper("openGauss");
-        assertThat(actual, instanceOf(OpenGaussWalDumper.class));
+        assertThat(actual, instanceOf(OpenGaussWALDumper.class));
     }
     
     @Test