You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/05/20 21:50:15 UTC
[pulsar] branch branch-2.7 updated: Ensure all the ReadHandle gets
properly closed on cache invalidation (#10659)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 431da6f Ensure all the ReadHandle gets properly closed on cache invalidation (#10659)
431da6f is described below
commit 431da6f1063da028c8fe4dc118d1893f524f271d
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu May 20 14:44:54 2021 -0700
Ensure all the ReadHandle gets properly closed on cache invalidation (#10659)
---
.../bookkeeper/mledger/impl/EntryCacheImpl.java | 8 ++---
.../bookkeeper/mledger/impl/EntryCacheManager.java | 2 +-
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 39 ++++++++++++++++++----
.../mledger/impl/OffloadPrefixReadTest.java | 24 ++++++++++++-
4 files changed, 60 insertions(+), 13 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
index 25585c2..1e15bce 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
@@ -211,7 +211,7 @@ public class EntryCacheImpl implements EntryCache {
lh.readAsync(position.getEntryId(), position.getEntryId()).whenCompleteAsync(
(ledgerEntries, exception) -> {
if (exception != null) {
- ml.invalidateLedgerHandle(lh, exception);
+ ml.invalidateLedgerHandle(lh);
callback.readEntryFailed(createManagedLedgerException(exception), ctx);
return;
}
@@ -234,7 +234,7 @@ public class EntryCacheImpl implements EntryCache {
ledgerEntries.close();
}
}, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception->{
- ml.invalidateLedgerHandle(lh, exception);
+ ml.invalidateLedgerHandle(lh);
callback.readEntryFailed(createManagedLedgerException(exception), ctx);
return null;
}
@@ -303,7 +303,7 @@ public class EntryCacheImpl implements EntryCache {
&& ((BKException)exception).getCode() == BKException.Code.TooManyRequestsException) {
callback.readEntriesFailed(createManagedLedgerException(exception), ctx);
} else {
- ml.invalidateLedgerHandle(lh, exception);
+ ml.invalidateLedgerHandle(lh);
ManagedLedgerException mlException = createManagedLedgerException(exception);
callback.readEntriesFailed(mlException, ctx);
}
@@ -337,7 +337,7 @@ public class EntryCacheImpl implements EntryCache {
&& ((BKException)exception).getCode() == BKException.Code.TooManyRequestsException) {
callback.readEntriesFailed(createManagedLedgerException(exception), ctx);
} else {
- ml.invalidateLedgerHandle(lh, exception);
+ ml.invalidateLedgerHandle(lh);
ManagedLedgerException mlException = createManagedLedgerException(exception);
callback.readEntriesFailed(mlException, ctx);
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
index 3bd3e4a..a692e46 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
@@ -229,7 +229,7 @@ public class EntryCacheManager {
lh.readAsync(position.getEntryId(), position.getEntryId()).whenCompleteAsync(
(ledgerEntries, exception) -> {
if (exception != null) {
- ml.invalidateLedgerHandle(lh, exception);
+ ml.invalidateLedgerHandle(lh);
callback.readEntryFailed(createManagedLedgerException(exception), ctx);
return;
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index c0a7dd4..d50ab92 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1235,6 +1235,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
return;
}
+ ledgerCache.forEach((ledgerId, readHandle) -> {
+ invalidateReadHandle(ledgerId);
+ });
+
closeAllCursors(callback, ctx);
}, null);
@@ -1253,7 +1257,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
futures.add(closeFuture);
}
- Futures.waitForAll(futures).thenRun(() -> callback.closeComplete(ctx)).exceptionally(exception -> {
+ Futures.waitForAll(futures)
+ .thenRun(() -> callback.closeComplete(ctx))
+ .exceptionally(exception -> {
callback.closeFailed(ManagedLedgerException.getManagedLedgerException(exception.getCause()), ctx);
return null;
});
@@ -1570,7 +1576,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
LedgerInfo info = ledgers.get(ledgerId);
- CompletableFuture<ReadHandle> openFuture = new CompletableFuture<>();
+ CompletableFuture<ReadHandle> openFuture;
if (config.getLedgerOffloader() != null
&& config.getLedgerOffloader().getOffloadPolicies() != null
@@ -1609,17 +1615,35 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
});
}
- void invalidateLedgerHandle(ReadHandle ledgerHandle, Throwable t) {
+ void invalidateReadHandle(long ledgerId) { // removes the cached read handle future for ledgerId (if any) and closes the handle asynchronously
+ CompletableFuture<ReadHandle> rhf = ledgerCache.remove(ledgerId);
+ if (rhf != null) {
+ rhf.thenCompose(ReadHandle::closeAsync) // compose (not thenAccept) so a failed closeAsync() future reaches the handler below
+ .exceptionally(ex -> {
+ log.warn("[{}] Failed to close a Ledger ReadHandle:", name, ex);
+ return null;
+ });
+ }
+ }
+
+ void invalidateLedgerHandle(ReadHandle ledgerHandle) { // evicts ledgerHandle from ledgerCache and closes it, unless it belongs to the current ledger (or no current ledger is set)
long ledgerId = ledgerHandle.getId();
+ LedgerHandle currentLedger = this.currentLedger; // read the field once so the null check and getId() below use the same reference
+
if (currentLedger != null && ledgerId != currentLedger.getId()) {
// remove handle from ledger cache since we got a (read) error
ledgerCache.remove(ledgerId);
if (log.isDebugEnabled()) {
- log.debug("[{}] Removed ledger {} from cache (after read error)", name, ledgerId, t);
+ log.debug("[{}] Removed ledger read handle {} from cache", name, ledgerId);
}
+ ledgerHandle.closeAsync() // a failed close is logged by the handler below and otherwise ignored
+ .exceptionally(ex -> {
+ log.warn("[{}] Failed to close a Ledger ReadHandle:", name, ex);
+ return null;
+ });
} else {
if (log.isDebugEnabled()) {
- log.debug("[{}] Ledger that encountered read error is current ledger", name, t);
+ log.debug("[{}] Ledger that encountered read error is current ledger", name);
}
}
}
@@ -2176,7 +2200,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
log.info("[{}] Ledger {} contains the current last confirmed entry {}, and it is going to be deleted", name,
ls.getLedgerId(), currentLastConfirmedEntry);
}
- ledgerCache.remove(ls.getLedgerId());
+
+ invalidateReadHandle(ls.getLedgerId());
ledgers.remove(ls.getLedgerId());
NUMBER_OF_ENTRIES_UPDATER.addAndGet(this, -ls.getEntries());
@@ -2612,7 +2637,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
newFirstUnoffloaded,
errorToReport);
} else {
- ledgerCache.remove(ledgerId);
+ invalidateReadHandle(ledgerId);
offloadLoop(promise, ledgersToOffload, firstUnoffloaded, firstError);
}
});
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
index 69011cac..045ca5d 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
@@ -40,6 +40,8 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.SneakyThrows;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
import org.apache.bookkeeper.client.api.LedgerEntries;
@@ -112,6 +114,10 @@ public class OffloadPrefixReadTest extends MockedBookKeeperTestCase {
}
verify(offloader, times(2))
.readOffloaded(anyLong(), any(), anyMap());
+
+ ledger.close();
+ // Ensure that all the read handles had been closed
+ assertEquals(offloader.openedReadHandles.get(), 0);
}
@Test
@@ -224,10 +230,11 @@ public class OffloadPrefixReadTest extends MockedBookKeeperTestCase {
return promise;
}
+ @SneakyThrows // the VerifyClosingReadHandle constructor declares "throws Exception"
@Override
public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uuid,
Map<String, String> offloadDriverMetadata) {
- return CompletableFuture.completedFuture(offloads.get(uuid));
+ return CompletableFuture.completedFuture(new VerifyClosingReadHandle(offloads.get(uuid))); // wrap the stored handle so its creation and closeAsync() are counted in openedReadHandles
}
@Override
@@ -246,6 +253,21 @@ public class OffloadPrefixReadTest extends MockedBookKeeperTestCase {
public void close() {
}
+
+ private final AtomicInteger openedReadHandles = new AtomicInteger(0);
+
+ class VerifyClosingReadHandle extends MockOffloadReadHandle { // read handle that tracks, in openedReadHandles, how many instances were created but not yet closed
+ VerifyClosingReadHandle(ReadHandle toCopy) throws Exception {
+ super(toCopy);
+ openedReadHandles.incrementAndGet(); // one more handle handed out
+ }
+
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ openedReadHandles.decrementAndGet(); // NOTE(review): decrements on every call, so closing the same handle twice would push the counter below zero
+ return super.closeAsync();
+ }
+ }
}
static class MockOffloadReadHandle implements ReadHandle {