You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/07/15 10:20:50 UTC
[pulsar] branch branch-2.9 updated: [fix][broker] Skip reading more entries for a pending read with no more entries (#16400)
This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new e0141bc19bd [fix][broker] Skip reading more entries for a pending read with no more entries (#16400)
e0141bc19bd is described below
commit e0141bc19bd62028238aecb09c642126682c15f7
Author: Yunze Xu <xy...@163.com>
AuthorDate: Wed Jul 13 13:13:34 2022 +0800
[fix][broker] Skip reading more entries for a pending read with no more entries (#16400)
### Motivation
Related issue: https://github.com/streamnative/kop/issues/1379
KoP uses a reader on a single partition of a compacted topic, and we
observed a lot of logs like:
> Error reading entries at 928511:1 : We can only have a single waiting callback
It happens on a `ManagedCursorImpl` when `hasMoreEntries` returns
false and `asyncReadEntriesOrWait` is called multiple times before
`cancelPendingReadRequest` is called or new messages arrive.
### Modifications
Throw a `ConcurrentWaitCallbackException` instead of a raw
`ManagedLedgerException` when a waiting callback is already registered.
Then check for this exception type and skip the remaining steps in
`PersistentDispatcherSingleActiveConsumer#internalReadEntriesFailed`.
(cherry picked from commit 5ec4e3d0cf977f7473cb4be1272699b11ad9e4a6)
---
.../java/org/apache/bookkeeper/mledger/ManagedLedgerException.java | 7 +++++++
.../java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 3 +--
.../persistent/PersistentDispatcherSingleActiveConsumer.java | 7 +++++++
3 files changed, 15 insertions(+), 2 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
index 26cc9e12659..7046ba48193 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
@@ -182,6 +182,13 @@ public class ManagedLedgerException extends Exception {
}
}
+ public static class ConcurrentWaitCallbackException extends ManagedLedgerException {
+
+ public ConcurrentWaitCallbackException() {
+ super("We can only have a single waiting callback");
+ }
+ }
+
@Override
public synchronized Throwable fillInStackTrace() {
// Disable stack traces to be filled in
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 90ab8218b30..ddd4408e29e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -797,8 +797,7 @@ public class ManagedCursorImpl implements ManagedCursor {
if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) {
op.recycle();
- callback.readEntriesFailed(new ManagedLedgerException("We can only have a single waiting callback"),
- ctx);
+ callback.readEntriesFailed(new ManagedLedgerException.ConcurrentWaitCallbackException(), ctx);
return;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 8d0adcf0ddb..d59b84b570e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -30,6 +30,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerException.ConcurrentWaitCallbackException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -452,6 +453,12 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
havePendingRead = false;
Consumer c = (Consumer) ctx;
+ if (exception instanceof ConcurrentWaitCallbackException) {
+ // At most one pending read request is allowed when there are no more entries, we should not trigger more
+ // read operations in this case and just wait the existing read operation completes.
+ return;
+ }
+
long waitTimeMillis = readFailureBackoff.next();
if (exception instanceof NoMoreEntriesToReadException) {