You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/12/06 08:47:18 UTC
[shardingsphere] branch master updated: Adding xid and csn parse for openGauss's logical replication (#22685)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new a75e635bf0b Adding xid and csn parse for openGauss's logical replication (#22685)
a75e635bf0b is described below
commit a75e635bf0b723db42494814103833b31a3e4454
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Tue Dec 6 16:47:09 2022 +0800
Adding xid and csn parse for openGauss's logical replication (#22685)
* Adding xid and csn parse for openGauss's logical replication
* Improve performance
* Add chinese characters at pipeline e2e test
---
.../pipeline/api/ingest/record/DataRecord.java | 2 +
.../OpenGaussIncrementalDumperCreator.java | 2 +-
.../opengauss/ingest/OpenGaussWALDumper.java | 46 +++++++++++++++++++--
.../ingest/wal/decode/MppdbDecodingPlugin.java | 48 +++++++++++++++-------
.../ingest/wal/decode/MppdbDecodingPluginTest.java | 35 +++++++++++++++-
.../postgresql/ingest/wal/WALEventConverter.java | 5 ++-
.../ingest/wal/event/AbstractRowEvent.java | 2 +
.../{AbstractRowEvent.java => BeginTXEvent.java} | 14 +++----
.../{AbstractRowEvent.java => CommitTXEvent.java} | 14 +++----
.../ingest/wal/WALEventConverterTest.java | 25 +++++++++++
.../pipeline/cases/task/MySQLIncrementTask.java | 5 +--
.../cases/task/PostgreSQLIncrementTask.java | 2 +-
12 files changed, 158 insertions(+), 42 deletions(-)
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java
index 126b8ee17e9..c9cb71f7f6a 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java
@@ -47,6 +47,8 @@ public final class DataRecord extends Record {
private String tableName;
+ private Long csn;
+
public DataRecord(final IngestPosition<?> position, final int columnCount) {
super(position);
columns = new ArrayList<>(columnCount);
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussIncrementalDumperCreator.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussIncrementalDumperCreator.java
index 0381f69b1a3..dec16de00cc 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussIncrementalDumperCreator.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussIncrementalDumperCreator.java
@@ -34,7 +34,7 @@ public final class OpenGaussIncrementalDumperCreator implements IncrementalDumpe
@Override
public IncrementalDumper createIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<WALPosition> position,
final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
- return new OpenGaussWALDumper(dumperConfig, position, channel, metaDataLoader);
+ return new OpenGaussWALDumper(dumperConfig, position, channel, metaDataLoader, false);
}
@Override
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
index cd0c36f55af..4830429699d 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
@@ -33,7 +33,10 @@ import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.OpenG
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALEventConverter;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.DecodingPlugin;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWALEvent;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.BeginTXEvent;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.CommitTXEvent;
import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
import org.opengauss.jdbc.PgConnection;
@@ -41,6 +44,8 @@ import org.opengauss.replication.PGReplicationStream;
import java.nio.ByteBuffer;
import java.sql.SQLException;
+import java.util.LinkedList;
+import java.util.List;
/**
* WAL dumper of openGauss.
@@ -57,8 +62,12 @@ public final class OpenGaussWALDumper extends AbstractLifecycleExecutor implemen
private final OpenGaussLogicalReplication logicalReplication;
+ private final boolean decodeWithTX;
+
+ private final List<AbstractRowEvent> rowEvents = new LinkedList<>();
+
public OpenGaussWALDumper(final DumperConfiguration dumperConfig, final IngestPosition<WALPosition> position,
- final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
+ final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader, final boolean decodeWithTX) {
ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass()),
() -> new UnsupportedSQLOperationException("PostgreSQLWALDumper only support PipelineDataSourceConfiguration"));
this.dumperConfig = dumperConfig;
@@ -66,6 +75,7 @@ public final class OpenGaussWALDumper extends AbstractLifecycleExecutor implemen
this.channel = channel;
walEventConverter = new WALEventConverter(dumperConfig, metaDataLoader);
logicalReplication = new OpenGaussLogicalReplication();
+ this.decodeWithTX = decodeWithTX;
}
@Override
@@ -73,7 +83,7 @@ public final class OpenGaussWALDumper extends AbstractLifecycleExecutor implemen
PGReplicationStream stream = null;
try (PgConnection connection = getReplicationConnectionUnwrap()) {
stream = logicalReplication.createReplicationStream(connection, walPosition.getLogSequenceNumber(), OpenGaussPositionInitializer.getUniqueSlotName(connection, dumperConfig.getJobId()));
- DecodingPlugin decodingPlugin = new MppdbDecodingPlugin(new OpenGaussTimestampUtils(connection.getTimestampUtils()));
+ DecodingPlugin decodingPlugin = new MppdbDecodingPlugin(new OpenGaussTimestampUtils(connection.getTimestampUtils()), decodeWithTX);
while (isRunning()) {
ByteBuffer message = stream.readPending();
if (null == message) {
@@ -81,7 +91,11 @@ public final class OpenGaussWALDumper extends AbstractLifecycleExecutor implemen
continue;
}
AbstractWALEvent event = decodingPlugin.decode(message, new OpenGaussLogSequenceNumber(stream.getLastReceiveLSN()));
- channel.pushRecord(walEventConverter.convert(event));
+ if (decodeWithTX) {
+ processEventWithTX(event);
+ } else {
+ processEventIgnoreTX(event);
+ }
}
} catch (final SQLException ex) {
throw new IngestException(ex);
@@ -99,6 +113,32 @@ public final class OpenGaussWALDumper extends AbstractLifecycleExecutor implemen
return logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) dumperConfig.getDataSourceConfig()).unwrap(PgConnection.class);
}
+ private void processEventWithTX(final AbstractWALEvent event) {
+ if (event instanceof AbstractRowEvent) {
+ rowEvents.add((AbstractRowEvent) event);
+ return;
+ }
+ if (event instanceof BeginTXEvent) {
+ rowEvents.clear();
+ return;
+ }
+ if (event instanceof CommitTXEvent) {
+ Long csn = ((CommitTXEvent) event).getCsn();
+ for (AbstractRowEvent each : rowEvents) {
+ each.setCsn(csn);
+ channel.pushRecord(walEventConverter.convert(each));
+ }
+ }
+ channel.pushRecord(walEventConverter.convert(event));
+ }
+
+ private void processEventIgnoreTX(final AbstractWALEvent event) {
+ if (event instanceof BeginTXEvent) {
+ return;
+ }
+ channel.pushRecord(walEventConverter.convert(event));
+ }
+
@Override
protected void doStop() {
}
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java
index 68d81a2817f..05755bf3657 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java
@@ -30,6 +30,8 @@ import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.Deco
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.DecodingPlugin;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWALEvent;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.BeginTXEvent;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.CommitTXEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.DeleteRowEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.PlaceholderEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.UpdateRowEvent;
@@ -39,6 +41,7 @@ import org.opengauss.util.PGobject;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.sql.SQLException;
import java.util.ArrayList;
@@ -59,30 +62,45 @@ public final class MppdbDecodingPlugin implements DecodingPlugin {
private final BaseTimestampUtils timestampUtils;
+ private final boolean decodeWithTX;
+
+ public MppdbDecodingPlugin(final BaseTimestampUtils timestampUtils) {
+ this.timestampUtils = timestampUtils;
+ decodeWithTX = false;
+ }
+
@Override
public AbstractWALEvent decode(final ByteBuffer data, final BaseLogSequenceNumber logSequenceNumber) {
AbstractWALEvent result;
- char eventType = readOneChar(data);
- result = '{' == eventType ? readTableEvent(readMppData(data)) : new PlaceholderEvent();
+ byte[] bytes = new byte[data.remaining()];
+ data.get(bytes);
+ String dataText = new String(bytes, StandardCharsets.UTF_8);
+ if (decodeWithTX) {
+ result = decodeDataWithTX(dataText);
+ } else {
+ result = decodeDataIgnoreTX(dataText);
+ }
result.setLogSequenceNumber(logSequenceNumber);
return result;
}
- private char readOneChar(final ByteBuffer data) {
- return (char) data.get();
+ private AbstractWALEvent decodeDataWithTX(final String dataText) {
+ AbstractWALEvent result = new PlaceholderEvent();
+ if (dataText.startsWith("BEGIN")) {
+ int beginIndex = dataText.indexOf("BEGIN") + "BEGIN".length() + 1;
+ result = new BeginTXEvent(Long.parseLong(dataText.substring(beginIndex)));
+ } else if (dataText.startsWith("COMMIT")) {
+ int commitBeginIndex = dataText.indexOf("COMMIT") + "COMMIT".length() + 1;
+ int csnBeginIndex = dataText.indexOf("CSN") + "CSN".length() + 1;
+ result = new CommitTXEvent(Long.parseLong(dataText.substring(commitBeginIndex, dataText.indexOf(" ", commitBeginIndex))), Long.parseLong(dataText.substring(csnBeginIndex)));
+ } else if (dataText.startsWith("{")) {
+ result = readTableEvent(dataText);
+ }
+ return result;
}
- private String readMppData(final ByteBuffer data) {
- StringBuilder mppData = new StringBuilder();
- mppData.append('{');
- int depth = 1;
- while (0 != depth && data.hasRemaining()) {
- char next = (char) data.get();
- mppData.append(next);
- int optDepth = '{' == next ? 1 : ('}' == next ? -1 : 0);
- depth += optDepth;
- }
- return mppData.toString();
+ private AbstractWALEvent decodeDataIgnoreTX(final String dataText) {
+ return dataText.startsWith("{") ? readTableEvent(dataText) : new PlaceholderEvent();
}
private AbstractRowEvent readTableEvent(final String mppData) {
diff --git a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPluginTest.java b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPluginTest.java
index c84035fbd02..d26c789655f 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPluginTest.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPluginTest.java
@@ -17,10 +17,14 @@
package org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.DecodingException;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWALEvent;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.BeginTXEvent;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.CommitTXEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.DeleteRowEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.PlaceholderEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.UpdateRowEvent;
@@ -34,11 +38,15 @@ import java.nio.ByteBuffer;
import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
import java.util.stream.IntStream;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -224,6 +232,31 @@ public final class MppdbDecodingPluginTest {
TimestampUtils timestampUtils = mock(TimestampUtils.class);
when(timestampUtils.toTime(null, "1 2 3")).thenThrow(new SQLException(""));
ByteBuffer data = ByteBuffer.wrap(toJSON(tableData).getBytes());
- new MppdbDecodingPlugin(new OpenGaussTimestampUtils(timestampUtils)).decode(data, logSequenceNumber);
+ new MppdbDecodingPlugin(new OpenGaussTimestampUtils(timestampUtils), true).decode(data, logSequenceNumber);
+ }
+
+ @Test
+ public void assertDecodeWithXid() throws JsonProcessingException {
+ MppTableData tableData = new MppTableData();
+ tableData.setTableName("public.test");
+ tableData.setOpType("INSERT");
+ tableData.setColumnsName(new String[]{"data"});
+ tableData.setColumnsType(new String[]{"raw"});
+ tableData.setColumnsVal(new String[]{"'7D'"});
+ List<String> dataList = Arrays.asList("BEGIN 1", OBJECT_MAPPER.writeValueAsString(tableData), OBJECT_MAPPER.writeValueAsString(tableData),
+ "COMMIT 1 (at 2022-10-27 04:19:39.476261+00) CSN 3468");
+ MppdbDecodingPlugin mppdbDecodingPlugin = new MppdbDecodingPlugin(null, true);
+ List<AbstractWALEvent> expectedEvent = new LinkedList<>();
+ for (String each : dataList) {
+ expectedEvent.add(mppdbDecodingPlugin.decode(ByteBuffer.wrap(each.getBytes()), logSequenceNumber));
+ }
+ assertThat(expectedEvent.size(), is(4));
+ AbstractWALEvent actualFirstEvent = expectedEvent.get(0);
+ assertTrue(actualFirstEvent instanceof BeginTXEvent);
+ assertThat(((BeginTXEvent) actualFirstEvent).getXid(), is(1L));
+ AbstractWALEvent actualLastEvent = expectedEvent.get(expectedEvent.size() - 1);
+ assertTrue(actualLastEvent instanceof CommitTXEvent);
+ assertThat(((CommitTXEvent) actualLastEvent).getCsn(), is(3468L));
+ assertThat(((CommitTXEvent) actualLastEvent).getXid(), is(1L));
}
}
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
index 694de8ecf75..bbd59daa59a 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
@@ -28,6 +28,8 @@ import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableM
import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWALEvent;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.BeginTXEvent;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.CommitTXEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.DeleteRowEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.PlaceholderEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.UpdateRowEvent;
@@ -69,7 +71,7 @@ public final class WALEventConverter {
if (event instanceof DeleteRowEvent) {
return handleDeleteRowsEvent((DeleteRowEvent) event);
}
- if (event instanceof PlaceholderEvent) {
+ if (event instanceof PlaceholderEvent || event instanceof BeginTXEvent || event instanceof CommitTXEvent) {
return createPlaceholderRecord(event);
}
throw new UnsupportedSQLOperationException("");
@@ -124,6 +126,7 @@ public final class WALEventConverter {
private DataRecord createDataRecord(final AbstractRowEvent rowsEvent, final int columnCount) {
DataRecord result = new DataRecord(new WALPosition(rowsEvent.getLogSequenceNumber()), columnCount);
result.setTableName(dumperConfig.getLogicTableName(rowsEvent.getTableName()).getLowercase());
+ result.setCsn(rowsEvent.getCsn());
return result;
}
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractRowEvent.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractRowEvent.java
index a88bd48de11..6cba665d907 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractRowEvent.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractRowEvent.java
@@ -32,4 +32,6 @@ public abstract class AbstractRowEvent extends AbstractWALEvent {
private String databaseName;
private String tableName;
+
+ private Long csn;
}
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractRowEvent.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/BeginTXEvent.java
similarity index 79%
copy from kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractRowEvent.java
copy to kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/BeginTXEvent.java
index a88bd48de11..b608e04e5c5 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractRowEvent.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/BeginTXEvent.java
@@ -18,18 +18,14 @@
package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event;
import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
+import lombok.RequiredArgsConstructor;
/**
- * Abstract row event.
+ * Begin TX event.
*/
+@RequiredArgsConstructor
@Getter
-@Setter
-@ToString(callSuper = true)
-public abstract class AbstractRowEvent extends AbstractWALEvent {
+public final class BeginTXEvent extends AbstractWALEvent {
- private String databaseName;
-
- private String tableName;
+ private final long xid;
}
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractRowEvent.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/CommitTXEvent.java
similarity index 79%
copy from kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractRowEvent.java
copy to kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/CommitTXEvent.java
index a88bd48de11..db40ca05306 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractRowEvent.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/CommitTXEvent.java
@@ -18,18 +18,16 @@
package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event;
import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
+import lombok.RequiredArgsConstructor;
/**
- * Abstract row event.
+ * Commit TX event.
*/
+@RequiredArgsConstructor
@Getter
-@Setter
-@ToString(callSuper = true)
-public abstract class AbstractRowEvent extends AbstractWALEvent {
+public final class CommitTXEvent extends AbstractWALEvent {
- private String databaseName;
+ private final long xid;
- private String tableName;
+ private final Long csn;
}
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
index f4bdcda550d..a516a76b312 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
@@ -30,7 +30,10 @@ import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.BeginTXEvent;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.CommitTXEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.DeleteRowEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.PlaceholderEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.UpdateRowEvent;
@@ -39,6 +42,7 @@ import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.postgresql.replication.LogSequenceNumber;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -50,6 +54,7 @@ import java.util.Collections;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertTrue;
public final class WALEventConverterTest {
@@ -57,6 +62,8 @@ public final class WALEventConverterTest {
private final PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
+ private final LogSequenceNumber logSequenceNumber = LogSequenceNumber.valueOf("0/14EFDB8");
+
@Before
public void setUp() {
DumperConfiguration dumperConfig = mockDumperConfiguration();
@@ -89,6 +96,24 @@ public final class WALEventConverterTest {
}
}
+ @Test
+ public void assertConvertBeginTXEvent() {
+ BeginTXEvent beginTXEvent = new BeginTXEvent(100);
+ beginTXEvent.setLogSequenceNumber(new PostgreSQLLogSequenceNumber(logSequenceNumber));
+ Record record = walEventConverter.convert(beginTXEvent);
+ assertTrue(record instanceof PlaceholderRecord);
+ assertThat(((WALPosition) record.getPosition()).getLogSequenceNumber().asLong(), is(21953976L));
+ }
+
+ @Test
+ public void assertConvertCommitTXEvent() {
+ CommitTXEvent commitTXEvent = new CommitTXEvent(1, 3468L);
+ commitTXEvent.setLogSequenceNumber(new PostgreSQLLogSequenceNumber(logSequenceNumber));
+ Record record = walEventConverter.convert(commitTXEvent);
+ assertTrue(record instanceof PlaceholderRecord);
+ assertThat(((WALPosition) record.getPosition()).getLogSequenceNumber().asLong(), is(21953976L));
+ }
+
@Test
public void assertConvertWriteRowEvent() {
Record record = walEventConverter.convert(mockWriteRowEvent());
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/MySQLIncrementTask.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/MySQLIncrementTask.java
index 8c8ae23290c..28cf8be0d21 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/MySQLIncrementTask.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/MySQLIncrementTask.java
@@ -19,10 +19,9 @@ package org.apache.shardingsphere.test.e2e.data.pipeline.cases.task;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
import org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.BaseIncrementTask;
import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
-import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
import org.springframework.jdbc.core.JdbcTemplate;
import java.time.Instant;
@@ -61,7 +60,7 @@ public final class MySQLIncrementTask extends BaseIncrementTask {
private Object insertOrder() {
ThreadLocalRandom random = ThreadLocalRandom.current();
Object[] orderInsertDate = new Object[]{primaryKeyGenerateAlgorithm.generateKey(), random.nextInt(0, 6),
- random.nextInt(1, 99), RandomStringUtils.randomAlphabetic(10)};
+ random.nextInt(1, 99), "中文测试"};
jdbcTemplate.update(String.format("INSERT INTO %s (order_id,user_id,t_unsigned_int,status) VALUES (?, ?, ?, ?)", orderTableName), orderInsertDate);
return orderInsertDate[0];
}
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/PostgreSQLIncrementTask.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/PostgreSQLIncrementTask.java
index 4b274e1d9a9..b18ba3a5583 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/PostgreSQLIncrementTask.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/PostgreSQLIncrementTask.java
@@ -70,7 +70,7 @@ public final class PostgreSQLIncrementTask extends BaseIncrementTask {
private Object insertOrder() {
ThreadLocalRandom random = ThreadLocalRandom.current();
- String status = 0 == random.nextInt() % 2 ? null : "NOT-NULL";
+ String status = 0 == random.nextInt() % 2 ? null : "中文测试";
Object[] orderInsertDate = new Object[]{KEY_GENERATE_ALGORITHM.generateKey(), random.nextInt(0, 6), status};
String insertSQL = String.format("INSERT INTO %s (order_id,user_id,status) VALUES (?, ?, ?)", getTableNameWithSchema(orderTableName));
log.info("insert order sql:{}", insertSQL);