You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by li...@apache.org on 2021/12/26 09:14:33 UTC

[pulsar] branch master updated: Deduplication status support cross multiple clusters (#13487)

This is an automated email from the ASF dual-hosted git repository.

linlin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 53bc0d5  Deduplication status support cross multiple clusters (#13487)
53bc0d5 is described below

commit 53bc0d5b2a4f5221b053d7ea49595468b445a2d3
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Sun Dec 26 17:13:33 2021 +0800

    Deduplication status support cross multiple clusters (#13487)
    
    * Persistence policies support cross multiple clusters
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  9 ++--
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  9 ++--
 .../service/ReplicatorTopicPoliciesTest.java       | 20 +++++++
 .../pulsar/admin/cli/PulsarAdminToolTest.java      | 14 +++++
 .../apache/pulsar/admin/cli/CmdTopicPolicies.java  | 61 ++++++++++++++++++++++
 .../org/apache/pulsar/admin/cli/CmdTopics.java     |  4 ++
 6 files changed, 110 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 0f56e2b..8520d11 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2893,8 +2893,8 @@ public class PersistentTopicsBase extends AdminResource {
         );
     }
 
-    protected CompletableFuture<Boolean> internalGetDeduplication(boolean applied) {
-        return getTopicPoliciesAsyncWithRetry(topicName)
+    protected CompletableFuture<Boolean> internalGetDeduplication(boolean applied, boolean isGlobal) {
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
             .thenApply(op -> op.map(TopicPolicies::getDeduplicationEnabled)
                 .orElseGet(() -> {
                     if (applied) {
@@ -2905,11 +2905,12 @@ public class PersistentTopicsBase extends AdminResource {
                 }));
     }
 
-    protected CompletableFuture<Void> internalSetDeduplication(Boolean enabled) {
-        return getTopicPoliciesAsyncWithRetry(topicName)
+    protected CompletableFuture<Void> internalSetDeduplication(Boolean enabled, boolean isGlobal) {
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
             .thenCompose(op -> {
                 TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
                 topicPolicies.setDeduplicationEnabled(enabled);
+                topicPolicies.setIsGlobal(isGlobal); // record the requested scope on the policies being saved
                 return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
             });
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 253b388..50eb291 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1872,11 +1872,12 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("applied") boolean applied,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenCompose(__ -> internalGetDeduplication(applied))
+            .thenCompose(__ -> internalGetDeduplication(applied, isGlobal))
             .thenApply(asyncResponse::resume)
             .exceptionally(ex -> {
                 handleTopicPolicyException("getDeduplication", ex, asyncResponse);
@@ -1897,12 +1898,13 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "DeduplicationEnabled policies for the specified topic")
                     Boolean enabled) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenCompose(__ -> internalSetDeduplication(enabled))
+            .thenCompose(__ -> internalSetDeduplication(enabled, isGlobal))
             .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
             .exceptionally(ex -> {
                 handleTopicPolicyException("setDeduplication", ex, asyncResponse);
@@ -1923,10 +1925,11 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenCompose(__ -> internalSetDeduplication(null))
+            .thenCompose(__ -> internalSetDeduplication(null, isGlobal))
             .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
             .exceptionally(ex -> {
                 handleTopicPolicyException("removeDeduplication", ex, asyncResponse);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
index 959f6b9..3ac9917 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
@@ -127,6 +127,26 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase {
     }
 
     @Test
+    public void testReplicateDeduplicationStatusPolicies() throws Exception {
+        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
+        final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
+        init(namespace, topic);
+        // set the deduplication status via admin1, then wait until admin2 and admin3 report it as enabled
+        admin1.topicPolicies(true).setDeduplicationStatus(topic, true);
+        Awaitility.await().ignoreExceptions().untilAsserted(() ->
+                assertTrue(admin2.topicPolicies(true).getDeduplicationStatus(topic)));
+        Awaitility.await().ignoreExceptions().untilAsserted(() ->
+                assertTrue(admin3.topicPolicies(true).getDeduplicationStatus(topic)));
+        // remove the deduplication status via admin1, then wait until admin2 and admin3 report null
+        admin1.topicPolicies(true).removeDeduplicationStatus(topic);
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin2.topicPolicies(true).getDeduplicationStatus(topic)));
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin3.topicPolicies(true).getDeduplicationStatus(topic)));
+
+    }
+
+    @Test
     public void testReplicatorTopicPolicies() throws Exception {
         final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
         final String persistentTopicName = "persistent://" + namespace + "/topic" + UUID.randomUUID();
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index d19047f..5394475 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -915,6 +915,13 @@ public class PulsarAdminToolTest {
         cmdTopics.run(split("remove-message-ttl persistent://myprop/clust/ns1/ds1"));
         verify(mockTopicsPolicies).removeMessageTTL("persistent://myprop/clust/ns1/ds1");
 
+        cmdTopics.run(split("get-deduplication persistent://myprop/clust/ns1/ds1"));
+        verify(mockTopicsPolicies).getDeduplicationStatus("persistent://myprop/clust/ns1/ds1");
+        cmdTopics.run(split("set-deduplication persistent://myprop/clust/ns1/ds1 --disable"));
+        verify(mockTopicsPolicies).setDeduplicationStatus("persistent://myprop/clust/ns1/ds1", false);
+        cmdTopics.run(split("remove-deduplication persistent://myprop/clust/ns1/ds1"));
+        verify(mockTopicsPolicies).removeDeduplicationStatus("persistent://myprop/clust/ns1/ds1");
+
         // Reset the cmd, and check global option
         cmdTopics = new CmdTopicPolicies(() -> admin);
         cmdTopics.run(split("get-retention persistent://myprop/clust/ns1/ds1 -g"));
@@ -967,6 +974,13 @@ public class PulsarAdminToolTest {
                 new PersistencePolicies(2, 1, 1, 100.0d));
         cmdTopics.run(split("remove-persistence persistent://myprop/clust/ns1/ds1 -g"));
         verify(mockGlobalTopicsPolicies).removePersistence("persistent://myprop/clust/ns1/ds1");
+
+        cmdTopics.run(split("get-deduplication persistent://myprop/clust/ns1/ds1 -g"));
+        verify(mockGlobalTopicsPolicies).getDeduplicationStatus("persistent://myprop/clust/ns1/ds1");
+        cmdTopics.run(split("set-deduplication persistent://myprop/clust/ns1/ds1 --disable -g"));
+        verify(mockGlobalTopicsPolicies).setDeduplicationStatus("persistent://myprop/clust/ns1/ds1", false);
+        cmdTopics.run(split("remove-deduplication persistent://myprop/clust/ns1/ds1 -g"));
+        verify(mockGlobalTopicsPolicies).removeDeduplicationStatus("persistent://myprop/clust/ns1/ds1");
     }
 
     @Test
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index 1850f4b..ddbe091 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -49,6 +49,10 @@ public class CmdTopicPolicies extends CmdBase {
         jcommander.addCommand("set-backlog-quota", new SetBacklogQuota());
         jcommander.addCommand("remove-backlog-quota", new RemoveBacklogQuota());
 
+        jcommander.addCommand("set-deduplication", new SetDeduplicationStatus());
+        jcommander.addCommand("get-deduplication", new GetDeduplicationStatus());
+        jcommander.addCommand("remove-deduplication", new RemoveDeduplicationStatus());
+
         jcommander.addCommand("get-persistence", new GetPersistence());
         jcommander.addCommand("set-persistence", new SetPersistence());
         jcommander.addCommand("remove-persistence", new RemovePersistence());
@@ -191,6 +195,63 @@ public class CmdTopicPolicies extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Enable or disable deduplication for a topic")
+    private class SetDeduplicationStatus extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--enable", "-e" }, description = "Enable deduplication")
+        private boolean enable = false;
+
+        @Parameter(names = { "--disable", "-d" }, description = "Disable deduplication")
+        private boolean disable = false;
+
+        @Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+                + "If set to true, the policy will be replicated to other clusters asynchronously")
+        private boolean isGlobal = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            // exactly one of --enable / --disable must be supplied; both or neither is rejected
+            if (enable == disable) {
+                throw new ParameterException("Need to specify either --enable or --disable");
+            }
+            getTopicPolicies(isGlobal).setDeduplicationStatus(persistentTopic, enable);
+        }
+    }
+
+    @Parameters(commandDescription = "Get the deduplication status for a topic")
+    private class GetDeduplicationStatus extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. ")
+        private boolean isGlobal = false; // forwarded to getTopicPolicies(...) in run()
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            print(getTopicPolicies(isGlobal).getDeduplicationStatus(persistentTopic));
+        }
+    }
+
+    @Parameters(commandDescription = "Remove the deduplication status for a topic")
+    private class RemoveDeduplicationStatus extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--global", "-g" }, description = "Whether to remove this policy globally. "
+                + "If set to true, the removing operation will be replicate to other clusters asynchronously")
+        private boolean isGlobal = false; // forwarded to getTopicPolicies(...) in run()
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            getTopicPolicies(isGlobal).removeDeduplicationStatus(persistentTopic);
+        }
+    }
+
     @Parameters(commandDescription = "Get the backlog quota policies for a topic")
     private class GetBacklogQuotaMap extends CliCommand {
         @Parameter(description = "persistent://tenant/namespace/topic", required = true)
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 6930b0e..c97aaac 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -281,6 +281,10 @@ public class CmdTopics extends CmdBase {
             cmdUsageFormatter.addDeprecatedCommand("get-persistence");
             cmdUsageFormatter.addDeprecatedCommand("set-persistence");
             cmdUsageFormatter.addDeprecatedCommand("remove-persistence");
+
+            cmdUsageFormatter.addDeprecatedCommand("get-deduplication");
+            cmdUsageFormatter.addDeprecatedCommand("set-deduplication");
+            cmdUsageFormatter.addDeprecatedCommand("remove-deduplication");
         }
     }