Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/06/01 02:15:16 UTC
[shardingsphere] branch master updated: Improve CDC position ACK, make sure the FinishedPosition is the last to be ACKed (#25950)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new c071b0355e6 Improve CDC position ACK, make sure the FinishedPosition is the last to be ACKed (#25950)
c071b0355e6 is described below
commit c071b0355e6b0b0e71320c898cc696a57dba203e
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Thu Jun 1 10:15:09 2023 +0800
Improve CDC position ACK, make sure the FinishedPosition is the last to be ACKed (#25950)
* Wait for positions to be ACKed before pushing FinishedRecord
* Improve reconnection in the WAL dumpers
* Improve DataSourceRecordConsumer
* Adjust the reconnect interval
* Adjust where FinishedRecord is pushed
* Improve sleep handling
* Still push FinishedRecord when the inventory data record list is empty
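Taken together, these changes move FinishedRecord out of the inventory dumper's finally block and append it to the last batch of data records, so its FinishedPosition is pushed, and therefore ACKed, only after every data record that precedes it; the notes under each file's hunk below break this down.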
---
.../cdc/client/handler/CDCRequestHandler.java | 3 +-
.../core/ingest/dumper/InventoryDumper.java | 11 +++---
.../opengauss/ingest/OpenGaussWALDumper.java | 4 +++
.../postgresql/ingest/PostgreSQLWALDumper.java | 4 +++
.../postgresql/ingest/wal/WALEventConverter.java | 2 +-
.../postgresql/ingest/PostgreSQLWALDumperTest.java | 3 +-
.../cases/cdc/DataSourceRecordConsumer.java | 40 ++++++++++++++--------
7 files changed, 41 insertions(+), 26 deletions(-)
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java
index 7ae88320240..162984667ee 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java
@@ -87,8 +87,7 @@ public final class CDCRequestHandler extends ChannelInboundHandlerAdapter {
}
private void processDataRecords(final ChannelHandlerContext ctx, final DataRecordResult result) {
- List<Record> recordsList = result.getRecordList();
- consumer.accept(recordsList);
+ consumer.accept(result.getRecordList());
ctx.channel().writeAndFlush(CDCRequest.newBuilder().setType(Type.ACK_STREAMING).setAckStreamingRequestBody(AckStreamingRequestBody.newBuilder().setAckId(result.getAckId()).build()).build());
}
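Because the handler flushes the ACK_STREAMING request carrying the batch's ackId immediately after consumer.accept(...) returns, a consumer must have fully processed the batch before returning. A minimal sketch of a conforming consumer, assuming a hypothetical persistDurably(...) sink that is not part of this commit:

    import java.util.List;
    import java.util.function.Consumer;
    
    // Sketch only: Record stands for the CDC protobuf record type.
    public final class DurableConsumer implements Consumer<List<Record>> {
        
        @Override
        public void accept(final List<Record> records) {
            records.forEach(this::persistDurably); // finish all writes here:
            // the ACK for this batch is flushed as soon as accept() returns
        }
        
        private void persistDurably(final Record record) {
            // hypothetical durable write, e.g. into a staging table
        }
    }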
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
index d1562e68dc4..4c6f41b3ef7 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
@@ -109,8 +109,6 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
} catch (final SQLException ex) {
log.error("Inventory dump, ex caught, msg={}.", ex.getMessage());
throw new IngestException("Inventory dump failed on " + dumperConfig.getActualTableName(), ex);
- } finally {
- channel.pushRecords(Collections.singletonList(new FinishedRecord(new FinishedPosition())));
}
}
@@ -132,12 +130,12 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
List<Record> dataRecords = new LinkedList<>();
while (resultSet.next()) {
- dataRecords.add(loadDataRecord(resultSet, resultSetMetaData, tableMetaData));
- ++rowCount;
if (dataRecords.size() >= batchSize) {
channel.pushRecords(dataRecords);
dataRecords = new LinkedList<>();
}
+ dataRecords.add(loadDataRecord(resultSet, resultSetMetaData, tableMetaData));
+ ++rowCount;
if (!isRunning()) {
log.info("Broke because of inventory dump is not running.");
break;
@@ -146,9 +144,8 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
rateLimitAlgorithm.intercept(JobOperationType.SELECT, 1);
}
}
- if (!dataRecords.isEmpty()) {
- channel.pushRecords(dataRecords);
- }
+ dataRecords.add(new FinishedRecord(new FinishedPosition()));
+ channel.pushRecords(dataRecords);
dumpStatement.set(null);
log.info("Inventory dump done, rowCount={}", rowCount);
}
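With the flush check moved before the add, the buffer still holds the tail rows when the loop exits (or nothing at all for an empty table), and appending FinishedRecord there guarantees it rides in the final push; since that push now happens inside the try block rather than a finally block, a failed dump no longer emits a misleading FinishedRecord. Condensed from the hunk above (channel, batchSize, resultSet and loadDataRecord(...) are InventoryDumper members):

    List<Record> dataRecords = new LinkedList<>();
    while (resultSet.next()) {
        if (dataRecords.size() >= batchSize) { // flush BEFORE adding the next row,
            channel.pushRecords(dataRecords);  // so earlier positions go out first
            dataRecords = new LinkedList<>();
        }
        dataRecords.add(loadDataRecord(resultSet, resultSetMetaData, tableMetaData));
    }
    dataRecords.add(new FinishedRecord(new FinishedPosition())); // always the last record pushed
    channel.pushRecords(dataRecords);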
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
index bbc5040c61e..5fa19fe7c10 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
@@ -83,6 +83,7 @@ public final class OpenGaussWALDumper extends AbstractLifecycleExecutor implemen
this.decodeWithTX = dumperConfig.isDecodeWithTX();
}
+ @SneakyThrows(InterruptedException.class)
@Override
protected void runBlocking() {
AtomicInteger reconnectTimes = new AtomicInteger();
@@ -93,6 +94,9 @@ public final class OpenGaussWALDumper extends AbstractLifecycleExecutor implemen
} catch (final SQLException ex) {
int times = reconnectTimes.incrementAndGet();
log.error("Connect failed, reconnect times={}", times, ex);
+ if (isRunning()) {
+ Thread.sleep(5000);
+ }
if (times >= 5) {
throw new IngestException(ex);
}
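The dumper now retries the replication stream instead of failing fast: each SQLException increments a counter, the thread backs off for five seconds while still running, and the fifth failure is rethrown as IngestException, with @SneakyThrows absorbing the InterruptedException from the sleep. A condensed sketch of the loop, where dump() is a stand-in for the dialect-specific replication-stream loop and the loop shape is approximated from the hunk (the sketch declares throws InterruptedException where the real code uses @SneakyThrows):

    private void runBlocking() throws InterruptedException {
        AtomicInteger reconnectTimes = new AtomicInteger();
        while (isRunning()) {
            try {
                dump(); // hypothetical stand-in for the replication-stream loop
                return;
            } catch (final SQLException ex) {
                int times = reconnectTimes.incrementAndGet();
                log.error("Connect failed, reconnect times={}", times, ex);
                if (isRunning()) {
                    Thread.sleep(5000L); // 5s back-off between reconnect attempts
                }
                if (times >= 5) {
                    throw new IngestException(ex);
                }
            }
        }
    }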
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
index 67ba0431e82..8f19265d9b0 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
@@ -85,6 +85,7 @@ public final class PostgreSQLWALDumper extends AbstractLifecycleExecutor impleme
this.decodeWithTX = dumperConfig.isDecodeWithTX();
}
+ @SneakyThrows(InterruptedException.class)
@Override
protected void runBlocking() {
AtomicInteger reconnectTimes = new AtomicInteger();
@@ -95,6 +96,9 @@ public final class PostgreSQLWALDumper extends AbstractLifecycleExecutor impleme
} catch (final SQLException ex) {
int times = reconnectTimes.incrementAndGet();
log.error("Connect failed, reconnect times={}", times, ex);
+ if (isRunning()) {
+ Thread.sleep(5000);
+ }
if (times >= 5) {
throw new IngestException(ex);
}
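The same @SneakyThrows annotation and five-second back-off are applied verbatim to PostgreSQLWALDumper, keeping the reconnect behavior of the two dialect dumpers identical.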
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
index 1efb6597cf9..00cb9e6da3c 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
@@ -137,7 +137,7 @@ public final class WALEventConverter {
continue;
}
boolean isUniqueKey = columnMetaData.isUniqueKey();
- Object uniqueKeyOldValue = isUniqueKey ? values.get(i) : null;
+ Object uniqueKeyOldValue = isUniqueKey && IngestDataChangeType.UPDATE.equals(dataRecord.getType()) ? values.get(i) : null;
Column column = new Column(columnMetaData.getName(), uniqueKeyOldValue, values.get(i), true, isUniqueKey);
dataRecord.addColumn(column);
}
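This restricts the old unique-key value to UPDATE events: for other change types the old value is now left null instead of echoing the current value, since only an UPDATE can change a key and therefore needs both the old and the new value to locate the row.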
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
index c823fc41844..4c7d46693d0 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
@@ -128,9 +128,8 @@ class PostgreSQLWALDumperTest {
when(logicalReplication.createReplicationStream(pgConnection, PostgreSQLPositionInitializer.getUniqueSlotName(pgConnection, ""), position.getLogSequenceNumber()))
.thenReturn(pgReplicationStream);
ByteBuffer data = ByteBuffer.wrap("table public.t_order_0: DELETE: order_id[integer]:1".getBytes());
- when(pgReplicationStream.readPending()).thenReturn(null).thenReturn(data).thenThrow(new SQLException(""));
+ when(pgReplicationStream.readPending()).thenReturn(null).thenReturn(data).thenThrow(new IngestException(""));
when(pgReplicationStream.getLastReceiveLSN()).thenReturn(LogSequenceNumber.valueOf(101L));
- // TODO NPE occurred here
walDumper.start();
} catch (final IngestException ignored) {
}
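The stub now fails with IngestException instead of SQLException, presumably because a SQLException would send the reworked dumper into its five-attempt back-off loop rather than stopping the test promptly; the stale NPE TODO is dropped along with it.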
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java
index 492ec6f2a46..f9d837f20e7 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java
@@ -40,7 +40,9 @@ import java.sql.SQLException;
import java.sql.Types;
import java.time.LocalDate;
import java.time.LocalTime;
+import java.util.HashMap;
import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -64,26 +66,42 @@ public final class DataSourceRecordConsumer implements Consumer<List<Record>> {
@Override
public void accept(final List<Record> records) {
+ log.debug("Records: {}", records);
+ try (Connection connection = dataSource.getConnection()) {
+ connection.setAutoCommit(false);
+ processRecords(records, connection);
+ connection.commit();
+ } catch (final SQLException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ private void processRecords(final List<Record> records, final Connection connection) throws SQLException {
long insertCount = records.stream().filter(each -> DataChangeType.INSERT == each.getDataChangeType()).count();
if (insertCount == records.size()) {
- batchInsertRecords(records);
+ Map<String, List<Record>> recordsMap = new HashMap<>();
+ for (Record each : records) {
+ String key = buildTableNameWithSchema(each.getMetaData().getTable(), each.getMetaData().getSchema());
+ recordsMap.computeIfAbsent(key, ignored -> new LinkedList<>()).add(each);
+ }
+ for (List<Record> each : recordsMap.values()) {
+ batchInsertRecords(each, connection);
+ }
return;
}
for (Record record : records) {
- write(record);
+ write(record, connection);
}
}
- private void batchInsertRecords(final List<Record> records) {
+ private void batchInsertRecords(final List<Record> records, final Connection connection) throws SQLException {
Record firstRecord = records.get(0);
MetaData metaData = firstRecord.getMetaData();
PipelineTableMetaData tableMetaData = loadTableMetaData(metaData.getSchema(), metaData.getTable());
List<String> columnNames = firstRecord.getAfterList().stream().map(TableColumn::getName).collect(Collectors.toList());
String tableName = buildTableNameWithSchema(metaData.getSchema(), metaData.getTable());
String insertSQL = SQLBuilderUtils.buildInsertSQL(columnNames, tableName);
- try (
- Connection connection = dataSource.getConnection();
- PreparedStatement preparedStatement = connection.prepareStatement(insertSQL)) {
+ try (PreparedStatement preparedStatement = connection.prepareStatement(insertSQL)) {
for (Record each : records) {
List<TableColumn> tableColumns = each.getAfterList();
for (int i = 0; i < tableColumns.size(); i++) {
@@ -92,18 +110,14 @@ public final class DataSourceRecordConsumer implements Consumer<List<Record>> {
preparedStatement.addBatch();
}
preparedStatement.executeBatch();
- } catch (final SQLException ex) {
- throw new RuntimeException(ex);
}
}
- private void write(final Record record) {
+ private void write(final Record record, final Connection connection) throws SQLException {
String sql = buildSQL(record);
MetaData metaData = record.getMetaData();
PipelineTableMetaData tableMetaData = loadTableMetaData(metaData.getSchema(), metaData.getTable());
- try (
- Connection connection = dataSource.getConnection();
- PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
+ try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
Map<String, TableColumn> afterMap = new LinkedHashMap<>(record.getBeforeList().size(), 1F);
record.getAfterList().forEach(each -> afterMap.put(each.getName(), each));
switch (record.getDataChangeType()) {
@@ -133,8 +147,6 @@ public final class DataSourceRecordConsumer implements Consumer<List<Record>> {
default:
}
preparedStatement.execute();
- } catch (final SQLException ex) {
- throw new RuntimeException(ex);
}
}
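Taken together, the consumer now applies each delivered batch inside one transaction (single connection, auto-commit off, one commit at the end), and insert-only batches are first grouped per table, since a PreparedStatement batch can reuse only a single INSERT statement. The grouping idiom, with an illustrative "schema.table" key standing in for what the real code builds via buildTableNameWithSchema:

    // Illustrative sketch of the per-table grouping before batch insert.
    Map<String, List<Record>> recordsMap = new HashMap<>();
    for (Record each : records) {
        String key = each.getMetaData().getSchema() + "." + each.getMetaData().getTable();
        recordsMap.computeIfAbsent(key, ignored -> new LinkedList<>()).add(each);
    }
    for (List<Record> sameTable : recordsMap.values()) {
        batchInsertRecords(sameTable, connection); // one prepared statement per table
    }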