You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/07/19 10:11:14 UTC

[GitHub] [pulsar] codelipenghui commented on a diff in pull request #16679: [fix] [txn] [PIP-160] Avoid timing task burdens the ledger thread and leads to an avalanche

codelipenghui commented on code in PR #16679:
URL: https://github.com/apache/pulsar/pull/16679#discussion_r924315439


##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java:
##########
@@ -408,6 +412,110 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
         orderedExecutor.shutdown();
     }
 
+    /**
+     * The use of {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)} for timed
+     * tasks in the original implementation caused this problem:
+     *   When the writer thread processes slowly, the scheduleAtFixedRate task will continue to append tasks to the
+     *   ledger thread, this burdens the ledger thread and leads to an avalanche.
+     * This method is used to verify the fix for the above problem. see: https://github.com/apache/pulsar/pull/16679.
+     */
+    @Test
+    public void testPendingScheduleTriggerTaskCount() throws Exception {
+        // Create components.
+        String managedLedgerName = "-";
+        ManagedLedger managedLedger = Mockito.mock(ManagedLedger.class);
+        Mockito.when(managedLedger.getName()).thenReturn(managedLedgerName);
+        OrderedExecutor orderedExecutor =  Mockito.mock(OrderedExecutor.class);
+        ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(65536 * 2);
+        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, workQueue);
+        Mockito.when(orderedExecutor.chooseThread(Mockito.anyString())).thenReturn(threadPoolExecutor);
+        ScheduledExecutorService scheduledExecutorService =
+                Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("tx-scheduler-threads"));
+        SumStrDataSerializer dataSerializer = new SumStrDataSerializer();
+        // Count the number of tasks that have been submitted to bookie for later validation.
+        AtomicInteger completeFlushTaskCounter = new AtomicInteger();
+        Mockito.doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                completeFlushTaskCounter.incrementAndGet();
+                ByteBuf byteBuf = (ByteBuf)invocation.getArguments()[0];
+                byteBuf.skipBytes(4);
+                AsyncCallbacks.AddEntryCallback callback =
+                        (AsyncCallbacks.AddEntryCallback) invocation.getArguments()[1];
+                callback.addComplete(PositionImpl.get(1,1), byteBuf,
+                        invocation.getArguments()[2]);
+                return null;
+            }
+        }).when(managedLedger).asyncAddEntry(Mockito.any(ByteBuf.class), Mockito.any(), Mockito.any());
+        // Start tests.
+        TxnLogBufferedWriter txnLogBufferedWriter = new TxnLogBufferedWriter<>(managedLedger, orderedExecutor,

Review Comment:
   Close the write after test.



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -137,14 +139,35 @@ public TxnLogBufferedWriter(ManagedLedger managedLedger, OrderedExecutor ordered
         this.batchedWriteMaxDelayInMillis = batchedWriteMaxDelayInMillis;
         this.flushContext = FlushContext.newInstance();
         this.dataArray = new ArrayList<>();
+        this.scheduledExecutorService = scheduledExecutorService;
         // scheduler task.
-        if (batchEnabled) {
-            this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(() -> trigFlush(false),
-                    batchedWriteMaxDelayInMillis, batchedWriteMaxDelayInMillis, TimeUnit.MICROSECONDS);
+        if (this.batchEnabled) {
+            nextTimingTrigger();
         }
         this.state = State.OPEN;
     }
 
+    /***
+     * Why not use {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)} ?
+     * Because: when the {@link #singleThreadExecutorForWrite} thread processes slowly, the scheduleAtFixedRate task
+     * will continue to append tasks to the ledger thread, this burdens the ledger thread and leads to an avalanche.
+     * see: https://github.com/apache/pulsar/pull/16679.
+     */
+    private void nextTimingTrigger(){
+        try {
+            if (state == State.CLOSING || state == State.CLOSED){
+                return;
+            }
+            scheduledFuture = scheduledExecutorService.schedule(() -> trigFlush(false, true),
+                    batchedWriteMaxDelayInMillis, TimeUnit.MICROSECONDS);

Review Comment:
   ```suggestion
               scheduledFuture = scheduledExecutorService.schedule(() -> trigFlush(false, true),
                       batchedWriteMaxDelayInMillis, TimeUnit.MILLISECONDS);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org