You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/06/17 00:09:15 UTC

[pulsar] branch master updated: Replicated subscriptions - Advance remote cursor when local consumers moves ahead (#4396)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7be1ee1  Replicated subscriptions - Advance remote cursor when local consumers moves ahead (#4396)
7be1ee1 is described below

commit 7be1ee1fdb59421ac858b38840d3baf8c9073a5c
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sun Jun 16 17:09:08 2019 -0700

    Replicated subscriptions - Advance remote cursor when local consumers moves ahead (#4396)
    
    ### Motivation
    
    This is the 4th and last (implementation) change for pip-33.
    
    It includes reading and caching the last N snapshots and sending the updates to the other clusters.
    
    Previous PRs:
     1. #4299
     2. #4340
     3. #4354
---
 conf/broker.conf                                   |   3 +
 .../apache/pulsar/broker/ServiceConfiguration.java |   7 +-
 .../broker/service/AbstractBaseDispatcher.java     |  27 ++++-
 .../apache/pulsar/broker/service/Subscription.java |   5 +
 .../service/persistent/PersistentSubscription.java |  40 ++++++--
 .../broker/service/persistent/PersistentTopic.java |   4 +
 .../ReplicatedSubscriptionSnapshotCache.java       |  92 +++++++++++++++++
 .../ReplicatedSubscriptionsController.java         |  63 +++++++++++-
 .../ReplicatedSubscriptionSnapshotCacheTest.java   | 114 +++++++++++++++++++++
 .../org/apache/pulsar/common/protocol/Markers.java |   8 +-
 10 files changed, 350 insertions(+), 13 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 05866f8..022085a 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -278,6 +278,9 @@ replicatedSubscriptionsSnapshotFrequencyMillis=1000
 # Timeout for building a consistent snapshot for tracking replicated subscriptions state.
 replicatedSubscriptionsSnapshotTimeoutSeconds=30
 
+# Max number of snapshot to be cached per subscription.
+replicatedSubscriptionsSnapshotMaxCachedPerSubscription=10
+
 ### --- Authentication --- ###
 # Role names that are treated as "proxy roles". If the broker sees a request with
 #role as proxyRoles - it will demand to see a valid original principal.
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 7b15ab1..840117d 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -536,6 +536,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
             doc = "Timeout for building a consistent snapshot for tracking replicated subscriptions state. ")
     private int replicatedSubscriptionsSnapshotTimeoutSeconds = 30;
 
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            doc = "Max number of snapshots to be cached per subscription.")
+    private int replicatedSubscriptionsSnapshotMaxCachedPerSubscription = 10;
+
     /***** --- TLS --- ****/
     @FieldContext(
         category = CATEGORY_TLS,
@@ -905,7 +910,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
     private int managedLedgerMaxUnackedRangesToPersistInZooKeeper = 1000;
     @FieldContext(
             category = CATEGORY_STORAGE_OFFLOADING,
-            doc = "Use Open Range-Set to cache unacked messages (it is memory efficient but it can take more cpu)" 
+            doc = "Use Open Range-Set to cache unacked messages (it is memory efficient but it can take more cpu)"
         )
     private boolean managedLedgerUnackedRangesOpenCacheSetEnabled = true;
     @FieldContext(
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 896d270..da06bd2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -24,13 +24,17 @@ import io.netty.buffer.ByteBuf;
 import java.util.Collections;
 import java.util.List;
 
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Markers;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshot;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 
+@Slf4j
 public abstract class AbstractBaseDispatcher implements Dispatcher {
 
     protected final Subscription subscription;
@@ -61,20 +65,26 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
      * @param subscription
      *            the subscription object
      */
-    public void filterEntriesForConsumer(List<Entry> entries, EntryBatchSizes batchSizes, SendMessageInfo sendMessageInfo) {
+    public void filterEntriesForConsumer(List<Entry> entries, EntryBatchSizes batchSizes,
+            SendMessageInfo sendMessageInfo) {
         int totalMessages = 0;
         long totalBytes = 0;
 
         for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
             Entry entry = entries.get(i);
             ByteBuf metadataAndPayload = entry.getDataBuffer();
-            PositionImpl pos = (PositionImpl) entry.getPosition();
 
             MessageMetadata msgMetadata = Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1);
 
             try {
                 if (msgMetadata == null || Markers.isServerOnlyMarker(msgMetadata)) {
+                    PositionImpl pos = (PositionImpl) entry.getPosition();
                     // Message metadata was corrupted or the messages was a server-only marker
+
+                    if (Markers.isReplicatedSubscriptionSnapshotMarker(msgMetadata)) {
+                        processReplicatedSubscriptionSnapshot(pos, metadataAndPayload);
+                    }
+
                     entries.set(i, null);
                     entry.release();
                     subscription.acknowledgeMessage(Collections.singletonList(pos), AckType.Individual,
@@ -100,4 +110,17 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
         sendMessageInfo.setTotalMessages(totalMessages);
         sendMessageInfo.setTotalBytes(totalBytes);
     }
+
+    private void processReplicatedSubscriptionSnapshot(PositionImpl pos, ByteBuf headersAndPayload) {
+        // Skip the protobuf message metadata so the buffer is positioned at the snapshot payload
+        Commands.skipMessageMetadata(headersAndPayload);
+
+        try {
+            ReplicatedSubscriptionsSnapshot snapshot = Markers.parseReplicatedSubscriptionsSnapshot(headersAndPayload);
+            subscription.processReplicatedSubscriptionSnapshot(snapshot);
+        } catch (Throwable t) {
+            // Failures are only logged, so the caller keeps filtering the remaining entries
+            log.warn("Failed to process replicated subscription snapshot at {} -- {}", pos, t.getMessage(), t);
+        }
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
index 4851b5b..c044fa5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
@@ -27,6 +27,7 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshot;
 
 public interface Subscription {
 
@@ -90,6 +91,10 @@ public interface Subscription {
 
     void addUnAckedMessages(int unAckMessages);
 
+    default void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapshot snapshot) {
+        // Default is no-op
+    }
+
     // Subscription utils
     static boolean isCumulativeAckMode(SubType subType) {
         return SubType.Exclusive.equals(subType) || SubType.Failover.equals(subType);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 42a1479..70be6ba 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -51,6 +51,7 @@ import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshot;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
@@ -84,7 +85,7 @@ public class PersistentSubscription implements Subscription {
     private static final Map<String, Long> REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES = new TreeMap<>();
     private static final Map<String, Long> NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES = Collections.emptyMap();
 
-    private volatile boolean isReplicated;
+    private volatile ReplicatedSubscriptionSnapshotCache replicatedSubscriptionSnapshotCache;
 
     static {
         REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES.put(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
@@ -105,7 +106,7 @@ public class PersistentSubscription implements Subscription {
         this.topicName = topic.getName();
         this.subName = subscriptionName;
         this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor);
-        this.isReplicated = replicated;
+        this.setReplicated(replicated);
         IS_FENCED_UPDATER.set(this, FALSE);
     }
 
@@ -121,11 +122,15 @@ public class PersistentSubscription implements Subscription {
 
     @Override
     public boolean isReplicated() {
-        return isReplicated;
+        return replicatedSubscriptionSnapshotCache != null;
     }
 
     void setReplicated(boolean replicated) {
-        this.isReplicated = replicated;
+        this.replicatedSubscriptionSnapshotCache = replicated
+                ? new ReplicatedSubscriptionSnapshotCache(subName,
+                        topic.getBrokerService().pulsar().getConfiguration()
+                                .getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription())
+                : null;
     }
 
     @Override
@@ -217,6 +222,8 @@ public class PersistentSubscription implements Subscription {
 
     @Override
     public void acknowledgeMessage(List<Position> positions, AckType ackType, Map<String,Long> properties) {
+        Position previousMarkDeletePosition = cursor.getMarkDeletedPosition();
+
         if (ackType == AckType.Cumulative) {
             if (positions.size() != 1) {
                 log.warn("[{}][{}] Invalid cumulative ack received with multiple message ids", topicName, subName);
@@ -236,6 +243,19 @@ public class PersistentSubscription implements Subscription {
             dispatcher.getRedeliveryTracker().removeBatch(positions);
         }
 
+        if (!cursor.getMarkDeletedPosition().equals(previousMarkDeletePosition)) {
+            // Mark delete position advance
+            ReplicatedSubscriptionSnapshotCache snapshotCache  = this.replicatedSubscriptionSnapshotCache;
+            if (snapshotCache != null) {
+                ReplicatedSubscriptionsSnapshot snapshot = snapshotCache
+                        .advancedMarkDeletePosition((PositionImpl) cursor.getMarkDeletedPosition());
+                if (snapshot != null) {
+                    topic.getReplicatedSubscriptionController()
+                            .ifPresent(c -> c.localSubscriptionUpdated(subName, snapshot));
+                }
+            }
+        }
+
         if (topic.getManagedLedger().isTerminated() && cursor.getNumberOfEntriesInBacklog() == 0) {
             // Notify all consumer that the end of topic was reached
             dispatcher.getConsumers().forEach(Consumer::reachedEndOfTopic);
@@ -680,7 +700,7 @@ public class PersistentSubscription implements Subscription {
         }
         subStats.msgBacklog = getNumberOfEntriesInBacklog();
         subStats.msgRateExpired = expiryMonitor.getMessageExpiryRate();
-        subStats.isReplicated = isReplicated;
+        subStats.isReplicated = isReplicated();
         return subStats;
     }
 
@@ -728,7 +748,7 @@ public class PersistentSubscription implements Subscription {
      * (eg. when using compaction subscription) and the subscription properties.
      */
     protected Map<String, Long> mergeCursorProperties(Map<String, Long> userProperties) {
-        Map<String, Long> baseProperties = isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES
+        Map<String, Long> baseProperties = isReplicated() ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES
                 : NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;
 
         if (userProperties.isEmpty()) {
@@ -743,5 +763,13 @@ public class PersistentSubscription implements Subscription {
 
     }
 
+    @Override
+    public void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapshot snapshot) {
+        ReplicatedSubscriptionSnapshotCache snapshotCache = this.replicatedSubscriptionSnapshotCache;
+        if (snapshotCache != null) {
+            snapshotCache.addNewSnapshot(snapshot);
+        }
+    }
+
     private static final Logger log = LoggerFactory.getLogger(PersistentSubscription.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 0f132c0..9202ac8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1892,6 +1892,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         }
 
         ctrl.receivedReplicatedSubscriptionMarker(position, markerType, payload);;
+     }
+
+    Optional<ReplicatedSubscriptionsController> getReplicatedSubscriptionController() {
+        return replicatedSubscriptionsController;
     }
 
     public CompactedTopic getCompactedTopic() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java
new file mode 100644
index 0000000..185e742
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshot;
+
+/**
+ * Stores the last N snapshots scanned by a particular subscription, ordered by their local position.
+ */
+@Slf4j
+public class ReplicatedSubscriptionSnapshotCache {
+    private final String subscription;
+    private final NavigableMap<PositionImpl, ReplicatedSubscriptionsSnapshot> snapshots;
+    private final int maxSnapshotToCache;
+
+    public ReplicatedSubscriptionSnapshotCache(String subscription, int maxSnapshotToCache) {
+        this.subscription = subscription;
+        this.snapshots = new TreeMap<>();
+        this.maxSnapshotToCache = maxSnapshotToCache;
+    }
+
+    public synchronized void addNewSnapshot(ReplicatedSubscriptionsSnapshot snapshot) {
+        MessageIdData msgId = snapshot.getLocalMessageId();
+        PositionImpl position = new PositionImpl(msgId.getLedgerId(), msgId.getEntryId());
+
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Added new replicated-subscription snapshot at {} -- {}", subscription, position,
+                    snapshot.getSnapshotId());
+        }
+
+        snapshots.put(position, snapshot);
+
+        // Prune the cache, evicting the snapshots with the lowest positions first
+        while (snapshots.size() > maxSnapshotToCache) {
+            snapshots.pollFirstEntry();
+        }
+    }
+
+    /**
+     * Signal that the mark-delete position on the subscription has been advanced. All cached snapshots at or
+     * before {@code pos} are removed; the latest of them is returned, or null if there was none.
+     */
+    public synchronized ReplicatedSubscriptionsSnapshot advancedMarkDeletePosition(PositionImpl pos) {
+        ReplicatedSubscriptionsSnapshot snapshot = null;
+        while (!snapshots.isEmpty()) {
+            PositionImpl first = snapshots.firstKey();
+            if (first.compareTo(pos) > 0) {
+                // Snapshot is associated with a higher position, so it cannot be used now
+                break;
+            } else {
+                // This snapshot is potentially good. Continue the search to see if there is a higher snapshot we
+                // can use
+                snapshot = snapshots.pollFirstEntry().getValue();
+            }
+        }
+
+        if (log.isDebugEnabled()) {
+            if (snapshot != null) {
+                log.debug("[{}] Advanced mark-delete position to {} -- found snapshot {} at {}:{}", subscription, pos,
+                        snapshot.getSnapshotId(),
+                        snapshot.getLocalMessageId().getLedgerId(),
+                        snapshot.getLocalMessageId().getEntryId());
+            } else {
+                log.debug("[{}] Advanced mark-delete position to {} -- snapshot not found", subscription, pos);
+            }
+        }
+        return snapshot;
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
index 232cf59..8b64c14 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
@@ -23,24 +23,32 @@ import io.prometheus.client.Gauge;
 
 import java.io.IOException;
 import java.time.Clock;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.service.Topic;
-import org.apache.pulsar.common.protocol.Markers;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.ClusterMessageId;
 import org.apache.pulsar.common.api.proto.PulsarMarkers.MarkerType;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshot;
 import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshotRequest;
 import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshotResponse;
 import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsUpdate;
+import org.apache.pulsar.common.protocol.Markers;
 
 /**
  * Encapsulate all the logic of replicated subscriptions tracking for a given topic.
@@ -93,6 +101,25 @@ public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.P
         }
     }
 
+    public void localSubscriptionUpdated(String subscriptionName, ReplicatedSubscriptionsSnapshot snapshot) {
+        if (log.isDebugEnabled()) {
+            log.debug("[{}][{}] Updating subscription to snapshot {}", topic, subscriptionName,
+                    snapshot.getClustersList().stream()
+                            .map(cmid -> String.format("%s -> %d:%d", cmid.getCluster(),
+                                    cmid.getMessageId().getLedgerId(), cmid.getMessageId().getEntryId()))
+                            .collect(Collectors.toList()));
+        }
+
+        Map<String, MessageIdData> clusterIds = new TreeMap<>();
+        for (int i = 0, size = snapshot.getClustersCount(); i < size; i++) {
+            ClusterMessageId cmid = snapshot.getClusters(i);
+            clusterIds.put(cmid.getCluster(), cmid.getMessageId());
+        }
+
+        ByteBuf subscriptionUpdate = Markers.newReplicatedSubscriptionsUpdate(subscriptionName, clusterIds);
+        topic.publishMessage(subscriptionUpdate, this);
+    }
+
     private void receivedSnapshotRequest(ReplicatedSubscriptionsSnapshotRequest request) {
         // Send response containing the current last written message id. The response
         // marker we're publishing locally and then replicating will have a higher
@@ -115,8 +142,10 @@ public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.P
         String snapshotId = response.getSnapshotId();
         ReplicatedSubscriptionsSnapshotBuilder builder = pendingSnapshots.get(snapshotId);
         if (builder == null) {
-            log.info("[{}] Received late reply for timed-out snapshot {} from {}", topic.getName(), snapshotId,
-                    response.getCluster().getCluster());
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Received late reply for timed-out snapshot {} from {}", topic.getName(), snapshotId,
+                        response.getCluster().getCluster());
+            }
             return;
         }
 
@@ -124,7 +153,35 @@ public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.P
     }
 
     private void receiveSubscriptionUpdated(ReplicatedSubscriptionsUpdate update) {
+        MessageIdData updatedMessageId = null;
+        for (int i = 0, size = update.getClustersCount(); i < size; i++) {
+            ClusterMessageId cmid = update.getClusters(i);
+            if (localCluster.equals(cmid.getCluster())) {
+                updatedMessageId = cmid.getMessageId();
+            }
+        }
+
+        if (updatedMessageId == null) {
+            // No updates for this cluster, ignore
+            return;
+        }

+        Position pos = new PositionImpl(updatedMessageId.getLedgerId(), updatedMessageId.getEntryId());
+
+        if (log.isDebugEnabled()) {
+            log.debug("[{}][{}] Received update for subscription to {}", topic, update.getSubscriptionName(), pos);
+        }
+
+        PersistentSubscription sub = topic.getSubscription(update.getSubscriptionName());
+        if (sub != null) {
+            sub.acknowledgeMessage(Collections.singletonList(pos), AckType.Cumulative, Collections.emptyMap());
+        } else {
+            // Subscription doesn't exist here yet: create it at Latest. NOTE(review): 'pos' is not applied to it
+            log.info("[{}][{}] Creating subscription at {} after receiving update from replicated subscription",
+                    topic, update.getSubscriptionName(), pos);
+            topic.createSubscription(update.getSubscriptionName(),
+                    InitialPosition.Latest, true /* replicateSubscriptionState */);
+        }
     }
 
     private void startNewSnapshot() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCacheTest.java
new file mode 100644
index 0000000..a103945
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCacheTest.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshot;
+import org.testng.annotations.Test;
+
+public class ReplicatedSubscriptionSnapshotCacheTest {
+    @Test
+    public void testSnashotCache() {
+        ReplicatedSubscriptionSnapshotCache cache = new ReplicatedSubscriptionSnapshotCache("my-subscription", 10);
+
+        assertNull(cache.advancedMarkDeletePosition(new PositionImpl(0, 0)));
+        assertNull(cache.advancedMarkDeletePosition(new PositionImpl(100, 0)));
+
+        cache.addNewSnapshot(ReplicatedSubscriptionsSnapshot.newBuilder()
+                .setSnapshotId("snapshot-1")
+                .setLocalMessageId(newMessageId(1, 1))
+                .build());
+
+        cache.addNewSnapshot(ReplicatedSubscriptionsSnapshot.newBuilder()
+                .setSnapshotId("snapshot-2")
+                .setLocalMessageId(newMessageId(2, 2))
+                .build());
+
+        cache.addNewSnapshot(ReplicatedSubscriptionsSnapshot.newBuilder()
+                .setSnapshotId("snapshot-5")
+                .setLocalMessageId(newMessageId(5, 5))
+                .build());
+
+        cache.addNewSnapshot(ReplicatedSubscriptionsSnapshot.newBuilder()
+                .setSnapshotId("snapshot-7")
+                .setLocalMessageId(newMessageId(7, 7))
+                .build());
+
+        assertNull(cache.advancedMarkDeletePosition(new PositionImpl(0, 0)));
+        assertNull(cache.advancedMarkDeletePosition(new PositionImpl(1, 0)));
+        ReplicatedSubscriptionsSnapshot snapshot = cache.advancedMarkDeletePosition(new PositionImpl(1, 1));
+        assertNotNull(snapshot);
+        assertEquals(snapshot.getSnapshotId(), "snapshot-1");
+
+        snapshot = cache.advancedMarkDeletePosition(new PositionImpl(5, 6));
+        assertNotNull(snapshot);
+        assertEquals(snapshot.getSnapshotId(), "snapshot-5");
+
+        // Snapshots should have been now removed
+        assertNull(cache.advancedMarkDeletePosition(new PositionImpl(2, 2)));
+        assertNull(cache.advancedMarkDeletePosition(new PositionImpl(5, 5)));
+    }
+
+    @Test
+    public void testSnashotCachePruning() {
+        ReplicatedSubscriptionSnapshotCache cache = new ReplicatedSubscriptionSnapshotCache("my-subscription", 3);
+
+        cache.addNewSnapshot(ReplicatedSubscriptionsSnapshot.newBuilder()
+                .setSnapshotId("snapshot-1")
+                .setLocalMessageId(newMessageId(1, 1))
+                .build());
+
+        cache.addNewSnapshot(ReplicatedSubscriptionsSnapshot.newBuilder()
+                .setSnapshotId("snapshot-2")
+                .setLocalMessageId(newMessageId(2, 2))
+                .build());
+
+        cache.addNewSnapshot(ReplicatedSubscriptionsSnapshot.newBuilder()
+                .setSnapshotId("snapshot-3")
+                .setLocalMessageId(newMessageId(3, 3))
+                .build());
+
+        cache.addNewSnapshot(ReplicatedSubscriptionsSnapshot.newBuilder()
+                .setSnapshotId("snapshot-4")
+                .setLocalMessageId(newMessageId(4, 4))
+                .build());
+
+        // Snapshot-1 was already pruned
+        assertNull(cache.advancedMarkDeletePosition(new PositionImpl(1, 1)));
+        ReplicatedSubscriptionsSnapshot snapshot = cache.advancedMarkDeletePosition(new PositionImpl(2, 2));
+        assertNotNull(snapshot);
+        assertEquals(snapshot.getSnapshotId(), "snapshot-2");
+
+        snapshot = cache.advancedMarkDeletePosition(new PositionImpl(5, 5));
+        assertNotNull(snapshot);
+        assertEquals(snapshot.getSnapshotId(), "snapshot-4");
+    }
+
+    private MessageIdData newMessageId(long ledgerId, long entryId) {
+        return MessageIdData.newBuilder()
+                .setLedgerId(ledgerId)
+                .setEntryId(entryId)
+                .build();
+    }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java
index 4884348..6d495be 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java
@@ -69,6 +69,12 @@ public class Markers {
         return msgMetadata.hasMarkerType();
     }
 
+    public static boolean isReplicatedSubscriptionSnapshotMarker(MessageMetadata msgMetadata) {
+        return msgMetadata != null
+                && msgMetadata.hasMarkerType()
+                && msgMetadata.getMarkerType() == MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_VALUE;
+    }
+
     @SneakyThrows
     public static ByteBuf newReplicatedSubscriptionsSnapshotRequest(String snapshotId, String sourceCluster) {
         ReplicatedSubscriptionsSnapshotRequest.Builder builder = ReplicatedSubscriptionsSnapshotRequest.newBuilder();
@@ -226,7 +232,7 @@ public class Markers {
         ByteBufCodedOutputStream outStream = ByteBufCodedOutputStream.get(payload);
         try {
             update.writeTo(outStream);
-            return newMessage(MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT, Optional.empty(), payload);
+            return newMessage(MarkerType.REPLICATED_SUBSCRIPTION_UPDATE, Optional.empty(), payload);
         } finally {
             payload.release();
             builder.recycle();