You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/06/17 00:09:15 UTC
[pulsar] branch master updated: Replicated subscriptions - Advance
remote cursor when local consumers moves ahead (#4396)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7be1ee1 Replicated subscriptions - Advance remote cursor when local consumers moves ahead (#4396)
7be1ee1 is described below
commit 7be1ee1fdb59421ac858b38840d3baf8c9073a5c
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sun Jun 16 17:09:08 2019 -0700
Replicated subscriptions - Advance remote cursor when local consumers moves ahead (#4396)
### Motivation
This is the 4th and last (implementation) change for pip-33.
It includes reading and caching the last N snapshots and sending the updates to the other clusters.
Previous PRs:
1. #4299
2. #4340
3. #4354
---
conf/broker.conf | 3 +
.../apache/pulsar/broker/ServiceConfiguration.java | 7 +-
.../broker/service/AbstractBaseDispatcher.java | 27 ++++-
.../apache/pulsar/broker/service/Subscription.java | 5 +
.../service/persistent/PersistentSubscription.java | 40 ++++++--
.../broker/service/persistent/PersistentTopic.java | 4 +
.../ReplicatedSubscriptionSnapshotCache.java | 92 +++++++++++++++++
.../ReplicatedSubscriptionsController.java | 63 +++++++++++-
.../ReplicatedSubscriptionSnapshotCacheTest.java | 114 +++++++++++++++++++++
.../org/apache/pulsar/common/protocol/Markers.java | 8 +-
10 files changed, 350 insertions(+), 13 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 05866f8..022085a 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -278,6 +278,9 @@ replicatedSubscriptionsSnapshotFrequencyMillis=1000
# Timeout for building a consistent snapshot for tracking replicated subscriptions state.
replicatedSubscriptionsSnapshotTimeoutSeconds=30
+# Max number of snapshot to be cached per subscription.
+replicatedSubscriptionsSnapshotMaxCachedPerSubscription=10
+
### --- Authentication --- ###
# Role names that are treated as "proxy roles". If the broker sees a request with
#role as proxyRoles - it will demand to see a valid original principal.
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 7b15ab1..840117d 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -536,6 +536,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "Timeout for building a consistent snapshot for tracking replicated subscriptions state. ")
private int replicatedSubscriptionsSnapshotTimeoutSeconds = 30;
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Max number of snapshot to be cached per subscription.")
+ private int replicatedSubscriptionsSnapshotMaxCachedPerSubscription = 10;
+
/***** --- TLS --- ****/
@FieldContext(
category = CATEGORY_TLS,
@@ -905,7 +910,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
private int managedLedgerMaxUnackedRangesToPersistInZooKeeper = 1000;
@FieldContext(
category = CATEGORY_STORAGE_OFFLOADING,
- doc = "Use Open Range-Set to cache unacked messages (it is memory efficient but it can take more cpu)"
+ doc = "Use Open Range-Set to cache unacked messages (it is memory efficient but it can take more cpu)"
)
private boolean managedLedgerUnackedRangesOpenCacheSetEnabled = true;
@FieldContext(
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 896d270..da06bd2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -24,13 +24,17 @@ import io.netty.buffer.ByteBuf;
import java.util.Collections;
import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshot;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
+@Slf4j
public abstract class AbstractBaseDispatcher implements Dispatcher {
protected final Subscription subscription;
@@ -61,20 +65,26 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
* @param subscription
* the subscription object
*/
- public void filterEntriesForConsumer(List<Entry> entries, EntryBatchSizes batchSizes, SendMessageInfo sendMessageInfo) {
+ public void filterEntriesForConsumer(List<Entry> entries, EntryBatchSizes batchSizes,
+ SendMessageInfo sendMessageInfo) {
int totalMessages = 0;
long totalBytes = 0;
for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
Entry entry = entries.get(i);
ByteBuf metadataAndPayload = entry.getDataBuffer();
- PositionImpl pos = (PositionImpl) entry.getPosition();
MessageMetadata msgMetadata = Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1);
try {
if (msgMetadata == null || Markers.isServerOnlyMarker(msgMetadata)) {
+ PositionImpl pos = (PositionImpl) entry.getPosition();
// Message metadata was corrupted or the messages was a server-only marker
+
+ if (Markers.isReplicatedSubscriptionSnapshotMarker(msgMetadata)) {
+ processReplicatedSubscriptionSnapshot(pos, metadataAndPayload);
+ }
+
entries.set(i, null);
entry.release();
subscription.acknowledgeMessage(Collections.singletonList(pos), AckType.Individual,
@@ -100,4 +110,17 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
sendMessageInfo.setTotalMessages(totalMessages);
sendMessageInfo.setTotalBytes(totalBytes);
}
+
+ private void processReplicatedSubscriptionSnapshot(PositionImpl pos, ByteBuf headersAndPayload) {
+ // Remove the protobuf headers
+ Commands.skipMessageMetadata(headersAndPayload);
+
+ try {
+ ReplicatedSubscriptionsSnapshot snapshot = Markers.parseReplicatedSubscriptionsSnapshot(headersAndPayload);
+ subscription.processReplicatedSubscriptionSnapshot(snapshot);
+ } catch (Throwable t) {
+ log.warn("Failed to process replicated subscription snapshot at {} -- {}", pos, t.getMessage(), t);
+ return;
+ }
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
index 4851b5b..c044fa5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
@@ -27,6 +27,7 @@ import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshot;
public interface Subscription {
@@ -90,6 +91,10 @@ public interface Subscription {
void addUnAckedMessages(int unAckMessages);
+ default void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapshot snapshot) {
+ // Default is no-op
+ }
+
// Subscription utils
static boolean isCumulativeAckMode(SubType subType) {
return SubType.Exclusive.equals(subType) || SubType.Failover.equals(subType);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 42a1479..70be6ba 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -51,6 +51,7 @@ import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshot;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
@@ -84,7 +85,7 @@ public class PersistentSubscription implements Subscription {
private static final Map<String, Long> REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES = new TreeMap<>();
private static final Map<String, Long> NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES = Collections.emptyMap();
- private volatile boolean isReplicated;
+ private volatile ReplicatedSubscriptionSnapshotCache replicatedSubscriptionSnapshotCache;
static {
REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES.put(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
@@ -105,7 +106,7 @@ public class PersistentSubscription implements Subscription {
this.topicName = topic.getName();
this.subName = subscriptionName;
this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor);
- this.isReplicated = replicated;
+ this.setReplicated(replicated);
IS_FENCED_UPDATER.set(this, FALSE);
}
@@ -121,11 +122,15 @@ public class PersistentSubscription implements Subscription {
@Override
public boolean isReplicated() {
- return isReplicated;
+ return replicatedSubscriptionSnapshotCache != null;
}
void setReplicated(boolean replicated) {
- this.isReplicated = replicated;
+ this.replicatedSubscriptionSnapshotCache = replicated
+ ? new ReplicatedSubscriptionSnapshotCache(subName,
+ topic.getBrokerService().pulsar().getConfiguration()
+ .getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription())
+ : null;
}
@Override
@@ -217,6 +222,8 @@ public class PersistentSubscription implements Subscription {
@Override
public void acknowledgeMessage(List<Position> positions, AckType ackType, Map<String,Long> properties) {
+ Position previousMarkDeletePosition = cursor.getMarkDeletedPosition();
+
if (ackType == AckType.Cumulative) {
if (positions.size() != 1) {
log.warn("[{}][{}] Invalid cumulative ack received with multiple message ids", topicName, subName);
@@ -236,6 +243,19 @@ public class PersistentSubscription implements Subscription {
dispatcher.getRedeliveryTracker().removeBatch(positions);
}
+ if (!cursor.getMarkDeletedPosition().equals(previousMarkDeletePosition)) {
+ // Mark delete position advance
+ ReplicatedSubscriptionSnapshotCache snapshotCache = this.replicatedSubscriptionSnapshotCache;
+ if (snapshotCache != null) {
+ ReplicatedSubscriptionsSnapshot snapshot = snapshotCache
+ .advancedMarkDeletePosition((PositionImpl) cursor.getMarkDeletedPosition());
+ if (snapshot != null) {
+ topic.getReplicatedSubscriptionController()
+ .ifPresent(c -> c.localSubscriptionUpdated(subName, snapshot));
+ }
+ }
+ }
+
if (topic.getManagedLedger().isTerminated() && cursor.getNumberOfEntriesInBacklog() == 0) {
// Notify all consumer that the end of topic was reached
dispatcher.getConsumers().forEach(Consumer::reachedEndOfTopic);
@@ -680,7 +700,7 @@ public class PersistentSubscription implements Subscription {
}
subStats.msgBacklog = getNumberOfEntriesInBacklog();
subStats.msgRateExpired = expiryMonitor.getMessageExpiryRate();
- subStats.isReplicated = isReplicated;
+ subStats.isReplicated = isReplicated();
return subStats;
}
@@ -728,7 +748,7 @@ public class PersistentSubscription implements Subscription {
* (eg. when using compaction subscription) and the subscription properties.
*/
protected Map<String, Long> mergeCursorProperties(Map<String, Long> userProperties) {
- Map<String, Long> baseProperties = isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES
+ Map<String, Long> baseProperties = isReplicated() ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES
: NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;
if (userProperties.isEmpty()) {
@@ -743,5 +763,13 @@ public class PersistentSubscription implements Subscription {
}
+ @Override
+ public void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapshot snapshot) {
+ ReplicatedSubscriptionSnapshotCache snapshotCache = this.replicatedSubscriptionSnapshotCache;
+ if (snapshotCache != null) {
+ snapshotCache.addNewSnapshot(snapshot);
+ }
+ }
+
private static final Logger log = LoggerFactory.getLogger(PersistentSubscription.class);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 0f132c0..9202ac8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1892,6 +1892,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
}
ctrl.receivedReplicatedSubscriptionMarker(position, markerType, payload);;
+ }
+
+ Optional<ReplicatedSubscriptionsController> getReplicatedSubscriptionController() {
+ return replicatedSubscriptionsController;
}
public CompactedTopic getCompactedTopic() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java
new file mode 100644
index 0000000..185e742
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshot;
+
+/**
+ * Store the last N snapshots that were scanned by a particular subscription
+ */
+@Slf4j
+public class ReplicatedSubscriptionSnapshotCache {
+ private final String subscription;
+ private final NavigableMap<PositionImpl, ReplicatedSubscriptionsSnapshot> snapshots;
+ private final int maxSnapshotToCache;
+
+ public ReplicatedSubscriptionSnapshotCache(String subscription, int maxSnapshotToCache) {
+ this.subscription = subscription;
+ this.snapshots = new TreeMap<>();
+ this.maxSnapshotToCache = maxSnapshotToCache;
+ }
+
+ public synchronized void addNewSnapshot(ReplicatedSubscriptionsSnapshot snapshot) {
+ MessageIdData msgId = snapshot.getLocalMessageId();
+ PositionImpl position = new PositionImpl(msgId.getLedgerId(), msgId.getEntryId());
+
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Added new replicated-subscription snapshot at {} -- {}", subscription, position,
+ snapshot.getSnapshotId());
+ }
+
+ snapshots.put(position, snapshot);
+
+ // Prune the cache
+ while (snapshots.size() > maxSnapshotToCache) {
+ snapshots.pollFirstEntry();
+ }
+ }
+
+ /**
+ * Signal that the mark-delete position on the subscription has been advanced. If there is a snapshot that
+ * correspond to this position, it will returned, other it will return null.
+ */
+ public synchronized ReplicatedSubscriptionsSnapshot advancedMarkDeletePosition(PositionImpl pos) {
+ ReplicatedSubscriptionsSnapshot snapshot = null;
+ while (!snapshots.isEmpty()) {
+ PositionImpl first = snapshots.firstKey();
+ if (first.compareTo(pos) > 0) {
+ // Snapshot is associated which an higher position, so it cannot be used now
+ break;
+ } else {
+ // This snapshot is potentially good. Continue the search for to see if there is a higher snapshot we
+ // can use
+ snapshot = snapshots.pollFirstEntry().getValue();
+ }
+ }
+
+ if (log.isDebugEnabled()) {
+ if (snapshot != null) {
+ log.debug("[{}] Advanced mark-delete position to {} -- found snapshot {} at {}:{}", subscription, pos,
+ snapshot.getSnapshotId(),
+ snapshot.getLocalMessageId().getLedgerId(),
+ snapshot.getLocalMessageId().getEntryId());
+ } else {
+ log.debug("[{}] Advanced mark-delete position to {} -- snapshot not found", subscription, pos);
+ }
+ }
+ return snapshot;
+ }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
index 232cf59..8b64c14 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
@@ -23,24 +23,32 @@ import io.prometheus.client.Gauge;
import java.io.IOException;
import java.time.Clock;
+import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
+import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.Topic;
-import org.apache.pulsar.common.protocol.Markers;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.ClusterMessageId;
import org.apache.pulsar.common.api.proto.PulsarMarkers.MarkerType;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshot;
import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshotRequest;
import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshotResponse;
import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsUpdate;
+import org.apache.pulsar.common.protocol.Markers;
/**
* Encapsulate all the logic of replicated subscriptions tracking for a given topic.
@@ -93,6 +101,25 @@ public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.P
}
}
+ public void localSubscriptionUpdated(String subscriptionName, ReplicatedSubscriptionsSnapshot snapshot) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}][{}] Updating subscription to snapshot {}", topic, subscriptionName,
+ snapshot.getClustersList().stream()
+ .map(cmid -> String.format("%s -> %d:%d", cmid.getCluster(),
+ cmid.getMessageId().getLedgerId(), cmid.getMessageId().getEntryId()))
+ .collect(Collectors.toList()));
+ }
+
+ Map<String, MessageIdData> clusterIds = new TreeMap<>();
+ for (int i = 0, size = snapshot.getClustersCount(); i < size; i++) {
+ ClusterMessageId cmid = snapshot.getClusters(i);
+ clusterIds.put(cmid.getCluster(), cmid.getMessageId());
+ }
+
+ ByteBuf subscriptionUpdate = Markers.newReplicatedSubscriptionsUpdate(subscriptionName, clusterIds);
+ topic.publishMessage(subscriptionUpdate, this);
+ }
+
private void receivedSnapshotRequest(ReplicatedSubscriptionsSnapshotRequest request) {
// Send response containing the current last written message id. The response
// marker we're publishing locally and then replicating will have a higher
@@ -115,8 +142,10 @@ public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.P
String snapshotId = response.getSnapshotId();
ReplicatedSubscriptionsSnapshotBuilder builder = pendingSnapshots.get(snapshotId);
if (builder == null) {
- log.info("[{}] Received late reply for timed-out snapshot {} from {}", topic.getName(), snapshotId,
- response.getCluster().getCluster());
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Received late reply for timed-out snapshot {} from {}", topic.getName(), snapshotId,
+ response.getCluster().getCluster());
+ }
return;
}
@@ -124,7 +153,35 @@ public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.P
}
private void receiveSubscriptionUpdated(ReplicatedSubscriptionsUpdate update) {
+ MessageIdData updatedMessageId = null;
+ for (int i = 0, size = update.getClustersCount(); i < size; i++) {
+ ClusterMessageId cmid = update.getClusters(i);
+ if (localCluster.equals(cmid.getCluster())) {
+ updatedMessageId = cmid.getMessageId();
+ }
+ }
+
+ if (updatedMessageId == null) {
+ // No updates for this cluster, ignore
+ return;
+ }
+ Position pos = new PositionImpl(updatedMessageId.getLedgerId(), updatedMessageId.getEntryId());
+
+ if (log.isDebugEnabled()) {
+ log.debug("[{}][{}] Received update for subscription to {}", topic, update.getSubscriptionName(), pos);
+ }
+
+ PersistentSubscription sub = topic.getSubscription(update.getSubscriptionName());
+ if (sub != null) {
+ sub.acknowledgeMessage(Collections.singletonList(pos), AckType.Cumulative, Collections.emptyMap());
+ } else {
+ // Subscription doesn't exist. We need to force the creation of the subscription in this cluster, because
+ log.info("[{}][{}] Creating subscription at {}:{} after receiving update from replicated subcription",
+ topic, update.getSubscriptionName(), updatedMessageId.getLedgerId(), pos);
+ topic.createSubscription(update.getSubscriptionName(),
+ InitialPosition.Latest, true /* replicateSubscriptionState */);
+ }
}
private void startNewSnapshot() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCacheTest.java
new file mode 100644
index 0000000..a103945
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCacheTest.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshot;
+import org.testng.annotations.Test;
+
+public class ReplicatedSubscriptionSnapshotCacheTest {
+ @Test
+ public void testSnashotCache() {
+ ReplicatedSubscriptionSnapshotCache cache = new ReplicatedSubscriptionSnapshotCache("my-subscription", 10);
+
+ assertNull(cache.advancedMarkDeletePosition(new PositionImpl(0, 0)));
+ assertNull(cache.advancedMarkDeletePosition(new PositionImpl(100, 0)));
+
+ cache.addNewSnapshot(ReplicatedSubscriptionsSnapshot.newBuilder()
+ .setSnapshotId("snapshot-1")
+ .setLocalMessageId(newMessageId(1, 1))
+ .build());
+
+ cache.addNewSnapshot(ReplicatedSubscriptionsSnapshot.newBuilder()
+ .setSnapshotId("snapshot-2")
+ .setLocalMessageId(newMessageId(2, 2))
+ .build());
+
+ cache.addNewSnapshot(ReplicatedSubscriptionsSnapshot.newBuilder()
+ .setSnapshotId("snapshot-5")
+ .setLocalMessageId(newMessageId(5, 5))
+ .build());
+
+ cache.addNewSnapshot(ReplicatedSubscriptionsSnapshot.newBuilder()
+ .setSnapshotId("snapshot-7")
+ .setLocalMessageId(newMessageId(7, 7))
+ .build());
+
+ assertNull(cache.advancedMarkDeletePosition(new PositionImpl(0, 0)));
+ assertNull(cache.advancedMarkDeletePosition(new PositionImpl(1, 0)));
+ ReplicatedSubscriptionsSnapshot snapshot = cache.advancedMarkDeletePosition(new PositionImpl(1, 1));
+ assertNotNull(snapshot);
+ assertEquals(snapshot.getSnapshotId(), "snapshot-1");
+
+ snapshot = cache.advancedMarkDeletePosition(new PositionImpl(5, 6));
+ assertNotNull(snapshot);
+ assertEquals(snapshot.getSnapshotId(), "snapshot-5");
+
+ // Snapshots should have been now removed
+ assertNull(cache.advancedMarkDeletePosition(new PositionImpl(2, 2)));
+ assertNull(cache.advancedMarkDeletePosition(new PositionImpl(5, 5)));
+ }
+
+ @Test
+ public void testSnashotCachePruning() {
+ ReplicatedSubscriptionSnapshotCache cache = new ReplicatedSubscriptionSnapshotCache("my-subscription", 3);
+
+ cache.addNewSnapshot(ReplicatedSubscriptionsSnapshot.newBuilder()
+ .setSnapshotId("snapshot-1")
+ .setLocalMessageId(newMessageId(1, 1))
+ .build());
+
+ cache.addNewSnapshot(ReplicatedSubscriptionsSnapshot.newBuilder()
+ .setSnapshotId("snapshot-2")
+ .setLocalMessageId(newMessageId(2, 2))
+ .build());
+
+ cache.addNewSnapshot(ReplicatedSubscriptionsSnapshot.newBuilder()
+ .setSnapshotId("snapshot-3")
+ .setLocalMessageId(newMessageId(3, 3))
+ .build());
+
+ cache.addNewSnapshot(ReplicatedSubscriptionsSnapshot.newBuilder()
+ .setSnapshotId("snapshot-4")
+ .setLocalMessageId(newMessageId(4, 4))
+ .build());
+
+ // Snapshot-1 was already pruned
+ assertNull(cache.advancedMarkDeletePosition(new PositionImpl(1, 1)));
+ ReplicatedSubscriptionsSnapshot snapshot = cache.advancedMarkDeletePosition(new PositionImpl(2, 2));
+ assertNotNull(snapshot);
+ assertEquals(snapshot.getSnapshotId(), "snapshot-2");
+
+ snapshot = cache.advancedMarkDeletePosition(new PositionImpl(5, 5));
+ assertNotNull(snapshot);
+ assertEquals(snapshot.getSnapshotId(), "snapshot-4");
+ }
+
+ private MessageIdData newMessageId(long ledgerId, long entryId) {
+ return MessageIdData.newBuilder()
+ .setLedgerId(ledgerId)
+ .setEntryId(entryId)
+ .build();
+ }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java
index 4884348..6d495be 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java
@@ -69,6 +69,12 @@ public class Markers {
return msgMetadata.hasMarkerType();
}
+ public static boolean isReplicatedSubscriptionSnapshotMarker(MessageMetadata msgMetadata) {
+ return msgMetadata != null
+ && msgMetadata.hasMarkerType()
+ && msgMetadata.getMarkerType() == MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_VALUE;
+ }
+
@SneakyThrows
public static ByteBuf newReplicatedSubscriptionsSnapshotRequest(String snapshotId, String sourceCluster) {
ReplicatedSubscriptionsSnapshotRequest.Builder builder = ReplicatedSubscriptionsSnapshotRequest.newBuilder();
@@ -226,7 +232,7 @@ public class Markers {
ByteBufCodedOutputStream outStream = ByteBufCodedOutputStream.get(payload);
try {
update.writeTo(outStream);
- return newMessage(MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT, Optional.empty(), payload);
+ return newMessage(MarkerType.REPLICATED_SUBSCRIPTION_UPDATE, Optional.empty(), payload);
} finally {
payload.release();
builder.recycle();