You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ni...@apache.org on 2022/06/03 15:26:31 UTC

[pulsar] branch branch-2.10 updated (70c77948c60 -> 9dd4057d5bc)

This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a change to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from 70c77948c60 [PIP-146] ManagedCursorInfo compression (#14542)
     new 5d59e70f6fb [fix][ci] Fix tests memory leak due to mockito-inline (#15513)
     new b46924c3a6e Issue 15750: PIP-105: Store Subscription properties (#15757)
     new 9dd4057d5bc PIP-105 add support for updating the Subscription properties (#15751)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 buildtools/pom.xml                                 |  11 ++
 .../pulsar/tests/MockitoCleanupListener.java       |  19 ++-
 .../apache/bookkeeper/mledger/ManagedCursor.java   |  15 +++
 .../apache/bookkeeper/mledger/ManagedLedger.java   |   9 +-
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  97 +++++++++++++--
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  15 ++-
 managed-ledger/src/main/proto/MLDataFormats.proto  |   9 ++
 .../mledger/impl/ManagedCursorContainerTest.java   |   5 +
 .../mledger/impl/ManagedCursorPropertiesTest.java  |  62 ++++++++-
 .../broker/admin/impl/PersistentTopicsBase.java    | 104 ++++++++++++++++
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  37 ++++++
 .../apache/pulsar/broker/service/Subscription.java |   2 +
 .../nonpersistent/NonPersistentSubscription.java   |  13 +-
 .../service/persistent/PersistentSubscription.java |  21 +++-
 .../broker/service/persistent/PersistentTopic.java |  17 +--
 .../broker/admin/AdminApiSubscriptionTest.java     |  45 +++++++
 .../broker/admin/CreateSubscriptionTest.java       | 138 ++++++++++++++-------
 .../pulsar/broker/service/PersistentTopicTest.java |   8 +-
 .../pulsar/broker/service/ServerCnxTest.java       |  10 +-
 .../broker/service/plugin/FilterEntryTest.java     |   8 +-
 .../org/apache/pulsar/client/admin/Topics.java     |  21 ++++
 .../pulsar/client/admin/internal/TopicsImpl.java   |  19 +++
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  16 +++
 .../org/apache/pulsar/admin/cli/CmdTopics.java     |  36 ++++++
 .../pulsar/tests/integration/cli/CLITest.java      |  25 +++-
 .../offload/jcloud/impl/MockManagedLedger.java     |   6 +-
 26 files changed, 669 insertions(+), 99 deletions(-)


[pulsar] 03/03: PIP-105 add support for updating the Subscription properties (#15751)

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9dd4057d5bccdc93d29cab49cdb969f507a56739
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Fri Jun 3 13:44:16 2022 +0200

    PIP-105 add support for updating the Subscription properties (#15751)
    
    * PIP-105 add support for updating the Subscription properties
    
    * Implement command update-subscription-properties
    
    * Add tests
    
    * Add volatile
    
    * Fix PersistentTopicTest
    
    * PIP-105: Store Subscription properties
    
    * Fix FilterEntryTest
    
    * Add volatile
    
    * Fix PersistentTopicTest
    
    * fix ServerCnxTest test
    
    * Switch from POST to PUT
    
    * rename to /properties
    
    * Apply suggestions from code review
    
    Co-authored-by: Lari Hotari <lh...@users.noreply.github.com>
    
    Co-authored-by: Lari Hotari <lh...@users.noreply.github.com>
    (cherry picked from commit 8e77e88cc54c05fa4f8b360be499c3da61607a66)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |   5 +-
 .../broker/admin/impl/PersistentTopicsBase.java    | 104 +++++++++++++++++++++
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  37 ++++++++
 .../broker/admin/AdminApiSubscriptionTest.java     |  45 +++++++++
 .../org/apache/pulsar/client/admin/Topics.java     |  21 +++++
 .../pulsar/client/admin/internal/TopicsImpl.java   |  19 ++++
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  16 ++++
 .../org/apache/pulsar/admin/cli/CmdTopics.java     |  36 +++++++
 .../pulsar/tests/integration/cli/CLITest.java      |  25 ++++-
 9 files changed, 305 insertions(+), 3 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 1092ca0a89d..424874a8dfa 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -47,6 +47,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -324,7 +325,7 @@ public class ManagedCursorImpl implements ManagedCursor {
     @Override
     public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
         CompletableFuture<Void> updateCursorPropertiesResult = new CompletableFuture<>();
-        ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<>() {
+        ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<ManagedCursorInfo>() {
             @Override
             public void operationComplete(ManagedCursorInfo info, Stat stat) {
                 ManagedCursorInfo copy = ManagedCursorInfo
@@ -333,7 +334,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                         .addAllCursorProperties(buildStringPropertiesMap(cursorProperties))
                         .build();
                 ledger.getStore().asyncUpdateCursorInfo(ledger.getName(),
-                        name, copy, stat, new MetaStoreCallback<>() {
+                        name, copy, stat, new MetaStoreCallback<Void>() {
                     @Override
                     public void operationComplete(Void result, Stat stat) {
                         log.info("[{}] Updated ledger cursor: {} properties {}", ledger.getName(),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 4228ac6254d..f118172045d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1556,6 +1556,32 @@ public class PersistentTopicsBase extends AdminResource {
             });
     }
 
+    private void internalUpdateSubscriptionPropertiesForNonPartitionedTopic(AsyncResponse asyncResponse,
+                                      String subName, Map<String, String> subscriptionProperties,
+                                      boolean authoritative) {
+        validateTopicOwnershipAsync(topicName, authoritative)
+                .thenRun(() -> validateTopicOperation(topicName, TopicOperation.CONSUME))
+                .thenCompose(__ -> {
+                    Topic topic = getTopicReference(topicName);
+                    Subscription sub = topic.getSubscription(subName);
+                    if (sub == null) {
+                        throw new RestException(Status.NOT_FOUND, "Subscription not found");
+                    }
+                    return sub.updateSubscriptionProperties(subscriptionProperties);
+                }).thenRun(() -> {
+                    log.info("[{}][{}] Updated subscription {}", clientAppId(), topicName, subName);
+                    asyncResponse.resume(Response.noContent().build());
+                }).exceptionally(ex -> {
+                    Throwable cause = ex.getCause();
+                    // If the exception is not redirect exception we need to log it.
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to update subscription {} {}", clientAppId(), topicName, subName, cause);
+                    }
+                    asyncResponse.resume(new RestException(cause));
+                    return null;
+                });
+    }
+
     protected void internalDeleteSubscriptionForcefully(AsyncResponse asyncResponse,
                                                         String subName, boolean authoritative) {
         CompletableFuture<Void> future;
@@ -2276,6 +2302,84 @@ public class PersistentTopicsBase extends AdminResource {
         });
     }
 
+    protected void internalUpdateSubscriptionProperties(AsyncResponse asyncResponse, String subName,
+                                                        Map<String, String> subscriptionProperties,
+                                                        boolean authoritative) {
+        CompletableFuture<Void> future;
+        if (topicName.isGlobal()) {
+            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+        } else {
+            future = CompletableFuture.completedFuture(null);
+        }
+
+        future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)).thenAccept(__ -> {
+            if (topicName.isPartitioned()) {
+                internalUpdateSubscriptionPropertiesForNonPartitionedTopic(asyncResponse, subName,
+                        subscriptionProperties, authoritative);
+            } else {
+                getPartitionedTopicMetadataAsync(topicName,
+                        authoritative, false).thenAcceptAsync(partitionMetadata -> {
+                    if (partitionMetadata.partitions > 0) {
+                        final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+
+                        for (int i = 0; i < partitionMetadata.partitions; i++) {
+                            TopicName topicNamePartition = topicName.getPartition(i);
+                            try {
+                                futures.add(pulsar().getAdminClient().topics()
+                                        .updateSubscriptionPropertiesAsync(topicNamePartition.toString(),
+                                                subName, subscriptionProperties));
+                            } catch (Exception e) {
+                                log.error("[{}] Failed to update properties for subscription {} {}",
+                                        clientAppId(), topicNamePartition, subName,
+                                        e);
+                                asyncResponse.resume(new RestException(e));
+                                return;
+                            }
+                        }
+
+                        FutureUtil.waitForAll(futures).handle((result, exception) -> {
+                            if (exception != null) {
+                                Throwable t = exception.getCause();
+                                if (t instanceof NotFoundException) {
+                                    asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
+                                    return null;
+                                } else if (t instanceof PreconditionFailedException) {
+                                    asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
+                                            "Subscription has active connected consumers"));
+                                    return null;
+                                } else {
+                                    log.error("[{}] Failed to update properties for subscription {} {}",
+                                            clientAppId(), topicName, subName, t);
+                                    asyncResponse.resume(new RestException(t));
+                                    return null;
+                                }
+                            }
+
+                            asyncResponse.resume(Response.noContent().build());
+                            return null;
+                        });
+                    } else {
+                        internalUpdateSubscriptionPropertiesForNonPartitionedTopic(asyncResponse, subName,
+                                subscriptionProperties, authoritative);
+                    }
+                }, pulsar().getExecutor()).exceptionally(ex -> {
+                    log.error("[{}] Failed to update properties for subscription {} from topic {}",
+                            clientAppId(), subName, topicName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
+            }
+        }).exceptionally(ex -> {
+            // If the exception is not redirect exception we need to log it.
+            if (!isRedirectException(ex)) {
+                log.error("[{}] Failed to update subscription {} from topic {}",
+                        clientAppId(), subName, topicName, ex);
+            }
+            resumeAsyncResponseExceptionally(asyncResponse, ex);
+            return null;
+        });
+    }
+
     protected void internalResetCursorOnPosition(AsyncResponse asyncResponse, String subName, boolean authoritative,
             MessageIdImpl messageId, boolean isExcluded, int batchIndex) {
         CompletableFuture<Void> ret;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index fd5cb75b10b..754f54ca515 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1464,6 +1464,43 @@ public class PersistentTopics extends PersistentTopicsBase {
         }
     }
 
+    @PUT
+    @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/properties")
+    @ApiOperation(value = "Replaces all the properties on the given subscription")
+    @ApiResponses(value = {
+            @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
+            @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or "
+                    + "subscriber is not authorized to access this operation"),
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic/Subscription does not exist"),
+            @ApiResponse(code = 405, message = "Method Not Allowed"),
+            @ApiResponse(code = 500, message = "Internal server error"),
+            @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
+    })
+    public void updateSubscriptionProperties(
+            @Suspended final AsyncResponse asyncResponse,
+            @ApiParam(value = "Specify the tenant", required = true)
+            @PathParam("tenant") String tenant,
+            @ApiParam(value = "Specify the namespace", required = true)
+            @PathParam("namespace") String namespace,
+            @ApiParam(value = "Specify topic name", required = true)
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Subscription to update", required = true)
+            @PathParam("subName") String encodedSubName,
+            @ApiParam(value = "The new properties") Map<String, String> subscriptionProperties,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+        try {
+            validateTopicName(tenant, namespace, encodedTopic);
+            internalUpdateSubscriptionProperties(asyncResponse, decode(encodedSubName),
+                    subscriptionProperties, authoritative);
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
+    }
+
     @POST
     @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/resetcursor")
     @ApiOperation(value = "Reset subscription to message position closest to given position.",
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java
index 8d25c61e1dd..f7af28ddf41 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java
@@ -174,5 +174,50 @@ public class AdminApiSubscriptionTest extends MockedPulsarServiceBaseTest {
             assertTrue(subscriptionStats2.getSubscriptionProperties().isEmpty());
         }
 
+        // clear the properties on subscriptionName
+        admin.topics().updateSubscriptionProperties(topic, subscriptionName, new HashMap<>());
+
+        if (partitioned) {
+            PartitionedTopicMetadata partitionedTopicMetadata = admin.topics().getPartitionedTopicMetadata(topic);
+            for (int i = 0; i < partitionedTopicMetadata.partitions; i++) {
+                SubscriptionStats subscriptionStats = admin.topics().getStats(topic + "-partition-" + i)
+                        .getSubscriptions().get(subscriptionName);
+                assertTrue(subscriptionStats.getSubscriptionProperties().isEmpty());
+            }
+
+            // aggregated properties
+            SubscriptionStats subscriptionStats = admin.topics().getPartitionedStats(topic, false)
+                    .getSubscriptions().get(subscriptionName);
+            assertTrue(subscriptionStats.getSubscriptionProperties().isEmpty());
+
+        } else {
+            SubscriptionStats subscriptionStats = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName);
+            assertTrue(subscriptionStats.getSubscriptionProperties().isEmpty());
+        }
+
+        // update the properties on subscriptionName
+        admin.topics().updateSubscriptionProperties(topic, subscriptionName, properties);
+
+        if (partitioned) {
+            PartitionedTopicMetadata partitionedTopicMetadata = admin.topics().getPartitionedTopicMetadata(topic);
+            for (int i = 0; i < partitionedTopicMetadata.partitions; i++) {
+                SubscriptionStats subscriptionStats = admin.topics().getStats(topic + "-partition-" + i)
+                        .getSubscriptions().get(subscriptionName);
+                assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));
+            }
+
+            // aggregated properties
+            SubscriptionStats subscriptionStats = admin.topics().getPartitionedStats(topic, false)
+                    .getSubscriptions().get(subscriptionName);
+            assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));
+
+        } else {
+            SubscriptionStats subscriptionStats = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName);
+            assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));
+
+            SubscriptionStats subscriptionStats2 = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName2);
+            assertTrue(subscriptionStats2.getSubscriptionProperties().isEmpty());
+        }
+
     }
 }
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 7b533030961..eab3b8041f3 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -1737,6 +1737,17 @@ public interface Topics {
      */
     void resetCursor(String topic, String subName, MessageId messageId, boolean isExcluded) throws PulsarAdminException;
 
+    /**
+     * Update Subscription Properties on a topic subscription.
+     * The new properties replace the existing ones; properties that are not passed are removed.
+     * @param topic topic name
+     * @param subName subscription name
+     * @param subscriptionProperties the complete new set of properties for the subscription
+     * @throws PulsarAdminException if the update fails
+     */
+    void updateSubscriptionProperties(String topic, String subName, Map<String, String> subscriptionProperties)
+            throws PulsarAdminException;
+
     /**
      * Reset cursor position on a topic subscription.
      *
@@ -1761,6 +1772,16 @@ public interface Topics {
      */
     CompletableFuture<Void> resetCursorAsync(String topic, String subName, MessageId messageId, boolean isExcluded);
 
+    /**
+     * Update Subscription Properties on a topic subscription asynchronously.
+     * The new properties replace the existing ones; properties that are not passed are removed.
+     * @param topic topic name
+     * @param subName subscription name
+     * @param subscriptionProperties the complete new set of properties for the subscription
+     */
+    CompletableFuture<Void>  updateSubscriptionPropertiesAsync(String topic, String subName,
+                                                               Map<String, String> subscriptionProperties);
+
     /**
      * Reset cursor position on a topic subscription.
      *
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 1667cc4f90a..a5f5edae290 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -1147,6 +1147,25 @@ public class TopicsImpl extends BaseResource implements Topics {
         sync(() -> resetCursorAsync(topic, subName, messageId));
     }
 
+    @Override
+    public void updateSubscriptionProperties(String topic, String subName, Map<String, String> subscriptionProperties)
+            throws PulsarAdminException {
+        sync(() -> updateSubscriptionPropertiesAsync(topic, subName, subscriptionProperties));
+    }
+
+    @Override
+    public CompletableFuture<Void> updateSubscriptionPropertiesAsync(String topic, String subName,
+                                                                     Map<String, String> subscriptionProperties) {
+        TopicName tn = validateTopic(topic);
+        String encodedSubName = Codec.encode(subName);
+        WebTarget path = topicPath(tn, "subscription", encodedSubName,
+                "properties");
+        if (subscriptionProperties == null) {
+            subscriptionProperties = new HashMap<>();
+        }
+        return asyncPutRequest(path, Entity.entity(subscriptionProperties, MediaType.APPLICATION_JSON));
+    }
+
     @Override
     public void resetCursor(String topic, String subName, MessageId messageId
             , boolean isExcluded) throws PulsarAdminException {
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 23ad1a04f84..ca129e54971 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -1418,6 +1418,22 @@ public class PulsarAdminToolTest {
         props.put("a", "b");
         verify(mockTopics).createSubscription("persistent://myprop/clust/ns1/ds1", "sub1", MessageId.earliest, false, props);
 
+        cmdTopics = new CmdTopics(() -> admin);
+        cmdTopics.run(split("create-subscription persistent://myprop/clust/ns1/ds1 -s sub1 --messageId earliest -r"));
+        verify(mockTopics).createSubscription("persistent://myprop/clust/ns1/ds1", "sub1", MessageId.earliest, true, null);
+
+        cmdTopics = new CmdTopics(() -> admin);
+        cmdTopics.run(split("update-subscription-properties persistent://myprop/clust/ns1/ds1 -s sub1 --clear"));
+        verify(mockTopics).updateSubscriptionProperties("persistent://myprop/clust/ns1/ds1", "sub1", new HashMap<>());
+
+        cmdTopics = new CmdTopics(() -> admin);
+        props = new HashMap<>();
+        props.put("a", "b");
+        props.put("c", "d");
+        cmdTopics.run(split("update-subscription-properties persistent://myprop/clust/ns1/ds1 -s sub1 -p a=b -p c=d"));
+        verify(mockTopics).updateSubscriptionProperties("persistent://myprop/clust/ns1/ds1", "sub1", props);
+
+        cmdTopics = new CmdTopics(() -> admin);
         cmdTopics.run(split("create-partitioned-topic persistent://myprop/clust/ns1/ds1 --partitions 32"));
         verify(mockTopics).createPartitionedTopic("persistent://myprop/clust/ns1/ds1", 32, null);
 
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 883617f89b9..7c3736616e9 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -98,6 +98,7 @@ public class CmdTopics extends CmdBase {
         jcommander.addCommand("subscriptions", new ListSubscriptions());
         jcommander.addCommand("unsubscribe", new DeleteSubscription());
         jcommander.addCommand("create-subscription", new CreateSubscription());
+        jcommander.addCommand("update-subscription-properties", new UpdateSubscriptionProperties());
 
         jcommander.addCommand("stats", new GetStats());
         jcommander.addCommand("stats-internal", new GetInternalStats());
@@ -919,6 +920,41 @@ public class CmdTopics extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Update the properties of a subscription on a topic")
+    private class UpdateSubscriptionProperties extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "-s",
+                "--subscription" }, description = "Subscription to update", required = true)
+        private String subscriptionName;
+
+        @Parameter(names = {"--property", "-p"}, description = "key value pair properties(-p a=b -p c=d)",
+                required = false)
+        private java.util.List<String> properties;
+
+        @Parameter(names = {"--clear", "-c"}, description = "Remove all properties",
+                required = false)
+        private boolean clear;
+
+        @Override
+        void run() throws Exception {
+            String topic = validateTopicName(params);
+            Map<String, String> map = parseListKeyValueMap(properties);
+            if (map == null) {
+                map = Collections.emptyMap();
+            }
+            if ((map.isEmpty()) && !clear) {
+                throw new ParameterException("If you want to clear the properties you have to use --clear");
+            }
+            if (clear && !map.isEmpty()) {
+                throw new ParameterException("If you set --clear then you should not pass any properties");
+            }
+            getTopics().updateSubscriptionProperties(topic, subscriptionName, map);
+        }
+    }
+
+
     @Parameters(commandDescription = "Reset position for subscription to a position that is closest to "
             + "timestamp or messageId.")
     private class ResetCursor extends CliCommand {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
index b0da409e0fe..a1e417ca547 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
@@ -174,7 +174,7 @@ public class CLITest extends PulsarTestSuite {
     }
 
     @Test
-    public void testCreateSubscriptionWithPropertiesCommand() throws Exception {
+    public void testCreateUpdateSubscriptionWithPropertiesCommand() throws Exception {
         String topic = "testCreateSubscriptionCommmand";
 
         String subscriptionPrefix = "subscription-";
@@ -194,6 +194,29 @@ public class CLITest extends PulsarTestSuite {
                     "" + subscriptionPrefix + i
             );
             result.assertNoOutput();
+
+            ContainerExecResult resultUpdate = container.execCmd(
+                    PulsarCluster.ADMIN_SCRIPT,
+                    "topics",
+                    "update-subscription-properties",
+                    "-p",
+                    "a=e",
+                    "persistent://public/default/" + topic,
+                    "--subscription",
+                    "" + subscriptionPrefix + i
+            );
+            resultUpdate.assertNoOutput();
+
+            ContainerExecResult resultClear = container.execCmd(
+                    PulsarCluster.ADMIN_SCRIPT,
+                    "topics",
+                    "update-subscription-properties",
+                    "-c",
+                    "persistent://public/default/" + topic,
+                    "--subscription",
+                    "" + subscriptionPrefix + i
+            );
+            resultClear.assertNoOutput();
             i++;
         }
     }


[pulsar] 01/03: [fix][ci] Fix tests memory leak due to mockito-inline (#15513)

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5d59e70f6fb7e99066d18179b5c8eec6295b6a68
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Fri Jun 3 10:25:13 2022 +0200

    [fix][ci] Fix tests memory leak due to mockito-inline (#15513)
    
    (cherry picked from commit 14b95ecc45f73543fac1385a57fac8b6feeb2f5e)
---
 buildtools/pom.xml                                    | 11 +++++++++++
 .../apache/pulsar/tests/MockitoCleanupListener.java   | 19 ++++++++++++++++---
 2 files changed, 27 insertions(+), 3 deletions(-)

diff --git a/buildtools/pom.xml b/buildtools/pom.xml
index 0bc588e8444..1b7a193ff20 100644
--- a/buildtools/pom.xml
+++ b/buildtools/pom.xml
@@ -51,6 +51,7 @@
     <ant.version>1.10.12</ant.version>
     <snakeyaml.version>1.30</snakeyaml.version>
     <test.additional.args></test.additional.args>
+    <mockito.version>3.12.4</mockito.version>
   </properties>
 
   <dependencyManagement>
@@ -137,6 +138,16 @@
       <version>4.1.77.Final</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <version>${mockito.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-inline</artifactId>
+      <version>${mockito.version}</version>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/buildtools/src/main/java/org/apache/pulsar/tests/MockitoCleanupListener.java b/buildtools/src/main/java/org/apache/pulsar/tests/MockitoCleanupListener.java
index 354a55c10e3..73fff1bb7e2 100644
--- a/buildtools/src/main/java/org/apache/pulsar/tests/MockitoCleanupListener.java
+++ b/buildtools/src/main/java/org/apache/pulsar/tests/MockitoCleanupListener.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.tests;
 
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,10 +37,22 @@ public class MockitoCleanupListener extends BetweenTestClassesListenerAdapter {
 
     @Override
     protected void onBetweenTestClasses(Class<?> endedTestClass, Class<?> startedTestClass) {
-        if (MOCKITO_CLEANUP_ENABLED && MockitoThreadLocalStateCleaner.INSTANCE.isEnabled()) {
-            LOG.info("Cleaning up Mockito's ThreadSafeMockingProgress.MOCKING_PROGRESS_PROVIDER thread local state.");
-            MockitoThreadLocalStateCleaner.INSTANCE.cleanup();
+        if (MOCKITO_CLEANUP_ENABLED) {
+            if (MockitoThreadLocalStateCleaner.INSTANCE.isEnabled()) {
+                LOG.info("Cleaning up Mockito's ThreadSafeMockingProgress.MOCKING_PROGRESS_PROVIDER thread local state.");
+                MockitoThreadLocalStateCleaner.INSTANCE.cleanup();
+            }
+            cleanupMockitoInline();
         }
     }
 
+    /**
+     * Mockito-inline can leak mocked objects, so the inline mocks must be cleaned up between test classes.
+     * See <a href="https://javadoc.io/doc/org.mockito/mockito-core/latest/org/mockito/Mockito.html#47">
+     *     mockito docs</a>.
+     */
+    private void cleanupMockitoInline() {
+        Mockito.framework().clearInlineMocks();
+    }
+
 }


