You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2021/12/09 12:07:41 UTC
[pulsar] 01/02: [ML] Avoid passing OpAddEntry across a thread boundary in asyncAddEntry (#12606)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 8873a2b8e9d06de458dc6412f3b0a0483b0bf5f9
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Fri Nov 5 22:12:47 2021 +0200
[ML] Avoid passing OpAddEntry across a thread boundary in asyncAddEntry (#12606)
* [ML] Avoid passing OpAddEntry across a thread boundary
* Retain buffer in current thread
(cherry picked from commit 6af747f515677796bba343997b2269ffd27cb601)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 20 +++++++++++++-------
.../apache/bookkeeper/mledger/impl/OpAddEntry.java | 18 +++++++++---------
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 2 +-
3 files changed, 23 insertions(+), 17 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index b4387e6..4cbb75f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -699,10 +699,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
log.debug("[{}] asyncAddEntry size={} state={}", name, buffer.readableBytes(), state);
}
- OpAddEntry addOperation = OpAddEntry.create(this, buffer, callback, ctx);
+ // retain buffer in this thread
+ buffer.retain();
// Jump to specific thread to avoid contention from writers writing from different threads
- executor.executeOrdered(name, safeRun(() -> internalAsyncAddEntry(addOperation)));
+ executor.executeOrdered(name, safeRun(() -> {
+ OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, callback, ctx);
+ internalAsyncAddEntry(addOperation);
+ }));
}
@Override
@@ -711,10 +715,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
log.debug("[{}] asyncAddEntry size={} state={}", name, buffer.readableBytes(), state);
}
- OpAddEntry addOperation = OpAddEntry.create(this, buffer, numberOfMessages, callback, ctx);
+ // retain buffer in this thread
+ buffer.retain();
// Jump to specific thread to avoid contention from writers writing from different threads
- executor.executeOrdered(name, safeRun(() -> internalAsyncAddEntry(addOperation)));
+ executor.executeOrdered(name, safeRun(() -> {
+ OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx);
+ internalAsyncAddEntry(addOperation);
+ }));
}
private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
@@ -1508,9 +1516,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
// If op is used by another ledger handle, we need to close it and create a new one
if (existsOp.ledger != null) {
existsOp.close();
- existsOp = OpAddEntry.create(existsOp.ml, existsOp.data, existsOp.getNumberOfMessages(), existsOp.callback, existsOp.ctx);
- // release the extra retain
- ReferenceCountUtil.release(existsOp.data);
+ existsOp = OpAddEntry.createNoRetainBuffer(existsOp.ml, existsOp.data, existsOp.getNumberOfMessages(), existsOp.callback, existsOp.ctx);
}
existsOp.setLedger(currentLedger);
pendingAddEntries.add(existsOp);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index 72dccca..0b96b59 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -75,16 +75,16 @@ public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallba
CLOSED
}
- public static OpAddEntry create(ManagedLedgerImpl ml, ByteBuf data, AddEntryCallback callback, Object ctx) {
- OpAddEntry op = createOpAddEntry(ml, data, callback, ctx);
+ public static OpAddEntry createNoRetainBuffer(ManagedLedgerImpl ml, ByteBuf data, AddEntryCallback callback, Object ctx) {
+ OpAddEntry op = createOpAddEntryNoRetainBuffer(ml, data, callback, ctx);
if (log.isDebugEnabled()) {
log.debug("Created new OpAddEntry {}", op);
}
return op;
}
- public static OpAddEntry create(ManagedLedgerImpl ml, ByteBuf data, int numberOfMessages, AddEntryCallback callback, Object ctx) {
- OpAddEntry op = createOpAddEntry(ml, data, callback, ctx);
+ public static OpAddEntry createNoRetainBuffer(ManagedLedgerImpl ml, ByteBuf data, int numberOfMessages, AddEntryCallback callback, Object ctx) {
+ OpAddEntry op = createOpAddEntryNoRetainBuffer(ml, data, callback, ctx);
op.numberOfMessages = numberOfMessages;
if (log.isDebugEnabled()) {
log.debug("Created new OpAddEntry {}", op);
@@ -92,11 +92,11 @@ public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallba
return op;
}
- private static OpAddEntry createOpAddEntry(ManagedLedgerImpl ml, ByteBuf data, AddEntryCallback callback, Object ctx) {
+ private static OpAddEntry createOpAddEntryNoRetainBuffer(ManagedLedgerImpl ml, ByteBuf data, AddEntryCallback callback, Object ctx) {
OpAddEntry op = RECYCLER.get();
op.ml = ml;
op.ledger = null;
- op.data = data.retain();
+ op.data = data;
op.dataLength = data.readableBytes();
op.callback = callback;
op.ctx = ctx;
@@ -154,7 +154,7 @@ public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallba
}
checkArgument(ledger.getId() == lh.getId(), "ledgerId %s doesn't match with acked ledgerId %s", ledger.getId(),
lh.getId());
-
+
if (!checkAndCompleteOp(ctx)) {
// means callback might have been completed by different thread (timeout task thread).. so do nothing
return;
@@ -254,7 +254,7 @@ public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallba
/**
* Checks if add-operation is completed
- *
+ *
* @return true if task is not already completed else returns false.
*/
private boolean checkAndCompleteOp(Object ctx) {
@@ -275,7 +275,7 @@ public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallba
/**
* It handles add failure on the given ledger. it can be triggered when add-entry fails or times out.
- *
+ *
* @param lh
*/
void handleAddFailure(final LedgerHandle lh) {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index eda4848..46029eb 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -2774,7 +2774,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
List<OpAddEntry> oldOps = new ArrayList<>();
for (int i = 0; i < 10; i++) {
- OpAddEntry op = OpAddEntry.create(ledger, ByteBufAllocator.DEFAULT.buffer(128), null, null);
+ OpAddEntry op = OpAddEntry.createNoRetainBuffer(ledger, ByteBufAllocator.DEFAULT.buffer(128).retain(), null, null);
if (i > 4) {
op.setLedger(mock(LedgerHandle.class));
}