You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/06/03 07:03:17 UTC

[pulsar] branch master updated: Issue 15750: PIP-105: Store Subscription properties (#15757)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 23f46a0736e Issue 15750: PIP-105: Store Subscription properties (#15757)
23f46a0736e is described below

commit 23f46a0736e844a2a2fec943ee76d4e1e73ec477
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Fri Jun 3 09:03:05 2022 +0200

    Issue 15750: PIP-105: Store Subscription properties (#15757)
    
    * PIP-105: Store Subscription properties
---
 .../apache/bookkeeper/mledger/ManagedCursor.java   |  15 +++
 .../apache/bookkeeper/mledger/ManagedLedger.java   |   9 +-
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  96 ++++++++++++--
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  15 ++-
 managed-ledger/src/main/proto/MLDataFormats.proto  |   9 ++
 .../mledger/impl/ManagedCursorContainerTest.java   |   5 +
 .../mledger/impl/ManagedCursorPropertiesTest.java  |  62 ++++++++-
 .../apache/pulsar/broker/service/Subscription.java |   2 +
 .../nonpersistent/NonPersistentSubscription.java   |  13 +-
 .../service/persistent/PersistentSubscription.java |  21 +++-
 .../broker/service/persistent/PersistentTopic.java |  17 +--
 .../broker/admin/CreateSubscriptionTest.java       | 138 ++++++++++++++-------
 .../pulsar/broker/service/PersistentTopicTest.java |   8 +-
 .../pulsar/broker/service/ServerCnxTest.java       |  10 +-
 .../broker/service/plugin/FilterEntryTest.java     |   8 +-
 .../offload/jcloud/impl/MockManagedLedger.java     |   6 +-
 16 files changed, 339 insertions(+), 95 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index efd183d0bfb..46ca0f14003 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Range;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.common.annotation.InterfaceAudience;
 import org.apache.bookkeeper.common.annotation.InterfaceStability;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
@@ -79,6 +80,20 @@ public interface ManagedCursor {
      */
     Map<String, Long> getProperties();
 
+    /**
+     * Return the string properties associated with the cursor itself.
+     *
+     * @return the cursor properties, as a map from property name to string value
+     */
+    Map<String, String> getCursorProperties();
+
+     /**
+      * Updates the properties associated with the cursor.
+      * <p>
+      * This default implementation does not store anything and returns an already-completed future.
+      *
+      * @param cursorProperties the new properties for the cursor
+      * @return a handle to the result of the operation
+      */
+     default CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+         return CompletableFuture.completedFuture(null);
+     }
+
     /**
      * Add a property associated with the last stored position.
      */
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index 1f6e0d3af46..7196a3b4c03 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -240,10 +240,13 @@ public interface ManagedLedger {
      * @param properties
      *             user defined properties that will be attached to the first position of the cursor, if the open
      *             operation will trigger the creation of the cursor.
+     * @param cursorProperties
+     *            the properties for the Cursor
      * @return the ManagedCursor
      * @throws ManagedLedgerException
      */
-    ManagedCursor openCursor(String name, InitialPosition initialPosition, Map<String, Long> properties)
+    ManagedCursor openCursor(String name, InitialPosition initialPosition, Map<String, Long> properties,
+                             Map<String, String> cursorProperties)
             throws InterruptedException, ManagedLedgerException;
 
     /**
@@ -337,13 +340,15 @@ public interface ManagedLedger {
      * @param initialPosition
      *            the cursor will be set at latest position or not when first created
      *            default is <b>true</b>
+     * @param cursorProperties
+     *            the properties for the Cursor
      * @param callback
      *            callback object
      * @param ctx
      *            opaque context
      */
     void asyncOpenCursor(String name, InitialPosition initialPosition, Map<String, Long> properties,
-                         OpenCursorCallback callback, Object ctx);
+                         Map<String, String> cursorProperties, OpenCursorCallback callback, Object ctx);
 
     /**
      * Get a list of all the cursors reading from this ManagedLedger.
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 0c3e46c1acd..3d558a231db 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -91,6 +91,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.StringProperty;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.common.util.collections.BitSetRecyclable;
 import org.apache.pulsar.common.util.collections.LongPairRangeSet;
@@ -116,6 +117,7 @@ public class ManagedCursorImpl implements ManagedCursor {
     protected final ManagedLedgerConfig config;
     protected final ManagedLedgerImpl ledger;
     private final String name;
+    private volatile Map<String, String> cursorProperties;
     private final BookKeeper.DigestType digestType;
 
     protected volatile PositionImpl markDeletePosition;
@@ -280,6 +282,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 
     ManagedCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, ManagedLedgerImpl ledger, String cursorName) {
         this.bookkeeper = bookkeeper;
+        this.cursorProperties = Collections.emptyMap();
         this.config = config;
         this.ledger = ledger;
         this.name = cursorName;
@@ -313,6 +316,52 @@ public class ManagedCursorImpl implements ManagedCursor {
         return lastMarkDeleteEntry != null ? lastMarkDeleteEntry.properties : Collections.emptyMap();
     }
 
+    /**
+     * Returns the cursor properties map currently held by this cursor. The reference is handed out as-is,
+     * without copying.
+     */
+    @Override
+    public Map<String, String> getCursorProperties() {
+        return cursorProperties;
+    }
+
+    /**
+     * Replaces the cursor properties: reads the current {@link ManagedCursorInfo} from the metadata store,
+     * writes back a copy carrying the new properties, and only after that write succeeds updates the
+     * in-memory map and the cached {@code cursorLedgerStat}.
+     *
+     * @param cursorProperties the new properties; {@code null} is treated as an empty map
+     * @return a future completed once the store has been updated, or completed exceptionally with the
+     *         {@link MetaStoreException} reported by either metadata store operation
+     */
+    @Override
+    public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+        // Normalize null up front so that the field (initialized to an empty map by the constructor)
+        // never ends up holding null after a successful update.
+        final Map<String, String> newProperties =
+                cursorProperties == null ? Collections.emptyMap() : cursorProperties;
+        CompletableFuture<Void> updateCursorPropertiesResult = new CompletableFuture<>();
+        ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<>() {
+            @Override
+            public void operationComplete(ManagedCursorInfo info, Stat stat) {
+                // Start from the stored info so that everything except the cursor properties is kept.
+                ManagedCursorInfo copy = ManagedCursorInfo
+                        .newBuilder(info)
+                        .clearCursorProperties()
+                        .addAllCursorProperties(buildStringPropertiesMap(newProperties))
+                        .build();
+                ledger.getStore().asyncUpdateCursorInfo(ledger.getName(),
+                        name, copy, stat, new MetaStoreCallback<>() {
+                    @Override
+                    public void operationComplete(Void result, Stat stat) {
+                        log.info("[{}] Updated ledger cursor: {} properties {}", ledger.getName(),
+                                name, newProperties);
+                        // Publish the new map only after the store accepted the update.
+                        ManagedCursorImpl.this.cursorProperties = newProperties;
+                        cursorLedgerStat = stat;
+                        updateCursorPropertiesResult.complete(result);
+                    }
+
+                    @Override
+                    public void operationFailed(MetaStoreException e) {
+                        log.error("[{}] Error while updating ledger cursor: {} properties {}", ledger.getName(),
+                                name, newProperties, e);
+                        updateCursorPropertiesResult.completeExceptionally(e);
+                    }
+                });
+            }
+
+            @Override
+            public void operationFailed(MetaStoreException e) {
+                log.error("[{}] Error while updating ledger cursor: {} properties {}", ledger.getName(),
+                        name, newProperties, e);
+                updateCursorPropertiesResult.completeExceptionally(e);
+            }
+        });
+        return updateCursorPropertiesResult;
+    }
+
     @Override
     public boolean putProperty(String key, Long value) {
         if (lastMarkDeleteEntry != null) {
@@ -361,6 +410,18 @@ public class ManagedCursorImpl implements ManagedCursor {
                 cursorLedgerStat = stat;
                 lastActive = info.getLastActive() != 0 ? info.getLastActive() : lastActive;
 
+
+                Map<String, String> recoveredCursorProperties = Collections.emptyMap();
+                if (info.getCursorPropertiesCount() > 0) {
+                    // Recover properties map
+                    recoveredCursorProperties = Maps.newHashMap();
+                    for (int i = 0; i < info.getCursorPropertiesCount(); i++) {
+                        StringProperty property = info.getCursorProperties(i);
+                        recoveredCursorProperties.put(property.getName(), property.getValue());
+                    }
+                }
+                cursorProperties = recoveredCursorProperties;
+
                 if (info.getCursorsLedgerId() == -1L) {
                     // There is no cursor ledger to read the last position from. It means the cursor has been properly
                     // closed and the last mark-delete position is stored in the ManagedCursorInfo itself.
@@ -380,7 +441,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                         }
                     }
 
-                    recoveredCursor(recoveredPosition, recoveredProperties, null);
+                    recoveredCursor(recoveredPosition, recoveredProperties, recoveredCursorProperties, null);
                     callback.operationComplete();
                 } else {
                     // Need to proceed and read the last entry in the specified ledger to find out the last position
@@ -410,7 +471,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                 log.error("[{}] Error opening metadata ledger {} for consumer {}: {}", ledger.getName(), ledgerId, name,
                         BKException.getMessage(rc));
                 // Rewind to oldest entry available
-                initialize(getRollbackPosition(info), Collections.emptyMap(), callback);
+                initialize(getRollbackPosition(info), Collections.emptyMap(), Collections.emptyMap(), callback);
                 return;
             } else if (rc != BKException.Code.OK) {
                 log.warn("[{}] Error opening metadata ledger {} for consumer {}: {}", ledger.getName(), ledgerId, name,
@@ -426,7 +487,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                 log.warn("[{}] Error reading from metadata ledger {} for consumer {}: No entries in ledger",
                         ledger.getName(), ledgerId, name);
                 // Rewind to last cursor snapshot available
-                initialize(getRollbackPosition(info), Collections.emptyMap(), callback);
+                initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback);
                 return;
             }
 
@@ -438,7 +499,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                     log.error("[{}] Error reading from metadata ledger {} for consumer {}: {}", ledger.getName(),
                             ledgerId, name, BKException.getMessage(rc1));
                     // Rewind to oldest entry available
-                    initialize(getRollbackPosition(info), Collections.emptyMap(), callback);
+                    initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback);
                     return;
                 } else if (rc1 != BKException.Code.OK) {
                     log.warn("[{}] Error reading from metadata ledger {} for consumer {}: {}", ledger.getName(),
@@ -476,7 +537,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                     && positionInfo.getBatchedEntryDeletionIndexInfoCount() > 0) {
                     recoverBatchDeletedIndexes(positionInfo.getBatchedEntryDeletionIndexInfoList());
                 }
-                recoveredCursor(position, recoveredProperties, lh);
+                recoveredCursor(position, recoveredProperties, cursorProperties, lh);
                 callback.operationComplete();
             }, null);
         };
