You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ba...@apache.org on 2023/10/25 07:34:06 UTC

[pulsar] branch branch-2.11 updated: [improve][broker][branch-2.11] Make read compacted entries support maxReadSizeBytes limitation (#21065) (#21430)

This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 505834a6ab4 [improve][broker][branch-2.11] Make read compacted entries support maxReadSizeBytes limitation (#21065) (#21430)
505834a6ab4 is described below

commit 505834a6ab4919a3da879d8512925dc100c5da19
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Wed Oct 25 15:33:59 2023 +0800

    [improve][broker][branch-2.11] Make read compacted entries support maxReadSizeBytes limitation (#21065) (#21430)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  4 +-
 .../PersistentDispatcherSingleActiveConsumer.java  |  2 +-
 ...entStreamingDispatcherSingleActiveConsumer.java |  2 +-
 .../apache/pulsar/compaction/CompactedTopic.java   |  3 +-
 .../pulsar/compaction/CompactedTopicImpl.java      | 15 +++++-
 .../apache/pulsar/compaction/CompactionTest.java   | 56 ++++++++++++++++++++++
 6 files changed, 75 insertions(+), 7 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index e14c1a20c09..6216fcda73d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -3362,7 +3362,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         return this.mbean;
     }
 
-    void updateReadStats(int readEntriesCount, long readEntriesSize) {
+    public void updateReadStats(int readEntriesCount, long readEntriesSize) {
         this.entriesReadCount += readEntriesCount;
         this.entriesReadSize += readEntriesSize;
     }
@@ -3388,7 +3388,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         }, null);
     }
 
