You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2024/01/31 08:49:14 UTC
(pulsar) branch branch-3.1 updated: [fix] [broker] Fix reader stuck when read from compacted topic with read compact mode disable (#21969)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 313eae5eff2 [fix] [broker] Fix reader stuck when read from compacted topic with read compact mode disable (#21969)
313eae5eff2 is described below
commit 313eae5eff25df2d584773f059365f0edb82b35a
Author: thetumbled <52...@users.noreply.github.com>
AuthorDate: Wed Jan 31 00:11:07 2024 +0800
[fix] [broker] Fix reader stuck when read from compacted topic with read compact mode disable (#21969)
---
.../apache/pulsar/broker/service/ServerCnx.java | 32 +++++++++++++++++-----
.../compaction/GetLastMessageIdCompactedTest.java | 27 ++++++++++++++++++
2 files changed, 52 insertions(+), 7 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index cca53bf9d6a..5057b7b045a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2079,7 +2079,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
(PositionImpl) markDeletePosition,
partitionIndex,
requestId,
- consumer.getSubscription().getName());
+ consumer.getSubscription().getName(),
+ consumer.readCompacted());
}).exceptionally(e -> {
writeAndFlush(Commands.newError(getLastMessageId.getRequestId(),
ServerError.UnknownError, "Failed to recover Transaction Buffer."));
@@ -2097,15 +2098,17 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
PositionImpl markDeletePosition,
int partitionIndex,
long requestId,
- String subscriptionName) {
+ String subscriptionName,
+ boolean readCompacted) {
PersistentTopic persistentTopic = (PersistentTopic) topic;
ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
// If it's not pointing to a valid entry, respond messageId of the current position.
// If the compaction cursor reach the end of the topic, respond messageId from compacted ledger
- CompletableFuture<Position> compactionHorizonFuture =
- persistentTopic.getTopicCompactionService().getLastCompactedPosition();
+ CompletableFuture<Position> compactionHorizonFuture = readCompacted
+ ? persistentTopic.getTopicCompactionService().getLastCompactedPosition() :
+ CompletableFuture.completedFuture(null);
compactionHorizonFuture.whenComplete((compactionHorizon, ex) -> {
if (ex != null) {
@@ -2114,8 +2117,22 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
return;
}
- if (lastPosition.getEntryId() == -1 || (compactionHorizon != null
- && lastPosition.compareTo((PositionImpl) compactionHorizon) <= 0)) {
+ if (lastPosition.getEntryId() == -1 || !ml.ledgerExists(lastPosition.getLedgerId())) {
+ // there is no entry in the original topic
+ if (compactionHorizon != null) {
+ // if readCompacted is true, we need to read the last entry from compacted topic
+ handleLastMessageIdFromCompactionService(persistentTopic, requestId, partitionIndex,
+ markDeletePosition);
+ } else {
+ // if readCompacted is false, we need to return MessageId.earliest
+ writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, -1, -1, partitionIndex, -1,
+ markDeletePosition != null ? markDeletePosition.getLedgerId() : -1,
+ markDeletePosition != null ? markDeletePosition.getEntryId() : -1));
+ }
+ return;
+ }
+
+ if (compactionHorizon != null && lastPosition.compareTo((PositionImpl) compactionHorizon) <= 0) {
handleLastMessageIdFromCompactionService(persistentTopic, requestId, partitionIndex,
markDeletePosition);
return;
@@ -2144,7 +2161,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
batchSizeFuture.whenComplete((batchSize, e) -> {
if (e != null) {
- if (e.getCause() instanceof ManagedLedgerException.NonRecoverableLedgerException) {
+ if (e.getCause() instanceof ManagedLedgerException.NonRecoverableLedgerException
+ && readCompacted) {
handleLastMessageIdFromCompactionService(persistentTopic, requestId, partitionIndex,
markDeletePosition);
} else {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java
index 317b1a227e5..6c2d848bb7c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.compaction;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
+
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@@ -32,6 +34,7 @@ import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
@@ -415,4 +418,28 @@ public class GetLastMessageIdCompactedTest extends ProducerConsumerBase {
producer.close();
admin.topics().delete(topicName, false);
}
+
+ @Test(dataProvider = "enabledBatch")
+ public void testReaderStuckWithCompaction(boolean enabledBatch) throws Exception {
+ String topicName = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp");
+ String subName = "sub";
+ Producer<String> producer = createProducer(enabledBatch, topicName);
+ producer.newMessage().key("k0").value("v0").sendAsync();
+ producer.newMessage().key("k0").value("v1").sendAsync();
+ producer.flush();
+
+ triggerCompactionAndWait(topicName);
+ triggerLedgerSwitch(topicName);
+ clearAllTheLedgersOutdated(topicName);
+
+ var reader = pulsarClient.newReader(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName(subName)
+ .startMessageId(MessageId.earliest)
+ .create();
+ while (reader.hasMessageAvailable()) {
+ Message<String> message = reader.readNext(5, TimeUnit.SECONDS);
+ assertNotEquals(message, null);
+ }
+ }
}