You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/08/04 06:19:55 UTC

[pulsar] branch branch-2.5 updated: [Issue 6394] Add configuration to disable auto creation of subscriptions (#6456)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.5 by this push:
     new 41d1d6e  [Issue 6394] Add configuration to disable auto creation of subscriptions (#6456)
41d1d6e is described below

commit 41d1d6ecb9b03c1cecadd9222b82d3f197e4c9b6
Author: Sijie Guo <si...@apache.org>
AuthorDate: Wed Mar 4 17:21:53 2020 -0800

    [Issue 6394] Add configuration to disable auto creation of subscriptions (#6456)
    
    ### Motivation
    
    Fixes #6394
    
    ### Modifications
    
    - provide a flag `allowAutoSubscriptionCreation` in `ServiceConfiguration`, defaults to `true`
    - when `allowAutoSubscriptionCreation` is disabled and the specified subscription (`Durable`) on the topic does not exist when trying to subscribe via a consumer, the server should reject the request directly by `handleSubscribe` in `ServerCnx`
    - create the subscription on the coordination topic if it does not exist when init `WorkerService`
    
    (cherry picked from commit c3292a611c9cbf2b17c96c5317d8f20247eb1f41)
---
 conf/broker.conf                                   |  3 ++
 conf/standalone.conf                               |  3 ++
 .../apache/pulsar/broker/ServiceConfiguration.java |  5 +++
 .../broker/service/BrokerServiceException.java     |  6 ++++
 .../apache/pulsar/broker/service/ServerCnx.java    | 13 +++++--
 .../BrokerServiceAutoTopicCreationTest.java        | 42 ++++++++++++++++++++++
 .../pulsar/functions/worker/WorkerService.java     |  5 +++
 site2/docs/reference-configuration.md              |  1 +
 8 files changed, 76 insertions(+), 2 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 62107cb..fc40a15 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -91,6 +91,9 @@ allowAutoTopicCreation=true
 # The type of topic that is allowed to be automatically created.(partitioned/non-partitioned)
 allowAutoTopicCreationType=non-partitioned
 
+# Enable subscription auto creation if new consumer connected (disable auto creation with value false)
+allowAutoSubscriptionCreation=true
+
 # The number of partitioned topics that is allowed to be automatically created if allowAutoTopicCreationType is partitioned.
 defaultNumPartitions=1
 
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 6619e90..3fa855c 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -728,6 +728,9 @@ allowAutoTopicCreation=true
 # The type of topic that is allowed to be automatically created.(partitioned/non-partitioned)
 allowAutoTopicCreationType=non-partitioned
 
+# Enable subscription auto creation if new consumer connected (disable auto creation with value false)
+allowAutoSubscriptionCreation=true
+
 # The number of partitioned topics that is allowed to be automatically created if allowAutoTopicCreationType is partitioned.
 defaultNumPartitions=1
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 68d56fe..c13089b 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1008,6 +1008,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private String allowAutoTopicCreationType = "non-partitioned";
     @FieldContext(
+        category = CATEGORY_STORAGE_ML,
+        doc = "Allow automated creation of subscriptions if set to true (default value)."
+    )
+    private boolean allowAutoSubscriptionCreation = true;
+    @FieldContext(
             category = CATEGORY_STORAGE_ML,
             doc = "The number of partitioned topics that is allowed to be automatically created"
                     + "if allowAutoTopicCreationType is partitioned."
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
index e794718..b4bfed5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
@@ -123,6 +123,12 @@ public class BrokerServiceException extends Exception {
         }
     }
 
+    public static class SubscriptionNotFoundException extends BrokerServiceException {
+        public SubscriptionNotFoundException(String msg) {
+            super(msg);
+        }
+    }
+
     public static class SubscriptionBusyException extends BrokerServiceException {
         public SubscriptionBusyException(String msg) {
             super(msg);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index c1a90be..39da53d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -63,6 +63,7 @@ import org.apache.pulsar.broker.authentication.AuthenticationState;
 import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
+import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionNotFoundException;
 import org.apache.pulsar.broker.service.BrokerServiceException.TopicNotFoundException;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
@@ -740,7 +741,6 @@ public class ServerCnx extends PulsarHandler {
                 subscribe.getStartMessageId().getLedgerId(), subscribe.getStartMessageId().getEntryId(),
                 subscribe.getStartMessageId().getPartition(), subscribe.getStartMessageId().getBatchIndex())
                 : null;
-        final String subscription = subscribe.getSubscription();
         final int priorityLevel = subscribe.hasPriorityLevel() ? subscribe.getPriorityLevel() : 0;
         final boolean readCompacted = subscribe.getReadCompacted();
         final Map<String, String> metadata = CommandUtils.metadataFromCommand(subscribe);
@@ -766,7 +766,7 @@ public class ServerCnx extends PulsarHandler {
                 if (service.isAuthorizationEnabled()) {
                     authorizationFuture = service.getAuthorizationService().canConsumeAsync(topicName,
                             originalPrincipal != null ? originalPrincipal : authRole, authenticationData,
-                            subscription);
+                            subscriptionName);
                 } else {
                     authorizationFuture = CompletableFuture.completedFuture(true);
                 }
@@ -829,6 +829,15 @@ public class ServerCnx extends PulsarHandler {
 
                                     Topic topic = optTopic.get();
 
+                                    boolean rejectSubscriptionIfDoesNotExist = isDurable
+                                        && !service.pulsar().getConfig().isAllowAutoSubscriptionCreation()
+                                        && !topic.getSubscriptions().containsKey(subscriptionName);
+
+                                    if (rejectSubscriptionIfDoesNotExist) {
+                                        return FutureUtil
+                                            .failedFuture(new SubscriptionNotFoundException("Subscription does not exist"));
+                                    }
+
                                     if (schema != null) {
                                         return topic.addSchemaIfIdleOrCheckCompatible(schema)
                                             .thenCompose(v -> topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
index 275ec3f..3e906ef 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
@@ -141,6 +141,48 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{
         assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicString));
     }
 
+    @Test
+    public void testAutoSubscriptionCreationDisable() throws Exception{
+        pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false);
+
+        final String topicName = "persistent://prop/ns-abc/test-subtopic";
+        final String subscriptionName = "test-subtopic-sub";
+
+        admin.topics().createNonPartitionedTopic(topicName);
+
+        try {
+            pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
+            fail("Subscribe operation should have failed");
+        } catch (Exception e) {
+            assertTrue(e instanceof PulsarClientException);
+        }
+        assertFalse(admin.topics().getSubscriptions(topicName).contains(subscriptionName));
+
+        // Reset to default
+        pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true);
+    }
+
+    @Test
+    public void testSubscriptionCreationWithAutoCreationDisable() throws Exception{
+        pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false);
+
+        final String topicName = "persistent://prop/ns-abc/test-subtopic";
+        final String subscriptionName = "test-subtopic-sub";
+
+        admin.topics().createNonPartitionedTopic(topicName);
+        assertFalse(admin.topics().getSubscriptions(topicName).contains(subscriptionName));
+
+        // Create the subscription by PulsarAdmin
+        admin.topics().createSubscription(topicName, subscriptionName, MessageId.earliest);
+        assertTrue(admin.topics().getSubscriptions(topicName).contains(subscriptionName));
+
+        // Subscribe operation should be successful
+        pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
+
+        // Reset to default
+        pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true);
+    }
+
     /**
      * CheckAllowAutoCreation's default value is false.
      * So using getPartitionedTopicMetadata() directly will not produce partitioned topic
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index fdfb7de..b23c707 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -34,6 +34,7 @@ import org.apache.distributedlog.api.namespace.NamespaceBuilder;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 
@@ -162,6 +163,10 @@ public class WorkerService {
             this.connectorsManager = new ConnectorsManager(workerConfig);
 
             //create membership manager
+            String coordinationTopic = workerConfig.getClusterCoordinationTopic();
+            if (!brokerAdmin.topics().getSubscriptions(coordinationTopic).contains(MembershipManager.COORDINATION_TOPIC_SUBSCRIPTION)) {
+                brokerAdmin.topics().createSubscription(coordinationTopic, MembershipManager.COORDINATION_TOPIC_SUBSCRIPTION, MessageId.earliest);
+            }
             this.membershipManager = new MembershipManager(this, this.client, this.brokerAdmin);
 
             // create function runtime manager
diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md
index 642a7c0..3d0f242 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -129,6 +129,7 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
 |backlogQuotaDefaultLimitGB|  Default per-topic backlog quota limit |10|
 |allowAutoTopicCreation| Enable topic auto creation if a new producer or consumer connected |true|
 |allowAutoTopicCreationType| The topic type (partitioned or non-partitioned) that is allowed to be automatically created. |Partitioned|
+|allowAutoSubscriptionCreation| Enable subscription auto creation if a new consumer connected |true|
 |defaultNumPartitions| The number of partitioned topics that is allowed to be automatically created if `allowAutoTopicCreationType` is partitioned |1|
 |brokerDeleteInactiveTopicsEnabled| Enable the deletion of inactive topics  |true|
 |brokerDeleteInactiveTopicsFrequencySeconds|  How often to check for inactive topics  |60|