You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/20 17:16:23 UTC

[pulsar] branch master updated: Replicated subscriptions - Configuration and client changes (#4299)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e51237  Replicated subscriptions - Configuration and client changes (#4299)
6e51237 is described below

commit 6e51237538cd5011abbbdfbe7fe9c7c21b63fbe2
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon May 20 10:16:15 2019 -0700

    Replicated subscriptions - Configuration and client changes (#4299)
    
    * Replicated subscriptions - Configuration and client changes
    
    * Added missing header
    
    * Fixed mocked methods for tests
    
    * Fixed typo
---
 .../broker/admin/impl/PersistentTopicsBase.java    |   4 +-
 .../pulsar/broker/admin/v1/PersistentTopics.java   |   6 +-
 .../pulsar/broker/admin/v2/PersistentTopics.java   |   8 +-
 .../apache/pulsar/broker/service/ServerCnx.java    |   6 +-
 .../apache/pulsar/broker/service/Subscription.java |   2 +
 .../org/apache/pulsar/broker/service/Topic.java    |   6 +-
 .../nonpersistent/NonPersistentSubscription.java   |   5 +
 .../service/nonpersistent/NonPersistentTopic.java  |   9 +-
 .../service/persistent/CompactorSubscription.java  |   2 +-
 .../service/persistent/PersistentSubscription.java |  63 ++++++++++-
 .../broker/service/persistent/PersistentTopic.java |  37 ++++---
 .../pulsar/broker/admin/PersistentTopicsTest.java  |   4 +-
 .../PersistentDispatcherFailoverConsumerTest.java  |   4 +-
 .../service/PersistentTopicConcurrentTest.java     |  12 ++-
 .../pulsar/broker/service/PersistentTopicTest.java |  61 +++++++----
 .../pulsar/broker/service/ServerCnxTest.java       |  22 ++++
 .../ReplicatedSubscriptionConfigTest.java          | 120 +++++++++++++++++++++
 .../apache/pulsar/client/api/ConsumerBuilder.java  |   6 ++
 .../pulsar/client/impl/ConsumerBuilderImpl.java    |   6 ++
 .../apache/pulsar/client/impl/ConsumerImpl.java    |   4 +-
 .../impl/conf/ConsumerConfigurationData.java       |   2 +
 .../org/apache/pulsar/common/api/Commands.java     |   7 +-
 .../apache/pulsar/common/api/proto/PulsarApi.java  |  57 ++++++++++
 .../common/policies/data/SubscriptionStats.java    |   6 +-
 pulsar-common/src/main/proto/PulsarApi.proto       |   5 +
 .../pulsar/testclient/PerformanceConsumer.java     |   6 +-
 26 files changed, 403 insertions(+), 67 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index f3af7cd..2801be7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -917,7 +917,7 @@ public class PersistentTopicsBase extends AdminResource {
         }
     }
 
-    protected void internalCreateSubscription(String subscriptionName, MessageIdImpl messageId, boolean authoritative) {
+    protected void internalCreateSubscription(String subscriptionName, MessageIdImpl messageId, boolean authoritative, boolean replicated) {
         if (topicName.isGlobal()) {
             validateGlobalNamespaceOwnership(namespaceName);
         }
@@ -968,7 +968,7 @@ public class PersistentTopicsBase extends AdminResource {
                 }
 
                 PersistentSubscription subscription = (PersistentSubscription) topic
-                        .createSubscription(subscriptionName, InitialPosition.Latest).get();
+                        .createSubscription(subscriptionName, InitialPosition.Latest, replicated).get();
                 // Mark the cursor as "inactive" as it was created without a real consumer connected
                 subscription.deactivateCursor();
                 subscription.resetCursor(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId())).get();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 09f719a..9766ec6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -292,7 +292,7 @@ public class PersistentTopics extends PersistentTopicsBase {
         validateTopicName(property, cluster, namespace, encodedTopic);
         return internalGetPartitionedStatsInternal(authoritative);
     }
-    
+
     @DELETE
     @Path("/{property}/{cluster}/{namespace}/{topic}/subscription/{subName}")
     @ApiOperation(hidden = true, value = "Delete a subscription.", notes = "There should not be any active consumers on the subscription.")
@@ -397,9 +397,9 @@ public class PersistentTopics extends PersistentTopicsBase {
     public void createSubscription(@PathParam("property") String property, @PathParam("cluster") String cluster,
            @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String topic,
            @PathParam("subscriptionName") String encodedSubName,
-           @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, MessageIdImpl messageId) {
+           @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, MessageIdImpl messageId, @QueryParam("replicated") boolean replicated) {
         validateTopicName(property, cluster, namespace, topic);
-        internalCreateSubscription(decode(encodedSubName), messageId, authoritative);
+        internalCreateSubscription(decode(encodedSubName), messageId, authoritative, replicated);
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index ca9ff88..d62e0c6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -150,7 +150,7 @@ public class PersistentTopics extends PersistentTopicsBase {
         @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
     })
     public void createNonPartitionedTopic(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
-            @PathParam("topic") @Encoded String encodedTopic, 
+            @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateGlobalNamespaceOwnership(tenant,namespace);
         validateTopicName(tenant, namespace, encodedTopic);
@@ -316,7 +316,7 @@ public class PersistentTopics extends PersistentTopicsBase {
         validateTopicName(tenant, namespace, encodedTopic);
         return internalGetPartitionedStatsInternal(authoritative);
     }
-    
+
     @DELETE
     @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}")
     @ApiOperation(value = "Delete a subscription.", notes = "There should not be any active consumers on the subscription.")
@@ -390,9 +390,9 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 405, message = "Not supported for partitioned topics") })
     public void createSubscription(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String topic, @PathParam("subscriptionName") String encodedSubName,
-            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, MessageIdImpl messageId) {
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, MessageIdImpl messageId, @QueryParam("replicated") boolean replicated) {
         validateTopicName(tenant, namespace, topic);
-        internalCreateSubscription(decode(encodedSubName), messageId, authoritative);
+        internalCreateSubscription(decode(encodedSubName), messageId, authoritative, replicated);
     }
 
     @POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index b5b6fe9..4488613 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -611,6 +611,7 @@ public class ServerCnx extends PulsarHandler {
         final Map<String, String> metadata = CommandUtils.metadataFromCommand(subscribe);
         final InitialPosition initialPosition = subscribe.getInitialPosition();
         final SchemaData schema = subscribe.hasSchema() ? getSchema(subscribe.getSchema()) : null;
+        final boolean isReplicated = subscribe.hasReplicateSubscriptionState() && subscribe.getReplicateSubscriptionState();
 
         CompletableFuture<Boolean> isProxyAuthorizedFuture;
         if (service.isAuthorizationEnabled() && originalPrincipal != null) {
@@ -685,7 +686,7 @@ public class ServerCnx extends PulsarHandler {
                                                         return topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
                                                                 subType, priorityLevel, consumerName, isDurable,
                                                                 startMessageId, metadata,
-                                                                readCompacted, initialPosition);
+                                                                readCompacted, initialPosition, isReplicated);
                                                     } else {
                                                         return FutureUtil.failedFuture(
                                                                 new IncompatibleSchemaException(
@@ -696,7 +697,8 @@ public class ServerCnx extends PulsarHandler {
                                     } else {
                                         return topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
                                             subType, priorityLevel, consumerName, isDurable,
-                                            startMessageId, metadata, readCompacted, initialPosition);
+                                            startMessageId, metadata, readCompacted, initialPosition,
+                                            isReplicated);
                                     }
                                 })
                                 .thenAccept(consumer -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
index 8076bd3..cf4c862 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
@@ -44,6 +44,8 @@ public interface Subscription {
 
     String getTopicName();
 
+    boolean isReplicated();
+
     Dispatcher getDispatcher();
 
     long getNumberOfEntriesInBacklog();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 12c9bdc..5c844ad 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -92,9 +92,11 @@ public interface Topic {
 
     CompletableFuture<Consumer> subscribe(ServerCnx cnx, String subscriptionName, long consumerId, SubType subType,
             int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
-            Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition);
+            Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition,
+            boolean replicateSubscriptionState);
 
-    CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition);
+    CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition,
+            boolean replicateSubscriptionState);
 
     CompletableFuture<Void> unsubscribe(String subName);
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index 0139d14..5c060ba 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -76,6 +76,11 @@ public class NonPersistentSubscription implements Subscription {
     }
 
     @Override
