You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/30 03:56:15 UTC
[pulsar] 07/13: Fix getting last message id from empty compact ledger (#13476)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c9c48d26e99d80bd9165e5478edcd1a1a7b8acab
Author: ran <ga...@126.com>
AuthorDate: Mon Dec 27 09:45:38 2021 +0800
Fix getting last message id from empty compact ledger (#13476)
### Motivation
Currently, if the last confirmed entry is in an empty ledger, the get-last-message-id operation reads the last entry from the compacted ledger. If the compacted ledger is also empty, the read fails with `IncorrectParameterException`.
**Broker error message**
```
[pulsar-io-29-9] ERROR org.apache.bookkeeper.client.LedgerHandle - IncorrectParameterException on ledgerId:617 firstEntry:-1 lastEntry:-1
```
**Client error log**
```
Exception in thread "main" org.apache.pulsar.client.api.PulsarClientException$BrokerMetadataException: The subscription reader-bf9246cfcb of the topic persistent://public/ns-test/t1 gets the last message id was failed
{"errorMsg":"Failed to read last entry of the compacted Ledger Incorrect parameter input","reqId":79405902881798690, "remote":"localhost/127.0.0.1:6650", "local":"/127.0.0.1:55207"}
at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1034)
at org.apache.pulsar.client.impl.ConsumerImpl.hasMessageAvailable(ConsumerImpl.java:2001)
at org.apache.pulsar.client.impl.ReaderImpl.hasMessageAvailable(ReaderImpl.java:181)
at org.apache.pulsar.compaction.CompactedTopicTest.main(CompactedTopicTest.java:730)
```
### Modifications
Check the compacted ledger's last confirmed entry id before reading an entry from it; if the ledger has no entries (the id is -1), return a null value instead of issuing the read.
(cherry picked from commit 8136762e743fe1bc4be2adeb08631a8c44719d37)
---
.../pulsar/compaction/CompactedTopicImpl.java | 15 ++++++---
.../pulsar/compaction/CompactedTopicTest.java | 38 ++++++++++++++++++++++
2 files changed, 48 insertions(+), 5 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index 4577540..a6d6fc9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -293,11 +293,16 @@ public class CompactedTopicImpl implements CompactedTopic {
if (compactionHorizon == null) {
return CompletableFuture.completedFuture(null);
}
- return compactedTopicContext.thenCompose(context ->
- readEntries(context.ledger, context.ledger.getLastAddConfirmed(), context.ledger.getLastAddConfirmed())
- .thenCompose(entries -> entries.size() > 0
- ? CompletableFuture.completedFuture(entries.get(0))
- : CompletableFuture.completedFuture(null)));
+ return compactedTopicContext.thenCompose(context -> {
+ if (context.ledger.getLastAddConfirmed() == -1) {
+ return CompletableFuture.completedFuture(null);
+ }
+ return readEntries(
+ context.ledger, context.ledger.getLastAddConfirmed(), context.ledger.getLastAddConfirmed())
+ .thenCompose(entries -> entries.size() > 0
+ ? CompletableFuture.completedFuture(entries.get(0))
+ : CompletableFuture.completedFuture(null));
+ });
}
private static int comparePositionAndMessageId(PositionImpl p, MessageIdData m) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index 69d66d9..d44868a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -650,4 +650,42 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest {
reader.close();
producer.close();
}
+
+ @Test(timeOut = 1000 * 30)
+ public void testReader() throws Exception {
+ final String ns = "my-property/use/my-ns";
+ String topic = "persistent://" + ns + "/t1";
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .create();
+
+ producer.newMessage().key("k").value(("value").getBytes()).send();
+ producer.newMessage().key("k").value(null).send();
+ pulsar.getCompactor().compact(topic).get();
+
+ Awaitility.await()
+ .pollInterval(3, TimeUnit.SECONDS)
+ .atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
+ admin.topics().unload(topic);
+ Thread.sleep(100);
+ Assert.assertTrue(admin.topics().getInternalStats(topic).lastConfirmedEntry.endsWith("-1"));
+ });
+ // Make sure the last confirm entry is -1, then get last message id from compact ledger
+ PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topic);
+ Assert.assertTrue(internalStats.lastConfirmedEntry.endsWith("-1"));
+ // Because the latest value of the key `k` is null, so there is no data in compact ledger.
+ Assert.assertEquals(internalStats.compactedLedger.size, 0);
+
+ @Cleanup
+ Reader<byte[]> reader = pulsarClient.newReader()
+ .topic(topic)
+ .startMessageIdInclusive()
+ .startMessageId(MessageId.earliest)
+ .readCompacted(true)
+ .create();
+ Assert.assertFalse(reader.hasMessageAvailable());
+ }
+
}