You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2023/11/13 10:08:13 UTC

(pulsar) branch branch-3.1 updated: Revert "[fix][broker] Fix issue with consumer read uncommitted messages from compacted topic (#21465)"

This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new c616deb1dc5 Revert "[fix][broker] Fix issue with consumer read uncommitted messages from compacted topic (#21465)"
c616deb1dc5 is described below

commit c616deb1dc580d564a3f4694f23c6be96ccfe316
Author: coderzc <zh...@apache.org>
AuthorDate: Mon Nov 13 18:07:55 2023 +0800

    Revert "[fix][broker] Fix issue with consumer read uncommitted messages from compacted topic (#21465)"
    
    This reverts commit 80f921a45bb023fca36faf98038f3ec687e05f16.
---
 .../PersistentDispatcherSingleActiveConsumer.java  |  6 +--
 .../apache/pulsar/compaction/CompactedTopic.java   |  5 +-
 .../pulsar/compaction/CompactedTopicImpl.java      |  3 +-
 .../pulsar/compaction/CompactedTopicUtils.java     | 10 ++--
 .../pulsar/broker/transaction/TransactionTest.java | 55 ----------------------
 .../pulsar/compaction/CompactedTopicUtilsTest.java |  4 +-
 6 files changed, 12 insertions(+), 71 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 5e9183df0b1..d96429693fd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -55,7 +55,6 @@ import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.compaction.CompactedTopicUtils;
-import org.apache.pulsar.compaction.TopicCompactionService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -351,9 +350,8 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
                 havePendingRead = true;
                 if (consumer.readCompacted()) {
                     boolean readFromEarliest = isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId());
