You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2023/08/14 14:37:35 UTC
[pulsar] branch branch-2.10 updated: [fix][broker] Fix incorrect number of read compacted entries (#20978)
This is an automated email from the ASF dual-hosted git repository.
zhaocong pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new c9cc8a71755 [fix][broker] Fix incorrect number of read compacted entries (#20978)
c9cc8a71755 is described below
commit c9cc8a71755206fb45e9e5b60c28de58c4ade599
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Mon Aug 14 18:37:55 2023 +0800
[fix][broker] Fix incorrect number of read compacted entries (#20978)
(cherry picked from commit 63d9eaf974d0acf59ea8fa11cc75c08b784cd38d)
---
.../pulsar/compaction/CompactedTopicImpl.java | 2 +-
.../apache/pulsar/compaction/CompactionTest.java | 42 ++++++++++++++++++++--
2 files changed, 41 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index 2431d142098..7188ee6aa88 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -119,7 +119,7 @@ public class CompactedTopicImpl implements CompactedTopic {
return CompletableFuture.completedFuture(null);
} else {
long endPoint = Math.min(context.ledger.getLastAddConfirmed(),
- startPoint + numberOfEntriesToRead);
+ startPoint + (numberOfEntriesToRead - 1));
if (startPoint == NEWER_THAN_COMPACTED) {
cursor.seek(compactionHorizon.getNext());
callback.readEntriesComplete(Collections.emptyList(), readEntriesCtx);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index ddad53fbc82..45d8354472d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -22,12 +22,13 @@ import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.assertFalse;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
@@ -45,7 +46,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import io.netty.buffer.ByteBuf;
import lombok.Cleanup;
import lombok.SneakyThrows;
import org.apache.bookkeeper.client.BookKeeper;
@@ -61,6 +61,7 @@ import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.SystemTopic;
+import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.CryptoKeyReader;
@@ -70,11 +71,14 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
@@ -1748,4 +1752,38 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
Assert.assertNotEquals(ledgerId, -1L);
});
}
+
+ @Test(timeOut = 100000)
+ public void testReceiverQueueSize() throws Exception {
+ final String topicName = "persistent://my-property/use/my-ns/testReceiverQueueSize" + UUID.randomUUID();
+ final String subName = "my-sub";
+ final int receiveQueueSize = 1;
+ @Cleanup
+ PulsarClient client = newPulsarClient(lookupUrl.toString(), 100);
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .enableBatching(false).topic(topicName).create();
+
+ for (int i = 0; i < 10; i++) {
+ producer.newMessage().key(String.valueOf(i % 2)).value(String.valueOf(i)).sendAsync();
+ }
+ producer.flush();
+
+ admin.topics().triggerCompaction(topicName);
+
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(admin.topics().compactionStatus(topicName).status,
+ LongRunningProcessStatus.Status.SUCCESS);
+ });
+
+ ConsumerImpl<String> consumer = (ConsumerImpl<String>) client.newConsumer(Schema.STRING)
+ .topic(topicName).readCompacted(true).receiverQueueSize(receiveQueueSize).subscriptionName(subName)
+ .subscribe();
+
+ //Give some time to consume
+ Awaitility.await()
+ .untilAsserted(() -> Assert.assertEquals(consumer.getStats().getMsgNumInReceiverQueue().intValue(),
+ receiveQueueSize));
+ consumer.close();
+ producer.close();
+ }
}