-    private int applyMaxSizeCap(int maxEntries, long maxSizeBytes) {
+    public int applyMaxSizeCap(int maxEntries, long maxSizeBytes) {
         if (maxSizeBytes == NO_MAX_SIZE_LIMIT) {
             return maxEntries;
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index c9d810357c2..291d46d6f4d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -367,7 +367,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
                 }
                 havePendingRead = true;
                 if (consumer.readCompacted()) {
-                    topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead,
+                    topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, bytesToRead, isFirstRead,
                             this, consumer);
                 } else {
                     ReadEntriesCtx readEntriesCtx =
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
index 2048bb016b8..b64ad50d193 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
@@ -217,7 +217,7 @@ public class PersistentStreamingDispatcherSingleActiveConsumer extends Persisten
                 havePendingRead = true;
 
                 if (consumer.readCompacted()) {
-                    topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead,
+                    topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, bytesToRead, isFirstRead,
                             this, consumer);
                 } else {
                     streamingEntryReader.asyncReadEntries(messagesToRead, bytesToRead, consumer);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
index 9e50fc07152..3cbccd4e8bb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
@@ -30,7 +30,8 @@ public interface CompactedTopic {
     CompletableFuture<CompactedTopicContext> newCompactedLedger(Position p, long compactedLedgerId);
     CompletableFuture<Void> deleteCompactedLedger(long compactedLedgerId);
     void asyncReadEntriesOrWait(ManagedCursor cursor,
-                                int numberOfEntriesToRead,
+                                int maxEntries,
+                                long bytesToRead,
                                 boolean isFirstRead,
                                 ReadEntriesCallback callback,
                                 Consumer consumer);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index 7188ee6aa88..b20ff438629 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -41,6 +41,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.ReadEntriesCtx;
@@ -86,7 +87,8 @@ public class CompactedTopicImpl implements CompactedTopic {
 
     @Override
     public void asyncReadEntriesOrWait(ManagedCursor cursor,
-                                       int numberOfEntriesToRead,
+                                       int maxEntries,
+                                       long bytesToRead,
                                        boolean isFirstRead,
                                        ReadEntriesCallback callback, Consumer consumer) {
         synchronized (this) {
@@ -101,8 +103,11 @@ public class CompactedTopicImpl implements CompactedTopic {
             ReadEntriesCtx readEntriesCtx = ReadEntriesCtx.create(consumer, DEFAULT_CONSUMER_EPOCH);
             if (compactionHorizon == null
                 || compactionHorizon.compareTo(cursorPosition) < 0) {
-                cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, readEntriesCtx, PositionImpl.LATEST);
+                cursor.asyncReadEntriesOrWait(maxEntries, bytesToRead, callback, readEntriesCtx, PositionImpl.LATEST);
             } else {
+                ManagedCursorImpl managedCursor = (ManagedCursorImpl) cursor;
+                int numberOfEntriesToRead = managedCursor.applyMaxSizeCap(maxEntries, bytesToRead);
+
                 compactedTopicContext.thenCompose(
                     (context) -> findStartPoint(cursorPosition, context.ledger.getLastAddConfirmed(), context.cache)
                         .thenCompose((startPoint) -> {
@@ -127,6 +132,12 @@ public class CompactedTopicImpl implements CompactedTopic {
                                 }
                                 return readEntries(context.ledger, startPoint, endPoint)
                                     .thenAccept((entries) -> {
+                                        long entriesSize = 0;
+                                        for (Entry entry : entries) {
+                                            entriesSize += entry.getLength();
+                                        }
+                                        managedCursor.updateReadStats(entries.size(), entriesSize);
+
                                         Entry lastEntry = entries.get(entries.size() - 1);
                                         // The compaction task depends on the last snapshot and the incremental
                                         // entries to build the new snapshot. So for the compaction cursor, we
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index ac72ef52841..68775bc090a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -25,6 +25,7 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertSame;
 import static org.testng.Assert.assertTrue;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -55,11 +56,14 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.persistent.SystemTopic;
 import org.apache.pulsar.client.admin.LongRunningProcessStatus;
@@ -89,6 +93,7 @@ import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.protocol.Markers;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -1844,6 +1849,57 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
         producer.close();
     }
 
+    @Test
+    public void testDispatcherMaxReadSizeBytes() throws Exception {
+        final String topicName =
+                "persistent://my-property/use/my-ns/testDispatcherMaxReadSizeBytes" + UUID.randomUUID();
+        final String subName = "my-sub";
+        final int receiveQueueSize = 1;
+        @Cleanup
+        PulsarClient client = newPulsarClient(lookupUrl.toString(), 100);
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+                .topic(topicName).create();
+
+        for (int i = 0; i < 10; i+=2) {
+            producer.newMessage().key(null).value(new byte[4*1024*1024]).send();
+        }
+        producer.flush();
+
+        admin.topics().triggerCompaction(topicName);
+
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(admin.topics().compactionStatus(topicName).status,
+                    LongRunningProcessStatus.Status.SUCCESS);
+        });
+
+        admin.topics().unload(topicName);
+
+        ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) client.newConsumer(Schema.BYTES)
+                .topic(topicName).readCompacted(true).receiverQueueSize(receiveQueueSize).subscriptionName(subName)
+                .subscribe();
+
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
+        PersistentSubscription persistentSubscription = topic.getSubscriptions().get(subName);
+        PersistentDispatcherSingleActiveConsumer dispatcher =
+                Mockito.spy((PersistentDispatcherSingleActiveConsumer) persistentSubscription.getDispatcher());
+        FieldUtils.writeDeclaredField(persistentSubscription, "dispatcher", dispatcher, true);
+
+        Awaitility.await().untilAsserted(() -> {
+            assertSame(consumer.getStats().getMsgNumInReceiverQueue(), 1);
+        });
+
+        consumer.increaseAvailablePermits(2);
+
+        Thread.sleep(2000);
+
+        Mockito.verify(dispatcher, Mockito.atLeastOnce())
+                .readEntriesComplete(Mockito.argThat(argument -> argument.size() == 1),
+                        Mockito.any(PersistentDispatcherSingleActiveConsumer.ReadEntriesCtx.class));
+
+        consumer.close();
+        producer.close();
+    }
+
     @Test
     public void testCompactionDuplicate() throws Exception {
         String topic = "persistent://my-property/use/my-ns/testCompactionDuplicate";