+    public boolean isReplicated() {
+        return false;
+    }
+
+    @Override
     public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
         if (IS_FENCED_UPDATER.get(this) == TRUE) {
             log.warn("Attempting to add consumer {} on a fenced subscription", consumer);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 10c8324..4c3323c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -233,7 +233,7 @@ public class NonPersistentTopic implements Topic {
         lock.readLock().lock();
         try {
             brokerService.checkTopicNsOwnership(getName());
-            
+
             if (isFenced) {
                 log.warn("[{}] Attempting to add producer to a fenced topic", topic);
                 throw new TopicFencedException("Topic is temporarily unavailable");
@@ -315,10 +315,11 @@ public class NonPersistentTopic implements Topic {
     @Override
     public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscriptionName, long consumerId,
             SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
-            Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition) {
+            Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition,
+            boolean replicateSubscriptionState) {
 
         final CompletableFuture<Consumer> future = new CompletableFuture<>();
-        
+
         try {
             brokerService.checkTopicNsOwnership(getName());
         } catch (Exception e) {
@@ -396,7 +397,7 @@ public class NonPersistentTopic implements Topic {
     }
 
     @Override
-    public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition) {
+    public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition, boolean replicateSubscriptionState) {
         return CompletableFuture.completedFuture(new NonPersistentSubscription(this, subscriptionName));
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
index 316ebc5..0c3feb3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
@@ -41,7 +41,7 @@ public class CompactorSubscription extends PersistentSubscription {
 
     public CompactorSubscription(PersistentTopic topic, CompactedTopic compactedTopic,
                                  String subscriptionName, ManagedCursor cursor) {
-        super(topic, subscriptionName, cursor);
+        super(topic, subscriptionName, cursor, false);
         checkArgument(subscriptionName.equals(Compactor.COMPACTION_SUBSCRIPTION));
         this.compactedTopic = compactedTopic;
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index ce9d4aa..a98af61 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -20,14 +20,15 @@ package org.apache.pulsar.broker.service.persistent;
 
 import com.google.common.base.MoreObjects;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
-import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
@@ -40,7 +41,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.InvalidCursorPositio
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.service.BrokerServiceException;
-import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
 import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
 import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionFencedException;
@@ -76,12 +76,36 @@ public class PersistentSubscription implements Subscription {
     // for connected subscriptions, message expiry will be checked if the backlog is greater than this threshold
     private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000;
 
-    public PersistentSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor) {
+    private static final String REPLICATED_SUBSCRIPTION_PROPERTY = "pulsar.replicated.subscription";
+
+    // Map of properties that is used to mark this subscription as "replicated".
+    // Since this is the only field at this point, we can just keep a static
+    // instance of the map.
+    private static final Map<String, Long> REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES = new TreeMap<>();
+    private static final Map<String, Long> NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES = Collections.emptyMap();
+
+    private volatile boolean isReplicated;
+
+    static {
+        REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES.put(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
+    }
+
+    static Map<String, Long> getBaseCursorProperties(boolean isReplicated) {
+        return isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES : NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;
+    }
+
+    static boolean isCursorFromReplicatedSubscription(ManagedCursor cursor) {
+        return cursor.getProperties().containsKey(REPLICATED_SUBSCRIPTION_PROPERTY);
+    }
+
+    public PersistentSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor,
+            boolean replicated) {
         this.topic = topic;
         this.cursor = cursor;
         this.topicName = topic.getName();
         this.subName = subscriptionName;
         this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor);
+        this.isReplicated = replicated;
         IS_FENCED_UPDATER.set(this, FALSE);
     }
 
@@ -96,6 +120,15 @@ public class PersistentSubscription implements Subscription {
     }
 
     @Override
+    public boolean isReplicated() {
+        return isReplicated;
+    }
+
+    void setReplicated(boolean replicated) {
+        this.isReplicated = replicated;
+    }
+
+    @Override
     public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
         cursor.updateLastActive();
         if (IS_FENCED_UPDATER.get(this) == TRUE) {
@@ -194,7 +227,7 @@ public class PersistentSubscription implements Subscription {
             if (log.isDebugEnabled()) {
                 log.debug("[{}][{}] Cumulative ack on {}", topicName, subName, position);
             }
-            cursor.asyncMarkDelete(position, properties, markDeleteCallback, position);
+            cursor.asyncMarkDelete(position, mergeCursorProperties(properties), markDeleteCallback, position);
         } else {
             if (log.isDebugEnabled()) {
                 log.debug("[{}][{}] Individual acks on {}", topicName, subName, positions);
@@ -647,7 +680,7 @@ public class PersistentSubscription implements Subscription {
         }
         subStats.msgBacklog = getNumberOfEntriesInBacklog();
         subStats.msgRateExpired = expiryMonitor.getMessageExpiryRate();
-
+        subStats.isReplicated = isReplicated;
         return subStats;
     }
 
@@ -681,5 +714,25 @@ public class PersistentSubscription implements Subscription {
         }
     }
 
+    /**
+     * Return a merged map that contains the cursor properties specified by the user
+     * (e.g. when using the compaction subscription) and the subscription properties.
+     */
+    protected Map<String, Long> mergeCursorProperties(Map<String, Long> userProperties) {
+        Map<String, Long> baseProperties = isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES
+                : NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;
+
+        if (userProperties.isEmpty()) {
+            // Use only the static instance in the common case
+            return baseProperties;
+        } else {
+            Map<String, Long> merged = new TreeMap<>();
+            merged.putAll(userProperties);
+            merged.putAll(baseProperties);
+            return merged;
+        }
+
+    }
+
     private static final Logger log = LoggerFactory.getLogger(PersistentSubscription.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index ed59c37..e21e359 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -250,7 +250,8 @@ public class PersistentTopic implements Topic, AddEntryCallback {
                 // to take care of it
             } else {
                 final String subscriptionName = Codec.decode(cursor.getName());
-                subscriptions.put(subscriptionName, createPersistentSubscription(subscriptionName, cursor));
+                subscriptions.put(subscriptionName, createPersistentSubscription(subscriptionName, cursor,
+                        PersistentSubscription.isCursorFromReplicatedSubscription(cursor)));
                 // subscription-cursor gets activated by default: deactivate as there is no active subscription right
                 // now
                 subscriptions.get(subscriptionName).deactivateCursor();
@@ -298,12 +299,13 @@ public class PersistentTopic implements Topic, AddEntryCallback {
         }
     }
 
-    private PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor) {
+    private PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor,
+            boolean replicated) {
         checkNotNull(compactedTopic);
         if (subscriptionName.equals(Compactor.COMPACTION_SUBSCRIPTION)) {
             return new CompactorSubscription(this, compactedTopic, subscriptionName, cursor);
         } else {
-            return new PersistentSubscription(this, subscriptionName, cursor);
+            return new PersistentSubscription(this, subscriptionName, cursor, replicated);
         }
     }
 
@@ -496,7 +498,8 @@ public class PersistentTopic implements Topic, AddEntryCallback {
     @Override
     public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscriptionName, long consumerId,
             SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
-            Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition) {
+            Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition,
+            boolean replicatedSubscriptionState) {
 
         final CompletableFuture<Consumer> future = new CompletableFuture<>();
 
@@ -571,7 +574,7 @@ public class PersistentTopic implements Topic, AddEntryCallback {
         }
 
         CompletableFuture<? extends Subscription> subscriptionFuture = isDurable ? //
-                getDurableSubscription(subscriptionName, initialPosition) //
+                getDurableSubscription(subscriptionName, initialPosition, replicatedSubscriptionState) //
                 : getNonDurableSubscription(subscriptionName, startMessageId);
 
         int maxUnackedMessages  = isDurable ? brokerService.pulsar().getConfiguration().getMaxUnackedMessagesPerConsumer() :0;
@@ -617,17 +620,27 @@ public class PersistentTopic implements Topic, AddEntryCallback {
         return future;
     }
 
-    private CompletableFuture<Subscription> getDurableSubscription(String subscriptionName, InitialPosition initialPosition) {
+    private CompletableFuture<Subscription> getDurableSubscription(String subscriptionName,
+            InitialPosition initialPosition, boolean replicated) {
         CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
-        ledger.asyncOpenCursor(Codec.encode(subscriptionName), initialPosition, new OpenCursorCallback() {
+
+        Map<String, Long> properties = PersistentSubscription.getBaseCursorProperties(replicated);
+
+        ledger.asyncOpenCursor(Codec.encode(subscriptionName), initialPosition, properties, new OpenCursorCallback() {
             @Override
             public void openCursorComplete(ManagedCursor cursor, Object ctx) {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}][{}] Opened cursor", topic, subscriptionName);
                 }
 
-                subscriptionFuture.complete(subscriptions.computeIfAbsent(subscriptionName,
-                        name -> createPersistentSubscription(subscriptionName, cursor)));
+                PersistentSubscription subscription = subscriptions.computeIfAbsent(subscriptionName,
+                        name -> createPersistentSubscription(subscriptionName, cursor, replicated));
+
+                if (replicated && !subscription.isReplicated()) {
+                    // Flip the subscription state
+                    subscription.setReplicated(replicated);
+                }
+                subscriptionFuture.complete(subscription);
             }
 
             @Override
@@ -671,7 +684,7 @@ public class PersistentTopic implements Topic, AddEntryCallback {
                 subscriptionFuture.completeExceptionally(e);
             }
 
-            return new PersistentSubscription(this, subscriptionName, cursor);
+            return new PersistentSubscription(this, subscriptionName, cursor, false);
         });
 
         if (!subscriptionFuture.isDone()) {
@@ -686,8 +699,8 @@ public class PersistentTopic implements Topic, AddEntryCallback {
 
     @SuppressWarnings("unchecked")
     @Override
-    public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition) {
-        return getDurableSubscription(subscriptionName, initialPosition);
+    public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition, boolean replicateSubscriptionState) {
+        return getDurableSubscription(subscriptionName, initialPosition, replicateSubscriptionState);
     }
 
     /**
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index e75078f..6efe601 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -110,7 +110,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         } catch (Exception e) {
             Assert.assertEquals("Topic partitions were not yet created", e.getMessage());
         }
-        persistentTopics.createSubscription(testTenant, testNamespace, testLocalTopicName, "test", true, (MessageIdImpl)MessageId.earliest);
+        persistentTopics.createSubscription(testTenant, testNamespace, testLocalTopicName, "test", true, (MessageIdImpl)MessageId.earliest, false);
         List<String> subscriptions =  persistentTopics.getSubscriptions(testTenant, testNamespace, testLocalTopicName + "-partition-0", true);
         Assert.assertTrue(subscriptions.contains("test"));
         persistentTopics.deleteSubscription(testTenant, testNamespace, testLocalTopicName, "test", true);
@@ -123,7 +123,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
     public void testNonPartitionedTopics() {
     	pulsar.getConfiguration().setAllowAutoTopicCreation(false);
     	final String nonPartitionTopic = "non-partitioned-topic";
-    	persistentTopics.createSubscription(testTenant, testNamespace, nonPartitionTopic, "test", true, (MessageIdImpl) MessageId.latest);
+    	persistentTopics.createSubscription(testTenant, testNamespace, nonPartitionTopic, "test", true, (MessageIdImpl) MessageId.latest, false);
     	try {
     		persistentTopics.getSubscriptions(testTenant, testNamespace, nonPartitionTopic + "-partition-0", true);
     	} catch (RestException exc) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index 24cb812..cb5a2ef 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -263,7 +263,7 @@ public class PersistentDispatcherFailoverConsumerTest {
     @Test
     public void testConsumerGroupChangesWithOldNewConsumers() throws Exception {
         PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
-        PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock);
+        PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false);
 
         int partitionIndex = 0;
         PersistentDispatcherSingleActiveConsumer pdfc = new PersistentDispatcherSingleActiveConsumer(cursorMock,
@@ -302,7 +302,7 @@ public class PersistentDispatcherFailoverConsumerTest {
         log.info("--- Starting PersistentDispatcherFailoverConsumerTest::testAddConsumer ---");
 
         PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
-        PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock);
+        PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false);
 
         int partitionIndex = 0;
         PersistentDispatcherSingleActiveConsumer pdfc = new PersistentDispatcherSingleActiveConsumer(cursorMock,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
index 7c5ca20..01895a8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
@@ -123,7 +123,8 @@ public class PersistentTopicConcurrentTest extends MockedBookKeeperTestCase {
                 .setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build();
 
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest);
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest,
+                false /* replicated */);
         f1.get();
 
         final CyclicBarrier barrier = new CyclicBarrier(2);
@@ -181,7 +182,8 @@ public class PersistentTopicConcurrentTest extends MockedBookKeeperTestCase {
                 .setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build();
 
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest);
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest,
+                false /* replicated */);
         f1.get();
 
         final CyclicBarrier barrier = new CyclicBarrier(2);
@@ -243,7 +245,8 @@ public class PersistentTopicConcurrentTest extends MockedBookKeeperTestCase {
                 .setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build();
 
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest);
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest,
+                false /* replicated */);
         f1.get();
 
         final CyclicBarrier barrier = new CyclicBarrier(2);
@@ -301,7 +304,8 @@ public class PersistentTopicConcurrentTest extends MockedBookKeeperTestCase {
                 .setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build();
 
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest);
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest,
+                false /* replicated */);
         f1.get();
 
         final CyclicBarrier barrier = new CyclicBarrier(2);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 8b3c721..93664ff 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -433,7 +433,8 @@ public class PersistentTopicTest {
                 .setSubscription("").setRequestId(1).setSubType(SubType.Exclusive).build();
 
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest);
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest,
+                false);
         try {
             f1.get();
             fail("should fail with exception");
@@ -452,12 +453,14 @@ public class PersistentTopicTest {
 
         // 1. simple subscribe
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest);
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest,
+                false);
         f1.get();
 
         // 2. duplicate subscribe
         Future<Consumer> f2 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest);
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest,
+                false);
 
         try {
             f2.get();
@@ -477,7 +480,7 @@ public class PersistentTopicTest {
     @Test
     public void testAddRemoveConsumer() throws Exception {
         PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
-        PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock);
+        PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false);
 
         // 1. simple add consumer
         Consumer consumer = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */,
@@ -508,8 +511,8 @@ public class PersistentTopicTest {
 
     public void testMaxConsumersShared() throws Exception {
         PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
-        PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock);
-        PersistentSubscription sub2 = new PersistentSubscription(topic, "sub-2", cursorMock);
+        PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false);
+        PersistentSubscription sub2 = new PersistentSubscription(topic, "sub-2", cursorMock, false);
 
         // for count consumers on topic
         ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions = new ConcurrentOpenHashMap<>(16, 1);
@@ -599,8 +602,8 @@ public class PersistentTopicTest {
     public void testMaxConsumersFailover() throws Exception {
 
         PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
-        PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock);
-        PersistentSubscription sub2 = new PersistentSubscription(topic, "sub-2", cursorMock);
+        PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false);
+        PersistentSubscription sub2 = new PersistentSubscription(topic, "sub-2", cursorMock, false);
 
         // for count consumers on topic
         ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions = new ConcurrentOpenHashMap<>(16, 1);
@@ -690,7 +693,7 @@ public class PersistentTopicTest {
     @Test
     public void testUbsubscribeRaceConditions() throws Exception {
         PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
-        PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock);
+        PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false);
         Consumer consumer1 = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */,
                 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest);
         sub.addConsumer(consumer1);
@@ -744,7 +747,8 @@ public class PersistentTopicTest {
                 .setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build();
 
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest);
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest,
+                false /* replicated */);
         f1.get();
 
         assertTrue(topic.delete().isCompletedExceptionally());
@@ -759,7 +763,8 @@ public class PersistentTopicTest {
                 .setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build();
 
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest);
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest,
+                false /* replicated */);
         f1.get();
 
         final CyclicBarrier barrier = new CyclicBarrier(2);
@@ -813,7 +818,8 @@ public class PersistentTopicTest {
                 .setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build();
 
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest);
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest,
+                false /* replicated */);
         f1.get();
 
         final CyclicBarrier barrier = new CyclicBarrier(2);
@@ -900,7 +906,8 @@ public class PersistentTopicTest {
                 .setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build();
 
         Future<Consumer> f = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest);
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest,
+                false /* replicated */);
 
         try {
             f.get();
@@ -911,6 +918,7 @@ public class PersistentTopicTest {
         }
     }
 
+    @SuppressWarnings("unchecked")
     void setupMLAsyncCallbackMocks() {
         ledgerMock = mock(ManagedLedger.class);
         cursorMock = mock(ManagedCursor.class);
@@ -984,6 +992,15 @@ public class PersistentTopicTest {
             }
         }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(OpenCursorCallback.class), anyObject());
 
