You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/11/22 03:53:04 UTC
(pulsar) branch branch-3.0 updated: [fix][broker] Fix issue with consumer read uncommitted messages from compacted topic (#21465) (#21570)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 6ba453d2e47 [fix][broker] Fix issue with consumer read uncommitted messages from compacted topic (#21465) (#21570)
6ba453d2e47 is described below
commit 6ba453d2e47804cab8e00c64ad83a387eed40e21
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Wed Nov 22 11:52:56 2023 +0800
[fix][broker] Fix issue with consumer read uncommitted messages from compacted topic (#21465) (#21570)
---
.../PersistentDispatcherSingleActiveConsumer.java | 4 +-
...entStreamingDispatcherSingleActiveConsumer.java | 4 +-
.../apache/pulsar/compaction/CompactedTopic.java | 2 +
.../pulsar/compaction/CompactedTopicImpl.java | 3 +-
.../pulsar/broker/transaction/TransactionTest.java | 55 ++++++++++++++++++++++
5 files changed, 63 insertions(+), 5 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 67265f626a1..eacc568f0a4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -347,8 +347,8 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
}
havePendingRead = true;
if (consumer.readCompacted()) {
- topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, bytesToRead, isFirstRead,
- this, consumer);
+ topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, bytesToRead,
+ topic.getMaxReadPosition(), isFirstRead, this, consumer);
} else {
ReadEntriesCtx readEntriesCtx =
ReadEntriesCtx.create(consumer, consumer.getConsumerEpoch());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
index 22dcc48994c..52bcc474c4b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
@@ -217,8 +217,8 @@ public class PersistentStreamingDispatcherSingleActiveConsumer extends Persisten
havePendingRead = true;
if (consumer.readCompacted()) {
- topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, bytesToRead, isFirstRead,
- this, consumer);
+ topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, bytesToRead,
+ PositionImpl.LATEST, isFirstRead, this, consumer);
} else {
streamingEntryReader.asyncReadEntries(messagesToRead, bytesToRead, consumer);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
index c575a872fd6..015df8582a2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
@@ -24,6 +24,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.Consumer;
public interface CompactedTopic {
@@ -32,6 +33,7 @@ public interface CompactedTopic {
void asyncReadEntriesOrWait(ManagedCursor cursor,
int maxEntries,
long bytesToRead,
+ PositionImpl maxReadPosition,
boolean isFirstRead,
ReadEntriesCallback callback,
Consumer consumer);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index 703ba688d3b..75d8fcffd3f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -90,6 +90,7 @@ public class CompactedTopicImpl implements CompactedTopic {
public void asyncReadEntriesOrWait(ManagedCursor cursor,
int maxEntries,
long bytesToRead,
+ PositionImpl maxReadPosition,
boolean isFirstRead,
ReadEntriesCallback callback, Consumer consumer) {
synchronized (this) {
@@ -104,7 +105,7 @@ public class CompactedTopicImpl implements CompactedTopic {
ReadEntriesCtx readEntriesCtx = ReadEntriesCtx.create(consumer, DEFAULT_CONSUMER_EPOCH);
if (compactionHorizon == null
|| compactionHorizon.compareTo(cursorPosition) < 0) {
- cursor.asyncReadEntriesOrWait(maxEntries, bytesToRead, callback, readEntriesCtx, PositionImpl.LATEST);
+ cursor.asyncReadEntriesOrWait(maxEntries, bytesToRead, callback, readEntriesCtx, maxReadPosition);
} else {
ManagedCursorImpl managedCursor = (ManagedCursorImpl) cursor;
int numberOfEntriesToRead = managedCursor.applyMaxSizeCap(maxEntries, bytesToRead);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index c4ec2ec766e..462cc38c887 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -1778,4 +1778,59 @@ public class TransactionTest extends TransactionTestBase {
});
}
+ @Test
+ public void testReadCommittedWithReadCompacted() throws Exception{
+ final String namespace = "tnx/ns-prechecks";
+ final String topic = "persistent://" + namespace + "/test_transaction_topic";
+ admin.namespaces().createNamespace(namespace);
+ admin.topics().createNonPartitionedTopic(topic);
+
+ admin.topicPolicies().setCompactionThreshold(topic, 100 * 1024 * 1024);
+
+ @Cleanup
+ Consumer<String> consumer = this.pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("sub")
+ .subscriptionType(SubscriptionType.Exclusive)
+ .readCompacted(true)
+ .subscribe();
+
+ @Cleanup
+ Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .create();
+
+ producer.newMessage().key("K1").value("V1").send();
+
+ Transaction txn = pulsarClient.newTransaction()
+ .withTransactionTimeout(1, TimeUnit.MINUTES).build().get();
+ producer.newMessage(txn).key("K2").value("V2").send();
+ producer.newMessage(txn).key("K3").value("V3").send();
+
+ List<String> messages = new ArrayList<>();
+ while (true) {
+ Message<String> message = consumer.receive(5, TimeUnit.SECONDS);
+ if (message == null) {
+ break;
+ }
+ messages.add(message.getValue());
+ }
+
+ Assert.assertEquals(messages, List.of("V1"));
+
+ txn.commit();
+
+ messages.clear();
+
+ while (true) {
+ Message<String> message = consumer.receive(5, TimeUnit.SECONDS);
+ if (message == null) {
+ break;
+ }
+ messages.add(message.getValue());
+ }
+
+ Assert.assertEquals(messages, List.of("V2", "V3"));
+ }
+
}