Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/09/22 15:39:36 UTC

[GitHub] [pulsar] Technoboy- opened a new pull request #12136: Support for setting geo-replication clusters on topic level

Technoboy- opened a new pull request #12136:
URL: https://github.com/apache/pulsar/pull/12136


   ## Motivation
   Currently, to enable geo-replication we must set multiple clusters on a namespace, after which all topics under that namespace start replicating data across those clusters.
   
   It would be useful to set the replication clusters at the topic level, so that topics under the same namespace can have different replication clusters, for example topic A (cluster-a <-> cluster-b) and topic B (cluster-b <-> cluster-c).
   
   Since there is a proposal to add global topic policies (https://github.com/apache/pulsar/wiki/PIP-92%3A-Topic-policy-across-multiple-clusters), we should also take global topic policies into account when implementing the geo-replication clusters setting in TopicPolicies.
   
   
   ## Documentation
   I'm not sure whether the doc can be generated automatically. If not, I will update the doc later.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #12136: Support for setting geo-replication clusters on topic level

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #12136:
URL: https://github.com/apache/pulsar/pull/12136#discussion_r742869242



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -1334,33 +1335,36 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
             log.debug("[{}] Checking replication status", name);
         }
 
-        CompletableFuture<Policies> policiesFuture = brokerService.pulsar().getPulsarResources()
-                .getNamespaceResources()
-                .getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
-                .thenCompose(optPolicies -> {
-                            if (!optPolicies.isPresent()) {
-                                return FutureUtil.failedFuture(
-                                        new ServerMetadataException(
-                                                new MetadataStoreException.NotFoundException()));
-                            }
+        CompletableFuture<Set<String>> replicationClustersFuture = brokerService.pulsar()

Review comment:
       It looks like this currently relies on the check compaction monitor; neither onPoliciesUpdate nor onUpdate checks the compaction, so it would be better to start a check after the policy changes.







[GitHub] [pulsar] Technoboy- removed a comment on pull request #12136: Support for setting geo-replication clusters on topic level

Posted by GitBox <gi...@apache.org>.
Technoboy- removed a comment on pull request #12136:
URL: https://github.com/apache/pulsar/pull/12136#issuecomment-925530791


   /pulsarbot run-failure-checks





[GitHub] [pulsar] Technoboy- commented on a change in pull request #12136: Support for setting geo-replication clusters on topic level

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #12136:
URL: https://github.com/apache/pulsar/pull/12136#discussion_r716461097



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -1334,33 +1335,37 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
             log.debug("[{}] Checking replication status", name);
         }
 
-        CompletableFuture<Policies> policiesFuture = brokerService.pulsar().getPulsarResources()
-                .getNamespaceResources()
-                .getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
-                .thenCompose(optPolicies -> {
-                            if (!optPolicies.isPresent()) {
-                                return FutureUtil.failedFuture(
-                                        new ServerMetadataException(
-                                                new MetadataStoreException.NotFoundException()));
-                            }
+        CompletableFuture<List<String>> replicationClustersFuture = brokerService.pulsar()
+                .getTopicPoliciesService()
+                .getTopicPoliciesAsyncWithRetry(name, null, brokerService.pulsar().getExecutor())
+                .thenCompose(topicPolicies -> {
+                    if (!topicPolicies.isPresent()
+                            || CollectionUtils.isEmpty(topicPolicies.get().getReplicationClusters())) {

Review comment:
       Ok, done.







[GitHub] [pulsar] 315157973 commented on a change in pull request #12136: Support for setting geo-replication clusters on topic level

Posted by GitBox <gi...@apache.org>.
315157973 commented on a change in pull request #12136:
URL: https://github.com/apache/pulsar/pull/12136#discussion_r716132723



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -1334,33 +1335,37 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
             log.debug("[{}] Checking replication status", name);
         }
 
-        CompletableFuture<Policies> policiesFuture = brokerService.pulsar().getPulsarResources()
-                .getNamespaceResources()
-                .getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
-                .thenCompose(optPolicies -> {
-                            if (!optPolicies.isPresent()) {
-                                return FutureUtil.failedFuture(
-                                        new ServerMetadataException(
-                                                new MetadataStoreException.NotFoundException()));
-                            }
+        CompletableFuture<List<String>> replicationClustersFuture = brokerService.pulsar()
+                .getTopicPoliciesService()
+                .getTopicPoliciesAsyncWithRetry(name, null, brokerService.pulsar().getExecutor())
+                .thenCompose(topicPolicies -> {
+                    if (!topicPolicies.isPresent()
+                            || CollectionUtils.isEmpty(topicPolicies.get().getReplicationClusters())) {

Review comment:
       We need to define the states:
   null means there is no topic-level setting
   An empty list means that no clusters are set, that is, replication of this Topic under the Namespace is disabled
   
   If empty also meant that there is no setting, the Topic-level Policies could not be disabled afterwards
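
One possible reading of the states above, as a minimal self-contained sketch. `ReplicationClustersResolver` and `resolve` are hypothetical names used only for illustration and are not part of the Pulsar code base; the sketch also assumes the namespace-level set is never null.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for illustration only; not Pulsar's actual implementation.
final class ReplicationClustersResolver {
    private ReplicationClustersResolver() {}

    /**
     * topicLevel == null  : no topic-level setting, fall back to the namespace value.
     * topicLevel is empty : no clusters set on the topic, so replication is disabled for it.
     * otherwise           : the topic-level value takes priority over the namespace value.
     *
     * Assumption: namespaceLevel is non-null.
     */
    static Set<String> resolve(List<String> topicLevel, Set<String> namespaceLevel) {
        if (topicLevel == null) {
            return new TreeSet<>(namespaceLevel);
        }
        // Covers both the empty list (disabled) and an explicit topic-level override.
        return new TreeSet<>(topicLevel);
    }
}
```

If the empty list were treated the same as null, the second branch would never return an empty set while the namespace has clusters configured, which is the problem the comment points out.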
   

##########
File path: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
##########
@@ -226,6 +226,10 @@ public CmdTopics(Supplier<PulsarAdmin> admin) {
 
         jcommander.addCommand("set-replicated-subscription-status", new SetReplicatedSubscriptionStatus());
 
+        jcommander.addCommand("get-replication-clusters", new GetReplicationClusters());

Review comment:
       Please also add a unit test in `PulsarAdminToolTest`.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2708,6 +2709,60 @@ protected PersistentOfflineTopicStats internalGetBacklog(boolean authoritative)
             });
     }
 
+    protected CompletableFuture<Void> internalSetReplicationClusters(List<String> clusterIds) {
+        validateTopicPolicyOperation(topicName, PolicyName.REPLICATION, PolicyOperation.WRITE);
+        validatePoliciesReadOnlyAccess();
+
+        Set<String> replicationClusters = Sets.newHashSet(clusterIds);
+        if (replicationClusters.contains("global")) {
+            throw new RestException(Status.PRECONDITION_FAILED,
+                    "Cannot specify global in the list of replication clusters");
+        }
+        Set<String> clusters = clusters();
+        for (String clusterId : replicationClusters) {
+            if (!clusters.contains(clusterId)) {
+                throw new RestException(Status.FORBIDDEN, "Invalid cluster id: " + clusterId);
+            }
+            validatePeerClusterConflict(clusterId, replicationClusters);
+            validateClusterForTenant(namespaceName.getTenant(), clusterId);
+        }
+
+        return getTopicPoliciesAsyncWithRetry(topicName).thenCompose(op -> {
+                    TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                    topicPolicies.setReplicationClusters(Lists.newArrayList(replicationClusters));
+                    return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies)
+                            .thenRun(() -> {
+                                log.info("[{}] Successfully set replication clusters for namespace={}, "
+                                                + "topic={}, clusters={}",
+                                        clientAppId(),
+                                        namespaceName,
+                                        topicName.getLocalName(),
+                                        topicPolicies.getReplicationClusters());
+                            });
+                }
+        );
+    }
+
+    protected CompletableFuture<Void> internalRemoveReplicationClusters() {
+        validateTopicPolicyOperation(topicName, PolicyName.REPLICATION, PolicyOperation.WRITE);
+        validatePoliciesReadOnlyAccess();
+
+        return getTopicPoliciesAsyncWithRetry(topicName).thenCompose(op -> {
+                    TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                    topicPolicies.setReplicationClusters(Collections.emptyList());

Review comment:
       `Collections.emptyList()` returns a shared, immutable static instance. I think we can use `list.clear()` or `new ArrayList<>()` instead.
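
To make the concern concrete: the list returned by `Collections.emptyList()` is immutable, so any later in-place mutation of the stored value would throw. A minimal sketch follows; `EmptyListDemo` and `isMutable` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical demo class; it only exercises java.util behaviour.
final class EmptyListDemo {
    private EmptyListDemo() {}

    /** Returns true if the list accepts add(), false if it rejects mutation. */
    static boolean isMutable(List<String> list) {
        try {
            list.add("cluster-a");
            list.remove("cluster-a"); // undo the probe so the caller's list is unchanged
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isMutable(Collections.emptyList()));
        System.out.println(isMutable(new ArrayList<>()));
    }
}
```

Whether this matters here depends on whether any code path mutates the list held by `TopicPolicies`; the sketch only shows the difference between the two kinds of list.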

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
##########
@@ -2641,4 +2642,23 @@ public void testDoNotCreateSystemTopicForHeartbeatNamespace() {
         });
     }
 
+    @Test(timeOut = 30000)

Review comment:
       Please add the following unit tests:
   1) When the namespace-level and topic-level policies are both set, verify that the priority is correct
   
   2) Disabling the topic-level policies







[GitHub] [pulsar] Technoboy- commented on pull request #12136: Support for setting geo-replication clusters on topic level

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on pull request #12136:
URL: https://github.com/apache/pulsar/pull/12136#issuecomment-926320635


   /pulsarbot run-failure-checks





[GitHub] [pulsar] 315157973 commented on a change in pull request #12136: Support for setting geo-replication clusters on topic level

Posted by GitBox <gi...@apache.org>.
315157973 commented on a change in pull request #12136:
URL: https://github.com/apache/pulsar/pull/12136#discussion_r716132723



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -1334,33 +1335,37 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
             log.debug("[{}] Checking replication status", name);
         }
 
-        CompletableFuture<Policies> policiesFuture = brokerService.pulsar().getPulsarResources()
-                .getNamespaceResources()
-                .getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
-                .thenCompose(optPolicies -> {
-                            if (!optPolicies.isPresent()) {
-                                return FutureUtil.failedFuture(
-                                        new ServerMetadataException(
-                                                new MetadataStoreException.NotFoundException()));
-                            }
+        CompletableFuture<List<String>> replicationClustersFuture = brokerService.pulsar()
+                .getTopicPoliciesService()
+                .getTopicPoliciesAsyncWithRetry(name, null, brokerService.pulsar().getExecutor())
+                .thenCompose(topicPolicies -> {
+                    if (!topicPolicies.isPresent()
+                            || CollectionUtils.isEmpty(topicPolicies.get().getReplicationClusters())) {

Review comment:
       We need to define the states:
   null means there is no topic-level setting
   An empty list means that no clusters are set, that is, the policy of this Topic under the Namespace is disabled
   
   If empty also meant that there is no setting, the Topic-level Policies could not be disabled afterwards
   







[GitHub] [pulsar] Technoboy- commented on pull request #12136: Support for setting geo-replication clusters on topic level

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on pull request #12136:
URL: https://github.com/apache/pulsar/pull/12136#issuecomment-925631580


   /pulsarbot run-failure-checks





[GitHub] [pulsar] Anonymitaet commented on pull request #12136: Support for setting geo-replication clusters on topic level

Posted by GitBox <gi...@apache.org>.
Anonymitaet commented on pull request #12136:
URL: https://github.com/apache/pulsar/pull/12136#issuecomment-953432739


   @Technoboy- do we need docs [here](https://pulsar.apache.org/docs/en/next/administration-geo/#enable-geo-replication-namespaces)? For example, a section "Set geo-replication clusters on a topic level" with explanations, commands, and examples.





[GitHub] [pulsar] Technoboy- commented on pull request #12136: Support for setting geo-replication clusters on topic level

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on pull request #12136:
URL: https://github.com/apache/pulsar/pull/12136#issuecomment-953931928


   /pulsarbot run-failure-checks





[GitHub] [pulsar] Technoboy- commented on pull request #12136: Support for setting geo-replication clusters on topic level

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on pull request #12136:
URL: https://github.com/apache/pulsar/pull/12136#issuecomment-925530791


   /pulsarbot run-failure-checks





[GitHub] [pulsar] Technoboy- commented on a change in pull request #12136: Support for setting geo-replication clusters on topic level

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #12136:
URL: https://github.com/apache/pulsar/pull/12136#discussion_r715284214



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -1334,33 +1335,36 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
             log.debug("[{}] Checking replication status", name);
         }
 
-        CompletableFuture<Policies> policiesFuture = brokerService.pulsar().getPulsarResources()
-                .getNamespaceResources()
-                .getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
-                .thenCompose(optPolicies -> {
-                            if (!optPolicies.isPresent()) {
-                                return FutureUtil.failedFuture(
-                                        new ServerMetadataException(
-                                                new MetadataStoreException.NotFoundException()));
-                            }
+        CompletableFuture<Set<String>> replicationClustersFuture = brokerService.pulsar()

Review comment:
       From the discussion, both my implementation and your suggestion can achieve this.







[GitHub] [pulsar] Anonymitaet commented on pull request #12136: Support for setting geo-replication clusters on topic level

Posted by GitBox <gi...@apache.org>.
Anonymitaet commented on pull request #12136:
URL: https://github.com/apache/pulsar/pull/12136#issuecomment-925425806


   > I'm not sure if the doc could generate automatically. If no, I will update the doc later.
   
   @Technoboy- the doc you added can be generated automatically. 





[GitHub] [pulsar] Technoboy- commented on a change in pull request #12136: Support for setting geo-replication clusters on topic level

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #12136:
URL: https://github.com/apache/pulsar/pull/12136#discussion_r716460872



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2708,6 +2709,60 @@ protected PersistentOfflineTopicStats internalGetBacklog(boolean authoritative)
             });
     }
 
+    protected CompletableFuture<Void> internalSetReplicationClusters(List<String> clusterIds) {
+        validateTopicPolicyOperation(topicName, PolicyName.REPLICATION, PolicyOperation.WRITE);
+        validatePoliciesReadOnlyAccess();
+
+        Set<String> replicationClusters = Sets.newHashSet(clusterIds);
+        if (replicationClusters.contains("global")) {
+            throw new RestException(Status.PRECONDITION_FAILED,
+                    "Cannot specify global in the list of replication clusters");
+        }
+        Set<String> clusters = clusters();
+        for (String clusterId : replicationClusters) {
+            if (!clusters.contains(clusterId)) {
+                throw new RestException(Status.FORBIDDEN, "Invalid cluster id: " + clusterId);
+            }
+            validatePeerClusterConflict(clusterId, replicationClusters);
+            validateClusterForTenant(namespaceName.getTenant(), clusterId);
+        }
+
+        return getTopicPoliciesAsyncWithRetry(topicName).thenCompose(op -> {
+                    TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                    topicPolicies.setReplicationClusters(Lists.newArrayList(replicationClusters));
+                    return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies)
+                            .thenRun(() -> {
+                                log.info("[{}] Successfully set replication clusters for namespace={}, "
+                                                + "topic={}, clusters={}",
+                                        clientAppId(),
+                                        namespaceName,
+                                        topicName.getLocalName(),
+                                        topicPolicies.getReplicationClusters());
+                            });
+                }
+        );
+    }
+
+    protected CompletableFuture<Void> internalRemoveReplicationClusters() {
+        validateTopicPolicyOperation(topicName, PolicyName.REPLICATION, PolicyOperation.WRITE);
+        validatePoliciesReadOnlyAccess();
+
+        return getTopicPoliciesAsyncWithRetry(topicName).thenCompose(op -> {
+                    TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                    topicPolicies.setReplicationClusters(Collections.emptyList());

Review comment:
       ok

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
##########
@@ -2641,4 +2642,23 @@ public void testDoNotCreateSystemTopicForHeartbeatNamespace() {
         });
     }
 
+    @Test(timeOut = 30000)

Review comment:
       done.







[GitHub] [pulsar] Technoboy- commented on a change in pull request #12136: Support for setting geo-replication clusters on topic level

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #12136:
URL: https://github.com/apache/pulsar/pull/12136#discussion_r714837509



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
##########
@@ -495,23 +495,28 @@ public void removeProducer(Producer producer) {
             log.debug("[{}] Checking replication status", name);
         }
 
-        return brokerService.pulsar().getPulsarResources().getNamespaceResources()
-                .getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
-                .thenCompose(optPolicies -> {
-                    if (!optPolicies.isPresent()) {
-                        return FutureUtil.failedFuture(
-                                new ServerMetadataException(
-                                        new MetadataStoreException.NotFoundException()));
-                    }
+        return brokerService.pulsar()

Review comment:
       Thanks for the kind review.








[GitHub] [pulsar] Technoboy- commented on pull request #12136: Support for setting geo-replication clusters on topic level

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on pull request #12136:
URL: https://github.com/apache/pulsar/pull/12136#issuecomment-926644470


   /pulsarbot run-failure-checks





[GitHub] [pulsar] hangc0276 commented on a change in pull request #12136: Support for setting geo-replication clusters on topic level

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on a change in pull request #12136:
URL: https://github.com/apache/pulsar/pull/12136#discussion_r742921906



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -1334,33 +1335,36 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
             log.debug("[{}] Checking replication status", name);
         }
 
-        CompletableFuture<Policies> policiesFuture = brokerService.pulsar().getPulsarResources()
-                .getNamespaceResources()
-                .getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
-                .thenCompose(optPolicies -> {
-                            if (!optPolicies.isPresent()) {
-                                return FutureUtil.failedFuture(
-                                        new ServerMetadataException(
-                                                new MetadataStoreException.NotFoundException()));
-                            }
+        CompletableFuture<Set<String>> replicationClustersFuture = brokerService.pulsar()

Review comment:
       The current implementation relies on `replicationPolicyCheckDurationSeconds = 600` to pick up replication policy updates. We can merge this PR first and consider updating the replication policy through a listener in a follow-up PR.
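
For context, the difference between the two approaches: with a periodic check, a policy change may only take effect at the next check, while a listener is invoked as soon as the policy is updated. The following is a rough sketch of the listener idea only; `PolicyStoreSketch` and its methods are hypothetical and do not correspond to Pulsar's actual topic policies API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a policy store; not Pulsar's actual API.
final class PolicyStoreSketch {
    private final List<Consumer<List<String>>> listeners = new ArrayList<>();
    private List<String> replicationClusters; // null means "not set"

    /** Registers a callback that runs on every replication clusters update. */
    void addListener(Consumer<List<String>> listener) {
        listeners.add(listener);
    }

    /** Stores the new value and notifies all listeners immediately. */
    void update(List<String> clusters) {
        replicationClusters = clusters;
        for (Consumer<List<String>> listener : listeners) {
            listener.accept(clusters);
        }
    }

    List<String> current() {
        return replicationClusters;
    }
}
```

In a real broker the callback would presumably trigger the same replication check that the periodic task runs today; that wiring is left out here.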







[GitHub] [pulsar] Technoboy- commented on pull request #12136: Support for setting geo-replication clusters on topic level

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on pull request #12136:
URL: https://github.com/apache/pulsar/pull/12136#issuecomment-953440339


   > @Technoboy- do we need to add docs [here](https://pulsar.apache.org/docs/en/next/administration-geo/#enable-geo-replication-namespaces)? Like adding a section "Set geo-replication clusters on a topic level", and adding explanations, commands, and examples.
   
   It seems that the doc can be generated automatically.





[GitHub] [pulsar] Technoboy- commented on pull request #12136: Support for setting geo-replication clusters on topic level

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on pull request #12136:
URL: https://github.com/apache/pulsar/pull/12136#issuecomment-953893549


   > @Technoboy-
   > 
   > 1. Do we need to add docs [here](https://pulsar.apache.org/docs/en/next/administration-geo/#enable-geo-replication-namespaces)? Like adding a section "Set geo-replication clusters on a topic level", and adding explanations, commands, and examples.
   > 2. Do we need to update docs as below?
   >    https://pulsar.apache.org/docs/en/next/administration-geo/#geo-replication-and-pulsar-properties
   >    ![image](https://user-images.githubusercontent.com/50226895/139172797-754abcb7-bb34-45d3-b733-3069b48808a0.png)
   
   Yes, right. I will update this part later.





[GitHub] [pulsar] Technoboy- commented on pull request #12136: Support for setting geo-replication clusters on topic level

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on pull request #12136:
URL: https://github.com/apache/pulsar/pull/12136#issuecomment-926474702


   /pulsarbot run-failure-checks





[GitHub] [pulsar] hangc0276 merged pull request #12136: Support for setting geo-replication clusters on topic level

Posted by GitBox <gi...@apache.org>.
hangc0276 merged pull request #12136:
URL: https://github.com/apache/pulsar/pull/12136


   





[GitHub] [pulsar] Anonymitaet edited a comment on pull request #12136: Support for setting geo-replication clusters on topic level

Posted by GitBox <gi...@apache.org>.
Anonymitaet edited a comment on pull request #12136:
URL: https://github.com/apache/pulsar/pull/12136#issuecomment-953432739


   @Technoboy- 
   
   1. Do we need to add docs [here](https://pulsar.apache.org/docs/en/next/administration-geo/#enable-geo-replication-namespaces)? Like adding a section "Set geo-replication clusters on a topic level", and adding explanations, commands, and examples.
   
   2. Do we need to update docs as below?
   https://pulsar.apache.org/docs/en/next/administration-geo/#geo-replication-and-pulsar-properties
   ![image](https://user-images.githubusercontent.com/50226895/139172797-754abcb7-bb34-45d3-b733-3069b48808a0.png)
   
   





[GitHub] [pulsar] hangc0276 commented on a change in pull request #12136: Support for setting geo-replication clusters on topic level

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on a change in pull request #12136:
URL: https://github.com/apache/pulsar/pull/12136#discussion_r714724548



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -1334,33 +1335,36 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
             log.debug("[{}] Checking replication status", name);
         }
 
-        CompletableFuture<Policies> policiesFuture = brokerService.pulsar().getPulsarResources()
-                .getNamespaceResources()
-                .getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
-                .thenCompose(optPolicies -> {
-                            if (!optPolicies.isPresent()) {
-                                return FutureUtil.failedFuture(
-                                        new ServerMetadataException(
-                                                new MetadataStoreException.NotFoundException()));
-                            }
+        CompletableFuture<Set<String>> replicationClustersFuture = brokerService.pulsar()

Review comment:
       `checkReplication` is called by `onPoliciesUpdate`, which listens for namespace-level policy updates. For the topic-level policy update listener, we should handle the topic policy in the `onUpdate()` method.
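
One way to read the `replicationClustersFuture` change in the hunk above is that the cluster set is taken from the topic-level policy when one is set, and from the namespace-level policy otherwise. The following is a minimal sketch under that assumption, using only the JDK. `ReplicationClusterResolver`, `resolve`, and both parameters are hypothetical names for illustration and are not Pulsar's API.

```java
import java.util.Arrays;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class ReplicationClusterResolver {

    // Hypothetical helper: prefer the topic-level cluster set when it is configured,
    // otherwise load the namespace-level set asynchronously.
    static CompletableFuture<Set<String>> resolve(
            Optional<Set<String>> topicLevelClusters,
            Supplier<CompletableFuture<Optional<Set<String>>>> namespaceLevelLoader) {
        if (topicLevelClusters.isPresent()) {
            return CompletableFuture.completedFuture(topicLevelClusters.get());
        }
        return namespaceLevelLoader.get().thenCompose(optClusters -> {
            if (!optClusters.isPresent()) {
                // Like the removed code path, fail the future when the
                // namespace policies cannot be found (exception type is a stand-in).
                CompletableFuture<Set<String>> failed = new CompletableFuture<>();
                failed.completeExceptionally(
                        new IllegalStateException("namespace policies not found"));
                return failed;
            }
            return CompletableFuture.completedFuture(optClusters.get());
        });
    }

    public static void main(String[] args) {
        Set<String> topicLevel = new TreeSet<>(Arrays.asList("cluster-a", "cluster-b"));
        Set<String> namespaceLevel = new TreeSet<>(Arrays.asList("cluster-b", "cluster-c"));

        // Topic-level policy present: it is used as-is.
        System.out.println(resolve(Optional.of(topicLevel),
                () -> CompletableFuture.completedFuture(Optional.of(namespaceLevel))).join());

        // No topic-level policy: the namespace-level set is used instead.
        System.out.println(resolve(Optional.empty(),
                () -> CompletableFuture.completedFuture(Optional.of(namespaceLevel))).join());
    }
}
```

Keeping the namespace lookup behind a `Supplier` means it is only triggered when the topic has no policy of its own.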

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
##########
@@ -2641,4 +2642,23 @@ public void testDoNotCreateSystemTopicForHeartbeatNamespace() {
         });
     }
 
+    @Test(timeOut = 30000)

Review comment:
       We'd better set up multiple individual Pulsar clusters and turn on the topic-level replication policy for a specific topic to test it.
   For example, we set up cluster-A and cluster-B, configure `persistent://public/default/topic-test` to replicate from cluster-A to cluster-B, then write message-1 to topic `persistent://public/default/topic-test` in cluster-A and check whether message-1 can be read from topic `persistent://public/default/topic-test` in cluster-B. Furthermore, we can update and remove the topic replication policy and check the messages in the two clusters.
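
The scenario described above can be modeled without a broker to make the expected assertions explicit. This is an in-memory toy, not a Pulsar test: `TopicReplicationScenario`, `publish`, `read`, and the policy map are hypothetical stand-ins, and replication is simplified to "copy to every other cluster in the topic's policy".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class TopicReplicationScenario {
    // cluster name -> topic -> messages stored in that cluster
    private final Map<String, Map<String, List<String>>> clusters = new HashMap<>();
    // topic -> topic-level replication clusters (absent means no topic-level policy)
    private final Map<String, Set<String>> topicPolicy = new HashMap<>();

    void setReplicationClusters(String topic, Set<String> names) {
        topicPolicy.put(topic, new HashSet<>(names));
    }

    void removeReplicationClusters(String topic) {
        topicPolicy.remove(topic);
    }

    private void store(String cluster, String topic, String message) {
        clusters.computeIfAbsent(cluster, c -> new HashMap<>())
                .computeIfAbsent(topic, t -> new ArrayList<>())
                .add(message);
    }

    // Store locally, then copy to every other cluster named in the topic's policy.
    void publish(String localCluster, String topic, String message) {
        store(localCluster, topic, message);
        for (String target : topicPolicy.getOrDefault(topic, Collections.emptySet())) {
            if (!target.equals(localCluster)) {
                store(target, topic, message);
            }
        }
    }

    List<String> read(String cluster, String topic) {
        return clusters.getOrDefault(cluster, Collections.emptyMap())
                .getOrDefault(topic, Collections.emptyList());
    }

    public static void main(String[] args) {
        String topic = "persistent://public/default/topic-test";
        TopicReplicationScenario s = new TopicReplicationScenario();

        // Policy set: message-1 written in cluster-A should be readable in cluster-B.
        s.setReplicationClusters(topic, new HashSet<>(Arrays.asList("cluster-A", "cluster-B")));
        s.publish("cluster-A", topic, "message-1");
        System.out.println("B after set: " + s.read("cluster-B", topic));

        // Policy removed: message-2 written in cluster-A should stay in cluster-A.
        s.removeReplicationClusters(topic);
        s.publish("cluster-A", topic, "message-2");
        System.out.println("B after remove: " + s.read("cluster-B", topic));
        System.out.println("A after remove: " + s.read("cluster-A", topic));
    }
}
```

A real test would replace the maps with two broker instances and the admin client, but the checks it makes after each policy change would follow the same order.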

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
##########
@@ -495,23 +495,28 @@ public void removeProducer(Producer producer) {
             log.debug("[{}] Checking replication status", name);
         }
 
-        return brokerService.pulsar().getPulsarResources().getNamespaceResources()
-                .getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
-                .thenCompose(optPolicies -> {
-                    if (!optPolicies.isPresent()) {
-                        return FutureUtil.failedFuture(
-                                new ServerMetadataException(
-                                        new MetadataStoreException.NotFoundException()));
-                    }
+        return brokerService.pulsar()

Review comment:
       In the current implementation, a non-persistent topic doesn't configure topic policies, so it doesn't need to check the topic-level replication policy in `checkReplication`.







[GitHub] [pulsar] Technoboy- commented on a change in pull request #12136: Support for setting geo-replication clusters on topic level

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #12136:
URL: https://github.com/apache/pulsar/pull/12136#discussion_r715283758



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
##########
@@ -2641,4 +2642,23 @@ public void testDoNotCreateSystemTopicForHeartbeatNamespace() {
         });
     }
 
+    @Test(timeOut = 30000)

Review comment:
       Yes, I will create a test to cover this later.







[GitHub] [pulsar] Technoboy- commented on a change in pull request #12136: Support for setting geo-replication clusters on topic level

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #12136:
URL: https://github.com/apache/pulsar/pull/12136#discussion_r716460798



##########
File path: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
##########
@@ -226,6 +226,10 @@ public CmdTopics(Supplier<PulsarAdmin> admin) {
 
         jcommander.addCommand("set-replicated-subscription-status", new SetReplicatedSubscriptionStatus());
 
+        jcommander.addCommand("get-replication-clusters", new GetReplicationClusters());

Review comment:
       Ok, done







[GitHub] [pulsar] Anonymitaet edited a comment on pull request #12136: Support for setting geo-replication clusters on topic level

Posted by GitBox <gi...@apache.org>.
Anonymitaet edited a comment on pull request #12136:
URL: https://github.com/apache/pulsar/pull/12136#issuecomment-953432739


   @Technoboy- do we need to add docs [here](https://pulsar.apache.org/docs/en/next/administration-geo/#enable-geo-replication-namespaces)? Like adding a section "Set geo-replication clusters on a topic level", and adding explanations, commands, and examples.





[GitHub] [pulsar] codelipenghui commented on a change in pull request #12136: Support for setting geo-replication clusters on topic level

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #12136:
URL: https://github.com/apache/pulsar/pull/12136#discussion_r742869242



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -1334,33 +1335,36 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
             log.debug("[{}] Checking replication status", name);
         }
 
-        CompletableFuture<Policies> policiesFuture = brokerService.pulsar().getPulsarResources()
-                .getNamespaceResources()
-                .getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
-                .thenCompose(optPolicies -> {
-                            if (!optPolicies.isPresent()) {
-                                return FutureUtil.failedFuture(
-                                        new ServerMetadataException(
-                                                new MetadataStoreException.NotFoundException()));
-                            }
+        CompletableFuture<Set<String>> replicationClustersFuture = brokerService.pulsar()

Review comment:
       It looks like this currently relies on the replication check monitor: neither `onPoliciesUpdate` nor `onUpdate` checks replication. It would be better to start a check right after the policy changes.
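
The suggestion above, sketched in isolation: the topic-level listener applies the new policy and then starts a replication check itself instead of waiting for the periodic monitor. The method names `onUpdate` and `checkReplication` come from the review comments; the class `PolicyUpdateSketch`, its fields, and the counting body of `checkReplication` are assumptions made for illustration only.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class PolicyUpdateSketch {
    private volatile Set<String> replicationClusters = Collections.emptySet();
    final AtomicInteger checksRun = new AtomicInteger();

    // Stand-in for the real replication check; here it only counts invocations.
    CompletableFuture<Void> checkReplication() {
        checksRun.incrementAndGet();
        return CompletableFuture.completedFuture(null);
    }

    // Topic-level listener: apply the new policy, then check right away
    // rather than leaving it to the periodic monitor.
    CompletableFuture<Void> onUpdate(Set<String> newClusters) {
        replicationClusters = newClusters;
        return checkReplication();
    }

    Set<String> currentClusters() {
        return replicationClusters;
    }

    public static void main(String[] args) {
        PolicyUpdateSketch topic = new PolicyUpdateSketch();
        topic.onUpdate(new HashSet<>(Arrays.asList("cluster-a", "cluster-b"))).join();
        System.out.println("checks run: " + topic.checksRun.get());
    }
}
```

Returning the future from `onUpdate` lets the caller observe or log a failed check instead of silently dropping it.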







[GitHub] [pulsar] Technoboy- removed a comment on pull request #12136: Support for setting geo-replication clusters on topic level

Posted by GitBox <gi...@apache.org>.
Technoboy- removed a comment on pull request #12136:
URL: https://github.com/apache/pulsar/pull/12136#issuecomment-926320635









[GitHub] [pulsar] 315157973 commented on pull request #12136: Support for setting geo-replication clusters on topic level

Posted by GitBox <gi...@apache.org>.
315157973 commented on pull request #12136:
URL: https://github.com/apache/pulsar/pull/12136#issuecomment-931144100


   @hangc0276  Please review again





[GitHub] [pulsar] Technoboy- commented on pull request #12136: Support for setting geo-replication clusters on topic level

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on pull request #12136:
URL: https://github.com/apache/pulsar/pull/12136#issuecomment-926470296


   /pulsarbot run-failure-checks





[GitHub] [pulsar] Technoboy- commented on pull request #12136: Support for setting geo-replication clusters on topic level

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on pull request #12136:
URL: https://github.com/apache/pulsar/pull/12136#issuecomment-926471627


   /pulsarbot run-failure-checks





[GitHub] [pulsar] 315157973 commented on pull request #12136: Support for setting geo-replication clusters on topic level

Posted by GitBox <gi...@apache.org>.
315157973 commented on pull request #12136:
URL: https://github.com/apache/pulsar/pull/12136#issuecomment-956030239


   /pulsarbot run-failure-checks

