You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/11 14:27:26 UTC
[pulsar] 10/10: Optimize the memory usage of Cache Eviction (#12045)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit d846b436cf3184efce35325cbd183a32fd4dc5ad
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Thu Sep 30 21:45:19 2021 +0800
Optimize the memory usage of Cache Eviction (#12045)
(cherry picked from commit 0c22e0fab596314bd5462f607ce6d03cb96ed484)
---
.../mledger/impl/ManagedCursorContainer.java | 38 ++++++++++++++++----
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 29 +++++++++------
.../mledger/impl/ManagedCursorContainerTest.java | 42 +++++++++++++++++++++-
3 files changed, 91 insertions(+), 18 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
index db887f0..848ce54 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
@@ -60,8 +60,23 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> {
}
}
- // Used to keep track of slowest cursor. Contains all of all the cursors except for non-durable cursors
- // Since we do need to keep track of non-durable cursors.
+ public enum CursorType {
+ DurableCursor,
+ NonDurableCursor,
+ ALL
+ }
+
+ public ManagedCursorContainer() {
+ cursorType = CursorType.DurableCursor;
+ }
+
+ public ManagedCursorContainer(CursorType cursorType) {
+ this.cursorType = cursorType;
+ }
+
+ private final CursorType cursorType;
+
+ // Used to keep track of the slowest cursor. Contains only the cursors whose durability matches this container's CursorType.
private final ArrayList<Item> heap = Lists.newArrayList();
// Maps a cursor to its position in the heap
@@ -76,8 +91,7 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> {
Item item = new Item(cursor, heap.size());
cursors.put(cursor.getName(), item);
- // don't need to add non-durable cursors
- if (cursor.isDurable()) {
+ if (shouldTrackInHeap(cursor)) {
heap.add(item);
siftUp(item);
}
@@ -86,6 +100,16 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> {
}
}
+ private boolean shouldTrackInHeap(ManagedCursor cursor) {
+ return CursorType.ALL.equals(cursorType)
+ || (cursor.isDurable() && CursorType.DurableCursor.equals(cursorType))
+ || (!cursor.isDurable() && CursorType.NonDurableCursor.equals(cursorType));
+ }
+
+ public PositionImpl getSlowestReadPositionForActiveCursors() {
+ return heap.isEmpty() ? null : (PositionImpl) heap.get(0).cursor.getReadPosition();
+ }
+
public ManagedCursor get(String name) {
long stamp = rwLock.readLock();
try {
@@ -101,7 +125,7 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> {
try {
Item item = cursors.remove(name);
- if (item.cursor.isDurable()) {
+ if (shouldTrackInHeap(item.cursor)) {
// Move the item to the right end of the heap to be removed
Item lastItem = heap.get(heap.size() - 1);
swap(item, lastItem);
@@ -132,7 +156,7 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> {
}
- if (item.cursor.isDurable()) {
+ if (shouldTrackInHeap(item.cursor)) {
PositionImpl previousSlowestConsumer = heap.get(0).position;
// When the cursor moves forward, we need to push it toward the
@@ -146,7 +170,7 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> {
}
PositionImpl newSlowestConsumer = heap.get(0).position;
- return Pair.of(previousSlowestConsumer, newSlowestConsumer);
+ return Pair.of(previousSlowestConsumer, newSlowestConsumer);
}
return null;
} finally {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 20d749d..3f69bcb 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -154,6 +154,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private final ManagedCursorContainer cursors = new ManagedCursorContainer();
private final ManagedCursorContainer activeCursors = new ManagedCursorContainer();
+ private final ManagedCursorContainer nonDurableActiveCursors =
+ new ManagedCursorContainer(ManagedCursorContainer.CursorType.NonDurableCursor);
// Ever increasing counter of entries added
@VisibleForTesting
@@ -1917,6 +1919,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
}
void doCacheEviction(long maxTimestamp) {
+ if (entryCache.getSize() <= 0) {
+ return;
+ }
// Always remove all entries already read by active cursors
PositionImpl slowestReaderPos = getEarlierReadPositionForActiveCursors();
if (slowestReaderPos != null) {
@@ -1928,17 +1933,15 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
}
private PositionImpl getEarlierReadPositionForActiveCursors() {
- PositionImpl smallest = null;
- for (ManagedCursor cursor : activeCursors) {
- PositionImpl p = (PositionImpl) cursor.getReadPosition();
- if (smallest == null) {
- smallest = p;
- } else if (p.compareTo(smallest) < 0) {
- smallest = p;
- }
+ PositionImpl nonDurablePosition = nonDurableActiveCursors.getSlowestReadPositionForActiveCursors();
+ PositionImpl durablePosition = activeCursors.getSlowestReadPositionForActiveCursors();
+ if (nonDurablePosition == null) {
+ return durablePosition;
}
-
- return smallest;
+ if (durablePosition == null) {
+ return nonDurablePosition;
+ }
+ return durablePosition.compareTo(nonDurablePosition) > 0 ? nonDurablePosition : durablePosition;
}
void updateCursor(ManagedCursorImpl cursor, PositionImpl newPosition) {
@@ -3101,6 +3104,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
if (activeCursors.get(cursor.getName()) == null) {
activeCursors.add(cursor);
}
+ if (!cursor.isDurable() && nonDurableActiveCursors.get(cursor.getName()) == null) {
+ nonDurableActiveCursors.add(cursor);
+ }
}
public void deactivateCursor(ManagedCursor cursor) {
@@ -3117,6 +3123,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
getPreviousPosition((PositionImpl) activeCursors.getSlowestReader().getReadPosition()));
}
}
+ if (!cursor.isDurable()) {
+ nonDurableActiveCursors.removeCursor(cursor.getName());
+ }
}
}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index d0b0b2c..a43e6b4 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -18,12 +18,13 @@
*/
package org.apache.bookkeeper.mledger.impl;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-
import com.google.common.base.Predicate;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
@@ -360,6 +361,45 @@ public class ManagedCursorContainerTest {
}
@Test
+ public void testSlowestReadPositionForActiveCursors() throws Exception {
+ ManagedCursorContainer container =
+ new ManagedCursorContainer(ManagedCursorContainer.CursorType.NonDurableCursor);
+ assertNull(container.getSlowestReadPositionForActiveCursors());
+
+ // Add a non-durable cursor
+ PositionImpl position = PositionImpl.get(5,5);
+ ManagedCursor cursor1 = spy(new MockManagedCursor(container, "test1", position));
+ doReturn(false).when(cursor1).isDurable();
+ doReturn(position).when(cursor1).getReadPosition();
+ container.add(cursor1);
+ assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 5));
+
+ // Add a second non-durable cursor
+ position = PositionImpl.get(1,1);
+ ManagedCursor cursor2 = spy(new MockManagedCursor(container, "test2", position));
+ doReturn(false).when(cursor2).isDurable();
+ doReturn(position).when(cursor2).getReadPosition();
+ container.add(cursor2);
+ assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(1, 1));
+
+ // Move cursor2 forward: cursor1 = 5:5, cursor2 = 5:6, slowest is 5:5
+ position = PositionImpl.get(5,6);
+ container.cursorUpdated(cursor2, position);
+ doReturn(position).when(cursor2).getReadPosition();
+ assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 5));
+
+ // Move cursor1 forward: cursor1 = 5:8, cursor2 = 5:6, slowest is 5:6
+ position = PositionImpl.get(5,8);
+ doReturn(position).when(cursor1).getReadPosition();
+ container.cursorUpdated(cursor1, position);
+ assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 6));
+
+ // Remove cursor2; only cursor1 is left, cursor1 = 5:8
+ container.removeCursor(cursor2.getName());
+ assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 8));
+ }
+
+ @Test
public void simple() throws Exception {
ManagedCursorContainer container = new ManagedCursorContainer();
assertNull(container.getSlowestReaderPosition());