You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by al...@apache.org on 2022/06/17 03:27:16 UTC
[pulsar] branch master updated: fix npe when doCacheEviction (#15184)
This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e685742cf70 fix npe when doCacheEviction (#15184)
e685742cf70 is described below
commit e685742cf70df1f45600e9a3bfa9564ecea12701
Author: LinChen <15...@qq.com>
AuthorDate: Fri Jun 17 11:27:09 2022 +0800
fix npe when doCacheEviction (#15184)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 15 +++--
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 64 ++++++++++++++++++++++
2 files changed, 74 insertions(+), 5 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 13c7b6a8db3..95ec266342f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2175,17 +2175,22 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
}
}
- void doCacheEviction(long maxTimestamp) {
- if (entryCache.getSize() <= 0) {
- return;
- }
+ public PositionImpl getEvictionPosition(){
PositionImpl evictionPos;
if (config.isCacheEvictionByMarkDeletedPosition()) {
- evictionPos = getEarlierMarkDeletedPositionForActiveCursors().getNext();
+ PositionImpl earlierMarkDeletedPosition = getEarlierMarkDeletedPositionForActiveCursors();
+ evictionPos = earlierMarkDeletedPosition != null ? earlierMarkDeletedPosition.getNext() : null;
} else {
// Always remove all entries already read by active cursors
evictionPos = getEarlierReadPositionForActiveCursors();
}
+ return evictionPos;
+ }
+ void doCacheEviction(long maxTimestamp) {
+ if (entryCache.getSize() <= 0) {
+ return;
+ }
+ PositionImpl evictionPos = getEvictionPosition();
if (evictionPos != null) {
entryCache.invalidateEntries(evictionPos);
}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 04f75dd7232..02696ff504d 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -295,6 +295,70 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
ledger.close();
}
+ @Test
+ public void testDoCacheEviction() throws Throwable {
+ CompletableFuture<Boolean> result = new CompletableFuture<>();
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setCacheEvictionByMarkDeletedPosition(true);
+ factory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS
+ .toNanos(30000));
+ factory.asyncOpen("my_test_ledger", config, new OpenLedgerCallback() {
+ @Override
+ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
+ ledger.asyncOpenCursor("test-cursor", new OpenCursorCallback() {
+ @Override
+ public void openCursorComplete(ManagedCursor cursor, Object ctx) {
+ ManagedLedger ledger = (ManagedLedger) ctx;
+ String message1 = "test";
+ ledger.asyncAddEntry(message1.getBytes(Encoding), new AddEntryCallback() {
+ @Override
+ public void addComplete(Position position, ByteBuf entryData, Object ctx) {
+ try {
+ @SuppressWarnings("unchecked")
+ Pair<ManagedLedger, ManagedCursor> pair = (Pair<ManagedLedger, ManagedCursor>) ctx;
+ ManagedLedger ledger = pair.getLeft();
+ ManagedCursor cursor = pair.getRight();
+ if (((ManagedLedgerImpl) ledger).getCacheSize() != message1.getBytes(Encoding).length) {
+ result.complete(false);
+ return;
+ }
+
+ ManagedLedgerImpl ledgerImpl = (ManagedLedgerImpl) ledger;
+ ledgerImpl.getActiveCursors().removeCursor(cursor.getName());
+ assertNull(ledgerImpl.getEvictionPosition());
+ assertTrue(ledgerImpl.getCacheSize() == message1.getBytes(Encoding).length);
+ ledgerImpl.doCacheEviction(System.nanoTime());
+ assertTrue(ledgerImpl.getCacheSize() <= 0);
+ result.complete(true);
+ } catch (Throwable e) {
+ result.completeExceptionally(e);
+ }
+ }
+
+ @Override
+ public void addFailed(ManagedLedgerException exception, Object ctx) {
+ result.completeExceptionally(exception);
+ }
+ }, Pair.of(ledger, cursor));
+ }
+
+ @Override
+ public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
+ result.completeExceptionally(exception);
+ }
+ }, ledger);
+ }
+
+ @Override
+ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
+ result.completeExceptionally(exception);
+ }
+ }, null, null);
+ assertTrue(result.get());
+
+ log.info("Test completed");
+ }
+
@Test
public void testCacheEvictionByMarkDeletedPosition() throws Throwable {
CompletableFuture<Boolean> result = new CompletableFuture<>();