You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/02/09 21:08:45 UTC

[pulsar] branch master updated: add new method in ManagedCursor and ReadOnlyCursor to async read entries with max size bytes. (#9532)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 34fdb67  add new method in ManagedCursor and ReadOnlyCursor to async read entries with max size bytes. (#9532)
34fdb67 is described below

commit 34fdb678da4d4008b27eb7d81e8b95707bdfc3ae
Author: ran <ga...@126.com>
AuthorDate: Wed Feb 10 05:08:01 2021 +0800

    add new method in ManagedCursor and ReadOnlyCursor to async read entries with max size bytes. (#9532)
---
 .../apache/bookkeeper/mledger/ManagedCursor.java   | 13 ++++++++
 .../apache/bookkeeper/mledger/ReadOnlyCursor.java  | 12 ++++++++
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 11 +++++--
 .../mledger/impl/ManagedCursorContainerTest.java   |  6 ++++
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 36 ++++++++++++++++++++++
 5 files changed, 76 insertions(+), 2 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index 3f230b6..c9abe05 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -107,6 +107,19 @@ public interface ManagedCursor {
     void asyncReadEntries(int numberOfEntriesToRead, ReadEntriesCallback callback, Object ctx,
                           PositionImpl maxPosition);
 
+
+    /**
+     * Asynchronously read entries from the ManagedLedger.
+     *
+     * @param numberOfEntriesToRead maximum number of entries to return
+     * @param maxSizeBytes          max size in bytes of the entries to return
+     * @param callback              callback object
+     * @param ctx                   opaque context
+     * @param maxPosition           max position that can be read
+     */
+    void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesCallback callback,
+                          Object ctx, PositionImpl maxPosition);
+
     /**
      * Get 'N'th entry from the mark delete position in the cursor without updating any cursor positions.
      *
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ReadOnlyCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ReadOnlyCursor.java
index 3f97250..202ecda 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ReadOnlyCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ReadOnlyCursor.java
@@ -52,6 +52,18 @@ public interface ReadOnlyCursor {
                           Object ctx, PositionImpl maxPosition);
 
     /**
+     * Asynchronously read entries from the ManagedLedger.
+     *
+     * @param numberOfEntriesToRead maximum number of entries to return
+     * @param maxSizeBytes          max size in bytes of the entries to return
+     * @param callback              callback object
+     * @param ctx                   opaque context
+     * @param maxPosition           max position that can be read
+     */
+    void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesCallback callback,
+                          Object ctx, PositionImpl maxPosition);
+
+    /**
      * Get the read position. This points to the next message to be read from the cursor.
      *
      * @return the read position
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index a195d7a..4b21fe1 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -561,15 +561,22 @@ public class ManagedCursorImpl implements ManagedCursor {
     @Override
     public void asyncReadEntries(final int numberOfEntriesToRead, final ReadEntriesCallback callback,
             final Object ctx, PositionImpl maxPosition) {
+        asyncReadEntries(numberOfEntriesToRead, NO_MAX_SIZE_LIMIT, callback, ctx, maxPosition);
+    }
+
+    @Override
+    public void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesCallback callback,
+                                 Object ctx, PositionImpl maxPosition) {
         checkArgument(numberOfEntriesToRead > 0);
         if (isClosed()) {
             callback.readEntriesFailed(new ManagedLedgerException("Cursor was already closed"), ctx);
             return;
         }
 
+        int numOfEntriesToRead = applyMaxSizeCap(numberOfEntriesToRead, maxSizeBytes);
+
         PENDING_READ_OPS_UPDATER.incrementAndGet(this);
-        OpReadEntry op = OpReadEntry.create(this, readPosition,
-                numberOfEntriesToRead, callback, ctx, maxPosition);
+        OpReadEntry op = OpReadEntry.create(this, readPosition, numOfEntriesToRead, callback, ctx, maxPosition);
         ledger.asyncReadEntries(op);
     }
 
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index 6e4ddc0..612b649 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -82,6 +82,12 @@ public class ManagedCursorContainerTest {
         }
 
         @Override
+        public void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesCallback callback,
+                                     Object ctx, PositionImpl maxPosition) {
+            callback.readEntriesComplete(null, ctx);
+        }
+
+        @Override
         public boolean hasMoreEntries() {
             return true;
         }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 2afe296..a9ab828 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -464,6 +464,42 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
     }
 
     @Test(timeOut = 20000)
+    void testAsyncReadWithMaxSizeByte() throws Exception {
+        ManagedLedger ledger = factory.open("testAsyncReadWithMaxSizeByte");
+        ManagedCursor cursor = ledger.openCursor("c1");
+
+        for (int i = 0; i < 100; i++) {
+            ledger.addEntry(new byte[1024]);
+        }
+
+        // First time, since we don't have entry size info yet, we'll get a single entry
+        readAndCheck(cursor, 10, 3 * 1024, 1);
+        // We should only return 3 entries, based on the max size
+        readAndCheck(cursor, 20, 3 * 1024, 3);
+        // If maxSizeBytes is smaller than the average entry size, we should still get 1 entry
+        readAndCheck(cursor, 10, 500, 1);
+    }
+
+    private void readAndCheck(ManagedCursor cursor, int numEntriesToRead,
+                              long maxSizeBytes, int expectedNumRead) throws InterruptedException {
+        CountDownLatch counter = new CountDownLatch(1);
+        cursor.asyncReadEntries(numEntriesToRead, maxSizeBytes, new ReadEntriesCallback() {
+            @Override
+            public void readEntriesComplete(List<Entry> entries, Object ctx) {
+                Assert.assertEquals(entries.size(), expectedNumRead);
+                entries.forEach(e -> e.release());
+                counter.countDown();
+            }
+
+            @Override
+            public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+                fail(exception.getMessage());
+            }
+        }, null, null);
+        counter.await();
+    }
+
+    @Test(timeOut = 20000)
     void markDeleteWithErrors() throws Exception {
         ManagedLedger ledger = factory.open("my_test_ledger");
         ManagedCursor cursor = ledger.openCursor("c1");