You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/09/03 16:00:35 UTC

[pulsar] branch master updated: Forbid to read other topic's data in managedLedger layer (#11912)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a7bdc5e  Forbid to read other topic's data in managedLedger layer (#11912)
a7bdc5e is described below

commit a7bdc5e25778ada8c781e9b3f9cc4694e5fd3f58
Author: Hang Chen <ch...@apache.org>
AuthorDate: Fri Sep 3 23:59:52 2021 +0800

    Forbid to read other topic's data in managedLedger layer (#11912)
    
    * forbid to read other topic's data in managedLedger layer
    
    * format code
    
    * update exception type
    
    * fix test
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  7 +++
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 60 +++++++++++++++++++---
 .../broker/admin/impl/PersistentTopicsBase.java    |  7 ---
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 18 +++++--
 4 files changed, 75 insertions(+), 17 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 0823886..d5c83ea 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1821,6 +1821,13 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         if (log.isDebugEnabled()) {
             log.debug("[{}] Reading entry ledger {}: {}", name, position.getLedgerId(), position.getEntryId());
         }
+        if (!ledgers.containsKey(position.getLedgerId())) {
+            log.error("[{}] Failed to get message with ledger {}:{} the ledgerId does not belong to this topic "
+                    + "or has been deleted.", name, position.getLedgerId(), position.getEntryId());
+            callback.readEntryFailed(new ManagedLedgerException.NonRecoverableLedgerException("Message not found, "
+                + "the ledgerId does not belong to this topic or has been deleted"), ctx);
+            return;
+        }
         if (position.getLedgerId() == currentLedger.getId()) {
             asyncReadEntry(currentLedger, position, callback, ctx);
         } else {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 2516357..fc14a3e 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -68,6 +68,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
 import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -118,16 +119,12 @@ import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.Stat;
 import org.awaitility.Awaitility;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+@Slf4j
 public class ManagedLedgerTest extends MockedBookKeeperTestCase {
-
-    private static final Logger log = LoggerFactory.getLogger(ManagedLedgerTest.class);
-
     private static final Charset Encoding = Charsets.UTF_8;
 
     @DataProvider(name = "checkOwnershipFlag")
@@ -3030,7 +3027,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
             Assert.assertEquals(finalManagedLedger.getTotalSize(), 0);
         });
     }
-  
+
     @Test(timeOut = 20000)
     public void testAsyncTruncateLedgerRetention() throws Exception {
         ManagedLedgerConfig config = new ManagedLedgerConfig();
@@ -3204,4 +3201,55 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         ledger.internalTrimConsumedLedgers(Futures.NULL_PROMISE);
         verify(ledgerOffloader, times(1)).getOffloadPolicies();
     }
+
+    @Test(timeOut = 30000)
+    public void testReadOtherManagedLedgersEntry() throws Exception {
+        ManagedLedgerImpl managedLedgerA = (ManagedLedgerImpl) factory.open("my_test_ledger_a");
+        ManagedLedgerImpl managedLedgerB = (ManagedLedgerImpl) factory.open("my_test_ledger_b");
+
+        PositionImpl pa = (PositionImpl) managedLedgerA.addEntry("dummy-entry-a".getBytes(Encoding));
+        PositionImpl pb = (PositionImpl) managedLedgerB.addEntry("dummy-entry-b".getBytes(Encoding));
+
+        // read managedLedgerA's entry using managedLedgerA
+        CompletableFuture<byte[]> completableFutureA = new CompletableFuture<>();
+        managedLedgerA.asyncReadEntry(pa, new ReadEntryCallback() {
+            @Override
+            public void readEntryComplete(Entry entry, Object ctx) {
+                completableFutureA.complete(entry.getData());
+            }
+
+            @Override
+            public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+                completableFutureA.completeExceptionally(exception.getCause());
+            }
+        }, null);
+
+        assertEquals("dummy-entry-a".getBytes(Encoding), completableFutureA.get());
+
+        // read managedLedgerB's entry using managedLedgerA
+        CompletableFuture<byte[]> completableFutureB = new CompletableFuture<>();
+        managedLedgerA.asyncReadEntry(pb, new ReadEntryCallback() {
+            @Override
+            public void readEntryComplete(Entry entry, Object ctx) {
+                completableFutureB.complete(entry.getData());
+            }
+
+            @Override
+            public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+                completableFutureB.completeExceptionally(exception);
+            }
+        }, null);
+
+        try {
+            completableFutureB.get();
+            Assert.fail();
+        } catch (Exception e) {
+            assertEquals(e.getCause().getMessage(),
+                "Message not found, the ledgerId does not belong to this topic or has been deleted");
+        }
+
+        managedLedgerA.close();
+        managedLedgerB.close();
+
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 27aeca7..d29fff6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2300,13 +2300,6 @@ public class PersistentTopicsBase extends AdminResource {
             }
             PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
             ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger();
-            if (null == ledger.getLedgerInfo(ledgerId).get()) {
-                log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}, "
-                                + "the ledgerId does not belong to this topic.",
-                        clientAppId(), ledgerId, entryId, topicName);
-                asyncResponse.resume(new RestException(Status.NOT_FOUND,
-                        "Message not found, the ledgerId does not belong to this topic"));
-            }
             ledger.asyncReadEntry(new PositionImpl(ledgerId, entryId), new AsyncCallbacks.ReadEntryCallback() {
                 @Override
                 public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index d6ad1ac..182d2cc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -881,11 +881,21 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         Message<byte[]> message2 = admin.topics().getMessageById(topicName2, id2.getLedgerId(), id2.getEntryId());
         Assert.assertEquals(message2.getData(), data2.getBytes());
 
-        Message<byte[]> message3 = admin.topics().getMessageById(topicName2, id1.getLedgerId(), id1.getEntryId());
-        Assert.assertNull(message3);
+        Message<byte[]> message3 = null;
+        try {
+            message3 = admin.topics().getMessageById(topicName2, id1.getLedgerId(), id1.getEntryId());
+            Assert.fail();
+        } catch (Exception e) {
+            Assert.assertNull(message3);
+        }
 
-        Message<byte[]> message4 = admin.topics().getMessageById(topicName1, id2.getLedgerId(), id2.getEntryId());
-        Assert.assertNull(message4);
+        Message<byte[]> message4 = null;
+        try {
+            message4 = admin.topics().getMessageById(topicName1, id2.getLedgerId(), id2.getEntryId());
+            Assert.fail();
+        } catch (Exception e) {
+            Assert.assertNull(message4);
+        }
     }
 
     @Test