+        doAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+                ((OpenCursorCallback) invocationOnMock.getArguments()[3]).openCursorComplete(cursorMock, null);
+                return null;
+            }
+        }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(Map.class),
+                any(OpenCursorCallback.class), anyObject());
+
         // call deleteLedgerComplete on ledger asyncDelete
         doAnswer(new Answer<Object>() {
             @Override
@@ -1018,7 +1035,8 @@ public class PersistentTopicTest {
         // 1. Subscribe with non partition topic
         Future<Consumer> f1 = topic1.subscribe(serverCnx, cmd1.getSubscription(), cmd1.getConsumerId(),
                 cmd1.getSubType(), 0, cmd1.getConsumerName(), cmd1.getDurable(), null, Collections.emptyMap(),
-                cmd1.getReadCompacted(), InitialPosition.Latest);
+                cmd1.getReadCompacted(), InitialPosition.Latest,
+                false /* replicated */);
         f1.get();
 
         // 2. Subscribe with partition topic
@@ -1030,7 +1048,8 @@ public class PersistentTopicTest {
 
         Future<Consumer> f2 = topic2.subscribe(serverCnx, cmd2.getSubscription(), cmd2.getConsumerId(),
                 cmd2.getSubType(), 0, cmd2.getConsumerName(), cmd2.getDurable(), null, Collections.emptyMap(),
-                cmd2.getReadCompacted(), InitialPosition.Latest);
+                cmd2.getReadCompacted(), InitialPosition.Latest,
+                false /* replicated */);
         f2.get();
 
         // 3. Subscribe and create second consumer
@@ -1040,7 +1059,8 @@ public class PersistentTopicTest {
 
         Future<Consumer> f3 = topic2.subscribe(serverCnx, cmd3.getSubscription(), cmd3.getConsumerId(),
                 cmd3.getSubType(), 0, cmd3.getConsumerName(), cmd3.getDurable(), null, Collections.emptyMap(),
-                cmd3.getReadCompacted(), InitialPosition.Latest);
+                cmd3.getReadCompacted(), InitialPosition.Latest,
+                false /* replicated */);
         f3.get();
 
         assertEquals(
@@ -1061,7 +1081,8 @@ public class PersistentTopicTest {
 
         Future<Consumer> f4 = topic2.subscribe(serverCnx, cmd4.getSubscription(), cmd4.getConsumerId(),
                 cmd4.getSubType(), 0, cmd4.getConsumerName(), cmd4.getDurable(), null, Collections.emptyMap(),
-                cmd4.getReadCompacted(), InitialPosition.Latest);
+                cmd4.getReadCompacted(), InitialPosition.Latest,
+                false /* replicated */);
         f4.get();
 
         assertEquals(
@@ -1087,7 +1108,8 @@ public class PersistentTopicTest {
 
         Future<Consumer> f5 = topic2.subscribe(serverCnx, cmd5.getSubscription(), cmd5.getConsumerId(),
                 cmd5.getSubType(), 0, cmd5.getConsumerName(), cmd5.getDurable(), null, Collections.emptyMap(),
-                cmd5.getReadCompacted(), InitialPosition.Latest);
+                cmd5.getReadCompacted(), InitialPosition.Latest,
+                false /* replicated */);
 
         try {
             f5.get();
@@ -1104,7 +1126,8 @@ public class PersistentTopicTest {
 
         Future<Consumer> f6 = topic2.subscribe(serverCnx, cmd6.getSubscription(), cmd6.getConsumerId(),
                 cmd6.getSubType(), 0, cmd6.getConsumerName(), cmd6.getDurable(), null, Collections.emptyMap(),
-                cmd6.getReadCompacted(), InitialPosition.Latest);
+                cmd6.getReadCompacted(), InitialPosition.Latest,
+                false /* replicated */);
         f6.get();
 
         // 7. unsubscribe exclusive sub
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 82f61c2..a43057c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -46,6 +46,7 @@ import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
@@ -1483,6 +1484,16 @@ public class ServerCnxTest {
             @Override
             public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                 Thread.sleep(300);
+                ((OpenCursorCallback) invocationOnMock.getArguments()[3]).openCursorComplete(cursorMock, null);
+                return null;
+            }
+        }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(Map.class),
+                any(OpenCursorCallback.class), anyObject());
+
+        doAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+                Thread.sleep(300);
                 ((OpenCursorCallback) invocationOnMock.getArguments()[2])
                         .openCursorFailed(new ManagedLedgerException("Managed ledger failure"), null);
                 return null;
@@ -1492,6 +1503,17 @@ public class ServerCnxTest {
         doAnswer(new Answer<Object>() {
             @Override
             public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+                Thread.sleep(300);
+                ((OpenCursorCallback) invocationOnMock.getArguments()[3])
+                        .openCursorFailed(new ManagedLedgerException("Managed ledger failure"), null);
+                return null;
+            }
+        }).when(ledgerMock).asyncOpenCursor(matches(".*fail.*"), any(InitialPosition.class), any(Map.class),
+                any(OpenCursorCallback.class), anyObject());
+
+        doAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                 ((DeleteCursorCallback) invocationOnMock.getArguments()[1]).deleteCursorComplete(null);
                 return null;
             }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionConfigTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionConfigTest.java
new file mode 100644
index 0000000..8ec937d
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionConfigTest.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+import lombok.Cleanup;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.policies.data.TopicStats;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class ReplicatedSubscriptionConfigTest extends ProducerConsumerBase { // checks the isReplicated flag reported in topic stats against what the consumer requested
+    @Override
+    @BeforeClass
+    public void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @Override
+    @AfterClass
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void createReplicatedSubscription() throws Exception { // a subscription created as replicated is expected to stay replicated after a topic unload
+        String topic = "createReplicatedSubscription-" + System.nanoTime();
+
+        @Cleanup // Lombok: consumer.close() is invoked when this local goes out of scope
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("sub1")
+                .replicateSubscriptionState(true)
+                .subscribe();
+
+        TopicStats stats = admin.topics().getStats(topic);
+        assertTrue(stats.subscriptions.get("sub1").isReplicated);
+
+        admin.topics().unload(topic);
+
+        // The stats are fetched again after the unload: the subscription must still be reported as replicated
+        stats = admin.topics().getStats(topic);
+        assertTrue(stats.subscriptions.get("sub1").isReplicated);
+    }
+
+    @Test
+    public void upgradeToReplicatedSubscription() throws Exception { // subscribing again with replication enabled is expected to mark an existing subscription as replicated
+        String topic = "upgradeToReplicatedSubscription-" + System.nanoTime();
+
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("sub")
+                .replicateSubscriptionState(false)
+                .subscribe();
+
+        TopicStats stats = admin.topics().getStats(topic);
+        assertFalse(stats.subscriptions.get("sub").isReplicated);
+        consumer.close();
+
+        consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("sub")
+                .replicateSubscriptionState(true)
+                .subscribe();
+
+        stats = admin.topics().getStats(topic);
+        assertTrue(stats.subscriptions.get("sub").isReplicated);
+        consumer.close();
+    }
+
+    @Test
+    public void upgradeToReplicatedSubscriptionAfterRestart() throws Exception { // same upgrade scenario, with the topic unloaded between the two subscribe calls
+        String topic = "upgradeToReplicatedSubscriptionAfterRestart-" + System.nanoTime();
+
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("sub")
+                .replicateSubscriptionState(false)
+                .subscribe();
+
+        TopicStats stats = admin.topics().getStats(topic);
+        assertFalse(stats.subscriptions.get("sub").isReplicated);
+        consumer.close();
+
+        admin.topics().unload(topic);
+
+        consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("sub")
+                .replicateSubscriptionState(true)
+                .subscribe();
+
+        stats = admin.topics().getStats(topic);
+        assertTrue(stats.subscriptions.get("sub").isReplicated);
+        consumer.close();
+    }
+}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index 8cb50f9..f8c5e0a 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -297,6 +297,12 @@ public interface ConsumerBuilder<T> extends Cloneable {
     ConsumerBuilder<T> acknowledgmentGroupTime(long delay, TimeUnit unit);
 
     /**
+     * Sets whether the state of the subscription used by this consumer should be replicated.
+     * @param replicateSubscriptionState {@code true} to request a replicated subscription, {@code false} otherwise
+     */
+    ConsumerBuilder<T> replicateSubscriptionState(boolean replicateSubscriptionState);
+
+    /**
      * Set the max total receiver queue size across partitons.
      * <p>
      * This setting will be used to reduce the receiver queue size for individual partitions
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 6f8d9c9..19f5c6e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -279,6 +279,12 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
     }
 
     @Override
+    public ConsumerBuilder<T> replicateSubscriptionState(boolean replicateSubscriptionState) {
+        conf.setReplicateSubscriptionState(replicateSubscriptionState); // ConsumerImpl reads this flag from conf when it builds the subscribe command
+        return this;
+    }
+
+    @Override
     public ConsumerBuilder<T> intercept(ConsumerInterceptor<T>... interceptors) {
         if (interceptorList == null) {
             interceptorList = new ArrayList<>();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index b2421f2..8d91ca1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -526,7 +526,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             si = null;
         }
         ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(), priorityLevel,
-                consumerName, isDurable, startMessageIdData, metadata, readCompacted, InitialPosition.valueOf(subscriptionInitialPosition.getValue()), si);
+                consumerName, isDurable, startMessageIdData, metadata, readCompacted,
+                conf.isReplicateSubscriptionState(), InitialPosition.valueOf(subscriptionInitialPosition.getValue()),
+                si);
         if (startMessageIdData != null) {
             startMessageIdData.recycle();
         }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index fd4b719..3352ebb 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -100,6 +100,8 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
 
     private boolean autoUpdatePartitions = true;
 
+    private boolean replicateSubscriptionState = false;
+
     @JsonIgnore
     public String getSingleTopic() {
         checkArgument(topicNames.size() == 1);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index 04018f6..2d76410 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -418,12 +418,14 @@ public class Commands {
     public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId,
             SubType subType, int priorityLevel, String consumerName) {
         return newSubscribe(topic, subscription, consumerId, requestId, subType, priorityLevel, consumerName,
-                true /* isDurable */, null /* startMessageId */, Collections.emptyMap(), false, InitialPosition.Earliest, null);
+                true /* isDurable */, null /* startMessageId */, Collections.emptyMap(), false,
+                false /* isReplicated */, InitialPosition.Earliest, null);
     }
 
     public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId,
             SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageIdData startMessageId,
