You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/30 03:56:15 UTC

[pulsar] 07/13: Fix getting last message id from empty compact ledger (#13476)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c9c48d26e99d80bd9165e5478edcd1a1a7b8acab
Author: ran <ga...@126.com>
AuthorDate: Mon Dec 27 09:45:38 2021 +0800

    Fix getting last message id from empty compact ledger (#13476)
    
    ### Motivation
    
    Currently, if the last confirmed entry points to an empty ledger, the get-last-message-id operation reads from the compacted ledger instead. If the compacted ledger is also empty, the read fails with `IncorrectParameterException`.
    
    **Broker error message**
    ```
    [pulsar-io-29-9] ERROR org.apache.bookkeeper.client.LedgerHandle - IncorrectParameterException on ledgerId:617 firstEntry:-1 lastEntry:-1
    ```
    
    **Client error log**
    ```
    Exception in thread "main" org.apache.pulsar.client.api.PulsarClientException$BrokerMetadataException: The subscription reader-bf9246cfcb of the topic persistent://public/ns-test/t1 gets the last message id was failed
    {"errorMsg":"Failed to read last entry of the compacted Ledger Incorrect parameter input","reqId":79405902881798690, "remote":"localhost/127.0.0.1:6650", "local":"/127.0.0.1:55207"}
    	at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1034)
    	at org.apache.pulsar.client.impl.ConsumerImpl.hasMessageAvailable(ConsumerImpl.java:2001)
    	at org.apache.pulsar.client.impl.ReaderImpl.hasMessageAvailable(ReaderImpl.java:181)
    	at org.apache.pulsar.compaction.CompactedTopicTest.main(CompactedTopicTest.java:730)
    ```
    
    ### Modifications
    
    Check the last entry id of the compacted ledger before reading an entry from it; if the ledger has no entries, return a null value.
    
    (cherry picked from commit 8136762e743fe1bc4be2adeb08631a8c44719d37)
---
 .../pulsar/compaction/CompactedTopicImpl.java      | 15 ++++++---
 .../pulsar/compaction/CompactedTopicTest.java      | 38 ++++++++++++++++++++++
 2 files changed, 48 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index 4577540..a6d6fc9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -293,11 +293,16 @@ public class CompactedTopicImpl implements CompactedTopic {
         if (compactionHorizon == null) {
             return CompletableFuture.completedFuture(null);
         }
-        return compactedTopicContext.thenCompose(context ->
-                readEntries(context.ledger, context.ledger.getLastAddConfirmed(), context.ledger.getLastAddConfirmed())
-                        .thenCompose(entries -> entries.size() > 0
-                                ? CompletableFuture.completedFuture(entries.get(0))
-                                : CompletableFuture.completedFuture(null)));
+        return compactedTopicContext.thenCompose(context -> {
+            if (context.ledger.getLastAddConfirmed() == -1) {
+                return CompletableFuture.completedFuture(null);
+            }
+            return readEntries(
+                    context.ledger, context.ledger.getLastAddConfirmed(), context.ledger.getLastAddConfirmed())
+                    .thenCompose(entries -> entries.size() > 0
+                            ? CompletableFuture.completedFuture(entries.get(0))
+                            : CompletableFuture.completedFuture(null));
+        });
     }
 
     private static int comparePositionAndMessageId(PositionImpl p, MessageIdData m) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index 69d66d9..d44868a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -650,4 +650,42 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest {
         reader.close();
         producer.close();
     }
+
+    @Test(timeOut = 1000 * 30)
+    public void testReader() throws Exception {
+        final String ns = "my-property/use/my-ns";
+        String topic = "persistent://" + ns + "/t1";
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .create();
+
+        producer.newMessage().key("k").value(("value").getBytes()).send();
+        producer.newMessage().key("k").value(null).send();
+        pulsar.getCompactor().compact(topic).get();
+
+        Awaitility.await()
+                .pollInterval(3, TimeUnit.SECONDS)
+                .atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
+            admin.topics().unload(topic);
+            Thread.sleep(100);
+            Assert.assertTrue(admin.topics().getInternalStats(topic).lastConfirmedEntry.endsWith("-1"));
+        });
+        // Make sure the last confirm entry is -1, then get last message id from compact ledger
+        PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topic);
+        Assert.assertTrue(internalStats.lastConfirmedEntry.endsWith("-1"));
+        // Because the latest value of the key `k` is null, so there is no data in compact ledger.
+        Assert.assertEquals(internalStats.compactedLedger.size, 0);
+
+        @Cleanup
+        Reader<byte[]> reader = pulsarClient.newReader()
+                .topic(topic)
+                .startMessageIdInclusive()
+                .startMessageId(MessageId.earliest)
+                .readCompacted(true)
+                .create();
+        Assert.assertFalse(reader.hasMessageAvailable());
+    }
+
 }