You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ch...@apache.org on 2021/09/03 01:00:53 UTC

[pulsar] 01/02: [Issue 11814] fix pulsar admin method:getMessageById. (#11852)

This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f9b02cae83fb1999ce381663e084a4e8de0cd1e8
Author: Hao Zhang <zh...@cmss.chinamobile.com>
AuthorDate: Thu Sep 2 09:49:05 2021 +0800

    [Issue 11814] fix pulsar admin method:getMessageById. (#11852)
    
    Fixes https://github.com/apache/pulsar/issues/11814. If getMessageById is called on one topic with a ledgerId that belongs to another topic, it still returns the message, but doing so may contaminate the ledgers cache of the topic that was queried.
    
    **changes**
    Add a check to the method 'internalGetMessageById' in PersistentTopicsBase: if the ledgerId does not belong to this topic, the request is answered with a NOT_FOUND RestException.
    
    (cherry picked from commit 9bfb3dba7c13e8250e1002efdeb39eec56f7e2da)
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  7 +++++
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 32 ++++++++++++++++++++++
 2 files changed, 39 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index f63d318..c7edc77 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2298,6 +2298,13 @@ public class PersistentTopicsBase extends AdminResource {
             }
             PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
             ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger();
+            if (null == ledger.getLedgerInfo(ledgerId).get()) {
+                log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}, "
+                                + "the ledgerId does not belong to this topic.",
+                        clientAppId(), ledgerId, entryId, topicName);
+                asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                        "Message not found, the ledgerId does not belong to this topic"));
+            }
             ledger.asyncReadEntry(new PositionImpl(ledgerId, entryId), new AsyncCallbacks.ReadEntryCallback() {
                 @Override
                 public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index ad37635..dd74400 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -61,6 +61,7 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.ProducerBase;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
@@ -857,4 +858,35 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         verify(response, timeout(10000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
     }
+
+    public void testGetMessageById() throws Exception {
+        TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
+        admin.tenants().createTenant("tenant-xyz", tenantInfo);
+        admin.namespaces().createNamespace("tenant-xyz/ns-abc", Sets.newHashSet("test"));
+        final String topicName1 = "persistent://tenant-xyz/ns-abc/testGetMessageById1";
+        final String topicName2 = "persistent://tenant-xyz/ns-abc/testGetMessageById2";
+        admin.topics().createNonPartitionedTopic(topicName1);
+        admin.topics().createNonPartitionedTopic(topicName2);
+        ProducerBase<byte[]> producer1 = (ProducerBase<byte[]>) pulsarClient.newProducer().topic(topicName1)
+                .enableBatching(false).create();
+        String data1 = "test1";
+        MessageIdImpl id1 = (MessageIdImpl) producer1.send(data1.getBytes());
+
+        ProducerBase<byte[]> producer2 = (ProducerBase<byte[]>) pulsarClient.newProducer().topic(topicName2)
+                .enableBatching(false).create();
+        String data2 = "test2";
+        MessageIdImpl id2 = (MessageIdImpl) producer2.send(data2.getBytes());
+
+        Message<byte[]> message1 = admin.topics().getMessageById(topicName1, id1.getLedgerId(), id1.getEntryId());
+        Assert.assertEquals(message1.getData(), data1.getBytes());
+
+        Message<byte[]> message2 = admin.topics().getMessageById(topicName2, id2.getLedgerId(), id2.getEntryId());
+        Assert.assertEquals(message2.getData(), data2.getBytes());
+
+        Message<byte[]> message3 = admin.topics().getMessageById(topicName2, id1.getLedgerId(), id1.getEntryId());
+        Assert.assertNull(message3);
+
+        Message<byte[]> message4 = admin.topics().getMessageById(topicName1, id2.getLedgerId(), id2.getEntryId());
+        Assert.assertNull(message4);
+    }
 }