You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ba...@apache.org on 2023/10/25 07:34:06 UTC
[pulsar] branch branch-2.11 updated: [improve][broker][branch-2.11] Make read compacted entries support maxReadSizeBytes limitation (#21065) (#21430)
This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 505834a6ab4 [improve][broker][branch-2.11] Make read compacted entries support maxReadSizeBytes limitation (#21065) (#21430)
505834a6ab4 is described below
commit 505834a6ab4919a3da879d8512925dc100c5da19
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Wed Oct 25 15:33:59 2023 +0800
[improve][broker][branch-2.11] Make read compacted entries support maxReadSizeBytes limitation (#21065) (#21430)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 4 +-
.../PersistentDispatcherSingleActiveConsumer.java | 2 +-
...entStreamingDispatcherSingleActiveConsumer.java | 2 +-
.../apache/pulsar/compaction/CompactedTopic.java | 3 +-
.../pulsar/compaction/CompactedTopicImpl.java | 15 +++++-
.../apache/pulsar/compaction/CompactionTest.java | 56 ++++++++++++++++++++++
6 files changed, 75 insertions(+), 7 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index e14c1a20c09..6216fcda73d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -3362,7 +3362,7 @@ public class ManagedCursorImpl implements ManagedCursor {
return this.mbean;
}
- void updateReadStats(int readEntriesCount, long readEntriesSize) {
+ public void updateReadStats(int readEntriesCount, long readEntriesSize) {
this.entriesReadCount += readEntriesCount;
this.entriesReadSize += readEntriesSize;
}
@@ -3388,7 +3388,7 @@ public class ManagedCursorImpl implements ManagedCursor {
}, null);
}
- private int applyMaxSizeCap(int maxEntries, long maxSizeBytes) {
+ public int applyMaxSizeCap(int maxEntries, long maxSizeBytes) {
if (maxSizeBytes == NO_MAX_SIZE_LIMIT) {
return maxEntries;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index c9d810357c2..291d46d6f4d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -367,7 +367,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
}
havePendingRead = true;
if (consumer.readCompacted()) {
- topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead,
+ topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, bytesToRead, isFirstRead,
this, consumer);
} else {
ReadEntriesCtx readEntriesCtx =
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
index 2048bb016b8..b64ad50d193 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
@@ -217,7 +217,7 @@ public class PersistentStreamingDispatcherSingleActiveConsumer extends Persisten
havePendingRead = true;
if (consumer.readCompacted()) {
- topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead,
+ topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, bytesToRead, isFirstRead,
this, consumer);
} else {
streamingEntryReader.asyncReadEntries(messagesToRead, bytesToRead, consumer);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
index 9e50fc07152..3cbccd4e8bb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
@@ -30,7 +30,8 @@ public interface CompactedTopic {
CompletableFuture<CompactedTopicContext> newCompactedLedger(Position p, long compactedLedgerId);
CompletableFuture<Void> deleteCompactedLedger(long compactedLedgerId);
void asyncReadEntriesOrWait(ManagedCursor cursor,
- int numberOfEntriesToRead,
+ int maxEntries,
+ long bytesToRead,
boolean isFirstRead,
ReadEntriesCallback callback,
Consumer consumer);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index 7188ee6aa88..b20ff438629 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -41,6 +41,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.ReadEntriesCtx;
@@ -86,7 +87,8 @@ public class CompactedTopicImpl implements CompactedTopic {
@Override
public void asyncReadEntriesOrWait(ManagedCursor cursor,
- int numberOfEntriesToRead,
+ int maxEntries,
+ long bytesToRead,
boolean isFirstRead,
ReadEntriesCallback callback, Consumer consumer) {
synchronized (this) {
@@ -101,8 +103,11 @@ public class CompactedTopicImpl implements CompactedTopic {
ReadEntriesCtx readEntriesCtx = ReadEntriesCtx.create(consumer, DEFAULT_CONSUMER_EPOCH);
if (compactionHorizon == null
|| compactionHorizon.compareTo(cursorPosition) < 0) {
- cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, readEntriesCtx, PositionImpl.LATEST);
+ cursor.asyncReadEntriesOrWait(maxEntries, bytesToRead, callback, readEntriesCtx, PositionImpl.LATEST);
} else {
+ ManagedCursorImpl managedCursor = (ManagedCursorImpl) cursor;
+ int numberOfEntriesToRead = managedCursor.applyMaxSizeCap(maxEntries, bytesToRead);
+
compactedTopicContext.thenCompose(
(context) -> findStartPoint(cursorPosition, context.ledger.getLastAddConfirmed(), context.cache)
.thenCompose((startPoint) -> {
@@ -127,6 +132,12 @@ public class CompactedTopicImpl implements CompactedTopic {
}
return readEntries(context.ledger, startPoint, endPoint)
.thenAccept((entries) -> {
+ long entriesSize = 0;
+ for (Entry entry : entries) {
+ entriesSize += entry.getLength();
+ }
+ managedCursor.updateReadStats(entries.size(), entriesSize);
+
Entry lastEntry = entries.get(entries.size() - 1);
// The compaction task depends on the last snapshot and the incremental
// entries to build the new snapshot. So for the compaction cursor, we
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index ac72ef52841..68775bc090a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -25,6 +25,7 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -55,11 +56,14 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.Position;
+import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.SystemTopic;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
@@ -89,6 +93,7 @@ import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -1844,6 +1849,57 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
producer.close();
}
+ @Test
+ public void testDispatcherMaxReadSizeBytes() throws Exception {
+ final String topicName =
+ "persistent://my-property/use/my-ns/testDispatcherMaxReadSizeBytes" + UUID.randomUUID();
+ final String subName = "my-sub";
+ final int receiveQueueSize = 1;
+ @Cleanup
+ PulsarClient client = newPulsarClient(lookupUrl.toString(), 100);
+ Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+ .topic(topicName).create();
+
+ for (int i = 0; i < 10; i+=2) {
+ producer.newMessage().key(null).value(new byte[4*1024*1024]).send();
+ }
+ producer.flush();
+
+ admin.topics().triggerCompaction(topicName);
+
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(admin.topics().compactionStatus(topicName).status,
+ LongRunningProcessStatus.Status.SUCCESS);
+ });
+
+ admin.topics().unload(topicName);
+
+ ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) client.newConsumer(Schema.BYTES)
+ .topic(topicName).readCompacted(true).receiverQueueSize(receiveQueueSize).subscriptionName(subName)
+ .subscribe();
+
+ PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
+ PersistentSubscription persistentSubscription = topic.getSubscriptions().get(subName);
+ PersistentDispatcherSingleActiveConsumer dispatcher =
+ Mockito.spy((PersistentDispatcherSingleActiveConsumer) persistentSubscription.getDispatcher());
+ FieldUtils.writeDeclaredField(persistentSubscription, "dispatcher", dispatcher, true);
+
+ Awaitility.await().untilAsserted(() -> {
+ assertSame(consumer.getStats().getMsgNumInReceiverQueue(), 1);
+ });
+
+ consumer.increaseAvailablePermits(2);
+
+ Thread.sleep(2000);
+
+ Mockito.verify(dispatcher, Mockito.atLeastOnce())
+ .readEntriesComplete(Mockito.argThat(argument -> argument.size() == 1),
+ Mockito.any(PersistentDispatcherSingleActiveConsumer.ReadEntriesCtx.class));
+
+ consumer.close();
+ producer.close();
+ }
+
@Test
public void testCompactionDuplicate() throws Exception {
String topic = "persistent://my-property/use/my-ns/testCompactionDuplicate";