You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/10/29 12:24:25 UTC
[shardingsphere] branch master updated: Refactor pipeline and improve code style part 1 (#21834)
This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new a0d4a596731 Refactor pipeline and improve code style part 1 (#21834)
a0d4a596731 is described below
commit a0d4a59673146b8d904d4c6e3ffa1350d13484bb
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Sat Oct 29 20:24:18 2022 +0800
Refactor pipeline and improve code style part 1 (#21834)
* Rename Wal to WAL in class name
* Add PostgreSQL prefix for LogicalReplication
* Improve code style; Add TODO
---
.../PipelineChangedJobConfigurationProcessor.java | 1 +
.../datasource/AbstractDataSourcePreparer.java | 3 ++-
.../migration/MigrationDataConsistencyChecker.java | 2 ++
.../mysql/ingest/GlobalTableMapEventMapping.java | 12 +++++-----
.../OpenGaussIncrementalDumperCreator.java | 10 ++++----
.../ingest/OpenGaussPositionInitializer.java | 14 +++++------
...GaussWalDumper.java => OpenGaussWALDumper.java} | 22 ++++++++---------
.../ingest/wal/decode/MppdbDecodingPlugin.java | 6 ++---
.../PostgreSQLIncrementalDumperCreator.java | 10 ++++----
.../ingest/PostgreSQLPositionInitializer.java | 14 +++++------
...eSQLWalDumper.java => PostgreSQLWALDumper.java} | 28 +++++++++++-----------
...tion.java => PostgreSQLLogicalReplication.java} | 3 +--
...lEventConverter.java => WALEventConverter.java} | 22 ++++++++---------
.../wal/{WalPosition.java => WALPosition.java} | 6 ++---
.../ingest/wal/decode/DecodingPlugin.java | 4 ++--
.../ingest/wal/decode/TestDecodingPlugin.java | 6 ++---
.../ingest/wal/event/AbstractRowEvent.java | 2 +-
...AbstractWalEvent.java => AbstractWALEvent.java} | 4 ++--
.../ingest/wal/event/PlaceholderEvent.java | 2 +-
.../ingest/PostgreSQLPositionInitializerTest.java | 6 ++---
...umperTest.java => PostgreSQLWALDumperTest.java} | 16 ++++++-------
....java => PostgreSQLLogicalReplicationTest.java} | 6 ++---
...nverterTest.java => WALEventConverterTest.java} | 6 ++---
.../{WalPositionTest.java => WALPositionTest.java} | 8 +++----
.../ingest/wal/decode/TestDecodingPluginTest.java | 4 ++--
.../PostgreSQLPipelineSQLBuilderTest.java | 4 ++--
.../IncrementalDumperCreatorFactoryTest.java | 8 +++----
27 files changed, 116 insertions(+), 113 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessor.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessor.java
index ea8edfdbb57..88913b25b89 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessor.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessor.java
@@ -32,5 +32,6 @@ public interface PipelineChangedJobConfigurationProcessor extends TypedSPI {
* @param eventType event type
* @param jobConfigPOJO job configuration pojo
*/
+ // TODO replace JobConfigurationPOJO to JobConfiguration
void process(DataChangedEvent.Type eventType, JobConfigurationPOJO jobConfigPOJO);
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
index 3efa3e040ca..fc1b948aa6b 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
@@ -49,6 +49,7 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
private static final Pattern PATTERN_CREATE_TABLE = Pattern.compile("CREATE\\s+TABLE\\s+", Pattern.CASE_INSENSITIVE);
+ // TODO it's just used for openGauss
private static final String[] IGNORE_EXCEPTION_MESSAGE = {"multiple primary keys for table", "already exists"};
@Override
@@ -83,7 +84,7 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
statement.execute(sql);
}
} catch (final SQLException ex) {
- log.error("create schema failed, {}", ex.getMessage());
+ log.warn("create schema failed, error: {}", ex.getMessage());
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
index fc4e47e466e..76378438632 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
@@ -84,6 +84,7 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
PipelineDataSourceWrapper sourceDataSource = PipelineDataSourceFactory.newInstance(jobConfig.getSource());
PipelineDataSourceWrapper targetDataSource = PipelineDataSourceFactory.newInstance(jobConfig.getTarget())) {
String jobId = jobConfig.getJobId();
+ // TODO simplify code
InventoryIncrementalJobPublicAPI inventoryIncrementalJobPublicAPI = PipelineJobPublicAPIFactory.getInventoryIncrementalJobPublicAPI(PipelineJobIdUtils.parseJobType(jobId).getTypeName());
Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = inventoryIncrementalJobPublicAPI.getJobProgress(jobId);
long recordsCount = jobProgress.values().stream().filter(Objects::nonNull).mapToLong(InventoryIncrementalJobItemProgress::getProcessedRecordsCount).sum();
@@ -94,6 +95,7 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
SingleTableInventoryDataConsistencyChecker singleTableInventoryChecker = new SingleTableInventoryDataConsistencyChecker(jobId, sourceDataSource, targetDataSource,
sourceTable, targetTable, jobConfig.getUniqueKeyColumn(), metaDataLoader, readRateLimitAlgorithm, checkJobItemContext);
result.put(sourceTable.getTableName().getOriginal(), singleTableInventoryChecker.check(calculateAlgorithm));
+ // TODO make sure checkEndTimeMillis will be set
checkJobItemContext.setCheckEndTimeMillis(System.currentTimeMillis());
} catch (final SQLException ex) {
throw new SQLWrapperException(ex);
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/GlobalTableMapEventMapping.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/GlobalTableMapEventMapping.java
index 92dd053e5d2..f97208a422b 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/GlobalTableMapEventMapping.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/GlobalTableMapEventMapping.java
@@ -24,19 +24,19 @@ import java.util.concurrent.ConcurrentHashMap;
/**
* Global table map event mapping.
- * // TODO still save at memory, if restart the Proxy, the data will be lost.
*/
-public class GlobalTableMapEventMapping {
+public final class GlobalTableMapEventMapping {
+ // TODO Still save in memory, if proxy restart, the data will be lost.
private static final Map<String, Map<Long, MySQLBinlogTableMapEventPacket>> TABLE_MAP_EVENT_MAPPING = new ConcurrentHashMap<>();
/**
- * Get table map event map by database url.
+ * Get table map event map by database URL.
*
- * @param databaseUrl database url
+ * @param databaseURL database URL
* @return table map event map
*/
- public static Map<Long, MySQLBinlogTableMapEventPacket> getTableMapEventMap(final String databaseUrl) {
- return TABLE_MAP_EVENT_MAPPING.computeIfAbsent(databaseUrl, k -> new ConcurrentHashMap<>());
+ public static Map<Long, MySQLBinlogTableMapEventPacket> getTableMapEventMap(final String databaseURL) {
+ return TABLE_MAP_EVENT_MAPPING.computeIfAbsent(databaseURL, k -> new ConcurrentHashMap<>());
}
}
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussIncrementalDumperCreator.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussIncrementalDumperCreator.java
index 4083a4cd690..0381f69b1a3 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussIncrementalDumperCreator.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussIncrementalDumperCreator.java
@@ -22,19 +22,19 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChanne
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.opengauss.ingest.OpenGaussWalDumper;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
+import org.apache.shardingsphere.data.pipeline.opengauss.ingest.OpenGaussWALDumper;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator;
/**
* OpenGauss incremental dumper creator.
*/
-public final class OpenGaussIncrementalDumperCreator implements IncrementalDumperCreator<WalPosition> {
+public final class OpenGaussIncrementalDumperCreator implements IncrementalDumperCreator<WALPosition> {
@Override
- public IncrementalDumper createIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<WalPosition> position,
+ public IncrementalDumper createIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<WALPosition> position,
final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
- return new OpenGaussWalDumper(dumperConfig, position, channel, metaDataLoader);
+ return new OpenGaussWALDumper(dumperConfig, position, channel, metaDataLoader);
}
@Override
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
index 1b2542df1d5..c2298259124 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.opengauss.ingest;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.OpenGaussLogSequenceNumber;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
import org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
import org.opengauss.replication.LogSequenceNumber;
@@ -32,7 +32,7 @@ import java.sql.ResultSet;
import java.sql.SQLException;
/**
- * OpenGauss wal position initializer.
+ * OpenGauss WAL position initializer.
*/
// TODO reuse PostgreSQLPositionInitializer
@Slf4j
@@ -45,7 +45,7 @@ public final class OpenGaussPositionInitializer implements PositionInitializer {
private static final String DUPLICATE_OBJECT_ERROR_CODE = "42710";
@Override
- public WalPosition init(final DataSource dataSource, final String slotNameSuffix) throws SQLException {
+ public WALPosition init(final DataSource dataSource, final String slotNameSuffix) throws SQLException {
try (Connection connection = dataSource.getConnection()) {
createSlotIfNotExist(connection, slotNameSuffix);
return getWalPosition(connection);
@@ -53,8 +53,8 @@ public final class OpenGaussPositionInitializer implements PositionInitializer {
}
@Override
- public WalPosition init(final String data) {
- return new WalPosition(new OpenGaussLogSequenceNumber(LogSequenceNumber.valueOf(Long.parseLong(data))));
+ public WALPosition init(final String data) {
+ return new WALPosition(new OpenGaussLogSequenceNumber(LogSequenceNumber.valueOf(Long.parseLong(data))));
}
/**
@@ -92,12 +92,12 @@ public final class OpenGaussPositionInitializer implements PositionInitializer {
}
}
- private WalPosition getWalPosition(final Connection connection) throws SQLException {
+ private WALPosition getWalPosition(final Connection connection) throws SQLException {
try (
PreparedStatement preparedStatement = connection.prepareStatement("SELECT PG_CURRENT_XLOG_LOCATION()");
ResultSet resultSet = preparedStatement.executeQuery()) {
resultSet.next();
- return new WalPosition(new OpenGaussLogSequenceNumber(LogSequenceNumber.valueOf(resultSet.getString(1))));
+ return new WALPosition(new OpenGaussLogSequenceNumber(LogSequenceNumber.valueOf(resultSet.getString(1))));
}
}
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
similarity index 89%
rename from kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
rename to kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
index 2f95e72aa3d..cd0c36f55af 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
@@ -30,10 +30,10 @@ import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.OpenGaussLog
import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.MppdbDecodingPlugin;
import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.OpenGaussLogSequenceNumber;
import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.OpenGaussTimestampUtils;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalEventConverter;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALEventConverter;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.DecodingPlugin;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWalEvent;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWALEvent;
import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
import org.opengauss.jdbc.PgConnection;
@@ -45,26 +45,26 @@ import java.sql.SQLException;
/**
* WAL dumper of openGauss.
*/
-public final class OpenGaussWalDumper extends AbstractLifecycleExecutor implements IncrementalDumper {
+public final class OpenGaussWALDumper extends AbstractLifecycleExecutor implements IncrementalDumper {
private final DumperConfiguration dumperConfig;
- private final WalPosition walPosition;
+ private final WALPosition walPosition;
private final PipelineChannel channel;
- private final WalEventConverter walEventConverter;
+ private final WALEventConverter walEventConverter;
private final OpenGaussLogicalReplication logicalReplication;
- public OpenGaussWalDumper(final DumperConfiguration dumperConfig, final IngestPosition<WalPosition> position,
+ public OpenGaussWALDumper(final DumperConfiguration dumperConfig, final IngestPosition<WALPosition> position,
final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass()),
- () -> new UnsupportedSQLOperationException("PostgreSQLWalDumper only support PipelineDataSourceConfiguration"));
+ () -> new UnsupportedSQLOperationException("PostgreSQLWALDumper only support PipelineDataSourceConfiguration"));
this.dumperConfig = dumperConfig;
- walPosition = (WalPosition) position;
+ walPosition = (WALPosition) position;
this.channel = channel;
- walEventConverter = new WalEventConverter(dumperConfig, metaDataLoader);
+ walEventConverter = new WALEventConverter(dumperConfig, metaDataLoader);
logicalReplication = new OpenGaussLogicalReplication();
}
@@ -80,7 +80,7 @@ public final class OpenGaussWalDumper extends AbstractLifecycleExecutor implemen
ThreadUtil.sleep(10L);
continue;
}
- AbstractWalEvent event = decodingPlugin.decode(message, new OpenGaussLogSequenceNumber(stream.getLastReceiveLSN()));
+ AbstractWALEvent event = decodingPlugin.decode(message, new OpenGaussLogSequenceNumber(stream.getLastReceiveLSN()));
channel.pushRecord(walEventConverter.convert(event));
}
} catch (final SQLException ex) {
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java
index 4577c287bc9..68d81a2817f 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java
@@ -29,7 +29,7 @@ import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.Base
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.DecodingException;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.DecodingPlugin;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWalEvent;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWALEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.DeleteRowEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.PlaceholderEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.UpdateRowEvent;
@@ -60,8 +60,8 @@ public final class MppdbDecodingPlugin implements DecodingPlugin {
private final BaseTimestampUtils timestampUtils;
@Override
- public AbstractWalEvent decode(final ByteBuffer data, final BaseLogSequenceNumber logSequenceNumber) {
- AbstractWalEvent result;
+ public AbstractWALEvent decode(final ByteBuffer data, final BaseLogSequenceNumber logSequenceNumber) {
+ AbstractWALEvent result;
char eventType = readOneChar(data);
result = '{' == eventType ? readTableEvent(readMppData(data)) : new PlaceholderEvent();
result.setLogSequenceNumber(logSequenceNumber);
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLIncrementalDumperCreator.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLIncrementalDumperCreator.java
index 4b067556abc..5b329a1a419 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLIncrementalDumperCreator.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLIncrementalDumperCreator.java
@@ -22,19 +22,19 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChanne
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLWalDumper;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLWALDumper;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator;
/**
* PostgreSQL incremental dumper creator.
*/
-public final class PostgreSQLIncrementalDumperCreator implements IncrementalDumperCreator<WalPosition> {
+public final class PostgreSQLIncrementalDumperCreator implements IncrementalDumperCreator<WALPosition> {
@Override
- public IncrementalDumper createIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<WalPosition> position,
+ public IncrementalDumper createIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<WALPosition> position,
final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
- return new PostgreSQLWalDumper(dumperConfig, position, channel, metaDataLoader);
+ return new PostgreSQLWALDumper(dumperConfig, position, channel, metaDataLoader);
}
@Override
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java
index 2eed41e36a5..27c774612e4 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
import org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
import org.postgresql.replication.LogSequenceNumber;
@@ -31,7 +31,7 @@ import java.sql.ResultSet;
import java.sql.SQLException;
/**
- * PostgreSQL wal position initializer.
+ * PostgreSQL WAL position initializer.
*/
@Slf4j
public final class PostgreSQLPositionInitializer implements PositionInitializer {
@@ -43,7 +43,7 @@ public final class PostgreSQLPositionInitializer implements PositionInitializer
private static final String DUPLICATE_OBJECT_ERROR_CODE = "42710";
@Override
- public WalPosition init(final DataSource dataSource, final String slotNameSuffix) throws SQLException {
+ public WALPosition init(final DataSource dataSource, final String slotNameSuffix) throws SQLException {
try (Connection connection = dataSource.getConnection()) {
createSlotIfNotExist(connection, getUniqueSlotName(connection, slotNameSuffix));
return getWalPosition(connection);
@@ -51,8 +51,8 @@ public final class PostgreSQLPositionInitializer implements PositionInitializer
}
@Override
- public WalPosition init(final String data) {
- return new WalPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(Long.parseLong(data))));
+ public WALPosition init(final String data) {
+ return new WALPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(Long.parseLong(data))));
}
private void createSlotIfNotExist(final Connection connection, final String slotName) throws SQLException {
@@ -81,12 +81,12 @@ public final class PostgreSQLPositionInitializer implements PositionInitializer
}
}
- private WalPosition getWalPosition(final Connection connection) throws SQLException {
+ private WALPosition getWalPosition(final Connection connection) throws SQLException {
try (
PreparedStatement preparedStatement = connection.prepareStatement(getLogSequenceNumberSQL(connection));
ResultSet resultSet = preparedStatement.executeQuery()) {
resultSet.next();
- return new WalPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(resultSet.getString(1))));
+ return new WALPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(resultSet.getString(1))));
}
}
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
similarity index 85%
rename from kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
rename to kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
index 39751a70b96..d2ca2e23cc9 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
@@ -26,14 +26,14 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPositio
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.LogicalReplication;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalEventConverter;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.PostgreSQLLogicalReplication;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALEventConverter;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.DecodingPlugin;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLTimestampUtils;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.TestDecodingPlugin;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWalEvent;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWALEvent;
import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
import org.postgresql.jdbc.PgConnection;
@@ -46,27 +46,27 @@ import java.sql.SQLException;
/**
* PostgreSQL WAL dumper.
*/
-public final class PostgreSQLWalDumper extends AbstractLifecycleExecutor implements IncrementalDumper {
+public final class PostgreSQLWALDumper extends AbstractLifecycleExecutor implements IncrementalDumper {
private final DumperConfiguration dumperConfig;
- private final WalPosition walPosition;
+ private final WALPosition walPosition;
private final PipelineChannel channel;
- private final WalEventConverter walEventConverter;
+ private final WALEventConverter walEventConverter;
- private final LogicalReplication logicalReplication;
+ private final PostgreSQLLogicalReplication logicalReplication;
- public PostgreSQLWalDumper(final DumperConfiguration dumperConfig, final IngestPosition<WalPosition> position,
+ public PostgreSQLWALDumper(final DumperConfiguration dumperConfig, final IngestPosition<WALPosition> position,
final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass()),
- () -> new UnsupportedSQLOperationException("PostgreSQLWalDumper only support PipelineDataSourceConfiguration"));
+ () -> new UnsupportedSQLOperationException("PostgreSQLWALDumper only support PipelineDataSourceConfiguration"));
this.dumperConfig = dumperConfig;
- walPosition = (WalPosition) position;
+ walPosition = (WALPosition) position;
this.channel = channel;
- walEventConverter = new WalEventConverter(dumperConfig, metaDataLoader);
- logicalReplication = new LogicalReplication();
+ walEventConverter = new WALEventConverter(dumperConfig, metaDataLoader);
+ logicalReplication = new PostgreSQLLogicalReplication();
}
@Override
@@ -84,7 +84,7 @@ public final class PostgreSQLWalDumper extends AbstractLifecycleExecutor impleme
ThreadUtil.sleep(10L);
continue;
}
- AbstractWalEvent event = decodingPlugin.decode(message, new PostgreSQLLogSequenceNumber(stream.getLastReceiveLSN()));
+ AbstractWALEvent event = decodingPlugin.decode(message, new PostgreSQLLogSequenceNumber(stream.getLastReceiveLSN()));
channel.pushRecord(walEventConverter.convert(event));
}
} catch (final SQLException ex) {
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/LogicalReplication.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/PostgreSQLLogicalReplication.java
similarity index 97%
rename from kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/LogicalReplication.java
rename to kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/PostgreSQLLogicalReplication.java
index ebfe4814d39..d3912732efa 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/LogicalReplication.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/PostgreSQLLogicalReplication.java
@@ -33,8 +33,7 @@ import java.util.Properties;
/**
* PostgreSQL logical replication.
*/
-// TODO add prefix PostgreSQL
-public final class LogicalReplication {
+public final class PostgreSQLLogicalReplication {
/**
* Create connection.
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
similarity index 91%
rename from kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java
rename to kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
index baa73c8fa25..694de8ecf75 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
@@ -27,7 +27,7 @@ import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTable
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWalEvent;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWALEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.DeleteRowEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.PlaceholderEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.UpdateRowEvent;
@@ -37,26 +37,26 @@ import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.
import java.util.List;
/**
- * Convert wal event to {@code Record}.
+ * WAL event converter.
*/
-public final class WalEventConverter {
+public final class WALEventConverter {
private final DumperConfiguration dumperConfig;
private final PipelineTableMetaDataLoader metaDataLoader;
- public WalEventConverter(final DumperConfiguration dumperConfig, final PipelineTableMetaDataLoader metaDataLoader) {
+ public WALEventConverter(final DumperConfiguration dumperConfig, final PipelineTableMetaDataLoader metaDataLoader) {
this.dumperConfig = dumperConfig;
this.metaDataLoader = metaDataLoader;
}
/**
- * Convert wal event to {@code Record}.
+ * Convert WAL event to {@code Record}.
*
* @param event of wal
* @return record
*/
- public Record convert(final AbstractWalEvent event) {
+ public Record convert(final AbstractWALEvent event) {
if (filter(event)) {
return createPlaceholderRecord(event);
}
@@ -75,7 +75,7 @@ public final class WalEventConverter {
throw new UnsupportedSQLOperationException("");
}
- private boolean filter(final AbstractWalEvent event) {
+ private boolean filter(final AbstractWALEvent event) {
if (isRowEvent(event)) {
AbstractRowEvent rowEvent = (AbstractRowEvent) event;
return !dumperConfig.containsTable(rowEvent.getTableName());
@@ -83,12 +83,12 @@ public final class WalEventConverter {
return false;
}
- private boolean isRowEvent(final AbstractWalEvent event) {
+ private boolean isRowEvent(final AbstractWALEvent event) {
return event instanceof WriteRowEvent || event instanceof UpdateRowEvent || event instanceof DeleteRowEvent;
}
- private PlaceholderRecord createPlaceholderRecord(final AbstractWalEvent event) {
- return new PlaceholderRecord(new WalPosition(event.getLogSequenceNumber()));
+ private PlaceholderRecord createPlaceholderRecord(final AbstractWALEvent event) {
+ return new PlaceholderRecord(new WALPosition(event.getLogSequenceNumber()));
}
private DataRecord handleWriteRowsEvent(final WriteRowEvent writeRowEvent) {
@@ -122,7 +122,7 @@ public final class WalEventConverter {
}
private DataRecord createDataRecord(final AbstractRowEvent rowsEvent, final int columnCount) {
- DataRecord result = new DataRecord(new WalPosition(rowsEvent.getLogSequenceNumber()), columnCount);
+ DataRecord result = new DataRecord(new WALPosition(rowsEvent.getLogSequenceNumber()), columnCount);
result.setTableName(dumperConfig.getLogicTableName(rowsEvent.getTableName()).getLowercase());
return result;
}
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalPosition.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALPosition.java
similarity index 91%
rename from kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalPosition.java
rename to kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALPosition.java
index 15631f121ff..9c50e41ac8d 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalPosition.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALPosition.java
@@ -23,16 +23,16 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPositio
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.BaseLogSequenceNumber;
/**
- * PostgreSQL wal position.
+ * WAL position.
*/
@RequiredArgsConstructor
@Getter
-public final class WalPosition implements IngestPosition<WalPosition> {
+public final class WALPosition implements IngestPosition<WALPosition> {
private final BaseLogSequenceNumber logSequenceNumber;
@Override
- public int compareTo(final WalPosition position) {
+ public int compareTo(final WALPosition position) {
return null == position ? 1 : Long.compare(logSequenceNumber.asLong(), position.logSequenceNumber.asLong());
}
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/DecodingPlugin.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/DecodingPlugin.java
index 5e3823e2c7e..29833310e25 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/DecodingPlugin.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/DecodingPlugin.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWalEvent;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWALEvent;
import java.nio.ByteBuffer;
@@ -33,5 +33,5 @@ public interface DecodingPlugin {
* @param logSequenceNumber log sequence number
* @return WAL event
*/
- AbstractWalEvent decode(ByteBuffer data, BaseLogSequenceNumber logSequenceNumber);
+ AbstractWALEvent decode(ByteBuffer data, BaseLogSequenceNumber logSequenceNumber);
}
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
index 3d543016aa6..99e632831fe 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
@@ -22,7 +22,7 @@ import lombok.AllArgsConstructor;
import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWalEvent;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWALEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.DeleteRowEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.PlaceholderEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.UpdateRowEvent;
@@ -44,8 +44,8 @@ public final class TestDecodingPlugin implements DecodingPlugin {
private final BaseTimestampUtils timestampUtils;
@Override
- public AbstractWalEvent decode(final ByteBuffer data, final BaseLogSequenceNumber logSequenceNumber) {
- AbstractWalEvent result = "table".equals(readEventType(data)) ? readTableEvent(data) : new PlaceholderEvent();
+ public AbstractWALEvent decode(final ByteBuffer data, final BaseLogSequenceNumber logSequenceNumber) {
+ AbstractWALEvent result = "table".equals(readEventType(data)) ? readTableEvent(data) : new PlaceholderEvent();
result.setLogSequenceNumber(logSequenceNumber);
return result;
}
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractRowEvent.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractRowEvent.java
index 6635ca9f79a..a88bd48de11 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractRowEvent.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractRowEvent.java
@@ -27,7 +27,7 @@ import lombok.ToString;
@Getter
@Setter
@ToString(callSuper = true)
-public abstract class AbstractRowEvent extends AbstractWalEvent {
+public abstract class AbstractRowEvent extends AbstractWALEvent {
private String databaseName;
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractWalEvent.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractWALEvent.java
similarity index 94%
rename from kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractWalEvent.java
rename to kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractWALEvent.java
index aa4033ddee0..abe21d0705e 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractWalEvent.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractWALEvent.java
@@ -23,12 +23,12 @@ import lombok.ToString;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.BaseLogSequenceNumber;
/**
- * Abstract wal event.
+ * Abstract WAL event.
*/
@Getter
@Setter
@ToString
-public abstract class AbstractWalEvent {
+public abstract class AbstractWALEvent {
private BaseLogSequenceNumber logSequenceNumber;
}
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/PlaceholderEvent.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/PlaceholderEvent.java
index e996ba4a676..c7e3c04b7a5 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/PlaceholderEvent.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/PlaceholderEvent.java
@@ -23,5 +23,5 @@ import lombok.ToString;
* Placeholder event.
*/
@ToString(callSuper = true)
-public final class PlaceholderEvent extends AbstractWalEvent {
+public final class PlaceholderEvent extends AbstractWALEvent {
}
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializerTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializerTest.java
index 5f34293f175..d51b39f9310 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializerTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializerTest.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
import lombok.SneakyThrows;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -73,7 +73,7 @@ public final class PostgreSQLPositionInitializerTest {
mockSlotExistsOrNot(false);
when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(9);
when(databaseMetaData.getDatabaseMinorVersion()).thenReturn(6);
- WalPosition actual = new PostgreSQLPositionInitializer().init(dataSource, "");
+ WALPosition actual = new PostgreSQLPositionInitializer().init(dataSource, "");
assertThat(actual.getLogSequenceNumber().get(), is(LogSequenceNumber.valueOf(POSTGRESQL_96_LSN)));
}
@@ -81,7 +81,7 @@ public final class PostgreSQLPositionInitializerTest {
public void assertGetCurrentPositionOnPostgreSQL10() throws SQLException {
mockSlotExistsOrNot(false);
when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(10);
- WalPosition actual = new PostgreSQLPositionInitializer().init(dataSource, "");
+ WALPosition actual = new PostgreSQLPositionInitializer().init(dataSource, "");
assertThat(actual.getLogSequenceNumber().get(), is(LogSequenceNumber.valueOf(POSTGRESQL_10_LSN)));
}
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
similarity index 94%
rename from kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
rename to kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
index fe46eb21779..14e65b96d4d 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
@@ -30,8 +30,8 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.Multip
import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.LogicalReplication;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.PostgreSQLLogicalReplication;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
import org.junit.After;
import org.junit.Before;
@@ -59,10 +59,10 @@ import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
-public final class PostgreSQLWalDumperTest {
+public final class PostgreSQLWALDumperTest {
@Mock
- private LogicalReplication logicalReplication;
+ private PostgreSQLLogicalReplication logicalReplication;
@Mock
private PgConnection pgConnection;
@@ -70,11 +70,11 @@ public final class PostgreSQLWalDumperTest {
@Mock
private PGReplicationStream pgReplicationStream;
- private WalPosition position;
+ private WALPosition position;
private DumperConfiguration dumperConfig;
- private PostgreSQLWalDumper walDumper;
+ private PostgreSQLWALDumper walDumper;
private MultiplexMemoryPipelineChannel channel;
@@ -82,11 +82,11 @@ public final class PostgreSQLWalDumperTest {
@Before
public void setUp() {
- position = new WalPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L)));
+ position = new WALPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L)));
channel = new MultiplexMemoryPipelineChannel();
dumperConfig = mockDumperConfiguration();
PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()));
- walDumper = new PostgreSQLWalDumper(dumperConfig, position, channel, metaDataLoader);
+ walDumper = new PostgreSQLWALDumper(dumperConfig, position, channel, metaDataLoader);
}
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/LogicalReplicationTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/PostgreSQLLogicalReplicationTest.java
similarity index 95%
rename from kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/LogicalReplicationTest.java
rename to kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/PostgreSQLLogicalReplicationTest.java
index deee71f5ae2..ae2292a62cc 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/LogicalReplicationTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/PostgreSQLLogicalReplicationTest.java
@@ -43,7 +43,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
-public final class LogicalReplicationTest {
+public final class PostgreSQLLogicalReplicationTest {
@Mock
private PgConnection connection;
@@ -57,11 +57,11 @@ public final class LogicalReplicationTest {
@Mock
private ChainedLogicalStreamBuilder chainedLogicalStreamBuilder;
- private LogicalReplication logicalReplication;
+ private PostgreSQLLogicalReplication logicalReplication;
@Before
public void setUp() {
- logicalReplication = new LogicalReplication();
+ logicalReplication = new PostgreSQLLogicalReplication();
}
@Test
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
similarity index 97%
rename from kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java
rename to kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
index 28774a1aae5..f4bdcda550d 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
@@ -51,16 +51,16 @@ import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
-public final class WalEventConverterTest {
+public final class WALEventConverterTest {
- private WalEventConverter walEventConverter;
+ private WALEventConverter walEventConverter;
private final PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
@Before
public void setUp() {
DumperConfiguration dumperConfig = mockDumperConfiguration();
- walEventConverter = new WalEventConverter(dumperConfig, new StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig())));
+ walEventConverter = new WALEventConverter(dumperConfig, new StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig())));
initTableData(dumperConfig);
}
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalPositionTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALPositionTest.java
similarity index 86%
rename from kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalPositionTest.java
rename to kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALPositionTest.java
index bebe87fba3d..52d1c9cb431 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalPositionTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALPositionTest.java
@@ -24,17 +24,17 @@ import org.postgresql.replication.LogSequenceNumber;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
-public final class WalPositionTest {
+public final class WALPositionTest {
@Test
public void assertCompareTo() {
- WalPosition walPosition = new WalPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L)));
+ WALPosition walPosition = new WALPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L)));
assertThat(walPosition.compareTo(null), is(1));
- assertThat(walPosition.compareTo(new WalPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L)))), is(0));
+ assertThat(walPosition.compareTo(new WALPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L)))), is(0));
}
@Test
public void assertToString() {
- assertThat(new WalPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L))).toString(), is("100"));
+ assertThat(new WALPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L))).toString(), is("100"));
}
}
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPluginTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPluginTest.java
index 26ecf903e82..5d07bdcd4c8 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPluginTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPluginTest.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode;
import lombok.SneakyThrows;
import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWalEvent;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWALEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.DeleteRowEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.PlaceholderEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.UpdateRowEvent;
@@ -104,7 +104,7 @@ public final class TestDecodingPluginTest {
@Test
public void assertDecodeInsertWithNullValue() {
ByteBuffer data = ByteBuffer.wrap("table public.test: INSERT: id[integer]:123 col0[integer]:null col1[character varying]:null col2[character varying]:'nonnull'".getBytes());
- AbstractWalEvent actual = new TestDecodingPlugin(null).decode(data, logSequenceNumber);
+ AbstractWALEvent actual = new TestDecodingPlugin(null).decode(data, logSequenceNumber);
assertThat(actual, instanceOf(WriteRowEvent.class));
WriteRowEvent actualWriteRowEvent = (WriteRowEvent) actual;
assertThat(actualWriteRowEvent.getAfterRow().get(0), is(123));
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
index f9aa95661d4..bf02a8e84fe 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
@@ -21,7 +21,7 @@ import com.google.common.collect.ImmutableMap;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
import org.junit.Test;
import org.postgresql.replication.LogSequenceNumber;
@@ -49,7 +49,7 @@ public final class PostgreSQLPipelineSQLBuilderTest {
}
private DataRecord mockDataRecord() {
- DataRecord result = new DataRecord(new WalPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L))), 2);
+ DataRecord result = new DataRecord(new WALPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L))), 2);
result.setTableName("t_order");
result.addColumn(new Column("order_id", 1, true, true));
result.addColumn(new Column("user_id", 2, true, false));
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/dumper/IncrementalDumperCreatorFactoryTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/dumper/IncrementalDumperCreatorFactoryTest.java
index aa426fb655f..368513c9c86 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/dumper/IncrementalDumperCreatorFactoryTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/dumper/IncrementalDumperCreatorFactoryTest.java
@@ -28,8 +28,8 @@ import org.apache.shardingsphere.data.pipeline.core.fixture.FixtureIncrementalDu
import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.SimpleMemoryPipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLIncrementalDumper;
-import org.apache.shardingsphere.data.pipeline.opengauss.ingest.OpenGaussWalDumper;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLWalDumper;
+import org.apache.shardingsphere.data.pipeline.opengauss.ingest.OpenGaussWALDumper;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLWALDumper;
import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreatorFactory;
import org.junit.Test;
import org.mockito.Mock;
@@ -53,13 +53,13 @@ public final class IncrementalDumperCreatorFactoryTest {
@Test
public void assertIncrementalDumperCreatorForPostgreSQL() {
IncrementalDumper actual = createIncrementalDumper("PostgreSQL");
- assertThat(actual, instanceOf(PostgreSQLWalDumper.class));
+ assertThat(actual, instanceOf(PostgreSQLWALDumper.class));
}
@Test
public void assertIncrementalDumperCreatorForOpenGauss() {
IncrementalDumper actual = createIncrementalDumper("openGauss");
- assertThat(actual, instanceOf(OpenGaussWalDumper.class));
+ assertThat(actual, instanceOf(OpenGaussWALDumper.class));
}
@Test