You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2024/01/31 08:56:37 UTC
(pulsar) branch branch-3.1 updated: [fix][broker] Fix issue with GetMessageIdByTimestamp can't find match messageId from compacted ledger (#21600)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new b5e4c368480 [fix][broker] Fix issue with GetMessageIdByTimestamp can't find match messageId from compacted ledger (#21600)
b5e4c368480 is described below
commit b5e4c36848016310ad3ec56b6aee6488b540a027
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Sat Jan 20 09:35:56 2024 +0800
[fix][broker] Fix issue with GetMessageIdByTimestamp can't find match messageId from compacted ledger (#21600)
---
.../broker/admin/impl/PersistentTopicsBase.java | 60 ++++++++++++++-----
.../pulsar/compaction/CompactedTopicImpl.java | 50 ++++++++++++++++
.../pulsar/broker/admin/PersistentTopicsTest.java | 67 ++++++++++++++++++++++
3 files changed, 164 insertions(+), 13 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 758b8048f3d..ec22dc14098 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2882,28 +2882,62 @@ public class PersistentTopicsBase extends AdminResource {
throw new RestException(Status.METHOD_NOT_ALLOWED,
"Get message ID by timestamp on a non-persistent topic is not allowed");
}
- ManagedLedger ledger = ((PersistentTopic) topic).getManagedLedger();
- return ledger.asyncFindPosition(entry -> {
+ final PersistentTopic persistentTopic = (PersistentTopic) topic;
+
+ return persistentTopic.getTopicCompactionService().readLastCompactedEntry().thenCompose(lastEntry -> {
+ if (lastEntry == null) {
+ return findMessageIdByPublishTime(timestamp, persistentTopic.getManagedLedger());
+ }
+ MessageMetadata metadata;
+ Position position = lastEntry.getPosition();
try {
- long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer());
- return MessageImpl.isEntryPublishedEarlierThan(entryTimestamp, timestamp);
- } catch (Exception e) {
- log.error("[{}] Error deserializing message for message position find", topicName, e);
+ metadata = Commands.parseMessageMetadata(lastEntry.getDataBuffer());
} finally {
- entry.release();
+ lastEntry.release();
}
- return false;
- }).thenApply(position -> {
- if (position == null) {
- return null;
+ if (timestamp == metadata.getPublishTime()) {
+ return CompletableFuture.completedFuture(new MessageIdImpl(position.getLedgerId(),
+ position.getEntryId(), topicName.getPartitionIndex()));
+ } else if (timestamp < metadata.getPublishTime()) {
+ return persistentTopic.getTopicCompactionService().findEntryByPublishTime(timestamp)
+ .thenApply(compactedEntry -> {
+ try {
+ return new MessageIdImpl(compactedEntry.getLedgerId(),
+ compactedEntry.getEntryId(), topicName.getPartitionIndex());
+ } finally {
+ compactedEntry.release();
+ }
+ });
} else {
- return new MessageIdImpl(position.getLedgerId(), position.getEntryId(),
- topicName.getPartitionIndex());
+ return findMessageIdByPublishTime(timestamp, persistentTopic.getManagedLedger());
}
});
});
}
+ private CompletableFuture<MessageId> findMessageIdByPublishTime(long timestamp, ManagedLedger managedLedger) {
+ return managedLedger.asyncFindPosition(entry -> {
+ try {
+ long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer());
+ return MessageImpl.isEntryPublishedEarlierThan(entryTimestamp, timestamp);
+ } catch (Exception e) {
+ log.error("[{}] Error deserializing message for message position find",
+ topicName,
+ e);
+ } finally {
+ entry.release();
+ }
+ return false;
+ }).thenApply(position -> {
+ if (position == null) {
+ return null;
+ } else {
+ return new MessageIdImpl(position.getLedgerId(), position.getEntryId(),
+ topicName.getPartitionIndex());
+ }
+ });
+ }
+
protected CompletableFuture<Response> internalPeekNthMessageAsync(String subName, int messagePosition,
boolean authoritative) {
CompletableFuture<Void> ret;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index ab54e96b71f..dfafbc41cb4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -32,6 +32,7 @@ import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.function.Predicate;
import javax.annotation.Nullable;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
@@ -324,6 +325,55 @@ public class CompactedTopicImpl implements CompactedTopic {
});
}
+ CompletableFuture<Entry> findFirstMatchEntry(final Predicate<Entry> predicate) {
+ var compactedTopicContextFuture = this.getCompactedTopicContextFuture();
+
+ if (compactedTopicContextFuture == null) {
+ return CompletableFuture.completedFuture(null);
+ }
+ return compactedTopicContextFuture.thenCompose(compactedTopicContext -> {
+ LedgerHandle lh = compactedTopicContext.getLedger();
+ CompletableFuture<Long> promise = new CompletableFuture<>();
+ findFirstMatchIndexLoop(predicate, 0L, lh.getLastAddConfirmed(), promise, null, lh);
+ return promise.thenCompose(index -> {
+ if (index == null) {
+ return CompletableFuture.completedFuture(null);
+ }
+ return readEntries(lh, index, index).thenApply(entries -> entries.get(0));
+ });
+ });
+ }
+ private static void findFirstMatchIndexLoop(final Predicate<Entry> predicate,
+ final long start, final long end,
+ final CompletableFuture<Long> promise,
+ final Long lastMatchIndex,
+ final LedgerHandle lh) {
+ if (start > end) {
+ promise.complete(lastMatchIndex);
+ return;
+ }
+
+ long mid = (start + end) / 2;
+ readEntries(lh, mid, mid).thenAccept(entries -> {
+ Entry entry = entries.get(0);
+ final boolean isMatch;
+ try {
+ isMatch = predicate.test(entry);
+ } finally {
+ entry.release();
+ }
+
+ if (isMatch) {
+ findFirstMatchIndexLoop(predicate, start, mid - 1, promise, mid, lh);
+ } else {
+ findFirstMatchIndexLoop(predicate, mid + 1, end, promise, lastMatchIndex, lh);
+ }
+ }).exceptionally(ex -> {
+ promise.completeExceptionally(ex);
+ return null;
+ });
+ }
+
private static int comparePositionAndMessageId(PositionImpl p, MessageIdData m) {
return ComparisonChain.start()
.compare(p.getLedgerId(), m.getLedgerId())
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 10321cb7761..2ac1816a672 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -31,6 +31,8 @@ import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
import java.lang.reflect.Field;
import java.util.ArrayList;
@@ -65,6 +67,7 @@ import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Topics;
@@ -87,6 +90,7 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -1444,6 +1448,69 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
.compareTo(id2) > 0);
}
+ @Test
+ public void testGetMessageIdByTimestampWithCompaction() throws Exception {
+ TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
+ admin.tenants().createTenant("tenant-xyz", tenantInfo);
+ admin.namespaces().createNamespace("tenant-xyz/ns-abc", Set.of("test"));
+ final String topicName = "persistent://tenant-xyz/ns-abc/testGetMessageIdByTimestampWithCompaction";
+ admin.topics().createNonPartitionedTopic(topicName);
+
+ Map<MessageId, Long> publishTimeMap = new ConcurrentHashMap<>();
+ @Cleanup
+ ProducerBase<byte[]> producer = (ProducerBase<byte[]>) pulsarClient.newProducer().topic(topicName)
+ .enableBatching(false)
+ .intercept(new ProducerInterceptor() {
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public boolean eligible(Message message) {
+ return true;
+ }
+
+ @Override
+ public Message beforeSend(Producer producer, Message message) {
+ return message;
+ }
+
+ @Override
+ public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId,
+ Throwable exception) {
+ publishTimeMap.put(message.getMessageId(), message.getPublishTime());
+ }
+ })
+ .create();
+
+ MessageId id1 = producer.newMessage().key("K1").value("test1".getBytes()).send();
+ MessageId id2 = producer.newMessage().key("K2").value("test2".getBytes()).send();
+
+ long publish1 = publishTimeMap.get(id1);
+ long publish2 = publishTimeMap.get(id2);
+ Assert.assertTrue(publish1 < publish2);
+
+ admin.topics().triggerCompaction(topicName);
+ Awaitility.await().untilAsserted(() ->
+ assertSame(admin.topics().compactionStatus(topicName).status,
+ LongRunningProcessStatus.Status.SUCCESS));
+
+ admin.topics().unload(topicName);
+ Awaitility.await().untilAsserted(() -> {
+ PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topicName, false);
+ assertEquals(internalStats.ledgers.size(), 1);
+ assertEquals(internalStats.ledgers.get(0).entries, 0);
+ });
+
+ Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish1 - 1), id1);
+ Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish1), id1);
+ Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish1 + 1), id2);
+ Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish2), id2);
+ Assert.assertTrue(admin.topics().getMessageIdByTimestamp(topicName, publish2 + 1)
+ .compareTo(id2) > 0);
+ }
+
@Test
public void testGetBatchMessageIdByTimestamp() throws Exception {
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));