Posted to notifications@shardingsphere.apache.org by "azexcy (via GitHub)" <gi...@apache.org> on 2023/05/30 11:52:54 UTC

[GitHub] [shardingsphere] azexcy opened a new pull request, #25950: Improve CDC position ACK, make sure the FinishedPosition last to be ack

azexcy opened a new pull request, #25950:
URL: https://github.com/apache/shardingsphere/pull/25950

   Related: #22500
   
   Changes proposed in this pull request:
     - Improve PostgreSQL/openGauss WAL dumper reconnect
     - Wait for pending positions to be acked before handling FinishedRecord
     - Improve DataSourceRecordConsumer
   
   ---
   
   Before committing this PR, I'm sure that I have checked the following options:
   - [ ] My code follows the [code of conduct](https://shardingsphere.apache.org/community/en/involved/conduct/code/) of this project.
   - [ ] I have self-reviewed the commit code.
   - [ ] I have (or in comment I request) added corresponding labels for the pull request.
   - [ ] I have passed maven check locally : `./mvnw clean install -B -T1C -Dmaven.javadoc.skip -Dmaven.jacoco.skip -e`.
   - [ ] I have made corresponding changes to the documentation.
   - [ ] I have added corresponding unit tests for my changes.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [shardingsphere] sandynz merged pull request #25950: Improve CDC position ACK, make sure the FinishedPosition last to be ack

Posted by "sandynz (via GitHub)" <gi...@apache.org>.
sandynz merged PR #25950:
URL: https://github.com/apache/shardingsphere/pull/25950




[GitHub] [shardingsphere] sandynz commented on a diff in pull request #25950: Improve CDC position ACK, make sure the FinishedPosition last to be ack

Posted by "sandynz (via GitHub)" <gi...@apache.org>.
sandynz commented on code in PR #25950:
URL: https://github.com/apache/shardingsphere/pull/25950#discussion_r1211009995


##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java:
##########
@@ -116,6 +117,9 @@ public void write(final List<Record> recordList, final SocketSinkImporter socket
             int dataRecordCount = (int) recordList.stream().filter(DataRecord.class::isInstance).count();
             Record lastRecord = recordList.get(recordList.size() - 1);
             if (lastRecord instanceof FinishedRecord && 0 == dataRecordCount) {
+                while (CDCAckHolder.getInstance().hasPositionNeedToBeAckBeforeFinished(socketSinkImporter)) {
+                    Thread.sleep(500);
+                }

Review Comment:
   Is there a better way?
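
One possible alternative to the 500 ms polling loop is to block on a condition that is signalled when the last pending position is acked. The sketch below is only an illustration: `PendingAckTracker` and its methods are hypothetical names, and the real internals of `CDCAckHolder` are not shown in this thread, so it is unclear how directly this would map onto it.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical class for illustration; not part of ShardingSphere.
final class PendingAckTracker {
    
    private final ReentrantLock lock = new ReentrantLock();
    
    private final Condition allAcked = lock.newCondition();
    
    // Number of positions sent to the client that have not been acked yet.
    private int pending;
    
    // Call when a position is sent and an ack is expected for it.
    void register() {
        lock.lock();
        try {
            pending++;
        } finally {
            lock.unlock();
        }
    }
    
    // Call when an ack arrives; wakes up waiters once nothing is pending.
    void ack() {
        lock.lock();
        try {
            if (pending > 0 && 0 == --pending) {
                allAcked.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }
    
    // Blocks until nothing is pending or the timeout elapses.
    // Returns true if everything was acked, false on timeout or interruption.
    boolean awaitAllAcked(final long timeoutMillis) {
        long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            while (pending > 0) {
                if (nanos <= 0L) {
                    return false;
                }
                nanos = allAcked.awaitNanos(nanos);
            }
            return true;
        } catch (final InterruptedException ex) {
            // Restore the interrupt flag so the caller can react to it.
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
        }
    }
}
```

With something like this, the writer thread would call `awaitAllAcked(...)` once before handling the `FinishedRecord` instead of sleeping in a loop. The timeout keeps it from blocking forever if a client never acks.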



##########
kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java:
##########
@@ -92,6 +93,7 @@ protected void runBlocking() {
                 break;
             } catch (final SQLException ex) {
                 int times = reconnectTimes.incrementAndGet();
+                Thread.sleep(Math.min(10 * 1000L, 1000L << times));

Review Comment:
   The reconnects might not happen consecutively; do we need to sleep longer and longer each time?
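
For reference, the delay in the diff doubles with each attempt and is capped at 10 seconds. One way to address the concern above is to count only consecutive failures and reset the counter after a successful reconnect. The sketch below assumes that approach. `ReconnectBackoff` is a hypothetical helper, not code from this PR, and the clamp on the shift is an added safeguard that the diff does not have.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper for illustration; not part of ShardingSphere.
final class ReconnectBackoff {
    
    private static final long BASE_MILLIS = 1000L;
    
    private static final long MAX_MILLIS = 10 * 1000L;
    
    private final AtomicInteger consecutiveFailures = new AtomicInteger();
    
    // Same formula as in the diff: min(10s, 1s << times).
    // The shift is clamped (an assumption added here) so a large counter cannot overflow the long.
    static long delayMillis(final int times) {
        int shift = Math.max(0, Math.min(times, 10));
        return Math.min(MAX_MILLIS, BASE_MILLIS << shift);
    }
    
    // Call after a failed connection attempt; returns how long to sleep before retrying.
    long onFailure() {
        return delayMillis(consecutiveFailures.incrementAndGet());
    }
    
    // Call after a successful reconnect so later, unrelated failures start from the short delay again.
    void onSuccess() {
        consecutiveFailures.set(0);
    }
}
```

Under these assumptions the delays for consecutive failures are 2 s, 4 s, 8 s, then 10 s for every further attempt, and a failure that occurs after a successful reconnect starts again at 2 s.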





[GitHub] [shardingsphere] sandynz commented on a diff in pull request #25950: Improve CDC position ACK, make sure the FinishedPosition last to be ack

Posted by "sandynz (via GitHub)" <gi...@apache.org>.
sandynz commented on code in PR #25950:
URL: https://github.com/apache/shardingsphere/pull/25950#discussion_r1211600591


##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java:
##########
@@ -146,9 +144,8 @@ private void dump(final PipelineTableMetaData tableMetaData, final Connection co
                         rateLimitAlgorithm.intercept(JobOperationType.SELECT, 1);
                     }
                 }
-                if (!dataRecords.isEmpty()) {
-                    channel.pushRecords(dataRecords);
-                }
+                dataRecords.add(new FinishedRecord(new FinishedPosition()));
+                channel.pushRecords(dataRecords);

Review Comment:
   `dataRecords` might be empty; in this case it's the same as before.



##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolder.java:
##########
@@ -63,7 +63,7 @@ public String bindAckIdWithPosition(final Map<SocketSinkImporter, CDCAckPosition
     private String generateAckId() {
         return "ACK-" + UUID.randomUUID();
     }
-    
+

Review Comment:
   The indent should be kept


