You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by he...@apache.org on 2024/03/06 21:37:37 UTC
(pulsar) branch branch-3.0 updated: [fix] [branch-3.0] Fix reader stuck when read from compacted topic with read compact mode disable (#22199)
This is an automated email from the ASF dual-hosted git repository.
heesung pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 140ca5e34ae [fix] [branch-3.0] Fix reader stuck when read from compacted topic with read compact mode disable (#22199)
140ca5e34ae is described below
commit 140ca5e34aeccf5657fb056d1fc6f6011ff6c3a7
Author: thetumbled <52...@users.noreply.github.com>
AuthorDate: Thu Mar 7 05:37:31 2024 +0800
[fix] [branch-3.0] Fix reader stuck when read from compacted topic with read compact mode disable (#22199)
---
.../apache/pulsar/broker/service/ServerCnx.java | 33 +++++++++++++++++-----
.../compaction/GetLastMessageIdCompactedTest.java | 27 ++++++++++++++++++
2 files changed, 53 insertions(+), 7 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 57081863a14..d40a4ec789f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2077,7 +2077,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
(PositionImpl) markDeletePosition,
partitionIndex,
requestId,
- consumer.getSubscription().getName());
+ consumer.getSubscription().getName(),
+ consumer.readCompacted());
}).exceptionally(e -> {
writeAndFlush(Commands.newError(getLastMessageId.getRequestId(),
ServerError.UnknownError, "Failed to recover Transaction Buffer."));
@@ -2095,16 +2096,33 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
PositionImpl markDeletePosition,
int partitionIndex,
long requestId,
- String subscriptionName) {
-
+ String subscriptionName,
+ boolean readCompacted) {
PersistentTopic persistentTopic = (PersistentTopic) topic;
ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
// If it's not pointing to a valid entry, respond messageId of the current position.
// If the compaction cursor reach the end of the topic, respond messageId from compacted ledger
- Optional<Position> compactionHorizon = persistentTopic.getCompactedTopic().getCompactionHorizon();
- if (lastPosition.getEntryId() == -1 || (compactionHorizon.isPresent()
- && lastPosition.compareTo((PositionImpl) compactionHorizon.get()) <= 0)) {
+ Optional<Position> compactionHorizon = readCompacted
+ ? persistentTopic.getCompactedTopic().getCompactionHorizon() : Optional.empty();
+ if (lastPosition.getEntryId() == -1 || !ml.ledgerExists(lastPosition.getLedgerId())) {
+ // there is no entry in the original topic
+ if (compactionHorizon != null && compactionHorizon.isPresent()) {
+ // if readCompacted is true, we need to read the last entry from compacted topic
+ handleLastMessageIdFromCompactedLedger(persistentTopic, requestId, partitionIndex,
+ markDeletePosition);
+ return;
+ } else {
+ // if readCompacted is false, we need to return MessageId.earliest
+ writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, -1, -1, partitionIndex, -1,
+ markDeletePosition != null ? markDeletePosition.getLedgerId() : -1,
+ markDeletePosition != null ? markDeletePosition.getEntryId() : -1));
+ }
+ return;
+ }
+
+ if (compactionHorizon != null && compactionHorizon.isPresent()
+ && lastPosition.compareTo((PositionImpl) compactionHorizon.get()) <= 0) {
handleLastMessageIdFromCompactedLedger(persistentTopic, requestId, partitionIndex,
markDeletePosition);
return;
@@ -2133,7 +2151,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
batchSizeFuture.whenComplete((batchSize, e) -> {
if (e != null) {
- if (e.getCause() instanceof ManagedLedgerException.NonRecoverableLedgerException) {
+ if (e.getCause() instanceof ManagedLedgerException.NonRecoverableLedgerException
+ && readCompacted) {
handleLastMessageIdFromCompactedLedger(persistentTopic, requestId, partitionIndex,
markDeletePosition);
} else {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java
index 317b1a227e5..6c2d848bb7c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.compaction;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
+
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@@ -32,6 +34,7 @@ import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
@@ -415,4 +418,28 @@ public class GetLastMessageIdCompactedTest extends ProducerConsumerBase {
producer.close();
admin.topics().delete(topicName, false);
}
+
+ @Test(dataProvider = "enabledBatch")
+ public void testReaderStuckWithCompaction(boolean enabledBatch) throws Exception {
+ String topicName = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp");
+ String subName = "sub";
+ Producer<String> producer = createProducer(enabledBatch, topicName);
+ producer.newMessage().key("k0").value("v0").sendAsync();
+ producer.newMessage().key("k0").value("v1").sendAsync();
+ producer.flush();
+
+ triggerCompactionAndWait(topicName);
+ triggerLedgerSwitch(topicName);
+ clearAllTheLedgersOutdated(topicName);
+
+ var reader = pulsarClient.newReader(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName(subName)
+ .startMessageId(MessageId.earliest)
+ .create();
+ while (reader.hasMessageAvailable()) {
+ Message<String> message = reader.readNext(5, TimeUnit.SECONDS);
+ assertNotEquals(message, null);
+ }
+ }
}