You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/07/05 12:43:21 UTC
[pulsar] branch branch-2.7 updated: Return message ID from compacted ledger while the compaction cursor reach the end of the topic. (#16395)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new f4ebf0844dd Return message ID from compacted ledger while the compaction cursor reach the end of the topic. (#16395)
f4ebf0844dd is described below
commit f4ebf0844dd7fdf9bcd558a58fd7d372e9cbc19a
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Tue Jul 5 20:43:10 2022 +0800
Return message ID from compacted ledger while the compaction cursor reach the end of the topic. (#16395)
---
.../src/main/java/org/apache/pulsar/broker/service/ServerCnx.java | 5 ++++-
.../src/main/java/org/apache/pulsar/compaction/CompactedTopic.java | 2 ++
.../main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java | 4 ++++
3 files changed, 10 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 206aa6262cd..2223215d0d5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1541,7 +1541,10 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
// If it's not pointing to a valid entry, respond messageId of the current position.
- if (position.getEntryId() == -1) {
+ // If the compaction cursor reaches the end of the topic, respond messageId from compacted ledger
+ Optional<Position> compactionHorizon = persistentTopic.getCompactedTopic().getCompactionHorizon();
+ if (position.getEntryId() == -1 || (compactionHorizon.isPresent()
+ && position.compareTo((PositionImpl) compactionHorizon.get()) <= 0)) {
handleLastMessageIdFromCompactedLedger(persistentTopic, requestId, partitionIndex,
markDeletePosition);
return;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
index 7c969373dbe..c91931d071a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.compaction;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
@@ -33,4 +34,5 @@ public interface CompactedTopic {
ReadEntriesCallback callback,
Consumer consumer);
CompletableFuture<Entry> readLastEntryOfCompactedLedger();
+ Optional<Position> getCompactionHorizon();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index 5b95f1413ad..ff00bfe9655 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -314,6 +314,10 @@ public class CompactedTopicImpl implements CompactedTopic {
.compare(p.getLedgerId(), m.getLedgerId())
.compare(p.getEntryId(), m.getEntryId()).result();
}
+
+ public synchronized Optional<Position> getCompactionHorizon() {
+ return Optional.ofNullable(this.compactionHorizon);
+ }
private static final Logger log = LoggerFactory.getLogger(CompactedTopicImpl.class);
}