You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/11 14:27:26 UTC

[pulsar] 10/10: Optimize the memory usage of Cache Eviction (#12045)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d846b436cf3184efce35325cbd183a32fd4dc5ad
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Thu Sep 30 21:45:19 2021 +0800

    Optimize the memory usage of Cache Eviction (#12045)
    
    (cherry picked from commit 0c22e0fab596314bd5462f607ce6d03cb96ed484)
---
 .../mledger/impl/ManagedCursorContainer.java       | 38 ++++++++++++++++----
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 29 +++++++++------
 .../mledger/impl/ManagedCursorContainerTest.java   | 42 +++++++++++++++++++++-
 3 files changed, 91 insertions(+), 18 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
index db887f0..848ce54 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
@@ -60,8 +60,23 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> {
         }
     }
 
-    // Used to keep track of slowest cursor. Contains all of all the cursors except for non-durable cursors
-    // Since we do need to keep track of non-durable cursors.
+    public enum CursorType {
+        DurableCursor,
+        NonDurableCursor,
+        ALL
+    }
+
+    public ManagedCursorContainer() {
+        cursorType = CursorType.DurableCursor;
+    }
+
+    public ManagedCursorContainer(CursorType cursorType) {
+        this.cursorType = cursorType;
+    }
+
+    private final CursorType cursorType;
+
+    // Used to keep track of slowest cursor. Contains all of all active cursors.
     private final ArrayList<Item> heap = Lists.newArrayList();
 
     // Maps a cursor to its position in the heap
@@ -76,8 +91,7 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> {
             Item item = new Item(cursor, heap.size());
             cursors.put(cursor.getName(), item);
 
-            // don't need to add non-durable cursors
-            if (cursor.isDurable()) {
+            if (shouldTrackInHeap(cursor)) {
                 heap.add(item);
                 siftUp(item);
             }
@@ -86,6 +100,16 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> {
         }
     }
 
+    private boolean shouldTrackInHeap(ManagedCursor cursor) {
+        return CursorType.ALL.equals(cursorType)
+                || (cursor.isDurable() && CursorType.DurableCursor.equals(cursorType))
+                || (!cursor.isDurable() && CursorType.NonDurableCursor.equals(cursorType));
+    }
+
+    public PositionImpl getSlowestReadPositionForActiveCursors() {
+        return heap.isEmpty() ? null : (PositionImpl) heap.get(0).cursor.getReadPosition();
+    }
+
     public ManagedCursor get(String name) {
         long stamp = rwLock.readLock();
         try {
@@ -101,7 +125,7 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> {
         try {
             Item item = cursors.remove(name);
 
-            if (item.cursor.isDurable()) {
+            if (shouldTrackInHeap(item.cursor)) {
                 // Move the item to the right end of the heap to be removed
                 Item lastItem = heap.get(heap.size() - 1);
                 swap(item, lastItem);
@@ -132,7 +156,7 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> {
             }
 
 
-            if (item.cursor.isDurable()) {
+            if (shouldTrackInHeap(item.cursor)) {
                 PositionImpl previousSlowestConsumer = heap.get(0).position;
 
                 // When the cursor moves forward, we need to push it toward the
@@ -146,7 +170,7 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> {
                 }
 
                 PositionImpl newSlowestConsumer = heap.get(0).position;
-                    return Pair.of(previousSlowestConsumer, newSlowestConsumer);
+                return Pair.of(previousSlowestConsumer, newSlowestConsumer);
             }
             return null;
         } finally {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 20d749d..3f69bcb 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -154,6 +154,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
     private final ManagedCursorContainer cursors = new ManagedCursorContainer();
     private final ManagedCursorContainer activeCursors = new ManagedCursorContainer();
+    private final ManagedCursorContainer nonDurableActiveCursors =
+            new ManagedCursorContainer(ManagedCursorContainer.CursorType.NonDurableCursor);
 
     // Ever increasing counter of entries added
     @VisibleForTesting
@@ -1917,6 +1919,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     }
 
     void doCacheEviction(long maxTimestamp) {
+        if (entryCache.getSize() <= 0) {
+            return;
+        }
         // Always remove all entries already read by active cursors
         PositionImpl slowestReaderPos = getEarlierReadPositionForActiveCursors();
         if (slowestReaderPos != null) {
@@ -1928,17 +1933,15 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     }
 
     private PositionImpl getEarlierReadPositionForActiveCursors() {
-        PositionImpl smallest = null;
-        for (ManagedCursor cursor : activeCursors) {
-            PositionImpl p = (PositionImpl) cursor.getReadPosition();
-            if (smallest == null) {
-                smallest = p;
-            } else if (p.compareTo(smallest) < 0) {
-                smallest = p;
-            }
+        PositionImpl nonDurablePosition = nonDurableActiveCursors.getSlowestReadPositionForActiveCursors();
+        PositionImpl durablePosition = activeCursors.getSlowestReadPositionForActiveCursors();
+        if (nonDurablePosition == null) {
+            return durablePosition;
         }
-
-        return smallest;
+        if (durablePosition == null) {
+            return nonDurablePosition;
+        }
+        return durablePosition.compareTo(nonDurablePosition) > 0 ? nonDurablePosition : durablePosition;
     }
 
     void updateCursor(ManagedCursorImpl cursor, PositionImpl newPosition) {
@@ -3101,6 +3104,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         if (activeCursors.get(cursor.getName()) == null) {
             activeCursors.add(cursor);
         }
+        if (!cursor.isDurable() && nonDurableActiveCursors.get(cursor.getName()) == null) {
+            nonDurableActiveCursors.add(cursor);
+        }
     }
 
     public void deactivateCursor(ManagedCursor cursor) {
@@ -3117,6 +3123,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                             getPreviousPosition((PositionImpl) activeCursors.getSlowestReader().getReadPosition()));
                 }
             }
+            if (!cursor.isDurable()) {
+                nonDurableActiveCursors.removeCursor(cursor.getName());
+            }
         }
     }
 
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index d0b0b2c..a43e6b4 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -18,12 +18,13 @@
  */
 package org.apache.bookkeeper.mledger.impl;
 
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-
 import com.google.common.base.Predicate;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Range;
@@ -360,6 +361,45 @@ public class ManagedCursorContainerTest {
     }
 
     @Test
+    public void testSlowestReadPositionForActiveCursors() throws Exception {
+        ManagedCursorContainer container =
+                new ManagedCursorContainer(ManagedCursorContainer.CursorType.NonDurableCursor);
+        assertNull(container.getSlowestReadPositionForActiveCursors());
+
+        // Add no durable cursor
+        PositionImpl position = PositionImpl.get(5,5);
+        ManagedCursor cursor1 = spy(new MockManagedCursor(container, "test1", position));
+        doReturn(false).when(cursor1).isDurable();
+        doReturn(position).when(cursor1).getReadPosition();
+        container.add(cursor1);
+        assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 5));
+
+        // Add no durable cursor
+        position = PositionImpl.get(1,1);
+        ManagedCursor cursor2 = spy(new MockManagedCursor(container, "test2", position));
+        doReturn(false).when(cursor2).isDurable();
+        doReturn(position).when(cursor2).getReadPosition();
+        container.add(cursor2);
+        assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(1, 1));
+
+        // Move forward cursor, cursor1 = 5:5 , cursor2 = 5:6, slowest is 5:5
+        position = PositionImpl.get(5,6);
+        container.cursorUpdated(cursor2, position);
+        doReturn(position).when(cursor2).getReadPosition();
+        assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 5));
+
+        // Move forward cursor, cursor1 = 5:8 , cursor2 = 5:6, slowest is 5:6
+        position = PositionImpl.get(5,8);
+        doReturn(position).when(cursor1).getReadPosition();
+        container.cursorUpdated(cursor1, position);
+        assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 6));
+
+        // Remove cursor, only cursor1 left, cursor1 = 5:8
+        container.removeCursor(cursor2.getName());
+        assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 8));
+    }
+
+    @Test
     public void simple() throws Exception {
         ManagedCursorContainer container = new ManagedCursorContainer();
         assertNull(container.getSlowestReaderPosition());