You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2024/01/31 08:49:14 UTC

(pulsar) branch branch-3.1 updated: [fix] [broker] Fix reader stuck when read from compacted topic with read compact mode disable (#21969)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 313eae5eff2 [fix] [broker] Fix reader stuck when read from compacted topic with read compact mode disable (#21969)
313eae5eff2 is described below

commit 313eae5eff25df2d584773f059365f0edb82b35a
Author: thetumbled <52...@users.noreply.github.com>
AuthorDate: Wed Jan 31 00:11:07 2024 +0800

    [fix] [broker] Fix reader stuck when read from compacted topic with read compact mode disable (#21969)
---
 .../apache/pulsar/broker/service/ServerCnx.java    | 32 +++++++++++++++++-----
 .../compaction/GetLastMessageIdCompactedTest.java  | 27 ++++++++++++++++++
 2 files changed, 52 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index cca53bf9d6a..5057b7b045a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2079,7 +2079,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                         (PositionImpl) markDeletePosition,
                         partitionIndex,
                         requestId,
-                        consumer.getSubscription().getName());
+                        consumer.getSubscription().getName(),
+                        consumer.readCompacted());
             }).exceptionally(e -> {
                 writeAndFlush(Commands.newError(getLastMessageId.getRequestId(),
                         ServerError.UnknownError, "Failed to recover Transaction Buffer."));
@@ -2097,15 +2098,17 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
             PositionImpl markDeletePosition,
             int partitionIndex,
             long requestId,
-            String subscriptionName) {
+            String subscriptionName,
+            boolean readCompacted) {
 
         PersistentTopic persistentTopic = (PersistentTopic) topic;
         ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
 
         // If it's not pointing to a valid entry, respond messageId of the current position.
         // If the compaction cursor reach the end of the topic, respond messageId from compacted ledger
-        CompletableFuture<Position> compactionHorizonFuture =
-                persistentTopic.getTopicCompactionService().getLastCompactedPosition();
+        CompletableFuture<Position> compactionHorizonFuture = readCompacted
+                ? persistentTopic.getTopicCompactionService().getLastCompactedPosition() :
+                CompletableFuture.completedFuture(null);
 
         compactionHorizonFuture.whenComplete((compactionHorizon, ex) -> {
             if (ex != null) {
@@ -2114,8 +2117,22 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                 return;
             }
 
-            if (lastPosition.getEntryId() == -1 || (compactionHorizon != null
-                    && lastPosition.compareTo((PositionImpl) compactionHorizon) <= 0)) {
+            if (lastPosition.getEntryId() == -1 || !ml.ledgerExists(lastPosition.getLedgerId())) {
+                // there is no entry in the original topic
+                if (compactionHorizon != null) {
+                    // if readCompacted is true, we need to read the last entry from compacted topic
+                    handleLastMessageIdFromCompactionService(persistentTopic, requestId, partitionIndex,
+                            markDeletePosition);
+                } else {
+                    // if readCompacted is false, we need to return MessageId.earliest
+                    writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, -1, -1, partitionIndex, -1,
+                            markDeletePosition != null ? markDeletePosition.getLedgerId() : -1,
+                            markDeletePosition != null ? markDeletePosition.getEntryId() : -1));
+                }
+                return;
+            }
+
+            if (compactionHorizon != null && lastPosition.compareTo((PositionImpl) compactionHorizon) <= 0) {
                 handleLastMessageIdFromCompactionService(persistentTopic, requestId, partitionIndex,
                         markDeletePosition);
                 return;
@@ -2144,7 +2161,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
             batchSizeFuture.whenComplete((batchSize, e) -> {
                 if (e != null) {
-                    if (e.getCause() instanceof ManagedLedgerException.NonRecoverableLedgerException) {
+                    if (e.getCause() instanceof ManagedLedgerException.NonRecoverableLedgerException
+                            && readCompacted) {
                         handleLastMessageIdFromCompactionService(persistentTopic, requestId, partitionIndex,
                                 markDeletePosition);
                     } else {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java
index 317b1a227e5..6c2d848bb7c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.compaction;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
@@ -32,6 +34,7 @@ import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
@@ -415,4 +418,28 @@ public class GetLastMessageIdCompactedTest extends ProducerConsumerBase {
         producer.close();
         admin.topics().delete(topicName, false);
     }
+
+    @Test(dataProvider = "enabledBatch")
+    public void testReaderStuckWithCompaction(boolean enabledBatch) throws Exception {
+        String topicName = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp");
+        String subName = "sub";
+        Producer<String> producer = createProducer(enabledBatch, topicName);
+        producer.newMessage().key("k0").value("v0").sendAsync();
+        producer.newMessage().key("k0").value("v1").sendAsync();
+        producer.flush();
+
+        triggerCompactionAndWait(topicName);
+        triggerLedgerSwitch(topicName);
+        clearAllTheLedgersOutdated(topicName);
+
+        var reader = pulsarClient.newReader(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName(subName)
+                .startMessageId(MessageId.earliest)
+                .create();
+        while (reader.hasMessageAvailable()) {
+            Message<String> message = reader.readNext(5, TimeUnit.SECONDS);
+            assertNotEquals(message, null);
+        }
+    }
 }