@@ -547,6 +608,7 @@ public class ManagedCursorImpl implements ManagedCursor {
     }
 
     private void recoveredCursor(PositionImpl position, Map<String, Long> properties,
+                                 Map<String, String> cursorProperties,
                                  LedgerHandle recoveredFromCursorLedger) {
         // if the position was at a ledger that didn't exist (since it will be deleted if it was previously empty),
         // we need to move to the next existing ledger
@@ -564,7 +626,7 @@ public class ManagedCursorImpl implements ManagedCursor {
             position = ledger.getLastPosition();
         }
         log.info("[{}] Cursor {} recovered to position {}", ledger.getName(), name, position);
-
+        this.cursorProperties = cursorProperties;
         messagesConsumedCounter = -getNumberOfEntries(Range.openClosed(position, ledger.getLastPosition()));
         markDeletePosition = position;
         persistentMarkDeletePosition = position;
@@ -577,8 +639,9 @@ public class ManagedCursorImpl implements ManagedCursor {
         STATE_UPDATER.set(this, State.NoLedger);
     }
 
-    void initialize(PositionImpl position, Map<String, Long> properties, final VoidCallback callback) {
-        recoveredCursor(position, properties, null);
+    void initialize(PositionImpl position, Map<String, Long> properties, Map<String, String> cursorProperties,
+                    final VoidCallback callback) {
+        recoveredCursor(position, properties, cursorProperties, null);
         if (log.isDebugEnabled()) {
             log.debug("[{}] Consumer {} cursor initialized with counters: consumed {} mdPos {} rdPos {}",
                     ledger.getName(), name, messagesConsumedCounter, markDeletePosition, readPosition);
@@ -2392,6 +2455,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                 .setLastActive(lastActive); //
 
         info.addAllProperties(buildPropertiesMap(properties));
+        info.addAllCursorProperties(buildStringPropertiesMap(cursorProperties));
         if (persistIndividualDeletedMessageRanges) {
             info.addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges());
             if (config.isDeletionAtBatchIndexLevelEnabled()) {
@@ -2605,7 +2669,7 @@ public class ManagedCursorImpl implements ManagedCursor {
     }
 
 
-    private List<LongProperty> buildPropertiesMap(Map<String, Long> properties) {
+    private static List<LongProperty> buildPropertiesMap(Map<String, Long> properties) {
         if (properties.isEmpty()) {
             return Collections.emptyList();
         }
@@ -2619,6 +2683,20 @@ public class ManagedCursorImpl implements ManagedCursor {
         return longProperties;
     }
 
+    /**
+     * Converts a map of string properties into the list of {@link StringProperty} messages.
+     *
+     * @param properties the properties to convert; may be {@code null}
+     * @return one {@link StringProperty} per map entry, or an empty list when the map is {@code null} or empty
+     */
+    private static List<StringProperty> buildStringPropertiesMap(Map<String, String> properties) {
+        if (properties == null || properties.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        List<StringProperty> converted = Lists.newArrayList();
+        for (Map.Entry<String, String> entry : properties.entrySet()) {
+            converted.add(StringProperty.newBuilder()
+                    .setName(entry.getKey())
+                    .setValue(entry.getValue())
+                    .build());
+        }
+        return converted;
+    }
+
     private List<MLDataFormats.MessageRange> buildIndividualDeletedMessageRanges() {
         lock.readLock().lock();
         try {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 277e4df0244..f228c32a90e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -845,11 +845,12 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     @Override
     public ManagedCursor openCursor(String cursorName, InitialPosition initialPosition)
             throws InterruptedException, ManagedLedgerException {
-        return openCursor(cursorName, initialPosition, Collections.emptyMap());
+        return openCursor(cursorName, initialPosition, Collections.emptyMap(), Collections.emptyMap());
     }
 
     @Override
-    public ManagedCursor openCursor(String cursorName, InitialPosition initialPosition, Map<String, Long> properties)
+    public ManagedCursor openCursor(String cursorName, InitialPosition initialPosition, Map<String, Long> properties,
+                                    Map<String, String> cursorProperties)
             throws InterruptedException, ManagedLedgerException {
         final CountDownLatch counter = new CountDownLatch(1);
         class Result {
@@ -858,7 +859,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         }
         final Result result = new Result();
 
-        asyncOpenCursor(cursorName, initialPosition, properties, new OpenCursorCallback() {
+        asyncOpenCursor(cursorName, initialPosition, properties, cursorProperties, new OpenCursorCallback() {
             @Override
             public void openCursorComplete(ManagedCursor cursor, Object ctx) {
                 result.cursor = cursor;
@@ -893,12 +894,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     @Override
     public void asyncOpenCursor(final String cursorName, final InitialPosition initialPosition,
             final OpenCursorCallback callback, final Object ctx) {
-        this.asyncOpenCursor(cursorName, initialPosition, Collections.emptyMap(), callback, ctx);
+        this.asyncOpenCursor(cursorName, initialPosition, Collections.emptyMap(), Collections.emptyMap(),
+                callback, ctx);
     }
 
     @Override
     public synchronized void asyncOpenCursor(final String cursorName, final InitialPosition initialPosition,
-            Map<String, Long> properties, final OpenCursorCallback callback, final Object ctx) {
+            Map<String, Long> properties, Map<String, String> cursorProperties,
+                                             final OpenCursorCallback callback, final Object ctx) {
         try {
             checkManagedLedgerIsOpen();
             checkFenced();
@@ -932,7 +935,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         CompletableFuture<ManagedCursor> cursorFuture = new CompletableFuture<>();
         uninitializedCursors.put(cursorName, cursorFuture);
         PositionImpl position = InitialPosition.Earliest == initialPosition ? getFirstPosition() : getLastPosition();
-        cursor.initialize(position, properties, new VoidCallback() {
+        cursor.initialize(position, properties, cursorProperties, new VoidCallback() {
             @Override
             public void operationComplete() {
                 log.info("[{}] Opened new cursor: {}", name, cursor);
diff --git a/managed-ledger/src/main/proto/MLDataFormats.proto b/managed-ledger/src/main/proto/MLDataFormats.proto
index 4671816c1a1..c4e502819fa 100644
--- a/managed-ledger/src/main/proto/MLDataFormats.proto
+++ b/managed-ledger/src/main/proto/MLDataFormats.proto
@@ -105,6 +105,11 @@ message LongProperty {
     required int64 value  = 2;
 }
 
+// A single name/value pair whose value is a string.
+message StringProperty {
+    required string name = 1;
+    required string value = 2;
+}
+
 message ManagedCursorInfo {
     // If the ledger id is -1, then the mark-delete position is
     // the one from the (ledgerId, entryId) snapshot below
@@ -123,6 +128,10 @@ message ManagedCursorInfo {
 
     // Store which index in the batch message has been deleted
     repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 7;
+
+    // Additional custom properties associated with
+    // the cursor
+    repeated StringProperty cursorProperties = 8;
 }
 
 enum CompressionType {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index a654b30e60b..05f34df47c1 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -67,6 +67,11 @@ public class ManagedCursorContainerTest {
             return Collections.emptyMap();
         }
 
+        // This test implementation never carries cursor properties.
+        @Override
+        public Map<String, String> getCursorProperties() {
+            return Collections.emptyMap();
+        }
+
         @Override
         public boolean putProperty(String key, Long value) {
             return false;
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java
index 727f3850ad2..74db9d791f3 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java
@@ -23,6 +23,8 @@ import static org.testng.Assert.assertEquals;
 import java.util.Collections;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
@@ -75,9 +77,15 @@ public class ManagedCursorPropertiesTest extends MockedBookKeeperTestCase {
     @Test(timeOut = 20000)
     void testPropertiesRecoveryAfterCrash() throws Exception {
         ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig());
-        ManagedCursor c1 = ledger.openCursor("c1");
+
+        Map<String, String> cursorProperties = new TreeMap<>();
+        cursorProperties.put("custom1", "one");
+        cursorProperties.put("custom2", "two");
+
+        ManagedCursor c1 = ledger.openCursor("c1", InitialPosition.Latest, Collections.emptyMap(), cursorProperties);
 
         assertEquals(c1.getProperties(), Collections.emptyMap());
+        assertEquals(c1.getCursorProperties(), cursorProperties);
 
         ledger.addEntry("entry-1".getBytes());
         ledger.addEntry("entry-2".getBytes());
@@ -99,6 +107,7 @@ public class ManagedCursorPropertiesTest extends MockedBookKeeperTestCase {
 
         assertEquals(c1.getMarkDeletedPosition(), p3);
         assertEquals(c1.getProperties(), properties);
+        assertEquals(c1.getCursorProperties(), cursorProperties);
 
         factory2.shutdown();
     }
@@ -148,8 +157,13 @@ public class ManagedCursorPropertiesTest extends MockedBookKeeperTestCase {
         properties.put("b", 2L);
         properties.put("c", 3L);
 
-        ManagedCursor c1 = ledger.openCursor("c1", InitialPosition.Latest, properties);
+        Map<String, String> cursorProperties = new TreeMap<>();
+        cursorProperties.put("custom1", "one");
+        cursorProperties.put("custom2", "two");
+
+        ManagedCursor c1 = ledger.openCursor("c1", InitialPosition.Latest, properties, cursorProperties);
         assertEquals(c1.getProperties(), properties);
+        assertEquals(c1.getCursorProperties(), cursorProperties);
 
         ledger.addEntry("entry-1".getBytes());
 
@@ -160,6 +174,50 @@ public class ManagedCursorPropertiesTest extends MockedBookKeeperTestCase {
         c1 = ledger.openCursor("c1");
 
         assertEquals(c1.getProperties(), properties);
+        assertEquals(c1.getCursorProperties(), cursorProperties);
     }
 
+    /**
+     * Verifies that properties written with setCursorProperties replace the ones given when the cursor
+     * was opened, and that the updated values are returned after the managed ledger is reopened, both
+     * through the same factory and through a second factory.
+     */
+    @Test(timeOut = 20000)
+    void testUpdateCursorProperties() throws Exception {
+        ManagedLedger ledger = factory.open("testUpdateCursorProperties", new ManagedLedgerConfig());
+
+        Map<String, Long> properties = new TreeMap<>();
+        properties.put("a", 1L);
+
+        Map<String, String> cursorProperties = new TreeMap<>();
+        cursorProperties.put("custom1", "one");
+        cursorProperties.put("custom2", "two");
+
+        ManagedCursor c1 = ledger.openCursor("c1", InitialPosition.Latest, properties, cursorProperties);
+        assertEquals(c1.getProperties(), properties);
+        assertEquals(c1.getCursorProperties(), cursorProperties);
+
+        ledger.addEntry("entry-1".getBytes());
+
+        Map<String, String> cursorPropertiesUpdated = new TreeMap<>();
+        cursorPropertiesUpdated.put("custom1", "three");
+        cursorPropertiesUpdated.put("custom2", "four");
+
+        c1.setCursorProperties(cursorPropertiesUpdated).get(10, TimeUnit.SECONDS);
+
+        ledger.close();
+
+        // Reopen the managed ledger
+        ledger = factory.open("testUpdateCursorProperties", new ManagedLedgerConfig());
+        c1 = ledger.openCursor("c1");
+
+        assertEquals(c1.getProperties(), properties);
+        assertEquals(c1.getCursorProperties(), cursorPropertiesUpdated);
+
+        // Create a new factory to force a managed ledger close and recovery
+        ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc);
+        try {
+            // Reopen the managed ledger
+            ledger = factory2.open("testUpdateCursorProperties", new ManagedLedgerConfig());
+            c1 = ledger.openCursor("c1");
+
+            assertEquals(c1.getProperties(), properties);
+            assertEquals(c1.getCursorProperties(), cursorPropertiesUpdated);
+        } finally {
+            // Shut the extra factory down even when one of the assertions above fails
+            factory2.shutdown();
+        }
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
index b1ccb4d1eb0..49b906b7959 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
@@ -103,6 +103,8 @@ public interface Subscription {
 
     Map<String, String> getSubscriptionProperties();
 
+    CompletableFuture<Void> updateSubscriptionProperties(Map<String, String> subscriptionProperties);
+
     default void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapshot snapshot) {
         // Default is no-op
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index ae49b3623ca..a9777f5dd0d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -68,7 +68,7 @@ public class NonPersistentSubscription implements Subscription {
 
     private final LongAdder bytesOutFromRemovedConsumers = new LongAdder();
     private final LongAdder msgOutFromRemovedConsumer = new LongAdder();
-    private final Map<String, String> subscriptionProperties;
+    private volatile Map<String, String> subscriptionProperties;
 
     // If isDurable is false(such as a Reader), remove subscription from topic when closing this subscription.
     private final boolean isDurable;
@@ -526,4 +526,15 @@ public class NonPersistentSubscription implements Subscription {
     public Map<String, String> getSubscriptionProperties() {
         return subscriptionProperties;
     }
+
+    /**
+     * Replaces the properties of this subscription. Only the in-memory field is updated here.
+     *
+     * @param subscriptionProperties the new properties; {@code null} or empty resets them to an empty map
+     * @return an already-completed future
+     */
+    @Override
+    public CompletableFuture<Void> updateSubscriptionProperties(Map<String, String> subscriptionProperties) {
+        final boolean noProperties = subscriptionProperties == null || subscriptionProperties.isEmpty();
+        this.subscriptionProperties = noProperties
+                ? Collections.emptyMap()
+                : Collections.unmodifiableMap(subscriptionProperties);
+        return CompletableFuture.completedFuture(null);
+    }
+
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index ba2982df627..363224b1134 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -22,7 +22,6 @@ import static org.apache.pulsar.common.naming.SystemTopicNames.isEventSystemTopi
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -114,7 +113,7 @@ public class PersistentSubscription implements Subscription {
 
     private volatile ReplicatedSubscriptionSnapshotCache replicatedSubscriptionSnapshotCache;
     private final PendingAckHandle pendingAckHandle;
-    private Map<String, String> subscriptionProperties;
+    private volatile Map<String, String> subscriptionProperties;
 
     private final LongAdder bytesOutFromRemovedConsumers = new LongAdder();
     private final LongAdder msgOutFromRemovedConsumer = new LongAdder();
@@ -137,7 +136,7 @@ public class PersistentSubscription implements Subscription {
     }
 
     public PersistentSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor,
-            boolean replicated, Map<String, String> subscriptionProperties) {
+                                  boolean replicated, Map<String, String> subscriptionProperties) {
         this.topic = topic;
         this.cursor = cursor;
         this.topicName = topic.getName();
@@ -146,7 +145,7 @@ public class PersistentSubscription implements Subscription {
         this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor, this);
         this.setReplicated(replicated);
         this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties)
-                ? new HashMap<>() : Collections.unmodifiableMap(subscriptionProperties);
+                ? Collections.emptyMap() : Collections.unmodifiableMap(subscriptionProperties);
         if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()
                 && !isEventSystemTopic(TopicName.get(topicName))) {
             this.pendingAckHandle = new PendingAckHandleImpl(this);
@@ -1094,6 +1093,20 @@ public class PersistentSubscription implements Subscription {
         return subscriptionProperties;
     }
 
+    /** Passes the properties to the cursor; the field is swapped only once that future completes normally. */
+    @Override
+    public CompletableFuture<Void> updateSubscriptionProperties(Map<String, String> subscriptionProperties) {
+        // Snapshot the caller's map: unmodifiableMap alone is a live view, so later caller-side
+        // mutations would otherwise leak into this subscription without going through the cursor.
+        Map<String, String> newSubscriptionProperties = MapUtils.isEmpty(subscriptionProperties)
+                ? Collections.emptyMap()
+                : Collections.unmodifiableMap(new LinkedHashMap<>(subscriptionProperties));
+        return cursor.setCursorProperties(newSubscriptionProperties)
+                .thenRun(() -> {
+                    this.subscriptionProperties = newSubscriptionProperties;
+                });
+    }
+
     /**
      * Return a merged map that contains the cursor properties specified by used
      * (eg. when using compaction subscription) and the subscription properties.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index c3a62a3a3f7..637cc28847c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -75,7 +75,6 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.net.BookieId;
-import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.namespace.NamespaceService;
@@ -279,7 +278,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             } else {
                 final String subscriptionName = Codec.decode(cursor.getName());
                 subscriptions.put(subscriptionName, createPersistentSubscription(subscriptionName, cursor,
-                        PersistentSubscription.isCursorFromReplicatedSubscription(cursor), null));
+                        PersistentSubscription.isCursorFromReplicatedSubscription(cursor),
+                        cursor.getCursorProperties()));
                 // subscription-cursor gets activated by default: deactivate as there is no active subscription right
                 // now
                 subscriptions.get(subscriptionName).deactivateCursor();
@@ -868,7 +868,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
 
         Map<String, Long> properties = PersistentSubscription.getBaseCursorProperties(replicated);
 
-        ledger.asyncOpenCursor(Codec.encode(subscriptionName), initialPosition, properties, new OpenCursorCallback() {
+        ledger.asyncOpenCursor(Codec.encode(subscriptionName), initialPosition, properties, subscriptionProperties,
+                new OpenCursorCallback() {
             @Override
             public void openCursorComplete(ManagedCursor cursor, Object ctx) {
                 if (log.isDebugEnabled()) {
@@ -888,11 +889,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                         return;
                     }
                 }
-                if (MapUtils.isEmpty(subscription.getSubscriptionProperties())
-                        && MapUtils.isNotEmpty(subscriptionProperties)) {
-                    subscription.getSubscriptionProperties().putAll(subscriptionProperties);
-                }
-
                 if (replicated && !subscription.isReplicated()) {
                     // Flip the subscription state
                     subscription.setReplicated(replicated);
@@ -971,11 +967,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                     return FutureUtil.failedFuture(
                             new NotAllowedException("Durable subscription with the same name already exists."));
                 }
-
-                if (MapUtils.isEmpty(subscription.getSubscriptionProperties())
-                        && MapUtils.isNotEmpty(subscriptionProperties)) {
-                    subscription.getSubscriptionProperties().putAll(subscriptionProperties);
-                }
             }
 
             if (startMessageRollbackDurationSec > 0) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
index e0f5e0a77a9..12b742a0191 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
@@ -213,10 +213,7 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
         PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService()
                 .getTopicReference(topic).get().getSubscription(subName);
         Map<String, String> properties = subscription.getSubscriptionProperties();
-        assertTrue(properties.containsKey("1"));
-        assertTrue(properties.containsKey("2"));
-        assertEquals(properties.get("1"), "1");
-        assertEquals(properties.get("2"), "2");
+        assertEquals(properties, map);
 
         // after updating mark delete position, the properties should still exist
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
@@ -232,10 +229,7 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
             assertEquals(subscription.getCursor().getMarkDeletedPosition().getEntryId(), messageId.getEntryId());
         });
         properties = subscription.getSubscriptionProperties();
-        assertTrue(properties.containsKey("1"));
-        assertTrue(properties.containsKey("2"));
-        assertEquals(properties.get("1"), "1");
-        assertEquals(properties.get("2"), "2");
+        assertEquals(properties, map);
 
         consumer.close();
         producer.close();
@@ -249,10 +243,7 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
                 .getTopicReference(topic).get().getSubscription(subName);
         Awaitility.await().untilAsserted(() -> {
             Map<String, String> properties2 = subscription2.getSubscriptionProperties();
-            assertTrue(properties2.containsKey("1"));
-            assertTrue(properties2.containsKey("2"));
-            assertEquals(properties2.get("1"), "1");
-            assertEquals(properties2.get("2"), "2");
+            assertEquals(properties2, map);
         });
         consumer2.close();
 
@@ -264,13 +255,11 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
                 .receiverQueueSize(1)
                 .subscriptionProperties(map3).subscriptionName(subName).subscribe();
         Map<String, String> properties3 = subscription.getSubscriptionProperties();
-        assertTrue(properties3.containsKey("1"));
-        assertTrue(properties3.containsKey("2"));
-        assertEquals(properties3.get("1"), "1");
-        assertEquals(properties3.get("2"), "2");
+        assertEquals(properties3, map);
         consumer3.close();
 
-        //restart and create a new consumer with new properties, the new properties should be updated
+        //restart and create a new consumer with new properties, the new properties must not be updated
+        // for a Durable subscription, but for a NonDurable subscription we pick up the new values
         restartBroker();
         Consumer<byte[]> consumer4 = pulsarClient.newConsumer().subscriptionMode(subscriptionMode)
                 .topic(topic).receiverQueueSize(1)
@@ -278,10 +267,12 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
         PersistentSubscription subscription4 = (PersistentSubscription) pulsar.getBrokerService()
                 .getTopicReference(topic).get().getSubscription(subName);
         Map<String, String> properties4 = subscription4.getSubscriptionProperties();
-        assertTrue(properties4.containsKey("3"));
-        assertTrue(properties4.containsKey("4"));
-        assertEquals(properties4.get("3"), "3");
-        assertEquals(properties4.get("4"), "4");
+        if (subscriptionMode == SubscriptionMode.Durable) {
+            assertEquals(properties4, map);
+        } else {
+            assertEquals(properties4, map3);
+
+        }
         consumer4.close();
 
 
@@ -294,26 +285,28 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
                 .getTopicReference(topic).get().getSubscription(subName);
         properties4 = subscription4.getSubscriptionProperties();
         if (subscriptionMode == SubscriptionMode.Durable) {
-            assertTrue(properties4.containsKey("3"));
-            assertTrue(properties4.containsKey("4"));
-            assertEquals(properties4.get("3"), "3");
-            assertEquals(properties4.get("4"), "4");
+            assertEquals(properties4, map);
         } else {
             assertTrue(properties4.isEmpty());
         }
         consumer4.close();
 
-        //restart broker, it won't get any properties
+        //restart broker, properties for Durable subscription are reloaded from Metadata
         restartBroker();
         consumer4 = pulsarClient.newConsumer().topic(topic).subscriptionMode(subscriptionMode)
                 .receiverQueueSize(1)
                 .subscriptionName(subName).subscribe();
         subscription4 = (PersistentSubscription) pulsar.getBrokerService()
                 .getTopicReference(topic).get().getSubscription(subName);
-        assertEquals(subscription4.getSubscriptionProperties().size(), 0);
+        properties4 = subscription4.getSubscriptionProperties();
+        if (subscriptionMode == SubscriptionMode.Durable) {
+            assertEquals(properties4, map);
+        } else {
+            assertTrue(properties4.isEmpty());
+        }
         consumer4.close();
 
-        //restart broker and create a new consumer with new properties, the properties will be updated
+        //restart broker and create a new consumer with new properties, the properties will not be updated
         restartBroker();
         consumer4 = pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
                 .subscriptionMode(subscriptionMode)
@@ -321,16 +314,17 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
                 .subscriptionName(subName).subscribe();
         PersistentSubscription subscription5 = (PersistentSubscription) pulsar.getBrokerService()
                 .getTopicReference(topic).get().getSubscription(subName);
-        properties = subscription5.getSubscriptionProperties();
-        assertTrue(properties.containsKey("1"));
-        assertTrue(properties.containsKey("2"));
-        assertEquals(properties.get("1"), "1");
-        assertEquals(properties.get("2"), "2");
-        consumer4.close();
+        properties4 = subscription5.getSubscriptionProperties();
+
+        // for the NonDurable subscription here we have the same properties because they
+        // are sent by the Consumer
+        assertEquals(properties4, map);
 
+        consumer4.close();
 
         String subNameShared = "my-sub-shared";
         Map<String, String> mapShared = new HashMap<>();
+        mapShared.put("6", "7");
         // open two consumers with a Shared Subscription
         Consumer consumerShared1 = pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
                 .subscriptionMode(subscriptionMode)
@@ -342,26 +336,25 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
         properties = subscriptionShared.getSubscriptionProperties();
         assertEquals(properties, mapShared);
 
-        // add a new consumer, the properties are updated because they were empty
-        mapShared = new HashMap<>();
-        mapShared.put("6", "7");
-        mapShared.put("8", "9");
+        // add a new consumer, the properties are not updated
+        Map<String, String> mapShared2 = new HashMap<>();
+        mapShared2.put("8", "9");
         Consumer consumerShared2 = pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
                 .subscriptionMode(subscriptionMode)
                 .subscriptionType(SubscriptionType.Shared)
-                .subscriptionProperties(mapShared)
+                .subscriptionProperties(mapShared2)
                 .subscriptionName(subNameShared).subscribe();
 
         properties = subscriptionShared.getSubscriptionProperties();
         assertEquals(properties, mapShared);
 
-        // add a third consumer, the properties are NOT updated because they are not empty
-        Map<String, String> mapShared2 = new HashMap<>();
-        mapShared2.put("10", "11");
+        // add a third consumer, the properties are NOT updated
+        Map<String, String> mapShared3 = new HashMap<>();
+        mapShared3.put("10", "11");
         Consumer consumerShared3 = pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
                 .subscriptionMode(subscriptionMode)
                 .subscriptionType(SubscriptionType.Shared)
-                .subscriptionProperties(mapShared2)
+                .subscriptionProperties(mapShared3)
                 .subscriptionName(subNameShared).subscribe();
 
         properties = subscriptionShared.getSubscriptionProperties();
@@ -373,6 +366,65 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
         consumerShared3.close();
     }
 
+    /**
+     * Properties sent by the consumer that creates a Durable subscription must survive a topic unload,
+     * while a later consumer must not be able to set properties on an already existing subscription.
+     */
+    @Test
+    public void subscriptionModePersistedTest() throws Exception {
+        String topic = "persistent://my-property/my-ns/topic" + UUID.randomUUID();
+        admin.topics().createNonPartitionedTopic(topic);
+        Map<String, String> map = new HashMap<>();
+        map.put("1", "1");
+        map.put("2", "2");
+        String subName = "my-sub";
+        pulsarClient.newConsumer()
+                .subscriptionMode(SubscriptionMode.Durable)
+                .topic(topic)
+                .subscriptionProperties(map)
+                .subscriptionName(subName)
+                .subscribe()
+                .close();
+        PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService()
+                .getTopicReference(topic).get().getSubscription(subName);
+        Map<String, String> properties = subscription.getSubscriptionProperties();
+        assertEquals(properties, map);
+
+        Map<String, String> subscriptionPropertiesFromAdmin =
+                admin.topics().getStats(topic).getSubscriptions().get(subName).getSubscriptionProperties();
+        assertEquals(subscriptionPropertiesFromAdmin, map);
+
+        // unload the topic
+        admin.topics().unload(topic);
+
+        // verify that the properties are still there
+        subscriptionPropertiesFromAdmin =
+                admin.topics().getStats(topic).getSubscriptions().get(subName).getSubscriptionProperties();
+        assertEquals(subscriptionPropertiesFromAdmin, map);
+
+        // create a new subscription, initially properties are empty
+        String subName2 = "my-sub2";
+        admin.topics().createSubscription(topic, subName2, MessageId.latest);
+
+        subscriptionPropertiesFromAdmin =
+                admin.topics().getStats(topic).getSubscriptions().get(subName2).getSubscriptionProperties();
+        assertTrue(subscriptionPropertiesFromAdmin.isEmpty());
+
+        // create a consumer, this is not allowed to update the properties
+        pulsarClient.newConsumer()
+                .subscriptionMode(SubscriptionMode.Durable)
+                .topic(topic)
+                .subscriptionProperties(map)
+                .subscriptionName(subName2)
+                .subscribe()
+                .close();
+
+        // verify that the properties are not changed
+        subscriptionPropertiesFromAdmin =
+                admin.topics().getStats(topic).getSubscriptions().get(subName2).getSubscriptionProperties();
+        assertTrue(subscriptionPropertiesFromAdmin.isEmpty());
+    }
+
     @Test
     public void createSubscriptionBySpecifyingStringPosition() throws IOException, PulsarAdminException {
         final int numberOfMessages = 5;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 331772b7cdc..a9559ac96b9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -1561,11 +1561,11 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
         doAnswer(new Answer<Object>() {
             @Override
             public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-                ((OpenCursorCallback) invocationOnMock.getArguments()[3]).openCursorComplete(cursorMock, null);
+                ((OpenCursorCallback) invocationOnMock.getArguments()[4]).openCursorComplete(cursorMock, null);
                 return null;
             }
         }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(Map.class),
-                any(OpenCursorCallback.class), any());
+                any(Map.class), any(OpenCursorCallback.class), any());
 
         doAnswer(new Answer<Object>() {
             @Override
@@ -2214,9 +2214,9 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
             return null;
         }).when(mockLedger).asyncDeleteCursor(matches(".*success.*"), any(DeleteCursorCallback.class), any());
         doAnswer((Answer<Object>) invocationOnMock -> {
-            ((OpenCursorCallback) invocationOnMock.getArguments()[3]).openCursorComplete(mockCursor, null);
+            ((OpenCursorCallback) invocationOnMock.getArguments()[4]).openCursorComplete(mockCursor, null);
             return null;
-        }).when(mockLedger).asyncOpenCursor(any(), any(), any(), any(), any());
+        }).when(mockLedger).asyncOpenCursor(any(), any(), any(), any(), any(), any());
         PersistentTopic topic = new PersistentTopic(successTopicName, mockLedger, brokerService);
 
         CommandSubscribe cmd = new CommandSubscribe()
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index e8fa8c60639..cffc82bcf73 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -1686,20 +1686,20 @@ public class ServerCnxTest {
 
         doAnswer((Answer<Object>) invocationOnMock -> {
             Thread.sleep(300);
-            ((OpenCursorCallback) invocationOnMock.getArguments()[2]).openCursorComplete(cursorMock, null);
+            ((OpenCursorCallback) invocationOnMock.getArguments()[3]).openCursorComplete(cursorMock, null);
             return null;
         }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(OpenCursorCallback.class), any());
 
         doAnswer((Answer<Object>) invocationOnMock -> {
             Thread.sleep(300);
-            ((OpenCursorCallback) invocationOnMock.getArguments()[3]).openCursorComplete(cursorMock, null);
+            ((OpenCursorCallback) invocationOnMock.getArguments()[4]).openCursorComplete(cursorMock, null);
             return null;
-        }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(Map.class),
+        }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(Map.class), any(Map.class),
                 any(OpenCursorCallback.class), any());
 
         doAnswer((Answer<Object>) invocationOnMock -> {
             Thread.sleep(300);
-            ((OpenCursorCallback) invocationOnMock.getArguments()[2])
+            ((OpenCursorCallback) invocationOnMock.getArguments()[3])
                     .openCursorFailed(new ManagedLedgerException("Managed ledger failure"), null);
             return null;
         }).when(ledgerMock).asyncOpenCursor(matches(".*fail.*"), any(InitialPosition.class), any(OpenCursorCallback.class), any());
@@ -1709,7 +1709,7 @@ public class ServerCnxTest {
             ((OpenCursorCallback) invocationOnMock.getArguments()[3])
                     .openCursorFailed(new ManagedLedgerException("Managed ledger failure"), null);
             return null;
-        }).when(ledgerMock).asyncOpenCursor(matches(".*fail.*"), any(InitialPosition.class), any(Map.class),
+        }).when(ledgerMock).asyncOpenCursor(matches(".*fail.*"), any(InitialPosition.class), any(Map.class), any(Map.class),
                 any(OpenCursorCallback.class), any());
 
         doAnswer((Answer<Object>) invocationOnMock -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
index 81ad811f43c..b2edbda8855 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
@@ -69,10 +69,13 @@ public class FilterEntryTest extends BrokerTestBase {
     }
 
     public void testFilter() throws Exception {
-
+        Map<String, String> map = new HashMap<>();
+        map.put("1","1");
+        map.put("2","2");
         String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID();
         String subName = "sub";
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic)
+                .subscriptionProperties(map)
                 .subscriptionName(subName).subscribe();
         // mock entry filters
         PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService()
@@ -132,9 +135,6 @@ public class FilterEntryTest extends BrokerTestBase {
         });
         consumer.close();
 
-        Map<String, String> map = new HashMap<>();
-        map.put("1","1");
-        map.put("2","2");
         consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionProperties(map)
                 .subscriptionName(subName).subscribe();
         for (int i = 0; i < 10; i++) {
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
index 4ad872919e3..3eaf276c3c5 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
@@ -105,7 +105,8 @@ public class MockManagedLedger implements ManagedLedger {
 
     @Override
     public ManagedCursor openCursor(String name, CommandSubscribe.InitialPosition initialPosition,
-                                    Map<String, Long> properties) throws InterruptedException, ManagedLedgerException {
+                                    Map<String, Long> properties, Map<String, String> cursorProperties)
+            throws InterruptedException, ManagedLedgerException {
         return null;
     }
 
@@ -155,7 +156,8 @@ public class MockManagedLedger implements ManagedLedger {
 
     @Override
     public void asyncOpenCursor(String name, CommandSubscribe.InitialPosition initialPosition,
-                                Map<String, Long> properties, AsyncCallbacks.OpenCursorCallback callback, Object ctx) {
+                                Map<String, Long> properties, Map<String, String> cursorProperties,
+                                AsyncCallbacks.OpenCursorCallback callback, Object ctx) {
 
     }