You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/28 21:15:07 UTC

[pulsar] branch master updated: Replicated subscriptions - Create snapshots (#4354)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 834ca6a  Replicated subscriptions - Create snapshots (#4354)
834ca6a is described below

commit 834ca6af60a97ea17f90614c7db6a5b45ed2ad31
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue May 28 14:15:01 2019 -0700

    Replicated subscriptions - Create snapshots (#4354)
    
    * Replicated subscriptions - Create snapshots
    
    * Added clock and unit tests for snapshot builder
---
 conf/broker.conf                                   |   9 +
 .../apache/pulsar/broker/ServiceConfiguration.java |  15 ++
 .../apache/pulsar/broker/service/Replicator.java   |   2 +
 .../nonpersistent/NonPersistentReplicator.java     |   6 +
 .../service/persistent/PersistentReplicator.java   |  46 ++++-
 .../broker/service/persistent/PersistentTopic.java |  58 +++++-
 .../ReplicatedSubscriptionsController.java         | 201 ++++++++++++++++++
 .../ReplicatedSubscriptionsSnapshotBuilder.java    | 137 +++++++++++++
 ...ReplicatedSubscriptionsSnapshotBuilderTest.java | 228 +++++++++++++++++++++
 .../org/apache/pulsar/common/api/Commands.java     |   8 +
 10 files changed, 703 insertions(+), 7 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index f6c8221..ae95d48 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -259,6 +259,15 @@ maxMessageSize=5242880
 # Interval between checks to see if topics with compaction policies need to be compacted
 brokerServiceCompactionMonitorIntervalInSeconds=60
 
+# Enable tracking of replicated subscriptions state across clusters.
+enableReplicatedSubscriptions=true
+
+# Interval, in milliseconds, between snapshots for replicated subscriptions tracking.
+replicatedSubscriptionsSnapshotFrequencyMillis=1000
+
+# Timeout for building a consistent snapshot for tracking replicated subscriptions state.
+replicatedSubscriptionsSnapshotTimeoutSeconds=30
+
 ### --- Authentication --- ###
 # Role names that are treated as "proxy roles". If the broker sees a request with
 #role as proxyRoles - it will demand to see a valid original principal.
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 984adf1..91ef0da 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -510,6 +510,21 @@ public class ServiceConfiguration implements PulsarConfiguration {
         maxValue = Integer.MAX_VALUE - Commands.MESSAGE_SIZE_FRAME_PADDING)
     private int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE;
 
+    // Whether replicated subscriptions state is tracked across clusters (defaults to enabled).
+    @FieldContext(
+        category = CATEGORY_SERVER,
+        doc = "Enable tracking of replicated subscriptions state across clusters.")
+    private boolean enableReplicatedSubscriptions = true;
+
+    // Expressed in milliseconds; the default corresponds to one snapshot attempt per second.
+    @FieldContext(
+        category = CATEGORY_SERVER,
+        doc = "Frequency of snapshots for replicated subscriptions tracking.")
+    private int replicatedSubscriptionsSnapshotFrequencyMillis = 1_000;
+
+    // Expressed in seconds; a snapshot still incomplete after this long is treated as timed out.
+    @FieldContext(
+        category = CATEGORY_SERVER,
+        doc = "Timeout for building a consistent snapshot for tracking replicated subscriptions state. ")
+    private int replicatedSubscriptionsSnapshotTimeoutSeconds = 30;
+
     /***** --- TLS --- ****/
     @FieldContext(
         category = CATEGORY_TLS,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
index 7412b22..40795a3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
@@ -46,4 +46,6 @@ public interface Replicator {
     default Optional<DispatchRateLimiter> getRateLimiter() {
         return Optional.empty();
     }
+
+    /**
+     * Reports whether this replicator is currently connected.
+     *
+     * <p>Both implementations added with this method return {@code true} only when their
+     * producer is non-null and that producer reports itself as connected.
+     *
+     * @return {@code true} if the replicator is connected, {@code false} otherwise
+     */
+    boolean isConnected();
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
index d505c44..10e16ff 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
@@ -250,4 +250,10 @@ public class NonPersistentReplicator extends AbstractReplicator implements Repli
     protected void disableReplicatorRead() {
         // No-op
     }
+
+    /**
+     * @return {@code true} only when a producer is set and it reports itself as connected
+     */
+    @Override
+    public boolean isConnected() {
+        // Copy the field into a local so the null check and the call use the same reference
+        final ProducerImpl<?> current = this.producer;
+        if (current == null) {
+            return false;
+        }
+        return current.isConnected();
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 2253104..01c547e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -19,6 +19,9 @@
 package org.apache.pulsar.broker.service.persistent;
 
 import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
+import io.netty.buffer.ByteBuf;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
 
 import java.util.List;
 import java.util.Optional;
@@ -50,15 +53,14 @@ import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.SendCallback;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.api.Markers;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.MarkerType;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshotRequest;
 import org.apache.pulsar.common.policies.data.ReplicatorStats;
 import org.apache.pulsar.common.util.Codec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.util.Recycler;
-import io.netty.util.Recycler.Handle;
-
 public class PersistentReplicator extends AbstractReplicator implements Replicator, ReadEntriesCallback, DeleteCallback {
 
     private final PersistentTopic topic;
@@ -280,6 +282,8 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
                     continue;
                 }
 
+                checkReplicatedSubscriptionMarker(entry.getPosition(), msg, headersAndPayload);
+
                 if (msg.isReplicated()) {
                     // Discard messages that were already replicated into this region
                     cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
@@ -650,5 +654,39 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
         }
     }
 
+    /**
+     * Forwards replicated-subscription markers to the topic.
+     *
+     * <p>Messages without a marker type, markers that were not replicated from this replicator's
+     * remote cluster, and marker types other than the three replicated-subscription ones are ignored.
+     *
+     * @param position position of the entry that carried the message
+     * @param msg message whose metadata is inspected for a marker type
+     * @param payload buffer passed through to the topic together with the marker type
+     */
+    private void checkReplicatedSubscriptionMarker(Position position, MessageImpl<?> msg, ByteBuf payload) {
+        if (!msg.getMessageBuilder().hasMarkerType()) {
+            // Plain message, nothing to do
+            return;
+        }
+
+        final int type = msg.getMessageBuilder().getMarkerType();
+
+        // Every replicator sees every marker; handling only the ones coming from the cluster this
+        // replicator instance is assigned to makes sure each marker is processed once.
+        final boolean fromAssignedCluster = remoteCluster.equals(msg.getMessageBuilder().getReplicatedFrom());
+        if (!fromAssignedCluster) {
+            return;
+        }
+
+        final boolean isSubscriptionMarker = type == MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_REQUEST_VALUE
+                || type == MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_RESPONSE_VALUE
+                || type == MarkerType.REPLICATED_SUBSCRIPTION_UPDATE_VALUE;
+        if (isSubscriptionMarker) {
+            topic.receivedReplicatedSubscriptionMarker(position, type, payload);
+        }
+    }
+
+    /**
+     * @return {@code true} only when a producer is set and it reports itself as connected
+     */
+    @Override
+    public boolean isConnected() {
+        // Copy the field into a local so the null check and the call use the same reference
+        final ProducerImpl<?> current = this.producer;
+        if (current == null) {
+            return false;
+        }
+        return current.isConnected();
+    }
+
     private static final Logger log = LoggerFactory.getLogger(PersistentReplicator.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 907044d..22650d3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -192,6 +192,9 @@ public class PersistentTopic implements Topic, AddEntryCallback {
     private volatile boolean schemaValidationEnforced = false;
     private final StatsBuckets addEntryLatencyStatsUsec = new StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC);
 
+
+    private volatile Optional<ReplicatedSubscriptionsController> replicatedSubscriptionsController = Optional.empty();
+
     private static final FastThreadLocal<TopicStatsHelper> threadLocalTopicStats = new FastThreadLocal<TopicStatsHelper>() {
         @Override
         protected TopicStatsHelper initialValue() {
@@ -276,6 +279,8 @@ public class PersistentTopic implements Topic, AddEntryCallback {
             log.warn("[{}] Error getting policies {} and isEncryptionRequired will be set to false", topic, e.getMessage());
             isEncryptionRequired = false;
         }
+
+        checkReplicatedSubscriptionControllerState();
     }
 
     private void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {
@@ -597,6 +602,7 @@ public class PersistentTopic implements Topic, AddEntryCallback {
                     future.completeExceptionally(
                             new BrokerServiceException("Connection was closed while the opening the cursor "));
                 } else {
+                    checkReplicatedSubscriptionControllerState();
                     log.info("[{}][{}] Created new subscription for {}", topic, subscriptionName, consumerId);
                     future.complete(consumer);
                 }
@@ -914,6 +920,11 @@ public class PersistentTopic implements Topic, AddEntryCallback {
                     // Everything is now closed, remove the topic from map
                     brokerService.removeTopicFromCache(topic);
 
+                    // The Optional is empty unless the controller has been enabled for this topic.
+                    // Optional.get() would then throw NoSuchElementException before closeFuture is
+                    // completed below, so close the controller only when one is present.
+                    replicatedSubscriptionsController.ifPresent(ReplicatedSubscriptionsController::close);
+
                     log.info("[{}] Topic closed", topic);
                     closeFuture.complete(null);
                 }
@@ -1418,7 +1429,7 @@ public class PersistentTopic implements Topic, AddEntryCallback {
         topicStatsStream.writePair("msgThroughputOut", topicStatsHelper.aggMsgThroughputOut);
         topicStatsStream.writePair("storageSize", ledger.getEstimatedBacklogSize());
         topicStatsStream.writePair("pendingAddEntriesCount", ((ManagedLedgerImpl) ledger).getPendingAddEntriesCount());
-        
+
         nsStats.msgRateIn += topicStatsHelper.aggMsgRateIn;
         nsStats.msgRateOut += topicStatsHelper.aggMsgRateOut;
         nsStats.msgThroughputIn += topicStatsHelper.aggMsgThroughputIn;
@@ -1433,7 +1444,7 @@ public class PersistentTopic implements Topic, AddEntryCallback {
 
         // Close topic object
         topicStatsStream.endObject();
-        
+
         // add publish-latency metrics
         this.addEntryLatencyStatsUsec.refresh();
         NamespaceStats.copy(this.addEntryLatencyStatsUsec.getBuckets(), nsStats.addLatencyBucket);
@@ -1488,7 +1499,7 @@ public class PersistentTopic implements Topic, AddEntryCallback {
 
         stats.storageSize = ledger.getEstimatedBacklogSize();
         stats.deduplicationStatus = messageDeduplication.getStatus().toString();
-        
+
         return stats;
     }
 
@@ -1992,8 +2003,49 @@ public class PersistentTopic implements Topic, AddEntryCallback {
                 });
     }
 
+
     @Override
     public void recordAddLatency(long latencyUSec) {
         addEntryLatencyStatsUsec.addValue(latencyUSec);
     }
+
+    /**
+     * Re-evaluates whether the replicated subscriptions controller should be running, based on
+     * whether any current subscription reports itself as replicated, and applies the result.
+     */
+    private synchronized void checkReplicatedSubscriptionControllerState() {
+        AtomicBoolean shouldBeEnabled = new AtomicBoolean(false);
+        subscriptions.forEach((name, subscription) -> {
+            if (subscription.isReplicated()) {
+                shouldBeEnabled.set(true);
+            }
+        });
+
+        // This check runs each time a subscription is created, so the "nothing to do" outcome is
+        // reported at debug level to avoid one info line per subscribe on ordinary topics.
+        if (!shouldBeEnabled.get() && log.isDebugEnabled()) {
+            log.debug("[{}] There are no replicated subscriptions on the topic", topic);
+        }
+
+        checkReplicatedSubscriptionControllerState(shouldBeEnabled.get());
+    }
+
+    /**
+     * Brings the controller in line with the requested state: creates it when it should be
+     * enabled and is absent, closes and drops it when it is present and should not be enabled.
+     *
+     * @param shouldBeEnabled desired state of the replicated subscriptions controller
+     */
+    private synchronized void checkReplicatedSubscriptionControllerState(boolean shouldBeEnabled) {
+        final boolean enabledNow = replicatedSubscriptionsController.isPresent();
+        if (enabledNow == shouldBeEnabled) {
+            // Already in the requested state
+            return;
+        }
+
+        if (shouldBeEnabled) {
+            log.info("[{}] Enabling replicated subscriptions controller", topic);
+            final String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
+            replicatedSubscriptionsController = Optional.of(
+                    new ReplicatedSubscriptionsController(this, localCluster));
+            return;
+        }
+
+        log.info("[{}] Disabled replicated subscriptions controller", topic);
+        replicatedSubscriptionsController.get().close();
+        replicatedSubscriptionsController = Optional.empty();
+    }
+
+    /**
+     * Hands a replicated-subscription marker to the controller, starting the controller first
+     * if it is not currently present.
+     *
+     * @param position position passed through to the controller
+     * @param markerType numeric marker type
+     * @param payload marker payload buffer
+     */
+    void receivedReplicatedSubscriptionMarker(Position position, int markerType, ByteBuf payload) {
+        ReplicatedSubscriptionsController ctrl = replicatedSubscriptionsController.orElse(null);
+        if (ctrl == null) {
+            // Force to start the replication controller
+            checkReplicatedSubscriptionControllerState(true /* shouldBeEnabled */);
+            ctrl = replicatedSubscriptionsController.get();
+        }
+
+        ctrl.receivedReplicatedSubscriptionMarker(position, markerType, payload);
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
new file mode 100644
index 0000000..3f14246
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import io.netty.buffer.ByteBuf;
+import io.prometheus.client.Gauge;
+
+import java.io.IOException;
+import java.time.Clock;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.common.api.Markers;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.MarkerType;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshotRequest;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshotResponse;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsUpdate;
+
+/**
+ * Encapsulate all the logic of replicated subscriptions tracking for a given topic.
+ *
+ * <p>A periodic task starts a new snapshot attempt at the configured interval, and markers
+ * handed in by the topic are dispatched to the handler matching their type.
+ */
+@Slf4j
+public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.PublishContext {
+    private final PersistentTopic topic;
+    private final String localCluster;
+
+    private final ScheduledFuture<?> timer;
+
+    // Snapshots that were started and have neither completed nor timed out, keyed by snapshot id
+    private final ConcurrentMap<String, ReplicatedSubscriptionsSnapshotBuilder> pendingSnapshots = new ConcurrentHashMap<>();
+
+    // Static, hence shared by every controller instance in the process
+    private final static Gauge pendingSnapshotsMetric = Gauge
+            .build("pulsar_replicated_subscriptions_pending_snapshots",
+                    "Counter of currently pending snapshots")
+            .register();
+
+    /**
+     * Creates the controller and immediately schedules the periodic snapshot task.
+     *
+     * @param topic topic whose replicated subscriptions are tracked
+     * @param localCluster name of the local cluster, used as the source of the markers written here
+     */
+    public ReplicatedSubscriptionsController(PersistentTopic topic, String localCluster) {
+        this.topic = topic;
+        this.localCluster = localCluster;
+        timer = topic.getBrokerService().pulsar().getExecutor()
+                .scheduleAtFixedRate(this::runSnapshotCycle, 0,
+                        topic.getBrokerService().pulsar().getConfiguration()
+                                .getReplicatedSubscriptionsSnapshotFrequencyMillis(),
+                        TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Dispatches a marker to the handler for its type. Unknown marker types are ignored and
+     * markers that fail to parse are logged and dropped.
+     *
+     * @param position position associated with the marker
+     * @param markerType numeric marker type
+     * @param payload buffer the marker is parsed from
+     */
+    public void receivedReplicatedSubscriptionMarker(Position position, int markerType, ByteBuf payload) {
+        try {
+            switch (markerType) {
+            case MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_REQUEST_VALUE:
+                receivedSnapshotRequest(Markers.parseReplicatedSubscriptionsSnapshotRequest(payload));
+                break;
+
+            case MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_RESPONSE_VALUE:
+                receivedSnapshotResponse(position, Markers.parseReplicatedSubscriptionsSnapshotResponse(payload));
+                break;
+
+            case MarkerType.REPLICATED_SUBSCRIPTION_UPDATE_VALUE:
+                receiveSubscriptionUpdated(Markers.parseReplicatedSubscriptionsUpdate(payload));
+                break;
+
+            default:
+                // Ignore
+            }
+
+        } catch (IOException e) {
+            // The exception is passed as the trailing argument without a placeholder so that the
+            // logger records it as the throwable (with its stack trace)
+            log.warn("[{}] Failed to parse marker", topic.getName(), e);
+        }
+    }
+
+    private void receivedSnapshotRequest(ReplicatedSubscriptionsSnapshotRequest request) {
+        // Send response containing the current last written message id. The response
+        // marker we're publishing locally and then replicating will have a higher
+        // message id.
+        PositionImpl lastMsgId = (PositionImpl) topic.getLastMessageId();
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Received snapshot request. Last msg id: {}", topic.getName(), lastMsgId);
+        }
+
+        ByteBuf marker = Markers.newReplicatedSubscriptionsSnapshotResponse(
+                request.getSnapshotId(),
+                request.getSourceCluster(),
+                localCluster,
+                lastMsgId.getLedgerId(), lastMsgId.getEntryId());
+
+        topic.publishMessage(marker, this);
+    }
+
+    private void receivedSnapshotResponse(Position position, ReplicatedSubscriptionsSnapshotResponse response) {
+        String snapshotId = response.getSnapshotId();
+        ReplicatedSubscriptionsSnapshotBuilder builder = pendingSnapshots.get(snapshotId);
+        if (builder == null) {
+            log.info("[{}] Received late reply for timed-out snapshot {} from {}", topic.getName(), snapshotId,
+                    response.getCluster().getCluster());
+            return;
+        }
+
+        builder.receivedSnapshotResponse(position, response);
+    }
+
+    private void receiveSubscriptionUpdated(ReplicatedSubscriptionsUpdate update) {
+        // Intentionally empty: subscription updates are parsed but not acted upon here
+    }
+
+    /**
+     * Body of the periodic task. An exception escaping a task scheduled with
+     * {@code scheduleAtFixedRate} suppresses every subsequent execution, so failures are caught
+     * and logged here and the next cycle still runs.
+     */
+    private void runSnapshotCycle() {
+        try {
+            startNewSnapshot();
+        } catch (RuntimeException e) {
+            log.warn("[{}] Failed to start a new replicated subscriptions snapshot", topic.getName(), e);
+        }
+    }
+
+    private void startNewSnapshot() {
+        cleanupTimedOutSnapshots();
+
+        AtomicBoolean anyReplicatorDisconnected = new AtomicBoolean();
+        topic.getReplicators().forEach((cluster, replicator) -> {
+            if (!replicator.isConnected()) {
+                anyReplicatorDisconnected.set(true);
+            }
+        });
+
+        if (anyReplicatorDisconnected.get()) {
+            // Do not attempt to create snapshot when some of the clusters are not reachable
+            return;
+        }
+
+        ReplicatedSubscriptionsSnapshotBuilder builder = new ReplicatedSubscriptionsSnapshotBuilder(this,
+                topic.getReplicators().keys(), topic.getBrokerService().pulsar().getConfiguration(), Clock.systemUTC());
+        pendingSnapshots.put(builder.getSnapshotId(), builder);
+        // Count the snapshot only once it is registered, so a failure while building it cannot
+        // leave the gauge incremented without a matching map entry
+        pendingSnapshotsMetric.inc();
+        builder.start();
+    }
+
+    private void cleanupTimedOutSnapshots() {
+        for (Map.Entry<String, ReplicatedSubscriptionsSnapshotBuilder> entry : pendingSnapshots.entrySet()) {
+            if (!entry.getValue().isTimedOut()) {
+                continue;
+            }
+
+            // remove(key, value) tells us whether this call removed the entry; the gauge is only
+            // decremented in that case, so a concurrent snapshotCompleted() cannot cause a
+            // second decrement for the same snapshot
+            if (pendingSnapshots.remove(entry.getKey(), entry.getValue())) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Snapshot creation timed out for {}", topic.getName(), entry.getKey());
+                }
+
+                pendingSnapshotsMetric.dec();
+            }
+        }
+    }
+
+    /**
+     * Drops a snapshot from the pending set. The gauge is decremented only when the snapshot
+     * was still pending, which keeps it balanced if the snapshot was already removed elsewhere.
+     *
+     * @param snapshotId id of the snapshot to drop
+     */
+    void snapshotCompleted(String snapshotId) {
+        if (pendingSnapshots.remove(snapshotId) != null) {
+            pendingSnapshotsMetric.dec();
+        }
+    }
+
+    /**
+     * Publishes a marker on the local topic, using this controller as the publish context.
+     *
+     * @param marker serialized marker to publish
+     */
+    void writeMarker(ByteBuf marker) {
+        topic.publishMessage(marker, this);
+    }
+
+    /**
+     * From Topic.PublishContext
+     */
+    @Override
+    public void completed(Exception e, long ledgerId, long entryId) {
+        // Nothing to do in case of publish errors since the retry logic is applied upstream after a snapshot is not
+        // closed
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Published marker at {}:{}. Exception: {}", topic.getName(), ledgerId, entryId, e);
+        }
+    }
+
+    PersistentTopic topic() {
+        return topic;
+    }
+
+    String localCluster() {
+        return localCluster;
+    }
+
+    /**
+     * Cancels the periodic task and releases the gauge count held by snapshots that are still
+     * pending, since the gauge outlives this controller.
+     */
+    @Override
+    public void close() {
+        timer.cancel(true);
+
+        for (String snapshotId : pendingSnapshots.keySet()) {
+            snapshotCompleted(snapshotId);
+        }
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java
new file mode 100644
index 0000000..753c777
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import io.prometheus.client.Summary;
+
+import java.time.Clock;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.api.Markers;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshotResponse;
+
+/**
+ * Drives a single snapshot attempt: writes the snapshot request marker, collects one response
+ * per remote cluster (over two rounds when there is more than one remote cluster) and finally
+ * writes the snapshot marker and notifies the controller.
+ *
+ * <p>All methods touching mutable state are synchronized on the builder, because
+ * {@code startTimeMillis} is written by {@link #start()} and read both when a response arrives
+ * and when the timeout is checked.
+ */
+@Slf4j
+public class ReplicatedSubscriptionsSnapshotBuilder {
+
+    private final String snapshotId;
+    private final ReplicatedSubscriptionsController controller;
+
+    // First message id reported by each remote cluster, keyed by cluster name
+    private final Map<String, MessageIdData> responses = new TreeMap<>();
+    private final List<String> remoteClusters;
+    // Clusters that have not answered yet in the current round
+    private final Set<String> missingClusters;
+
+    private final boolean needTwoRounds;
+    private boolean firstRoundComplete;
+
+    private long startTimeMillis;
+    private final long timeoutMillis;
+
+    private final Clock clock;
+
+    private final static Summary snapshotMetric = Summary.build("pulsar_replicated_subscriptions_snapshot_ms",
+            "Time taken to create a consistent snapshot across clusters").register();
+
+    /**
+     * @param controller controller used to write markers and to report completion
+     * @param remoteClusters names of the remote clusters a response is expected from
+     * @param conf configuration providing the snapshot timeout, in seconds
+     * @param clock time source for the start time, the timeout check and the latency metric
+     */
+    public ReplicatedSubscriptionsSnapshotBuilder(ReplicatedSubscriptionsController controller,
+            List<String> remoteClusters, ServiceConfiguration conf, Clock clock) {
+        this.snapshotId = UUID.randomUUID().toString();
+        this.controller = controller;
+        this.remoteClusters = remoteClusters;
+        this.missingClusters = new TreeSet<>(remoteClusters);
+        this.clock = clock;
+        this.timeoutMillis = TimeUnit.SECONDS.toMillis(conf.getReplicatedSubscriptionsSnapshotTimeoutSeconds());
+
+        // With more than one remote cluster (i.e. more than 2 clusters overall), we need to do
+        // 2 rounds of snapshots, to make sure we're catching all the messages eventually
+        // exchanged between the remote clusters.
+        this.needTwoRounds = remoteClusters.size() > 1;
+    }
+
+    String getSnapshotId() {
+        return snapshotId;
+    }
+
+    /**
+     * Records the start time and writes the snapshot request marker.
+     */
+    synchronized void start() {
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Starting new snapshot {} - Clusters: {}", controller.topic().getName(), snapshotId,
+                    missingClusters);
+        }
+        startTimeMillis = clock.millis();
+        controller.writeMarker(
+                Markers.newReplicatedSubscriptionsSnapshotRequest(snapshotId, controller.localCluster()));
+    }
+
+    /**
+     * Registers the response of one remote cluster. Once every cluster has answered, either a
+     * second round is started (when required and not done yet) or the snapshot marker is
+     * written and the controller is told the snapshot is complete.
+     *
+     * @param position position associated with the response; the one passed with the final
+     *                 response is stored in the snapshot marker
+     * @param response response received from a remote cluster
+     */
+    synchronized void receivedSnapshotResponse(Position position, ReplicatedSubscriptionsSnapshotResponse response) {
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Received response from {}", controller.topic().getName(),
+                    response.getCluster().getCluster());
+        }
+        String cluster = response.getCluster().getCluster();
+        // putIfAbsent: a later response from the same cluster does not replace the recorded id
+        responses.putIfAbsent(cluster, response.getCluster().getMessageId());
+        missingClusters.remove(cluster);
+
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Missing clusters {}", controller.topic().getName(), missingClusters);
+        }
+
+        if (!missingClusters.isEmpty()) {
+            // We're still waiting for more responses to come back
+            return;
+        }
+
+        // We have now received all responses
+
+        if (needTwoRounds && !firstRoundComplete) {
+            // Mark that 1st round is done and start a 2nd round
+            firstRoundComplete = true;
+            missingClusters.addAll(remoteClusters);
+
+            controller.writeMarker(
+                    Markers.newReplicatedSubscriptionsSnapshotRequest(snapshotId, controller.localCluster()));
+            return;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Snapshot is complete {}", controller.topic().getName(), snapshotId);
+        }
+        // Snapshot is now complete, store it in the local topic
+        PositionImpl p = (PositionImpl) position;
+        controller.writeMarker(
+                Markers.newReplicatedSubscriptionsSnapshot(snapshotId, controller.localCluster(),
+                        p.getLedgerId(), p.getEntryId(), responses));
+        controller.snapshotCompleted(snapshotId);
+
+        double latencyMillis = clock.millis() - startTimeMillis;
+        snapshotMetric.observe(latencyMillis);
+    }
+
+    /**
+     * @return {@code true} when more than the configured timeout has elapsed since the recorded
+     *         start time
+     */
+    synchronized boolean isTimedOut() {
+        return (startTimeMillis + timeoutMillis) < clock.millis();
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java
new file mode 100644
index 0000000..95b984f
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+import io.netty.buffer.ByteBuf;
+
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.common.api.Markers;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.ClusterMessageId;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshot;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshotRequest;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshotResponse;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class ReplicatedSubscriptionsSnapshotBuilderTest {
+
+    private final String localCluster = "a";
+    private long currentTime = 0;
+    private Clock clock;
+    private ServiceConfiguration conf;
+    private ReplicatedSubscriptionsController controller;
+    private List<ByteBuf> markers;
+
+    @BeforeMethod
+    public void setup() {
+        clock = mock(Clock.class);
+        when(clock.millis()).thenAnswer(invocation -> currentTime);
+
+        conf = new ServiceConfiguration();
+        conf.setReplicatedSubscriptionsSnapshotTimeoutSeconds(3);
+
+        markers = new ArrayList<>();
+
+        controller = mock(ReplicatedSubscriptionsController.class);
+        when(controller.localCluster()).thenReturn(localCluster);
+        doAnswer(invocation -> {
+            ByteBuf marker = invocation.getArgumentAt(0, ByteBuf.class);
+            Commands.skipMessageMetadata(marker);
+            markers.add(marker);
+            return null;
+        }).when(controller)
+                .writeMarker(any(ByteBuf.class));
+    }
+
+    @Test
+    public void testBuildSnapshotWith2Clusters() throws Exception {
+        List<String> remoteClusters = Arrays.asList("b");
+
+        ReplicatedSubscriptionsSnapshotBuilder builder = new ReplicatedSubscriptionsSnapshotBuilder(controller,
+                remoteClusters, conf, clock);
+
+        assertTrue(markers.isEmpty());
+
+        builder.start();
+
+        // Should have sent out a marker to initiate the snapshot
+        assertEquals(markers.size(), 1);
+        ReplicatedSubscriptionsSnapshotRequest request = Markers
+                .parseReplicatedSubscriptionsSnapshotRequest(markers.remove(0));
+        assertEquals(request.getSourceCluster(), localCluster);
+
+        // Simulate the responses coming back
+        builder.receivedSnapshotResponse(new PositionImpl(1, 1),
+                ReplicatedSubscriptionsSnapshotResponse.newBuilder()
+                        .setSnapshotId("snapshot-1")
+                        .setCluster(ClusterMessageId.newBuilder()
+                                .setCluster("b")
+                                .setMessageId(MessageIdData.newBuilder()
+                                        .setLedgerId(11)
+                                        .setEntryId(11)
+                                        .build()))
+                        .build());
+
+        // At this point the snapshot should be created
+        assertEquals(markers.size(), 1);
+        ReplicatedSubscriptionsSnapshot snapshot = Markers.parseReplicatedSubscriptionsSnapshot(markers.remove(0));
+        assertEquals(snapshot.getClustersCount(), 1);
+        assertEquals(snapshot.getClusters(0).getCluster(), "b");
+        assertEquals(snapshot.getClusters(0).getMessageId().getLedgerId(), 11);
+        assertEquals(snapshot.getClusters(0).getMessageId().getEntryId(), 11);
+
+        assertEquals(snapshot.getLocalMessageId().getLedgerId(), 1);
+        assertEquals(snapshot.getLocalMessageId().getEntryId(), 1);
+    }
+
+    @Test
+    public void testBuildSnapshotWith3Clusters() throws Exception {
+        List<String> remoteClusters = Arrays.asList("b", "c");
+
+        ReplicatedSubscriptionsSnapshotBuilder builder = new ReplicatedSubscriptionsSnapshotBuilder(controller,
+                remoteClusters, conf, clock);
+
+        assertTrue(markers.isEmpty());
+
+        builder.start();
+
+        // Should have sent out a marker to initiate the snapshot
+        assertEquals(markers.size(), 1);
+        ReplicatedSubscriptionsSnapshotRequest request = Markers
+                .parseReplicatedSubscriptionsSnapshotRequest(markers.remove(0));
+        assertEquals(request.getSourceCluster(), localCluster);
+
+        // Simulate the responses coming back
+        builder.receivedSnapshotResponse(new PositionImpl(1, 1),
+                ReplicatedSubscriptionsSnapshotResponse.newBuilder()
+                        .setSnapshotId("snapshot-1")
+                        .setCluster(ClusterMessageId.newBuilder()
+                                .setCluster("b")
+                                .setMessageId(MessageIdData.newBuilder()
+                                        .setLedgerId(11)
+                                        .setEntryId(11)
+                                        .build()))
+                        .build());
+
+        // No markers should be sent out
+        assertTrue(markers.isEmpty());
+
+        builder.receivedSnapshotResponse(new PositionImpl(2, 2),
+                ReplicatedSubscriptionsSnapshotResponse.newBuilder()
+                        .setSnapshotId("snapshot-1")
+                        .setCluster(ClusterMessageId.newBuilder()
+                                .setCluster("c")
+                                .setMessageId(MessageIdData.newBuilder()
+                                        .setLedgerId(22)
+                                        .setEntryId(22)
+                                        .build()))
+                        .build());
+
+        // Since we have 2 remote clusters, a 2nd round of snapshot will be taken
+        assertEquals(markers.size(), 1);
+        request = Markers.parseReplicatedSubscriptionsSnapshotRequest(markers.remove(0));
+        assertEquals(request.getSourceCluster(), localCluster);
+
+        // Responses coming back
+        builder.receivedSnapshotResponse(new PositionImpl(3, 3),
+                ReplicatedSubscriptionsSnapshotResponse.newBuilder()
+                        .setSnapshotId("snapshot-1")
+                        .setCluster(ClusterMessageId.newBuilder()
+                                .setCluster("b")
+                                .setMessageId(MessageIdData.newBuilder()
+                                        .setLedgerId(33)
+                                        .setEntryId(33)
+                                        .build()))
+                        .build());
+
+        // No markers should be sent out
+        assertTrue(markers.isEmpty());
+
+        builder.receivedSnapshotResponse(new PositionImpl(4, 4),
+                ReplicatedSubscriptionsSnapshotResponse.newBuilder()
+                        .setSnapshotId("snapshot-1")
+                        .setCluster(ClusterMessageId.newBuilder()
+                                .setCluster("c")
+                                .setMessageId(MessageIdData.newBuilder()
+                                        .setLedgerId(44)
+                                        .setEntryId(44)
+                                        .build()))
+                        .build());
+
+        // At this point the snapshot should be created
+        assertEquals(markers.size(), 1);
+        ReplicatedSubscriptionsSnapshot snapshot = Markers.parseReplicatedSubscriptionsSnapshot(markers.remove(0));
+        assertEquals(snapshot.getClustersCount(), 2);
+        assertEquals(snapshot.getClusters(0).getCluster(), "b");
+        assertEquals(snapshot.getClusters(0).getMessageId().getLedgerId(), 11);
+        assertEquals(snapshot.getClusters(0).getMessageId().getEntryId(), 11);
+
+        assertEquals(snapshot.getClusters(1).getCluster(), "c");
+        assertEquals(snapshot.getClusters(1).getMessageId().getLedgerId(), 22);
+        assertEquals(snapshot.getClusters(1).getMessageId().getEntryId(), 22);
+
+        assertEquals(snapshot.getLocalMessageId().getLedgerId(), 4);
+        assertEquals(snapshot.getLocalMessageId().getEntryId(), 4);
+    }
+
+    @Test
+    public void testBuildTimeout() throws Exception {
+        List<String> remoteClusters = Arrays.asList("b");
+
+        ReplicatedSubscriptionsSnapshotBuilder builder = new ReplicatedSubscriptionsSnapshotBuilder(controller,
+                remoteClusters, conf, clock);
+
+        assertFalse(builder.isTimedOut());
+
+        builder.start();
+
+        currentTime = 2000;
+
+        assertFalse(builder.isTimedOut());
+
+        currentTime = 5000;
+
+        assertTrue(builder.isTimedOut());
+    }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index d1d6cf8..c1c9752 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -384,6 +384,14 @@ public class Commands {
         }
     }
 
+    public static void skipMessageMetadata(ByteBuf buffer) {
+        // The reader index may initially sit on the optional checksum, so step over that first; the
+        // unsigned int that follows is the metadata size, and that many bytes are skipped unparsed.
+        skipChecksumIfPresent(buffer);
+        final long metadataLength = buffer.readUnsignedInt();
+        buffer.skipBytes((int) metadataLength);
+    }
+
     public static ByteBufPair newMessage(long consumerId, MessageIdData messageId, int redeliveryCount, ByteBuf metadataAndPayload) {
         CommandMessage.Builder msgBuilder = CommandMessage.newBuilder();
         msgBuilder.setConsumerId(consumerId);