You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/09/03 16:00:35 UTC
[pulsar] branch master updated: Forbid to read other topic's data
in managedLedger layer (#11912)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a7bdc5e Forbid to read other topic's data in managedLedger layer (#11912)
a7bdc5e is described below
commit a7bdc5e25778ada8c781e9b3f9cc4694e5fd3f58
Author: Hang Chen <ch...@apache.org>
AuthorDate: Fri Sep 3 23:59:52 2021 +0800
Forbid to read other topic's data in managedLedger layer (#11912)
* forbid to read other topic's data in managedLedger layer
* format code
* update exception type
* fix test
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 7 +++
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 60 +++++++++++++++++++---
.../broker/admin/impl/PersistentTopicsBase.java | 7 ---
.../pulsar/broker/admin/PersistentTopicsTest.java | 18 +++++--
4 files changed, 75 insertions(+), 17 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 0823886..d5c83ea 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1821,6 +1821,13 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
if (log.isDebugEnabled()) {
log.debug("[{}] Reading entry ledger {}: {}", name, position.getLedgerId(), position.getEntryId());
}
+ if (!ledgers.containsKey(position.getLedgerId())) {
+ log.error("[{}] Failed to get message with ledger {}:{} the ledgerId does not belong to this topic "
+ + "or has been deleted.", name, position.getLedgerId(), position.getEntryId());
+ callback.readEntryFailed(new ManagedLedgerException.NonRecoverableLedgerException("Message not found, "
+ + "the ledgerId does not belong to this topic or has been deleted"), ctx);
+ return;
+ }
if (position.getLedgerId() == currentLedger.getId()) {
asyncReadEntry(currentLedger, position, callback, ctx);
} else {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 2516357..fc14a3e 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -68,6 +68,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
@@ -118,16 +119,12 @@ import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Stat;
import org.awaitility.Awaitility;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+@Slf4j
public class ManagedLedgerTest extends MockedBookKeeperTestCase {
-
- private static final Logger log = LoggerFactory.getLogger(ManagedLedgerTest.class);
-
private static final Charset Encoding = Charsets.UTF_8;
@DataProvider(name = "checkOwnershipFlag")
@@ -3030,7 +3027,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
Assert.assertEquals(finalManagedLedger.getTotalSize(), 0);
});
}
-
+
@Test(timeOut = 20000)
public void testAsyncTruncateLedgerRetention() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();
@@ -3204,4 +3201,55 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
ledger.internalTrimConsumedLedgers(Futures.NULL_PROMISE);
verify(ledgerOffloader, times(1)).getOffloadPolicies();
}
+
+ @Test(timeOut = 30000)
+ public void testReadOtherManagedLedgersEntry() throws Exception {
+ ManagedLedgerImpl managedLedgerA = (ManagedLedgerImpl) factory.open("my_test_ledger_a");
+ ManagedLedgerImpl managedLedgerB = (ManagedLedgerImpl) factory.open("my_test_ledger_b");
+
+ PositionImpl pa = (PositionImpl) managedLedgerA.addEntry("dummy-entry-a".getBytes(Encoding));
+ PositionImpl pb = (PositionImpl) managedLedgerB.addEntry("dummy-entry-b".getBytes(Encoding));
+
+ // read managedLegerA's entry using managedLedgerA
+ CompletableFuture<byte[]> completableFutureA = new CompletableFuture<>();
+ managedLedgerA.asyncReadEntry(pa, new ReadEntryCallback() {
+ @Override
+ public void readEntryComplete(Entry entry, Object ctx) {
+ completableFutureA.complete(entry.getData());
+ }
+
+ @Override
+ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+ completableFutureA.completeExceptionally(exception.getCause());
+ }
+ }, null);
+
+ assertEquals("dummy-entry-a".getBytes(Encoding), completableFutureA.get());
+
+ // read managedLedgerB's entry using managedLedgerA
+ CompletableFuture<byte[]> completableFutureB = new CompletableFuture<>();
+ managedLedgerA.asyncReadEntry(pb, new ReadEntryCallback() {
+ @Override
+ public void readEntryComplete(Entry entry, Object ctx) {
+ completableFutureB.complete(entry.getData());
+ }
+
+ @Override
+ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+ completableFutureB.completeExceptionally(exception);
+ }
+ }, null);
+
+ try {
+ completableFutureB.get();
+ Assert.fail();
+ } catch (Exception e) {
+ assertEquals(e.getCause().getMessage(),
+ "Message not found, the ledgerId does not belong to this topic or has been deleted");
+ }
+
+ managedLedgerA.close();
+ managedLedgerB.close();
+
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 27aeca7..d29fff6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2300,13 +2300,6 @@ public class PersistentTopicsBase extends AdminResource {
}
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger();
- if (null == ledger.getLedgerInfo(ledgerId).get()) {
- log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}, "
- + "the ledgerId does not belong to this topic.",
- clientAppId(), ledgerId, entryId, topicName);
- asyncResponse.resume(new RestException(Status.NOT_FOUND,
- "Message not found, the ledgerId does not belong to this topic"));
- }
ledger.asyncReadEntry(new PositionImpl(ledgerId, entryId), new AsyncCallbacks.ReadEntryCallback() {
@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index d6ad1ac..182d2cc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -881,11 +881,21 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
Message<byte[]> message2 = admin.topics().getMessageById(topicName2, id2.getLedgerId(), id2.getEntryId());
Assert.assertEquals(message2.getData(), data2.getBytes());
- Message<byte[]> message3 = admin.topics().getMessageById(topicName2, id1.getLedgerId(), id1.getEntryId());
- Assert.assertNull(message3);
+ Message<byte[]> message3 = null;
+ try {
+ message3 = admin.topics().getMessageById(topicName2, id1.getLedgerId(), id1.getEntryId());
+ Assert.fail();
+ } catch (Exception e) {
+ Assert.assertNull(message3);
+ }
- Message<byte[]> message4 = admin.topics().getMessageById(topicName1, id2.getLedgerId(), id2.getEntryId());
- Assert.assertNull(message4);
+ Message<byte[]> message4 = null;
+ try {
+ message4 = admin.topics().getMessageById(topicName1, id2.getLedgerId(), id2.getEntryId());
+ Assert.fail();
+ } catch (Exception e) {
+ Assert.assertNull(message4);
+ }
}
@Test