You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2023/08/14 14:37:35 UTC

[pulsar] branch branch-2.10 updated: [fix][broker] Fix incorrect number of read compacted entries (#20978)

This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new c9cc8a71755 [fix][broker] Fix incorrect number of read compacted entries (#20978)
c9cc8a71755 is described below

commit c9cc8a71755206fb45e9e5b60c28de58c4ade599
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Mon Aug 14 18:37:55 2023 +0800

    [fix][broker] Fix incorrect number of read compacted entries (#20978)
    
    (cherry picked from commit 63d9eaf974d0acf59ea8fa11cc75c08b784cd38d)
---
 .../pulsar/compaction/CompactedTopicImpl.java      |  2 +-
 .../apache/pulsar/compaction/CompactionTest.java   | 42 ++++++++++++++++++++--
 2 files changed, 41 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index 2431d142098..7188ee6aa88 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -119,7 +119,7 @@ public class CompactedTopicImpl implements CompactedTopic {
                                 return CompletableFuture.completedFuture(null);
                             } else {
                                 long endPoint = Math.min(context.ledger.getLastAddConfirmed(),
-                                                         startPoint + numberOfEntriesToRead);
+                                                         startPoint + (numberOfEntriesToRead - 1));
                                 if (startPoint == NEWER_THAN_COMPACTED) {
                                     cursor.seek(compactionHorizon.getNext());
                                     callback.readEntriesComplete(Collections.emptyList(), readEntriesCtx);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index ddad53fbc82..45d8354472d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -22,12 +22,13 @@ import static org.mockito.Mockito.anyLong;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.assertFalse;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.ByteBuf;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
@@ -45,7 +46,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import io.netty.buffer.ByteBuf;
 import lombok.Cleanup;
 import lombok.SneakyThrows;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -61,6 +61,7 @@ import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.persistent.SystemTopic;
+import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.CryptoKeyReader;
@@ -70,11 +71,14 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -1748,4 +1752,38 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
                     Assert.assertNotEquals(ledgerId, -1L);
                 });
     }
+
+    @Test(timeOut = 100000)
+    public void testReceiverQueueSize() throws Exception {
+        final String topicName = "persistent://my-property/use/my-ns/testReceiverQueueSize" + UUID.randomUUID();
+        final String subName = "my-sub";
+        final int receiveQueueSize = 1;
+        @Cleanup
+        PulsarClient client = newPulsarClient(lookupUrl.toString(), 100);
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .enableBatching(false).topic(topicName).create();
+
+        for (int i = 0; i < 10; i++) {
+            producer.newMessage().key(String.valueOf(i % 2)).value(String.valueOf(i)).sendAsync();
+        }
+        producer.flush();
+
+        admin.topics().triggerCompaction(topicName);
+
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(admin.topics().compactionStatus(topicName).status,
+                    LongRunningProcessStatus.Status.SUCCESS);
+        });
+
+        ConsumerImpl<String> consumer = (ConsumerImpl<String>) client.newConsumer(Schema.STRING)
+                .topic(topicName).readCompacted(true).receiverQueueSize(receiveQueueSize).subscriptionName(subName)
+                .subscribe();
+
+        //Give some time to consume
+        Awaitility.await()
+                .untilAsserted(() -> Assert.assertEquals(consumer.getStats().getMsgNumInReceiverQueue().intValue(),
+                        receiveQueueSize));
+        consumer.close();
+        producer.close();
+    }
 }