You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/29 09:18:31 UTC

[pulsar] 05/20: Fixed readers backlog stats after data is skipped (#7236)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 450ec37256f0778bbe4a1d67e927266847aba101
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Jun 15 18:25:46 2020 -0700

    Fixed readers backlog stats after data is skipped (#7236)
    
    ### Motivation
    
    The metrics for the reader backlog keep increasing when data is dropped because the reader cursor only moves on the next read attempt.
    Instead, we should proactively move the cursor forward to the first valid ledger.
    
    (cherry picked from commit 6b9c90ff89f541e84662292d450f628597c4b95a)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 36 ++++++++++++++++-
 .../mledger/impl/NonDurableCursorTest.java         | 45 ++++++++++++++++++++++
 .../broker/service/BacklogQuotaManagerTest.java    |  2 +-
 3 files changed, 81 insertions(+), 2 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 5fd9b24..4b3937f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -44,11 +44,11 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.Random;
 import java.util.UUID;
-import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentLinkedDeque;
@@ -85,6 +85,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
@@ -2067,6 +2068,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                 return;
             }
 
+            advanceNonDurableCursors(ledgersToDelete);
+
             // Update metadata
             for (LedgerInfo ls : ledgersToDelete) {
                 ledgerCache.remove(ls.getLedgerId());
@@ -2126,6 +2129,37 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     }
 
     /**
+     * Non-durable cursors have to be moved forward when data is trimmed since they do not retain that data.
+     * This is to make sure that the `consumedEntries` counter is correctly updated with the number of skipped
+     * entries and the stats are reported correctly.
+     */
+    private void advanceNonDurableCursors(List<LedgerInfo> ledgersToDelete) {
+        if (ledgersToDelete.isEmpty()) {
+            return;
+        }
+
+        long firstNonDeletedLedger = ledgers
+                .higherKey(ledgersToDelete.get(ledgersToDelete.size() - 1).getLedgerId());
+        PositionImpl highestPositionToDelete = new PositionImpl(firstNonDeletedLedger, -1);
+
+        cursors.forEach(cursor -> {
+            if (highestPositionToDelete.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) > 0) {
+                cursor.asyncMarkDelete(highestPositionToDelete, new MarkDeleteCallback() {
+                    @Override
+                    public void markDeleteComplete(Object ctx) {
+                    }
+
+                    @Override
+                    public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
+                        log.warn("[{}] Failed to mark delete while trimming data ledgers: {}", name,
+                                exception.getMessage());
+                    }
+                }, null);
+            }
+        });
+    }
+
+    /**
      * Delete this ManagedLedger completely from the system.
      *
      * @throws Exception
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
index 2772541..7a15035 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
@@ -28,8 +28,11 @@ import static org.testng.Assert.fail;
 
 import com.google.common.base.Charsets;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
 import java.nio.charset.Charset;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -678,6 +681,48 @@ public class NonDurableCursorTest extends MockedBookKeeperTestCase {
         ledger.close();
     }
 
+    @Test
+    public void testBacklogStatsWhenDroppingData() throws Exception {
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testBacklogStatsWhenDroppingData",
+                new ManagedLedgerConfig().setMaxEntriesPerLedger(1));
+        ManagedCursor c1 = ledger.openCursor("c1");
+        ManagedCursor nonDurableCursor = ledger.newNonDurableCursor(PositionImpl.earliest);
+
+        assertEquals(nonDurableCursor.getNumberOfEntries(), 0);
+        assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 0);
+
+        List<Position> positions = Lists.newArrayList();
+        for (int i = 0; i < 10; i++) {
+            positions.add(ledger.addEntry(("entry-" + i).getBytes(UTF_8)));
+        }
+
+        assertEquals(nonDurableCursor.getNumberOfEntries(), 10);
+        assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 10);
+
+        c1.markDelete(positions.get(4));
+        assertEquals(c1.getNumberOfEntries(), 5);
+        assertEquals(c1.getNumberOfEntriesInBacklog(true), 5);
+
+        // Since the durable cursor has moved, the data will be trimmed
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        ledger.internalTrimConsumedLedgers(promise);
+        promise.join();
+
+        assertEquals(nonDurableCursor.getNumberOfEntries(), 6);
+        assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 6);
+
+        c1.close();
+        ledger.deleteCursor(c1.getName());
+        promise = new CompletableFuture<>();
+        ledger.internalTrimConsumedLedgers(promise);
+        promise.join();
+
+        assertEquals(nonDurableCursor.getNumberOfEntries(), 1);
+        assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 1);
+
+        ledger.close();
+    }
+
     @Test(expectedExceptions = NullPointerException.class)
     void testCursorWithNameIsNotNull() throws Exception {
         final String p1CursorName = "entry-1";
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index 8636cf1..0c317a0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -161,7 +161,7 @@ public class BacklogQuotaManagerTest {
             // non-durable mes should still
             assertEquals(stats.subscriptions.size(), 1);
             long nonDurableSubscriptionBacklog = stats.subscriptions.values().iterator().next().msgBacklog;
-            assertEquals(nonDurableSubscriptionBacklog, numMsgs,
+            assertEquals(nonDurableSubscriptionBacklog, MAX_ENTRIES_PER_LEDGER,
               "non-durable subscription backlog is [" + nonDurableSubscriptionBacklog + "]"); ;
 
             try {