You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/28 21:15:07 UTC
[pulsar] branch master updated: Replicated subscriptions - Create
snapshots (#4354)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 834ca6a Replicated subscriptions - Create snapshots (#4354)
834ca6a is described below
commit 834ca6af60a97ea17f90614c7db6a5b45ed2ad31
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue May 28 14:15:01 2019 -0700
Replicated subscriptions - Create snapshots (#4354)
* Replicated subscriptions - Create snapshots
* Added clock and unit tests for snapshot builder
---
conf/broker.conf | 9 +
.../apache/pulsar/broker/ServiceConfiguration.java | 15 ++
.../apache/pulsar/broker/service/Replicator.java | 2 +
.../nonpersistent/NonPersistentReplicator.java | 6 +
.../service/persistent/PersistentReplicator.java | 46 ++++-
.../broker/service/persistent/PersistentTopic.java | 58 +++++-
.../ReplicatedSubscriptionsController.java | 201 ++++++++++++++++++
.../ReplicatedSubscriptionsSnapshotBuilder.java | 137 +++++++++++++
...ReplicatedSubscriptionsSnapshotBuilderTest.java | 228 +++++++++++++++++++++
.../org/apache/pulsar/common/api/Commands.java | 8 +
10 files changed, 703 insertions(+), 7 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index f6c8221..ae95d48 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -259,6 +259,15 @@ maxMessageSize=5242880
# Interval between checks to see if topics with compaction policies need to be compacted
brokerServiceCompactionMonitorIntervalInSeconds=60
+# Enable tracking of replicated subscriptions state across clusters.
+enableReplicatedSubscriptions=true
+
+# Interval, in milliseconds, between snapshots for replicated subscriptions tracking.
+replicatedSubscriptionsSnapshotFrequencyMillis=1000
+
+# Timeout for building a consistent snapshot for tracking replicated subscriptions state.
+replicatedSubscriptionsSnapshotTimeoutSeconds=30
+
### --- Authentication --- ###
# Role names that are treated as "proxy roles". If the broker sees a request with
#role as proxyRoles - it will demand to see a valid original principal.
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 984adf1..91ef0da 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -510,6 +510,21 @@ public class ServiceConfiguration implements PulsarConfiguration {
maxValue = Integer.MAX_VALUE - Commands.MESSAGE_SIZE_FRAME_PADDING)
private int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE;
+ // Feature switch for replicated subscriptions tracking.
+ // NOTE(review): no code in this patch reads this flag — confirm where it is enforced.
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Enable tracking of replicated subscriptions state across clusters.")
+ private boolean enableReplicatedSubscriptions = true;
+
+ // Period, in milliseconds, of the timer that starts a new snapshot (it is passed as the
+ // scheduleAtFixedRate period in ReplicatedSubscriptionsController).
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Frequency of snapshots for replicated subscriptions tracking.")
+ private int replicatedSubscriptionsSnapshotFrequencyMillis = 1_000;
+
+ // Seconds after which a snapshot that is still pending is considered timed out
+ // (converted to milliseconds in ReplicatedSubscriptionsSnapshotBuilder).
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Timeout for building a consistent snapshot for tracking replicated subscriptions state. ")
+ private int replicatedSubscriptionsSnapshotTimeoutSeconds = 30;
+
/***** --- TLS --- ****/
@FieldContext(
category = CATEGORY_TLS,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
index 7412b22..40795a3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
@@ -46,4 +46,6 @@ public interface Replicator {
default Optional<DispatchRateLimiter> getRateLimiter() {
return Optional.empty();
}
+
+ /**
+ * Reports whether this replicator is currently connected.
+ *
+ * <p>The implementations added in this change return {@code true} only when the replicator
+ * has a producer and that producer reports itself as connected.
+ *
+ * @return {@code true} if the replicator is connected, {@code false} otherwise
+ */
+ boolean isConnected();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
index d505c44..10e16ff 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
@@ -250,4 +250,10 @@ public class NonPersistentReplicator extends AbstractReplicator implements Repli
protected void disableReplicatorRead() {
// No-op
}
+
+    /**
+     * Tells whether the producer used by this replicator exists and reports itself as connected.
+     */
+    @Override
+    public boolean isConnected() {
+        // Work on a local copy so the null check and the call use the same reference.
+        final ProducerImpl<?> current = this.producer;
+        if (current == null) {
+            return false;
+        }
+        return current.isConnected();
+    }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 2253104..01c547e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -19,6 +19,9 @@
package org.apache.pulsar.broker.service.persistent;
import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
+import io.netty.buffer.ByteBuf;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
import java.util.List;
import java.util.Optional;
@@ -50,15 +53,14 @@ import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.SendCallback;
import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.api.Markers;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.MarkerType;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshotRequest;
import org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.apache.pulsar.common.util.Codec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.netty.buffer.ByteBuf;
-import io.netty.util.Recycler;
-import io.netty.util.Recycler.Handle;
-
public class PersistentReplicator extends AbstractReplicator implements Replicator, ReadEntriesCallback, DeleteCallback {
private final PersistentTopic topic;
@@ -280,6 +282,8 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
continue;
}
+ checkReplicatedSubscriptionMarker(entry.getPosition(), msg, headersAndPayload);
+
if (msg.isReplicated()) {
// Discard messages that were already replicated into this region
cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
@@ -650,5 +654,39 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
}
}
+    /**
+     * Hands replicated-subscription markers over to the topic.
+     *
+     * <p>Messages without a marker type, markers that were not replicated from the cluster this
+     * replicator is assigned to, and markers of any other type are ignored.
+     *
+     * @param position position of the entry that carries the message
+     * @param msg      message read by the replicator
+     * @param payload  buffer passed on to the topic together with the marker type
+     */
+    private void checkReplicatedSubscriptionMarker(Position position, MessageImpl<?> msg, ByteBuf payload) {
+        final boolean carriesMarker = msg.getMessageBuilder().hasMarkerType();
+        if (!carriesMarker) {
+            // Regular message, nothing to inspect
+            return;
+        }
+
+        final int type = msg.getMessageBuilder().getMarkerType();
+
+        // Every replicator sees every marker; only the one assigned to the cluster the marker
+        // came from handles it, so that each marker is processed exactly once.
+        final boolean fromAssignedCluster = remoteCluster.equals(msg.getMessageBuilder().getReplicatedFrom());
+        if (!fromAssignedCluster) {
+            return;
+        }
+
+        final boolean isReplicatedSubscriptionMarker =
+                type == MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_REQUEST_VALUE
+                || type == MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_RESPONSE_VALUE
+                || type == MarkerType.REPLICATED_SUBSCRIPTION_UPDATE_VALUE;
+
+        if (isReplicatedSubscriptionMarker) {
+            topic.receivedReplicatedSubscriptionMarker(position, type, payload);
+        }
+        // Any other marker type: do nothing
+    }
+
+    /**
+     * Tells whether the producer used by this replicator exists and reports itself as connected.
+     */
+    @Override
+    public boolean isConnected() {
+        // Work on a local copy so the null check and the call use the same reference.
+        final ProducerImpl<?> current = this.producer;
+        if (current == null) {
+            return false;
+        }
+        return current.isConnected();
+    }
+
private static final Logger log = LoggerFactory.getLogger(PersistentReplicator.class);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 907044d..22650d3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -192,6 +192,9 @@ public class PersistentTopic implements Topic, AddEntryCallback {
private volatile boolean schemaValidationEnforced = false;
private final StatsBuckets addEntryLatencyStatsUsec = new StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC);
+
+ private volatile Optional<ReplicatedSubscriptionsController> replicatedSubscriptionsController = Optional.empty();
+
private static final FastThreadLocal<TopicStatsHelper> threadLocalTopicStats = new FastThreadLocal<TopicStatsHelper>() {
@Override
protected TopicStatsHelper initialValue() {
@@ -276,6 +279,8 @@ public class PersistentTopic implements Topic, AddEntryCallback {
log.warn("[{}] Error getting policies {} and isEncryptionRequired will be set to false", topic, e.getMessage());
isEncryptionRequired = false;
}
+
+ checkReplicatedSubscriptionControllerState();
}
private void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {
@@ -597,6 +602,7 @@ public class PersistentTopic implements Topic, AddEntryCallback {
future.completeExceptionally(
new BrokerServiceException("Connection was closed while the opening the cursor "));
} else {
+ checkReplicatedSubscriptionControllerState();
log.info("[{}][{}] Created new subscription for {}", topic, subscriptionName, consumerId);
future.complete(consumer);
}
@@ -914,6 +920,11 @@ public class PersistentTopic implements Topic, AddEntryCallback {
// Everything is now closed, remove the topic from map
brokerService.removeTopicFromCache(topic);
+        // The controller is optional: Optional.get() throws NoSuchElementException when it was
+        // never enabled for this topic, so close it only when it is present.
+        replicatedSubscriptionsController.ifPresent(ReplicatedSubscriptionsController::close);
+
log.info("[{}] Topic closed", topic);
closeFuture.complete(null);
}
@@ -1418,7 +1429,7 @@ public class PersistentTopic implements Topic, AddEntryCallback {
topicStatsStream.writePair("msgThroughputOut", topicStatsHelper.aggMsgThroughputOut);
topicStatsStream.writePair("storageSize", ledger.getEstimatedBacklogSize());
topicStatsStream.writePair("pendingAddEntriesCount", ((ManagedLedgerImpl) ledger).getPendingAddEntriesCount());
-
+
nsStats.msgRateIn += topicStatsHelper.aggMsgRateIn;
nsStats.msgRateOut += topicStatsHelper.aggMsgRateOut;
nsStats.msgThroughputIn += topicStatsHelper.aggMsgThroughputIn;
@@ -1433,7 +1444,7 @@ public class PersistentTopic implements Topic, AddEntryCallback {
// Close topic object
topicStatsStream.endObject();
-
+
// add publish-latency metrics
this.addEntryLatencyStatsUsec.refresh();
NamespaceStats.copy(this.addEntryLatencyStatsUsec.getBuckets(), nsStats.addLatencyBucket);
@@ -1488,7 +1499,7 @@ public class PersistentTopic implements Topic, AddEntryCallback {
stats.storageSize = ledger.getEstimatedBacklogSize();
stats.deduplicationStatus = messageDeduplication.getStatus().toString();
-
+
return stats;
}
@@ -1992,8 +2003,49 @@ public class PersistentTopic implements Topic, AddEntryCallback {
});
}
+
@Override
public void recordAddLatency(long latencyUSec) {
addEntryLatencyStatsUsec.addValue(latencyUSec);
}
+
+    /**
+     * Scans the current subscriptions and enables or disables the replicated subscriptions
+     * controller depending on whether at least one of them is replicated.
+     */
+    private synchronized void checkReplicatedSubscriptionControllerState() {
+        final AtomicBoolean anyReplicated = new AtomicBoolean();
+        subscriptions.forEach((subName, sub) -> {
+            if (sub.isReplicated()) {
+                anyReplicated.set(true);
+            }
+        });
+
+        final boolean hasReplicatedSubscription = anyReplicated.get();
+        if (!hasReplicatedSubscription) {
+            log.info("[{}] There are no replicated subscriptions on the topic", topic);
+        }
+
+        checkReplicatedSubscriptionControllerState(hasReplicatedSubscription);
+    }
+
+    /**
+     * Brings the replicated subscriptions controller in line with the requested state: creates
+     * it when it should be enabled and is absent, closes and drops it when it should be disabled
+     * and is present, and does nothing when it is already in the requested state.
+     *
+     * @param shouldBeEnabled whether the controller is expected to be running
+     */
+    private synchronized void checkReplicatedSubscriptionControllerState(boolean shouldBeEnabled) {
+        final boolean enabledNow = replicatedSubscriptionsController.isPresent();
+        if (shouldBeEnabled == enabledNow) {
+            // Already in the requested state
+            return;
+        }
+
+        if (shouldBeEnabled) {
+            log.info("[{}] Enabling replicated subscriptions controller", topic);
+            final String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
+            replicatedSubscriptionsController = Optional.of(
+                    new ReplicatedSubscriptionsController(this, localCluster));
+        } else {
+            log.info("[{}] Disabled replicated subscriptions controller", topic);
+            replicatedSubscriptionsController.get().close();
+            replicatedSubscriptionsController = Optional.empty();
+        }
+    }
+
+    /**
+     * Forwards a replicated-subscription marker to the controller of this topic, starting the
+     * controller first when it is not running yet.
+     *
+     * @param position   position of the entry that carried the marker
+     * @param markerType numeric marker type
+     * @param payload    buffer handed to the controller for parsing
+     */
+    void receivedReplicatedSubscriptionMarker(Position position, int markerType, ByteBuf payload) {
+        ReplicatedSubscriptionsController ctrl = replicatedSubscriptionsController.orElse(null);
+        if (ctrl == null) {
+            // Force to start the replication controller
+            checkReplicatedSubscriptionControllerState(true /* shouldBeEnabled */);
+            ctrl = replicatedSubscriptionsController.get();
+        }
+
+        ctrl.receivedReplicatedSubscriptionMarker(position, markerType, payload);
+    }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
new file mode 100644
index 0000000..3f14246
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import io.netty.buffer.ByteBuf;
+import io.prometheus.client.Gauge;
+
+import java.io.IOException;
+import java.time.Clock;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.common.api.Markers;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.MarkerType;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshotRequest;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshotResponse;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsUpdate;
+
+/**
+ * Encapsulate all the logic of replicated subscriptions tracking for a given topic.
+ *
+ * <p>A timer periodically drops the snapshots that timed out and starts a new one. Markers
+ * handed over through {@link #receivedReplicatedSubscriptionMarker} are parsed and dispatched:
+ * snapshot requests are answered with the last message id of the topic, snapshot responses are
+ * routed to the pending snapshot they belong to.
+ */
+@Slf4j
+public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.PublishContext {
+    private final PersistentTopic topic;
+    private final String localCluster;
+
+    private final ScheduledFuture<?> timer;
+
+    /** Snapshots that were started and have neither completed nor timed out, keyed by snapshot id. */
+    private final ConcurrentMap<String, ReplicatedSubscriptionsSnapshotBuilder> pendingSnapshots =
+            new ConcurrentHashMap<>();
+
+    private final static Gauge pendingSnapshotsMetric = Gauge
+            .build("pulsar_replicated_subscriptions_pending_snapshots",
+                    "Counter of currently pending snapshots")
+            .register();
+
+    /**
+     * Creates the controller and schedules the periodic snapshot task, starting immediately.
+     *
+     * @param topic        topic whose replicated subscriptions are tracked
+     * @param localCluster name of the cluster this broker belongs to
+     */
+    public ReplicatedSubscriptionsController(PersistentTopic topic, String localCluster) {
+        this.topic = topic;
+        this.localCluster = localCluster;
+        timer = topic.getBrokerService().pulsar().getExecutor()
+                .scheduleAtFixedRate(this::startNewSnapshot, 0,
+                        topic.getBrokerService().pulsar().getConfiguration()
+                                .getReplicatedSubscriptionsSnapshotFrequencyMillis(),
+                        TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Parses a marker and dispatches it to the matching handler. Unknown marker types are
+     * ignored and parse failures are logged.
+     *
+     * @param position   position of the entry that carried the marker
+     * @param markerType numeric marker type
+     * @param payload    buffer the marker is parsed from
+     */
+    public void receivedReplicatedSubscriptionMarker(Position position, int markerType, ByteBuf payload) {
+        try {
+            switch (markerType) {
+            case MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_REQUEST_VALUE:
+                receivedSnapshotRequest(Markers.parseReplicatedSubscriptionsSnapshotRequest(payload));
+                break;
+
+            case MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_RESPONSE_VALUE:
+                receivedSnapshotResponse(position, Markers.parseReplicatedSubscriptionsSnapshotResponse(payload));
+                break;
+
+            case MarkerType.REPLICATED_SUBSCRIPTION_UPDATE_VALUE:
+                receiveSubscriptionUpdated(Markers.parseReplicatedSubscriptionsUpdate(payload));
+                break;
+
+            default:
+                // Ignore
+            }
+
+        } catch (IOException e) {
+            // Fill the placeholder with the message and pass the exception as the trailing
+            // argument, so the stack trace is logged instead of relying on how the logging
+            // backend treats a Throwable bound to a placeholder.
+            log.warn("[{}] Failed to parse marker: {}", topic.getName(), e.getMessage(), e);
+        }
+    }
+
+    private void receivedSnapshotRequest(ReplicatedSubscriptionsSnapshotRequest request) {
+        // Send response containing the current last written message id. The response
+        // marker we're publishing locally and then replicating will have a higher
+        // message id.
+        PositionImpl lastMsgId = (PositionImpl) topic.getLastMessageId();
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Received snapshot request. Last msg id: {}", topic.getName(), lastMsgId);
+        }
+
+        ByteBuf marker = Markers.newReplicatedSubscriptionsSnapshotResponse(
+                request.getSnapshotId(),
+                request.getSourceCluster(),
+                localCluster,
+                lastMsgId.getLedgerId(), lastMsgId.getEntryId());
+
+        topic.publishMessage(marker, this);
+    }
+
+    private void receivedSnapshotResponse(Position position, ReplicatedSubscriptionsSnapshotResponse response) {
+        String snapshotId = response.getSnapshotId();
+        ReplicatedSubscriptionsSnapshotBuilder builder = pendingSnapshots.get(snapshotId);
+        if (builder == null) {
+            log.info("[{}] Received late reply for timed-out snapshot {} from {}", topic.getName(), snapshotId,
+                    response.getCluster().getCluster());
+            return;
+        }
+
+        builder.receivedSnapshotResponse(position, response);
+    }
+
+    private void receiveSubscriptionUpdated(ReplicatedSubscriptionsUpdate update) {
+        // Not handled yet
+    }
+
+    /**
+     * Timer task: drops timed-out snapshots and, when every replicator is connected, starts a
+     * new snapshot.
+     */
+    private void startNewSnapshot() {
+        try {
+            cleanupTimedOutSnapshots();
+
+            AtomicBoolean anyReplicatorDisconnected = new AtomicBoolean();
+            topic.getReplicators().forEach((cluster, replicator) -> {
+                if (!replicator.isConnected()) {
+                    anyReplicatorDisconnected.set(true);
+                }
+            });
+
+            if (anyReplicatorDisconnected.get()) {
+                // Do not attempt to create snapshot when some of the clusters are not reachable
+                return;
+            }
+
+            ReplicatedSubscriptionsSnapshotBuilder builder = new ReplicatedSubscriptionsSnapshotBuilder(this,
+                    topic.getReplicators().keys(), topic.getBrokerService().pulsar().getConfiguration(),
+                    Clock.systemUTC());
+            pendingSnapshots.put(builder.getSnapshotId(), builder);
+            pendingSnapshotsMetric.inc();
+            builder.start();
+        } catch (RuntimeException e) {
+            // An exception escaping a scheduleAtFixedRate task suppresses every later run, which
+            // would silently stop snapshots for this topic. Log it and let the next tick retry.
+            log.warn("[{}] Failed to start a new replicated subscriptions snapshot: {}", topic.getName(),
+                    e.getMessage(), e);
+        }
+    }
+
+    private void cleanupTimedOutSnapshots() {
+        for (Map.Entry<String, ReplicatedSubscriptionsSnapshotBuilder> entry : pendingSnapshots.entrySet()) {
+            if (entry.getValue().isTimedOut()) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Snapshot creation timed out for {}", topic.getName(), entry.getKey());
+                }
+
+                discardSnapshot(entry.getKey());
+            }
+        }
+    }
+
+    /**
+     * Removes a snapshot from the pending set. The gauge is decremented only when this call
+     * actually removed the entry, so a snapshot that completes while it is being timed out (or
+     * while the controller is closing) is never counted twice.
+     */
+    private void discardSnapshot(String snapshotId) {
+        if (pendingSnapshots.remove(snapshotId) != null) {
+            pendingSnapshotsMetric.dec();
+        }
+    }
+
+    void snapshotCompleted(String snapshotId) {
+        discardSnapshot(snapshotId);
+    }
+
+    void writeMarker(ByteBuf marker) {
+        topic.publishMessage(marker, this);
+    }
+
+    /**
+     * From Topic.PublishContext
+     */
+    @Override
+    public void completed(Exception e, long ledgerId, long entryId) {
+        // Nothing to do in case of publish errors: a snapshot that never completes is dropped
+        // once it times out and a new one is started by the timer.
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Published marker at {}:{}. Exception: {}", topic.getName(), ledgerId, entryId, e);
+        }
+    }
+
+    PersistentTopic topic() {
+        return topic;
+    }
+
+    String localCluster() {
+        return localCluster;
+    }
+
+    /**
+     * Stops the snapshot timer and forgets the snapshots that are still pending, so that they
+     * no longer count in the (process-wide) pending snapshots gauge.
+     */
+    @Override
+    public void close() {
+        timer.cancel(true);
+        pendingSnapshots.keySet().forEach(this::discardSnapshot);
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java
new file mode 100644
index 0000000..753c777
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import io.prometheus.client.Summary;
+
+import java.time.Clock;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.api.Markers;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshotResponse;
+
+/**
+ * Drives the creation of a single snapshot: publishes the snapshot request, collects one
+ * response per remote cluster and, once all of them arrived, publishes the snapshot marker.
+ * With more than one remote cluster a second request/response round is performed before the
+ * snapshot is published.
+ */
+@Slf4j
+public class ReplicatedSubscriptionsSnapshotBuilder {
+
+    private final String snapshotId;
+    private final ReplicatedSubscriptionsController controller;
+
+    // First message id reported by each remote cluster (later reports do not overwrite it)
+    private final Map<String, MessageIdData> responses = new TreeMap<>();
+    private final List<String> remoteClusters;
+    // Clusters that have not answered yet in the current round
+    private final Set<String> missingClusters;
+
+    private final boolean needTwoRounds;
+    private boolean firstRoundComplete;
+
+    private long startTimeMillis;
+    private final long timeoutMillis;
+
+    private final Clock clock;
+
+    private final static Summary snapshotMetric = Summary.build("pulsar_replicated_subscriptions_snapshot_ms",
+            "Time taken to create a consistent snapshot across clusters").register();
+
+    /**
+     * @param controller     controller used to publish markers and to report completion
+     * @param remoteClusters clusters a response is expected from
+     * @param conf           configuration the snapshot timeout is read from
+     * @param clock          time source for the start time and the timeout check
+     */
+    public ReplicatedSubscriptionsSnapshotBuilder(ReplicatedSubscriptionsController controller,
+            List<String> remoteClusters, ServiceConfiguration conf, Clock clock) {
+        this.controller = controller;
+        this.clock = clock;
+        this.snapshotId = UUID.randomUUID().toString();
+        this.remoteClusters = remoteClusters;
+        this.missingClusters = new TreeSet<>(remoteClusters);
+        this.timeoutMillis = TimeUnit.SECONDS.toMillis(conf.getReplicatedSubscriptionsSnapshotTimeoutSeconds());
+
+        // With more than one remote cluster (more than 2 clusters overall) a single round is
+        // not enough: a second round makes sure we're catching the messages that were
+        // exchanged between the remote clusters in the meantime.
+        this.needTwoRounds = remoteClusters.size() > 1;
+    }
+
+    String getSnapshotId() {
+        return snapshotId;
+    }
+
+    /**
+     * Records the start time and publishes the first snapshot request.
+     */
+    void start() {
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Starting new snapshot {} - Clusters: {}", controller.topic().getName(), snapshotId,
+                    missingClusters);
+        }
+        startTimeMillis = clock.millis();
+        sendSnapshotRequest();
+    }
+
+    /**
+     * Registers the response of one remote cluster and, when it was the last one missing,
+     * either starts the second round or publishes the snapshot.
+     *
+     * @param position position stored in the snapshot as local message id when this response
+     *                 completes it
+     * @param response response received from a remote cluster
+     */
+    synchronized void receivedSnapshotResponse(Position position, ReplicatedSubscriptionsSnapshotResponse response) {
+        final boolean debug = log.isDebugEnabled();
+        final String respondingCluster = response.getCluster().getCluster();
+        if (debug) {
+            log.debug("[{}] Received response from {}", controller.topic().getName(), respondingCluster);
+        }
+
+        responses.putIfAbsent(respondingCluster, response.getCluster().getMessageId());
+        missingClusters.remove(respondingCluster);
+
+        if (debug) {
+            log.debug("[{}] Missing clusters {}", controller.topic().getName(), missingClusters);
+        }
+
+        final boolean roundComplete = missingClusters.isEmpty();
+        if (!roundComplete) {
+            // Keep waiting for the remaining clusters
+            return;
+        }
+
+        final boolean startSecondRound = needTwoRounds && !firstRoundComplete;
+        if (startSecondRound) {
+            // First round is done: expect a fresh answer from every cluster and ask again
+            firstRoundComplete = true;
+            missingClusters.addAll(remoteClusters);
+            sendSnapshotRequest();
+            return;
+        }
+
+        if (debug) {
+            log.debug("[{}] Snapshot is complete {}", controller.topic().getName(), snapshotId);
+        }
+
+        // All rounds are done: store the snapshot in the local topic
+        final PositionImpl localPosition = (PositionImpl) position;
+        controller.writeMarker(
+                Markers.newReplicatedSubscriptionsSnapshot(snapshotId, controller.localCluster(),
+                        localPosition.getLedgerId(), localPosition.getEntryId(), responses));
+        controller.snapshotCompleted(snapshotId);
+
+        snapshotMetric.observe(clock.millis() - startTimeMillis);
+    }
+
+    /**
+     * @return whether more than the configured timeout elapsed since the recorded start time
+     */
+    boolean isTimedOut() {
+        final long deadlineMillis = startTimeMillis + timeoutMillis;
+        return deadlineMillis < clock.millis();
+    }
+
+    // Publishes a snapshot request marker for this snapshot on behalf of the local cluster
+    private void sendSnapshotRequest() {
+        controller.writeMarker(
+                Markers.newReplicatedSubscriptionsSnapshotRequest(snapshotId, controller.localCluster()));
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java
new file mode 100644
index 0000000..95b984f
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+import io.netty.buffer.ByteBuf;
+
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.common.api.Markers;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.ClusterMessageId;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshot;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshotRequest;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshotResponse;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for {@link ReplicatedSubscriptionsSnapshotBuilder}, run against a mocked controller
+ * that records the markers the builder writes and a mocked clock driven by {@code currentTime}.
+ */
+public class ReplicatedSubscriptionsSnapshotBuilderTest {
+
+    private final String localCluster = "a";
+    private long currentTime = 0;
+    private Clock clock;
+    private ServiceConfiguration conf;
+    private ReplicatedSubscriptionsController controller;
+    private List<ByteBuf> markers;
+
+    @BeforeMethod
+    public void setup() {
+        // Reset the fake time so that tests do not depend on the order they run in
+        currentTime = 0;
+
+        clock = mock(Clock.class);
+        when(clock.millis()).thenAnswer(invocation -> currentTime);
+
+        conf = new ServiceConfiguration();
+        conf.setReplicatedSubscriptionsSnapshotTimeoutSeconds(3);
+
+        markers = new ArrayList<>();
+
+        controller = mock(ReplicatedSubscriptionsController.class);
+        when(controller.localCluster()).thenReturn(localCluster);
+        doAnswer(invocation -> {
+            // Record every marker written by the builder, positioned after the message metadata
+            ByteBuf marker = invocation.getArgumentAt(0, ByteBuf.class);
+            Commands.skipMessageMetadata(marker);
+            markers.add(marker);
+            return null;
+        }).when(controller)
+                .writeMarker(any(ByteBuf.class));
+    }
+
+    @Test
+    public void testBuildSnapshotWith2Clusters() throws Exception {
+        List<String> remoteClusters = Arrays.asList("b");
+
+        ReplicatedSubscriptionsSnapshotBuilder builder = new ReplicatedSubscriptionsSnapshotBuilder(controller,
+                remoteClusters, conf, clock);
+
+        assertTrue(markers.isEmpty());
+
+        builder.start();
+
+        // Should have sent out a marker to initiate the snapshot
+        assertSnapshotRequestSent();
+
+        // Simulate the responses coming back
+        builder.receivedSnapshotResponse(new PositionImpl(1, 1), newResponse("b", 11, 11));
+
+        // At this point the snapshot should be created
+        assertEquals(markers.size(), 1);
+        ReplicatedSubscriptionsSnapshot snapshot = Markers.parseReplicatedSubscriptionsSnapshot(markers.remove(0));
+        assertEquals(snapshot.getClustersCount(), 1);
+        assertEquals(snapshot.getClusters(0).getCluster(), "b");
+        assertEquals(snapshot.getClusters(0).getMessageId().getLedgerId(), 11);
+        assertEquals(snapshot.getClusters(0).getMessageId().getEntryId(), 11);
+
+        assertEquals(snapshot.getLocalMessageId().getLedgerId(), 1);
+        assertEquals(snapshot.getLocalMessageId().getEntryId(), 1);
+    }
+
+    @Test
+    public void testBuildSnapshotWith3Clusters() throws Exception {
+        List<String> remoteClusters = Arrays.asList("b", "c");
+
+        ReplicatedSubscriptionsSnapshotBuilder builder = new ReplicatedSubscriptionsSnapshotBuilder(controller,
+                remoteClusters, conf, clock);
+
+        assertTrue(markers.isEmpty());
+
+        builder.start();
+
+        // Should have sent out a marker to initiate the snapshot
+        assertSnapshotRequestSent();
+
+        // Simulate the responses coming back
+        builder.receivedSnapshotResponse(new PositionImpl(1, 1), newResponse("b", 11, 11));
+
+        // No markers should be sent out
+        assertTrue(markers.isEmpty());
+
+        builder.receivedSnapshotResponse(new PositionImpl(2, 2), newResponse("c", 22, 22));
+
+        // Since we have 2 remote clusters, a 2nd round of snapshot will be taken
+        assertSnapshotRequestSent();
+
+        // Responses coming back
+        builder.receivedSnapshotResponse(new PositionImpl(3, 3), newResponse("b", 33, 33));
+
+        // No markers should be sent out
+        assertTrue(markers.isEmpty());
+
+        builder.receivedSnapshotResponse(new PositionImpl(4, 4), newResponse("c", 44, 44));
+
+        // At this point the snapshot should be created. It carries the message ids received in
+        // the 1st round and the position of the response that completed the 2nd round.
+        assertEquals(markers.size(), 1);
+        ReplicatedSubscriptionsSnapshot snapshot = Markers.parseReplicatedSubscriptionsSnapshot(markers.remove(0));
+        assertEquals(snapshot.getClustersCount(), 2);
+        assertEquals(snapshot.getClusters(0).getCluster(), "b");
+        assertEquals(snapshot.getClusters(0).getMessageId().getLedgerId(), 11);
+        assertEquals(snapshot.getClusters(0).getMessageId().getEntryId(), 11);
+
+        assertEquals(snapshot.getClusters(1).getCluster(), "c");
+        assertEquals(snapshot.getClusters(1).getMessageId().getLedgerId(), 22);
+        assertEquals(snapshot.getClusters(1).getMessageId().getEntryId(), 22);
+
+        assertEquals(snapshot.getLocalMessageId().getLedgerId(), 4);
+        assertEquals(snapshot.getLocalMessageId().getEntryId(), 4);
+    }
+
+    @Test
+    public void testBuildTimeout() throws Exception {
+        List<String> remoteClusters = Arrays.asList("b");
+
+        ReplicatedSubscriptionsSnapshotBuilder builder = new ReplicatedSubscriptionsSnapshotBuilder(controller,
+                remoteClusters, conf, clock);
+
+        assertFalse(builder.isTimedOut());
+
+        builder.start();
+
+        // 2 seconds elapsed, timeout is 3 seconds
+        currentTime = 2000;
+
+        assertFalse(builder.isTimedOut());
+
+        // 5 seconds elapsed
+        currentTime = 5000;
+
+        assertTrue(builder.isTimedOut());
+    }
+
+    /**
+     * Checks that exactly one marker was written, that it is a snapshot request coming from the
+     * local cluster, and consumes it.
+     */
+    private void assertSnapshotRequestSent() throws Exception {
+        assertEquals(markers.size(), 1);
+        ReplicatedSubscriptionsSnapshotRequest request = Markers
+                .parseReplicatedSubscriptionsSnapshotRequest(markers.remove(0));
+        assertEquals(request.getSourceCluster(), localCluster);
+    }
+
+    /**
+     * Builds the response a remote cluster would send back for snapshot "snapshot-1".
+     */
+    private static ReplicatedSubscriptionsSnapshotResponse newResponse(String cluster, long ledgerId,
+            long entryId) {
+        return ReplicatedSubscriptionsSnapshotResponse.newBuilder()
+                .setSnapshotId("snapshot-1")
+                .setCluster(ClusterMessageId.newBuilder()
+                        .setCluster(cluster)
+                        .setMessageId(MessageIdData.newBuilder()
+                                .setLedgerId(ledgerId)
+                                .setEntryId(entryId)
+                                .build()))
+                .build();
+    }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index d1d6cf8..c1c9752 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -384,6 +384,14 @@ public class Commands {
}
}
+    /**
+     * Moves the reader index of the buffer past the message metadata: skips the checksum when
+     * one is present, reads the metadata size and skips that many bytes.
+     *
+     * @param buffer buffer whose reader index is advanced
+     */
+    public static void skipMessageMetadata(ByteBuf buffer) {
+        // The reader index may initially point at the checksum rather than at the metadata
+        skipChecksumIfPresent(buffer);
+        buffer.skipBytes((int) buffer.readUnsignedInt());
+    }
+
public static ByteBufPair newMessage(long consumerId, MessageIdData messageId, int redeliveryCount, ByteBuf metadataAndPayload) {
CommandMessage.Builder msgBuilder = CommandMessage.newBuilder();
msgBuilder.setConsumerId(consumerId);