You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/20 16:30:24 UTC

[pulsar] 13/19: [pulsar-broker] Handle lookup redirect for V1-topics with different cluster (#12743)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a813c52931a8078a959b06cdba750419bc71dad5
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Thu Nov 11 07:32:35 2021 -0800

    [pulsar-broker] Handle lookup redirect for V1-topics with different cluster (#12743)
    
    (cherry picked from commit 25cbfadbf0a1aba8c149c978063902500ab08379)
---
 .../authorization/PulsarAuthorizationProvider.java | 23 ++++++++++++----------
 .../pulsar/broker/resources/ClusterResources.java  |  4 ++++
 .../pulsar/broker/service/ReplicatorTest.java      | 21 ++++++++++++++++++++
 3 files changed, 38 insertions(+), 10 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index b6b1bafd..b50d7de 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -364,20 +364,23 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {
     }
 
     private CompletableFuture<Boolean> checkAuthorization(TopicName topicName, String role, AuthAction action) {
-        return checkPermission(topicName, role, action)
-                .thenApply(isPermission -> isPermission && checkCluster(topicName));
+        return checkPermission(topicName, role, action).
+                thenApply(isPermission -> isPermission).
+                thenCompose(permission -> permission ? checkCluster(topicName) :
+                    CompletableFuture.completedFuture(false));
     }
 
-    private boolean checkCluster(TopicName topicName) {
+    private CompletableFuture<Boolean> checkCluster(TopicName topicName) {
         if (topicName.isGlobal() || conf.getClusterName().equals(topicName.getCluster())) {
-            return true;
-        } else {
-            if (log.isDebugEnabled()) {
-                log.debug("Topic [{}] does not belong to local cluster [{}]", topicName.toString(),
-                        conf.getClusterName());
-            }
-            return false;
+            return CompletableFuture.completedFuture(true);
         }
+        if (log.isDebugEnabled()) {
+            log.debug("Topic [{}] does not belong to local cluster [{}]", topicName.toString(), conf.getClusterName());
+        }
+        return pulsarResources.getClusterResources().listAsync()
+                .thenApply(clusters -> {
+                    return clusters.contains(topicName.getCluster());
+                });
     }
 
     public CompletableFuture<Boolean> checkPermission(TopicName topicName, String role, AuthAction action) {
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
index 1a3cf89..a4ee27a 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
@@ -42,6 +42,10 @@ public class ClusterResources extends BaseResources<ClusterData> {
         this.failureDomainResources = new FailureDomainResources(store, FailureDomainImpl.class, operationTimeoutSec);
     }
 
+    public CompletableFuture<Set<String>> listAsync() {
+        return getChildrenAsync(BASE_CLUSTERS_PATH).thenApply(list -> new HashSet<>(list));
+    }
+
     public Set<String> list() throws MetadataStoreException {
         return new HashSet<>(super.getChildren(BASE_CLUSTERS_PATH));
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 04f96ec..87e2730 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -1310,6 +1310,27 @@ public class ReplicatorTest extends ReplicatorTestBase {
         });
     }
 
+    @Test
+    public void testLookupAnotherCluster() throws Exception {
+        log.info("--- Starting ReplicatorTest::testLookupAnotherCluster ---");
+
+        String namespace = "pulsar/r2/cross-cluster-ns";
+        admin1.namespaces().createNamespace(namespace);
+        final TopicName topicName = TopicName
+                .get("persistent://" + namespace + "/topic");
+
+        @Cleanup
+        PulsarClient client1 = PulsarClient.builder()
+                .serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
+                .build();
+        Producer<byte[]> producer = client1.newProducer().topic(topicName.toString())
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
+        
+        producer.close();
+    }
+
     private void checkListContainExpectedTopic(PulsarAdmin admin, String namespace, List<String> expectedTopicList) {
         // wait non-partitioned topics replicators created finished
         final List<String> list = new ArrayList<>();