You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ch...@apache.org on 2021/09/03 01:00:53 UTC
[pulsar] 01/02: [Issue 11814] fix pulsar admin
method:getMessageById. (#11852)
This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f9b02cae83fb1999ce381663e084a4e8de0cd1e8
Author: Hao Zhang <zh...@cmss.chinamobile.com>
AuthorDate: Thu Sep 2 09:49:05 2021 +0800
[Issue 11814] fix pulsar admin method:getMessageById. (#11852)
Fixes https://github.com/apache/pulsar/issues/11814. If getMessageById is called on one topic with a ledgerId that belongs to another topic, it still returns the message, and it may contaminate the ledgers cache in that topic.
**changes**
Add a check in the method 'internalGetMessageById' in PersistentTopicsBase: if the ledgerId does not belong to this topic, throw an exception.
(cherry picked from commit 9bfb3dba7c13e8250e1002efdeb39eec56f7e2da)
---
.../broker/admin/impl/PersistentTopicsBase.java | 7 +++++
.../pulsar/broker/admin/PersistentTopicsTest.java | 32 ++++++++++++++++++++++
2 files changed, 39 insertions(+)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index f63d318..c7edc77 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2298,6 +2298,13 @@ public class PersistentTopicsBase extends AdminResource {
}
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger();
+ if (null == ledger.getLedgerInfo(ledgerId).get()) {
+ log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}, "
+ + "the ledgerId does not belong to this topic.",
+ clientAppId(), ledgerId, entryId, topicName);
+ asyncResponse.resume(new RestException(Status.NOT_FOUND,
+ "Message not found, the ledgerId does not belong to this topic"));
+ }
ledger.asyncReadEntry(new PositionImpl(ledgerId, entryId), new AsyncCallbacks.ReadEntryCallback() {
@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index ad37635..dd74400 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -61,6 +61,7 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.ProducerBase;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
@@ -857,4 +858,35 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
verify(response, timeout(10000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
}
+ /** Sends one message to each of two topics and checks that getMessageById only returns a message when the id is looked up on the topic it was sent to. */
+ public void testGetMessageById() throws Exception {
+ TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
+ admin.tenants().createTenant("tenant-xyz", tenantInfo);
+ admin.namespaces().createNamespace("tenant-xyz/ns-abc", Sets.newHashSet("test"));
+ final String topicName1 = "persistent://tenant-xyz/ns-abc/testGetMessageById1";
+ final String topicName2 = "persistent://tenant-xyz/ns-abc/testGetMessageById2";
+ admin.topics().createNonPartitionedTopic(topicName1);
+ admin.topics().createNonPartitionedTopic(topicName2);
+ ProducerBase<byte[]> producer1 = (ProducerBase<byte[]>) pulsarClient.newProducer().topic(topicName1)
+ .enableBatching(false).create();
+ String data1 = "test1";
+ MessageIdImpl id1 = (MessageIdImpl) producer1.send(data1.getBytes());
+ // Same setup for the second topic.
+ ProducerBase<byte[]> producer2 = (ProducerBase<byte[]>) pulsarClient.newProducer().topic(topicName2)
+ .enableBatching(false).create();
+ String data2 = "test2";
+ MessageIdImpl id2 = (MessageIdImpl) producer2.send(data2.getBytes());
+ // Looking up each id on the topic it was produced to returns the original payload.
+ Message<byte[]> message1 = admin.topics().getMessageById(topicName1, id1.getLedgerId(), id1.getEntryId());
+ Assert.assertEquals(message1.getData(), data1.getBytes());
+
+ Message<byte[]> message2 = admin.topics().getMessageById(topicName2, id2.getLedgerId(), id2.getEntryId());
+ Assert.assertEquals(message2.getData(), data2.getBytes());
+ // Looking up an id on the other topic is asserted to return null rather than a message.
+ Message<byte[]> message3 = admin.topics().getMessageById(topicName2, id1.getLedgerId(), id1.getEntryId());
+ Assert.assertNull(message3);
+
+ Message<byte[]> message4 = admin.topics().getMessageById(topicName1, id2.getLedgerId(), id2.getEntryId());
+ Assert.assertNull(message4);
+ }
}