You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2024/01/31 08:56:37 UTC

(pulsar) branch branch-3.1 updated: [fix][broker] Fix issue with GetMessageIdByTimestamp can't find match messageId from compacted ledger (#21600)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new b5e4c368480 [fix][broker] Fix issue with GetMessageIdByTimestamp can't find match messageId from compacted ledger (#21600)
b5e4c368480 is described below

commit b5e4c36848016310ad3ec56b6aee6488b540a027
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Sat Jan 20 09:35:56 2024 +0800

    [fix][broker] Fix issue with GetMessageIdByTimestamp can't find match messageId from compacted ledger (#21600)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 60 ++++++++++++++-----
 .../pulsar/compaction/CompactedTopicImpl.java      | 50 ++++++++++++++++
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 67 ++++++++++++++++++++++
 3 files changed, 164 insertions(+), 13 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 758b8048f3d..ec22dc14098 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2882,28 +2882,62 @@ public class PersistentTopicsBase extends AdminResource {
                     throw new RestException(Status.METHOD_NOT_ALLOWED,
                         "Get message ID by timestamp on a non-persistent topic is not allowed");
                 }
-                ManagedLedger ledger = ((PersistentTopic) topic).getManagedLedger();
-                return ledger.asyncFindPosition(entry -> {
+                final PersistentTopic persistentTopic = (PersistentTopic) topic;
+
+                return persistentTopic.getTopicCompactionService().readLastCompactedEntry().thenCompose(lastEntry -> {
+                    if (lastEntry == null) {
+                        return findMessageIdByPublishTime(timestamp, persistentTopic.getManagedLedger());
+                    }
+                    MessageMetadata metadata;
+                    Position position = lastEntry.getPosition();
                     try {
-                        long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer());
-                        return MessageImpl.isEntryPublishedEarlierThan(entryTimestamp, timestamp);
-                    } catch (Exception e) {
-                        log.error("[{}] Error deserializing message for message position find", topicName, e);
+                        metadata = Commands.parseMessageMetadata(lastEntry.getDataBuffer());
                     } finally {
-                        entry.release();
+                        lastEntry.release();
                     }
-                    return false;
-                }).thenApply(position -> {
-                    if (position == null) {
-                        return null;
+                    if (timestamp == metadata.getPublishTime()) {
+                        return CompletableFuture.completedFuture(new MessageIdImpl(position.getLedgerId(),
+                                position.getEntryId(), topicName.getPartitionIndex()));
+                    } else if (timestamp < metadata.getPublishTime()) {
+                        return persistentTopic.getTopicCompactionService().findEntryByPublishTime(timestamp)
+                                .thenApply(compactedEntry -> {
+                                    try {
+                                        return new MessageIdImpl(compactedEntry.getLedgerId(),
+                                                compactedEntry.getEntryId(), topicName.getPartitionIndex());
+                                    } finally {
+                                        compactedEntry.release();
+                                    }
+                                });
                     } else {
-                        return new MessageIdImpl(position.getLedgerId(), position.getEntryId(),
-                            topicName.getPartitionIndex());
+                        return findMessageIdByPublishTime(timestamp, persistentTopic.getManagedLedger());
                     }
                 });
             });
     }
 
+    /**
+     * Looks up a message id by publish time through the given managed ledger.
+     *
+     * <p>Every entry handed to the search predicate is released after it has been inspected. If reading the
+     * entry timestamp throws, the error is logged and the entry is reported as not matching.
+     *
+     * @param timestamp     publish time to search for
+     * @param managedLedger ledger whose {@code asyncFindPosition} performs the search
+     * @return a future with the message id built from the found position, or with {@code null} when the
+     *         search returns no position
+     */
+    private CompletableFuture<MessageId> findMessageIdByPublishTime(long timestamp, ManagedLedger managedLedger) {
+        return managedLedger.asyncFindPosition(candidate -> {
+            // Stays false when the timestamp cannot be read from the entry.
+            boolean publishedEarlier = false;
+            try {
+                publishedEarlier = MessageImpl.isEntryPublishedEarlierThan(
+                        Commands.getEntryTimestamp(candidate.getDataBuffer()), timestamp);
+            } catch (Exception ex) {
+                log.error("[{}] Error deserializing message for message position find",
+                    topicName,
+                    ex);
+            } finally {
+                candidate.release();
+            }
+            return publishedEarlier;
+        }).thenApply(found -> found == null
+                ? null
+                : new MessageIdImpl(found.getLedgerId(), found.getEntryId(), topicName.getPartitionIndex()));
+    }
+
     protected CompletableFuture<Response> internalPeekNthMessageAsync(String subName, int messagePosition,
                                                                       boolean authoritative) {
         CompletableFuture<Void> ret;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index ab54e96b71f..dfafbc41cb4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -32,6 +32,7 @@ import java.util.NoSuchElementException;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.function.Predicate;
 import javax.annotation.Nullable;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -324,6 +325,55 @@ public class CompactedTopicImpl implements CompactedTopic {
         });
     }
 
