You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/02/09 21:08:45 UTC
[pulsar] branch master updated: add new method in ManagedCursor and
ReadOnlyCursor to async read entries with max size bytes. (#9532)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 34fdb67 add new method in ManagedCursor and ReadOnlyCursor to async read entries with max size bytes. (#9532)
34fdb67 is described below
commit 34fdb678da4d4008b27eb7d81e8b95707bdfc3ae
Author: ran <ga...@126.com>
AuthorDate: Wed Feb 10 05:08:01 2021 +0800
add new method in ManagedCursor and ReadOnlyCursor to async read entries with max size bytes. (#9532)
---
.../apache/bookkeeper/mledger/ManagedCursor.java | 13 ++++++++
.../apache/bookkeeper/mledger/ReadOnlyCursor.java | 12 ++++++++
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 11 +++++--
.../mledger/impl/ManagedCursorContainerTest.java | 6 ++++
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 36 ++++++++++++++++++++++
5 files changed, 76 insertions(+), 2 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index 3f230b6..c9abe05 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -107,6 +107,19 @@ public interface ManagedCursor {
void asyncReadEntries(int numberOfEntriesToRead, ReadEntriesCallback callback, Object ctx,
PositionImpl maxPosition);
+
+ /**
+ * Asynchronously read entries from the ManagedLedger.
+ *
+ * @param numberOfEntriesToRead maximum number of entries to return
+ * @param maxSizeBytes max size in bytes of the entries to return
+ * @param callback callback object
+ * @param ctx opaque context
+ * @param maxPosition max position can read
+ */
+ void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesCallback callback,
+ Object ctx, PositionImpl maxPosition);
+
/**
* Get 'N'th entry from the mark delete position in the cursor without updating any cursor positions.
*
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ReadOnlyCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ReadOnlyCursor.java
index 3f97250..202ecda 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ReadOnlyCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ReadOnlyCursor.java
@@ -52,6 +52,18 @@ public interface ReadOnlyCursor {
Object ctx, PositionImpl maxPosition);
/**
+ * Asynchronously read entries from the ManagedLedger.
+ *
+ * @param numberOfEntriesToRead maximum number of entries to return
+ * @param maxSizeBytes max size in bytes of the entries to return
+ * @param callback callback object
+ * @param ctx opaque context
+ * @param maxPosition max position can read
+ */
+ void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesCallback callback,
+ Object ctx, PositionImpl maxPosition);
+
+ /**
* Get the read position. This points to the next message to be read from the cursor.
*
* @return the read position
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index a195d7a..4b21fe1 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -561,15 +561,22 @@ public class ManagedCursorImpl implements ManagedCursor {
@Override
public void asyncReadEntries(final int numberOfEntriesToRead, final ReadEntriesCallback callback,
final Object ctx, PositionImpl maxPosition) {
+ asyncReadEntries(numberOfEntriesToRead, NO_MAX_SIZE_LIMIT, callback, ctx, maxPosition);
+ }
+
+ @Override
+ public void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesCallback callback,
+ Object ctx, PositionImpl maxPosition) {
checkArgument(numberOfEntriesToRead > 0);
if (isClosed()) {
callback.readEntriesFailed(new ManagedLedgerException("Cursor was already closed"), ctx);
return;
}
+ int numOfEntriesToRead = applyMaxSizeCap(numberOfEntriesToRead, maxSizeBytes);
+
PENDING_READ_OPS_UPDATER.incrementAndGet(this);
- OpReadEntry op = OpReadEntry.create(this, readPosition,
- numberOfEntriesToRead, callback, ctx, maxPosition);
+ OpReadEntry op = OpReadEntry.create(this, readPosition, numOfEntriesToRead, callback, ctx, maxPosition);
ledger.asyncReadEntries(op);
}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index 6e4ddc0..612b649 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -82,6 +82,12 @@ public class ManagedCursorContainerTest {
}
@Override
+ public void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesCallback callback,
+ Object ctx, PositionImpl maxPosition) {
+ callback.readEntriesComplete(null, ctx);
+ }
+
+ @Override
public boolean hasMoreEntries() {
return true;
}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 2afe296..a9ab828 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -464,6 +464,42 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
}
@Test(timeOut = 20000)
+ void testAsyncReadWithMaxSizeByte() throws Exception {
+ ManagedLedger ledger = factory.open("testAsyncReadWithMaxSizeByte");
+ ManagedCursor cursor = ledger.openCursor("c1");
+
+ for (int i = 0; i < 100; i++) {
+ ledger.addEntry(new byte[1024]);
+ }
+
+ // First time, since we don't have info, we'll get 1 single entry
+ readAndCheck(cursor, 10, 3 * 1024, 1);
+ // We should only return 3 entries, based on the max size
+ readAndCheck(cursor, 20, 3 * 1024, 3);
+ // If maxSize is < avg, we should get 1 entry
+ readAndCheck(cursor, 10, 500, 1);
+ }
+
+ private void readAndCheck(ManagedCursor cursor, int numEntriesToRead,
+ long maxSizeBytes, int expectedNumRead) throws InterruptedException {
+ CountDownLatch counter = new CountDownLatch(1);
+ cursor.asyncReadEntries(numEntriesToRead, maxSizeBytes, new ReadEntriesCallback() {
+ @Override
+ public void readEntriesComplete(List<Entry> entries, Object ctx) {
+ Assert.assertEquals(entries.size(), expectedNumRead);
+ entries.forEach(e -> e.release());
+ counter.countDown();
+ }
+
+ @Override
+ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+ fail(exception.getMessage());
+ }
+ }, null, null);
+ counter.await();
+ }
+
+ @Test(timeOut = 20000)
void markDeleteWithErrors() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger");
ManagedCursor cursor = ledger.openCursor("c1");