You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/14 03:19:09 UTC

[pulsar] 03/03: Fix admin-api-brokers list failed (#9191)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 823f1fc81c7884781e9adf8a7ccb1601fb3531dc
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Wed Jan 13 18:27:50 2021 +0800

    Fix admin-api-brokers list failed (#9191)
    
    Fixes #9128
    
    ### Motivation
    There is a bug in parsing the cluster service URL: the current address resolution does not support multiple addresses,
    such as `http://host1:8080,host2:8080,host3:8080`.
    
    ### Modifications
    Make URI resolution support service URLs that contain multiple addresses.
    
    ### Verifying this change
    unit test:
    activeBrokerParse
    
    (cherry picked from commit 908dae990213db9bb6922257a451a548d6ecf1ef)
---
 .../apache/pulsar/broker/web/PulsarWebResource.java | 21 ++++++++++++++-------
 .../pulsar/broker/service/ReplicatorTest.java       | 20 ++++++++++++++++++++
 2 files changed, 34 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 677d81e..74176a4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -55,6 +55,8 @@ import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.namespace.LookupOptions;
 import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
 import org.apache.pulsar.common.naming.Constants;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundles;
@@ -368,14 +370,19 @@ public abstract class PulsarWebResource {
     }
 
     private URI getRedirectionUrl(ClusterData differentClusterData) throws MalformedURLException {
-        URL webUrl = null;
-        if (isRequestHttps() && pulsar.getConfiguration().getWebServicePortTls().isPresent()
-                && StringUtils.isNotBlank(differentClusterData.getServiceUrlTls())) {
-            webUrl = new URL(differentClusterData.getServiceUrlTls());
-        } else {
-            webUrl = new URL(differentClusterData.getServiceUrl());
+        try {
+            PulsarServiceNameResolver serviceNameResolver = new PulsarServiceNameResolver();
+            if (isRequestHttps() && pulsar.getConfiguration().getWebServicePortTls().isPresent()
+                    && StringUtils.isNotBlank(differentClusterData.getServiceUrlTls())) {
+                serviceNameResolver.updateServiceUrl(differentClusterData.getServiceUrlTls());
+            } else {
+                serviceNameResolver.updateServiceUrl(differentClusterData.getServiceUrl());
+            }
+            URL webUrl = new URL(serviceNameResolver.resolveHostUri().toString());
+            return UriBuilder.fromUri(uri.getRequestUri()).host(webUrl.getHost()).port(webUrl.getPort()).build();
+        } catch (PulsarClientException.InvalidServiceURL exception) {
+            throw new MalformedURLException(exception.getMessage());
         }
-        return UriBuilder.fromUri(uri.getRequestUri()).host(webUrl.getHost()).port(webUrl.getPort()).build();
     }
 
     protected static CompletableFuture<ClusterData> getClusterDataIfDifferentCluster(PulsarService pulsar,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index b1580d1..41d085b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -69,9 +69,11 @@ import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
+import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.ReplicatorStats;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.awaitility.Awaitility;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -198,6 +200,24 @@ public class ReplicatorTest extends ReplicatorTestBase {
         // Case 3: TODO: Once automatic cleanup is implemented, add tests case to verify auto removal of clusters
     }
 
+    @Test(timeOut = 10000)
+    public void activeBrokerParse() throws Exception {
+        pulsar1.getConfiguration().setAuthorizationEnabled(true);
+        //init clusterData
+        ClusterData cluster2Data = new ClusterData();
+        String cluster2ServiceUrls = String.format("%s,localhost:1234,localhost:5678", pulsar2.getWebServiceAddress());
+        cluster2Data.setServiceUrl(cluster2ServiceUrls);
+        String cluster2 = "activeCLuster2";
+        admin2.clusters().createCluster(cluster2, cluster2Data);
+        Awaitility.await().atMost(3, TimeUnit.SECONDS).until(()
+                -> admin2.clusters().getCluster(cluster2) != null);
+
+        List<String> list = admin1.brokers().getActiveBrokers(cluster2);
+        assertEquals(list.get(0), url2.toString().replace("http://", ""));
+        //restore configuration
+        pulsar1.getConfiguration().setAuthorizationEnabled(false);
+    }
+
     @SuppressWarnings("unchecked")
     @Test(timeOut = 30000)
     public void testConcurrentReplicator() throws Exception {