You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/20 17:16:23 UTC
[pulsar] branch master updated: Replicated subscriptions -
Configuration and client changes (#4299)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6e51237 Replicated subscriptions - Configuration and client changes (#4299)
6e51237 is described below
commit 6e51237538cd5011abbbdfbe7fe9c7c21b63fbe2
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon May 20 10:16:15 2019 -0700
Replicated subscriptions - Configuration and client changes (#4299)
* Replicated subscriptions - Configuration and client changes
* Added missing header
* Fixed mocked methods for tests
* Fixed typo
---
.../broker/admin/impl/PersistentTopicsBase.java | 4 +-
.../pulsar/broker/admin/v1/PersistentTopics.java | 6 +-
.../pulsar/broker/admin/v2/PersistentTopics.java | 8 +-
.../apache/pulsar/broker/service/ServerCnx.java | 6 +-
.../apache/pulsar/broker/service/Subscription.java | 2 +
.../org/apache/pulsar/broker/service/Topic.java | 6 +-
.../nonpersistent/NonPersistentSubscription.java | 5 +
.../service/nonpersistent/NonPersistentTopic.java | 9 +-
.../service/persistent/CompactorSubscription.java | 2 +-
.../service/persistent/PersistentSubscription.java | 63 ++++++++++-
.../broker/service/persistent/PersistentTopic.java | 37 ++++---
.../pulsar/broker/admin/PersistentTopicsTest.java | 4 +-
.../PersistentDispatcherFailoverConsumerTest.java | 4 +-
.../service/PersistentTopicConcurrentTest.java | 12 ++-
.../pulsar/broker/service/PersistentTopicTest.java | 61 +++++++----
.../pulsar/broker/service/ServerCnxTest.java | 22 ++++
.../ReplicatedSubscriptionConfigTest.java | 120 +++++++++++++++++++++
.../apache/pulsar/client/api/ConsumerBuilder.java | 6 ++
.../pulsar/client/impl/ConsumerBuilderImpl.java | 6 ++
.../apache/pulsar/client/impl/ConsumerImpl.java | 4 +-
.../impl/conf/ConsumerConfigurationData.java | 2 +
.../org/apache/pulsar/common/api/Commands.java | 7 +-
.../apache/pulsar/common/api/proto/PulsarApi.java | 57 ++++++++++
.../common/policies/data/SubscriptionStats.java | 6 +-
pulsar-common/src/main/proto/PulsarApi.proto | 5 +
.../pulsar/testclient/PerformanceConsumer.java | 6 +-
26 files changed, 403 insertions(+), 67 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index f3af7cd..2801be7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -917,7 +917,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}
- protected void internalCreateSubscription(String subscriptionName, MessageIdImpl messageId, boolean authoritative) {
+ protected void internalCreateSubscription(String subscriptionName, MessageIdImpl messageId, boolean authoritative, boolean replicated) {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
@@ -968,7 +968,7 @@ public class PersistentTopicsBase extends AdminResource {
}
PersistentSubscription subscription = (PersistentSubscription) topic
- .createSubscription(subscriptionName, InitialPosition.Latest).get();
+ .createSubscription(subscriptionName, InitialPosition.Latest, replicated).get();
// Mark the cursor as "inactive" as it was created without a real consumer connected
subscription.deactivateCursor();
subscription.resetCursor(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId())).get();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 09f719a..9766ec6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -292,7 +292,7 @@ public class PersistentTopics extends PersistentTopicsBase {
validateTopicName(property, cluster, namespace, encodedTopic);
return internalGetPartitionedStatsInternal(authoritative);
}
-
+
@DELETE
@Path("/{property}/{cluster}/{namespace}/{topic}/subscription/{subName}")
@ApiOperation(hidden = true, value = "Delete a subscription.", notes = "There should not be any active consumers on the subscription.")
@@ -397,9 +397,9 @@ public class PersistentTopics extends PersistentTopicsBase {
public void createSubscription(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String topic,
@PathParam("subscriptionName") String encodedSubName,
- @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, MessageIdImpl messageId) {
+ @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, MessageIdImpl messageId, @QueryParam("replicated") boolean replicated) {
validateTopicName(property, cluster, namespace, topic);
- internalCreateSubscription(decode(encodedSubName), messageId, authoritative);
+ internalCreateSubscription(decode(encodedSubName), messageId, authoritative, replicated);
}
@GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index ca9ff88..d62e0c6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -150,7 +150,7 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
})
public void createNonPartitionedTopic(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
- @PathParam("topic") @Encoded String encodedTopic,
+ @PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateGlobalNamespaceOwnership(tenant,namespace);
validateTopicName(tenant, namespace, encodedTopic);
@@ -316,7 +316,7 @@ public class PersistentTopics extends PersistentTopicsBase {
validateTopicName(tenant, namespace, encodedTopic);
return internalGetPartitionedStatsInternal(authoritative);
}
-
+
@DELETE
@Path("/{tenant}/{namespace}/{topic}/subscription/{subName}")
@ApiOperation(value = "Delete a subscription.", notes = "There should not be any active consumers on the subscription.")
@@ -390,9 +390,9 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 405, message = "Not supported for partitioned topics") })
public void createSubscription(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String topic, @PathParam("subscriptionName") String encodedSubName,
- @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, MessageIdImpl messageId) {
+ @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, MessageIdImpl messageId, @QueryParam("replicated") boolean replicated) {
validateTopicName(tenant, namespace, topic);
- internalCreateSubscription(decode(encodedSubName), messageId, authoritative);
+ internalCreateSubscription(decode(encodedSubName), messageId, authoritative, replicated);
}
@POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index b5b6fe9..4488613 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -611,6 +611,7 @@ public class ServerCnx extends PulsarHandler {
final Map<String, String> metadata = CommandUtils.metadataFromCommand(subscribe);
final InitialPosition initialPosition = subscribe.getInitialPosition();
final SchemaData schema = subscribe.hasSchema() ? getSchema(subscribe.getSchema()) : null;
+ final boolean isReplicated = subscribe.hasReplicateSubscriptionState() && subscribe.getReplicateSubscriptionState();
CompletableFuture<Boolean> isProxyAuthorizedFuture;
if (service.isAuthorizationEnabled() && originalPrincipal != null) {
@@ -685,7 +686,7 @@ public class ServerCnx extends PulsarHandler {
return topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
subType, priorityLevel, consumerName, isDurable,
startMessageId, metadata,
- readCompacted, initialPosition);
+ readCompacted, initialPosition, isReplicated);
} else {
return FutureUtil.failedFuture(
new IncompatibleSchemaException(
@@ -696,7 +697,8 @@ public class ServerCnx extends PulsarHandler {
} else {
return topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
subType, priorityLevel, consumerName, isDurable,
- startMessageId, metadata, readCompacted, initialPosition);
+ startMessageId, metadata, readCompacted, initialPosition,
+ isReplicated);
}
})
.thenAccept(consumer -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
index 8076bd3..cf4c862 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
@@ -44,6 +44,8 @@ public interface Subscription {
String getTopicName();
+ boolean isReplicated();
+
Dispatcher getDispatcher();
long getNumberOfEntriesInBacklog();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 12c9bdc..5c844ad 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -92,9 +92,11 @@ public interface Topic {
CompletableFuture<Consumer> subscribe(ServerCnx cnx, String subscriptionName, long consumerId, SubType subType,
int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
- Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition);
+ Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition,
+ boolean replicateSubscriptionState);
- CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition);
+ CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition,
+ boolean replicateSubscriptionState);
CompletableFuture<Void> unsubscribe(String subName);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index 0139d14..5c060ba 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -76,6 +76,11 @@ public class NonPersistentSubscription implements Subscription {
}
@Override
+ public boolean isReplicated() {
+ return false;
+ }
+
+ @Override
public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
if (IS_FENCED_UPDATER.get(this) == TRUE) {
log.warn("Attempting to add consumer {} on a fenced subscription", consumer);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 10c8324..4c3323c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -233,7 +233,7 @@ public class NonPersistentTopic implements Topic {
lock.readLock().lock();
try {
brokerService.checkTopicNsOwnership(getName());
-
+
if (isFenced) {
log.warn("[{}] Attempting to add producer to a fenced topic", topic);
throw new TopicFencedException("Topic is temporarily unavailable");
@@ -315,10 +315,11 @@ public class NonPersistentTopic implements Topic {
@Override
public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscriptionName, long consumerId,
SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
- Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition) {
+ Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition,
+ boolean replicateSubscriptionState) {
final CompletableFuture<Consumer> future = new CompletableFuture<>();
-
+
try {
brokerService.checkTopicNsOwnership(getName());
} catch (Exception e) {
@@ -396,7 +397,7 @@ public class NonPersistentTopic implements Topic {
}
@Override
- public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition) {
+ public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition, boolean replicateSubscriptionState) {
return CompletableFuture.completedFuture(new NonPersistentSubscription(this, subscriptionName));
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
index 316ebc5..0c3feb3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
@@ -41,7 +41,7 @@ public class CompactorSubscription extends PersistentSubscription {
public CompactorSubscription(PersistentTopic topic, CompactedTopic compactedTopic,
String subscriptionName, ManagedCursor cursor) {
- super(topic, subscriptionName, cursor);
+ super(topic, subscriptionName, cursor, false);
checkArgument(subscriptionName.equals(Compactor.COMPACTION_SUBSCRIPTION));
this.compactedTopic = compactedTopic;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index ce9d4aa..a98af61 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -20,14 +20,15 @@ package org.apache.pulsar.broker.service.persistent;
import com.google.common.base.MoreObjects;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
-import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
@@ -40,7 +41,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.InvalidCursorPositio
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.BrokerServiceException;
-import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionFencedException;
@@ -76,12 +76,36 @@ public class PersistentSubscription implements Subscription {
// for connected subscriptions, message expiry will be checked if the backlog is greater than this threshold
private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000;
- public PersistentSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor) {
+ private static final String REPLICATED_SUBSCRIPTION_PROPERTY = "pulsar.replicated.subscription";
+
+ // Map of properties that is used to mark this subscription as "replicated".
+ // Since this is the only field at this point, we can just keep a static
+ // instance of the map.
+ private static final Map<String, Long> REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES = new TreeMap<>();
+ private static final Map<String, Long> NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES = Collections.emptyMap();
+
+ private volatile boolean isReplicated;
+
+ static {
+ REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES.put(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
+ }
+
+ static Map<String, Long> getBaseCursorProperties(boolean isReplicated) {
+ return isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES : NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;
+ }
+
+ static boolean isCursorFromReplicatedSubscription(ManagedCursor cursor) {
+ return cursor.getProperties().containsKey(REPLICATED_SUBSCRIPTION_PROPERTY);
+ }
+
+ public PersistentSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor,
+ boolean replicated) {
this.topic = topic;
this.cursor = cursor;
this.topicName = topic.getName();
this.subName = subscriptionName;
this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor);
+ this.isReplicated = replicated;
IS_FENCED_UPDATER.set(this, FALSE);
}
@@ -96,6 +120,15 @@ public class PersistentSubscription implements Subscription {
}
@Override
+ public boolean isReplicated() {
+ return isReplicated;
+ }
+
+ void setReplicated(boolean replicated) {
+ this.isReplicated = replicated;
+ }
+
+ @Override
public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
cursor.updateLastActive();
if (IS_FENCED_UPDATER.get(this) == TRUE) {
@@ -194,7 +227,7 @@ public class PersistentSubscription implements Subscription {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Cumulative ack on {}", topicName, subName, position);
}
- cursor.asyncMarkDelete(position, properties, markDeleteCallback, position);
+ cursor.asyncMarkDelete(position, mergeCursorProperties(properties), markDeleteCallback, position);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Individual acks on {}", topicName, subName, positions);
@@ -647,7 +680,7 @@ public class PersistentSubscription implements Subscription {
}
subStats.msgBacklog = getNumberOfEntriesInBacklog();
subStats.msgRateExpired = expiryMonitor.getMessageExpiryRate();
-
+ subStats.isReplicated = isReplicated;
return subStats;
}
@@ -681,5 +714,25 @@ public class PersistentSubscription implements Subscription {
}
}
+ /**
+ * Return a merged map that contains the cursor properties specified by used
+ * (eg. when using compaction subscription) and the subscription properties.
+ */
+ protected Map<String, Long> mergeCursorProperties(Map<String, Long> userProperties) {
+ Map<String, Long> baseProperties = isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES
+ : NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;
+
+ if (userProperties.isEmpty()) {
+ // Use only the static instance in the common case
+ return baseProperties;
+ } else {
+ Map<String, Long> merged = new TreeMap<>();
+ merged.putAll(userProperties);
+ merged.putAll(baseProperties);
+ return merged;
+ }
+
+ }
+
private static final Logger log = LoggerFactory.getLogger(PersistentSubscription.class);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index ed59c37..e21e359 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -250,7 +250,8 @@ public class PersistentTopic implements Topic, AddEntryCallback {
// to take care of it
} else {
final String subscriptionName = Codec.decode(cursor.getName());
- subscriptions.put(subscriptionName, createPersistentSubscription(subscriptionName, cursor));
+ subscriptions.put(subscriptionName, createPersistentSubscription(subscriptionName, cursor,
+ PersistentSubscription.isCursorFromReplicatedSubscription(cursor)));
// subscription-cursor gets activated by default: deactivate as there is no active subscription right
// now
subscriptions.get(subscriptionName).deactivateCursor();
@@ -298,12 +299,13 @@ public class PersistentTopic implements Topic, AddEntryCallback {
}
}
- private PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor) {
+ private PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor,
+ boolean replicated) {
checkNotNull(compactedTopic);
if (subscriptionName.equals(Compactor.COMPACTION_SUBSCRIPTION)) {
return new CompactorSubscription(this, compactedTopic, subscriptionName, cursor);
} else {
- return new PersistentSubscription(this, subscriptionName, cursor);
+ return new PersistentSubscription(this, subscriptionName, cursor, replicated);
}
}
@@ -496,7 +498,8 @@ public class PersistentTopic implements Topic, AddEntryCallback {
@Override
public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscriptionName, long consumerId,
SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
- Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition) {
+ Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition,
+ boolean replicatedSubscriptionState) {
final CompletableFuture<Consumer> future = new CompletableFuture<>();
@@ -571,7 +574,7 @@ public class PersistentTopic implements Topic, AddEntryCallback {
}
CompletableFuture<? extends Subscription> subscriptionFuture = isDurable ? //
- getDurableSubscription(subscriptionName, initialPosition) //
+ getDurableSubscription(subscriptionName, initialPosition, replicatedSubscriptionState) //
: getNonDurableSubscription(subscriptionName, startMessageId);
int maxUnackedMessages = isDurable ? brokerService.pulsar().getConfiguration().getMaxUnackedMessagesPerConsumer() :0;
@@ -617,17 +620,27 @@ public class PersistentTopic implements Topic, AddEntryCallback {
return future;
}
- private CompletableFuture<Subscription> getDurableSubscription(String subscriptionName, InitialPosition initialPosition) {
+ private CompletableFuture<Subscription> getDurableSubscription(String subscriptionName,
+ InitialPosition initialPosition, boolean replicated) {
CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
- ledger.asyncOpenCursor(Codec.encode(subscriptionName), initialPosition, new OpenCursorCallback() {
+
+ Map<String, Long> properties = PersistentSubscription.getBaseCursorProperties(replicated);
+
+ ledger.asyncOpenCursor(Codec.encode(subscriptionName), initialPosition, properties, new OpenCursorCallback() {
@Override
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Opened cursor", topic, subscriptionName);
}
- subscriptionFuture.complete(subscriptions.computeIfAbsent(subscriptionName,
- name -> createPersistentSubscription(subscriptionName, cursor)));
+ PersistentSubscription subscription = subscriptions.computeIfAbsent(subscriptionName,
+ name -> createPersistentSubscription(subscriptionName, cursor, replicated));
+
+ if (replicated && !subscription.isReplicated()) {
+ // Flip the subscription state
+ subscription.setReplicated(replicated);
+ }
+ subscriptionFuture.complete(subscription);
}
@Override
@@ -671,7 +684,7 @@ public class PersistentTopic implements Topic, AddEntryCallback {
subscriptionFuture.completeExceptionally(e);
}
- return new PersistentSubscription(this, subscriptionName, cursor);
+ return new PersistentSubscription(this, subscriptionName, cursor, false);
});
if (!subscriptionFuture.isDone()) {
@@ -686,8 +699,8 @@ public class PersistentTopic implements Topic, AddEntryCallback {
@SuppressWarnings("unchecked")
@Override
- public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition) {
- return getDurableSubscription(subscriptionName, initialPosition);
+ public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition, boolean replicateSubscriptionState) {
+ return getDurableSubscription(subscriptionName, initialPosition, replicateSubscriptionState);
}
/**
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index e75078f..6efe601 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -110,7 +110,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
} catch (Exception e) {
Assert.assertEquals("Topic partitions were not yet created", e.getMessage());
}
- persistentTopics.createSubscription(testTenant, testNamespace, testLocalTopicName, "test", true, (MessageIdImpl)MessageId.earliest);
+ persistentTopics.createSubscription(testTenant, testNamespace, testLocalTopicName, "test", true, (MessageIdImpl)MessageId.earliest, false);
List<String> subscriptions = persistentTopics.getSubscriptions(testTenant, testNamespace, testLocalTopicName + "-partition-0", true);
Assert.assertTrue(subscriptions.contains("test"));
persistentTopics.deleteSubscription(testTenant, testNamespace, testLocalTopicName, "test", true);
@@ -123,7 +123,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
public void testNonPartitionedTopics() {
pulsar.getConfiguration().setAllowAutoTopicCreation(false);
final String nonPartitionTopic = "non-partitioned-topic";
- persistentTopics.createSubscription(testTenant, testNamespace, nonPartitionTopic, "test", true, (MessageIdImpl) MessageId.latest);
+ persistentTopics.createSubscription(testTenant, testNamespace, nonPartitionTopic, "test", true, (MessageIdImpl) MessageId.latest, false);
try {
persistentTopics.getSubscriptions(testTenant, testNamespace, nonPartitionTopic + "-partition-0", true);
} catch (RestException exc) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index 24cb812..cb5a2ef 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -263,7 +263,7 @@ public class PersistentDispatcherFailoverConsumerTest {
@Test
public void testConsumerGroupChangesWithOldNewConsumers() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
- PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock);
+ PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false);
int partitionIndex = 0;
PersistentDispatcherSingleActiveConsumer pdfc = new PersistentDispatcherSingleActiveConsumer(cursorMock,
@@ -302,7 +302,7 @@ public class PersistentDispatcherFailoverConsumerTest {
log.info("--- Starting PersistentDispatcherFailoverConsumerTest::testAddConsumer ---");
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
- PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock);
+ PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false);
int partitionIndex = 0;
PersistentDispatcherSingleActiveConsumer pdfc = new PersistentDispatcherSingleActiveConsumer(cursorMock,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
index 7c5ca20..01895a8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
@@ -123,7 +123,8 @@ public class PersistentTopicConcurrentTest extends MockedBookKeeperTestCase {
.setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build();
Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
- 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest);
+ 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest,
+ false /* replicated */);
f1.get();
final CyclicBarrier barrier = new CyclicBarrier(2);
@@ -181,7 +182,8 @@ public class PersistentTopicConcurrentTest extends MockedBookKeeperTestCase {
.setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build();
Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
- 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest);
+ 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest,
+ false /* replicated */);
f1.get();
final CyclicBarrier barrier = new CyclicBarrier(2);
@@ -243,7 +245,8 @@ public class PersistentTopicConcurrentTest extends MockedBookKeeperTestCase {
.setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build();
Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
- 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest);
+ 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest,
+ false /* replicated */);
f1.get();
final CyclicBarrier barrier = new CyclicBarrier(2);
@@ -301,7 +304,8 @@ public class PersistentTopicConcurrentTest extends MockedBookKeeperTestCase {
.setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build();
Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
- 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest);
+ 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest,
+ false /* replicated */);
f1.get();
final CyclicBarrier barrier = new CyclicBarrier(2);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 8b3c721..93664ff 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -433,7 +433,8 @@ public class PersistentTopicTest {
.setSubscription("").setRequestId(1).setSubType(SubType.Exclusive).build();
Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
- 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest);
+ 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest,
+ false);
try {
f1.get();
fail("should fail with exception");
@@ -452,12 +453,14 @@ public class PersistentTopicTest {
// 1. simple subscribe
Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
- 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest);
+ 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest,
+ false);
f1.get();
// 2. duplicate subscribe
Future<Consumer> f2 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
- 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest);
+ 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest,
+ false);
try {
f2.get();
@@ -477,7 +480,7 @@ public class PersistentTopicTest {
@Test
public void testAddRemoveConsumer() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
- PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock);
+ PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false);
// 1. simple add consumer
Consumer consumer = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */,
@@ -508,8 +511,8 @@ public class PersistentTopicTest {
public void testMaxConsumersShared() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
- PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock);
- PersistentSubscription sub2 = new PersistentSubscription(topic, "sub-2", cursorMock);
+ PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false);
+ PersistentSubscription sub2 = new PersistentSubscription(topic, "sub-2", cursorMock, false);
// for count consumers on topic
ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions = new ConcurrentOpenHashMap<>(16, 1);
@@ -599,8 +602,8 @@ public class PersistentTopicTest {
public void testMaxConsumersFailover() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
- PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock);
- PersistentSubscription sub2 = new PersistentSubscription(topic, "sub-2", cursorMock);
+ PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false);
+ PersistentSubscription sub2 = new PersistentSubscription(topic, "sub-2", cursorMock, false);
// for count consumers on topic
ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions = new ConcurrentOpenHashMap<>(16, 1);
@@ -690,7 +693,7 @@ public class PersistentTopicTest {
@Test
public void testUbsubscribeRaceConditions() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
- PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock);
+ PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false);
Consumer consumer1 = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */,
50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest);
sub.addConsumer(consumer1);
@@ -744,7 +747,8 @@ public class PersistentTopicTest {
.setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build();
Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
- 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest);
+ 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest,
+ false /* replicated */);
f1.get();
assertTrue(topic.delete().isCompletedExceptionally());
@@ -759,7 +763,8 @@ public class PersistentTopicTest {
.setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build();
Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
- 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest);
+ 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest,
+ false /* replicated */);
f1.get();
final CyclicBarrier barrier = new CyclicBarrier(2);
@@ -813,7 +818,8 @@ public class PersistentTopicTest {
.setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build();
Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
- 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest);
+ 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest,
+ false /* replicated */);
f1.get();
final CyclicBarrier barrier = new CyclicBarrier(2);
@@ -900,7 +906,8 @@ public class PersistentTopicTest {
.setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build();
Future<Consumer> f = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
- 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest);
+ 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest,
+ false /* replicated */);
try {
f.get();
@@ -911,6 +918,7 @@ public class PersistentTopicTest {
}
}
+ @SuppressWarnings("unchecked")
void setupMLAsyncCallbackMocks() {
ledgerMock = mock(ManagedLedger.class);
cursorMock = mock(ManagedCursor.class);
@@ -984,6 +992,15 @@ public class PersistentTopicTest {
}
}).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(OpenCursorCallback.class), anyObject());
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ ((OpenCursorCallback) invocationOnMock.getArguments()[3]).openCursorComplete(cursorMock, null);
+ return null;
+ }
+ }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(Map.class),
+ any(OpenCursorCallback.class), anyObject());
+
// call deleteLedgerComplete on ledger asyncDelete
doAnswer(new Answer<Object>() {
@Override
@@ -1018,7 +1035,8 @@ public class PersistentTopicTest {
// 1. Subscribe with non partition topic
Future<Consumer> f1 = topic1.subscribe(serverCnx, cmd1.getSubscription(), cmd1.getConsumerId(),
cmd1.getSubType(), 0, cmd1.getConsumerName(), cmd1.getDurable(), null, Collections.emptyMap(),
- cmd1.getReadCompacted(), InitialPosition.Latest);
+ cmd1.getReadCompacted(), InitialPosition.Latest,
+ false /* replicated */);
f1.get();
// 2. Subscribe with partition topic
@@ -1030,7 +1048,8 @@ public class PersistentTopicTest {
Future<Consumer> f2 = topic2.subscribe(serverCnx, cmd2.getSubscription(), cmd2.getConsumerId(),
cmd2.getSubType(), 0, cmd2.getConsumerName(), cmd2.getDurable(), null, Collections.emptyMap(),
- cmd2.getReadCompacted(), InitialPosition.Latest);
+ cmd2.getReadCompacted(), InitialPosition.Latest,
+ false /* replicated */);
f2.get();
// 3. Subscribe and create second consumer
@@ -1040,7 +1059,8 @@ public class PersistentTopicTest {
Future<Consumer> f3 = topic2.subscribe(serverCnx, cmd3.getSubscription(), cmd3.getConsumerId(),
cmd3.getSubType(), 0, cmd3.getConsumerName(), cmd3.getDurable(), null, Collections.emptyMap(),
- cmd3.getReadCompacted(), InitialPosition.Latest);
+ cmd3.getReadCompacted(), InitialPosition.Latest,
+ false /* replicated */);
f3.get();
assertEquals(
@@ -1061,7 +1081,8 @@ public class PersistentTopicTest {
Future<Consumer> f4 = topic2.subscribe(serverCnx, cmd4.getSubscription(), cmd4.getConsumerId(),
cmd4.getSubType(), 0, cmd4.getConsumerName(), cmd4.getDurable(), null, Collections.emptyMap(),
- cmd4.getReadCompacted(), InitialPosition.Latest);
+ cmd4.getReadCompacted(), InitialPosition.Latest,
+ false /* replicated */);
f4.get();
assertEquals(
@@ -1087,7 +1108,8 @@ public class PersistentTopicTest {
Future<Consumer> f5 = topic2.subscribe(serverCnx, cmd5.getSubscription(), cmd5.getConsumerId(),
cmd5.getSubType(), 0, cmd5.getConsumerName(), cmd5.getDurable(), null, Collections.emptyMap(),
- cmd5.getReadCompacted(), InitialPosition.Latest);
+ cmd5.getReadCompacted(), InitialPosition.Latest,
+ false /* replicated */);
try {
f5.get();
@@ -1104,7 +1126,8 @@ public class PersistentTopicTest {
Future<Consumer> f6 = topic2.subscribe(serverCnx, cmd6.getSubscription(), cmd6.getConsumerId(),
cmd6.getSubType(), 0, cmd6.getConsumerName(), cmd6.getDurable(), null, Collections.emptyMap(),
- cmd6.getReadCompacted(), InitialPosition.Latest);
+ cmd6.getReadCompacted(), InitialPosition.Latest,
+ false /* replicated */);
f6.get();
// 7. unsubscribe exclusive sub
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 82f61c2..a43057c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -46,6 +46,7 @@ import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
@@ -1483,6 +1484,16 @@ public class ServerCnxTest {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
Thread.sleep(300);
+ ((OpenCursorCallback) invocationOnMock.getArguments()[3]).openCursorComplete(cursorMock, null);
+ return null;
+ }
+ }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(Map.class),
+ any(OpenCursorCallback.class), anyObject());
+
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ Thread.sleep(300);
((OpenCursorCallback) invocationOnMock.getArguments()[2])
.openCursorFailed(new ManagedLedgerException("Managed ledger failure"), null);
return null;
@@ -1492,6 +1503,17 @@ public class ServerCnxTest {
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ Thread.sleep(300);
+ ((OpenCursorCallback) invocationOnMock.getArguments()[3])
+ .openCursorFailed(new ManagedLedgerException("Managed ledger failure"), null);
+ return null;
+ }
+ }).when(ledgerMock).asyncOpenCursor(matches(".*fail.*"), any(InitialPosition.class), any(Map.class),
+ any(OpenCursorCallback.class), anyObject());
+
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
((DeleteCursorCallback) invocationOnMock.getArguments()[1]).deleteCursorComplete(null);
return null;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionConfigTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionConfigTest.java
new file mode 100644
index 0000000..8ec937d
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionConfigTest.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+import lombok.Cleanup;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.policies.data.TopicStats;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class ReplicatedSubscriptionConfigTest extends ProducerConsumerBase {
+ @Override
+ @BeforeClass
+ public void setup() throws Exception {
+ super.internalSetup();
+ super.producerBaseSetup();
+ }
+
+ @Override
+ @AfterClass
+ public void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void createReplicatedSubscription() throws Exception {
+ String topic = "createReplicatedSubscription-" + System.nanoTime();
+
+ @Cleanup
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("sub1")
+ .replicateSubscriptionState(true)
+ .subscribe();
+
+ TopicStats stats = admin.topics().getStats(topic);
+ assertTrue(stats.subscriptions.get("sub1").isReplicated);
+
+ admin.topics().unload(topic);
+
+ // Check that subscription is still marked replicated after reloading
+ stats = admin.topics().getStats(topic);
+ assertTrue(stats.subscriptions.get("sub1").isReplicated);
+ }
+
+ @Test
+ public void upgradeToReplicatedSubscription() throws Exception {
+ String topic = "upgradeToReplicatedSubscription-" + System.nanoTime();
+
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("sub")
+ .replicateSubscriptionState(false)
+ .subscribe();
+
+ TopicStats stats = admin.topics().getStats(topic);
+ assertFalse(stats.subscriptions.get("sub").isReplicated);
+ consumer.close();
+
+ consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("sub")
+ .replicateSubscriptionState(true)
+ .subscribe();
+
+ stats = admin.topics().getStats(topic);
+ assertTrue(stats.subscriptions.get("sub").isReplicated);
+ consumer.close();
+ }
+
+ @Test
+ public void upgradeToReplicatedSubscriptionAfterRestart() throws Exception {
+ String topic = "upgradeToReplicatedSubscriptionAfterRestart-" + System.nanoTime();
+
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("sub")
+ .replicateSubscriptionState(false)
+ .subscribe();
+
+ TopicStats stats = admin.topics().getStats(topic);
+ assertFalse(stats.subscriptions.get("sub").isReplicated);
+ consumer.close();
+
+ admin.topics().unload(topic);
+
+ consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("sub")
+ .replicateSubscriptionState(true)
+ .subscribe();
+
+ stats = admin.topics().getStats(topic);
+ assertTrue(stats.subscriptions.get("sub").isReplicated);
+ consumer.close();
+ }
+}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index 8cb50f9..f8c5e0a 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -297,6 +297,12 @@ public interface ConsumerBuilder<T> extends Cloneable {
ConsumerBuilder<T> acknowledgmentGroupTime(long delay, TimeUnit unit);
/**
+ *
+ * @param replicateSubscriptionState
+ */
+ ConsumerBuilder<T> replicateSubscriptionState(boolean replicateSubscriptionState);
+
+ /**
* Set the max total receiver queue size across partitons.
* <p>
* This setting will be used to reduce the receiver queue size for individual partitions
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 6f8d9c9..19f5c6e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -279,6 +279,12 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
}
@Override
+ public ConsumerBuilder<T> replicateSubscriptionState(boolean replicateSubscriptionState) {
+ conf.setReplicateSubscriptionState(replicateSubscriptionState);
+ return this;
+ }
+
+ @Override
public ConsumerBuilder<T> intercept(ConsumerInterceptor<T>... interceptors) {
if (interceptorList == null) {
interceptorList = new ArrayList<>();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index b2421f2..8d91ca1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -526,7 +526,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
si = null;
}
ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(), priorityLevel,
- consumerName, isDurable, startMessageIdData, metadata, readCompacted, InitialPosition.valueOf(subscriptionInitialPosition.getValue()), si);
+ consumerName, isDurable, startMessageIdData, metadata, readCompacted,
+ conf.isReplicateSubscriptionState(), InitialPosition.valueOf(subscriptionInitialPosition.getValue()),
+ si);
if (startMessageIdData != null) {
startMessageIdData.recycle();
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index fd4b719..3352ebb 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -100,6 +100,8 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
private boolean autoUpdatePartitions = true;
+ private boolean replicateSubscriptionState = false;
+
@JsonIgnore
public String getSingleTopic() {
checkArgument(topicNames.size() == 1);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index 04018f6..2d76410 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -418,12 +418,14 @@ public class Commands {
public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId,
SubType subType, int priorityLevel, String consumerName) {
return newSubscribe(topic, subscription, consumerId, requestId, subType, priorityLevel, consumerName,
- true /* isDurable */, null /* startMessageId */, Collections.emptyMap(), false, InitialPosition.Earliest, null);
+ true /* isDurable */, null /* startMessageId */, Collections.emptyMap(), false,
+ false /* isReplicated */, InitialPosition.Earliest, null);
}
public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId,
SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageIdData startMessageId,
- Map<String, String> metadata, boolean readCompacted, InitialPosition subscriptionInitialPosition, SchemaInfo schemaInfo) {
+ Map<String, String> metadata, boolean readCompacted, boolean isReplicated,
+ InitialPosition subscriptionInitialPosition, SchemaInfo schemaInfo) {
CommandSubscribe.Builder subscribeBuilder = CommandSubscribe.newBuilder();
subscribeBuilder.setTopic(topic);
subscribeBuilder.setSubscription(subscription);
@@ -435,6 +437,7 @@ public class Commands {
subscribeBuilder.setDurable(isDurable);
subscribeBuilder.setReadCompacted(readCompacted);
subscribeBuilder.setInitialPosition(subscriptionInitialPosition);
+ subscribeBuilder.setReplicateSubscriptionState(isReplicated);
if (startMessageId != null) {
subscribeBuilder.setStartMessageId(startMessageId);
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 319302d..625752b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -8471,6 +8471,10 @@ public final class PulsarApi {
// optional .pulsar.proto.CommandSubscribe.InitialPosition initialPosition = 13 [default = Latest];
boolean hasInitialPosition();
org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition getInitialPosition();
+
+ // optional bool replicate_subscription_state = 14;
+ boolean hasReplicateSubscriptionState();
+ boolean getReplicateSubscriptionState();
}
public static final class CommandSubscribe extends
org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -8802,6 +8806,16 @@ public final class PulsarApi {
return initialPosition_;
}
+ // optional bool replicate_subscription_state = 14;
+ public static final int REPLICATE_SUBSCRIPTION_STATE_FIELD_NUMBER = 14;
+ private boolean replicateSubscriptionState_;
+ public boolean hasReplicateSubscriptionState() {
+ return ((bitField0_ & 0x00001000) == 0x00001000);
+ }
+ public boolean getReplicateSubscriptionState() {
+ return replicateSubscriptionState_;
+ }
+
private void initFields() {
topic_ = "";
subscription_ = "";
@@ -8816,6 +8830,7 @@ public final class PulsarApi {
readCompacted_ = false;
schema_ = org.apache.pulsar.common.api.proto.PulsarApi.Schema.getDefaultInstance();
initialPosition_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition.Latest;
+ replicateSubscriptionState_ = false;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -8911,6 +8926,9 @@ public final class PulsarApi {
if (((bitField0_ & 0x00000800) == 0x00000800)) {
output.writeEnum(13, initialPosition_.getNumber());
}
+ if (((bitField0_ & 0x00001000) == 0x00001000)) {
+ output.writeBool(14, replicateSubscriptionState_);
+ }
}
private int memoizedSerializedSize = -1;
@@ -8971,6 +8989,10 @@ public final class PulsarApi {
size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
.computeEnumSize(13, initialPosition_.getNumber());
}
+ if (((bitField0_ & 0x00001000) == 0x00001000)) {
+ size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeBoolSize(14, replicateSubscriptionState_);
+ }
memoizedSerializedSize = size;
return size;
}
@@ -9110,6 +9132,8 @@ public final class PulsarApi {
bitField0_ = (bitField0_ & ~0x00000800);
initialPosition_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition.Latest;
bitField0_ = (bitField0_ & ~0x00001000);
+ replicateSubscriptionState_ = false;
+ bitField0_ = (bitField0_ & ~0x00002000);
return this;
}
@@ -9196,6 +9220,10 @@ public final class PulsarApi {
to_bitField0_ |= 0x00000800;
}
result.initialPosition_ = initialPosition_;
+ if (((from_bitField0_ & 0x00002000) == 0x00002000)) {
+ to_bitField0_ |= 0x00001000;
+ }
+ result.replicateSubscriptionState_ = replicateSubscriptionState_;
result.bitField0_ = to_bitField0_;
return result;
}
@@ -9248,6 +9276,9 @@ public final class PulsarApi {
if (other.hasInitialPosition()) {
setInitialPosition(other.getInitialPosition());
}
+ if (other.hasReplicateSubscriptionState()) {
+ setReplicateSubscriptionState(other.getReplicateSubscriptionState());
+ }
return this;
}
@@ -9399,6 +9430,11 @@ public final class PulsarApi {
}
break;
}
+ case 112: {
+ bitField0_ |= 0x00002000;
+ replicateSubscriptionState_ = input.readBool();
+ break;
+ }
}
}
}
@@ -9841,6 +9877,27 @@ public final class PulsarApi {
return this;
}
+ // optional bool replicate_subscription_state = 14;
+ private boolean replicateSubscriptionState_ ;
+ public boolean hasReplicateSubscriptionState() {
+ return ((bitField0_ & 0x00002000) == 0x00002000);
+ }
+ public boolean getReplicateSubscriptionState() {
+ return replicateSubscriptionState_;
+ }
+ public Builder setReplicateSubscriptionState(boolean value) {
+ bitField0_ |= 0x00002000;
+ replicateSubscriptionState_ = value;
+
+ return this;
+ }
+ public Builder clearReplicateSubscriptionState() {
+ bitField0_ = (bitField0_ & ~0x00002000);
+ replicateSubscriptionState_ = false;
+
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:pulsar.proto.CommandSubscribe)
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
index f141d4a..40e3dad 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
@@ -43,7 +43,7 @@ public class SubscriptionStats {
/** Flag to verify if subscription is blocked due to reaching threshold of unacked messages */
public boolean blockedSubscriptionOnUnackedMsgs;
-
+
/** Number of unacknowledged messages for the subscription */
public long unackedMessages;
@@ -59,6 +59,9 @@ public class SubscriptionStats {
/** List of connected consumers on this subscription w/ their stats */
public List<ConsumerStats> consumers;
+ /** Mark that the subscription state is kept in sync across different regions */
+ public boolean isReplicated;
+
public SubscriptionStats() {
this.consumers = Lists.newArrayList();
}
@@ -83,6 +86,7 @@ public class SubscriptionStats {
this.msgBacklog += stats.msgBacklog;
this.unackedMessages += stats.unackedMessages;
this.msgRateExpired += stats.msgRateExpired;
+ this.isReplicated |= stats.isReplicated;
if (this.consumers.size() != stats.consumers.size()) {
for (int i = 0; i < stats.consumers.size(); i++) {
ConsumerStats consumerStats = new ConsumerStats();
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index 11d69be..58238d7 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -272,6 +272,11 @@ message CommandSubscribe {
// Signal wthether the subscription will initialize on latest
// or not -- earliest
optional InitialPosition initialPosition = 13 [default = Latest];
+
+ // Mark the subscription as "replicated". Pulsar will make sure
+ // to periodically sync the state of replicated subscriptions
+ // across different clusters (when using geo-replication).
+ optional bool replicate_subscription_state = 14;
}
message CommandPartitionedTopicMetadata {
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index 98b3735..800fc80 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -95,6 +95,9 @@ public class PerformanceConsumer {
@Parameter(names = { "-q", "--receiver-queue-size" }, description = "Size of the receiver queue")
public int receiverQueueSize = 1000;
+ @Parameter(names = { "--replicated" }, description = "Whether the subscription status should be replicated")
+ public boolean replicatedSubscription = false;
+
@Parameter(names = { "--acks-delay-millis" }, description = "Acknowlegments grouping delay in millis")
public int acknowledgmentsGroupingDelayMillis = 100;
@@ -253,7 +256,8 @@ public class PerformanceConsumer {
.messageListener(listener) //
.receiverQueueSize(arguments.receiverQueueSize) //
.acknowledgmentGroupTime(arguments.acknowledgmentsGroupingDelayMillis, TimeUnit.MILLISECONDS) //
- .subscriptionType(arguments.subscriptionType);
+ .subscriptionType(arguments.subscriptionType)
+ .replicateSubscriptionState(arguments.replicatedSubscription);
if (arguments.encKeyName != null) {
byte[] pKey = Files.readAllBytes(Paths.get(arguments.encKeyFile));