You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/03/20 03:27:57 UTC
[pulsar] branch master updated: Create ManagedCursor with initial
properties (#3857)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f1c893d Create ManagedCursor with initial properties (#3857)
f1c893d is described below
commit f1c893d0a9f796ca848ee08470e0e768fb4381be
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Mar 19 20:27:52 2019 -0700
Create ManagedCursor with initial properties (#3857)
---
.../apache/bookkeeper/mledger/ManagedLedger.java | 37 ++++++++++++++++++++++
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 10 +++---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 21 +++++++++---
.../mledger/impl/ManagedCursorPropertiesTest.java | 25 +++++++++++++++
4 files changed, 84 insertions(+), 9 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index d6813e4..00fc2d0 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -20,6 +20,9 @@ package org.apache.bookkeeper.mledger;
import com.google.common.annotations.Beta;
import io.netty.buffer.ByteBuf;
+
+import java.util.Map;
+
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
@@ -157,6 +160,24 @@ public interface ManagedLedger {
public ManagedCursor openCursor(String name, InitialPosition initialPosition) throws InterruptedException, ManagedLedgerException;
/**
+ * Open a ManagedCursor in this ManagedLedger.
+ * <p>
+ * If the cursors doesn't exist, a new one will be created and its position will be at the end of the ManagedLedger.
+ *
+ * @param name
+ * the name associated with the ManagedCursor
+ * @param initialPosition
+ * the cursor will be set at lastest position or not when first created
+ * default is <b>true</b>
+ * @param properties
+ * user defined properties that will be attached to the first position of the cursor, if the open
+ * operation will trigger the creation of the cursor.
+ * @return the ManagedCursor
+ * @throws ManagedLedgerException
+ */
+ public ManagedCursor openCursor(String name, InitialPosition initialPosition, Map<String, Long> properties) throws InterruptedException, ManagedLedgerException;
+
+ /**
* Creates a new cursor whose metadata is not backed by durable storage. A caller can treat the non-durable cursor
* exactly like a normal cursor, with the only difference in that after restart it will not remember which entries
* were deleted. Also it does not prevent data from being deleted.
@@ -229,6 +250,22 @@ public interface ManagedLedger {
public void asyncOpenCursor(String name, InitialPosition initialPosition, OpenCursorCallback callback, Object ctx);
/**
+ * Open a ManagedCursor asynchronously.
+ *
+ * @see #openCursor(String)
+ * @param name
+ * the name associated with the ManagedCursor
+ * @param initialPosition
+ * the cursor will be set at lastest position or not when first created
+ * default is <b>true</b>
+ * @param callback
+ * callback object
+ * @param ctx
+ * opaque context
+ */
+ public void asyncOpenCursor(String name, InitialPosition initialPosition, Map<String, Long> properties, OpenCursorCallback callback, Object ctx);
+
+ /**
* Get a list of all the cursors reading from this ManagedLedger
*
* @return a list of cursors
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index f594392..2a5b53c 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -274,7 +274,7 @@ public class ManagedCursorImpl implements ManagedCursor {
log.error("[{}] Error opening metadata ledger {} for consumer {}: {}", ledger.getName(), ledgerId, name,
BKException.getMessage(rc));
// Rewind to oldest entry available
- initialize(getRollbackPosition(info), callback);
+ initialize(getRollbackPosition(info), Collections.emptyMap(), callback);
return;
} else if (rc != BKException.Code.OK) {
log.warn("[{}] Error opening metadata ledger {} for consumer {}: {}", ledger.getName(), ledgerId, name,
@@ -290,7 +290,7 @@ public class ManagedCursorImpl implements ManagedCursor {
log.warn("[{}] Error reading from metadata ledger {} for consumer {}: No entries in ledger",
ledger.getName(), ledgerId, name);
// Rewind to last cursor snapshot available
- initialize(getRollbackPosition(info), callback);
+ initialize(getRollbackPosition(info), Collections.emptyMap(), callback);
return;
}
@@ -302,7 +302,7 @@ public class ManagedCursorImpl implements ManagedCursor {
log.error("[{}] Error reading from metadata ledger {} for consumer {}: {}", ledger.getName(),
ledgerId, name, BKException.getMessage(rc1));
// Rewind to oldest entry available
- initialize(getRollbackPosition(info), callback);
+ initialize(getRollbackPosition(info), Collections.emptyMap(), callback);
return;
} else if (rc1 != BKException.Code.OK) {
log.warn("[{}] Error reading from metadata ledger {} for consumer {}: {}", ledger.getName(),
@@ -381,8 +381,8 @@ public class ManagedCursorImpl implements ManagedCursor {
STATE_UPDATER.set(this, State.NoLedger);
}
- void initialize(PositionImpl position, final VoidCallback callback) {
- recoveredCursor(position, Collections.emptyMap(), null);
+ void initialize(PositionImpl position, Map<String, Long> properties, final VoidCallback callback) {
+ recoveredCursor(position, properties, null);
if (log.isDebugEnabled()) {
log.debug("[{}] Consumer {} cursor initialized with counters: consumed {} mdPos {} rdPos {}",
ledger.getName(), name, messagesConsumedCounter, markDeletePosition, readPosition);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index fa82bdf..226e230 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -643,9 +643,16 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
return openCursor(cursorName, InitialPosition.Latest);
}
+
@Override
public ManagedCursor openCursor(String cursorName, InitialPosition initialPosition)
throws InterruptedException, ManagedLedgerException {
+ return openCursor(cursorName, initialPosition, Collections.emptyMap());
+ }
+
+ @Override
+ public ManagedCursor openCursor(String cursorName, InitialPosition initialPosition, Map<String, Long> properties)
+ throws InterruptedException, ManagedLedgerException {
final CountDownLatch counter = new CountDownLatch(1);
class Result {
ManagedCursor cursor = null;
@@ -653,7 +660,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
}
final Result result = new Result();
- asyncOpenCursor(cursorName, initialPosition, new OpenCursorCallback() {
+ asyncOpenCursor(cursorName, initialPosition, properties, new OpenCursorCallback() {
@Override
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
result.cursor = cursor;
@@ -681,13 +688,19 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
}
@Override
- public synchronized void asyncOpenCursor(final String cursorName, final OpenCursorCallback callback, Object ctx) {
+ public void asyncOpenCursor(final String cursorName, final OpenCursorCallback callback, Object ctx) {
this.asyncOpenCursor(cursorName, InitialPosition.Latest, callback, ctx);
}
@Override
- public synchronized void asyncOpenCursor(final String cursorName, final InitialPosition initialPosition,
+ public void asyncOpenCursor(final String cursorName, final InitialPosition initialPosition,
final OpenCursorCallback callback, final Object ctx) {
+ this.asyncOpenCursor(cursorName, initialPosition, Collections.emptyMap(), callback, ctx);
+ }
+
+ @Override
+ public synchronized void asyncOpenCursor(final String cursorName, final InitialPosition initialPosition,
+ Map<String, Long> properties, final OpenCursorCallback callback, final Object ctx) {
try {
checkManagedLedgerIsOpen();
checkFenced();
@@ -721,7 +734,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
final ManagedCursorImpl cursor = new ManagedCursorImpl(bookKeeper, config, this, cursorName);
CompletableFuture<ManagedCursor> cursorFuture = new CompletableFuture<>();
uninitializedCursors.put(cursorName, cursorFuture);
- cursor.initialize(getLastPosition(), new VoidCallback() {
+ cursor.initialize(getLastPosition(), properties, new VoidCallback() {
@Override
public void operationComplete() {
log.info("[{}] Opened new cursor: {}", name, cursor);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java
index 2f2a0a4..75893f0 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java
@@ -30,6 +30,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.testng.annotations.Test;
public class ManagedCursorPropertiesTest extends MockedBookKeeperTestCase {
@@ -136,4 +137,28 @@ public class ManagedCursorPropertiesTest extends MockedBookKeeperTestCase {
assertEquals(c1.getProperties(), properties);
}
+ @Test
+ void testPropertiesAtCreation() throws Exception {
+ ManagedLedger ledger = factory.open("my_test_ledger_at_creation", new ManagedLedgerConfig());
+
+
+ Map<String, Long> properties = new TreeMap<>();
+ properties.put("a", 1L);
+ properties.put("b", 2L);
+ properties.put("c", 3L);
+
+ ManagedCursor c1 = ledger.openCursor("c1", InitialPosition.Latest, properties);
+ assertEquals(c1.getProperties(), properties);
+
+ ledger.addEntry("entry-1".getBytes());
+
+ ledger.close();
+
+ // Reopen the managed ledger
+ ledger = factory.open("my_test_ledger_at_creation", new ManagedLedgerConfig());
+ c1 = ledger.openCursor("c1");
+
+ assertEquals(c1.getProperties(), properties);
+ }
+
}