You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/28 02:38:14 UTC

[pulsar] branch branch-2.7 updated: Issue 9082: Broker expires messages one at a time after topic unload (#9083)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new d1579b7  Issue 9082: Broker expires messages one at a time after topic unload (#9083)
d1579b7 is described below

commit d1579b7f1be9ab70d74453f9fefaa49d1c48fe00
Author: Enrico Olivelli <eo...@gmail.com>
AuthorDate: Thu Jan 7 18:44:48 2021 +0100

    Issue 9082: Broker expires messages one at a time after topic unload (#9083)
    
    Fixes #9082
    
    In case of topic unload the ManagedLedger rolls a new ledger and this confuses the search for messages to be expired.
    
    This is how OpFindNewest (the operation involved in message expiry, PersistentMessageExpiryMonitor) works:
    - first check if first message is expired
    - then test the last message
    - perform a search...
    
    At step two we are jumping to a position that is not valid, and the search ends immediately, returning only the first message
    
    Ensure that we jump only to a valid position; this change allows the PersistentMessageExpiryMonitor to find the best range of messages to expire.
    
    This change added tests:
    - unit test that covers the change
    - one reproducer for the problem, which is now fixed
    - another test case for the expected end-to-end behaviour for a consumer that waits too long and whose messages expire
    
    It can also be verified by manually running the reproducer from #9082; after applying this patch the issue is no longer reproducible.
    
    (cherry picked from commit 7313b3403867d5ca16ea68edeaedd42f8c389ad3)
---
 managed-ledger/pom.xml                             |   8 +-
 .../bookkeeper/mledger/impl/OpFindNewest.java      |  10 +-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java |  38 ++++++++
 .../pulsar/broker/service/MessageTTLTest.java      | 107 +++++++++++++++++++++
 4 files changed, 161 insertions(+), 2 deletions(-)

diff --git a/managed-ledger/pom.xml b/managed-ledger/pom.xml
index c171159..4c09e74 100644
--- a/managed-ledger/pom.xml
+++ b/managed-ledger/pom.xml
@@ -59,7 +59,7 @@
       <artifactId>pulsar-common</artifactId>
       <version>${project.version}</version>
     </dependency>
-    
+
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-metadata</artifactId>
@@ -97,6 +97,12 @@
     </dependency>
 
     <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
       <groupId>log4j</groupId>
       <artifactId>log4j</artifactId>
       <scope>test</scope>
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
index 861d247..f6a48d4 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
@@ -23,12 +23,14 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
 
 import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
 
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
 
+@Slf4j
 class OpFindNewest implements ReadEntryCallback {
     private final ManagedCursorImpl cursor;
     private final PositionImpl startPosition;
@@ -74,10 +76,16 @@ class OpFindNewest implements ReadEntryCallback {
                 return;
             } else {
                 lastMatchedPosition = position;
-
                 // check last entry
                 state = State.checkLast;
+                PositionImpl lastPosition = cursor.ledger.getLastPosition();
                 searchPosition = cursor.ledger.getPositionAfterN(searchPosition, max, PositionBound.startExcluded);
+                if (lastPosition.compareTo(searchPosition) < 0) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("first position {} matches, last should be {}, but moving to lastPos {}", position, searchPosition, lastPosition);
+                    }
+                    searchPosition = lastPosition;
+                }
                 find();
             }
             break;
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index ffee4f4..256457c 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -85,6 +85,7 @@ import org.apache.pulsar.metadata.api.Stat;
 import org.apache.pulsar.common.api.proto.PulsarApi.IntRange;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.MockZooKeeper;
+import org.awaitility.Awaitility;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
@@ -2140,6 +2141,43 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         assertTrue(c1.isIndividuallyDeletedEntriesEmpty());
     }
 
