You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/07/05 12:43:21 UTC

[pulsar] branch branch-2.7 updated: Return message ID from compacted ledger while the compaction cursor reach the end of the topic. (#16395)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new f4ebf0844dd Return message ID from compacted ledger while the compaction cursor reach the end of the topic. (#16395)
f4ebf0844dd is described below

commit f4ebf0844dd7fdf9bcd558a58fd7d372e9cbc19a
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Tue Jul 5 20:43:10 2022 +0800

    Return message ID from compacted ledger while the compaction cursor reach the end of the topic. (#16395)
---
 .../src/main/java/org/apache/pulsar/broker/service/ServerCnx.java    | 5 ++++-
 .../src/main/java/org/apache/pulsar/compaction/CompactedTopic.java   | 2 ++
 .../main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java   | 4 ++++
 3 files changed, 10 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 206aa6262cd..2223215d0d5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1541,7 +1541,10 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
 
         // If it's not pointing to a valid entry, respond messageId of the current position.
-        if (position.getEntryId() == -1) {
+        // If the compaction cursor reach the end of the topic, respond messageId from compacted ledger
+        Optional<Position> compactionHorizon = persistentTopic.getCompactedTopic().getCompactionHorizon();
+        if (position.getEntryId() == -1 || (compactionHorizon.isPresent()
+                && position.compareTo((PositionImpl) compactionHorizon.get()) <= 0)) {
             handleLastMessageIdFromCompactedLedger(persistentTopic, requestId, partitionIndex,
                     markDeletePosition);
             return;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
index 7c969373dbe..c91931d071a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.compaction;
 
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.Entry;
@@ -33,4 +34,5 @@ public interface CompactedTopic {
                                 ReadEntriesCallback callback,
                                 Consumer consumer);
     CompletableFuture<Entry> readLastEntryOfCompactedLedger();
+    Optional<Position> getCompactionHorizon();
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index 5b95f1413ad..ff00bfe9655 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -314,6 +314,10 @@ public class CompactedTopicImpl implements CompactedTopic {
             .compare(p.getLedgerId(), m.getLedgerId())
             .compare(p.getEntryId(), m.getEntryId()).result();
     }
+
+    public synchronized Optional<Position> getCompactionHorizon() {
+        return Optional.ofNullable(this.compactionHorizon);
+    }
     private static final Logger log = LoggerFactory.getLogger(CompactedTopicImpl.class);
 }