You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/06/16 01:25:55 UTC

[pulsar] branch master updated: Fixed readers backlog stats after data is skipped (#7236)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b9c90f  Fixed readers backlog stats after data is skipped (#7236)
6b9c90f is described below

commit 6b9c90ff89f541e84662292d450f628597c4b95a
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Jun 15 18:25:46 2020 -0700

    Fixed readers backlog stats after data is skipped (#7236)
    
    ### Motivation
    
    The metrics for the reader backlog keep increasing when data is dropped because the reader cursor only moves on the next read attempt.
    Instead we should proactively move the cursor forward on the first valid ledger.
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 36 ++++++++++++++++-
 .../mledger/impl/NonDurableCursorTest.java         | 45 ++++++++++++++++++++++
 .../broker/service/BacklogQuotaManagerTest.java    |  2 +-
 3 files changed, 81 insertions(+), 2 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 8b12f7b..48b864d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -44,11 +44,11 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.Random;
 import java.util.UUID;
-import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentLinkedDeque;
@@ -85,6 +85,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
@@ -2067,6 +2068,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                 return;
             }
 
+            advanceNonDurableCursors(ledgersToDelete);
+
             // Update metadata
             for (LedgerInfo ls : ledgersToDelete) {
                 ledgerCache.remove(ls.getLedgerId());
@@ -2126,6 +2129,37 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     }
 
     /**
+     * Non-durable cursors have to be moved forward when data is trimmed since they are not retain that data.
+     * This is to make sure that the `consumedEntries` counter is correctly updated with the number of skipped
+     * entries and the stats are reported correctly.
+     */
+    private void advanceNonDurableCursors(List<LedgerInfo> ledgersToDelete) {
+        if (ledgersToDelete.isEmpty()) {
+            return;
+        }
+
+        long firstNonDeletedLedger = ledgers
+                .higherKey(ledgersToDelete.get(ledgersToDelete.size() - 1).getLedgerId());
+        PositionImpl highestPositionToDelete = new PositionImpl(firstNonDeletedLedger, -1);
+
+        cursors.forEach(cursor -> {
+            if (highestPositionToDelete.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) > 0) {
+                cursor.asyncMarkDelete(highestPositionToDelete, new MarkDeleteCallback() {
+                    @Override
+                    public void markDeleteComplete(Object ctx) {
+                    }
+
+                    @Override
+                    public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
+                        log.warn("[{}] Failed to mark delete while trimming data ledgers: {}", name,
+                                exception.getMessage());
+                    }
+                }, null);
+            }
+        });
+    }
+
+    /**
      * Delete this ManagedLedger completely from the system.
      *
      * @throws Exception
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
index 2772541..7a15035 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
@@ -28,8 +28,11 @@ import static org.testng.Assert.fail;
 
 import com.google.common.base.Charsets;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
 import java.nio.charset.Charset;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -678,6 +681,48 @@ public class NonDurableCursorTest extends MockedBookKeeperTestCase {
         ledger.close();
     }
 
+    @Test
+    public void testBacklogStatsWhenDroppingData() throws Exception {
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testBacklogStatsWhenDroppingData",
+                new ManagedLedgerConfig().setMaxEntriesPerLedger(1));
+        ManagedCursor c1 = ledger.openCursor("c1");
+        ManagedCursor nonDurableCursor = ledger.newNonDurableCursor(PositionImpl.earliest);
+
+        assertEquals(nonDurableCursor.getNumberOfEntries(), 0);
+        assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 0);
+
+        List<Position> positions = Lists.newArrayList();
+        for (int i = 0; i < 10; i++) {
+            positions.add(ledger.addEntry(("entry-" + i).getBytes(UTF_8)));
+        }
+
+        assertEquals(nonDurableCursor.getNumberOfEntries(), 10);
+        assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 10);
+
+        c1.markDelete(positions.get(4));
+        assertEquals(c1.getNumberOfEntries(), 5);
+        assertEquals(c1.getNumberOfEntriesInBacklog(true), 5);
+
+        // Since the durable cursor has moved, the data will be trimmed
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        ledger.internalTrimConsumedLedgers(promise);
+        promise.join();
+
+        assertEquals(nonDurableCursor.getNumberOfEntries(), 6);
+        assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 6);
+
+        c1.close();
+        ledger.deleteCursor(c1.getName());
+        promise = new CompletableFuture<>();
+        ledger.internalTrimConsumedLedgers(promise);
+        promise.join();
+
+        assertEquals(nonDurableCursor.getNumberOfEntries(), 1);
+        assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 1);
+
+        ledger.close();
+    }
+
     @Test(expectedExceptions = NullPointerException.class)
     void testCursorWithNameIsNotNull() throws Exception {
         final String p1CursorName = "entry-1";
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index 8636cf1..0c317a0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -161,7 +161,7 @@ public class BacklogQuotaManagerTest {
             // non-durable mes should still
             assertEquals(stats.subscriptions.size(), 1);
             long nonDurableSubscriptionBacklog = stats.subscriptions.values().iterator().next().msgBacklog;
-            assertEquals(nonDurableSubscriptionBacklog, numMsgs,
+            assertEquals(nonDurableSubscriptionBacklog, MAX_ENTRIES_PER_LEDGER,
               "non-durable subscription backlog is [" + nonDurableSubscriptionBacklog + "]"); ;
 
             try {