You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/06/03 07:03:17 UTC
[pulsar] branch master updated: Issue 15750: PIP-105: Store Subscription properties (#15757)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 23f46a0736e Issue 15750: PIP-105: Store Subscription properties (#15757)
23f46a0736e is described below
commit 23f46a0736e844a2a2fec943ee76d4e1e73ec477
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Fri Jun 3 09:03:05 2022 +0200
Issue 15750: PIP-105: Store Subscription properties (#15757)
* PIP-105: Store Subscription properties
---
.../apache/bookkeeper/mledger/ManagedCursor.java | 15 +++
.../apache/bookkeeper/mledger/ManagedLedger.java | 9 +-
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 96 ++++++++++++--
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 15 ++-
managed-ledger/src/main/proto/MLDataFormats.proto | 9 ++
.../mledger/impl/ManagedCursorContainerTest.java | 5 +
.../mledger/impl/ManagedCursorPropertiesTest.java | 62 ++++++++-
.../apache/pulsar/broker/service/Subscription.java | 2 +
.../nonpersistent/NonPersistentSubscription.java | 13 +-
.../service/persistent/PersistentSubscription.java | 21 +++-
.../broker/service/persistent/PersistentTopic.java | 17 +--
.../broker/admin/CreateSubscriptionTest.java | 138 ++++++++++++++-------
.../pulsar/broker/service/PersistentTopicTest.java | 8 +-
.../pulsar/broker/service/ServerCnxTest.java | 10 +-
.../broker/service/plugin/FilterEntryTest.java | 8 +-
.../offload/jcloud/impl/MockManagedLedger.java | 6 +-
16 files changed, 339 insertions(+), 95 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index efd183d0bfb..46ca0f14003 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Range;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
@@ -79,6 +80,20 @@ public interface ManagedCursor {
*/
Map<String, Long> getProperties();
+ /**
+ * Return any properties that were associated with the cursor.
+ */
+ Map<String, String> getCursorProperties();
+
+ /**
+ * Updates the properties.
+ * @param cursorProperties
+ * @return a handle to the result of the operation
+ */
+ default CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+ return CompletableFuture.completedFuture(null);
+ }
+
/**
* Add a property associated with the last stored position.
*/
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index 1f6e0d3af46..7196a3b4c03 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -240,10 +240,13 @@ public interface ManagedLedger {
* @param properties
* user defined properties that will be attached to the first position of the cursor, if the open
* operation will trigger the creation of the cursor.
+ * @param cursorProperties
+ * the properties for the Cursor
* @return the ManagedCursor
* @throws ManagedLedgerException
*/
- ManagedCursor openCursor(String name, InitialPosition initialPosition, Map<String, Long> properties)
+ ManagedCursor openCursor(String name, InitialPosition initialPosition, Map<String, Long> properties,
+ Map<String, String> cursorProperties)
throws InterruptedException, ManagedLedgerException;
/**
@@ -337,13 +340,15 @@ public interface ManagedLedger {
* @param initialPosition
* the cursor will be set at lastest position or not when first created
* default is <b>true</b>
+ * @param cursorProperties
+ * the properties for the Cursor
* @param callback
* callback object
* @param ctx
* opaque context
*/
void asyncOpenCursor(String name, InitialPosition initialPosition, Map<String, Long> properties,
- OpenCursorCallback callback, Object ctx);
+ Map<String, String> cursorProperties, OpenCursorCallback callback, Object ctx);
/**
* Get a list of all the cursors reading from this ManagedLedger.
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 0c3e46c1acd..3d558a231db 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -91,6 +91,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.StringProperty;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
@@ -116,6 +117,7 @@ public class ManagedCursorImpl implements ManagedCursor {
protected final ManagedLedgerConfig config;
protected final ManagedLedgerImpl ledger;
private final String name;
+ private volatile Map<String, String> cursorProperties;
private final BookKeeper.DigestType digestType;
protected volatile PositionImpl markDeletePosition;
@@ -280,6 +282,7 @@ public class ManagedCursorImpl implements ManagedCursor {
ManagedCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, ManagedLedgerImpl ledger, String cursorName) {
this.bookkeeper = bookkeeper;
+ this.cursorProperties = Collections.emptyMap();
this.config = config;
this.ledger = ledger;
this.name = cursorName;
@@ -313,6 +316,52 @@ public class ManagedCursorImpl implements ManagedCursor {
return lastMarkDeleteEntry != null ? lastMarkDeleteEntry.properties : Collections.emptyMap();
}
+ @Override
+ public Map<String, String> getCursorProperties() {
+ return cursorProperties;
+ }
+
+ @Override
+ public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+ CompletableFuture<Void> updateCursorPropertiesResult = new CompletableFuture<>();
+ ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<>() {
+ @Override
+ public void operationComplete(ManagedCursorInfo info, Stat stat) {
+ ManagedCursorInfo copy = ManagedCursorInfo
+ .newBuilder(info)
+ .clearCursorProperties()
+ .addAllCursorProperties(buildStringPropertiesMap(cursorProperties))
+ .build();
+ ledger.getStore().asyncUpdateCursorInfo(ledger.getName(),
+ name, copy, stat, new MetaStoreCallback<>() {
+ @Override
+ public void operationComplete(Void result, Stat stat) {
+ log.info("[{}] Updated ledger cursor: {} properties {}", ledger.getName(),
+ name, cursorProperties);
+ ManagedCursorImpl.this.cursorProperties = cursorProperties;
+ cursorLedgerStat = stat;
+ updateCursorPropertiesResult.complete(result);
+ }
+
+ @Override
+ public void operationFailed(MetaStoreException e) {
+ log.error("[{}] Error while updating ledger cursor: {} properties {}", ledger.getName(),
+ name, cursorProperties, e);
+ updateCursorPropertiesResult.completeExceptionally(e);
+ }
+ });
+ }
+
+ @Override
+ public void operationFailed(MetaStoreException e) {
+ log.error("[{}] Error while updating ledger cursor: {} properties {}", ledger.getName(),
+ name, cursorProperties, e);
+ updateCursorPropertiesResult.completeExceptionally(e);
+ }
+ });
+ return updateCursorPropertiesResult;
+ }
+
@Override
public boolean putProperty(String key, Long value) {
if (lastMarkDeleteEntry != null) {
@@ -361,6 +410,18 @@ public class ManagedCursorImpl implements ManagedCursor {
cursorLedgerStat = stat;
lastActive = info.getLastActive() != 0 ? info.getLastActive() : lastActive;
+
+ Map<String, String> recoveredCursorProperties = Collections.emptyMap();
+ if (info.getCursorPropertiesCount() > 0) {
+ // Recover properties map
+ recoveredCursorProperties = Maps.newHashMap();
+ for (int i = 0; i < info.getCursorPropertiesCount(); i++) {
+ StringProperty property = info.getCursorProperties(i);
+ recoveredCursorProperties.put(property.getName(), property.getValue());
+ }
+ }
+ cursorProperties = recoveredCursorProperties;
+
if (info.getCursorsLedgerId() == -1L) {
// There is no cursor ledger to read the last position from. It means the cursor has been properly
// closed and the last mark-delete position is stored in the ManagedCursorInfo itself.
@@ -380,7 +441,7 @@ public class ManagedCursorImpl implements ManagedCursor {
}
}
- recoveredCursor(recoveredPosition, recoveredProperties, null);
+ recoveredCursor(recoveredPosition, recoveredProperties, recoveredCursorProperties, null);
callback.operationComplete();
} else {
// Need to proceed and read the last entry in the specified ledger to find out the last position
@@ -410,7 +471,7 @@ public class ManagedCursorImpl implements ManagedCursor {
log.error("[{}] Error opening metadata ledger {} for consumer {}: {}", ledger.getName(), ledgerId, name,
BKException.getMessage(rc));
// Rewind to oldest entry available
- initialize(getRollbackPosition(info), Collections.emptyMap(), callback);
+ initialize(getRollbackPosition(info), Collections.emptyMap(), Collections.emptyMap(), callback);
return;
} else if (rc != BKException.Code.OK) {
log.warn("[{}] Error opening metadata ledger {} for consumer {}: {}", ledger.getName(), ledgerId, name,
@@ -426,7 +487,7 @@ public class ManagedCursorImpl implements ManagedCursor {
log.warn("[{}] Error reading from metadata ledger {} for consumer {}: No entries in ledger",
ledger.getName(), ledgerId, name);
// Rewind to last cursor snapshot available
- initialize(getRollbackPosition(info), Collections.emptyMap(), callback);
+ initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback);
return;
}
@@ -438,7 +499,7 @@ public class ManagedCursorImpl implements ManagedCursor {
log.error("[{}] Error reading from metadata ledger {} for consumer {}: {}", ledger.getName(),
ledgerId, name, BKException.getMessage(rc1));
// Rewind to oldest entry available
- initialize(getRollbackPosition(info), Collections.emptyMap(), callback);
+ initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback);
return;
} else if (rc1 != BKException.Code.OK) {
log.warn("[{}] Error reading from metadata ledger {} for consumer {}: {}", ledger.getName(),
@@ -476,7 +537,7 @@ public class ManagedCursorImpl implements ManagedCursor {
&& positionInfo.getBatchedEntryDeletionIndexInfoCount() > 0) {
recoverBatchDeletedIndexes(positionInfo.getBatchedEntryDeletionIndexInfoList());
}
- recoveredCursor(position, recoveredProperties, lh);
+ recoveredCursor(position, recoveredProperties, cursorProperties, lh);
callback.operationComplete();
}, null);
};
@@ -547,6 +608,7 @@ public class ManagedCursorImpl implements ManagedCursor {
}
private void recoveredCursor(PositionImpl position, Map<String, Long> properties,
+ Map<String, String> cursorProperties,
LedgerHandle recoveredFromCursorLedger) {
// if the position was at a ledger that didn't exist (since it will be deleted if it was previously empty),
// we need to move to the next existing ledger
@@ -564,7 +626,7 @@ public class ManagedCursorImpl implements ManagedCursor {
position = ledger.getLastPosition();
}
log.info("[{}] Cursor {} recovered to position {}", ledger.getName(), name, position);
-
+ this.cursorProperties = cursorProperties;
messagesConsumedCounter = -getNumberOfEntries(Range.openClosed(position, ledger.getLastPosition()));
markDeletePosition = position;
persistentMarkDeletePosition = position;
@@ -577,8 +639,9 @@ public class ManagedCursorImpl implements ManagedCursor {
STATE_UPDATER.set(this, State.NoLedger);
}
- void initialize(PositionImpl position, Map<String, Long> properties, final VoidCallback callback) {
- recoveredCursor(position, properties, null);
+ void initialize(PositionImpl position, Map<String, Long> properties, Map<String, String> cursorProperties,
+ final VoidCallback callback) {
+ recoveredCursor(position, properties, cursorProperties, null);
if (log.isDebugEnabled()) {
log.debug("[{}] Consumer {} cursor initialized with counters: consumed {} mdPos {} rdPos {}",
ledger.getName(), name, messagesConsumedCounter, markDeletePosition, readPosition);
@@ -2392,6 +2455,7 @@ public class ManagedCursorImpl implements ManagedCursor {
.setLastActive(lastActive); //
info.addAllProperties(buildPropertiesMap(properties));
+ info.addAllCursorProperties(buildStringPropertiesMap(cursorProperties));
if (persistIndividualDeletedMessageRanges) {
info.addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges());
if (config.isDeletionAtBatchIndexLevelEnabled()) {
@@ -2605,7 +2669,7 @@ public class ManagedCursorImpl implements ManagedCursor {
}
- private List<LongProperty> buildPropertiesMap(Map<String, Long> properties) {
+ private static List<LongProperty> buildPropertiesMap(Map<String, Long> properties) {
if (properties.isEmpty()) {
return Collections.emptyList();
}
@@ -2619,6 +2683,20 @@ public class ManagedCursorImpl implements ManagedCursor {
return longProperties;
}
+ private static List<StringProperty> buildStringPropertiesMap(Map<String, String> properties) {
+ if (properties == null || properties.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ List<StringProperty> stringProperties = Lists.newArrayList();
+ properties.forEach((name, value) -> {
+ StringProperty sp = StringProperty.newBuilder().setName(name).setValue(value).build();
+ stringProperties.add(sp);
+ });
+
+ return stringProperties;
+ }
+
private List<MLDataFormats.MessageRange> buildIndividualDeletedMessageRanges() {
lock.readLock().lock();
try {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 277e4df0244..f228c32a90e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -845,11 +845,12 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
@Override
public ManagedCursor openCursor(String cursorName, InitialPosition initialPosition)
throws InterruptedException, ManagedLedgerException {
- return openCursor(cursorName, initialPosition, Collections.emptyMap());
+ return openCursor(cursorName, initialPosition, Collections.emptyMap(), Collections.emptyMap());
}
@Override
- public ManagedCursor openCursor(String cursorName, InitialPosition initialPosition, Map<String, Long> properties)
+ public ManagedCursor openCursor(String cursorName, InitialPosition initialPosition, Map<String, Long> properties,
+ Map<String, String> cursorProperties)
throws InterruptedException, ManagedLedgerException {
final CountDownLatch counter = new CountDownLatch(1);
class Result {
@@ -858,7 +859,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
}
final Result result = new Result();
- asyncOpenCursor(cursorName, initialPosition, properties, new OpenCursorCallback() {
+ asyncOpenCursor(cursorName, initialPosition, properties, cursorProperties, new OpenCursorCallback() {
@Override
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
result.cursor = cursor;
@@ -893,12 +894,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
@Override
public void asyncOpenCursor(final String cursorName, final InitialPosition initialPosition,
final OpenCursorCallback callback, final Object ctx) {
- this.asyncOpenCursor(cursorName, initialPosition, Collections.emptyMap(), callback, ctx);
+ this.asyncOpenCursor(cursorName, initialPosition, Collections.emptyMap(), Collections.emptyMap(),
+ callback, ctx);
}
@Override
public synchronized void asyncOpenCursor(final String cursorName, final InitialPosition initialPosition,
- Map<String, Long> properties, final OpenCursorCallback callback, final Object ctx) {
+ Map<String, Long> properties, Map<String, String> cursorProperties,
+ final OpenCursorCallback callback, final Object ctx) {
try {
checkManagedLedgerIsOpen();
checkFenced();
@@ -932,7 +935,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
CompletableFuture<ManagedCursor> cursorFuture = new CompletableFuture<>();
uninitializedCursors.put(cursorName, cursorFuture);
PositionImpl position = InitialPosition.Earliest == initialPosition ? getFirstPosition() : getLastPosition();
- cursor.initialize(position, properties, new VoidCallback() {
+ cursor.initialize(position, properties, cursorProperties, new VoidCallback() {
@Override
public void operationComplete() {
log.info("[{}] Opened new cursor: {}", name, cursor);
diff --git a/managed-ledger/src/main/proto/MLDataFormats.proto b/managed-ledger/src/main/proto/MLDataFormats.proto
index 4671816c1a1..c4e502819fa 100644
--- a/managed-ledger/src/main/proto/MLDataFormats.proto
+++ b/managed-ledger/src/main/proto/MLDataFormats.proto
@@ -105,6 +105,11 @@ message LongProperty {
required int64 value = 2;
}
+message StringProperty {
+ required string name = 1;
+ required string value = 2;
+}
+
message ManagedCursorInfo {
// If the ledger id is -1, then the mark-delete position is
// the one from the (ledgerId, entryId) snapshot below
@@ -123,6 +128,10 @@ message ManagedCursorInfo {
// Store which index in the batch message has been deleted
repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 7;
+
+ // Additional custom properties associated with
+ // the cursor
+ repeated StringProperty cursorProperties = 8;
}
enum CompressionType {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index a654b30e60b..05f34df47c1 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -67,6 +67,11 @@ public class ManagedCursorContainerTest {
return Collections.emptyMap();
}
+ @Override
+ public Map<String, String> getCursorProperties() {
+ return Collections.emptyMap();
+ }
+
@Override
public boolean putProperty(String key, Long value) {
return false;
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java
index 727f3850ad2..74db9d791f3 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java
@@ -23,6 +23,8 @@ import static org.testng.Assert.assertEquals;
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
@@ -75,9 +77,15 @@ public class ManagedCursorPropertiesTest extends MockedBookKeeperTestCase {
@Test(timeOut = 20000)
void testPropertiesRecoveryAfterCrash() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig());
- ManagedCursor c1 = ledger.openCursor("c1");
+
+ Map<String, String> cursorProperties = new TreeMap<>();
+ cursorProperties.put("custom1", "one");
+ cursorProperties.put("custom2", "two");
+
+ ManagedCursor c1 = ledger.openCursor("c1", InitialPosition.Latest, Collections.emptyMap(), cursorProperties);
assertEquals(c1.getProperties(), Collections.emptyMap());
+ assertEquals(c1.getCursorProperties(), cursorProperties);
ledger.addEntry("entry-1".getBytes());
ledger.addEntry("entry-2".getBytes());
@@ -99,6 +107,7 @@ public class ManagedCursorPropertiesTest extends MockedBookKeeperTestCase {
assertEquals(c1.getMarkDeletedPosition(), p3);
assertEquals(c1.getProperties(), properties);
+ assertEquals(c1.getCursorProperties(), cursorProperties);
factory2.shutdown();
}
@@ -148,8 +157,13 @@ public class ManagedCursorPropertiesTest extends MockedBookKeeperTestCase {
properties.put("b", 2L);
properties.put("c", 3L);
- ManagedCursor c1 = ledger.openCursor("c1", InitialPosition.Latest, properties);
+ Map<String, String> cursorProperties = new TreeMap<>();
+ cursorProperties.put("custom1", "one");
+ cursorProperties.put("custom2", "two");
+
+ ManagedCursor c1 = ledger.openCursor("c1", InitialPosition.Latest, properties, cursorProperties);
assertEquals(c1.getProperties(), properties);
+ assertEquals(c1.getCursorProperties(), cursorProperties);
ledger.addEntry("entry-1".getBytes());
@@ -160,6 +174,50 @@ public class ManagedCursorPropertiesTest extends MockedBookKeeperTestCase {
c1 = ledger.openCursor("c1");
assertEquals(c1.getProperties(), properties);
+ assertEquals(c1.getCursorProperties(), cursorProperties);
}
+ @Test
+ void testUpdateCursorProperties() throws Exception {
+ ManagedLedger ledger = factory.open("testUpdateCursorProperties", new ManagedLedgerConfig());
+
+ Map<String, Long> properties = new TreeMap<>();
+ properties.put("a", 1L);
+
+ Map<String, String> cursorProperties = new TreeMap<>();
+ cursorProperties.put("custom1", "one");
+ cursorProperties.put("custom2", "two");
+
+ ManagedCursor c1 = ledger.openCursor("c1", InitialPosition.Latest, properties, cursorProperties);
+ assertEquals(c1.getProperties(), properties);
+ assertEquals(c1.getCursorProperties(), cursorProperties);
+
+ ledger.addEntry("entry-1".getBytes());
+
+ Map<String, String> cursorPropertiesUpdated = new TreeMap<>();
+ cursorPropertiesUpdated.put("custom1", "three");
+ cursorPropertiesUpdated.put("custom2", "four");
+
+ c1.setCursorProperties(cursorPropertiesUpdated).get(10, TimeUnit.SECONDS);
+
+ ledger.close();
+
+ // Reopen the managed ledger
+ ledger = factory.open("testUpdateCursorProperties", new ManagedLedgerConfig());
+ c1 = ledger.openCursor("c1");
+
+ assertEquals(c1.getProperties(), properties);
+ assertEquals(c1.getCursorProperties(), cursorPropertiesUpdated);
+
+ // Create a new factory to force a managed ledger close and recovery
+ ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc);
+ // Reopen the managed ledger
+ ledger = factory2.open("testUpdateCursorProperties", new ManagedLedgerConfig());
+ c1 = ledger.openCursor("c1");
+
+ assertEquals(c1.getProperties(), properties);
+ assertEquals(c1.getCursorProperties(), cursorPropertiesUpdated);
+
+ factory2.shutdown();
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
index b1ccb4d1eb0..49b906b7959 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
@@ -103,6 +103,8 @@ public interface Subscription {
Map<String, String> getSubscriptionProperties();
+ CompletableFuture<Void> updateSubscriptionProperties(Map<String, String> subscriptionProperties);
+
default void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapshot snapshot) {
// Default is no-op
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index ae49b3623ca..a9777f5dd0d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -68,7 +68,7 @@ public class NonPersistentSubscription implements Subscription {
private final LongAdder bytesOutFromRemovedConsumers = new LongAdder();
private final LongAdder msgOutFromRemovedConsumer = new LongAdder();
- private final Map<String, String> subscriptionProperties;
+ private volatile Map<String, String> subscriptionProperties;
// If isDurable is false(such as a Reader), remove subscription from topic when closing this subscription.
private final boolean isDurable;
@@ -526,4 +526,15 @@ public class NonPersistentSubscription implements Subscription {
public Map<String, String> getSubscriptionProperties() {
return subscriptionProperties;
}
+
+ @Override
+ public CompletableFuture<Void> updateSubscriptionProperties(Map<String, String> subscriptionProperties) {
+ if (subscriptionProperties == null || subscriptionProperties.isEmpty()) {
+ this.subscriptionProperties = Collections.emptyMap();
+ } else {
+ this.subscriptionProperties = Collections.unmodifiableMap(subscriptionProperties);
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index ba2982df627..363224b1134 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -22,7 +22,6 @@ import static org.apache.pulsar.common.naming.SystemTopicNames.isEventSystemTopi
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import java.util.Collections;
-import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -114,7 +113,7 @@ public class PersistentSubscription implements Subscription {
private volatile ReplicatedSubscriptionSnapshotCache replicatedSubscriptionSnapshotCache;
private final PendingAckHandle pendingAckHandle;
- private Map<String, String> subscriptionProperties;
+ private volatile Map<String, String> subscriptionProperties;
private final LongAdder bytesOutFromRemovedConsumers = new LongAdder();
private final LongAdder msgOutFromRemovedConsumer = new LongAdder();
@@ -137,7 +136,7 @@ public class PersistentSubscription implements Subscription {
}
public PersistentSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor,
- boolean replicated, Map<String, String> subscriptionProperties) {
+ boolean replicated, Map<String, String> subscriptionProperties) {
this.topic = topic;
this.cursor = cursor;
this.topicName = topic.getName();
@@ -146,7 +145,7 @@ public class PersistentSubscription implements Subscription {
this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor, this);
this.setReplicated(replicated);
this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties)
- ? new HashMap<>() : Collections.unmodifiableMap(subscriptionProperties);
+ ? Collections.emptyMap() : Collections.unmodifiableMap(subscriptionProperties);
if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()
&& !isEventSystemTopic(TopicName.get(topicName))) {
this.pendingAckHandle = new PendingAckHandleImpl(this);
@@ -1094,6 +1093,20 @@ public class PersistentSubscription implements Subscription {
return subscriptionProperties;
}
+ @Override
+ public CompletableFuture<Void> updateSubscriptionProperties(Map<String, String> subscriptionProperties) {
+ Map<String, String> newSubscriptionProperties;
+ if (subscriptionProperties == null || subscriptionProperties.isEmpty()) {
+ newSubscriptionProperties = Collections.emptyMap();
+ } else {
+ newSubscriptionProperties = Collections.unmodifiableMap(subscriptionProperties);
+ }
+ return cursor.setCursorProperties(newSubscriptionProperties)
+ .thenRun(() -> {
+ this.subscriptionProperties = newSubscriptionProperties;
+ });
+ }
+
/**
* Return a merged map that contains the cursor properties specified by used
* (eg. when using compaction subscription) and the subscription properties.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index c3a62a3a3f7..637cc28847c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -75,7 +75,6 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.net.BookieId;
-import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.namespace.NamespaceService;
@@ -279,7 +278,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
} else {
final String subscriptionName = Codec.decode(cursor.getName());
subscriptions.put(subscriptionName, createPersistentSubscription(subscriptionName, cursor,
- PersistentSubscription.isCursorFromReplicatedSubscription(cursor), null));
+ PersistentSubscription.isCursorFromReplicatedSubscription(cursor),
+ cursor.getCursorProperties()));
// subscription-cursor gets activated by default: deactivate as there is no active subscription right
// now
subscriptions.get(subscriptionName).deactivateCursor();
@@ -868,7 +868,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
Map<String, Long> properties = PersistentSubscription.getBaseCursorProperties(replicated);
- ledger.asyncOpenCursor(Codec.encode(subscriptionName), initialPosition, properties, new OpenCursorCallback() {
+ ledger.asyncOpenCursor(Codec.encode(subscriptionName), initialPosition, properties, subscriptionProperties,
+ new OpenCursorCallback() {
@Override
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
if (log.isDebugEnabled()) {
@@ -888,11 +889,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
return;
}
}
- if (MapUtils.isEmpty(subscription.getSubscriptionProperties())
- && MapUtils.isNotEmpty(subscriptionProperties)) {
- subscription.getSubscriptionProperties().putAll(subscriptionProperties);
- }
-
if (replicated && !subscription.isReplicated()) {
// Flip the subscription state
subscription.setReplicated(replicated);
@@ -971,11 +967,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
return FutureUtil.failedFuture(
new NotAllowedException("Durable subscription with the same name already exists."));
}
-
- if (MapUtils.isEmpty(subscription.getSubscriptionProperties())
- && MapUtils.isNotEmpty(subscriptionProperties)) {
- subscription.getSubscriptionProperties().putAll(subscriptionProperties);
- }
}
if (startMessageRollbackDurationSec > 0) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
index e0f5e0a77a9..12b742a0191 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
@@ -213,10 +213,7 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService()
.getTopicReference(topic).get().getSubscription(subName);
Map<String, String> properties = subscription.getSubscriptionProperties();
- assertTrue(properties.containsKey("1"));
- assertTrue(properties.containsKey("2"));
- assertEquals(properties.get("1"), "1");
- assertEquals(properties.get("2"), "2");
+ assertEquals(properties, map);
// after updating mark delete position, the properties should still exist
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
@@ -232,10 +229,7 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
assertEquals(subscription.getCursor().getMarkDeletedPosition().getEntryId(), messageId.getEntryId());
});
properties = subscription.getSubscriptionProperties();
- assertTrue(properties.containsKey("1"));
- assertTrue(properties.containsKey("2"));
- assertEquals(properties.get("1"), "1");
- assertEquals(properties.get("2"), "2");
+ assertEquals(properties, map);
consumer.close();
producer.close();
@@ -249,10 +243,7 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
.getTopicReference(topic).get().getSubscription(subName);
Awaitility.await().untilAsserted(() -> {
Map<String, String> properties2 = subscription2.getSubscriptionProperties();
- assertTrue(properties2.containsKey("1"));
- assertTrue(properties2.containsKey("2"));
- assertEquals(properties2.get("1"), "1");
- assertEquals(properties2.get("2"), "2");
+ assertEquals(properties2, map);
});
consumer2.close();
@@ -264,13 +255,11 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
.receiverQueueSize(1)
.subscriptionProperties(map3).subscriptionName(subName).subscribe();
Map<String, String> properties3 = subscription.getSubscriptionProperties();
- assertTrue(properties3.containsKey("1"));
- assertTrue(properties3.containsKey("2"));
- assertEquals(properties3.get("1"), "1");
- assertEquals(properties3.get("2"), "2");
+ assertEquals(properties3, map);
consumer3.close();
- //restart and create a new consumer with new properties, the new properties should be updated
+ //restart and create a new consumer with new properties, the new properties must not be updated
+ // for a Durable subscription, but for a NonDurable subscription we pick up the new values
restartBroker();
Consumer<byte[]> consumer4 = pulsarClient.newConsumer().subscriptionMode(subscriptionMode)
.topic(topic).receiverQueueSize(1)
@@ -278,10 +267,12 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
PersistentSubscription subscription4 = (PersistentSubscription) pulsar.getBrokerService()
.getTopicReference(topic).get().getSubscription(subName);
Map<String, String> properties4 = subscription4.getSubscriptionProperties();
- assertTrue(properties4.containsKey("3"));
- assertTrue(properties4.containsKey("4"));
- assertEquals(properties4.get("3"), "3");
- assertEquals(properties4.get("4"), "4");
+ if (subscriptionMode == SubscriptionMode.Durable) {
+ assertEquals(properties4, map);
+ } else {
+ assertEquals(properties4, map3);
+
+ }
consumer4.close();
@@ -294,26 +285,28 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
.getTopicReference(topic).get().getSubscription(subName);
properties4 = subscription4.getSubscriptionProperties();
if (subscriptionMode == SubscriptionMode.Durable) {
- assertTrue(properties4.containsKey("3"));
- assertTrue(properties4.containsKey("4"));
- assertEquals(properties4.get("3"), "3");
- assertEquals(properties4.get("4"), "4");
+ assertEquals(properties4, map);
} else {
assertTrue(properties4.isEmpty());
}
consumer4.close();
- //restart broker, it won't get any properties
+ //restart broker, properties for Durable subscription are reloaded from Metadata
restartBroker();
consumer4 = pulsarClient.newConsumer().topic(topic).subscriptionMode(subscriptionMode)
.receiverQueueSize(1)
.subscriptionName(subName).subscribe();
subscription4 = (PersistentSubscription) pulsar.getBrokerService()
.getTopicReference(topic).get().getSubscription(subName);
- assertEquals(subscription4.getSubscriptionProperties().size(), 0);
+ properties4 = subscription4.getSubscriptionProperties();
+ if (subscriptionMode == SubscriptionMode.Durable) {
+ assertEquals(properties4, map);
+ } else {
+ assertTrue(properties4.isEmpty());
+ }
consumer4.close();
- //restart broker and create a new consumer with new properties, the properties will be updated
+ //restart broker and create a new consumer with new properties, the properties will not be updated
restartBroker();
consumer4 = pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
.subscriptionMode(subscriptionMode)
@@ -321,16 +314,17 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
.subscriptionName(subName).subscribe();
PersistentSubscription subscription5 = (PersistentSubscription) pulsar.getBrokerService()
.getTopicReference(topic).get().getSubscription(subName);
- properties = subscription5.getSubscriptionProperties();
- assertTrue(properties.containsKey("1"));
- assertTrue(properties.containsKey("2"));
- assertEquals(properties.get("1"), "1");
- assertEquals(properties.get("2"), "2");
- consumer4.close();
+ properties4 = subscription5.getSubscriptionProperties();
+
+ // for the NonDurable subscription here we have the same properties because they
+ // are sent by the Consumer
+ assertEquals(properties4, map);
+ consumer4.close();
String subNameShared = "my-sub-shared";
Map<String, String> mapShared = new HashMap<>();
+ mapShared.put("6", "7");
// open two consumers with a Shared Subscription
Consumer consumerShared1 = pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
.subscriptionMode(subscriptionMode)
@@ -342,26 +336,25 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
properties = subscriptionShared.getSubscriptionProperties();
assertEquals(properties, mapShared);
- // add a new consumer, the properties are updated because they were empty
- mapShared = new HashMap<>();
- mapShared.put("6", "7");
- mapShared.put("8", "9");
+ // add a new consumer, the properties are not updated
+ Map<String, String> mapShared2 = new HashMap<>();
+ mapShared2.put("8", "9");
Consumer consumerShared2 = pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
.subscriptionMode(subscriptionMode)
.subscriptionType(SubscriptionType.Shared)
- .subscriptionProperties(mapShared)
+ .subscriptionProperties(mapShared2)
.subscriptionName(subNameShared).subscribe();
properties = subscriptionShared.getSubscriptionProperties();
assertEquals(properties, mapShared);
- // add a third consumer, the properties are NOT updated because they are not empty
- Map<String, String> mapShared2 = new HashMap<>();
- mapShared2.put("10", "11");
+ // add a third consumer, the properties are NOT updated
+ Map<String, String> mapShared3 = new HashMap<>();
+ mapShared3.put("10", "11");
Consumer consumerShared3 = pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
.subscriptionMode(subscriptionMode)
.subscriptionType(SubscriptionType.Shared)
- .subscriptionProperties(mapShared2)
+ .subscriptionProperties(mapShared3)
.subscriptionName(subNameShared).subscribe();
properties = subscriptionShared.getSubscriptionProperties();
@@ -373,6 +366,65 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
consumerShared3.close();
}
+ @Test
+ public void subscriptionModePersistedTest() throws Exception {
+ String topic = "persistent://my-property/my-ns/topic" + UUID.randomUUID();
+ admin.topics().createNonPartitionedTopic(topic);
+ Map<String, String> map = new HashMap<>();
+ map.put("1", "1");
+ map.put("2", "2");
+ String subName = "my-sub";
+ pulsarClient.newConsumer()
+ .subscriptionMode(SubscriptionMode.Durable)
+ .topic(topic)
+ .subscriptionProperties(map)
+ .subscriptionName(subName)
+ .subscribe()
+ .close();
+ PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService()
+ .getTopicReference(topic).get().getSubscription(subName);
+ Map<String, String> properties = subscription.getSubscriptionProperties();
+ assertTrue(properties.containsKey("1"));
+ assertTrue(properties.containsKey("2"));
+ assertEquals(properties.get("1"), "1");
+ assertEquals(properties.get("2"), "2");
+
+ Map<String, String> subscriptionPropertiesFromAdmin =
+ admin.topics().getStats(topic).getSubscriptions().get(subName).getSubscriptionProperties();
+ assertEquals(map, subscriptionPropertiesFromAdmin);
+
+ // unload the topic
+ admin.topics().unload(topic);
+
+ // verify that the properties are still there
+ subscriptionPropertiesFromAdmin =
+ admin.topics().getStats(topic).getSubscriptions().get(subName).getSubscriptionProperties();
+ assertEquals(map, subscriptionPropertiesFromAdmin);
+
+
+ // create a new subscription, initially properties are empty
+ String subName2 = "my-sub2";
+ admin.topics().createSubscription(topic, subName2, MessageId.latest);
+
+ subscriptionPropertiesFromAdmin =
+ admin.topics().getStats(topic).getSubscriptions().get(subName2).getSubscriptionProperties();
+ assertTrue(subscriptionPropertiesFromAdmin.isEmpty());
+
+ // create a consumer, this is not allowed to update the properties
+ pulsarClient.newConsumer()
+ .subscriptionMode(SubscriptionMode.Durable)
+ .topic(topic)
+ .subscriptionProperties(map)
+ .subscriptionName(subName2)
+ .subscribe()
+ .close();
+
+ // verify that the properties are not changed
+ subscriptionPropertiesFromAdmin =
+ admin.topics().getStats(topic).getSubscriptions().get(subName2).getSubscriptionProperties();
+ assertTrue(subscriptionPropertiesFromAdmin.isEmpty());
+ }
+
@Test
public void createSubscriptionBySpecifyingStringPosition() throws IOException, PulsarAdminException {
final int numberOfMessages = 5;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 331772b7cdc..a9559ac96b9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -1561,11 +1561,11 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
- ((OpenCursorCallback) invocationOnMock.getArguments()[3]).openCursorComplete(cursorMock, null);
+ ((OpenCursorCallback) invocationOnMock.getArguments()[4]).openCursorComplete(cursorMock, null);
return null;
}
}).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(Map.class),
- any(OpenCursorCallback.class), any());
+ any(Map.class), any(OpenCursorCallback.class), any());
doAnswer(new Answer<Object>() {
@Override
@@ -2214,9 +2214,9 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
return null;
}).when(mockLedger).asyncDeleteCursor(matches(".*success.*"), any(DeleteCursorCallback.class), any());
doAnswer((Answer<Object>) invocationOnMock -> {
- ((OpenCursorCallback) invocationOnMock.getArguments()[3]).openCursorComplete(mockCursor, null);
+ ((OpenCursorCallback) invocationOnMock.getArguments()[4]).openCursorComplete(mockCursor, null);
return null;
- }).when(mockLedger).asyncOpenCursor(any(), any(), any(), any(), any());
+ }).when(mockLedger).asyncOpenCursor(any(), any(), any(), any(), any(), any());
PersistentTopic topic = new PersistentTopic(successTopicName, mockLedger, brokerService);
CommandSubscribe cmd = new CommandSubscribe()
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index e8fa8c60639..cffc82bcf73 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -1686,20 +1686,20 @@ public class ServerCnxTest {
doAnswer((Answer<Object>) invocationOnMock -> {
Thread.sleep(300);
- ((OpenCursorCallback) invocationOnMock.getArguments()[2]).openCursorComplete(cursorMock, null);
+ ((OpenCursorCallback) invocationOnMock.getArguments()[3]).openCursorComplete(cursorMock, null);
return null;
}).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(OpenCursorCallback.class), any());
doAnswer((Answer<Object>) invocationOnMock -> {
Thread.sleep(300);
- ((OpenCursorCallback) invocationOnMock.getArguments()[3]).openCursorComplete(cursorMock, null);
+ ((OpenCursorCallback) invocationOnMock.getArguments()[4]).openCursorComplete(cursorMock, null);
return null;
- }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(Map.class),
+ }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(Map.class), any(Map.class),
any(OpenCursorCallback.class), any());
doAnswer((Answer<Object>) invocationOnMock -> {
Thread.sleep(300);
- ((OpenCursorCallback) invocationOnMock.getArguments()[2])
+ ((OpenCursorCallback) invocationOnMock.getArguments()[3])
.openCursorFailed(new ManagedLedgerException("Managed ledger failure"), null);
return null;
}).when(ledgerMock).asyncOpenCursor(matches(".*fail.*"), any(InitialPosition.class), any(OpenCursorCallback.class), any());
@@ -1709,7 +1709,7 @@ public class ServerCnxTest {
((OpenCursorCallback) invocationOnMock.getArguments()[3])
.openCursorFailed(new ManagedLedgerException("Managed ledger failure"), null);
return null;
- }).when(ledgerMock).asyncOpenCursor(matches(".*fail.*"), any(InitialPosition.class), any(Map.class),
+ }).when(ledgerMock).asyncOpenCursor(matches(".*fail.*"), any(InitialPosition.class), any(Map.class), any(Map.class),
any(OpenCursorCallback.class), any());
doAnswer((Answer<Object>) invocationOnMock -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
index 81ad811f43c..b2edbda8855 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
@@ -69,10 +69,13 @@ public class FilterEntryTest extends BrokerTestBase {
}
public void testFilter() throws Exception {
-
+ Map<String, String> map = new HashMap<>();
+ map.put("1","1");
+ map.put("2","2");
String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID();
String subName = "sub";
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic)
+ .subscriptionProperties(map)
.subscriptionName(subName).subscribe();
// mock entry filters
PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService()
@@ -132,9 +135,6 @@ public class FilterEntryTest extends BrokerTestBase {
});
consumer.close();
- Map<String, String> map = new HashMap<>();
- map.put("1","1");
- map.put("2","2");
consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionProperties(map)
.subscriptionName(subName).subscribe();
for (int i = 0; i < 10; i++) {
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
index 4ad872919e3..3eaf276c3c5 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
@@ -105,7 +105,8 @@ public class MockManagedLedger implements ManagedLedger {
@Override
public ManagedCursor openCursor(String name, CommandSubscribe.InitialPosition initialPosition,
- Map<String, Long> properties) throws InterruptedException, ManagedLedgerException {
+ Map<String, Long> properties, Map<String, String> cursorProperties)
+ throws InterruptedException, ManagedLedgerException {
return null;
}
@@ -155,7 +156,8 @@ public class MockManagedLedger implements ManagedLedger {
@Override
public void asyncOpenCursor(String name, CommandSubscribe.InitialPosition initialPosition,
- Map<String, Long> properties, AsyncCallbacks.OpenCursorCallback callback, Object ctx) {
+ Map<String, Long> properties, Map<String, String> cursorProperties,
+ AsyncCallbacks.OpenCursorCallback callback, Object ctx) {
}