[pulsar] 02/03: Issue 15750: PIP-105: Store Subscription properties (#15757)

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b46924c3a6e6c2baee9a4dc39f3e8425af33285e
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Fri Jun 3 09:03:05 2022 +0200

    Issue 15750: PIP-105: Store Subscription properties (#15757)
    
    * PIP-105: Store Subscription properties
    
    (cherry picked from commit 23f46a0736e844a2a2fec943ee76d4e1e73ec477)
---
 .../apache/bookkeeper/mledger/ManagedCursor.java   |  15 +++
 .../apache/bookkeeper/mledger/ManagedLedger.java   |   9 +-
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  96 ++++++++++++--
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  15 ++-
 managed-ledger/src/main/proto/MLDataFormats.proto  |   9 ++
 .../mledger/impl/ManagedCursorContainerTest.java   |   5 +
 .../mledger/impl/ManagedCursorPropertiesTest.java  |  62 ++++++++-
 .../apache/pulsar/broker/service/Subscription.java |   2 +
 .../nonpersistent/NonPersistentSubscription.java   |  13 +-
 .../service/persistent/PersistentSubscription.java |  21 +++-
 .../broker/service/persistent/PersistentTopic.java |  17 +--
 .../broker/admin/CreateSubscriptionTest.java       | 138 ++++++++++++++-------
 .../pulsar/broker/service/PersistentTopicTest.java |   8 +-
 .../pulsar/broker/service/ServerCnxTest.java       |  10 +-
 .../broker/service/plugin/FilterEntryTest.java     |   8 +-
 .../offload/jcloud/impl/MockManagedLedger.java     |   6 +-
 16 files changed, 339 insertions(+), 95 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index efd183d0bfb..46ca0f14003 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Range;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.common.annotation.InterfaceAudience;
 import org.apache.bookkeeper.common.annotation.InterfaceStability;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
@@ -79,6 +80,20 @@ public interface ManagedCursor {
      */
     Map<String, Long> getProperties();
 
+    /**
+     * Return any properties that were associated with the cursor.
+     */
+    Map<String, String> getCursorProperties();
+
+     /**
+      * Updates the properties.
+      * @param cursorProperties the new properties to associate with the cursor
+      * @return a handle to the result of the operation
+      */
+     default CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+         return CompletableFuture.completedFuture(null);
+     }
+
     /**
      * Add a property associated with the last stored position.
      */
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index 1f6e0d3af46..7196a3b4c03 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -240,10 +240,13 @@ public interface ManagedLedger {
      * @param properties
      *             user defined properties that will be attached to the first position of the cursor, if the open
      *             operation will trigger the creation of the cursor.
+     * @param cursorProperties
+     *            the properties for the Cursor
      * @return the ManagedCursor
      * @throws ManagedLedgerException
      */
-    ManagedCursor openCursor(String name, InitialPosition initialPosition, Map<String, Long> properties)
+    ManagedCursor openCursor(String name, InitialPosition initialPosition, Map<String, Long> properties,
+                             Map<String, String> cursorProperties)
             throws InterruptedException, ManagedLedgerException;
 
     /**
@@ -337,13 +340,15 @@ public interface ManagedLedger {
      * @param initialPosition
      *            the cursor will be set at lastest position or not when first created
      *            default is <b>true</b>
+     * @param cursorProperties
+     *            the properties for the Cursor
      * @param callback
      *            callback object
      * @param ctx
      *            opaque context
      */
     void asyncOpenCursor(String name, InitialPosition initialPosition, Map<String, Long> properties,
-                         OpenCursorCallback callback, Object ctx);
+                         Map<String, String> cursorProperties, OpenCursorCallback callback, Object ctx);
 
     /**
      * Get a list of all the cursors reading from this ManagedLedger.
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index d1fe0df5d3d..1092ca0a89d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -92,6 +92,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.StringProperty;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.common.util.collections.BitSetRecyclable;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet;
@@ -108,6 +109,7 @@ public class ManagedCursorImpl implements ManagedCursor {
     protected final ManagedLedgerConfig config;
     protected final ManagedLedgerImpl ledger;
     private final String name;
+    private volatile Map<String, String> cursorProperties;
     private final BookKeeper.DigestType digestType;
 
     protected volatile PositionImpl markDeletePosition;
@@ -278,6 +280,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 
     ManagedCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, ManagedLedgerImpl ledger, String cursorName) {
         this.bookkeeper = bookkeeper;
+        this.cursorProperties = Collections.emptyMap();
         this.config = config;
         this.ledger = ledger;
         this.name = cursorName;
@@ -313,6 +316,52 @@ public class ManagedCursorImpl implements ManagedCursor {
         return lastMarkDeleteEntry != null ? lastMarkDeleteEntry.properties : Collections.emptyMap();
     }
 
+    @Override
+    public Map<String, String> getCursorProperties() {
+        return cursorProperties;
+    }
+
+    @Override
+    public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+        CompletableFuture<Void> updateCursorPropertiesResult = new CompletableFuture<>();
+        ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<>() {
+            @Override
+            public void operationComplete(ManagedCursorInfo info, Stat stat) {
+                ManagedCursorInfo copy = ManagedCursorInfo
+                        .newBuilder(info)
+                        .clearCursorProperties()
+                        .addAllCursorProperties(buildStringPropertiesMap(cursorProperties))
+                        .build();
+                ledger.getStore().asyncUpdateCursorInfo(ledger.getName(),
+                        name, copy, stat, new MetaStoreCallback<>() {
+                    @Override
+                    public void operationComplete(Void result, Stat stat) {
+                        log.info("[{}] Updated ledger cursor: {} properties {}", ledger.getName(),
+                                name, cursorProperties);
+                        ManagedCursorImpl.this.cursorProperties = cursorProperties;
+                        cursorLedgerStat = stat;
+                        updateCursorPropertiesResult.complete(result);
+                    }
+
+                    @Override
+                    public void operationFailed(MetaStoreException e) {
+                        log.error("[{}] Error while updating ledger cursor: {} properties {}", ledger.getName(),
+                                name, cursorProperties, e);
+                        updateCursorPropertiesResult.completeExceptionally(e);
+                    }
+                });
+            }
+
+            @Override
+            public void operationFailed(MetaStoreException e) {
+                log.error("[{}] Error while updating ledger cursor: {} properties {}", ledger.getName(),
+                        name, cursorProperties, e);
+                updateCursorPropertiesResult.completeExceptionally(e);
+            }
+        });
+        return updateCursorPropertiesResult;
+    }
+
     @Override
     public boolean putProperty(String key, Long value) {
         if (lastMarkDeleteEntry != null) {
@@ -361,6 +410,18 @@ public class ManagedCursorImpl implements ManagedCursor {
                 cursorLedgerStat = stat;
                 lastActive = info.getLastActive() != 0 ? info.getLastActive() : lastActive;
 
+
+                Map<String, String> recoveredCursorProperties = Collections.emptyMap();
+                if (info.getCursorPropertiesCount() > 0) {
+                    // Recover properties map
+                    recoveredCursorProperties = Maps.newHashMap();
+                    for (int i = 0; i < info.getCursorPropertiesCount(); i++) {
+                        StringProperty property = info.getCursorProperties(i);
+                        recoveredCursorProperties.put(property.getName(), property.getValue());
+                    }
+                }
+                cursorProperties = recoveredCursorProperties;
+
                 if (info.getCursorsLedgerId() == -1L) {
                     // There is no cursor ledger to read the last position from. It means the cursor has been properly
                     // closed and the last mark-delete position is stored in the ManagedCursorInfo itself.
@@ -380,7 +441,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                         }
                     }
 
-                    recoveredCursor(recoveredPosition, recoveredProperties, null);
+                    recoveredCursor(recoveredPosition, recoveredProperties, recoveredCursorProperties, null);
                     callback.operationComplete();
                 } else {
                     // Need to proceed and read the last entry in the specified ledger to find out the last position
@@ -410,7 +471,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                 log.error("[{}] Error opening metadata ledger {} for consumer {}: {}", ledger.getName(), ledgerId, name,
                         BKException.getMessage(rc));
                 // Rewind to oldest entry available
-                initialize(getRollbackPosition(info), Collections.emptyMap(), callback);
+                initialize(getRollbackPosition(info), Collections.emptyMap(), Collections.emptyMap(), callback);
                 return;
             } else if (rc != BKException.Code.OK) {
                 log.warn("[{}] Error opening metadata ledger {} for consumer {}: {}", ledger.getName(), ledgerId, name,
@@ -426,7 +487,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                 log.warn("[{}] Error reading from metadata ledger {} for consumer {}: No entries in ledger",
                         ledger.getName(), ledgerId, name);
                 // Rewind to last cursor snapshot available
-                initialize(getRollbackPosition(info), Collections.emptyMap(), callback);
+                initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback);
                 return;
             }
 
@@ -438,7 +499,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                     log.error("[{}] Error reading from metadata ledger {} for consumer {}: {}", ledger.getName(),
                             ledgerId, name, BKException.getMessage(rc1));
                     // Rewind to oldest entry available
-                    initialize(getRollbackPosition(info), Collections.emptyMap(), callback);
+                    initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback);
                     return;
                 } else if (rc1 != BKException.Code.OK) {
                     log.warn("[{}] Error reading from metadata ledger {} for consumer {}: {}", ledger.getName(),
@@ -476,7 +537,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                     && positionInfo.getBatchedEntryDeletionIndexInfoCount() > 0) {
                     recoverBatchDeletedIndexes(positionInfo.getBatchedEntryDeletionIndexInfoList());
                 }
-                recoveredCursor(position, recoveredProperties, lh);
+                recoveredCursor(position, recoveredProperties, cursorProperties, lh);
                 callback.operationComplete();
             }, null);
         };
@@ -547,6 +608,7 @@ public class ManagedCursorImpl implements ManagedCursor {
     }
 
     private void recoveredCursor(PositionImpl position, Map<String, Long> properties,
+                                 Map<String, String> cursorProperties,
                                  LedgerHandle recoveredFromCursorLedger) {
         // if the position was at a ledger that didn't exist (since it will be deleted if it was previously empty),
         // we need to move to the next existing ledger
@@ -564,7 +626,7 @@ public class ManagedCursorImpl implements ManagedCursor {
             position = ledger.getLastPosition();
         }
         log.info("[{}] Cursor {} recovered to position {}", ledger.getName(), name, position);
-
+        this.cursorProperties = cursorProperties;
         messagesConsumedCounter = -getNumberOfEntries(Range.openClosed(position, ledger.getLastPosition()));
         markDeletePosition = position;
         persistentMarkDeletePosition = position;
@@ -577,8 +639,9 @@ public class ManagedCursorImpl implements ManagedCursor {
         STATE_UPDATER.set(this, State.NoLedger);
     }
 
-    void initialize(PositionImpl position, Map<String, Long> properties, final VoidCallback callback) {
-        recoveredCursor(position, properties, null);
+    void initialize(PositionImpl position, Map<String, Long> properties, Map<String, String> cursorProperties,
+                    final VoidCallback callback) {
+        recoveredCursor(position, properties, cursorProperties, null);
         if (log.isDebugEnabled()) {
             log.debug("[{}] Consumer {} cursor initialized with counters: consumed {} mdPos {} rdPos {}",
                     ledger.getName(), name, messagesConsumedCounter, markDeletePosition, readPosition);
@@ -2383,6 +2446,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                 .setLastActive(lastActive); //
 
         info.addAllProperties(buildPropertiesMap(properties));
+        info.addAllCursorProperties(buildStringPropertiesMap(cursorProperties));
         if (persistIndividualDeletedMessageRanges) {
             info.addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges());
             if (config.isDeletionAtBatchIndexLevelEnabled()) {
@@ -2589,7 +2653,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         }, LedgerMetadataUtils.buildAdditionalMetadataForCursor(name));
     }
 
-    private List<LongProperty> buildPropertiesMap(Map<String, Long> properties) {
+    private static List<LongProperty> buildPropertiesMap(Map<String, Long> properties) {
         if (properties.isEmpty()) {
             return Collections.emptyList();
         }
@@ -2603,6 +2667,20 @@ public class ManagedCursorImpl implements ManagedCursor {
         return longProperties;
     }
 
+    private static List<StringProperty> buildStringPropertiesMap(Map<String, String> properties) {
+        if (properties == null || properties.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        List<StringProperty> stringProperties = Lists.newArrayList();
+        properties.forEach((name, value) -> {
+            StringProperty sp = StringProperty.newBuilder().setName(name).setValue(value).build();
+            stringProperties.add(sp);
+        });
+
+        return stringProperties;
+    }
+
     private List<MLDataFormats.MessageRange> buildIndividualDeletedMessageRanges() {
         lock.readLock().lock();
         try {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 0879c48de7d..1c7297d880a 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -845,11 +845,12 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     @Override
     public ManagedCursor openCursor(String cursorName, InitialPosition initialPosition)
             throws InterruptedException, ManagedLedgerException {
-        return openCursor(cursorName, initialPosition, Collections.emptyMap());
+        return openCursor(cursorName, initialPosition, Collections.emptyMap(), Collections.emptyMap());
     }
 
     @Override
-    public ManagedCursor openCursor(String cursorName, InitialPosition initialPosition, Map<String, Long> properties)
+    public ManagedCursor openCursor(String cursorName, InitialPosition initialPosition, Map<String, Long> properties,
+                                    Map<String, String> cursorProperties)
             throws InterruptedException, ManagedLedgerException {
         final CountDownLatch counter = new CountDownLatch(1);
         class Result {
@@ -858,7 +859,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         }
         final Result result = new Result();
 
-        asyncOpenCursor(cursorName, initialPosition, properties, new OpenCursorCallback() {
+        asyncOpenCursor(cursorName, initialPosition, properties, cursorProperties, new OpenCursorCallback() {
             @Override
             public void openCursorComplete(ManagedCursor cursor, Object ctx) {
                 result.cursor = cursor;
@@ -893,12 +894,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     @Override
     public void asyncOpenCursor(final String cursorName, final InitialPosition initialPosition,
             final OpenCursorCallback callback, final Object ctx) {
-        this.asyncOpenCursor(cursorName, initialPosition, Collections.emptyMap(), callback, ctx);
+        this.asyncOpenCursor(cursorName, initialPosition, Collections.emptyMap(), Collections.emptyMap(),
+                callback, ctx);
     }
 
     @Override
     public synchronized void asyncOpenCursor(final String cursorName, final InitialPosition initialPosition,
-            Map<String, Long> properties, final OpenCursorCallback callback, final Object ctx) {
+            Map<String, Long> properties, Map<String, String> cursorProperties,
+                                             final OpenCursorCallback callback, final Object ctx) {
         try {
             checkManagedLedgerIsOpen();
             checkFenced();
@@ -932,7 +935,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         CompletableFuture<ManagedCursor> cursorFuture = new CompletableFuture<>();
         uninitializedCursors.put(cursorName, cursorFuture);
         PositionImpl position = InitialPosition.Earliest == initialPosition ? getFirstPosition() : getLastPosition();
-        cursor.initialize(position, properties, new VoidCallback() {
+        cursor.initialize(position, properties, cursorProperties, new VoidCallback() {
             @Override
             public void operationComplete() {
                 log.info("[{}] Opened new cursor: {}", name, cursor);
diff --git a/managed-ledger/src/main/proto/MLDataFormats.proto b/managed-ledger/src/main/proto/MLDataFormats.proto
index 4671816c1a1..c4e502819fa 100644
--- a/managed-ledger/src/main/proto/MLDataFormats.proto
+++ b/managed-ledger/src/main/proto/MLDataFormats.proto
@@ -105,6 +105,11 @@ message LongProperty {
     required int64 value  = 2;
 }
 
+message StringProperty {
+    required string name = 1;
+    required string value = 2;
+}
+
 message ManagedCursorInfo {
     // If the ledger id is -1, then the mark-delete position is
     // the one from the (ledgerId, entryId) snapshot below
@@ -123,6 +128,10 @@ message ManagedCursorInfo {
 
     // Store which index in the batch message has been deleted
     repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 7;
+
+    // Additional custom properties associated with
+    // the cursor
+    repeated StringProperty cursorProperties = 8;
 }
 
 enum CompressionType {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index a654b30e60b..05f34df47c1 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -67,6 +67,11 @@ public class ManagedCursorContainerTest {
             return Collections.emptyMap();
         }
 
+        @Override
+        public Map<String, String> getCursorProperties() {
+            return Collections.emptyMap();
+        }
+
         @Override
         public boolean putProperty(String key, Long value) {
             return false;
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java
index 727f3850ad2..74db9d791f3 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java
@@ -23,6 +23,8 @@ import static org.testng.Assert.assertEquals;
 import java.util.Collections;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
@@ -75,9 +77,15 @@ public class ManagedCursorPropertiesTest extends MockedBookKeeperTestCase {
     @Test(timeOut = 20000)
     void testPropertiesRecoveryAfterCrash() throws Exception {
         ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig());
-        ManagedCursor c1 = ledger.openCursor("c1");
+
+        Map<String, String> cursorProperties = new TreeMap<>();
+        cursorProperties.put("custom1", "one");
+        cursorProperties.put("custom2", "two");
+
+        ManagedCursor c1 = ledger.openCursor("c1", InitialPosition.Latest, Collections.emptyMap(), cursorProperties);
 
         assertEquals(c1.getProperties(), Collections.emptyMap());
+        assertEquals(c1.getCursorProperties(), cursorProperties);
 
         ledger.addEntry("entry-1".getBytes());
         ledger.addEntry("entry-2".getBytes());
@@ -99,6 +107,7 @@ public class ManagedCursorPropertiesTest extends MockedBookKeeperTestCase {
 
         assertEquals(c1.getMarkDeletedPosition(), p3);
         assertEquals(c1.getProperties(), properties);
+        assertEquals(c1.getCursorProperties(), cursorProperties);
 
         factory2.shutdown();
     }
@@ -148,8 +157,13 @@ public class ManagedCursorPropertiesTest extends MockedBookKeeperTestCase {
         properties.put("b", 2L);
         properties.put("c", 3L);
 
-        ManagedCursor c1 = ledger.openCursor("c1", InitialPosition.Latest, properties);
+        Map<String, String> cursorProperties = new TreeMap<>();
+        cursorProperties.put("custom1", "one");
+        cursorProperties.put("custom2", "two");
+
+        ManagedCursor c1 = ledger.openCursor("c1", InitialPosition.Latest, properties, cursorProperties);
         assertEquals(c1.getProperties(), properties);
+        assertEquals(c1.getCursorProperties(), cursorProperties);
 
         ledger.addEntry("entry-1".getBytes());
 
@@ -160,6 +174,50 @@ public class ManagedCursorPropertiesTest extends MockedBookKeeperTestCase {
         c1 = ledger.openCursor("c1");
 
         assertEquals(c1.getProperties(), properties);
+        assertEquals(c1.getCursorProperties(), cursorProperties);
     }
 
+    @Test
+    void testUpdateCursorProperties() throws Exception {
+        ManagedLedger ledger = factory.open("testUpdateCursorProperties", new ManagedLedgerConfig());
+
+        Map<String, Long> properties = new TreeMap<>();
+        properties.put("a", 1L);
+
+        Map<String, String> cursorProperties = new TreeMap<>();
+        cursorProperties.put("custom1", "one");
+        cursorProperties.put("custom2", "two");
+
+        ManagedCursor c1 = ledger.openCursor("c1", InitialPosition.Latest, properties, cursorProperties);
+        assertEquals(c1.getProperties(), properties);
+        assertEquals(c1.getCursorProperties(), cursorProperties);
+
+        ledger.addEntry("entry-1".getBytes());
+
+        Map<String, String> cursorPropertiesUpdated = new TreeMap<>();
+        cursorPropertiesUpdated.put("custom1", "three");
+        cursorPropertiesUpdated.put("custom2", "four");
+
+        c1.setCursorProperties(cursorPropertiesUpdated).get(10, TimeUnit.SECONDS);
+
+        ledger.close();
+
+        // Reopen the managed ledger
+        ledger = factory.open("testUpdateCursorProperties", new ManagedLedgerConfig());
+        c1 = ledger.openCursor("c1");
+
+        assertEquals(c1.getProperties(), properties);
+        assertEquals(c1.getCursorProperties(), cursorPropertiesUpdated);
+
+        // Create a new factory to force a managed ledger close and recovery
+        ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc);
+        // Reopen the managed ledger
+        ledger = factory2.open("testUpdateCursorProperties", new ManagedLedgerConfig());
+        c1 = ledger.openCursor("c1");
+
+        assertEquals(c1.getProperties(), properties);
+        assertEquals(c1.getCursorProperties(), cursorPropertiesUpdated);
+
+        factory2.shutdown();
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
index b1ccb4d1eb0..49b906b7959 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
@@ -103,6 +103,8 @@ public interface Subscription {
 
     Map<String, String> getSubscriptionProperties();
 
+    CompletableFuture<Void> updateSubscriptionProperties(Map<String, String> subscriptionProperties);
+
     default void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapshot snapshot) {
         // Default is no-op
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index ae49b3623ca..a9777f5dd0d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -68,7 +68,7 @@ public class NonPersistentSubscription implements Subscription {
 
     private final LongAdder bytesOutFromRemovedConsumers = new LongAdder();
     private final LongAdder msgOutFromRemovedConsumer = new LongAdder();
-    private final Map<String, String> subscriptionProperties;
+    private volatile Map<String, String> subscriptionProperties;
 
     // If isDurable is false(such as a Reader), remove subscription from topic when closing this subscription.
     private final boolean isDurable;
@@ -526,4 +526,15 @@ public class NonPersistentSubscription implements Subscription {
     public Map<String, String> getSubscriptionProperties() {
         return subscriptionProperties;
     }
+
+    @Override
+    public CompletableFuture<Void> updateSubscriptionProperties(Map<String, String> subscriptionProperties) {
+        if (subscriptionProperties == null || subscriptionProperties.isEmpty()) {
+          this.subscriptionProperties = Collections.emptyMap();
+        } else {
+           this.subscriptionProperties = Collections.unmodifiableMap(subscriptionProperties);
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 3b23e86f9ad..95dee2a3798 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -22,7 +22,6 @@ import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEvent
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -114,7 +113,7 @@ public class PersistentSubscription implements Subscription {
 
     private volatile ReplicatedSubscriptionSnapshotCache replicatedSubscriptionSnapshotCache;
     private final PendingAckHandle pendingAckHandle;
-    private Map<String, String> subscriptionProperties;
+    private volatile Map<String, String> subscriptionProperties;
 
     private final LongAdder bytesOutFromRemovedConsumers = new LongAdder();
     private final LongAdder msgOutFromRemovedConsumer = new LongAdder();
@@ -137,7 +136,7 @@ public class PersistentSubscription implements Subscription {
     }
 
     public PersistentSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor,
-            boolean replicated, Map<String, String> subscriptionProperties) {
+                                  boolean replicated, Map<String, String> subscriptionProperties) {
         this.topic = topic;
         this.cursor = cursor;
         this.topicName = topic.getName();
@@ -146,7 +145,7 @@ public class PersistentSubscription implements Subscription {
         this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor, this);
         this.setReplicated(replicated);
         this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties)
-                ? new HashMap<>() : Collections.unmodifiableMap(subscriptionProperties);
+                ? Collections.emptyMap() : Collections.unmodifiableMap(subscriptionProperties);
         if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()
                 && !checkTopicIsEventsNames(TopicName.get(topicName))) {
             this.pendingAckHandle = new PendingAckHandleImpl(this);
@@ -1094,6 +1093,20 @@ public class PersistentSubscription implements Subscription {
         return subscriptionProperties;
     }
 
+    /**
+     * Hands the new properties to the cursor, then swaps this subscription's map once that completes.
+     */
+    @Override
+    public CompletableFuture<Void> updateSubscriptionProperties(Map<String, String> subscriptionProperties) {
+        // Null or empty input is normalized to the immutable empty map, as in the constructor.
+        final Map<String, String> updated = MapUtils.isEmpty(subscriptionProperties)
+                ? Collections.emptyMap()
+                : Collections.unmodifiableMap(subscriptionProperties);
+        // The field is reassigned only when the cursor future completes normally.
+        return cursor.setCursorProperties(updated)
+                .thenRun(() -> this.subscriptionProperties = updated);
+    }
+
     /**
      * Return a merged map that contains the cursor properties specified by used
      * (eg. when using compaction subscription) and the subscription properties.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index c2d5522f502..568ac5ae063 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -74,7 +74,6 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.net.BookieId;
-import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.resources.NamespaceResources.PartitionedTopicResources;
@@ -277,7 +276,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             } else {
                 final String subscriptionName = Codec.decode(cursor.getName());
                 subscriptions.put(subscriptionName, createPersistentSubscription(subscriptionName, cursor,
-                        PersistentSubscription.isCursorFromReplicatedSubscription(cursor), null));
+                        PersistentSubscription.isCursorFromReplicatedSubscription(cursor),
+                        cursor.getCursorProperties()));
                 // subscription-cursor gets activated by default: deactivate as there is no active subscription right
                 // now
                 subscriptions.get(subscriptionName).deactivateCursor();
@@ -858,7 +858,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
 
         Map<String, Long> properties = PersistentSubscription.getBaseCursorProperties(replicated);
 
-        ledger.asyncOpenCursor(Codec.encode(subscriptionName), initialPosition, properties, new OpenCursorCallback() {
+        ledger.asyncOpenCursor(Codec.encode(subscriptionName), initialPosition, properties, subscriptionProperties,
+                new OpenCursorCallback() {
             @Override
             public void openCursorComplete(ManagedCursor cursor, Object ctx) {
                 if (log.isDebugEnabled()) {
@@ -878,11 +879,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                         return;
                     }
                 }
-                if (MapUtils.isEmpty(subscription.getSubscriptionProperties())
-                        && MapUtils.isNotEmpty(subscriptionProperties)) {
-                    subscription.getSubscriptionProperties().putAll(subscriptionProperties);
-                }
-
                 if (replicated && !subscription.isReplicated()) {
                     // Flip the subscription state
                     subscription.setReplicated(replicated);
@@ -961,11 +957,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                     return FutureUtil.failedFuture(
                             new NotAllowedException("Durable subscription with the same name already exists."));
                 }
-
-                if (MapUtils.isEmpty(subscription.getSubscriptionProperties())
-                        && MapUtils.isNotEmpty(subscriptionProperties)) {
-                    subscription.getSubscriptionProperties().putAll(subscriptionProperties);
-                }
             }
 
             if (startMessageRollbackDurationSec > 0) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
index e0f5e0a77a9..12b742a0191 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
@@ -213,10 +213,7 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
         PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService()
                 .getTopicReference(topic).get().getSubscription(subName);
         Map<String, String> properties = subscription.getSubscriptionProperties();
-        assertTrue(properties.containsKey("1"));
-        assertTrue(properties.containsKey("2"));
-        assertEquals(properties.get("1"), "1");
-        assertEquals(properties.get("2"), "2");
+        assertEquals(properties, map);
 
         // after updating mark delete position, the properties should still exist
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
@@ -232,10 +229,7 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
             assertEquals(subscription.getCursor().getMarkDeletedPosition().getEntryId(), messageId.getEntryId());
         });
         properties = subscription.getSubscriptionProperties();
-        assertTrue(properties.containsKey("1"));
-        assertTrue(properties.containsKey("2"));
-        assertEquals(properties.get("1"), "1");
-        assertEquals(properties.get("2"), "2");
+        assertEquals(properties, map);
 
         consumer.close();
         producer.close();
@@ -249,10 +243,7 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
                 .getTopicReference(topic).get().getSubscription(subName);
         Awaitility.await().untilAsserted(() -> {
             Map<String, String> properties2 = subscription2.getSubscriptionProperties();
-            assertTrue(properties2.containsKey("1"));
-            assertTrue(properties2.containsKey("2"));
-            assertEquals(properties2.get("1"), "1");
-            assertEquals(properties2.get("2"), "2");
+            assertEquals(properties2, map);
         });
         consumer2.close();
 
@@ -264,13 +255,11 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
                 .receiverQueueSize(1)
                 .subscriptionProperties(map3).subscriptionName(subName).subscribe();
         Map<String, String> properties3 = subscription.getSubscriptionProperties();
-        assertTrue(properties3.containsKey("1"));
-        assertTrue(properties3.containsKey("2"));
-        assertEquals(properties3.get("1"), "1");
-        assertEquals(properties3.get("2"), "2");
+        assertEquals(properties3, map);
         consumer3.close();
 
-        //restart and create a new consumer with new properties, the new properties should be updated
+        //restart and create a new consumer with new properties, the new properties must not be updated
+        // for a Durable subscription, but for a NonDurable subscription we pick up the new values
         restartBroker();
         Consumer<byte[]> consumer4 = pulsarClient.newConsumer().subscriptionMode(subscriptionMode)
                 .topic(topic).receiverQueueSize(1)
@@ -278,10 +267,12 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
         PersistentSubscription subscription4 = (PersistentSubscription) pulsar.getBrokerService()
                 .getTopicReference(topic).get().getSubscription(subName);
         Map<String, String> properties4 = subscription4.getSubscriptionProperties();
-        assertTrue(properties4.containsKey("3"));
-        assertTrue(properties4.containsKey("4"));
-        assertEquals(properties4.get("3"), "3");
-        assertEquals(properties4.get("4"), "4");
+        if (subscriptionMode == SubscriptionMode.Durable) {
+            assertEquals(properties4, map);
+        } else {
+            assertEquals(properties4, map3);
+
+        }
         consumer4.close();
 
 
@@ -294,26 +285,28 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
                 .getTopicReference(topic).get().getSubscription(subName);
         properties4 = subscription4.getSubscriptionProperties();
         if (subscriptionMode == SubscriptionMode.Durable) {
-            assertTrue(properties4.containsKey("3"));
-            assertTrue(properties4.containsKey("4"));
-            assertEquals(properties4.get("3"), "3");
-            assertEquals(properties4.get("4"), "4");
+            assertEquals(properties4, map);
         } else {
             assertTrue(properties4.isEmpty());
         }
         consumer4.close();
 
-        //restart broker, it won't get any properties
+        //restart broker, properties for Durable subscription are reloaded from Metadata
         restartBroker();
         consumer4 = pulsarClient.newConsumer().topic(topic).subscriptionMode(subscriptionMode)
                 .receiverQueueSize(1)
                 .subscriptionName(subName).subscribe();
         subscription4 = (PersistentSubscription) pulsar.getBrokerService()
                 .getTopicReference(topic).get().getSubscription(subName);
-        assertEquals(subscription4.getSubscriptionProperties().size(), 0);
+        properties4 = subscription4.getSubscriptionProperties();
+        if (subscriptionMode == SubscriptionMode.Durable) {
+            assertEquals(properties4, map);
+        } else {
+            assertTrue(properties4.isEmpty());
+        }
         consumer4.close();
 
-        //restart broker and create a new consumer with new properties, the properties will be updated
+        //restart broker and create a new consumer with new properties, the properties will not be updated
         restartBroker();
         consumer4 = pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
                 .subscriptionMode(subscriptionMode)
@@ -321,16 +314,17 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
                 .subscriptionName(subName).subscribe();
         PersistentSubscription subscription5 = (PersistentSubscription) pulsar.getBrokerService()
                 .getTopicReference(topic).get().getSubscription(subName);
-        properties = subscription5.getSubscriptionProperties();
-        assertTrue(properties.containsKey("1"));
-        assertTrue(properties.containsKey("2"));
-        assertEquals(properties.get("1"), "1");
-        assertEquals(properties.get("2"), "2");
-        consumer4.close();
+        properties4 = subscription5.getSubscriptionProperties();
+
+        // for the NonDurable subscription here we have the same properties because they
+        // are sent by the Consumer
+        assertEquals(properties4, map);
 
+        consumer4.close();
 
         String subNameShared = "my-sub-shared";
         Map<String, String> mapShared = new HashMap<>();
+        mapShared.put("6", "7");
         // open two consumers with a Shared Subscription
         Consumer consumerShared1 = pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
                 .subscriptionMode(subscriptionMode)
@@ -342,26 +336,25 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
         properties = subscriptionShared.getSubscriptionProperties();
         assertEquals(properties, mapShared);
 
-        // add a new consumer, the properties are updated because they were empty
-        mapShared = new HashMap<>();
-        mapShared.put("6", "7");
-        mapShared.put("8", "9");
+        // add a new consumer, the properties are not updated
+        Map<String, String> mapShared2 = new HashMap<>();
+        mapShared2.put("8", "9");
         Consumer consumerShared2 = pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
                 .subscriptionMode(subscriptionMode)
                 .subscriptionType(SubscriptionType.Shared)
-                .subscriptionProperties(mapShared)
+                .subscriptionProperties(mapShared2)
                 .subscriptionName(subNameShared).subscribe();
 
         properties = subscriptionShared.getSubscriptionProperties();
         assertEquals(properties, mapShared);
 
-        // add a third consumer, the properties are NOT updated because they are not empty
-        Map<String, String> mapShared2 = new HashMap<>();
-        mapShared2.put("10", "11");
+        // add a third consumer, the properties are NOT updated
+        Map<String, String> mapShared3 = new HashMap<>();
+        mapShared3.put("10", "11");
         Consumer consumerShared3 = pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
                 .subscriptionMode(subscriptionMode)
                 .subscriptionType(SubscriptionType.Shared)
-                .subscriptionProperties(mapShared2)
+                .subscriptionProperties(mapShared3)
                 .subscriptionName(subNameShared).subscribe();
 
         properties = subscriptionShared.getSubscriptionProperties();
@@ -373,6 +366,65 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
         consumerShared3.close();
     }
 
+    /**
+     * Checks that the properties given when a durable subscription is created are still reported after
+     * a topic unload, and that a later consumer cannot change the properties of an existing subscription.
+     */
+    @Test
+    public void subscriptionModePersistedTest() throws Exception {
+        String topic = "persistent://my-property/my-ns/topic" + UUID.randomUUID();
+        admin.topics().createNonPartitionedTopic(topic);
+        Map<String, String> map = new HashMap<>();
+        map.put("1", "1");
+        map.put("2", "2");
+        String subName = "my-sub";
+        pulsarClient.newConsumer()
+                .subscriptionMode(SubscriptionMode.Durable)
+                .topic(topic)
+                .subscriptionProperties(map)
+                .subscriptionName(subName)
+                .subscribe()
+                .close();
+        PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService()
+                .getTopicReference(topic).get().getSubscription(subName);
+        // the broker-side subscription holds exactly the properties sent by the consumer
+        assertEquals(subscription.getSubscriptionProperties(), map);
+
+        Map<String, String> subscriptionPropertiesFromAdmin =
+                admin.topics().getStats(topic).getSubscriptions().get(subName).getSubscriptionProperties();
+        assertEquals(subscriptionPropertiesFromAdmin, map);
+
+        // unload the topic
+        admin.topics().unload(topic);
+
+        // verify that the properties are still there
+        subscriptionPropertiesFromAdmin =
+                admin.topics().getStats(topic).getSubscriptions().get(subName).getSubscriptionProperties();
+        assertEquals(subscriptionPropertiesFromAdmin, map);
+
+        // create a new subscription, initially properties are empty
+        String subName2 = "my-sub2";
+        admin.topics().createSubscription(topic, subName2, MessageId.latest);
+
+        subscriptionPropertiesFromAdmin =
+                admin.topics().getStats(topic).getSubscriptions().get(subName2).getSubscriptionProperties();
+        assertTrue(subscriptionPropertiesFromAdmin.isEmpty());
+
+        // create a consumer, this is not allowed to update the properties
+        pulsarClient.newConsumer()
+                .subscriptionMode(SubscriptionMode.Durable)
+                .topic(topic)
+                .subscriptionProperties(map)
+                .subscriptionName(subName2)
+                .subscribe()
+                .close();
+
+        // verify that the properties are not changed
+        subscriptionPropertiesFromAdmin =
+                admin.topics().getStats(topic).getSubscriptions().get(subName2).getSubscriptionProperties();
+        assertTrue(subscriptionPropertiesFromAdmin.isEmpty());
+    }
+
     @Test
     public void createSubscriptionBySpecifyingStringPosition() throws IOException, PulsarAdminException {
         final int numberOfMessages = 5;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index b75cffde81e..e376ec990fd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -1551,11 +1551,11 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
         doAnswer(new Answer<Object>() {
             @Override
             public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-                ((OpenCursorCallback) invocationOnMock.getArguments()[3]).openCursorComplete(cursorMock, null);
+                ((OpenCursorCallback) invocationOnMock.getArguments()[4]).openCursorComplete(cursorMock, null);
                 return null;
             }
         }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(Map.class),
-                any(OpenCursorCallback.class), any());
+                any(Map.class), any(OpenCursorCallback.class), any());
 
         doAnswer(new Answer<Object>() {
             @Override
@@ -2204,9 +2204,9 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
             return null;
         }).when(mockLedger).asyncDeleteCursor(matches(".*success.*"), any(DeleteCursorCallback.class), any());
         doAnswer((Answer<Object>) invocationOnMock -> {
-            ((OpenCursorCallback) invocationOnMock.getArguments()[3]).openCursorComplete(mockCursor, null);
+            ((OpenCursorCallback) invocationOnMock.getArguments()[4]).openCursorComplete(mockCursor, null);
             return null;
-        }).when(mockLedger).asyncOpenCursor(any(), any(), any(), any(), any());
+        }).when(mockLedger).asyncOpenCursor(any(), any(), any(), any(), any(), any());
         PersistentTopic topic = new PersistentTopic(successTopicName, mockLedger, brokerService);
 
         CommandSubscribe cmd = new CommandSubscribe()
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 5bef206a44d..afa5d1aad03 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -1654,20 +1654,20 @@ public class ServerCnxTest {
 
         doAnswer((Answer<Object>) invocationOnMock -> {
             Thread.sleep(300);
-            ((OpenCursorCallback) invocationOnMock.getArguments()[2]).openCursorComplete(cursorMock, null);
+            ((OpenCursorCallback) invocationOnMock.getArguments()[3]).openCursorComplete(cursorMock, null);
             return null;
         }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(OpenCursorCallback.class), any());
 
         doAnswer((Answer<Object>) invocationOnMock -> {
             Thread.sleep(300);
-            ((OpenCursorCallback) invocationOnMock.getArguments()[3]).openCursorComplete(cursorMock, null);
+            ((OpenCursorCallback) invocationOnMock.getArguments()[4]).openCursorComplete(cursorMock, null);
             return null;
-        }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(Map.class),
+        }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(Map.class), any(Map.class),
                 any(OpenCursorCallback.class), any());
 
         doAnswer((Answer<Object>) invocationOnMock -> {
             Thread.sleep(300);
-            ((OpenCursorCallback) invocationOnMock.getArguments()[2])
+            ((OpenCursorCallback) invocationOnMock.getArguments()[3])
                     .openCursorFailed(new ManagedLedgerException("Managed ledger failure"), null);
             return null;
         }).when(ledgerMock).asyncOpenCursor(matches(".*fail.*"), any(InitialPosition.class), any(OpenCursorCallback.class), any());