+    @Test(timeOut = 20000)
+    void testFindNewestMatchingAfterLedgerRollover() throws Exception {
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger");
+        ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1");
+        ledger.addEntry("first-expired".getBytes(Encoding));
+        ledger.addEntry("second".getBytes(Encoding));
+        ledger.addEntry("third".getBytes(Encoding));
+        ledger.addEntry("fourth".getBytes(Encoding));
+        Position last = ledger.addEntry("last-expired".getBytes(Encoding));
+
+        // roll a new ledger
+        int numLedgersBefore = ledger.getLedgersInfo().size();
+        ledger.getConfig().setMaxEntriesPerLedger(1);
+        ledger.rollCurrentLedgerIfFull();
+        Awaitility.await().atMost(20, TimeUnit.SECONDS)
+                .until(() -> ledger.getLedgersInfo().size() > numLedgersBefore);
+
+        // the algorithm looks for "expired" messages
+        // starting from the first, then it moves to the last message
+        // if the condition evaluates to true on the last message
+        // then we are done
+        // there was a bug (https://github.com/apache/pulsar/issues/9082)
+        // in which if the last message was in a different ledger
+        // the jump from the first message to the last message went
+        // to an invalid position and so the search stopped at the first message
+
+        // we want to assert here that the algorithm returns the position of the
+        // last message
+        assertEquals(last,
+                c1.findNewestMatching(entry -> {
+                    byte[] data = entry.getDataAndRelease();
+                    return Arrays.equals(data, "first-expired".getBytes(Encoding))
+                            || Arrays.equals(data, "last-expired".getBytes(Encoding));
+                }));
+
+    }
+
     public static byte[] getEntryPublishTime(String msg) throws Exception {
         return Long.toString(System.currentTimeMillis()).getBytes();
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java
new file mode 100644
index 0000000..ccb402b
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class MessageTTLTest extends BrokerTestBase {
+
+    private static final Logger log = LoggerFactory.getLogger(MessageTTLTest.class);
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        this.conf.setTtlDurationDefaultInSeconds(1);
+        this.conf.setBrokerDeleteInactiveTopicsEnabled(false);
+        super.baseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testMessageExpiryAfterTopicUnload() throws Exception {
+        int numMsgs = 50;
+        final String topicName = "persistent://prop/ns-abc/testttl";
+        final String subscriptionName = "ttl-sub-1";
+
+        pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subscriptionName)
+                .subscribe()
+                .close();
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+                .enableBatching(false) // this makes the test easier and predictable
+                .create();
+
+        List<CompletableFuture<MessageId>> sendFutureList = Lists.newArrayList();
+        for (int i = 0; i < numMsgs; i++) {
+            byte[] message = ("my-message-" + i).getBytes();
+            sendFutureList.add(producer.sendAsync(message));
+        }
+        FutureUtil.waitForAll(sendFutureList).get();
+        producer.close();
+        // unload and reload the topic
+        // this action creates a new ledger
+        // having a managed ledger with more than one
+        // ledger should not impact message expiration
+        admin.topics().unload(topicName);
+        admin.topics().getStats(topicName);
+
+        PersistentTopicInternalStats internalStatsBeforeExpire = admin.topics().getInternalStats(topicName);
+        CursorStats statsBeforeExpire = internalStatsBeforeExpire.cursors.get(subscriptionName);
+        log.info("markDeletePosition before expire {}", statsBeforeExpire.markDeletePosition);
+        assertEquals(statsBeforeExpire.markDeletePosition, PositionImpl.get(3, -1).toString());
+
+        // wall clock time, we have to make the message to be considered "expired"
+        Thread.sleep(this.conf.getTtlDurationDefaultInSeconds() * 2000);
+        log.info("***** run message expiry now");
+        this.runMessageExpiryCheck();
+
+        // verify that the markDeletePosition was moved forward, and exactly to the last message
+        PersistentTopicInternalStats internalStatsAfterExpire = admin.topics().getInternalStats(topicName);
+        CursorStats statsAfterExpire = internalStatsAfterExpire.cursors.get(subscriptionName);
+        log.info("markDeletePosition after expire {}", statsAfterExpire.markDeletePosition);
+        assertEquals(statsAfterExpire.markDeletePosition, PositionImpl.get(3, numMsgs - 1 ).toString());
+
+    }
+
+}