You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/28 02:38:14 UTC
[pulsar] branch branch-2.7 updated: Issue 9082: Broker expires
messages one at a time after topic unload (#9083)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new d1579b7 Issue 9082: Broker expires messages one at a time after topic unload (#9083)
d1579b7 is described below
commit d1579b7f1be9ab70d74453f9fefaa49d1c48fe00
Author: Enrico Olivelli <eo...@gmail.com>
AuthorDate: Thu Jan 7 18:44:48 2021 +0100
Issue 9082: Broker expires messages one at a time after topic unload (#9083)
Fixes #9082
In case of topic unload the ManagedLedger rolls a new ledger and this confuses the search for messages to be expired.
This is how OpFindNewest (the operation involved in message expiry, PersistentMessageExpiryMonitor) works:
- first check if first message is expired
- then test the last message
- perform a search...
At step two we jump to a position that is not valid, so the search ends immediately and returns only the first message.
Ensure that we jump only to a valid position; this change allows the PersistentMessageExpiryMonitor to find the best range of messages to expire.
This change added tests:
- unit test that covers the change
- one reproducer for the problem, which is now fixed
- another test case for the expected end-to-end behaviour for a consumer that waits too long and whose messages expire
It can also be verified by manually running the reproducer from #9082; after applying this patch the issue is no longer reproducible.
(cherry picked from commit 7313b3403867d5ca16ea68edeaedd42f8c389ad3)
---
managed-ledger/pom.xml | 8 +-
.../bookkeeper/mledger/impl/OpFindNewest.java | 10 +-
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 38 ++++++++
.../pulsar/broker/service/MessageTTLTest.java | 107 +++++++++++++++++++++
4 files changed, 161 insertions(+), 2 deletions(-)
diff --git a/managed-ledger/pom.xml b/managed-ledger/pom.xml
index c171159..4c09e74 100644
--- a/managed-ledger/pom.xml
+++ b/managed-ledger/pom.xml
@@ -59,7 +59,7 @@
<artifactId>pulsar-common</artifactId>
<version>${project.version}</version>
</dependency>
-
+
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-metadata</artifactId>
@@ -97,6 +97,12 @@
</dependency>
<dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<scope>test</scope>
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
index 861d247..f6a48d4 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
@@ -23,12 +23,14 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
+@Slf4j
class OpFindNewest implements ReadEntryCallback {
private final ManagedCursorImpl cursor;
private final PositionImpl startPosition;
@@ -74,10 +76,16 @@ class OpFindNewest implements ReadEntryCallback {
return;
} else {
lastMatchedPosition = position;
-
// check last entry
state = State.checkLast;
+ PositionImpl lastPosition = cursor.ledger.getLastPosition();
searchPosition = cursor.ledger.getPositionAfterN(searchPosition, max, PositionBound.startExcluded);
+ if (lastPosition.compareTo(searchPosition) < 0) {
+ if (log.isDebugEnabled()) {
+ log.debug("first position {} matches, last should be {}, but moving to lastPos {}", position, searchPosition, lastPosition);
+ }
+ searchPosition = lastPosition;
+ }
find();
}
break;
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index ffee4f4..256457c 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -85,6 +85,7 @@ import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.common.api.proto.PulsarApi.IntRange;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.MockZooKeeper;
+import org.awaitility.Awaitility;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
@@ -2140,6 +2141,43 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
assertTrue(c1.isIndividuallyDeletedEntriesEmpty());
}
+ @Test(timeOut = 20000)
+ void testFindNewestMatchingAfterLedgerRollover() throws Exception {
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger");
+ ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1");
+ ledger.addEntry("first-expired".getBytes(Encoding));
+ ledger.addEntry("second".getBytes(Encoding));
+ ledger.addEntry("third".getBytes(Encoding));
+ ledger.addEntry("fourth".getBytes(Encoding));
+ Position last = ledger.addEntry("last-expired".getBytes(Encoding));
+
+ // roll a new ledger and wait until it shows up in the ledger list
+ int numLedgersBefore = ledger.getLedgersInfo().size();
+ ledger.getConfig().setMaxEntriesPerLedger(1);
+ ledger.rollCurrentLedgerIfFull();
+ Awaitility.await().atMost(20, TimeUnit.SECONDS)
+ .until(() -> ledger.getLedgersInfo().size() > numLedgersBefore);
+
+ // The algorithm looks for "expired" messages:
+ // it starts from the first message, then it moves to the last message;
+ // if the condition evaluates to true on the last message
+ // then we are done.
+ // There was a bug (https://github.com/apache/pulsar/issues/9082)
+ // in which, if the last message was in a different ledger,
+ // the jump from the first message to the last message went
+ // to an invalid position and so the search stopped at the first message.
+
+ // We want to assert here that the algorithm returns the position of the
+ // last message.
+ assertEquals(last,
+ c1.findNewestMatching(entry -> {
+ byte[] data = entry.getDataAndRelease();
+ return Arrays.equals(data, "first-expired".getBytes(Encoding))
+ || Arrays.equals(data, "last-expired".getBytes(Encoding));
+ }));
+
+ }
+
public static byte[] getEntryPublishTime(String msg) throws Exception {
return Long.toString(System.currentTimeMillis()).getBytes();
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java
new file mode 100644
index 0000000..ccb402b
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class MessageTTLTest extends BrokerTestBase {
+
+ private static final Logger log = LoggerFactory.getLogger(MessageTTLTest.class);
+ @BeforeClass
+ @Override
+ protected void setup() throws Exception {
+ this.conf.setTtlDurationDefaultInSeconds(1);
+ this.conf.setBrokerDeleteInactiveTopicsEnabled(false);
+ super.baseSetup();
+ }
+
+ @AfterClass(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void testMessageExpiryAfterTopicUnload() throws Exception {
+ int numMsgs = 50;
+ final String topicName = "persistent://prop/ns-abc/testttl";
+ final String subscriptionName = "ttl-sub-1";
+
+ pulsarClient.newConsumer()
+ .topic(topicName)
+ .subscriptionName(subscriptionName)
+ .subscribe()
+ .close();
+
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+ .enableBatching(false) // this makes the test simpler and more predictable
+ .create();
+
+ List<CompletableFuture<MessageId>> sendFutureList = Lists.newArrayList();
+ for (int i = 0; i < numMsgs; i++) {
+ byte[] message = ("my-message-" + i).getBytes();
+ sendFutureList.add(producer.sendAsync(message));
+ }
+ FutureUtil.waitForAll(sendFutureList).get();
+ producer.close();
+ // unload and reload the topic:
+ // this action creates a new ledger;
+ // having a managed ledger with more than one
+ // ledger should not impact message expiration
+ admin.topics().unload(topicName);
+ admin.topics().getStats(topicName);
+
+ PersistentTopicInternalStats internalStatsBeforeExpire = admin.topics().getInternalStats(topicName);
+ CursorStats statsBeforeExpire = internalStatsBeforeExpire.cursors.get(subscriptionName);
+ log.info("markDeletePosition before expire {}", statsBeforeExpire.markDeletePosition);
+ assertEquals(statsBeforeExpire.markDeletePosition, PositionImpl.get(3, -1).toString());
+
+ // wait in wall-clock time, so that the messages are considered "expired"
+ Thread.sleep(this.conf.getTtlDurationDefaultInSeconds() * 2000);
+ log.info("***** run message expiry now");
+ this.runMessageExpiryCheck();
+
+ // verify that the markDeletePosition was moved forward, and exactly to the last message
+ PersistentTopicInternalStats internalStatsAfterExpire = admin.topics().getInternalStats(topicName);
+ CursorStats statsAfterExpire = internalStatsAfterExpire.cursors.get(subscriptionName);
+ log.info("markDeletePosition after expire {}", statsAfterExpire.markDeletePosition);
+ assertEquals(statsAfterExpire.markDeletePosition, PositionImpl.get(3, numMsgs - 1 ).toString());
+
+ }
+
+}