Posted to notifications@shardingsphere.apache.org by "azexcy (via GitHub)" <gi...@apache.org> on 2023/05/19 04:26:09 UTC

[GitHub] [shardingsphere] azexcy opened a new pull request, #25787: Refactor pipeline channel, read records by transaction at increment task

azexcy opened a new pull request, #25787:
URL: https://github.com/apache/shardingsphere/pull/25787

   Fixes #ISSUE_ID.
   
   Changes proposed in this pull request:
     -
   
   ---
   
   Before committing this PR, I'm sure that I have checked the following options:
   - [ ] My code follows the [code of conduct](https://shardingsphere.apache.org/community/en/involved/conduct/code/) of this project.
   - [ ] I have self-reviewed the commit code.
   - [ ] I have (or in comment I request) added corresponding labels for the pull request.
   - [ ] I have passed maven check locally : `./mvnw clean install -B -T1C -Dmaven.javadoc.skip -Dmaven.jacoco.skip -e`.
   - [ ] I have made corresponding changes to the documentation.
   - [ ] I have added corresponding unit tests for my changes.
   




[GitHub] [shardingsphere] sandynz commented on a diff in pull request #25787: Refactor pipeline channel, read records by transaction at increment task

Posted by "sandynz (via GitHub)" <gi...@apache.org>.
sandynz commented on code in PR #25787:
URL: https://github.com/apache/shardingsphere/pull/25787#discussion_r1198748537


##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java:
##########
@@ -54,13 +54,22 @@ public void pushRecord(final Record dataRecord) {
     public List<Record> fetchRecords(final int batchSize, final int timeout, final TimeUnit timeUnit) {
         List<Record> result = new ArrayList<>(batchSize);
         long start = System.currentTimeMillis();
-        while (batchSize > queue.size()) {
+        int recordsCount = 0;
+        while (batchSize > recordsCount) {
+            List<Record> records = queue.poll();
+            if (null == records || records.isEmpty()) {
+                TimeUnit.MILLISECONDS.sleep(100L);

Review Comment:
   Could we convert it to an empty list?





[GitHub] [shardingsphere] azexcy commented on a diff in pull request #25787: Refactor pipeline channel, read records by transaction at increment task

Posted by "azexcy (via GitHub)" <gi...@apache.org>.
azexcy commented on code in PR #25787:
URL: https://github.com/apache/shardingsphere/pull/25787#discussion_r1198676392


##########
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java:
##########
@@ -97,46 +101,58 @@ protected void runBlocking() {
         client.connect();
         client.subscribe(binlogPosition.getFilename(), binlogPosition.getPosition());
         while (isRunning()) {
-            AbstractBinlogEvent event = client.poll();
-            if (null == event) {
+            List<AbstractBinlogEvent> events = client.poll();
+            if (null == events) {
                 continue;
             }

Review Comment:
   It's the same; the queue's `poll` may return null:
   ```
       /**
        * Retrieves and removes the head of this queue, waiting up to the
        * specified wait time if necessary for an element to become available.
        *
        * @param timeout how long to wait before giving up, in units of
        *        {@code unit}
        * @param unit a {@code TimeUnit} determining how to interpret the
        *        {@code timeout} parameter
        * @return the head of this queue, or {@code null} if the
        *         specified waiting time elapses before an element is available
        * @throws InterruptedException if interrupted while waiting
        */
       E poll(long timeout, TimeUnit unit)
           throws InterruptedException;
   
   ```
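   
   For reference, a minimal sketch of normalizing the nullable `poll` result into an empty list (the helper class and its name are hypothetical, not from the PR):
   ```
   import java.util.Collections;
   import java.util.List;
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.TimeUnit;
   
   final class QueueUtils {
       
       // Hypothetical helper: callers always get a non-null list back.
       static <E> List<E> pollAsList(final BlockingQueue<List<E>> queue, final long timeout, final TimeUnit unit) throws InterruptedException {
           List<E> polled = queue.poll(timeout, unit);
           return null == polled ? Collections.emptyList() : polled;
       }
   }
   ```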





[GitHub] [shardingsphere] azexcy commented on a diff in pull request #25787: Refactor pipeline channel, read records by transaction at increment task

Posted by "azexcy (via GitHub)" <gi...@apache.org>.
azexcy commented on code in PR #25787:
URL: https://github.com/apache/shardingsphere/pull/25787#discussion_r1199879888


##########
kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java:
##########
@@ -86,13 +99,43 @@ protected void runBlocking() {
                     continue;
                 }
                 AbstractWALEvent event = decodingPlugin.decode(message, new PostgreSQLLogSequenceNumber(stream.getLastReceiveLSN()));
-                channel.pushRecord(walEventConverter.convert(event));
+                if (decodeWithTX) {
+                    processEventWithTX(event);
+                } else {
+                    processEventIgnoreTX(event);
+                }
             }
         } catch (final SQLException ex) {
             throw new IngestException(ex);
         }
     }
     
+    private void processEventWithTX(final AbstractWALEvent event) {
+        if (event instanceof BeginTXEvent) {
+            rowEvents = new ArrayList<>();
+            return;
+        }
+        if (event instanceof AbstractRowEvent) {
+            rowEvents.add((AbstractRowEvent) event);
+            return;
+        }
+        if (event instanceof CommitTXEvent) {
+            List<Record> records = new LinkedList<>();
+            for (AbstractWALEvent each : rowEvents) {
+                records.add(walEventConverter.convert(each));
+            }
+            records.add(walEventConverter.convert(event));
+            channel.pushRecords(records);
+        }
+    }
+    
+    private void processEventIgnoreTX(final AbstractWALEvent event) {
+        if (event instanceof BeginTXEvent) {

Review Comment:
   `BeginTxEvent`'s position is the same as the data record's. For example:
   ```
   postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL);
       lsn    |  xid  |                          data
   -----------+-------+---------------------------------------------------------
    0/BA5A8E0 | 10299 | BEGIN 10299
    0/BA5A8E0 | 10299 | table public.data: INSERT: id[integer]:3 data[text]:'3'
    0/BA5A990 | 10299 | COMMIT 10299
   (3 rows)
   ```
   So I think CommitTxEvent should not be ignored; the position should be updated.
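   
   A minimal sketch of how the non-transactional path could honor that (assuming the `channel` and `walEventConverter` fields shown in the diff):
   ```
   private void processEventIgnoreTX(final AbstractWALEvent event) {
       // Skip BEGIN: its LSN equals the first data record's, so nothing is lost.
       if (event instanceof BeginTXEvent) {
           return;
       }
       // Convert everything else, including CommitTXEvent, so the saved position keeps advancing.
       channel.pushRecords(Collections.singletonList(walEventConverter.convert(event)));
   }
   ```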





[GitHub] [shardingsphere] azexcy commented on a diff in pull request #25787: Refactor pipeline channel, read records by transaction at increment task

Posted by "azexcy (via GitHub)" <gi...@apache.org>.
azexcy commented on code in PR #25787:
URL: https://github.com/apache/shardingsphere/pull/25787#discussion_r1200175998


##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java:
##########
@@ -44,23 +44,29 @@ public SimpleMemoryPipelineChannel(final int blockQueueSize, final AckCallback a
     
     @SneakyThrows(InterruptedException.class)
     @Override
-    public void pushRecord(final Record dataRecord) {
-        queue.put(dataRecord);
+    public void pushRecords(final List<Record> records) {
+        queue.put(records);
     }
     
     @SneakyThrows(InterruptedException.class)
     // TODO thread-safe?
     @Override
     public List<Record> fetchRecords(final int batchSize, final int timeout, final TimeUnit timeUnit) {
-        List<Record> result = new ArrayList<>(batchSize);
+        List<Record> result = new LinkedList<>();
         long start = System.currentTimeMillis();
-        while (batchSize > queue.size()) {
+        int recordsCount = 0;
+        while (batchSize > recordsCount) {
+            List<Record> records = queue.poll();
+            if (null == records || records.isEmpty()) {
+                TimeUnit.MILLISECONDS.sleep(100L);

Review Comment:
   OK, I'll add a unit test now.
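   
   A sketch of such a test (JUnit 5; the constructor signature is from the diff, `FinishedRecord`/`PlaceholderPosition` appear elsewhere in this thread, and the Hamcrest static imports are assumed):
   ```
   @Test
   void assertFetchRecordsReturnsOnceBatchSizeIsReached() throws Exception {
       SimpleMemoryPipelineChannel channel = new SimpleMemoryPipelineChannel(100, records -> { });
       channel.pushRecords(Collections.singletonList(new FinishedRecord(new PlaceholderPosition())));
       // batchSize of 1 is satisfied by the single pushed record, well before the 1s timeout.
       List<Record> actual = channel.fetchRecords(1, 1, TimeUnit.SECONDS);
       assertThat(actual.size(), is(1));
   }
   ```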





[GitHub] [shardingsphere] azexcy commented on a diff in pull request #25787: Refactor pipeline channel, read records by transaction at increment task

Posted by "azexcy (via GitHub)" <gi...@apache.org>.
azexcy commented on code in PR #25787:
URL: https://github.com/apache/shardingsphere/pull/25787#discussion_r1200003281


##########
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java:
##########
@@ -305,15 +309,25 @@ private final class MySQLBinlogEventHandler extends ChannelInboundHandlerAdapter
             this.lastBinlogEvent = new AtomicReference<>(lastBinlogEvent);
         }
         
+        @SuppressWarnings("unchecked")
         @Override
         public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
             if (!running) {
                 return;
             }
+            reconnectTimes.set(0);
+            if (msg instanceof List) {
+                List<AbstractBinlogEvent> records = (List<AbstractBinlogEvent>) msg;
+                if (records.isEmpty()) {
+                    log.warn("The records is empty");
+                    return;
+                }
+                lastBinlogEvent.set(records.get(records.size() - 1));
+                blockingEventQueue.put(records);

Review Comment:
   It's guaranteed by the previous handler, `MySQLBinlogEventPacketDecoder`.





[GitHub] [shardingsphere] sandynz commented on a diff in pull request #25787: Refactor pipeline channel, read records by transaction at increment task

Posted by "sandynz (via GitHub)" <gi...@apache.org>.
sandynz commented on code in PR #25787:
URL: https://github.com/apache/shardingsphere/pull/25787#discussion_r1199516978


##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java:
##########
@@ -48,23 +49,25 @@ public MultiplexMemoryPipelineChannel(final int channelNumber, final int blockQu
     }
     
     @Override
-    public void pushRecord(final Record record) {
-        if (FinishedRecord.class.equals(record.getClass())) {
-            for (int i = 0; i < channelNumber; i++) {
-                pushRecord(record, i);
+    public void pushRecords(final List<Record> records) {
+        for (Record each : records) {
+            if (FinishedRecord.class.equals(each.getClass())) {
+                for (int i = 0; i < channelNumber; i++) {
+                    pushRecord(each, i);
+                }
+            } else if (DataRecord.class.equals(each.getClass())) {
+                pushRecord(each, Math.abs(each.hashCode() % channelNumber));

Review Comment:
   Could we merge the data records into collections?
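   
   One possible shape of that merging (a sketch; the `channels` array and a list-based per-channel push are assumed, not shown in the diff):
   ```
   Map<Integer, List<Record>> grouped = new LinkedHashMap<>();
   for (Record each : records) {
       // Same routing rule as the diff: hash each data record to a channel index.
       grouped.computeIfAbsent(Math.abs(each.hashCode() % channelNumber), key -> new LinkedList<>()).add(each);
   }
   // Push each group as one batch instead of record by record.
   for (Entry<Integer, List<Record>> entry : grouped.entrySet()) {
       channels[entry.getKey()].pushRecords(entry.getValue());
   }
   ```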



##########
kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java:
##########
@@ -64,7 +66,7 @@ public final class OpenGaussWALDumper extends AbstractLifecycleExecutor implemen
     
     private final boolean decodeWithTX;
     
-    private final List<AbstractRowEvent> rowEvents = new LinkedList<>();
+    private List<AbstractRowEvent> walEvents = new LinkedList<>();

Review Comment:
   `walEvents` could be `rowEvents`



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java:
##########
@@ -44,23 +44,29 @@ public SimpleMemoryPipelineChannel(final int blockQueueSize, final AckCallback a
     
     @SneakyThrows(InterruptedException.class)
     @Override
-    public void pushRecord(final Record dataRecord) {
-        queue.put(dataRecord);
+    public void pushRecords(final List<Record> records) {
+        queue.put(records);
     }
     
     @SneakyThrows(InterruptedException.class)
     // TODO thread-safe?
     @Override
     public List<Record> fetchRecords(final int batchSize, final int timeout, final TimeUnit timeUnit) {
-        List<Record> result = new ArrayList<>(batchSize);
+        List<Record> result = new LinkedList<>();
         long start = System.currentTimeMillis();
-        while (batchSize > queue.size()) {
+        int recordsCount = 0;
+        while (batchSize > recordsCount) {
+            List<Record> records = queue.poll();
+            if (null == records || records.isEmpty()) {
+                TimeUnit.MILLISECONDS.sleep(100L);

Review Comment:
   Could we verify whether `timeout` is less than `100` milliseconds?
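   
   A sketch of bounding the sleep by the caller's deadline (names from the diff; the bounded sleep itself is the assumed fix):
   ```
   long deadline = start + timeUnit.toMillis(timeout);
   long remaining = deadline - System.currentTimeMillis();
   if (remaining <= 0) {
       break;
   }
   // Never sleep past the remaining timeout, even if it is under 100ms.
   TimeUnit.MILLISECONDS.sleep(Math.min(100L, remaining));
   ```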



##########
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java:
##########
@@ -184,6 +234,29 @@ private void initRowsEvent(final AbstractRowsEvent rowsEvent, final MySQLBinlogE
         rowsEvent.setServerId(binlogEventHeader.getServerId());
     }
     
+    private QueryEvent decodeQueryEvent(final int checksumLength, final MySQLPacketPayload payload) {
+        int threadId = payload.readInt4();
+        int executionTime = payload.readInt4();
+        // length of the name of the database
+        payload.skipReserved(1);
+        int errorCode = payload.readInt2();
+        // status variables block
+        payload.skipReserved(payload.readInt2());
+        String databaseName = payload.readStringNul();
+        String sql = payload.readStringFix(payload.getByteBuf().readableBytes() - checksumLength);
+        return new QueryEvent(threadId, executionTime, errorCode, databaseName, sql);
+    }
+    
+    private XidEvent decodeXidEvent(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
+        XidEvent result = new XidEvent(payload.readInt8());
+        result.setFileName(binlogContext.getFileName());
+        result.setPosition(binlogEventHeader.getLogPos());
+        result.setTimestamp(binlogEventHeader.getTimestamp());
+        result.setServerId(binlogEventHeader.getServerId());
+        return result;
+    }
+    
+    // TODO May be used again later, keep this method first.

Review Comment:
   TODO could be removed



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java:
##########
@@ -122,10 +121,8 @@ public void onFailure(final Throwable throwable) {
     
     private PipelineChannel createChannel(final PipelineChannelCreator pipelineChannelCreator) {
         return pipelineChannelCreator.createPipelineChannel(1, records -> {
-            Record lastNormalRecord = RecordUtils.getLastNormalRecord(records);
-            if (null != lastNormalRecord) {
-                position.set(lastNormalRecord.getPosition());
-            }
+            Record lastRecord = records.get(records.size() - 1);
+            position.set(lastRecord.getPosition());

Review Comment:
   If there's a FinishedRecord, could we set it to `position`?
   
   It looks like `RecordUtils.getLastNormalRecord` could be kept to handle PlaceholderRecord, and also be updated to handle FinishedRecord.
   



##########
kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/yaml/job/YamlMigrationJobConfiguration.java:
##########
@@ -56,6 +56,7 @@ public final class YamlMigrationJobConfiguration implements YamlPipelineJobConfi
     
     private List<String> jobShardingDataNodes;
     
+    // TODO remove later

Review Comment:
   TODO could be removed



##########
kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java:
##########
@@ -86,13 +99,43 @@ protected void runBlocking() {
                     continue;
                 }
                 AbstractWALEvent event = decodingPlugin.decode(message, new PostgreSQLLogSequenceNumber(stream.getLastReceiveLSN()));
-                channel.pushRecord(walEventConverter.convert(event));
+                if (decodeWithTX) {
+                    processEventWithTX(event);
+                } else {
+                    processEventIgnoreTX(event);
+                }
             }
         } catch (final SQLException ex) {
             throw new IngestException(ex);
         }
     }
     
+    private void processEventWithTX(final AbstractWALEvent event) {
+        if (event instanceof BeginTXEvent) {
+            rowEvents = new ArrayList<>();
+            return;
+        }

Review Comment:
   Could we set `rowEvents = new ArrayList<>();` on CommitTXEvent, instead of waiting for the next BeginTXEvent?



##########
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java:
##########
@@ -77,14 +79,16 @@ public final class MySQLClient {
     
     private Promise<Object> responseCallback;
     
-    private final ArrayBlockingQueue<AbstractBinlogEvent> blockingEventQueue = new ArrayBlockingQueue<>(10000);
+    private final ArrayBlockingQueue<List<AbstractBinlogEvent>> blockingEventQueue = new ArrayBlockingQueue<>(200);

Review Comment:
   From `10000` to `200`, is it enough? Though each element is a list now, what's the average list size?



##########
kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java:
##########
@@ -86,13 +99,43 @@ protected void runBlocking() {
                     continue;
                 }
                 AbstractWALEvent event = decodingPlugin.decode(message, new PostgreSQLLogSequenceNumber(stream.getLastReceiveLSN()));
-                channel.pushRecord(walEventConverter.convert(event));
+                if (decodeWithTX) {
+                    processEventWithTX(event);
+                } else {
+                    processEventIgnoreTX(event);
+                }
             }
         } catch (final SQLException ex) {
             throw new IngestException(ex);
         }
     }
     
+    private void processEventWithTX(final AbstractWALEvent event) {
+        if (event instanceof BeginTXEvent) {
+            rowEvents = new ArrayList<>();
+            return;
+        }
+        if (event instanceof AbstractRowEvent) {
+            rowEvents.add((AbstractRowEvent) event);
+            return;
+        }
+        if (event instanceof CommitTXEvent) {
+            List<Record> records = new LinkedList<>();
+            for (AbstractWALEvent each : rowEvents) {
+                records.add(walEventConverter.convert(each));
+            }
+            records.add(walEventConverter.convert(event));
+            channel.pushRecords(records);
+        }
+    }
+    
+    private void processEventIgnoreTX(final AbstractWALEvent event) {
+        if (event instanceof BeginTXEvent) {

Review Comment:
   Should CommitTXEvent be ignored?




[GitHub] [shardingsphere] sandynz commented on a diff in pull request #25787: Refactor pipeline channel, read records by transaction at increment task

Posted by "sandynz (via GitHub)" <gi...@apache.org>.
sandynz commented on code in PR #25787:
URL: https://github.com/apache/shardingsphere/pull/25787#discussion_r1198610893


##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtils.java:
##########
@@ -76,39 +79,37 @@ private static void saveAckPosition(final Map<SocketSinkImporter, CDCAckPosition
         }
     }
     
-    private static DataRecord findMinimumDataRecordWithComparator(final Map<SocketSinkImporter, BlockingQueue<Record>> incrementalRecordMap,
-                                                                  final Map<SocketSinkImporter, CDCAckPosition> cdcAckPositionMap, final Comparator<DataRecord> dataRecordComparator) {
-        Map<SocketSinkImporter, DataRecord> waitSortedMap = new HashMap<>();
-        for (Entry<SocketSinkImporter, BlockingQueue<Record>> entry : incrementalRecordMap.entrySet()) {
-            Record peek = entry.getValue().peek();
+    private static List<DataRecord> findMinimumDataRecordWithComparator(final Map<SocketSinkImporter, BlockingQueue<List<DataRecord>>> incrementalRecordMap,
+                                                                        final Map<SocketSinkImporter, CDCAckPosition> cdcAckPositionMap, final Comparator<DataRecord> dataRecordComparator) {
+        Map<SocketSinkImporter, List<DataRecord>> waitSortedMap = new HashMap<>();
+        for (Entry<SocketSinkImporter, BlockingQueue<List<DataRecord>>> entry : incrementalRecordMap.entrySet()) {
+            List<DataRecord> peek = entry.getValue().peek();
             if (null == peek) {
                 continue;
             }
-            if (peek instanceof DataRecord) {
-                waitSortedMap.put(entry.getKey(), (DataRecord) peek);
-            }
+            waitSortedMap.put(entry.getKey(), peek);
         }
         if (waitSortedMap.isEmpty()) {
-            return null;
+            return Collections.emptyList();
         }
-        DataRecord minRecord = null;
+        List<DataRecord> minRecords = null;
         SocketSinkImporter belongImporter = null;
-        for (Entry<SocketSinkImporter, DataRecord> entry : waitSortedMap.entrySet()) {
-            if (null == minRecord) {
-                minRecord = entry.getValue();
+        for (Entry<SocketSinkImporter, List<DataRecord>> entry : waitSortedMap.entrySet()) {
+            if (null == minRecords) {
+                minRecords = entry.getValue();
                 belongImporter = entry.getKey();
                 continue;
             }
-            if (dataRecordComparator.compare(minRecord, entry.getValue()) > 0) {
-                minRecord = entry.getValue();
+            if (dataRecordComparator.compare(minRecords.get(0), entry.getValue().get(0)) > 0) {
+                minRecords = entry.getValue();
                 belongImporter = entry.getKey();
             }
         }
-        if (null == minRecord) {
-            return null;
+        if (null == minRecords) {
+            return Collections.emptyList();
         }
         incrementalRecordMap.get(belongImporter).poll();
-        saveAckPosition(cdcAckPositionMap, belongImporter, minRecord);
-        return minRecord;
+        saveAckPosition(cdcAckPositionMap, belongImporter, minRecords.get(minRecords.size() - 1));
+        return minRecords;

Review Comment:
   `return minRecords` could be `return result`



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java:
##########
@@ -54,13 +54,22 @@ public void pushRecord(final Record dataRecord) {
     public List<Record> fetchRecords(final int batchSize, final int timeout, final TimeUnit timeUnit) {
         List<Record> result = new ArrayList<>(batchSize);
         long start = System.currentTimeMillis();
-        while (batchSize > queue.size()) {
+        int recordsCount = 0;
+        while (batchSize > recordsCount) {
+            List<Record> records = queue.poll();
+            if (null == records || records.isEmpty()) {
+                TimeUnit.MILLISECONDS.sleep(100L);

Review Comment:
   Is it possible for `records` to be null?



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java:
##########
@@ -94,24 +93,11 @@ private IncrementalTaskProgress createIncrementalTaskProgress(final IngestPositi
         return result;
     }
     
-    private Collection<Importer> createImporters(final int concurrency, final ImporterConfiguration importerConfig, final ImporterConnector importerConnector, final PipelineChannel channel,
-                                                 final PipelineJobProgressListener jobProgressListener) {
-        Collection<Importer> result = new LinkedList<>();
-        for (int i = 0; i < concurrency; i++) {
-            result.add(TypedSPILoader.getService(ImporterCreator.class, importerConnector.getType()).createImporter(importerConfig, importerConnector, channel, jobProgressListener,
-                    ImporterType.INCREMENTAL));
-        }
-        return result;
-    }
-    
-    private PipelineChannel createChannel(final int concurrency, final PipelineChannelCreator pipelineChannelCreator, final IncrementalTaskProgress progress) {
-        return pipelineChannelCreator.createPipelineChannel(concurrency, records -> {
+    private PipelineChannel createChannel(final PipelineChannelCreator pipelineChannelCreator, final IncrementalTaskProgress progress) {
+        return pipelineChannelCreator.createPipelineChannel(1, records -> {
             Record lastHandledRecord = records.get(records.size() - 1);
-            if (!(lastHandledRecord.getPosition() instanceof PlaceholderPosition)) {
-                progress.setPosition(lastHandledRecord.getPosition());
-                progress.getIncrementalTaskDelay().setLastEventTimestamps(lastHandledRecord.getCommitTime());
-            }
-            progress.getIncrementalTaskDelay().setLatestActiveTimeMillis(System.currentTimeMillis());
+            progress.setPosition(lastHandledRecord.getPosition());
+            progress.getIncrementalTaskDelay().setLastEventTimestamps(lastHandledRecord.getCommitTime());

Review Comment:
   Is there a `PlaceholderPosition` in `records`? It looks like `setLastEventTimestamps` should not be invoked for a `PlaceholderPosition`.



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java:
##########
@@ -122,10 +121,8 @@ public void onFailure(final Throwable throwable) {
     
     private PipelineChannel createChannel(final PipelineChannelCreator pipelineChannelCreator) {
         return pipelineChannelCreator.createPipelineChannel(1, records -> {
-            Record lastNormalRecord = RecordUtils.getLastNormalRecord(records);
-            if (null != lastNormalRecord) {
-                position.set(lastNormalRecord.getPosition());
-            }
+            Record lastRecord = records.get(records.size() - 1);
+            position.set(lastRecord.getPosition());

Review Comment:
   Could the `getLastNormalRecord` method be removed from `RecordUtils`?



##########
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java:
##########
@@ -97,46 +101,58 @@ protected void runBlocking() {
         client.connect();
         client.subscribe(binlogPosition.getFilename(), binlogPosition.getPosition());
         while (isRunning()) {
-            AbstractBinlogEvent event = client.poll();
-            if (null == event) {
+            List<AbstractBinlogEvent> events = client.poll();
+            if (null == events) {
                 continue;
             }

Review Comment:
   Will `events` be null? Could we make it non-nullable?



##########
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java:
##########
@@ -104,13 +146,16 @@ private AbstractBinlogEvent decodeEvent(final MySQLPacketPayload payload, final
             case DELETE_ROWS_EVENT_V1:
             case DELETE_ROWS_EVENT_V2:
                 return decodeDeleteRowsEventV2(binlogEventHeader, payload);
+            case QUERY_EVENT:
+                return decodeQueryEvent(binlogEventHeader.getChecksumLength(), payload);
+            case XID_EVENT:
+                return decodeXidEvent(binlogEventHeader, payload);
             default:
-                PlaceholderEvent result = createPlaceholderEvent(binlogEventHeader);

Review Comment:
   Why not keep `createPlaceholderEvent` so the saved position keeps growing, to reduce binlog replay after job restart?
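   
   Roughly what keeping the default branch means (a reconstructed sketch, not the exact original code):
   ```
   default:
       // Emit a placeholder so the saved binlog position can still advance
       // past event types the dumper does not decode.
       PlaceholderEvent result = createPlaceholderEvent(binlogEventHeader);
       // Assumed: discard the rest of this event's body before returning.
       payload.skipReserved(payload.getByteBuf().readableBytes());
       return result;
   ```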



##########
kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java:
##########
@@ -86,13 +98,43 @@ protected void runBlocking() {
                     continue;
                 }
                 AbstractWALEvent event = decodingPlugin.decode(message, new PostgreSQLLogSequenceNumber(stream.getLastReceiveLSN()));
-                channel.pushRecord(walEventConverter.convert(event));
+                if (decodeWithTX) {
+                    processEventWithTX(event);
+                } else {
+                    processEventIgnoreTX(event);
+                }
             }
         } catch (final SQLException ex) {
             throw new IngestException(ex);
         }
     }
     
+    private void processEventWithTX(final AbstractWALEvent event) {
+        if (event instanceof BeginTXEvent) {
+            walEvents.clear();
+            return;
+        }
+        if (event instanceof AbstractRowEvent) {
+            walEvents.add(event);
+            return;
+        }
+        if (event instanceof CommitTXEvent) {
+            walEvents.add(event);
+            List<Record> records = new LinkedList<>();
+            for (AbstractWALEvent each : walEvents) {
+                records.add(walEventConverter.convert(each));
+            }
+            channel.pushRecords(records);
+        }

Review Comment:
   Why is `event` pushed to `channel` in `OpenGaussWALDumper` but not here?



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java:
##########
@@ -139,9 +146,13 @@ private void dump(final PipelineTableMetaData tableMetaData, final Connection co
                         rateLimitAlgorithm.intercept(JobOperationType.SELECT, 1);
                     }
                 }
+                if (!dataRecords.isEmpty()) {
+                    channel.pushRecords(dataRecords);
+                }
                 dumpStatement.set(null);
                 log.info("Inventory dump done, rowCount={}", rowCount);
             }
+            

Review Comment:
   Empty line could be removed



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java:
##########
@@ -54,13 +54,22 @@ public void pushRecord(final Record dataRecord) {
     public List<Record> fetchRecords(final int batchSize, final int timeout, final TimeUnit timeUnit) {
         List<Record> result = new ArrayList<>(batchSize);
         long start = System.currentTimeMillis();
-        while (batchSize > queue.size()) {
+        int recordsCount = 0;
+        while (batchSize > recordsCount) {
+            List<Record> records = queue.poll();
+            if (null == records || records.isEmpty()) {
+                TimeUnit.MILLISECONDS.sleep(100L);
+            } else {
+                recordsCount += records.size();
+                result.addAll(records);
+            }
+            if (recordsCount >= batchSize) {
+                return result;
+            }

Review Comment:
   It looks like `if (recordsCount >= batchSize)` is not required, since the `while` condition already performs this check.



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java:
##########
@@ -65,24 +63,25 @@ public final class IncrementalTask implements PipelineTask, AutoCloseable {
     
     private final Dumper dumper;
     
-    private final Collection<Importer> importers;
+    private final Importer importer;
     
     @Getter
     private final IncrementalTaskProgress taskProgress;
     
     // TODO simplify parameters
-    public IncrementalTask(final int concurrency, final DumperConfiguration dumperConfig, final ImporterConfiguration importerConfig,
+    public IncrementalTask(final DumperConfiguration dumperConfig, final ImporterConfiguration importerConfig,

Review Comment:
   Is `concurrency` not needed any more?



##########
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java:
##########
@@ -97,46 +101,58 @@ protected void runBlocking() {
         client.connect();
         client.subscribe(binlogPosition.getFilename(), binlogPosition.getPosition());
         while (isRunning()) {
-            AbstractBinlogEvent event = client.poll();
-            if (null == event) {
+            List<AbstractBinlogEvent> events = client.poll();
+            if (null == events) {
                 continue;
             }
-            handleEvent(event);
+            handleEvent(events);
         }
-        channel.pushRecord(new FinishedRecord(new PlaceholderPosition()));
+        channel.pushRecords(Collections.singletonList(new FinishedRecord(new FinishedPosition())));
     }
     
-    private void handleEvent(final AbstractBinlogEvent event) {
-        if (event instanceof PlaceholderEvent || !((AbstractRowsEvent) event).getDatabaseName().equals(catalog) || !dumperConfig.containsTable(((AbstractRowsEvent) event).getTableName())) {
-            createPlaceholderRecord(event);
-            return;
-        }
-        PipelineTableMetaData tableMetaData = getPipelineTableMetaData(((AbstractRowsEvent) event).getTableName());
-        if (event instanceof WriteRowsEvent) {
-            handleWriteRowsEvent((WriteRowsEvent) event, tableMetaData);
-            return;
+    private void handleEvent(final List<AbstractBinlogEvent> events) {

Review Comment:
   Could we add a new method `handleEvents` and keep `handleEvent`?
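   
   A minimal sketch of that split (assuming `handleEvent` keeps its original single-event signature):
   ```
   private void handleEvents(final List<AbstractBinlogEvent> events) {
       for (AbstractBinlogEvent each : events) {
           handleEvent(each);
       }
   }
   ```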



##########
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java:
##########
@@ -69,11 +78,44 @@ protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final L
                 in.resetReaderIndex();
                 break;
             }
-            Optional.ofNullable(decodeEvent(payload, binlogEventHeader)).ifPresent(out::add);
+            AbstractBinlogEvent binlogEvent = decodeEvent(payload, binlogEventHeader);
+            if (null == binlogEvent) {
+                skipChecksum(binlogEventHeader.getEventType(), in);
+                return;
+            }
+            if (decodeWithTX) {
+                processEventWithTX(binlogEvent, out);
+            } else {
+                processEventIgnoreTX(binlogEvent, out);
+            }
             skipChecksum(binlogEventHeader.getEventType(), in);
         }
     }
     
+    private void processEventWithTX(final AbstractBinlogEvent binlogEvent, final List<Object> out) {
+        if (binlogEvent instanceof QueryEvent) {
+            QueryEvent queryEvent = (QueryEvent) binlogEvent;
+            if (TX_BEGIN_SQL.equals(queryEvent.getSql())) {
+                records.clear();
+            }

Review Comment:
   When the begin SQL arrives, `records` might not be empty; could we just clear it?



##########
kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java:
##########
@@ -115,29 +117,31 @@ private PgConnection getReplicationConnectionUnwrap() throws SQLException {
     }
     
     private void processEventWithTX(final AbstractWALEvent event) {
-        if (event instanceof AbstractRowEvent) {
-            rowEvents.add((AbstractRowEvent) event);
+        if (event instanceof BeginTXEvent) {
+            walEvents.clear();
             return;

Review Comment:
   Verify: `walEvents` might not be empty on clearing



##########
kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java:
##########
@@ -86,13 +98,43 @@ protected void runBlocking() {
                     continue;
                 }
                 AbstractWALEvent event = decodingPlugin.decode(message, new PostgreSQLLogSequenceNumber(stream.getLastReceiveLSN()));
-                channel.pushRecord(walEventConverter.convert(event));
+                if (decodeWithTX) {
+                    processEventWithTX(event);
+                } else {
+                    processEventIgnoreTX(event);
+                }
             }
         } catch (final SQLException ex) {
             throw new IngestException(ex);
         }
     }
     
+    private void processEventWithTX(final AbstractWALEvent event) {
+        if (event instanceof BeginTXEvent) {
+            walEvents.clear();

Review Comment:
   Verify: `walEvents` might not be empty on clearing



##########
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java:
##########
@@ -305,15 +309,25 @@ private final class MySQLBinlogEventHandler extends ChannelInboundHandlerAdapter
             this.lastBinlogEvent = new AtomicReference<>(lastBinlogEvent);
         }
         
+        @SuppressWarnings("unchecked")
         @Override
         public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
             if (!running) {
                 return;
             }
+            reconnectTimes.set(0);
+            if (msg instanceof List) {
+                List<AbstractBinlogEvent> records = (List<AbstractBinlogEvent>) msg;
+                if (records.isEmpty()) {
+                    log.warn("The records is empty");
+                    return;
+                }
+                lastBinlogEvent.set(records.get(records.size() - 1));
+                blockingEventQueue.put(records);

Review Comment:
   Does `records` include the whole transaction's records?



##########
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/event/QueryEvent.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event;
+
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * Query event.
+ */
+@Getter
+@Setter
+public final class QueryEvent extends AbstractBinlogEvent {
+    
+    private long threadId;
+    
+    private long executionTime;

Review Comment:
   It's better to use `final` for `QueryEvent` fields
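   
   With final fields the class could be sketched like this (Lombok generates the constructor; the field list is taken from `decodeQueryEvent` in the diff, the rest is assumed):
   ```
   @RequiredArgsConstructor
   @Getter
   public final class QueryEvent extends AbstractBinlogEvent {
       
       private final long threadId;
       
       private final long executionTime;
       
       private final int errorCode;
       
       private final String databaseName;
       
       private final String sql;
   }
   ```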



##########
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/event/XidEvent.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+/**
+ * Xid event.

Review Comment:
   It's better to explain the meaning. Does it mean the end of a transaction?



##########
kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java:
##########
@@ -64,7 +66,7 @@ public final class OpenGaussWALDumper extends AbstractLifecycleExecutor implemen
     
     private final boolean decodeWithTX;
     
-    private final List<AbstractRowEvent> rowEvents = new LinkedList<>();
+    private final List<AbstractRowEvent> walEvents = new LinkedList<>();

Review Comment:
   It's better to keep `rowEvents`, since the element is `AbstractRowEvent`



##########
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/event/QueryEvent.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event;
+
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * Query event.

Review Comment:
   It's better to explain the meaning. Does it mean the beginning of a transaction?





[GitHub] [shardingsphere] sandynz commented on a diff in pull request #25787: Refactor pipeline channel, read records by transaction at increment task

Posted by "sandynz (via GitHub)" <gi...@apache.org>.
sandynz commented on code in PR #25787:
URL: https://github.com/apache/shardingsphere/pull/25787#discussion_r1198751371


##########
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java:
##########
@@ -97,46 +101,58 @@ protected void runBlocking() {
         client.connect();
         client.subscribe(binlogPosition.getFilename(), binlogPosition.getPosition());
         while (isRunning()) {
-            AbstractBinlogEvent event = client.poll();
-            if (null == event) {
+            List<AbstractBinlogEvent> events = client.poll();
+            if (null == events) {
                 continue;
             }

Review Comment:
   Yes, but it depends on MySQLClient, not `queue.poll` directly
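   
   So the non-nullable contract would live in `MySQLClient` rather than at the call site; a sketch (assuming the `blockingEventQueue` field from the diff and an existing `poll()` method):
   ```
   /**
    * Poll binlog events; sketched so it never returns null.
    */
   public List<AbstractBinlogEvent> poll() {
       try {
           List<AbstractBinlogEvent> result = blockingEventQueue.poll(100L, TimeUnit.MILLISECONDS);
           return null == result ? Collections.emptyList() : result;
       } catch (final InterruptedException ignored) {
           Thread.currentThread().interrupt();
           return Collections.emptyList();
       }
   }
   ```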





[GitHub] [shardingsphere] azexcy commented on a diff in pull request #25787: Refactor pipeline channel, read records by transaction at increment task

Posted by "azexcy (via GitHub)" <gi...@apache.org>.
azexcy commented on code in PR #25787:
URL: https://github.com/apache/shardingsphere/pull/25787#discussion_r1198725177


##########
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java:
##########
@@ -104,13 +146,16 @@ private AbstractBinlogEvent decodeEvent(final MySQLPacketPayload payload, final
             case DELETE_ROWS_EVENT_V1:
             case DELETE_ROWS_EVENT_V2:
                 return decodeDeleteRowsEventV2(binlogEventHeader, payload);
+            case QUERY_EVENT:
+                return decodeQueryEvent(binlogEventHeader.getChecksumLength(), payload);
+            case XID_EVENT:
+                return decodeXidEvent(binlogEventHeader, payload);
             default:
-                PlaceholderEvent result = createPlaceholderEvent(binlogEventHeader);

Review Comment:
   I hadn't thought of that; I'll roll it back there.





[GitHub] [shardingsphere] azexcy commented on a diff in pull request #25787: Refactor pipeline channel, read records by transaction at increment task

Posted by "azexcy (via GitHub)" <gi...@apache.org>.
azexcy commented on code in PR #25787:
URL: https://github.com/apache/shardingsphere/pull/25787#discussion_r1198757318


##########
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java:
##########
@@ -97,46 +101,58 @@ protected void runBlocking() {
         client.connect();
         client.subscribe(binlogPosition.getFilename(), binlogPosition.getPosition());
         while (isRunning()) {
-            AbstractBinlogEvent event = client.poll();
-            if (null == event) {
+            List<AbstractBinlogEvent> events = client.poll();
+            if (null == events) {
                 continue;
             }

Review Comment:
   OK, I will improve it.





[GitHub] [shardingsphere] sandynz merged pull request #25787: Refactor pipeline channel, read records by transaction at increment task

Posted by "sandynz (via GitHub)" <gi...@apache.org>.
sandynz merged PR #25787:
URL: https://github.com/apache/shardingsphere/pull/25787




[GitHub] [shardingsphere] azexcy commented on a diff in pull request #25787: Refactor pipeline channel, read records by transaction at increment task

Posted by "azexcy (via GitHub)" <gi...@apache.org>.
azexcy commented on code in PR #25787:
URL: https://github.com/apache/shardingsphere/pull/25787#discussion_r1198716040


##########
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java:
##########
@@ -69,11 +78,44 @@ protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final L
                 in.resetReaderIndex();
                 break;
             }
-            Optional.ofNullable(decodeEvent(payload, binlogEventHeader)).ifPresent(out::add);
+            AbstractBinlogEvent binlogEvent = decodeEvent(payload, binlogEventHeader);
+            if (null == binlogEvent) {
+                skipChecksum(binlogEventHeader.getEventType(), in);
+                return;
+            }
+            if (decodeWithTX) {
+                processEventWithTX(binlogEvent, out);
+            } else {
+                processEventIgnoreTX(binlogEvent, out);
+            }
             skipChecksum(binlogEventHeader.getEventType(), in);
         }
     }
     
+    private void processEventWithTX(final AbstractBinlogEvent binlogEvent, final List<Object> out) {
+        if (binlogEvent instanceof QueryEvent) {
+            QueryEvent queryEvent = (QueryEvent) binlogEvent;
+            if (TX_BEGIN_SQL.equals(queryEvent.getSql())) {
+                records.clear();
+            }

Review Comment:
   Yes, because when the begin SQL arrives, a new transaction has started. But I will improve it: use `records = new LinkedList<>()` instead of `clear()`.
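   
   The reason a fresh list is safer than `clear()`: the old list instance may already have been added to Netty's `out` and be in flight downstream, so clearing would mutate a batch that was already handed off. A sketch (the `records` field and event handling are simplified):
   ```
   if (binlogEvent instanceof QueryEvent && TX_BEGIN_SQL.equals(((QueryEvent) binlogEvent).getSql())) {
       // Start a new buffer; the previous reference may already be queued via `out`.
       records = new LinkedList<>();
       return;
   }
   records.add(binlogEvent);
   if (binlogEvent instanceof XidEvent) {
       // Commit: hand the whole transaction's events downstream as one list.
       out.add(records);
   }
   ```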





[GitHub] [shardingsphere] azexcy commented on a diff in pull request #25787: Refactor pipeline channel, read records by transaction at increment task

Posted by "azexcy (via GitHub)" <gi...@apache.org>.
azexcy commented on code in PR #25787:
URL: https://github.com/apache/shardingsphere/pull/25787#discussion_r1198759004


##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java:
##########
@@ -54,13 +54,22 @@ public void pushRecord(final Record dataRecord) {
     public List<Record> fetchRecords(final int batchSize, final int timeout, final TimeUnit timeUnit) {
         List<Record> result = new ArrayList<>(batchSize);
         long start = System.currentTimeMillis();
-        while (batchSize > queue.size()) {
+        int recordsCount = 0;
+        while (batchSize > recordsCount) {
+            List<Record> records = queue.poll();
+            if (null == records || records.isEmpty()) {
+                TimeUnit.MILLISECONDS.sleep(100L);

Review Comment:
   Yes, if `poll` returns null, SimpleMemoryPipelineChannel will return an empty list.





[GitHub] [shardingsphere] azexcy commented on a diff in pull request #25787: Refactor pipeline channel, read records by transaction at increment task

Posted by "azexcy (via GitHub)" <gi...@apache.org>.
azexcy commented on code in PR #25787:
URL: https://github.com/apache/shardingsphere/pull/25787#discussion_r1198670941


##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java:
##########
@@ -94,24 +93,11 @@ private IncrementalTaskProgress createIncrementalTaskProgress(final IngestPositi
         return result;
     }
     
-    private Collection<Importer> createImporters(final int concurrency, final ImporterConfiguration importerConfig, final ImporterConnector importerConnector, final PipelineChannel channel,
-                                                 final PipelineJobProgressListener jobProgressListener) {
-        Collection<Importer> result = new LinkedList<>();
-        for (int i = 0; i < concurrency; i++) {
-            result.add(TypedSPILoader.getService(ImporterCreator.class, importerConnector.getType()).createImporter(importerConfig, importerConnector, channel, jobProgressListener,
-                    ImporterType.INCREMENTAL));
-        }
-        return result;
-    }
-    
-    private PipelineChannel createChannel(final int concurrency, final PipelineChannelCreator pipelineChannelCreator, final IncrementalTaskProgress progress) {
-        return pipelineChannelCreator.createPipelineChannel(concurrency, records -> {
+    private PipelineChannel createChannel(final PipelineChannelCreator pipelineChannelCreator, final IncrementalTaskProgress progress) {
+        return pipelineChannelCreator.createPipelineChannel(1, records -> {
             Record lastHandledRecord = records.get(records.size() - 1);
-            if (!(lastHandledRecord.getPosition() instanceof PlaceholderPosition)) {
-                progress.setPosition(lastHandledRecord.getPosition());
-                progress.getIncrementalTaskDelay().setLastEventTimestamps(lastHandledRecord.getCommitTime());
-            }
-            progress.getIncrementalTaskDelay().setLatestActiveTimeMillis(System.currentTimeMillis());
+            progress.setPosition(lastHandledRecord.getPosition());
+            progress.getIncrementalTaskDelay().setLastEventTimestamps(lastHandledRecord.getCommitTime());

Review Comment:
   My mistake, I will roll it back.





[GitHub] [shardingsphere] azexcy commented on a diff in pull request #25787: Refactor pipeline channel, read records by transaction at increment task

Posted by "azexcy (via GitHub)" <gi...@apache.org>.
azexcy commented on code in PR #25787:
URL: https://github.com/apache/shardingsphere/pull/25787#discussion_r1198706607


##########
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java:
##########
@@ -305,15 +309,25 @@ private final class MySQLBinlogEventHandler extends ChannelInboundHandlerAdapter
             this.lastBinlogEvent = new AtomicReference<>(lastBinlogEvent);
         }
         
+        @SuppressWarnings("unchecked")
         @Override
         public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
             if (!running) {
                 return;
             }
+            reconnectTimes.set(0);
+            if (msg instanceof List) {
+                List<AbstractBinlogEvent> records = (List<AbstractBinlogEvent>) msg;
+                if (records.isEmpty()) {
+                    log.warn("The records is empty");
+                    return;
+                }
+                lastBinlogEvent.set(records.get(records.size() - 1));
+                blockingEventQueue.put(records);

Review Comment:
   Yes, if the msg is a List, it contains a whole transaction's records.
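   A hypothetical sketch of the decoder side batching events per transaction before they reach `channelRead` (the `isBegin`/`isCommit` helpers and the method itself are illustrative, not the actual decoder API):
   ```
   private final List<AbstractBinlogEvent> transactionBuffer = new LinkedList<>();
   
   private void onBinlogEvent(final AbstractBinlogEvent event, final List<Object> out) {
       if (isBegin(event)) {
           transactionBuffer.clear();
           return;
       }
       transactionBuffer.add(event);
       if (isCommit(event)) {
           // Emit BEGIN..COMMIT as a single message, so channelRead receives
           // one List<AbstractBinlogEvent> covering the whole transaction.
           out.add(new LinkedList<>(transactionBuffer));
           transactionBuffer.clear();
       }
   }
   ```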





[GitHub] [shardingsphere] azexcy commented on a diff in pull request #25787: Refactor pipeline channel, read records by transaction at increment task

Posted by "azexcy (via GitHub)" <gi...@apache.org>.
azexcy commented on code in PR #25787:
URL: https://github.com/apache/shardingsphere/pull/25787#discussion_r1198726770


##########
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/event/QueryEvent.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event;
+
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * Query event.

Review Comment:
   Yes, adding the doc now:
   https://mariadb.com/kb/en/query_event/
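   For reference, the added javadoc could look something like this (a sketch; the wording actually committed may differ):
   ```
   /**
    * Query event.
    *
    * <p>Corresponds to the binlog QUERY_EVENT, which carries statements executed
    * as plain SQL text, e.g. the BEGIN that starts a transaction.</p>
    *
    * @see <a href="https://mariadb.com/kb/en/query_event/">QUERY_EVENT</a>
    */
   ```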





[GitHub] [shardingsphere] azexcy commented on a diff in pull request #25787: Refactor pipeline channel, read records by transaction at increment task

Posted by "azexcy (via GitHub)" <gi...@apache.org>.
azexcy commented on code in PR #25787:
URL: https://github.com/apache/shardingsphere/pull/25787#discussion_r1198792884


##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java:
##########
@@ -65,24 +63,25 @@ public final class IncrementalTask implements PipelineTask, AutoCloseable {
     
     private final Dumper dumper;
     
-    private final Collection<Importer> importers;
+    private final Importer importer;
     
     @Getter
     private final IncrementalTaskProgress taskProgress;
     
     // TODO simplify parameters
-    public IncrementalTask(final int concurrency, final DumperConfiguration dumperConfig, final ImporterConfiguration importerConfig,
+    public IncrementalTask(final DumperConfiguration dumperConfig, final ImporterConfiguration importerConfig,

Review Comment:
   Rollback it





[GitHub] [shardingsphere] azexcy commented on a diff in pull request #25787: Refactor pipeline channel, read records by transaction at increment task

Posted by "azexcy (via GitHub)" <gi...@apache.org>.
azexcy commented on code in PR #25787:
URL: https://github.com/apache/shardingsphere/pull/25787#discussion_r1198666166


##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java:
##########
@@ -54,13 +54,22 @@ public void pushRecord(final Record dataRecord) {
     public List<Record> fetchRecords(final int batchSize, final int timeout, final TimeUnit timeUnit) {
         List<Record> result = new ArrayList<>(batchSize);
         long start = System.currentTimeMillis();
-        while (batchSize > queue.size()) {
+        int recordsCount = 0;
+        while (batchSize > recordsCount) {
+            List<Record> records = queue.poll();
+            if (null == records || records.isEmpty()) {
+                TimeUnit.MILLISECONDS.sleep(100L);

Review Comment:
   Yes, the queue's `poll` may return null:
   ```
       /**
        * Retrieves and removes the head of this queue,
        * or returns {@code null} if this queue is empty.
        *
        * @return the head of this queue, or {@code null} if this queue is empty
        */
       E poll();
   
   ```
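   If normalizing the null at the call site were preferred, one option is (a sketch, assuming `java.util.Optional` and `java.util.Collections`):
   ```
   // Treat a null poll() result as an empty batch instead of branching on null.
   List<Record> records = Optional.ofNullable(queue.poll()).orElse(Collections.emptyList());
   ```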





[GitHub] [shardingsphere] azexcy commented on a diff in pull request #25787: Refactor pipeline channel, read records by transaction at increment task

Posted by "azexcy (via GitHub)" <gi...@apache.org>.
azexcy commented on code in PR #25787:
URL: https://github.com/apache/shardingsphere/pull/25787#discussion_r1198726386


##########
kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java:
##########
@@ -86,13 +98,43 @@ protected void runBlocking() {
                     continue;
                 }
                 AbstractWALEvent event = decodingPlugin.decode(message, new PostgreSQLLogSequenceNumber(stream.getLastReceiveLSN()));
-                channel.pushRecord(walEventConverter.convert(event));
+                if (decodeWithTX) {
+                    processEventWithTX(event);
+                } else {
+                    processEventIgnoreTX(event);
+                }
             }
         } catch (final SQLException ex) {
             throw new IngestException(ex);
         }
     }
     
+    private void processEventWithTX(final AbstractWALEvent event) {
+        if (event instanceof BeginTXEvent) {
+            walEvents.clear();
+            return;
+        }
+        if (event instanceof AbstractRowEvent) {
+            walEvents.add(event);
+            return;
+        }
+        if (event instanceof CommitTXEvent) {
+            walEvents.add(event);
+            List<Record> records = new LinkedList<>();
+            for (AbstractWALEvent each : walEvents) {
+                records.add(walEventConverter.convert(each));
+            }
+            channel.pushRecords(records);
+        }

Review Comment:
   Improved; the method signature stays the same.
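   As one illustration of tightening the commit branch while keeping the method signature (a sketch, not necessarily the change that was pushed; assumes `java.util.stream.Collectors`):
   ```
   if (event instanceof CommitTXEvent) {
       walEvents.add(event);
       channel.pushRecords(walEvents.stream().map(walEventConverter::convert).collect(Collectors.toList()));
       walEvents.clear();
   }
   ```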





[GitHub] [shardingsphere] sandynz commented on a diff in pull request #25787: Refactor pipeline channel, read records by transaction at increment task

Posted by "sandynz (via GitHub)" <gi...@apache.org>.
sandynz commented on code in PR #25787:
URL: https://github.com/apache/shardingsphere/pull/25787#discussion_r1198753186


##########
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java:
##########
@@ -305,15 +309,25 @@ private final class MySQLBinlogEventHandler extends ChannelInboundHandlerAdapter
             this.lastBinlogEvent = new AtomicReference<>(lastBinlogEvent);
         }
         
+        @SuppressWarnings("unchecked")
         @Override
         public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
             if (!running) {
                 return;
             }
+            reconnectTimes.set(0);
+            if (msg instanceof List) {
+                List<AbstractBinlogEvent> records = (List<AbstractBinlogEvent>) msg;
+                if (records.isEmpty()) {
+                    log.warn("The records is empty");
+                    return;
+                }
+                lastBinlogEvent.set(records.get(records.size() - 1));
+                blockingEventQueue.put(records);

Review Comment:
   When is `MySQLClient.MySQLBinlogEventHandler.channelRead` invoked? It looks like its invocation is not guaranteed.





[GitHub] [shardingsphere] azexcy commented on a diff in pull request #25787: Refactor pipeline channel, read records by transaction at increment task

Posted by "azexcy (via GitHub)" <gi...@apache.org>.
azexcy commented on code in PR #25787:
URL: https://github.com/apache/shardingsphere/pull/25787#discussion_r1202168635


##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java:
##########
@@ -122,10 +121,8 @@ public void onFailure(final Throwable throwable) {
     
     private PipelineChannel createChannel(final PipelineChannelCreator pipelineChannelCreator) {
         return pipelineChannelCreator.createPipelineChannel(1, records -> {
-            Record lastNormalRecord = RecordUtils.getLastNormalRecord(records);
-            if (null != lastNormalRecord) {
-                position.set(lastNormalRecord.getPosition());
-            }
+            Record lastRecord = records.get(records.size() - 1);
+            position.set(lastRecord.getPosition());

Review Comment:
   The `FinishedRecord` should also have its position set; maybe this could be simplified, since the commit transaction position should be recorded now.