-            Map<String, String> metadata, boolean readCompacted, InitialPosition subscriptionInitialPosition, SchemaInfo schemaInfo) {
+            Map<String, String> metadata, boolean readCompacted, boolean isReplicated,
+            InitialPosition subscriptionInitialPosition, SchemaInfo schemaInfo) {
         CommandSubscribe.Builder subscribeBuilder = CommandSubscribe.newBuilder();
         subscribeBuilder.setTopic(topic);
         subscribeBuilder.setSubscription(subscription);
@@ -435,6 +437,7 @@ public class Commands {
         subscribeBuilder.setDurable(isDurable);
         subscribeBuilder.setReadCompacted(readCompacted);
         subscribeBuilder.setInitialPosition(subscriptionInitialPosition);
+        subscribeBuilder.setReplicateSubscriptionState(isReplicated);
         if (startMessageId != null) {
             subscribeBuilder.setStartMessageId(startMessageId);
         }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 319302d..625752b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -8471,6 +8471,10 @@ public final class PulsarApi {
     // optional .pulsar.proto.CommandSubscribe.InitialPosition initialPosition = 13 [default = Latest];
     boolean hasInitialPosition();
     org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition getInitialPosition();
+    
+    // optional bool replicate_subscription_state = 14;
+    boolean hasReplicateSubscriptionState();
+    boolean getReplicateSubscriptionState();
   }
   public static final class CommandSubscribe extends
       org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -8802,6 +8806,16 @@ public final class PulsarApi {
       return initialPosition_;
     }
     
+    // optional bool replicate_subscription_state = 14;
+    public static final int REPLICATE_SUBSCRIPTION_STATE_FIELD_NUMBER = 14;
+    private boolean replicateSubscriptionState_;
+    public boolean hasReplicateSubscriptionState() {
+      return ((bitField0_ & 0x00001000) == 0x00001000);
+    }
+    public boolean getReplicateSubscriptionState() {
+      return replicateSubscriptionState_;
+    }
+    
     private void initFields() {
       topic_ = "";
       subscription_ = "";
@@ -8816,6 +8830,7 @@ public final class PulsarApi {
       readCompacted_ = false;
       schema_ = org.apache.pulsar.common.api.proto.PulsarApi.Schema.getDefaultInstance();
       initialPosition_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition.Latest;
+      replicateSubscriptionState_ = false;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -8911,6 +8926,9 @@ public final class PulsarApi {
       if (((bitField0_ & 0x00000800) == 0x00000800)) {
         output.writeEnum(13, initialPosition_.getNumber());
       }
+      if (((bitField0_ & 0x00001000) == 0x00001000)) {
+        output.writeBool(14, replicateSubscriptionState_);
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -8971,6 +8989,10 @@ public final class PulsarApi {
         size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
           .computeEnumSize(13, initialPosition_.getNumber());
       }
+      if (((bitField0_ & 0x00001000) == 0x00001000)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeBoolSize(14, replicateSubscriptionState_);
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -9110,6 +9132,8 @@ public final class PulsarApi {
         bitField0_ = (bitField0_ & ~0x00000800);
         initialPosition_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition.Latest;
         bitField0_ = (bitField0_ & ~0x00001000);
+        replicateSubscriptionState_ = false;
+        bitField0_ = (bitField0_ & ~0x00002000);
         return this;
       }
       
@@ -9196,6 +9220,10 @@ public final class PulsarApi {
           to_bitField0_ |= 0x00000800;
         }
         result.initialPosition_ = initialPosition_;
+        if (((from_bitField0_ & 0x00002000) == 0x00002000)) {
+          to_bitField0_ |= 0x00001000;
+        }
+        result.replicateSubscriptionState_ = replicateSubscriptionState_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -9248,6 +9276,9 @@ public final class PulsarApi {
         if (other.hasInitialPosition()) {
           setInitialPosition(other.getInitialPosition());
         }
+        if (other.hasReplicateSubscriptionState()) {
+          setReplicateSubscriptionState(other.getReplicateSubscriptionState());
+        }
         return this;
       }
       
@@ -9399,6 +9430,11 @@ public final class PulsarApi {
               }
               break;
             }
+            case 112: {
+              bitField0_ |= 0x00002000;
+              replicateSubscriptionState_ = input.readBool();
+              break;
+            }
           }
         }
       }
@@ -9841,6 +9877,27 @@ public final class PulsarApi {
         return this;
       }
       
+      // optional bool replicate_subscription_state = 14;
+      private boolean replicateSubscriptionState_ ;
+      public boolean hasReplicateSubscriptionState() {
+        return ((bitField0_ & 0x00002000) == 0x00002000);
+      }
+      public boolean getReplicateSubscriptionState() {
+        return replicateSubscriptionState_;
+      }
+      public Builder setReplicateSubscriptionState(boolean value) {
+        bitField0_ |= 0x00002000;
+        replicateSubscriptionState_ = value;
+        
+        return this;
+      }
+      public Builder clearReplicateSubscriptionState() {
+        bitField0_ = (bitField0_ & ~0x00002000);
+        replicateSubscriptionState_ = false;
+        
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandSubscribe)
     }
     
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
index f141d4a..40e3dad 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
@@ -43,7 +43,7 @@ public class SubscriptionStats {
 
     /** Flag to verify if subscription is blocked due to reaching threshold of unacked messages */
     public boolean blockedSubscriptionOnUnackedMsgs;
-    
+
     /** Number of unacknowledged messages for the subscription */
     public long unackedMessages;
 
@@ -59,6 +59,9 @@ public class SubscriptionStats {
     /** List of connected consumers on this subscription w/ their stats */
     public List<ConsumerStats> consumers;
 
+    /** Mark that the subscription state is kept in sync across different regions */
+    public boolean isReplicated;
+
     public SubscriptionStats() {
         this.consumers = Lists.newArrayList();
     }
@@ -83,6 +86,7 @@ public class SubscriptionStats {
         this.msgBacklog += stats.msgBacklog;
         this.unackedMessages += stats.unackedMessages;
         this.msgRateExpired += stats.msgRateExpired;
+        this.isReplicated |= stats.isReplicated;
         if (this.consumers.size() != stats.consumers.size()) {
             for (int i = 0; i < stats.consumers.size(); i++) {
                 ConsumerStats consumerStats = new ConsumerStats();
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index 11d69be..58238d7 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -272,6 +272,11 @@ message CommandSubscribe {
 	// Signal whether the subscription will initialize on latest
 	// or not -- earliest
 	optional InitialPosition initialPosition = 13 [default = Latest];
+
+    // Mark the subscription as "replicated". Pulsar will make sure
+    // to periodically sync the state of replicated subscriptions
+    // across different clusters (when using geo-replication).
+    optional bool replicate_subscription_state = 14;
 }
 
 message CommandPartitionedTopicMetadata {
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index 98b3735..800fc80 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -95,6 +95,9 @@ public class PerformanceConsumer {
         @Parameter(names = { "-q", "--receiver-queue-size" }, description = "Size of the receiver queue")
         public int receiverQueueSize = 1000;
 
+        @Parameter(names = { "--replicated" }, description = "Whether the subscription status should be replicated")
+        public boolean replicatedSubscription = false;
+
         @Parameter(names = { "--acks-delay-millis" }, description = "Acknowlegments grouping delay in millis")
         public int acknowledgmentsGroupingDelayMillis = 100;
 
@@ -253,7 +256,8 @@ public class PerformanceConsumer {
                 .messageListener(listener) //
                 .receiverQueueSize(arguments.receiverQueueSize) //
                 .acknowledgmentGroupTime(arguments.acknowledgmentsGroupingDelayMillis, TimeUnit.MILLISECONDS) //
-                .subscriptionType(arguments.subscriptionType);
+                .subscriptionType(arguments.subscriptionType)
+                .replicateSubscriptionState(arguments.replicatedSubscription);
 
         if (arguments.encKeyName != null) {
             byte[] pKey = Files.readAllBytes(Paths.get(arguments.encKeyFile));