You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by al...@apache.org on 2022/06/17 03:27:16 UTC

[pulsar] branch master updated: fix npe when doCacheEviction (#15184)

This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e685742cf70 fix npe when doCacheEviction (#15184)
e685742cf70 is described below

commit e685742cf70df1f45600e9a3bfa9564ecea12701
Author: LinChen <15...@qq.com>
AuthorDate: Fri Jun 17 11:27:09 2022 +0800

    fix npe when doCacheEviction (#15184)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 15 +++--
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 64 ++++++++++++++++++++++
 2 files changed, 74 insertions(+), 5 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 13c7b6a8db3..95ec266342f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2175,17 +2175,22 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         }
     }
 
-    void doCacheEviction(long maxTimestamp) {
-        if (entryCache.getSize() <= 0) {
-            return;
-        }
+    public PositionImpl getEvictionPosition(){ // Returns the cache eviction position, or null if none can be derived
         PositionImpl evictionPos;
         if (config.isCacheEvictionByMarkDeletedPosition()) { // mark-delete mode: a null lookup yields a null result
-            evictionPos = getEarlierMarkDeletedPositionForActiveCursors().getNext();
+            PositionImpl earlierMarkDeletedPosition = getEarlierMarkDeletedPositionForActiveCursors();
+            evictionPos = earlierMarkDeletedPosition != null ? earlierMarkDeletedPosition.getNext() : null;
         } else {
             // Always remove all entries already read by active cursors
             evictionPos = getEarlierReadPositionForActiveCursors();
         }
+        return evictionPos; // may be null (see the mark-delete branch above)
+    }
+    void doCacheEviction(long maxTimestamp) {
+        if (entryCache.getSize() <= 0) {
+            return;
+        }
+        PositionImpl evictionPos = getEvictionPosition();
         if (evictionPos != null) {
             entryCache.invalidateEntries(evictionPos);
         }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 04f75dd7232..02696ff504d 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -295,6 +295,70 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         ledger.close();
     }
 
+    @Test
+    public void testDoCacheEviction() throws Throwable { // doCacheEviction must cope with a null eviction position
+        CompletableFuture<Boolean> result = new CompletableFuture<>(); // completed from inside the nested callbacks
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setCacheEvictionByMarkDeletedPosition(true); // select the mark-deleted-position eviction branch
+        factory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS
+                .toNanos(30000));
+        factory.asyncOpen("my_test_ledger", config, new OpenLedgerCallback() {
+            @Override
+            public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
+                ledger.asyncOpenCursor("test-cursor", new OpenCursorCallback() {
+                    @Override
+                    public void openCursorComplete(ManagedCursor cursor, Object ctx) {
+                        ManagedLedger ledger = (ManagedLedger) ctx; // ctx is the ledger passed to asyncOpenCursor below
+                        String message1 = "test";
+                        ledger.asyncAddEntry(message1.getBytes(Encoding), new AddEntryCallback() {
+                            @Override
+                            public void addComplete(Position position, ByteBuf entryData, Object ctx) {
+                                try {
+                                    @SuppressWarnings("unchecked") // ctx is the Pair supplied to asyncAddEntry below
+                                    Pair<ManagedLedger, ManagedCursor> pair = (Pair<ManagedLedger, ManagedCursor>) ctx;
+                                    ManagedLedger ledger = pair.getLeft();
+                                    ManagedCursor cursor = pair.getRight();
+                                    if (((ManagedLedgerImpl) ledger).getCacheSize() != message1.getBytes(Encoding).length) {
+                                        result.complete(false); // cache size differs from the entry's byte length
+                                        return;
+                                    }
+
+                                    ManagedLedgerImpl ledgerImpl = (ManagedLedgerImpl) ledger;
+                                    ledgerImpl.getActiveCursors().removeCursor(cursor.getName()); // remove it by name
+                                    assertNull(ledgerImpl.getEvictionPosition()); // no eviction position after removal
+                                    assertTrue(ledgerImpl.getCacheSize() == message1.getBytes(Encoding).length);
+                                    ledgerImpl.doCacheEviction(System.nanoTime()); // runs with a null eviction position
+                                    assertTrue(ledgerImpl.getCacheSize() <= 0); // expects the cache to be empty afterwards
+                                    result.complete(true);
+                                } catch (Throwable e) { // presumably also catches failed assertions from above
+                                    result.completeExceptionally(e);
+                                }
+                            }
+
+                            @Override
+                            public void addFailed(ManagedLedgerException exception, Object ctx) {
+                                result.completeExceptionally(exception);
+                            }
+                        }, Pair.of(ledger, cursor));
+                    }
+
+                    @Override
+                    public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
+                        result.completeExceptionally(exception);
+                    }
+                }, ledger);
+            }
+
+            @Override
+            public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
+                result.completeExceptionally(exception);
+            }
+        }, null, null);
+        assertTrue(result.get()); // NOTE(review): get() has no timeout; hangs if no callback completes the future
+
+        log.info("Test completed");
+    }
+
     @Test
     public void testCacheEvictionByMarkDeletedPosition() throws Throwable {
         CompletableFuture<Boolean> result = new CompletableFuture<>();