You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/28 09:37:26 UTC

[pulsar] branch branch-2.6 updated: fix the closed ledger did not delete after expired (#9136)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.6 by this push:
     new 5e21470  fix the closed ledger did not delete after expired (#9136)
5e21470 is described below

commit 5e21470ea2bfa16da4dae42ed1378b8b12c4eac4
Author: WangJialing <65...@users.noreply.github.com>
AuthorDate: Sun Jan 10 15:11:50 2021 +0800

    fix the closed ledger did not delete after expired (#9136)
    
    Fixes #9057
    
    When current ledger closed, if there is no incoming traffic, the read position of the cursor is still point to the last entry of the closed ledger,  that casue the `slowestReaderLedgerId` point to the closed ledger in `internalTrimConsumedLedgers()` and fail to delete the closed ledger.
    
    When close current ledger, if cursor's mark delete position point to the last entry of current ledger, move the read position to the new created ledger.
    
    add test case: testDeletionAfterLedgerClosedAndRetention()
    
    (cherry picked from commit 0e5c5362821515995188f0da0c27181aa25e4f6c)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 10 +++++++
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 34 ++++++++++++++++++----
 .../service/CurrentLedgerRolloverIfFullTest.java   | 14 +++------
 3 files changed, 43 insertions(+), 15 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 30d50b8e..30db632 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1259,6 +1259,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         } else {
             log.info("[{}] Created new ledger {}", name, lh.getId());
             ledgers.put(lh.getId(), LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build());
+            final long previousEntries = currentLedgerEntries;
+            final long previousLedgerId = currentLedger.getId();
             currentLedger = lh;
             currentLedgerEntries = 0;
             currentLedgerSize = 0;
@@ -1276,6 +1278,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                         mbean.addLedgerSwitchLatencySample(System.currentTimeMillis() - lastLedgerCreationInitiationTimestamp,
                                 TimeUnit.MILLISECONDS);
                     }
+                    // Move cursor read point to new ledger
+                    for (ManagedCursor cursor : cursors) {
+                        PositionImpl markDeletedPosition = (PositionImpl) cursor.getMarkDeletedPosition();
+                        if (markDeletedPosition.getLedgerId() == previousLedgerId && markDeletedPosition.getEntryId() + 1 >= previousEntries) {
+                            // All entries in last ledger are marked delete, move read point to the new ledger
+                            updateCursor((ManagedCursorImpl) cursor, PositionImpl.get(currentLedger.getId(), -1));
+                        }
+                    }
                 }
 
                 @Override
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 87eebd1..bef1016 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -33,12 +33,10 @@ import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertSame;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-
 import com.google.common.base.Charsets;
 import com.google.common.collect.Sets;
-
 import io.netty.buffer.ByteBufAllocator;
-
+import io.netty.util.concurrent.DefaultThreadFactory;
 import java.lang.reflect.Field;
 import java.nio.charset.Charset;
 import java.security.GeneralSecurityException;
@@ -63,8 +61,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
-
-import io.netty.util.concurrent.DefaultThreadFactory;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -1828,6 +1824,34 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         ml.close();
     }
 
+    @Test
+    public void testDeletionAfterLedgerClosedAndRetention() throws Exception {
+        ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setRetentionSizeInMB(0);
+        config.setMaxEntriesPerLedger(1);
+        config.setRetentionTime(1, TimeUnit.SECONDS);
+        config.setMaximumRolloverTime(1, TimeUnit.SECONDS);
+
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open("deletion_after_retention_test_ledger", config);
+        ManagedCursor c1 = ml.openCursor("testCursor1");
+        ManagedCursor c2 = ml.openCursor("testCursor2");
+        ml.addEntry("iamaverylongmessagethatshouldnotberetained".getBytes());
+        c1.skipEntries(1, IndividualDeletedEntries.Exclude);
+        c2.skipEntries(1, IndividualDeletedEntries.Exclude);
+        // let current ledger close
+        ml.rollCurrentLedgerIfFull();
+        // let retention expire
+        Thread.sleep(1500);
+        // delete the expired ledger
+        ml.internalTrimConsumedLedgers(CompletableFuture.completedFuture(null));
+
+        // the closed and expired ledger should be deleted
+        assertTrue(ml.getLedgersInfoAsList().size() <= 1);
+        assertEquals(ml.getTotalSize(), 0);
+        ml.close();
+    }
+
     /**
      * Set retention time = 0 and create a empty ledger,
      * first position can't higher than last after trim ledgers.
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
index b70a595..783eac5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
@@ -18,10 +18,10 @@
  */
 package org.apache.pulsar.broker.service;
 
+import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
-import org.apache.bookkeeper.mledger.util.Futures;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -29,8 +29,6 @@ import org.apache.pulsar.client.api.Producer;
 import org.junit.Test;
 import org.testng.Assert;
 
-import java.util.concurrent.TimeUnit;
-
 public class CurrentLedgerRolloverIfFullTest extends BrokerTestBase {
     @Override
     protected void setup() throws Exception {
@@ -90,14 +88,10 @@ public class CurrentLedgerRolloverIfFullTest extends BrokerTestBase {
         Assert.assertNotEquals(managedLedger.getCurrentLedgerSize(), 0);
 
         // trigger a ledger rollover
-        // and now we have two ledgers, one with expired data and one for empty
+        // the last ledger will be closed and removed and we have one ledger for empty
         managedLedger.rollCurrentLedgerIfFull();
         Thread.sleep(1000);
-        Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 2);
-
-        // trigger a ledger trimming
-        // and now we only have the empty ledger
-        managedLedger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
-        Assert.assertEquals(managedLedger.getCurrentLedgerSize(), 0);
+        Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1);
+        Assert.assertEquals(managedLedger.getTotalSize(), 0);
     }
 }