You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/07/02 13:34:44 UTC
[pulsar] branch master updated: fix compaction entry read exception (#11175)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c716495 fix compaction entry read exception (#11175)
c716495 is described below
commit c716495a4282093bd81f9cd43f909811213bfb32
Author: hangc0276 <ch...@apache.org>
AuthorDate: Fri Jul 2 21:34:10 2021 +0800
fix compaction entry read exception (#11175)
---
.../pulsar/compaction/CompactedTopicImpl.java | 1 +
.../pulsar/compaction/CompactedTopicTest.java | 57 +++++++++++++++++++++-
2 files changed, 57 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index 8bdd0d8..c4646ed 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -109,6 +109,7 @@ public class CompactedTopicImpl implements CompactedTopic {
if (startPoint == NEWER_THAN_COMPACTED) {
cursor.seek(compactionHorizon.getNext());
callback.readEntriesComplete(Collections.emptyList(), ctx);
+ return CompletableFuture.completedFuture(null);
}
return readEntries(context.ledger, startPoint, endPoint)
.thenAccept((entries) -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index 2d3411b..410d1e5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -46,14 +46,16 @@ import org.apache.commons.lang3.tuple.Triple;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.RawMessageImpl;
import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.testng.Assert;
@@ -305,4 +307,57 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest {
producer.close();
}
+ /** Publishes under one key, compacts, publishes under a second key, then reads everything with a compacted reader and checks per-key counts. */
+ @Test(timeOut = 30000)
+ public void testReadMessageFromCompactedLedger() throws Exception {
+ final String key = "1";
+ String msg = "test compaction msg";
+ final String topic = "persistent://my-property/use/my-ns/testCompactWithEmptyMessage-" + UUID.randomUUID(); // NOTE(review): prefix does not match this test's name — possibly copied from another test
+ admin.topics().createPartitionedTopic(topic, 1);
+ final int numMessages = 10;
+ // Phase 1: send numMessages identical messages under the same key, with batching disabled.
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
+ for (int i = 0; i < numMessages; ++i) {
+ producer.newMessage().key(key).value(msg).send();
+ }
+ // Trigger compaction, then poll the admin API until the compaction status is SUCCESS.
+ admin.topics().triggerCompaction(topic);
+ boolean succeed = retryStrategically((test) -> {
+ try {
+ return LongRunningProcessStatus.Status.SUCCESS.equals(admin.topics().compactionStatus(topic).status);
+ } catch (PulsarAdminException e) {
+ return false; // an admin error counts as "not finished yet" for this poll
+ }
+ }, 10, 200); // presumably retry count and sleep millis — confirm against retryStrategically
+
+ Assert.assertTrue(succeed);
+
+ // Phase 2: after compaction has succeeded, send numMessages more messages under a different key.
+ final String newKey = "2";
+ String newMsg = "test compaction msg v2";
+ for (int i = 0; i < numMessages; ++i) {
+ producer.newMessage().key(newKey).value(newMsg).send();
+ }
+ // Read from the earliest position with readCompacted(true).
+ Reader<String> reader = pulsarClient.newReader(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("test")
+ .readCompacted(true)
+ .startMessageId(MessageId.earliest)
+ .create();
+ // Count messages per (key, value) pair; a message matching neither pair is not counted.
+ int compactedMsgCount = 0;
+ int nonCompactedMsgCount = 0;
+ while (reader.hasMessageAvailable()) {
+ Message<String> message = reader.readNext();
+ if (key.equals(message.getKey()) && msg.equals(message.getValue())) {
+ compactedMsgCount++;
+ } else if (newKey.equals(message.getKey()) && newMsg.equals(message.getValue())) {
+ nonCompactedMsgCount++;
+ }
+ }
+ // Expect one message for the first key and all numMessages for the second. NOTE(review): reader and producer are never closed here.
+ Assert.assertEquals(compactedMsgCount, 1);
+ Assert.assertEquals(nonCompactedMsgCount, numMessages);
+ }
}