You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/03/20 03:27:57 UTC

[pulsar] branch master updated: Create ManagedCursor with initial properties (#3857)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f1c893d  Create ManagedCursor with initial properties (#3857)
f1c893d is described below

commit f1c893d0a9f796ca848ee08470e0e768fb4381be
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Mar 19 20:27:52 2019 -0700

    Create ManagedCursor with initial properties (#3857)
---
 .../apache/bookkeeper/mledger/ManagedLedger.java   | 37 ++++++++++++++++++++++
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 10 +++---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 21 +++++++++---
 .../mledger/impl/ManagedCursorPropertiesTest.java  | 25 +++++++++++++++
 4 files changed, 84 insertions(+), 9 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index d6813e4..00fc2d0 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -20,6 +20,9 @@ package org.apache.bookkeeper.mledger;
 
 import com.google.common.annotations.Beta;
 import io.netty.buffer.ByteBuf;
+
+import java.util.Map;
+
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
@@ -157,6 +160,24 @@ public interface ManagedLedger {
     public ManagedCursor openCursor(String name, InitialPosition initialPosition) throws InterruptedException, ManagedLedgerException;
 
     /**
+     * Open a ManagedCursor in this ManagedLedger.
+     * <p>
+     * If the cursors doesn't exist, a new one will be created and its position will be at the end of the ManagedLedger.
+     *
+     * @param name
+     *            the name associated with the ManagedCursor
+     * @param initialPosition
+     *            the cursor will be set at lastest position or not when first created
+     *            default is <b>true</b>
+     * @param properties
+     *             user defined properties that will be attached to the first position of the cursor, if the open
+     *             operation will trigger the creation of the cursor.
+     * @return the ManagedCursor
+     * @throws ManagedLedgerException
+     */
+    public ManagedCursor openCursor(String name, InitialPosition initialPosition, Map<String, Long> properties) throws InterruptedException, ManagedLedgerException;
+
+    /**
      * Creates a new cursor whose metadata is not backed by durable storage. A caller can treat the non-durable cursor
      * exactly like a normal cursor, with the only difference in that after restart it will not remember which entries
      * were deleted. Also it does not prevent data from being deleted.
@@ -229,6 +250,22 @@ public interface ManagedLedger {
     public void asyncOpenCursor(String name, InitialPosition initialPosition, OpenCursorCallback callback, Object ctx);
 
     /**
+     * Open a ManagedCursor asynchronously.
+     *
+     * @see #openCursor(String)
+     * @param name
+     *            the name associated with the ManagedCursor
+     * @param initialPosition
+     *            the cursor will be set at lastest position or not when first created
+     *            default is <b>true</b>
+     * @param callback
+     *            callback object
+     * @param ctx
+     *            opaque context
+     */
+    public void asyncOpenCursor(String name, InitialPosition initialPosition, Map<String, Long> properties, OpenCursorCallback callback, Object ctx);
+
+    /**
      * Get a list of all the cursors reading from this ManagedLedger
      *
      * @return a list of cursors
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index f594392..2a5b53c 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -274,7 +274,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                 log.error("[{}] Error opening metadata ledger {} for consumer {}: {}", ledger.getName(), ledgerId, name,
                         BKException.getMessage(rc));
                 // Rewind to oldest entry available
-                initialize(getRollbackPosition(info), callback);
+                initialize(getRollbackPosition(info), Collections.emptyMap(), callback);
                 return;
             } else if (rc != BKException.Code.OK) {
                 log.warn("[{}] Error opening metadata ledger {} for consumer {}: {}", ledger.getName(), ledgerId, name,
@@ -290,7 +290,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                 log.warn("[{}] Error reading from metadata ledger {} for consumer {}: No entries in ledger",
                         ledger.getName(), ledgerId, name);
                 // Rewind to last cursor snapshot available
-                initialize(getRollbackPosition(info), callback);
+                initialize(getRollbackPosition(info), Collections.emptyMap(), callback);
                 return;
             }
 
@@ -302,7 +302,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                     log.error("[{}] Error reading from metadata ledger {} for consumer {}: {}", ledger.getName(),
                             ledgerId, name, BKException.getMessage(rc1));
                     // Rewind to oldest entry available
-                    initialize(getRollbackPosition(info), callback);
+                    initialize(getRollbackPosition(info), Collections.emptyMap(), callback);
                     return;
                 } else if (rc1 != BKException.Code.OK) {
                     log.warn("[{}] Error reading from metadata ledger {} for consumer {}: {}", ledger.getName(),
@@ -381,8 +381,8 @@ public class ManagedCursorImpl implements ManagedCursor {
         STATE_UPDATER.set(this, State.NoLedger);
     }
 
-    void initialize(PositionImpl position, final VoidCallback callback) {
-        recoveredCursor(position, Collections.emptyMap(), null);
+    void initialize(PositionImpl position, Map<String, Long> properties, final VoidCallback callback) {
+        recoveredCursor(position, properties, null);
         if (log.isDebugEnabled()) {
             log.debug("[{}] Consumer {} cursor initialized with counters: consumed {} mdPos {} rdPos {}",
                     ledger.getName(), name, messagesConsumedCounter, markDeletePosition, readPosition);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index fa82bdf..226e230 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -643,9 +643,16 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         return openCursor(cursorName, InitialPosition.Latest);
     }
 
+
     @Override
     public ManagedCursor openCursor(String cursorName, InitialPosition initialPosition)
             throws InterruptedException, ManagedLedgerException {
+        return openCursor(cursorName, initialPosition, Collections.emptyMap());
+    }
+
+    @Override
+    public ManagedCursor openCursor(String cursorName, InitialPosition initialPosition, Map<String, Long> properties)
+            throws InterruptedException, ManagedLedgerException {
         final CountDownLatch counter = new CountDownLatch(1);
         class Result {
             ManagedCursor cursor = null;
@@ -653,7 +660,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         }
         final Result result = new Result();
 
-        asyncOpenCursor(cursorName, initialPosition, new OpenCursorCallback() {
+        asyncOpenCursor(cursorName, initialPosition, properties, new OpenCursorCallback() {
             @Override
             public void openCursorComplete(ManagedCursor cursor, Object ctx) {
                 result.cursor = cursor;
@@ -681,13 +688,19 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     }
 
     @Override
-    public synchronized void asyncOpenCursor(final String cursorName, final OpenCursorCallback callback, Object ctx) {
+    public void asyncOpenCursor(final String cursorName, final OpenCursorCallback callback, Object ctx) {
         this.asyncOpenCursor(cursorName, InitialPosition.Latest, callback, ctx);
     }
 
     @Override
-    public synchronized void asyncOpenCursor(final String cursorName, final InitialPosition initialPosition,
+    public void asyncOpenCursor(final String cursorName, final InitialPosition initialPosition,
             final OpenCursorCallback callback, final Object ctx) {
+        this.asyncOpenCursor(cursorName, initialPosition, Collections.emptyMap(), callback, ctx);
+    }
+
+    @Override
+    public synchronized void asyncOpenCursor(final String cursorName, final InitialPosition initialPosition,
+            Map<String, Long> properties, final OpenCursorCallback callback, final Object ctx) {
         try {
             checkManagedLedgerIsOpen();
             checkFenced();
@@ -721,7 +734,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         final ManagedCursorImpl cursor = new ManagedCursorImpl(bookKeeper, config, this, cursorName);
         CompletableFuture<ManagedCursor> cursorFuture = new CompletableFuture<>();
         uninitializedCursors.put(cursorName, cursorFuture);
-        cursor.initialize(getLastPosition(), new VoidCallback() {
+        cursor.initialize(getLastPosition(), properties, new VoidCallback() {
             @Override
             public void operationComplete() {
                 log.info("[{}] Opened new cursor: {}", name, cursor);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java
index 2f2a0a4..75893f0 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java
@@ -30,6 +30,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.testng.annotations.Test;
 
 public class ManagedCursorPropertiesTest extends MockedBookKeeperTestCase {
@@ -136,4 +137,28 @@ public class ManagedCursorPropertiesTest extends MockedBookKeeperTestCase {
         assertEquals(c1.getProperties(), properties);
     }
 
+    @Test
+    void testPropertiesAtCreation() throws Exception {
+        ManagedLedger ledger = factory.open("my_test_ledger_at_creation", new ManagedLedgerConfig());
+
+
+        Map<String, Long> properties = new TreeMap<>();
+        properties.put("a", 1L);
+        properties.put("b", 2L);
+        properties.put("c", 3L);
+
+        ManagedCursor c1 = ledger.openCursor("c1", InitialPosition.Latest, properties);
+        assertEquals(c1.getProperties(), properties);
+
+        ledger.addEntry("entry-1".getBytes());
+
+        ledger.close();
+
+        // Reopen the managed ledger
+        ledger = factory.open("my_test_ledger_at_creation", new ManagedLedgerConfig());
+        c1 = ledger.openCursor("c1");
+
+        assertEquals(c1.getProperties(), properties);
+    }
+
 }