Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/06/01 02:15:16 UTC

[shardingsphere] branch master updated: Improve CDC position ACK, make sure the FinishedPosition is the last to be acked (#25950)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new c071b0355e6 Improve CDC position ACK, make sure the FinishedPosition is the last to be acked (#25950)
c071b0355e6 is described below

commit c071b0355e6b0b0e71320c898cc696a57dba203e
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Thu Jun 1 10:15:09 2023 +0800

    Improve CDC position ACK, make sure the FinishedPosition is the last to be acked (#25950)
    
    * Wait for positions to be acked before the FinishedRecord
    
    * Improve reconnect in the WAL dumpers
    
    * Improve DataSourceRecordConsumer
    
    * Adjust the reconnect interval
    
    * Adjust where FinishedRecord is pushed
    
    * Improve sleep handling during reconnect
    
    * Keep pushing FinishedRecord even when the inventory data records are empty
---
 .../cdc/client/handler/CDCRequestHandler.java      |  3 +-
 .../core/ingest/dumper/InventoryDumper.java        | 11 +++---
 .../opengauss/ingest/OpenGaussWALDumper.java       |  4 +++
 .../postgresql/ingest/PostgreSQLWALDumper.java     |  4 +++
 .../postgresql/ingest/wal/WALEventConverter.java   |  2 +-
 .../postgresql/ingest/PostgreSQLWALDumperTest.java |  3 +-
 .../cases/cdc/DataSourceRecordConsumer.java        | 40 ++++++++++++++--------
 7 files changed, 41 insertions(+), 26 deletions(-)
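
The heart of the ordering change shows up in InventoryDumper below: FinishedRecord is no longer pushed from a finally block but appended to the final batch of data records, so its position can only be acked after every preceding data record. A minimal standalone sketch of that pattern, using hypothetical simplified Record and Channel types rather than the real pipeline classes:

import java.util.LinkedList;
import java.util.List;

final class FinishedLastSketch {
    
    interface Record {
    }
    
    static final class DataRecord implements Record {
    }
    
    // Marker record whose position must be acknowledged after all data records.
    static final class FinishedRecord implements Record {
    }
    
    interface Channel {
        
        void pushRecords(List<Record> records);
    }
    
    static void dump(final Iterable<DataRecord> rows, final Channel channel, final int batchSize) {
        List<Record> batch = new LinkedList<>();
        for (DataRecord row : rows) {
            if (batch.size() >= batchSize) {
                channel.pushRecords(batch);
                batch = new LinkedList<>();
            }
            batch.add(row);
        }
        // Append FinishedRecord to the last (possibly empty) batch so it is always pushed, and pushed last.
        batch.add(new FinishedRecord());
        channel.pushRecords(batch);
    }
}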

diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java
index 7ae88320240..162984667ee 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java
@@ -87,8 +87,7 @@ public final class CDCRequestHandler extends ChannelInboundHandlerAdapter {
     }
     
     private void processDataRecords(final ChannelHandlerContext ctx, final DataRecordResult result) {
-        List<Record> recordsList = result.getRecordList();
-        consumer.accept(recordsList);
+        consumer.accept(result.getRecordList());
         ctx.channel().writeAndFlush(CDCRequest.newBuilder().setType(Type.ACK_STREAMING).setAckStreamingRequestBody(AckStreamingRequestBody.newBuilder().setAckId(result.getAckId()).build()).build());
     }
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
index d1562e68dc4..4c6f41b3ef7 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
@@ -109,8 +109,6 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
         } catch (final SQLException ex) {
             log.error("Inventory dump, ex caught, msg={}.", ex.getMessage());
             throw new IngestException("Inventory dump failed on " + dumperConfig.getActualTableName(), ex);
-        } finally {
-            channel.pushRecords(Collections.singletonList(new FinishedRecord(new FinishedPosition())));
         }
     }
     
@@ -132,12 +130,12 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
                 ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
                 List<Record> dataRecords = new LinkedList<>();
                 while (resultSet.next()) {
-                    dataRecords.add(loadDataRecord(resultSet, resultSetMetaData, tableMetaData));
-                    ++rowCount;
                     if (dataRecords.size() >= batchSize) {
                         channel.pushRecords(dataRecords);
                         dataRecords = new LinkedList<>();
                     }
+                    dataRecords.add(loadDataRecord(resultSet, resultSetMetaData, tableMetaData));
+                    ++rowCount;
                     if (!isRunning()) {
                         log.info("Broke because of inventory dump is not running.");
                         break;
@@ -146,9 +144,8 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
                         rateLimitAlgorithm.intercept(JobOperationType.SELECT, 1);
                     }
                 }
-                if (!dataRecords.isEmpty()) {
-                    channel.pushRecords(dataRecords);
-                }
+                dataRecords.add(new FinishedRecord(new FinishedPosition()));
+                channel.pushRecords(dataRecords);
                 dumpStatement.set(null);
                 log.info("Inventory dump done, rowCount={}", rowCount);
             }
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
index bbc5040c61e..5fa19fe7c10 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
@@ -83,6 +83,7 @@ public final class OpenGaussWALDumper extends AbstractLifecycleExecutor implemen
         this.decodeWithTX = dumperConfig.isDecodeWithTX();
     }
     
+    @SneakyThrows(InterruptedException.class)
     @Override
     protected void runBlocking() {
         AtomicInteger reconnectTimes = new AtomicInteger();
@@ -93,6 +94,9 @@ public final class OpenGaussWALDumper extends AbstractLifecycleExecutor implemen
             } catch (final SQLException ex) {
                 int times = reconnectTimes.incrementAndGet();
                 log.error("Connect failed, reconnect times={}", times, ex);
+                if (isRunning()) {
+                    Thread.sleep(5000);
+                }
                 if (times >= 5) {
                     throw new IngestException(ex);
                 }
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
index 67ba0431e82..8f19265d9b0 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
@@ -85,6 +85,7 @@ public final class PostgreSQLWALDumper extends AbstractLifecycleExecutor impleme
         this.decodeWithTX = dumperConfig.isDecodeWithTX();
     }
     
+    @SneakyThrows(InterruptedException.class)
     @Override
     protected void runBlocking() {
         AtomicInteger reconnectTimes = new AtomicInteger();
@@ -95,6 +96,9 @@ public final class PostgreSQLWALDumper extends AbstractLifecycleExecutor impleme
             } catch (final SQLException ex) {
                 int times = reconnectTimes.incrementAndGet();
                 log.error("Connect failed, reconnect times={}", times, ex);
+                if (isRunning()) {
+                    Thread.sleep(5000);
+                }
                 if (times >= 5) {
                     throw new IngestException(ex);
                 }
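
Both WAL dumpers gain the same reconnect behaviour: sleep five seconds between attempts while the dumper is still running, and fail after five attempts. A rough standalone sketch of that shape (the surrounding loop, the running flag, and the RuntimeException are illustrative stand-ins; only the sleep and retry-limit logic mirrors the patch):

import java.sql.SQLException;
import java.util.concurrent.atomic.AtomicInteger;

final class ReconnectLoopSketch {
    
    private volatile boolean running = true;
    
    void runBlocking() throws InterruptedException {
        AtomicInteger reconnectTimes = new AtomicInteger();
        while (running) {
            try {
                dumpOnce();
                break;
            } catch (final SQLException ex) {
                int times = reconnectTimes.incrementAndGet();
                // Back off before retrying, but only if the dumper has not been stopped meanwhile.
                if (running) {
                    Thread.sleep(5000L);
                }
                if (times >= 5) {
                    throw new RuntimeException(ex);
                }
            }
        }
    }
    
    private void dumpOnce() throws SQLException {
        // Placeholder for the actual logical replication dump.
    }
}
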
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
index 1efb6597cf9..00cb9e6da3c 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
@@ -137,7 +137,7 @@ public final class WALEventConverter {
                 continue;
             }
             boolean isUniqueKey = columnMetaData.isUniqueKey();
-            Object uniqueKeyOldValue = isUniqueKey ? values.get(i) : null;
+            Object uniqueKeyOldValue = isUniqueKey && IngestDataChangeType.UPDATE.equals(dataRecord.getType()) ? values.get(i) : null;
             Column column = new Column(columnMetaData.getName(), uniqueKeyOldValue, values.get(i), true, isUniqueKey);
             dataRecord.addColumn(column);
         }
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
index c823fc41844..4c7d46693d0 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
@@ -128,9 +128,8 @@ class PostgreSQLWALDumperTest {
             when(logicalReplication.createReplicationStream(pgConnection, PostgreSQLPositionInitializer.getUniqueSlotName(pgConnection, ""), position.getLogSequenceNumber()))
                     .thenReturn(pgReplicationStream);
             ByteBuffer data = ByteBuffer.wrap("table public.t_order_0: DELETE: order_id[integer]:1".getBytes());
-            when(pgReplicationStream.readPending()).thenReturn(null).thenReturn(data).thenThrow(new SQLException(""));
+            when(pgReplicationStream.readPending()).thenReturn(null).thenReturn(data).thenThrow(new IngestException(""));
             when(pgReplicationStream.getLastReceiveLSN()).thenReturn(LogSequenceNumber.valueOf(101L));
-            // TODO NPE occurred here
             walDumper.start();
         } catch (final IngestException ignored) {
         }
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java
index 492ec6f2a46..f9d837f20e7 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java
@@ -40,7 +40,9 @@ import java.sql.SQLException;
 import java.sql.Types;
 import java.time.LocalDate;
 import java.time.LocalTime;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -64,26 +66,42 @@ public final class DataSourceRecordConsumer implements Consumer<List<Record>> {
     
     @Override
     public void accept(final List<Record> records) {
+        log.debug("Records: {}", records);
+        try (Connection connection = dataSource.getConnection()) {
+            connection.setAutoCommit(false);
+            processRecords(records, connection);
+            connection.commit();
+        } catch (final SQLException ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+    
+    private void processRecords(final List<Record> records, final Connection connection) throws SQLException {
         long insertCount = records.stream().filter(each -> DataChangeType.INSERT == each.getDataChangeType()).count();
         if (insertCount == records.size()) {
-            batchInsertRecords(records);
+            Map<String, List<Record>> recordsMap = new HashMap<>();
+            for (Record each : records) {
+                String key = buildTableNameWithSchema(each.getMetaData().getTable(), each.getMetaData().getSchema());
+                recordsMap.computeIfAbsent(key, ignored -> new LinkedList<>()).add(each);
+            }
+            for (List<Record> each : recordsMap.values()) {
+                batchInsertRecords(each, connection);
+            }
             return;
         }
         for (Record record : records) {
-            write(record);
+            write(record, connection);
         }
     }
     
-    private void batchInsertRecords(final List<Record> records) {
+    private void batchInsertRecords(final List<Record> records, final Connection connection) throws SQLException {
         Record firstRecord = records.get(0);
         MetaData metaData = firstRecord.getMetaData();
         PipelineTableMetaData tableMetaData = loadTableMetaData(metaData.getSchema(), metaData.getTable());
         List<String> columnNames = firstRecord.getAfterList().stream().map(TableColumn::getName).collect(Collectors.toList());
         String tableName = buildTableNameWithSchema(metaData.getSchema(), metaData.getTable());
         String insertSQL = SQLBuilderUtils.buildInsertSQL(columnNames, tableName);
-        try (
-                Connection connection = dataSource.getConnection();
-                PreparedStatement preparedStatement = connection.prepareStatement(insertSQL)) {
+        try (PreparedStatement preparedStatement = connection.prepareStatement(insertSQL)) {
             for (Record each : records) {
                 List<TableColumn> tableColumns = each.getAfterList();
                 for (int i = 0; i < tableColumns.size(); i++) {
@@ -92,18 +110,14 @@ public final class DataSourceRecordConsumer implements Consumer<List<Record>> {
                 preparedStatement.addBatch();
             }
             preparedStatement.executeBatch();
-        } catch (final SQLException ex) {
-            throw new RuntimeException(ex);
         }
     }
     
-    private void write(final Record record) {
+    private void write(final Record record, final Connection connection) throws SQLException {
         String sql = buildSQL(record);
         MetaData metaData = record.getMetaData();
         PipelineTableMetaData tableMetaData = loadTableMetaData(metaData.getSchema(), metaData.getTable());
-        try (
-                Connection connection = dataSource.getConnection();
-                PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
+        try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
             Map<String, TableColumn> afterMap = new LinkedHashMap<>(record.getBeforeList().size(), 1F);
             record.getAfterList().forEach(each -> afterMap.put(each.getName(), each));
             switch (record.getDataChangeType()) {
@@ -133,8 +147,6 @@ public final class DataSourceRecordConsumer implements Consumer<List<Record>> {
                 default:
             }
             preparedStatement.execute();
-        } catch (final SQLException ex) {
-            throw new RuntimeException(ex);
         }
     }
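
The DataSourceRecordConsumer changes above move connection handling up into accept(): one connection per received batch, auto-commit disabled, all-insert batches grouped per schema-qualified table, and a single commit at the end. A simplified sketch of that per-batch transaction pattern, with a hypothetical recordsByTable map and a stubbed batchInsert standing in for the real Record handling:

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;

final class PerBatchTransactionSketch {
    
    private final DataSource dataSource;
    
    PerBatchTransactionSketch(final DataSource dataSource) {
        this.dataSource = dataSource;
    }
    
    void accept(final Map<String, List<Object>> recordsByTable) {
        // Reuse a single connection and transaction for the whole batch instead of one per statement.
        try (Connection connection = dataSource.getConnection()) {
            connection.setAutoCommit(false);
            for (Map.Entry<String, List<Object>> entry : recordsByTable.entrySet()) {
                batchInsert(entry.getKey(), entry.getValue(), connection);
            }
            connection.commit();
        } catch (final SQLException ex) {
            throw new RuntimeException(ex);
        }
    }
    
    private void batchInsert(final String table, final List<Object> records, final Connection connection) throws SQLException {
        // Placeholder for prepareStatement / addBatch / executeBatch against the shared connection.
    }
}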