You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by he...@apache.org on 2024/03/06 21:37:37 UTC

(pulsar) branch branch-3.0 updated: [fix] [branch-3.0] Fix reader stuck when read from compacted topic with read compact mode disable (#22199)

This is an automated email from the ASF dual-hosted git repository.

heesung pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 140ca5e34ae [fix] [branch-3.0] Fix reader stuck when read from compacted topic with read compact mode disable (#22199)
140ca5e34ae is described below

commit 140ca5e34aeccf5657fb056d1fc6f6011ff6c3a7
Author: thetumbled <52...@users.noreply.github.com>
AuthorDate: Thu Mar 7 05:37:31 2024 +0800

    [fix] [branch-3.0] Fix reader stuck when read from compacted topic with read compact mode disable (#22199)
---
 .../apache/pulsar/broker/service/ServerCnx.java    | 33 +++++++++++++++++-----
 .../compaction/GetLastMessageIdCompactedTest.java  | 27 ++++++++++++++++++
 2 files changed, 53 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 57081863a14..d40a4ec789f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2077,7 +2077,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                         (PositionImpl) markDeletePosition,
                         partitionIndex,
                         requestId,
-                        consumer.getSubscription().getName());
+                        consumer.getSubscription().getName(),
+                        consumer.readCompacted());
             }).exceptionally(e -> {
                 writeAndFlush(Commands.newError(getLastMessageId.getRequestId(),
                         ServerError.UnknownError, "Failed to recover Transaction Buffer."));
@@ -2095,16 +2096,33 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
             PositionImpl markDeletePosition,
             int partitionIndex,
             long requestId,
-            String subscriptionName) {
-
+            String subscriptionName,
+            boolean readCompacted) {
         PersistentTopic persistentTopic = (PersistentTopic) topic;
         ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
 
         // If it's not pointing to a valid entry, respond messageId of the current position.
         // If the compaction cursor reach the end of the topic, respond messageId from compacted ledger
-        Optional<Position> compactionHorizon = persistentTopic.getCompactedTopic().getCompactionHorizon();
-        if (lastPosition.getEntryId() == -1 || (compactionHorizon.isPresent()
-                        && lastPosition.compareTo((PositionImpl) compactionHorizon.get()) <= 0)) {
+        Optional<Position> compactionHorizon = readCompacted
+                ? persistentTopic.getCompactedTopic().getCompactionHorizon() : Optional.empty();
+        if (lastPosition.getEntryId() == -1 || !ml.ledgerExists(lastPosition.getLedgerId())) {
+            // there is no entry in the original topic
+            if (compactionHorizon != null && compactionHorizon.isPresent()) {
+                // if readCompacted is true, we need to read the last entry from compacted topic
+                handleLastMessageIdFromCompactedLedger(persistentTopic, requestId, partitionIndex,
+                        markDeletePosition);
+                return;
+            } else {
+                // if readCompacted is false, we need to return MessageId.earliest
+                writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, -1, -1, partitionIndex, -1,
+                        markDeletePosition != null ? markDeletePosition.getLedgerId() : -1,
+                        markDeletePosition != null ? markDeletePosition.getEntryId() : -1));
+            }
+            return;
+        }
+
+        if (compactionHorizon != null && compactionHorizon.isPresent()
+                        && lastPosition.compareTo((PositionImpl) compactionHorizon.get()) <= 0) {
             handleLastMessageIdFromCompactedLedger(persistentTopic, requestId, partitionIndex,
                     markDeletePosition);
             return;
@@ -2133,7 +2151,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
         batchSizeFuture.whenComplete((batchSize, e) -> {
             if (e != null) {
-                if (e.getCause() instanceof ManagedLedgerException.NonRecoverableLedgerException) {
+                if (e.getCause() instanceof ManagedLedgerException.NonRecoverableLedgerException
+                        && readCompacted) {
                     handleLastMessageIdFromCompactedLedger(persistentTopic, requestId, partitionIndex,
                             markDeletePosition);
                 } else {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java
index 317b1a227e5..6c2d848bb7c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.compaction;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
@@ -32,6 +34,7 @@ import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
@@ -415,4 +418,28 @@ public class GetLastMessageIdCompactedTest extends ProducerConsumerBase {
         producer.close();
         admin.topics().delete(topicName, false);
     }
+
+    @Test(dataProvider = "enabledBatch")
+    public void testReaderStuckWithCompaction(boolean enabledBatch) throws Exception {
+        String topicName = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp");
+        String subName = "sub";
+        Producer<String> producer = createProducer(enabledBatch, topicName);
+        producer.newMessage().key("k0").value("v0").sendAsync();
+        producer.newMessage().key("k0").value("v1").sendAsync();
+        producer.flush();
+
+        triggerCompactionAndWait(topicName);
+        triggerLedgerSwitch(topicName);
+        clearAllTheLedgersOutdated(topicName);
+
+        var reader = pulsarClient.newReader(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName(subName)
+                .startMessageId(MessageId.earliest)
+                .create();
+        while (reader.hasMessageAvailable()) {
+            Message<String> message = reader.readNext(5, TimeUnit.SECONDS);
+            assertNotEquals(message, null);
+        }
+    }
 }