+    /**
+     * Finds the entry with the lowest id in the compacted ledger that satisfies {@code predicate}.
+     *
+     * <p>The index is located with {@code findFirstMatchIndexLoop} over the range from entry 0 to the
+     * ledger's last-add-confirmed id, and the entry at that index is then read again and returned.
+     *
+     * @param predicate test applied to candidate entries during the search
+     * @return a future with the matching entry, or with {@code null} when there is no compacted topic
+     *         context future or no index was found
+     */
+    CompletableFuture<Entry> findFirstMatchEntry(final Predicate<Entry> predicate) {
+        var contextFuture = this.getCompactedTopicContextFuture();
+        if (contextFuture == null) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        return contextFuture.thenCompose(context -> {
+            LedgerHandle ledger = context.getLedger();
+            CompletableFuture<Long> firstMatchIndex = new CompletableFuture<>();
+            findFirstMatchIndexLoop(predicate, 0L, ledger.getLastAddConfirmed(), firstMatchIndex, null, ledger);
+            return firstMatchIndex.thenCompose(matchIndex -> {
+                if (matchIndex != null) {
+                    // Read exactly the one entry at the located index.
+                    return readEntries(ledger, matchIndex, matchIndex).thenApply(entries -> entries.get(0));
+                }
+                return CompletableFuture.completedFuture(null);
+            });
+        });
+    }
+    /**
+     * Asynchronous binary search over entry ids {@code [start, end]} of {@code lh} for the lowest id whose
+     * entry satisfies {@code predicate}.
+     *
+     * <p>The result is the first match only when the predicate is monotonic over entry ids, i.e. once an
+     * entry matches, every later entry matches too. Each probed entry is released after the predicate has
+     * been evaluated.
+     *
+     * @param predicate      test applied to the entry at the probed index
+     * @param start          lowest entry id still under consideration (inclusive)
+     * @param end            highest entry id still under consideration (inclusive)
+     * @param promise        completed with the lowest matching index, with {@code lastMatchIndex} when the
+     *                       range is exhausted, or exceptionally when a read or the predicate fails
+     * @param lastMatchIndex lowest matching index seen so far, or {@code null} if none matched yet
+     * @param lh             ledger handle the entries are read from
+     */
+    private static void findFirstMatchIndexLoop(final Predicate<Entry> predicate,
+                                                final long start, final long end,
+                                                final CompletableFuture<Long> promise,
+                                                final Long lastMatchIndex,
+                                                final LedgerHandle lh) {
+        if (start > end) {
+            // Range exhausted: report the best match found so far (possibly null).
+            promise.complete(lastMatchIndex);
+            return;
+        }
+
+        // Overflow-safe midpoint; (start + end) / 2 can wrap around for large long bounds.
+        final long mid = start + (end - start) / 2;
+        readEntries(lh, mid, mid).thenAccept(entries -> {
+            Entry entry = entries.get(0);
+            final boolean isMatch;
+            try {
+                isMatch = predicate.test(entry);
+            } finally {
+                entry.release();
+            }
+
+            if (isMatch) {
+                // mid matches: remember it and keep looking for an earlier match on the left.
+                findFirstMatchIndexLoop(predicate, start, mid - 1, promise, mid, lh);
+            } else {
+                // mid does not match: the first match, if any, lies to the right.
+                findFirstMatchIndexLoop(predicate, mid + 1, end, promise, lastMatchIndex, lh);
+            }
+        }).exceptionally(ex -> {
+            promise.completeExceptionally(ex);
+            return null;
+        });
+    }
+
     private static int comparePositionAndMessageId(PositionImpl p, MessageIdData m) {
         return ComparisonChain.start()
             .compare(p.getLedgerId(), m.getLedgerId())
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 10321cb7761..2ac1816a672 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -31,6 +31,8 @@ import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertSame;
 import static org.testng.Assert.assertTrue;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
@@ -65,6 +67,7 @@ import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.Topics;
@@ -87,6 +90,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -1444,6 +1448,69 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
                 .compareTo(id2) > 0);
     }
 