@@ -1677,7 +1677,7 @@ public class ServerCnxTest {
             ((OpenCursorCallback) invocationOnMock.getArguments()[3])
                     .openCursorFailed(new ManagedLedgerException("Managed ledger failure"), null);
             return null;
-        }).when(ledgerMock).asyncOpenCursor(matches(".*fail.*"), any(InitialPosition.class), any(Map.class),
+        }).when(ledgerMock).asyncOpenCursor(matches(".*fail.*"), any(InitialPosition.class), any(Map.class), any(Map.class),
                 any(OpenCursorCallback.class), any());
 
         doAnswer((Answer<Object>) invocationOnMock -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
index 81ad811f43c..b2edbda8855 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
@@ -69,10 +69,13 @@ public class FilterEntryTest extends BrokerTestBase {
     }
 
     public void testFilter() throws Exception {
-
+        Map<String, String> map = new HashMap<>();
+        map.put("1","1");
+        map.put("2","2");
         String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID();
         String subName = "sub";
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic)
+                .subscriptionProperties(map)
                 .subscriptionName(subName).subscribe();
         // mock entry filters
         PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService()
@@ -132,9 +135,6 @@ public class FilterEntryTest extends BrokerTestBase {
         });
         consumer.close();
 
-        Map<String, String> map = new HashMap<>();
-        map.put("1","1");
-        map.put("2","2");
         consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionProperties(map)
                 .subscriptionName(subName).subscribe();
         for (int i = 0; i < 10; i++) {
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
index 4ad872919e3..3eaf276c3c5 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
@@ -105,7 +105,8 @@ public class MockManagedLedger implements ManagedLedger {
 
     @Override
     public ManagedCursor openCursor(String name, CommandSubscribe.InitialPosition initialPosition,
-                                    Map<String, Long> properties) throws InterruptedException, ManagedLedgerException {
+                                    Map<String, Long> properties, Map<String, String> cursorProperties)
+            throws InterruptedException, ManagedLedgerException {
         return null;
     }
 
@@ -155,7 +156,8 @@ public class MockManagedLedger implements ManagedLedger {
 
     @Override
     public void asyncOpenCursor(String name, CommandSubscribe.InitialPosition initialPosition,
-                                Map<String, Long> properties, AsyncCallbacks.OpenCursorCallback callback, Object ctx) {
+                                Map<String, Long> properties, Map<String, String> cursorProperties,
+                                AsyncCallbacks.OpenCursorCallback callback, Object ctx) {
 
     }