You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/05/20 21:46:58 UTC

[pulsar] branch master updated: Ensure all the ReadHandle gets properly closed on cache invalidation (#10659)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 00677bf  Ensure all the ReadHandle gets properly closed on cache invalidation (#10659)
00677bf is described below

commit 00677bfcbf58f3825d1dbebb242ceba011862888
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu May 20 14:44:54 2021 -0700

    Ensure all the ReadHandle gets properly closed on cache invalidation (#10659)
---
 .../bookkeeper/mledger/impl/EntryCacheImpl.java    |  8 ++---
 .../bookkeeper/mledger/impl/EntryCacheManager.java |  2 +-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 39 ++++++++++++++++++----
 .../mledger/impl/OffloadPrefixReadTest.java        | 24 ++++++++++++-
 4 files changed, 60 insertions(+), 13 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
index ee3660b..a0b0faa 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
@@ -213,7 +213,7 @@ public class EntryCacheImpl implements EntryCache {
             lh.readAsync(position.getEntryId(), position.getEntryId()).whenCompleteAsync(
                     (ledgerEntries, exception) -> {
                         if (exception != null) {
-                            ml.invalidateLedgerHandle(lh, exception);
+                            ml.invalidateLedgerHandle(lh);
                             callback.readEntryFailed(createManagedLedgerException(exception), ctx);
                             return;
                         }
@@ -236,7 +236,7 @@ public class EntryCacheImpl implements EntryCache {
                             ledgerEntries.close();
                         }
                     }, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception->{
-                    	  ml.invalidateLedgerHandle(lh, exception);
+                          ml.invalidateLedgerHandle(lh);
                           callback.readEntryFailed(createManagedLedgerException(exception), ctx);
                           return null;
                     }
@@ -305,7 +305,7 @@ public class EntryCacheImpl implements EntryCache {
                                 && ((BKException)exception).getCode() == BKException.Code.TooManyRequestsException) {
                                 callback.readEntriesFailed(createManagedLedgerException(exception), ctx);
                             } else {
-                                ml.invalidateLedgerHandle(lh, exception);
+                                ml.invalidateLedgerHandle(lh);
                                 ManagedLedgerException mlException = createManagedLedgerException(exception);
                                 callback.readEntriesFailed(mlException, ctx);
                             }
@@ -339,7 +339,7 @@ public class EntryCacheImpl implements EntryCache {
                                   && ((BKException)exception).getCode() == BKException.Code.TooManyRequestsException) {
                                   callback.readEntriesFailed(createManagedLedgerException(exception), ctx);
                               } else {
-                                  ml.invalidateLedgerHandle(lh, exception);
+                                  ml.invalidateLedgerHandle(lh);
                                   ManagedLedgerException mlException = createManagedLedgerException(exception);
                                   callback.readEntriesFailed(mlException, ctx);
                               }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
index 8401c35..81008cc 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
@@ -227,7 +227,7 @@ public class EntryCacheManager {
             lh.readAsync(position.getEntryId(), position.getEntryId()).whenCompleteAsync(
                     (ledgerEntries, exception) -> {
                         if (exception != null) {
-                            ml.invalidateLedgerHandle(lh, exception);
+                            ml.invalidateLedgerHandle(lh);
                             callback.readEntryFailed(createManagedLedgerException(exception), ctx);
                             return;
                         }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 8fa3436..106fcc9 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1333,6 +1333,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                 return;
             }
 
+            ledgerCache.forEach((ledgerId, readHandle) -> {
+                invalidateReadHandle(ledgerId);
+            });
+
             closeAllCursors(callback, ctx);
         }, null);
 
@@ -1355,7 +1359,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             futures.add(closeFuture);
         }
 
-        Futures.waitForAll(futures).thenRun(() -> callback.closeComplete(ctx)).exceptionally(exception -> {
+        Futures.waitForAll(futures)
+                .thenRun(() -> callback.closeComplete(ctx))
+                .exceptionally(exception -> {
             callback.closeFailed(ManagedLedgerException.getManagedLedgerException(exception.getCause()), ctx);
             return null;
         });
@@ -1728,7 +1734,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
 
             LedgerInfo info = ledgers.get(ledgerId);
-            CompletableFuture<ReadHandle> openFuture = new CompletableFuture<>();
+            CompletableFuture<ReadHandle> openFuture;
 
             if (config.getLedgerOffloader() != null
                     && config.getLedgerOffloader().getOffloadPolicies() != null
@@ -1767,17 +1773,35 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         });
     }
 
-    void invalidateLedgerHandle(ReadHandle ledgerHandle, Throwable t) {
+    void invalidateReadHandle(long ledgerId) { // evicts the cached read handle of ledgerId and closes it
+        CompletableFuture<ReadHandle> rhf = ledgerCache.remove(ledgerId);
+        if (rhf != null) {
+            rhf.thenCompose(ReadHandle::closeAsync) // chain the close future so an async close failure is logged
+                    .exceptionally(ex -> {
+                        log.warn("[{}] Failed to close a Ledger ReadHandle:", name, ex);
+                        return null;
+                    });
+        }
+    }
+
+    void invalidateLedgerHandle(ReadHandle ledgerHandle) {
         long ledgerId = ledgerHandle.getId();
+        LedgerHandle currentLedger = this.currentLedger; // read the field once; the check and getId() share it
+
         if (currentLedger != null && ledgerId != currentLedger.getId()) {
             // remove handle from ledger cache since we got a (read) error
             ledgerCache.remove(ledgerId);
             if (log.isDebugEnabled()) {
-                log.debug("[{}] Removed ledger {} from cache (after read error)", name, ledgerId, t);
+                log.debug("[{}] Removed ledger read handle {} from cache", name, ledgerId);
             }
+            ledgerHandle.closeAsync() // close the evicted handle; a failure is only logged
+                    .exceptionally(ex -> {
+                        log.warn("[{}] Failed to close a Ledger ReadHandle:", name, ex);
+                        return null;
+                    });
         } else {
             if (log.isDebugEnabled()) {
-                log.debug("[{}] Ledger that encountered read error is current ledger", name, t);
+                log.debug("[{}] Ledger that encountered read error is current ledger", name); // also if it is null
             }
         }
     }
@@ -2367,7 +2391,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                     log.info("[{}] Ledger {} contains the current last confirmed entry {}, and it is going to be deleted", name,
                             ls.getLedgerId(), currentLastConfirmedEntry);
                 }
-                ledgerCache.remove(ls.getLedgerId());
+
+                invalidateReadHandle(ls.getLedgerId());
 
                 ledgers.remove(ls.getLedgerId());
                 NUMBER_OF_ENTRIES_UPDATER.addAndGet(this, -ls.getEntries());
@@ -2803,7 +2828,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                                         newFirstUnoffloaded,
                                         errorToReport);
                         } else {
-                            ledgerCache.remove(ledgerId);
+                            invalidateReadHandle(ledgerId);
                             offloadLoop(promise, ledgersToOffload, firstUnoffloaded, firstError);
                         }
                     });
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
index f3c1583..cc23f74 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
@@ -40,6 +40,8 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.SneakyThrows;
 import org.apache.bookkeeper.client.api.DigestType;
 import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
 import org.apache.bookkeeper.client.api.LedgerEntries;