+    /**
+     * Verifies that {@code getMessageIdByTimestamp} still resolves message ids after the topic has been
+     * compacted and unloaded.
+     */
+    @Test
+    public void testGetMessageIdByTimestampWithCompaction() throws Exception {
+        TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
+        admin.tenants().createTenant("tenant-xyz", tenantInfo);
+        admin.namespaces().createNamespace("tenant-xyz/ns-abc", Set.of("test"));
+        final String topicName = "persistent://tenant-xyz/ns-abc/testGetMessageIdByTimestampWithCompaction";
+        admin.topics().createNonPartitionedTopic(topicName);
+
+        // Publish time of every acknowledged message, keyed by its message id.
+        Map<MessageId, Long> publishTimeMap = new ConcurrentHashMap<>();
+        @Cleanup
+        ProducerBase<byte[]> producer = (ProducerBase<byte[]>) pulsarClient.newProducer().topic(topicName)
+                .enableBatching(false)
+                .intercept(new ProducerInterceptor() {
+                    @Override
+                    public void close() {
+
+                    }
+
+                    @Override
+                    public boolean eligible(Message message) {
+                        return true;
+                    }
+
+                    @Override
+                    public Message beforeSend(Producer producer, Message message) {
+                        return message;
+                    }
+
+                    @Override
+                    public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId,
+                                                      Throwable exception) {
+                        // Capture the publish time so the assertions below can query by exact timestamps.
+                        publishTimeMap.put(message.getMessageId(), message.getPublishTime());
+                    }
+                })
+                .create();
+
+        // Two messages with different keys (presumably so that compaction retains both of them).
+        MessageId id1 = producer.newMessage().key("K1").value("test1".getBytes()).send();
+        MessageId id2 = producer.newMessage().key("K2").value("test2".getBytes()).send();
+
+        long publish1 = publishTimeMap.get(id1);
+        long publish2 = publishTimeMap.get(id2);
+        // The timestamp assertions below rely on the two publish times being strictly ordered.
+        Assert.assertTrue(publish1 < publish2);
+
+        // Compact the topic and wait until the compaction is reported as successful.
+        admin.topics().triggerCompaction(topicName);
+        Awaitility.await().untilAsserted(() ->
+            assertSame(admin.topics().compactionStatus(topicName).status,
+                LongRunningProcessStatus.Status.SUCCESS));
+
+        // Unload, then wait until the internal stats show a single ledger with no entries.
+        // NOTE(review): presumably this forces the lookups below to be served from the compacted
+        // ledger rather than the original data ledger - confirm.
+        admin.topics().unload(topicName);
+        Awaitility.await().untilAsserted(() -> {
+                PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topicName, false);
+                assertEquals(internalStats.ledgers.size(), 1);
+                assertEquals(internalStats.ledgers.get(0).entries, 0);
+        });
+
+        // Timestamps before/at the first publish time map to id1, those in (publish1, publish2] map to id2,
+        // and a timestamp after the last publish time yields an id greater than id2.
+        Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish1 - 1), id1);
+        Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish1), id1);
+        Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish1 + 1), id2);
+        Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish2), id2);
+        Assert.assertTrue(admin.topics().getMessageIdByTimestamp(topicName, publish2 + 1)
+                .compareTo(id2) > 0);
+    }
+
     @Test
     public void testGetBatchMessageIdByTimestamp() throws Exception {
         TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));