You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by li...@apache.org on 2021/12/26 13:10:46 UTC

[pulsar] branch master updated: Subscription types support across multiple clusters (#13482)

This is an automated email from the ASF dual-hosted git repository.

linlin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new cadc011  Subscription types support across multiple clusters (#13482)
cadc011 is described below

commit cadc011d3090344c25ca259eb52ad9cc49f1c6ba
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Sun Dec 26 21:08:47 2021 +0800

    Subscription types support across multiple clusters (#13482)
    
    * Subscription Types support cross multiple clusters
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 14 +++--
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  9 ++-
 .../service/ReplicatorTopicPoliciesTest.java       | 25 ++++++++
 .../apache/pulsar/client/admin/TopicPolicies.java  | 14 +++++
 .../client/admin/internal/TopicPoliciesImpl.java   | 25 ++++++++
 .../pulsar/admin/cli/PulsarAdminToolTest.java      | 16 +++++
 .../apache/pulsar/admin/cli/CmdTopicPolicies.java  | 72 +++++++++++++++++++++-
 .../org/apache/pulsar/admin/cli/CmdTopics.java     |  5 +-
 8 files changed, 168 insertions(+), 12 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 8520d11..9e505f0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -4255,30 +4255,32 @@ public class PersistentTopicsBase extends AdminResource {
             });
     }
 
-    protected CompletableFuture<Optional<List<SubType>>> internalGetSubscriptionTypesEnabled() {
-        return getTopicPoliciesAsyncWithRetry(topicName)
+    protected CompletableFuture<Optional<List<SubType>>> internalGetSubscriptionTypesEnabled(boolean isGlobal) {
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
             .thenApply(op -> op.map(TopicPolicies::getSubscriptionTypesEnabled));
     }
 
     protected CompletableFuture<Void> internalSetSubscriptionTypesEnabled(
-            Set<SubscriptionType> subscriptionTypesEnabled) {
+            Set<SubscriptionType> subscriptionTypesEnabled, boolean isGlobal) {
         List<SubType> subTypes = Lists.newArrayList();
         subscriptionTypesEnabled.forEach(subscriptionType -> subTypes.add(SubType.valueOf(subscriptionType.name())));
-        return getTopicPoliciesAsyncWithRetry(topicName)
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
             .thenCompose(op -> {
                 TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
                 topicPolicies.setSubscriptionTypesEnabled(subTypes);
+                topicPolicies.setIsGlobal(isGlobal);
                 return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
             });
     }
 
-    protected CompletableFuture<Void> internalRemoveSubscriptionTypesEnabled() {
-        return getTopicPoliciesAsyncWithRetry(topicName)
+    protected CompletableFuture<Void> internalRemoveSubscriptionTypesEnabled(boolean isGlobal) {
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
                 .thenCompose(op -> {
                     if (!op.isPresent()) {
                         return CompletableFuture.completedFuture(null);
                     }
                     op.get().setSubscriptionTypesEnabled(Lists.newArrayList());
+                    op.get().setIsGlobal(isGlobal);
                     return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get());
                 });
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 50eb291..b463e72 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -3242,10 +3242,11 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenCompose(__ -> internalGetSubscriptionTypesEnabled())
+            .thenCompose(__ -> internalGetSubscriptionTypesEnabled(isGlobal))
             .thenAccept(op -> {
                 asyncResponse.resume(op.isPresent() ? op.get()
                         : Response.noContent().build());
@@ -3269,12 +3270,13 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "Enable sub types for the specified topic")
             Set<SubscriptionType> subscriptionTypesEnabled) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenCompose(__ -> internalSetSubscriptionTypesEnabled(subscriptionTypesEnabled))
+            .thenCompose(__ -> internalSetSubscriptionTypesEnabled(subscriptionTypesEnabled, isGlobal))
             .thenRun(() -> {
                 try {
                     log.info("[{}] Successfully set topic is enabled sub types :"
@@ -3306,10 +3308,11 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-                .thenCompose(__ -> internalRemoveSubscriptionTypesEnabled())
+                .thenCompose(__ -> internalRemoveSubscriptionTypesEnabled(isGlobal))
                 .thenRun(() -> {
                     log.info("[{}] Successfully remove subscription types enabled: namespace={}, topic={}",
                             clientAppId(),
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
index 3ac9917..8c9e030 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
@@ -23,10 +23,14 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import com.google.common.collect.Sets;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.UUID;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
@@ -173,6 +177,27 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase {
         Awaitility.await().untilAsserted(() ->
                 assertNull(admin3.topicPolicies(true).getRetention(persistentTopicName)));
     }
+    @Test
+    public void testReplicateSubscriptionTypesPolicies() throws Exception {
+        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
+        final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
+        init(namespace, topic);
+        Set<SubscriptionType> subscriptionTypes = new HashSet<>();
+        subscriptionTypes.add(SubscriptionType.Shared);
+        // set subscription types policies
+        admin1.topicPolicies(true).setSubscriptionTypesEnabled(topic, subscriptionTypes);
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(admin2.topicPolicies(true).getSubscriptionTypesEnabled(topic), subscriptionTypes));
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(admin3.topicPolicies(true).getSubscriptionTypesEnabled(topic), subscriptionTypes));
+        // remove subscription types policies
+        admin1.topicPolicies(true).removeSubscriptionTypesEnabled(topic);
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(admin2.topicPolicies(true).getSubscriptionTypesEnabled(topic), Collections.emptySet()));
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(admin3.topicPolicies(true).getSubscriptionTypesEnabled(topic), Collections.emptySet()));
+
+    }
 
     private void init(String namespace, String topic)
             throws PulsarAdminException, PulsarClientException, PulsarServerException {
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
index dc2b43d..8dcfc31 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
@@ -1578,6 +1578,20 @@ public interface TopicPolicies {
     CompletableFuture<Set<SubscriptionType>> getSubscriptionTypesEnabledAsync(String topic);
 
     /**
+     * Remove subscription types enabled for a topic.
+     * @param topic
+     * @throws PulsarAdminException
+     */
+    void removeSubscriptionTypesEnabled(String topic) throws PulsarAdminException;
+
+    /**
+     * Remove subscription types enabled for a topic asynchronously.
+     * @param topic
+     * @return
+     */
+    CompletableFuture<Void> removeSubscriptionTypesEnabledAsync(String topic);
+
+    /**
      * Set topic-subscribe-rate (topic will limit by subscribeRate).
      *
      * @param topic
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
index dcbbdfc..0629f0d 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
@@ -21,6 +21,9 @@ package org.apache.pulsar.client.admin.internal;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import javax.ws.rs.client.Entity;
 import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
@@ -1232,6 +1235,28 @@ public class TopicPoliciesImpl extends BaseResource implements TopicPolicies {
     }
 
     @Override
+    public void removeSubscriptionTypesEnabled(String topic) throws PulsarAdminException {
+        try {
+            removeSubscriptionTypesEnabledAsync(topic)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> removeSubscriptionTypesEnabledAsync(String topic) {
+        TopicName topicName = validateTopic(topic);
+        WebTarget path = topicPath(topicName, "subscriptionTypesEnabled");
+        return asyncDeleteRequest(path);
+    }
+
+    @Override
     public SubscribeRate getSubscribeRate(String topic) throws PulsarAdminException {
         return getSubscribeRate(topic, false);
     }
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 5394475..e0bfcc9 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -892,6 +892,14 @@ public class PulsarAdminToolTest {
 
         CmdTopicPolicies cmdTopics = new CmdTopicPolicies(() -> admin);
 
+        cmdTopics.run(split("set-subscription-types-enabled persistent://myprop/clust/ns1/ds1 -t Shared,Failover"));
+        verify(mockTopicsPolicies).setSubscriptionTypesEnabled("persistent://myprop/clust/ns1/ds1",
+                Sets.newHashSet(SubscriptionType.Shared, SubscriptionType.Failover));
+        cmdTopics.run(split("get-subscription-types-enabled persistent://myprop/clust/ns1/ds1"));
+        verify(mockTopicsPolicies).getSubscriptionTypesEnabled("persistent://myprop/clust/ns1/ds1");
+        cmdTopics.run(split("remove-subscription-types-enabled persistent://myprop/clust/ns1/ds1"));
+        verify(mockTopicsPolicies).removeSubscriptionTypesEnabled("persistent://myprop/clust/ns1/ds1");
+
         cmdTopics.run(split("get-retention persistent://myprop/clust/ns1/ds1"));
         verify(mockTopicsPolicies).getRetention("persistent://myprop/clust/ns1/ds1", false);
         cmdTopics.run(split("set-retention persistent://myprop/clust/ns1/ds1 -t 10m -s 20M"));
@@ -981,6 +989,14 @@ public class PulsarAdminToolTest {
         verify(mockGlobalTopicsPolicies).setDeduplicationStatus("persistent://myprop/clust/ns1/ds1", false);
         cmdTopics.run(split("remove-deduplication persistent://myprop/clust/ns1/ds1 -g"));
         verify(mockGlobalTopicsPolicies).removeDeduplicationStatus("persistent://myprop/clust/ns1/ds1");
+
+        cmdTopics.run(split("set-subscription-types-enabled persistent://myprop/clust/ns1/ds1 -t Shared,Failover -g"));
+        verify(mockGlobalTopicsPolicies).setSubscriptionTypesEnabled("persistent://myprop/clust/ns1/ds1",
+                Sets.newHashSet(SubscriptionType.Shared, SubscriptionType.Failover));
+        cmdTopics.run(split("get-subscription-types-enabled persistent://myprop/clust/ns1/ds1 -g"));
+        verify(mockGlobalTopicsPolicies).getSubscriptionTypesEnabled("persistent://myprop/clust/ns1/ds1");
+        cmdTopics.run(split("remove-subscription-types-enabled persistent://myprop/clust/ns1/ds1 -g"));
+        verify(mockGlobalTopicsPolicies).removeSubscriptionTypesEnabled("persistent://myprop/clust/ns1/ds1");
     }
 
     @Test
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index ddbe091..6655a8f 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -22,13 +22,16 @@ import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
 import com.beust.jcommander.Parameters;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.TopicPolicies;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.util.RelativeTimeUtil;
@@ -42,6 +45,9 @@ public class CmdTopicPolicies extends CmdBase {
         jcommander.addCommand("get-message-ttl", new GetMessageTTL());
         jcommander.addCommand("set-message-ttl", new SetMessageTTL());
         jcommander.addCommand("remove-message-ttl", new RemoveMessageTTL());
+        jcommander.addCommand("set-subscription-types-enabled", new SetSubscriptionTypesEnabled());
+        jcommander.addCommand("get-subscription-types-enabled", new GetSubscriptionTypesEnabled());
+        jcommander.addCommand("remove-subscription-types-enabled", new RemoveSubscriptionTypesEnabled());
         jcommander.addCommand("get-retention", new GetRetention());
         jcommander.addCommand("set-retention", new SetRetention());
         jcommander.addCommand("remove-retention", new RemoveRetention());
@@ -117,6 +123,69 @@ public class CmdTopicPolicies extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Set subscription types enabled for a topic")
+    private class SetSubscriptionTypesEnabled extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = {"--types", "-t"}, description = "Subscription types enabled list (comma separated values)."
+                + " Possible values: (Exclusive, Shared, Failover, Key_Shared).", required = true)
+        private List<String> subTypes;
+
+        @Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. "
+                + "If set to true, broker returned global topic policies")
+        private boolean isGlobal = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            Set<SubscriptionType> types = new HashSet<>();
+            subTypes.forEach(s -> {
+                SubscriptionType subType;
+                try {
+                    subType = SubscriptionType.valueOf(s);
+                } catch (IllegalArgumentException exception) {
+                    throw new ParameterException(String.format("Illegal subscription type %s. Possible values: %s.", s,
+                            Arrays.toString(SubscriptionType.values())));
+                }
+                types.add(subType);
+            });
+            getTopicPolicies(isGlobal).setSubscriptionTypesEnabled(persistentTopic, types);
+        }
+    }
+
+    @Parameters(commandDescription = "Get subscription types enabled for a topic")
+    private class GetSubscriptionTypesEnabled extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+                + "If set to true, the policy will be replicate to other clusters asynchronously")
+        private boolean isGlobal = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            print(getTopicPolicies(isGlobal).getSubscriptionTypesEnabled(persistentTopic));
+        }
+    }
+
+    @Parameters(commandDescription = "Remove subscription types enabled for a topic")
+    private class RemoveSubscriptionTypesEnabled extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+                + "If set to true, the removing operation will be replicate to other clusters asynchronously")
+        private boolean isGlobal = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            getTopicPolicies(isGlobal).removeSubscriptionTypesEnabled(persistentTopic);
+        }
+    }
+
     @Parameters(commandDescription = "Get the retention policy for a topic")
     private class GetRetention extends CliCommand {
         @Parameter(description = "persistent://tenant/namespace/topic", required = true)
@@ -184,8 +253,7 @@ public class CmdTopicPolicies extends CmdBase {
         private List<String> params;
 
         @Parameter(names = { "--global", "-g" }, description = "Whether to remove this policy globally. "
-                + "If set to true, the removing operation will be replicate to other clusters asynchronously"
-                , arity = 0)
+                + "If set to true, the removing operation will be replicate to other clusters asynchronously")
         private boolean isGlobal = false;
 
         @Override
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index c97aaac..f3468ce 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -285,6 +285,10 @@ public class CmdTopics extends CmdBase {
             cmdUsageFormatter.addDeprecatedCommand("get-deduplication");
             cmdUsageFormatter.addDeprecatedCommand("set-deduplication");
             cmdUsageFormatter.addDeprecatedCommand("remove-deduplication");
+
+            cmdUsageFormatter.addDeprecatedCommand("set-subscription-types-enabled");
+            cmdUsageFormatter.addDeprecatedCommand("get-subscription-types-enabled");
+            cmdUsageFormatter.addDeprecatedCommand("remove-subscription-types-enabled");
         }
     }
 
@@ -2004,7 +2008,6 @@ public class CmdTopics extends CmdBase {
         }
     }
 
-
     @Parameters(commandDescription = "Set subscription types enabled for a topic")
     private class SetSubscriptionTypesEnabled extends CliCommand {
         @Parameter(description = "persistent://tenant/namespace/topic", required = true)