You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/08/04 06:19:55 UTC
[pulsar] branch branch-2.5 updated: [Issue 6394] Add configuration
to disable auto creation of subscriptions (#6456)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.5 by this push:
new 41d1d6e [Issue 6394] Add configuration to disable auto creation of subscriptions (#6456)
41d1d6e is described below
commit 41d1d6ecb9b03c1cecadd9222b82d3f197e4c9b6
Author: Sijie Guo <si...@apache.org>
AuthorDate: Wed Mar 4 17:21:53 2020 -0800
[Issue 6394] Add configuration to disable auto creation of subscriptions (#6456)
### Motivation
Fixes #6394
### Modifications
- provide a flag `allowAutoSubscriptionCreation` in `ServiceConfiguration`, defaults to `true`
- when `allowAutoSubscriptionCreation` is disabled and a consumer tries to subscribe to a `Durable` subscription that does not yet exist on the topic, the broker rejects the request directly in `handleSubscribe` in `ServerCnx`
- create the subscription on the coordination topic if it does not exist when initializing `WorkerService`
(cherry picked from commit c3292a611c9cbf2b17c96c5317d8f20247eb1f41)
---
conf/broker.conf | 3 ++
conf/standalone.conf | 3 ++
.../apache/pulsar/broker/ServiceConfiguration.java | 5 +++
.../broker/service/BrokerServiceException.java | 6 ++++
.../apache/pulsar/broker/service/ServerCnx.java | 13 +++++--
.../BrokerServiceAutoTopicCreationTest.java | 42 ++++++++++++++++++++++
.../pulsar/functions/worker/WorkerService.java | 5 +++
site2/docs/reference-configuration.md | 1 +
8 files changed, 76 insertions(+), 2 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 62107cb..fc40a15 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -91,6 +91,9 @@ allowAutoTopicCreation=true
# The type of topic that is allowed to be automatically created.(partitioned/non-partitioned)
allowAutoTopicCreationType=non-partitioned
+# Enable subscription auto creation if new consumer connected (disable auto creation with value false)
+allowAutoSubscriptionCreation=true
+
# The number of partitioned topics that is allowed to be automatically created if allowAutoTopicCreationType is partitioned.
defaultNumPartitions=1
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 6619e90..3fa855c 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -728,6 +728,9 @@ allowAutoTopicCreation=true
# The type of topic that is allowed to be automatically created.(partitioned/non-partitioned)
allowAutoTopicCreationType=non-partitioned
+# Enable subscription auto creation if new consumer connected (disable auto creation with value false)
+allowAutoSubscriptionCreation=true
+
# The number of partitioned topics that is allowed to be automatically created if allowAutoTopicCreationType is partitioned.
defaultNumPartitions=1
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 68d56fe..c13089b 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1008,6 +1008,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private String allowAutoTopicCreationType = "non-partitioned";
@FieldContext(
+ category = CATEGORY_STORAGE_ML,
+ doc = "Allow automated creation of subscriptions if set to true (default value)."
+ )
+ private boolean allowAutoSubscriptionCreation = true;
+ @FieldContext(
category = CATEGORY_STORAGE_ML,
doc = "The number of partitioned topics that is allowed to be automatically created"
+ "if allowAutoTopicCreationType is partitioned."
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
index e794718..b4bfed5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
@@ -123,6 +123,12 @@ public class BrokerServiceException extends Exception {
}
}
+ public static class SubscriptionNotFoundException extends BrokerServiceException {
+ public SubscriptionNotFoundException(String msg) {
+ super(msg);
+ }
+ }
+
public static class SubscriptionBusyException extends BrokerServiceException {
public SubscriptionBusyException(String msg) {
super(msg);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index c1a90be..39da53d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -63,6 +63,7 @@ import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
+import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionNotFoundException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicNotFoundException;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
@@ -740,7 +741,6 @@ public class ServerCnx extends PulsarHandler {
subscribe.getStartMessageId().getLedgerId(), subscribe.getStartMessageId().getEntryId(),
subscribe.getStartMessageId().getPartition(), subscribe.getStartMessageId().getBatchIndex())
: null;
- final String subscription = subscribe.getSubscription();
final int priorityLevel = subscribe.hasPriorityLevel() ? subscribe.getPriorityLevel() : 0;
final boolean readCompacted = subscribe.getReadCompacted();
final Map<String, String> metadata = CommandUtils.metadataFromCommand(subscribe);
@@ -766,7 +766,7 @@ public class ServerCnx extends PulsarHandler {
if (service.isAuthorizationEnabled()) {
authorizationFuture = service.getAuthorizationService().canConsumeAsync(topicName,
originalPrincipal != null ? originalPrincipal : authRole, authenticationData,
- subscription);
+ subscriptionName);
} else {
authorizationFuture = CompletableFuture.completedFuture(true);
}
@@ -829,6 +829,15 @@ public class ServerCnx extends PulsarHandler {
Topic topic = optTopic.get();
+ boolean rejectSubscriptionIfDoesNotExist = isDurable
+ && !service.pulsar().getConfig().isAllowAutoSubscriptionCreation()
+ && !topic.getSubscriptions().containsKey(subscriptionName);
+
+ if (rejectSubscriptionIfDoesNotExist) {
+ return FutureUtil
+ .failedFuture(new SubscriptionNotFoundException("Subscription does not exist"));
+ }
+
if (schema != null) {
return topic.addSchemaIfIdleOrCheckCompatible(schema)
.thenCompose(v -> topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
index 275ec3f..3e906ef 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
@@ -141,6 +141,48 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{
assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicString));
}
+ @Test
+ public void testAutoSubscriptionCreationDisable() throws Exception{
+ pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false);
+
+ final String topicName = "persistent://prop/ns-abc/test-subtopic";
+ final String subscriptionName = "test-subtopic-sub";
+
+ admin.topics().createNonPartitionedTopic(topicName);
+
+ try {
+ pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
+ fail("Subscribe operation should have failed");
+ } catch (Exception e) {
+ assertTrue(e instanceof PulsarClientException);
+ }
+ assertFalse(admin.topics().getSubscriptions(topicName).contains(subscriptionName));
+
+ // Reset to default
+ pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true);
+ }
+
+ @Test
+ public void testSubscriptionCreationWithAutoCreationDisable() throws Exception{
+ pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false);
+
+ final String topicName = "persistent://prop/ns-abc/test-subtopic";
+ final String subscriptionName = "test-subtopic-sub";
+
+ admin.topics().createNonPartitionedTopic(topicName);
+ assertFalse(admin.topics().getSubscriptions(topicName).contains(subscriptionName));
+
+ // Create the subscription by PulsarAdmin
+ admin.topics().createSubscription(topicName, subscriptionName, MessageId.earliest);
+ assertTrue(admin.topics().getSubscriptions(topicName).contains(subscriptionName));
+
+ // Subscribe operation should be successful
+ pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
+
+ // Reset to default
+ pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true);
+ }
+
/**
* CheckAllowAutoCreation's default value is false.
* So using getPartitionedTopicMetadata() directly will not produce partitioned topic
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index fdfb7de..b23c707 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -34,6 +34,7 @@ import org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -162,6 +163,10 @@ public class WorkerService {
this.connectorsManager = new ConnectorsManager(workerConfig);
//create membership manager
+ String coordinationTopic = workerConfig.getClusterCoordinationTopic();
+ if (!brokerAdmin.topics().getSubscriptions(coordinationTopic).contains(MembershipManager.COORDINATION_TOPIC_SUBSCRIPTION)) {
+ brokerAdmin.topics().createSubscription(coordinationTopic, MembershipManager.COORDINATION_TOPIC_SUBSCRIPTION, MessageId.earliest);
+ }
this.membershipManager = new MembershipManager(this, this.client, this.brokerAdmin);
// create function runtime manager
diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md
index 642a7c0..3d0f242 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -129,6 +129,7 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
|backlogQuotaDefaultLimitGB| Default per-topic backlog quota limit |10|
|allowAutoTopicCreation| Enable topic auto creation if a new producer or consumer connected |true|
|allowAutoTopicCreationType| The topic type (partitioned or non-partitioned) that is allowed to be automatically created. |Partitioned|
+|allowAutoSubscriptionCreation| Enable subscription auto creation if a new consumer connected |true|
|defaultNumPartitions| The number of partitioned topics that is allowed to be automatically created if `allowAutoTopicCreationType` is partitioned |1|
|brokerDeleteInactiveTopicsEnabled| Enable the deletion of inactive topics |true|
|brokerDeleteInactiveTopicsFrequencySeconds| How often to check for inactive topics |60|