-                    TopicCompactionService topicCompactionService = topic.getTopicCompactionService();
-                    CompactedTopicUtils.asyncReadCompactedEntries(topicCompactionService, cursor, messagesToRead,
-                            bytesToRead, topic.getMaxReadPosition(), readFromEarliest, this, true, consumer);
+                    CompactedTopicUtils.asyncReadCompactedEntries(topic.getTopicCompactionService(), cursor,
+                            messagesToRead, bytesToRead, readFromEarliest, this, true, consumer);
                 } else {
                     ReadEntriesCtx readEntriesCtx =
                             ReadEntriesCtx.create(consumer, consumer.getConsumerEpoch());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
index 146ba4327d2..8c17e0f3ca3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
@@ -24,7 +24,6 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.Position;
-import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.service.Consumer;
 
 public interface CompactedTopic {
@@ -35,14 +34,12 @@ public interface CompactedTopic {
      * Read entries from compacted topic.
      *
      * @deprecated Use {@link CompactedTopicUtils#asyncReadCompactedEntries(TopicCompactionService, ManagedCursor,
-     * int, long, org.apache.bookkeeper.mledger.impl.PositionImpl, boolean, ReadEntriesCallback, boolean, Consumer)}
-     * instead.
+     * int, long, boolean, ReadEntriesCallback, boolean, Consumer)} instead.
      */
     @Deprecated
     void asyncReadEntriesOrWait(ManagedCursor cursor,
                                 int maxEntries,
                                 long bytesToRead,
-                                PositionImpl maxReadPosition,
                                 boolean isFirstRead,
                                 ReadEntriesCallback callback,
                                 Consumer consumer);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index 8794e2736d4..b028b708c49 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -96,7 +96,6 @@ public class CompactedTopicImpl implements CompactedTopic {
     public void asyncReadEntriesOrWait(ManagedCursor cursor,
                                        int maxEntries,
                                        long bytesToRead,
-                                       PositionImpl maxReadPosition,
                                        boolean isFirstRead,
                                        ReadEntriesCallback callback, Consumer consumer) {
             PositionImpl cursorPosition;
@@ -113,7 +112,7 @@ public class CompactedTopicImpl implements CompactedTopic {
 
             if (currentCompactionHorizon == null
                 || currentCompactionHorizon.compareTo(cursorPosition) < 0) {
-                cursor.asyncReadEntriesOrWait(maxEntries, bytesToRead, callback, readEntriesCtx, maxReadPosition);
+                cursor.asyncReadEntriesOrWait(maxEntries, bytesToRead, callback, readEntriesCtx, PositionImpl.LATEST);
             } else {
                 ManagedCursorImpl managedCursor = (ManagedCursorImpl) cursor;
                 int numberOfEntriesToRead = managedCursor.applyMaxSizeCap(maxEntries, bytesToRead);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java
index d3464d402e9..66bcf4c3002 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java
@@ -42,8 +42,8 @@ public class CompactedTopicUtils {
     @Beta
     public static void asyncReadCompactedEntries(TopicCompactionService topicCompactionService,
                                                  ManagedCursor cursor, int maxEntries,
-                                                 long bytesToRead, PositionImpl maxReadPosition,
-                                                 boolean readFromEarliest, AsyncCallbacks.ReadEntriesCallback callback,
+                                                 long bytesToRead, boolean readFromEarliest,
+                                                 AsyncCallbacks.ReadEntriesCallback callback,
                                                  boolean wait, @Nullable Consumer consumer) {
         Objects.requireNonNull(topicCompactionService);
         Objects.requireNonNull(cursor);
@@ -68,9 +68,11 @@ public class CompactedTopicUtils {
                     || readPosition.compareTo(
                     lastCompactedPosition.getLedgerId(), lastCompactedPosition.getEntryId()) > 0) {
                 if (wait) {
-                    cursor.asyncReadEntriesOrWait(maxEntries, bytesToRead, callback, readEntriesCtx, maxReadPosition);
+                    cursor.asyncReadEntriesOrWait(maxEntries, bytesToRead, callback, readEntriesCtx,
+                        PositionImpl.LATEST);
                 } else {
-                    cursor.asyncReadEntries(maxEntries, bytesToRead, callback, readEntriesCtx, maxReadPosition);
+                    cursor.asyncReadEntries(maxEntries, bytesToRead, callback, readEntriesCtx,
+                        PositionImpl.LATEST);
                 }
                 return CompletableFuture.completedFuture(null);
             }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index e4cc33de14b..cf389824794 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -1783,59 +1783,4 @@ public class TransactionTest extends TransactionTestBase {
                 });
     }
 
-    @Test
-    public void testReadCommittedWithReadCompacted() throws Exception{
-        final String namespace = "tnx/ns-prechecks";
-        final String topic = "persistent://" + namespace + "/test_transaction_topic";
-        admin.namespaces().createNamespace(namespace);
-        admin.topics().createNonPartitionedTopic(topic);
-
-        admin.topicPolicies().setCompactionThreshold(topic, 100 * 1024 * 1024);
-
-        @Cleanup
-        Consumer<String> consumer = this.pulsarClient.newConsumer(Schema.STRING)
-                .topic(topic)
-                .subscriptionName("sub")
-                .subscriptionType(SubscriptionType.Exclusive)
-                .readCompacted(true)
-                .subscribe();
-
-        @Cleanup
-        Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
-                .topic(topic)
-                .create();
-
-        producer.newMessage().key("K1").value("V1").send();
-
-        Transaction txn = pulsarClient.newTransaction()
-                .withTransactionTimeout(1, TimeUnit.MINUTES).build().get();
-        producer.newMessage(txn).key("K2").value("V2").send();
-        producer.newMessage(txn).key("K3").value("V3").send();
-
-        List<String> messages = new ArrayList<>();
-        while (true) {
-            Message<String> message = consumer.receive(5, TimeUnit.SECONDS);
-            if (message == null) {
-                break;
-            }
-            messages.add(message.getValue());
-        }
-
-        Assert.assertEquals(messages, List.of("V1"));
-
-        txn.commit();
-
-        messages.clear();
-
-        while (true) {
-            Message<String> message = consumer.receive(5, TimeUnit.SECONDS);
-            if (message == null) {
-                break;
-            }
-            messages.add(message.getValue());
-        }
-
-        Assert.assertEquals(messages, List.of("V2", "V3"));
-    }
-
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicUtilsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicUtilsTest.java
index 2545c0362e8..94f2a17a2a3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicUtilsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicUtilsTest.java
@@ -69,8 +69,8 @@ public class CompactedTopicUtilsTest {
             }
         };
 
-        CompactedTopicUtils.asyncReadCompactedEntries(service, cursor, 1, 100,
-                PositionImpl.LATEST, false, readEntriesCallback, false, null);
+        CompactedTopicUtils.asyncReadCompactedEntries(service, cursor, 1, 100, false,
+                readEntriesCallback, false, null);
 
         List<Entry> entries = completableFuture.get();
         Assert.assertTrue(entries.isEmpty());