You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/07/15 10:20:50 UTC

[pulsar] branch branch-2.9 updated: [fix][broker] Skip reading more entries for a pending read with no more entries (#16400)

This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new e0141bc19bd [fix][broker] Skip reading more entries for a pending read with no more entries (#16400)
e0141bc19bd is described below

commit e0141bc19bd62028238aecb09c642126682c15f7
Author: Yunze Xu <xy...@163.com>
AuthorDate: Wed Jul 13 13:13:34 2022 +0800

    [fix][broker] Skip reading more entries for a pending read with no more entries (#16400)
    
    ### Motivation
    
    Related issue: https://github.com/streamnative/kop/issues/1379
    
    KoP uses reader on a single partition of a compacted topic and we
    observed a lot of logs like:
    
    > Error reading entries at 928511:1 : We can only have a single waiting callback
    
    It happened on a `ManagedCursorImpl` when `hasMoreEntries` returned
    false and `asyncReadEntriesOrWait` was called multiple times before
    `cancelPendingReadRequest` was called or new messages arrived.
    
    ### Modifications
    
    Throw a `ConcurrentWaitCallbackException` instead of a raw
    `ManagedLedgerException` when a waiting callback already exists. Then
    check for this exception type and skip the remaining steps in
    `PersistentDispatcherSingleActiveConsumer#internalReadEntriesFailed`.
    
    (cherry picked from commit 5ec4e3d0cf977f7473cb4be1272699b11ad9e4a6)
---
 .../java/org/apache/bookkeeper/mledger/ManagedLedgerException.java | 7 +++++++
 .../java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 3 +--
 .../persistent/PersistentDispatcherSingleActiveConsumer.java       | 7 +++++++
 3 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
index 26cc9e12659..7046ba48193 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
@@ -182,6 +182,13 @@ public class ManagedLedgerException extends Exception {
         }
     }
 
+    public static class ConcurrentWaitCallbackException extends ManagedLedgerException {
+
+        public ConcurrentWaitCallbackException() {
+            super("We can only have a single waiting callback");
+        }
+    }
+
     @Override
     public synchronized Throwable fillInStackTrace() {
         // Disable stack traces to be filled in
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 90ab8218b30..ddd4408e29e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -797,8 +797,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 
             if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) {
                 op.recycle();
-                callback.readEntriesFailed(new ManagedLedgerException("We can only have a single waiting callback"),
-                        ctx);
+                callback.readEntriesFailed(new ManagedLedgerException.ConcurrentWaitCallbackException(), ctx);
                 return;
             }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 8d0adcf0ddb..d59b84b570e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -30,6 +30,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerException.ConcurrentWaitCallbackException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -452,6 +453,12 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
         havePendingRead = false;
         Consumer c = (Consumer) ctx;
 
+        if (exception instanceof ConcurrentWaitCallbackException) {
+            // At most one pending read request is allowed when there are no more entries, so we should not trigger
+            // more read operations in this case and should just wait until the existing read operation completes.
+            return;
+        }
+
         long waitTimeMillis = readFailureBackoff.next();
 
         if (exception instanceof NoMoreEntriesToReadException) {