You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/14 03:19:09 UTC
[pulsar] 03/03: Fix admin-api-brokers list failed (#9191)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 823f1fc81c7884781e9adf8a7ccb1601fb3531dc
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Wed Jan 13 18:27:50 2021 +0800
Fix admin-api-brokers list failed (#9191)
Fixes #9128
### Motivation
There is a bug in parsing the cluster service URL: the current address resolution does not support multiple addresses,
such as `http://host1:8080,host2:8080,host3:8080`.
### Modifications
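Make URI resolution support multiple addresses by resolving the cluster service URL through `PulsarServiceNameResolver` in `PulsarWebResource#getRedirectionUrl`.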
### Verifying this change
This change is covered by the unit test:
`ReplicatorTest#activeBrokerParse`
(cherry picked from commit 908dae990213db9bb6922257a451a548d6ecf1ef)
---
.../apache/pulsar/broker/web/PulsarWebResource.java | 21 ++++++++++++++-------
.../pulsar/broker/service/ReplicatorTest.java | 20 ++++++++++++++++++++
2 files changed, 34 insertions(+), 7 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 677d81e..74176a4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -55,6 +55,8 @@ import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
import org.apache.pulsar.common.naming.Constants;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundles;
@@ -368,14 +370,19 @@ public abstract class PulsarWebResource {
}
private URI getRedirectionUrl(ClusterData differentClusterData) throws MalformedURLException {
- URL webUrl = null;
- if (isRequestHttps() && pulsar.getConfiguration().getWebServicePortTls().isPresent()
- && StringUtils.isNotBlank(differentClusterData.getServiceUrlTls())) {
- webUrl = new URL(differentClusterData.getServiceUrlTls());
- } else {
- webUrl = new URL(differentClusterData.getServiceUrl());
+ try {
+ PulsarServiceNameResolver serviceNameResolver = new PulsarServiceNameResolver();
+ if (isRequestHttps() && pulsar.getConfiguration().getWebServicePortTls().isPresent()
+ && StringUtils.isNotBlank(differentClusterData.getServiceUrlTls())) {
+ serviceNameResolver.updateServiceUrl(differentClusterData.getServiceUrlTls());
+ } else {
+ serviceNameResolver.updateServiceUrl(differentClusterData.getServiceUrl());
+ }
+ URL webUrl = new URL(serviceNameResolver.resolveHostUri().toString());
+ return UriBuilder.fromUri(uri.getRequestUri()).host(webUrl.getHost()).port(webUrl.getPort()).build();
+ } catch (PulsarClientException.InvalidServiceURL exception) {
+ throw new MalformedURLException(exception.getMessage());
}
- return UriBuilder.fromUri(uri.getRequestUri()).host(webUrl.getHost()).port(webUrl.getPort()).build();
}
protected static CompletableFuture<ClusterData> getClusterDataIfDifferentCluster(PulsarService pulsar,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index b1580d1..41d085b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -69,9 +69,11 @@ import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
+import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -198,6 +200,24 @@ public class ReplicatorTest extends ReplicatorTestBase {
// Case 3: TODO: Once automatic cleanup is implemented, add tests case to verify auto removal of clusters
}
+ @Test(timeOut = 10000)
+ public void activeBrokerParse() throws Exception {
+ pulsar1.getConfiguration().setAuthorizationEnabled(true);
+ //init clusterData
+ ClusterData cluster2Data = new ClusterData();
+ String cluster2ServiceUrls = String.format("%s,localhost:1234,localhost:5678", pulsar2.getWebServiceAddress());
+ cluster2Data.setServiceUrl(cluster2ServiceUrls);
+ String cluster2 = "activeCLuster2";
+ admin2.clusters().createCluster(cluster2, cluster2Data);
+ Awaitility.await().atMost(3, TimeUnit.SECONDS).until(()
+ -> admin2.clusters().getCluster(cluster2) != null);
+
+ List<String> list = admin1.brokers().getActiveBrokers(cluster2);
+ assertEquals(list.get(0), url2.toString().replace("http://", ""));
+ //restore configuration
+ pulsar1.getConfiguration().setAuthorizationEnabled(false);
+ }
+
@SuppressWarnings("unchecked")
@Test(timeOut = 30000)
public void testConcurrentReplicator() throws Exception {