You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/07/02 13:34:44 UTC

[pulsar] branch master updated: fix compaction entry read exception (#11175)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c716495  fix compaction entry read exception (#11175)
c716495 is described below

commit c716495a4282093bd81f9cd43f909811213bfb32
Author: hangc0276 <ch...@apache.org>
AuthorDate: Fri Jul 2 21:34:10 2021 +0800

    fix compaction entry read exception (#11175)
---
 .../pulsar/compaction/CompactedTopicImpl.java      |  1 +
 .../pulsar/compaction/CompactedTopicTest.java      | 57 +++++++++++++++++++++-
 2 files changed, 57 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index 8bdd0d8..c4646ed 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -109,6 +109,7 @@ public class CompactedTopicImpl implements CompactedTopic {
                                 if (startPoint == NEWER_THAN_COMPACTED) {
                                     cursor.seek(compactionHorizon.getNext());
                                     callback.readEntriesComplete(Collections.emptyList(), ctx);
+                                    return CompletableFuture.completedFuture(null);
                                 }
                                 return readEntries(context.ledger, startPoint, endPoint)
                                     .thenAccept((entries) -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index 2d3411b..410d1e5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -46,14 +46,16 @@ import org.apache.commons.lang3.tuple.Triple;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.RawMessageImpl;
 import org.apache.pulsar.common.api.proto.MessageIdData;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.testng.Assert;
@@ -305,4 +307,57 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest {
 
         producer.close();
     }
+
+    @Test(timeOut = 30000)
+    public void testReadMessageFromCompactedLedger() throws Exception {
+        final String key = "1";
+        String msg = "test compaction msg";
+        final String topic = "persistent://my-property/use/my-ns/testCompactWithEmptyMessage-" + UUID.randomUUID();
+        admin.topics().createPartitionedTopic(topic, 1);
+        final int numMessages = 10;
+
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
+        for (int i = 0; i < numMessages; ++i) {
+            producer.newMessage().key(key).value(msg).send();
+        }
+
+        admin.topics().triggerCompaction(topic);
+        boolean succeed = retryStrategically((test) -> {
+            try {
+                return LongRunningProcessStatus.Status.SUCCESS.equals(admin.topics().compactionStatus(topic).status);
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 10, 200);
+
+        Assert.assertTrue(succeed);
+
+
+        final String newKey = "2";
+        String newMsg = "test compaction msg v2";
+        for (int i = 0; i < numMessages; ++i) {
+            producer.newMessage().key(newKey).value(newMsg).send();
+        }
+
+        Reader<String> reader = pulsarClient.newReader(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("test")
+                .readCompacted(true)
+                .startMessageId(MessageId.earliest)
+                .create();
+
+        int compactedMsgCount = 0;
+        int nonCompactedMsgCount = 0;
+        while (reader.hasMessageAvailable()) {
+            Message<String> message = reader.readNext();
+            if (key.equals(message.getKey()) && msg.equals(message.getValue())) {
+                compactedMsgCount++;
+            } else if (newKey.equals(message.getKey()) && newMsg.equals(message.getValue())) {
+                nonCompactedMsgCount++;
+            }
+        }
+
+        Assert.assertEquals(compactedMsgCount, 1);
+        Assert.assertEquals(nonCompactedMsgCount, numMessages);
+    }
 }