Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/07/19 09:00:58 UTC

[GitHub] [pulsar] poorbarcode opened a new pull request, #16679: [fix] [txn] [PIP-160] Avoid timing task burdens the ledger thread and leads to an avalanche

poorbarcode opened a new pull request, #16679:
URL: https://github.com/apache/pulsar/pull/16679

   Master Issue: #15370
   
   ### Motivation
   
   see #15370
   
   ### Modifications
   
   When the writer thread processes slowly, the `scheduleAtFixedRate` task keeps appending tasks to the ledger thread. This burdens the ledger thread and leads to an avalanche.
   
   Changes: schedule the next timed task only after the current one has executed.
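
The change described above can be sketched as a self-rescheduling timer. This is a minimal illustration, not the actual `TxnLogBufferedWriter` code: the class `SelfReschedulingFlusher`, its fields, and the simulated slow write are all hypothetical, and it assumes the timed task hands the flush to a single writer thread. Because the next timer is armed only after the previous flush finishes, at most one timed flush can be waiting on the writer at any time.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the buffered writer; names are illustrative only.
class SelfReschedulingFlusher {
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final ExecutorService writer = Executors.newSingleThreadExecutor();
    private final AtomicInteger pending = new AtomicInteger();
    private final AtomicInteger maxPending = new AtomicInteger();
    private final long delayMillis;
    private final long flushCostMillis;
    private volatile boolean closed;

    SelfReschedulingFlusher(long delayMillis, long flushCostMillis) {
        this.delayMillis = delayMillis;
        this.flushCostMillis = flushCostMillis;
    }

    void start() {
        scheduleNext();
    }

    // Arms exactly one timer. Called once at start and again after each flush completes.
    private void scheduleNext() {
        if (closed) {
            return;
        }
        try {
            timer.schedule(this::submitFlush, delayMillis, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            // The timer was shut down concurrently; nothing left to schedule.
        }
    }

    // Runs on the timer thread: hands the flush to the writer thread.
    private void submitFlush() {
        if (closed) {
            return;
        }
        maxPending.accumulateAndGet(pending.incrementAndGet(), Math::max);
        try {
            writer.execute(this::flushThenReschedule);
        } catch (RejectedExecutionException e) {
            pending.decrementAndGet(); // the writer was already shut down
        }
    }

    // Runs on the writer thread: performs the (simulated) slow write, then arms the next timer.
    private void flushThenReschedule() {
        try {
            Thread.sleep(flushCostMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pending.decrementAndGet();
            scheduleNext();
        }
    }

    int close() {
        closed = true;
        timer.shutdownNow();
        writer.shutdownNow();
        return maxPending.get();
    }

    // Runs the flusher for runMillis and reports the largest number of queued timed flushes seen.
    static int maxPendingDuring(long delayMillis, long flushCostMillis, long runMillis) {
        SelfReschedulingFlusher flusher = new SelfReschedulingFlusher(delayMillis, flushCostMillis);
        flusher.start();
        try {
            Thread.sleep(runMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return flusher.close();
    }

    public static void main(String[] args) {
        // The write (10 ms) is slower than the timer delay (1 ms), yet the backlog cannot exceed 1.
        System.out.println("max pending timed flushes: " + maxPendingDuring(1, 10, 200));
    }
}
```

With `scheduleAtFixedRate` and the same hand-off, the timer would keep enqueuing a flush every interval regardless of how far behind the writer thread is, which is the accumulation this PR aims to avoid.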
   
   ### Documentation
   
   - [x] `doc-required` 
     
   - [ ] `doc-not-needed` 
     
   - [ ] `doc` 
   
   - [ ] `doc-complete`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on pull request #16679: [fix] [txn] [PIP-160] Avoid timing task burdens the ledger thread and leads to an avalanche

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #16679:
URL: https://github.com/apache/pulsar/pull/16679#issuecomment-1190326385

   /pulsarbot run-failure-checks




[GitHub] [pulsar] poorbarcode commented on pull request #16679: [fix] [txn] [PIP-160] Avoid timing task burdens the ledger thread and leads to an avalanche

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #16679:
URL: https://github.com/apache/pulsar/pull/16679#issuecomment-1189434804

   > You said if the tasks are executed longer than that interval, the queue starts building up until it explodes?
   
   Yes, I added more descriptions in `Motivation`.
   
   > I'm not sure I understand the nature of the context in this PR - would love a good brief on that - but could it be that `scheduleWithFixedDelay` will solve it, since it never starts the next task until the previous one is finished?
   
   This change only avoids the task accumulation in `TxnLogBufferedWriter` caused by too many scheduled tasks; it cannot improve the write performance of the Managed Ledger.
   
   I added more detail to the `Motivation` section.
   
   Thanks.
   
   
   
   




[GitHub] [pulsar] codelipenghui commented on a diff in pull request #16679: [fix] [txn] [PIP-160] Avoid timing task burdens the ledger thread and leads to an avalanche

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #16679:
URL: https://github.com/apache/pulsar/pull/16679#discussion_r924315439


##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java:
##########
@@ -408,6 +412,110 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
         orderedExecutor.shutdown();
     }
 
+    /**
+     * The use of {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)} for timed
+     * tasks in the original implementation caused this problem:
+     *   When the writer thread processes slowly, the scheduleAtFixedRate task will continue to append tasks to the
+     *   ledger thread, this burdens the ledger thread and leads to an avalanche.
+     * This method is used to verify the fix for the above problem. see: https://github.com/apache/pulsar/pull/16679.
+     */
+    @Test
+    public void testPendingScheduleTriggerTaskCount() throws Exception {
+        // Create components.
+        String managedLedgerName = "-";
+        ManagedLedger managedLedger = Mockito.mock(ManagedLedger.class);
+        Mockito.when(managedLedger.getName()).thenReturn(managedLedgerName);
+        OrderedExecutor orderedExecutor =  Mockito.mock(OrderedExecutor.class);
+        ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(65536 * 2);
+        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, workQueue);
+        Mockito.when(orderedExecutor.chooseThread(Mockito.anyString())).thenReturn(threadPoolExecutor);
+        ScheduledExecutorService scheduledExecutorService =
+                Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("tx-scheduler-threads"));
+        SumStrDataSerializer dataSerializer = new SumStrDataSerializer();
+        // Count the number of tasks that have been submitted to bookie for later validation.
+        AtomicInteger completeFlushTaskCounter = new AtomicInteger();
+        Mockito.doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                completeFlushTaskCounter.incrementAndGet();
+                ByteBuf byteBuf = (ByteBuf)invocation.getArguments()[0];
+                byteBuf.skipBytes(4);
+                AsyncCallbacks.AddEntryCallback callback =
+                        (AsyncCallbacks.AddEntryCallback) invocation.getArguments()[1];
+                callback.addComplete(PositionImpl.get(1,1), byteBuf,
+                        invocation.getArguments()[2]);
+                return null;
+            }
+        }).when(managedLedger).asyncAddEntry(Mockito.any(ByteBuf.class), Mockito.any(), Mockito.any());
+        // Start tests.
+        TxnLogBufferedWriter txnLogBufferedWriter = new TxnLogBufferedWriter<>(managedLedger, orderedExecutor,

Review Comment:
   Close the writer after the test.



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -137,14 +139,35 @@ public TxnLogBufferedWriter(ManagedLedger managedLedger, OrderedExecutor ordered
         this.batchedWriteMaxDelayInMillis = batchedWriteMaxDelayInMillis;
         this.flushContext = FlushContext.newInstance();
         this.dataArray = new ArrayList<>();
+        this.scheduledExecutorService = scheduledExecutorService;
         // scheduler task.
-        if (batchEnabled) {
-            this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(() -> trigFlush(false),
-                    batchedWriteMaxDelayInMillis, batchedWriteMaxDelayInMillis, TimeUnit.MICROSECONDS);
+        if (this.batchEnabled) {
+            nextTimingTrigger();
         }
         this.state = State.OPEN;
     }
 
+    /***
+     * Why not use {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)} ?
+     * Because: when the {@link #singleThreadExecutorForWrite} thread processes slowly, the scheduleAtFixedRate task
+     * will continue to append tasks to the ledger thread, this burdens the ledger thread and leads to an avalanche.
+     * see: https://github.com/apache/pulsar/pull/16679.
+     */
+    private void nextTimingTrigger(){
+        try {
+            if (state == State.CLOSING || state == State.CLOSED){
+                return;
+            }
+            scheduledFuture = scheduledExecutorService.schedule(() -> trigFlush(false, true),
+                    batchedWriteMaxDelayInMillis, TimeUnit.MICROSECONDS);

Review Comment:
   ```suggestion
               scheduledFuture = scheduledExecutorService.schedule(() -> trigFlush(false, true),
                       batchedWriteMaxDelayInMillis, TimeUnit.MILLISECONDS);
   ```
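
A small check of why the unit in the suggestion matters. The field name says milliseconds, so passing `TimeUnit.MICROSECONDS` makes the effective delay 1000 times shorter than intended. The class `DelayUnitCheck` and the value `10` below are hypothetical and used only for illustration; `TimeUnit#toNanos` is the standard JDK conversion.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper showing how the chosen TimeUnit changes the effective delay.
class DelayUnitCheck {
    // Returns the delay the scheduler would actually wait, in nanoseconds.
    static long effectiveDelayNanos(long delay, TimeUnit unit) {
        return unit.toNanos(delay);
    }

    public static void main(String[] args) {
        long batchedWriteMaxDelayInMillis = 10; // illustrative value, not taken from the PR

        long asMicros = effectiveDelayNanos(batchedWriteMaxDelayInMillis, TimeUnit.MICROSECONDS);
        long asMillis = effectiveDelayNanos(batchedWriteMaxDelayInMillis, TimeUnit.MILLISECONDS);

        System.out.println("MICROSECONDS -> " + asMicros + " ns");
        System.out.println("MILLISECONDS -> " + asMillis + " ns");
        System.out.println("ratio: " + (asMillis / asMicros));
    }
}
```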





[GitHub] [pulsar] poorbarcode commented on pull request #16679: [fix] [txn] [PIP-160] Avoid timing task burdens the ledger thread and leads to an avalanche

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #16679:
URL: https://github.com/apache/pulsar/pull/16679#issuecomment-1189940606

   > Please check the failed test
   > org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterTest ► testFlushThresholds
   
   
   The flaky test has been fixed, but the scheduled task timing is not as precise as I remembered. I will look into it later.




[GitHub] [pulsar] poorbarcode commented on pull request #16679: [fix] [txn] [PIP-160] Avoid timing task burdens the ledger thread and leads to an avalanche

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #16679:
URL: https://github.com/apache/pulsar/pull/16679#issuecomment-1189902710

   > After checking more details, we missed a part. If we flushed a batch, the next schedule should start by the last flush time + batch delay.
   
   After a batch is flushed, the task container is empty, so there is no need to check whether the first task needs to be executed earlier than `batchedWriteMaxDelayInMillis`.




[GitHub] [pulsar] congbobo184 merged pull request #16679: [fix] [txn] [PIP-160] Avoid timing task burdens the ledger thread and leads to an avalanche

Posted by GitBox <gi...@apache.org>.
congbobo184 merged PR #16679:
URL: https://github.com/apache/pulsar/pull/16679




[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16679: [fix] [txn] [PIP-160] Avoid timing task burdens the ledger thread and leads to an avalanche

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16679:
URL: https://github.com/apache/pulsar/pull/16679#discussion_r924332159


##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -137,14 +139,35 @@ public TxnLogBufferedWriter(ManagedLedger managedLedger, OrderedExecutor ordered
         this.batchedWriteMaxDelayInMillis = batchedWriteMaxDelayInMillis;
         this.flushContext = FlushContext.newInstance();
         this.dataArray = new ArrayList<>();
+        this.scheduledExecutorService = scheduledExecutorService;
         // scheduler task.
-        if (batchEnabled) {
-            this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(() -> trigFlush(false),
-                    batchedWriteMaxDelayInMillis, batchedWriteMaxDelayInMillis, TimeUnit.MICROSECONDS);
+        if (this.batchEnabled) {
+            nextTimingTrigger();
         }
         this.state = State.OPEN;
     }
 
+    /***
+     * Why not use {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)} ?
+     * Because: when the {@link #singleThreadExecutorForWrite} thread processes slowly, the scheduleAtFixedRate task
+     * will continue to append tasks to the ledger thread, this burdens the ledger thread and leads to an avalanche.
+     * see: https://github.com/apache/pulsar/pull/16679.
+     */
+    private void nextTimingTrigger(){
+        try {
+            if (state == State.CLOSING || state == State.CLOSED){
+                return;
+            }
+            scheduledFuture = scheduledExecutorService.schedule(() -> trigFlush(false, true),
+                    batchedWriteMaxDelayInMillis, TimeUnit.MICROSECONDS);

Review Comment:
   Already fixed.



##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java:
##########
@@ -408,6 +412,110 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
         orderedExecutor.shutdown();
     }
 
+    /**
+     * The use of {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)} for timed
+     * tasks in the original implementation caused this problem:
+     *   When the writer thread processes slowly, the scheduleAtFixedRate task will continue to append tasks to the
+     *   ledger thread, this burdens the ledger thread and leads to an avalanche.
+     * This method is used to verify the fix for the above problem. see: https://github.com/apache/pulsar/pull/16679.
+     */
+    @Test
+    public void testPendingScheduleTriggerTaskCount() throws Exception {
+        // Create components.
+        String managedLedgerName = "-";
+        ManagedLedger managedLedger = Mockito.mock(ManagedLedger.class);
+        Mockito.when(managedLedger.getName()).thenReturn(managedLedgerName);
+        OrderedExecutor orderedExecutor =  Mockito.mock(OrderedExecutor.class);
+        ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(65536 * 2);
+        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, workQueue);
+        Mockito.when(orderedExecutor.chooseThread(Mockito.anyString())).thenReturn(threadPoolExecutor);
+        ScheduledExecutorService scheduledExecutorService =
+                Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("tx-scheduler-threads"));
+        SumStrDataSerializer dataSerializer = new SumStrDataSerializer();
+        // Count the number of tasks that have been submitted to bookie for later validation.
+        AtomicInteger completeFlushTaskCounter = new AtomicInteger();
+        Mockito.doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                completeFlushTaskCounter.incrementAndGet();
+                ByteBuf byteBuf = (ByteBuf)invocation.getArguments()[0];
+                byteBuf.skipBytes(4);
+                AsyncCallbacks.AddEntryCallback callback =
+                        (AsyncCallbacks.AddEntryCallback) invocation.getArguments()[1];
+                callback.addComplete(PositionImpl.get(1,1), byteBuf,
+                        invocation.getArguments()[2]);
+                return null;
+            }
+        }).when(managedLedger).asyncAddEntry(Mockito.any(ByteBuf.class), Mockito.any(), Mockito.any());
+        // Start tests.
+        TxnLogBufferedWriter txnLogBufferedWriter = new TxnLogBufferedWriter<>(managedLedger, orderedExecutor,

Review Comment:
   Already fixed.





[GitHub] [pulsar] poorbarcode commented on pull request #16679: [fix] [txn] [PIP-160] Avoid timing task burdens the ledger thread and leads to an avalanche

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #16679:
URL: https://github.com/apache/pulsar/pull/16679#issuecomment-1190152305

   /pulsarbot run-failure-checks




[GitHub] [pulsar] asafm commented on pull request #16679: [fix] [txn] [PIP-160] Avoid timing task burdens the ledger thread and leads to an avalanche

Posted by GitBox <gi...@apache.org>.
asafm commented on PR #16679:
URL: https://github.com/apache/pulsar/pull/16679#issuecomment-1188987782

   I can't understand this PR.
   You have an executor service adding a task to the queue every interval.
   You said if the tasks are executed longer than that interval, the queue starts building up until it explodes?
   
   I'm not sure I understand the nature of the context in this PR - would love a good brief on that - but could it be that `scheduleWithFixedDelay` will solve it, since it never starts the next task until the previous one is finished?
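
The JDK method that matches this description is `ScheduledExecutorService#scheduleWithFixedDelay`. The sketch below (class name `FixedDelayDemo` and all parameters are hypothetical) measures the gap between one run ending and the next one starting, which should never be smaller than the configured delay. One caveat, assuming the PR description is accurate that the timed task only appends work to another thread: the delay is counted from when the timed task returns on the scheduler thread, not from when the handed-off write completes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of fixed-delay scheduling; not part of the Pulsar code base.
class FixedDelayDemo {
    // Runs a task that takes workMillis under scheduleWithFixedDelay and returns the
    // smallest observed gap (in nanoseconds) between one run ending and the next starting.
    // Requires runs >= 2.
    static long minGapNanos(long delayMillis, long workMillis, int runs) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        List<long[]> spans = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(runs);
        try {
            timer.scheduleWithFixedDelay(() -> {
                long start = System.nanoTime();
                try {
                    Thread.sleep(workMillis); // simulate work done on the scheduler thread
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                spans.add(new long[] {start, System.nanoTime()});
                done.countDown();
            }, 0, delayMillis, TimeUnit.MILLISECONDS);
            done.await();
            timer.shutdownNow();
            timer.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for runs", e);
        } finally {
            timer.shutdownNow();
        }
        long min = Long.MAX_VALUE;
        for (int i = 1; i < runs; i++) {
            // Gap between the end of run i-1 and the start of run i.
            min = Math.min(min, spans.get(i)[0] - spans.get(i - 1)[1]);
        }
        return min;
    }

    public static void main(String[] args) {
        long gap = minGapNanos(20, 5, 4);
        System.out.println("smallest end-to-start gap: " + TimeUnit.NANOSECONDS.toMillis(gap) + " ms");
    }
}
```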
   