@@ -112,6 +114,10 @@ public class OffloadPrefixReadTest extends MockedBookKeeperTestCase {
         }
         verify(offloader, times(2))
                 .readOffloaded(anyLong(), (UUID) any(), anyMap());
+
+        ledger.close();
+        // Ensure that all the read handles have been closed
+        assertEquals(offloader.openedReadHandles.get(), 0);
     }
 
     @Test
@@ -225,10 +231,11 @@ public class OffloadPrefixReadTest extends MockedBookKeeperTestCase {
             return promise;
         }
 
+        @SneakyThrows // the VerifyClosingReadHandle constructor declares "throws Exception"
         @Override
         public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uuid,
                                                            Map<String, String> offloadDriverMetadata) {
-            return CompletableFuture.completedFuture(offloads.get(uuid));
+            return CompletableFuture.completedFuture(new VerifyClosingReadHandle(offloads.get(uuid)));
         }
 
         @Override
@@ -247,6 +254,21 @@ public class OffloadPrefixReadTest extends MockedBookKeeperTestCase {
         public void close() {
 
         }
+
+        private final AtomicInteger openedReadHandles = new AtomicInteger(0);
+
+        class VerifyClosingReadHandle extends MockOffloadReadHandle { // counts handles in openedReadHandles
+            VerifyClosingReadHandle(ReadHandle toCopy) throws Exception {
+                super(toCopy);
+                openedReadHandles.incrementAndGet(); // one more handle is outstanding
+            }
+
+            @Override
+            public CompletableFuture<Void> closeAsync() {
+                openedReadHandles.decrementAndGet(); // runs on every call; a double close drives it negative
+                return super.closeAsync();
+            }
+        }
     }
 
     static class MockOffloadReadHandle implements ReadHandle {