You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/20 16:30:24 UTC
[pulsar] 13/19: [pulsar-broker] Handle lookup redirect for V1-topics with different cluster (#12743)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a813c52931a8078a959b06cdba750419bc71dad5
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Thu Nov 11 07:32:35 2021 -0800
[pulsar-broker] Handle lookup redirect for V1-topics with different cluster (#12743)
(cherry picked from commit 25cbfadbf0a1aba8c149c978063902500ab08379)
---
.../authorization/PulsarAuthorizationProvider.java | 23 ++++++++++++----------
.../pulsar/broker/resources/ClusterResources.java | 4 ++++
.../pulsar/broker/service/ReplicatorTest.java | 21 ++++++++++++++++++++
3 files changed, 38 insertions(+), 10 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index b6b1bafd..b50d7de 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -364,20 +364,23 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {
}
private CompletableFuture<Boolean> checkAuthorization(TopicName topicName, String role, AuthAction action) {
- return checkPermission(topicName, role, action)
- .thenApply(isPermission -> isPermission && checkCluster(topicName));
+ return checkPermission(topicName, role, action).
+ thenApply(isPermission -> isPermission).
+ thenCompose(permission -> permission ? checkCluster(topicName) :
+ CompletableFuture.completedFuture(false));
}
- private boolean checkCluster(TopicName topicName) {
+ private CompletableFuture<Boolean> checkCluster(TopicName topicName) {
if (topicName.isGlobal() || conf.getClusterName().equals(topicName.getCluster())) {
- return true;
- } else {
- if (log.isDebugEnabled()) {
- log.debug("Topic [{}] does not belong to local cluster [{}]", topicName.toString(),
- conf.getClusterName());
- }
- return false;
+ return CompletableFuture.completedFuture(true);
}
+ if (log.isDebugEnabled()) {
+ log.debug("Topic [{}] does not belong to local cluster [{}]", topicName.toString(), conf.getClusterName());
+ }
+ return pulsarResources.getClusterResources().listAsync()
+ .thenApply(clusters -> {
+ return clusters.contains(topicName.getCluster());
+ });
}
public CompletableFuture<Boolean> checkPermission(TopicName topicName, String role, AuthAction action) {
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
index 1a3cf89..a4ee27a 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
@@ -42,6 +42,10 @@ public class ClusterResources extends BaseResources<ClusterData> {
this.failureDomainResources = new FailureDomainResources(store, FailureDomainImpl.class, operationTimeoutSec);
}
+ public CompletableFuture<Set<String>> listAsync() {
+ return getChildrenAsync(BASE_CLUSTERS_PATH).thenApply(list -> new HashSet<>(list));
+ }
+
public Set<String> list() throws MetadataStoreException {
return new HashSet<>(super.getChildren(BASE_CLUSTERS_PATH));
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 04f96ec..87e2730 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -1310,6 +1310,27 @@ public class ReplicatorTest extends ReplicatorTestBase {
});
}
+ @Test
+ public void testLookupAnotherCluster() throws Exception {
+ log.info("--- Starting ReplicatorTest::testLookupAnotherCluster ---");
+
+ String namespace = "pulsar/r2/cross-cluster-ns";
+ admin1.namespaces().createNamespace(namespace);
+ final TopicName topicName = TopicName
+ .get("persistent://" + namespace + "/topic");
+
+ @Cleanup
+ PulsarClient client1 = PulsarClient.builder()
+ .serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
+ .build();
+ Producer<byte[]> producer = client1.newProducer().topic(topicName.toString())
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+
+ producer.close();
+ }
+
private void checkListContainExpectedTopic(PulsarAdmin admin, String namespace, List<String> expectedTopicList) {
// wait non-partitioned topics replicators created finished
final List<String> list = new ArrayList<>();