You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/01/13 01:17:18 UTC

[GitHub] sijie closed pull request #3249: [client] Issue 3218: Support specifying multiple hosts in pulsar service url and web url

sijie closed pull request #3249: [client] Issue 3218: Support specifying multiple hosts in pulsar service url and web url
URL: https://github.com/apache/pulsar/pull/3249
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java
index 73a51adba9..40ae7f8e6b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.api;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.test.PortManager;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.client.impl.ConsumerImpl;
@@ -30,6 +31,7 @@
 
 import java.util.concurrent.TimeUnit;
 
+@Slf4j
 public class ServiceUrlProviderTest extends ProducerConsumerBase {
 
     @BeforeClass
@@ -106,13 +108,27 @@ public void testCreateClientWithAutoChangedServiceUrlProvider() throws Exception
         conf.setWebServicePortTls(PortManager.nextFreePort());
         startBroker();
         PulsarService pulsarService2 = pulsar;
-        System.out.println("Pulsar1=" + pulsarService1.getBrokerServiceUrl() + ", Pulsar2=" + pulsarService2.getBrokerServiceUrl());
+
+        log.info("Pulsar1 = {}, Pulsar2 = {}", pulsarService1.getBrokerServiceUrl(), pulsarService2.getBrokerServiceUrl());
         Assert.assertNotEquals(pulsarService1.getBrokerServiceUrl(), pulsarService2.getBrokerServiceUrl());
-        Assert.assertEquals("pulsar://" + producer.getClient().getLookup().getServiceUrl(), pulsarService1.getBrokerServiceUrl());
-        Assert.assertEquals("pulsar://" + consumer.getClient().getLookup().getServiceUrl(), pulsarService1.getBrokerServiceUrl());
+
+        log.info("Service url : producer = {}, consumer = {}",
+            producer.getClient().getLookup().getServiceUrl(),
+            consumer.getClient().getLookup().getServiceUrl());
+
+        Assert.assertEquals(producer.getClient().getLookup().getServiceUrl(), pulsarService1.getBrokerServiceUrl());
+        Assert.assertEquals(consumer.getClient().getLookup().getServiceUrl(), pulsarService1.getBrokerServiceUrl());
+
+        log.info("Changing service url from {} to {}",
+            pulsarService1.getBrokerServiceUrl(),
+            pulsarService2.getBrokerServiceUrl());
+
         serviceUrlProvider.onServiceUrlChanged(pulsarService2.getBrokerServiceUrl());
-        Assert.assertEquals("pulsar://" + producer.getClient().getLookup().getServiceUrl(), pulsarService2.getBrokerServiceUrl());
-        Assert.assertEquals("pulsar://" + consumer.getClient().getLookup().getServiceUrl(), pulsarService2.getBrokerServiceUrl());
+        log.info("Service url changed : producer = {}, consumer = {}",
+            producer.getClient().getLookup().getServiceUrl(),
+            consumer.getClient().getLookup().getServiceUrl());
+        Assert.assertEquals(producer.getClient().getLookup().getServiceUrl(), pulsarService2.getBrokerServiceUrl());
+        Assert.assertEquals(consumer.getClient().getLookup().getServiceUrl(), pulsarService2.getBrokerServiceUrl());
         producer.close();
         consumer.close();
         client.close();
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
index 0953d22cc2..95d4fd6093 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
@@ -118,8 +118,10 @@ public PulsarAdmin(String serviceUrl, ClientConfigurationData clientConfigData)
         httpConfig.property(ClientProperties.ASYNC_THREADPOOL_SIZE, 8);
         httpConfig.register(MultiPartFeature.class);
 
-        ClientBuilder clientBuilder = ClientBuilder.newBuilder().withConfig(httpConfig)
-                .register(JacksonConfigurator.class).register(JacksonFeature.class);
+        ClientBuilder clientBuilder = ClientBuilder.newBuilder()
+            .withConfig(httpConfig)
+            .register(JacksonConfigurator.class)
+            .register(JacksonFeature.class);
 
         boolean useTls = false;
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index e373ed8319..94773e3a5c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -51,7 +51,7 @@
 public class BinaryProtoLookupService implements LookupService {
 
     private final PulsarClientImpl client;
-    protected volatile InetSocketAddress serviceAddress;
+    private final ServiceNameResolver serviceNameResolver;
     private final boolean useTls;
     private final ExecutorService executor;
 
@@ -60,22 +60,13 @@ public BinaryProtoLookupService(PulsarClientImpl client, String serviceUrl, bool
         this.client = client;
         this.useTls = useTls;
         this.executor = executor;
+        this.serviceNameResolver = new PulsarServiceNameResolver();
         updateServiceUrl(serviceUrl);
     }
 
     @Override
     public void updateServiceUrl(String serviceUrl) throws PulsarClientException {
-        URI uri;
-        try {
-            uri = new URI(serviceUrl);
-
-            // Don't attempt to resolve the hostname in DNS at this point. It will be done each time when attempting to
-            // connect
-            this.serviceAddress = InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
-        } catch (Exception e) {
-            log.error("Invalid service-url {} provided {}", serviceUrl, e.getMessage(), e);
-            throw new PulsarClientException.InvalidServiceURL(e);
-        }
+        serviceNameResolver.updateServiceUrl(serviceUrl);
     }
 
     /**
@@ -86,7 +77,7 @@ public void updateServiceUrl(String serviceUrl) throws PulsarClientException {
      * @return broker-socket-address that serves given topic
      */
     public CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> getBroker(TopicName topicName) {
-        return findBroker(serviceAddress, false, topicName);
+        return findBroker(serviceNameResolver.resolveHost(), false, topicName);
     }
 
     /**
@@ -94,7 +85,7 @@ public void updateServiceUrl(String serviceUrl) throws PulsarClientException {
      *
      */
     public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicName topicName) {
-        return getPartitionedTopicMetadata(serviceAddress, topicName);
+        return getPartitionedTopicMetadata(serviceNameResolver.resolveHost(), topicName);
     }
 
     private CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> findBroker(InetSocketAddress socketAddress,
@@ -133,7 +124,7 @@ public void updateServiceUrl(String serviceUrl) throws PulsarClientException {
                         // (3) received correct broker to connect
                         if (lookupDataResult.proxyThroughServiceUrl) {
                             // Connect through proxy
-                            addressFuture.complete(Pair.of(responseBrokerAddress, serviceAddress));
+                            addressFuture.complete(Pair.of(responseBrokerAddress, socketAddress));
                         } else {
                             // Normal result with direct connection to broker
                             addressFuture.complete(Pair.of(responseBrokerAddress, responseBrokerAddress));
@@ -192,7 +183,7 @@ public void updateServiceUrl(String serviceUrl) throws PulsarClientException {
 
     @Override
     public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName) {
-        return client.getCnxPool().getConnection(serviceAddress).thenCompose(clientCnx -> {
+        return client.getCnxPool().getConnection(serviceNameResolver.resolveHost()).thenCompose(clientCnx -> {
             long requestId = client.newRequestId();
             ByteBuf request = Commands.newGetSchema(requestId, topicName.toString(), Optional.empty());
 
@@ -201,7 +192,7 @@ public void updateServiceUrl(String serviceUrl) throws PulsarClientException {
     }
 
     public String getServiceUrl() {
-        return serviceAddress.toString();
+        return serviceNameResolver.getServiceUrl();
     }
 
     @Override
@@ -212,7 +203,7 @@ public String getServiceUrl() {
         Backoff backoff = new Backoff(100, TimeUnit.MILLISECONDS,
             opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS,
             0 , TimeUnit.MILLISECONDS);
-        getTopicsUnderNamespace(serviceAddress, namespace, backoff, opTimeoutMs, topicsFuture, mode);
+        getTopicsUnderNamespace(serviceNameResolver.resolveHost(), namespace, backoff, opTimeoutMs, topicsFuture, mode);
         return topicsFuture;
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
index 0e94e6b73f..43e5bd7c45 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
@@ -29,7 +29,6 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.HttpURLConnection;
-import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.Map;
 import java.util.Properties;
@@ -60,7 +59,7 @@
     protected final static int DEFAULT_READ_TIMEOUT_IN_SECONDS = 30;
 
     protected final AsyncHttpClient httpClient;
-    protected volatile URL url;
+    protected final ServiceNameResolver serviceNameResolver;
     protected final Authentication authentication;
 
     protected HttpClient(String serviceUrl, Authentication authentication,
@@ -74,7 +73,8 @@ protected HttpClient(String serviceUrl, Authentication authentication,
             EventLoopGroup eventLoopGroup, boolean tlsAllowInsecureConnection, String tlsTrustCertsFilePath,
             int connectTimeoutInSeconds, int readTimeoutInSeconds) throws PulsarClientException {
         this.authentication = authentication;
-        setServiceUrl(serviceUrl);
+        this.serviceNameResolver = new PulsarServiceNameResolver();
+        this.serviceNameResolver.updateServiceUrl(serviceUrl);
 
         DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder();
         confBuilder.setFollowRedirect(true);
@@ -89,7 +89,7 @@ public boolean keepAlive(Request ahcRequest, HttpRequest request, HttpResponse r
             }
         });
 
-        if ("https".equals(url.getProtocol())) {
+        if ("https".equals(serviceNameResolver.getServiceUri().getServiceName())) {
             try {
                 SslContext sslCtx = null;
 
@@ -112,16 +112,15 @@ public boolean keepAlive(Request ahcRequest, HttpRequest request, HttpResponse r
         AsyncHttpClientConfig config = confBuilder.build();
         httpClient = new DefaultAsyncHttpClient(config);
 
-        log.debug("Using HTTP url: {}", this.url);
+        log.debug("Using HTTP url: {}", serviceUrl);
+    }
+
+    String getServiceUrl() {
+        return this.serviceNameResolver.getServiceUrl();
     }
 
     void setServiceUrl(String serviceUrl) throws PulsarClientException {
-        try {
-            // Ensure trailing "/" on url
-            url = new URL(serviceUrl);
-        } catch (MalformedURLException e) {
-            throw new PulsarClientException.InvalidServiceURL(e);
-        }
+        this.serviceNameResolver.updateServiceUrl(serviceUrl);
     }
 
     @Override
@@ -132,7 +131,7 @@ public void close() throws IOException {
     public <T> CompletableFuture<T> get(String path, Class<T> clazz) {
         final CompletableFuture<T> future = new CompletableFuture<>();
         try {
-            String requestUrl = new URL(url, path).toString();
+            String requestUrl = new URL(serviceNameResolver.resolveHostUri().toURL(), path).toString();
             AuthenticationDataProvider authData = authentication.getAuthData();
             BoundRequestBuilder builder = httpClient.prepareGet(requestUrl);
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index 97ef0e1c6c..1bbf258189 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -104,7 +104,7 @@ public void updateServiceUrl(String serviceUrl) throws PulsarClientException {
     }
 
     public String getServiceUrl() {
-    	return httpClient.url.toString();
+    	return httpClient.getServiceUrl();
     }
 
     @Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
new file mode 100644
index 0000000000..3d984897fe
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import io.netty.util.internal.PlatformDependent;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
+import org.apache.pulsar.common.net.ServiceURI;
+
+/**
+ * The default implementation of {@link ServiceNameResolver}.
+ */
+@Slf4j
+class PulsarServiceNameResolver implements ServiceNameResolver {
+
+    private volatile ServiceURI serviceUri;
+    private volatile String serviceUrl;
+    private volatile List<InetSocketAddress> addressList;
+
+    @Override
+    public InetSocketAddress resolveHost() {
+        List<InetSocketAddress> list = addressList;
+        checkState(
+            list != null, "No service url is provided yet");
+        checkState(
+            !list.isEmpty(), "No hosts found for service url : " + serviceUrl);
+        if (list.size() == 1) {
+            return list.get(0);
+        } else {
+            return list.get(randomIndex(list.size()));
+        }
+    }
+
+    @Override
+    public URI resolveHostUri() {
+        InetSocketAddress host = resolveHost();
+        String hostUrl = serviceUri.getServiceScheme() + "://" + host.getHostName() + ":" + host.getPort();
+        return URI.create(hostUrl);
+    }
+
+    @Override
+    public String getServiceUrl() {
+        return serviceUrl;
+    }
+
+    @Override
+    public ServiceURI getServiceUri() {
+        return serviceUri;
+    }
+
+    @Override
+    public void updateServiceUrl(String serviceUrl) throws InvalidServiceURL {
+        ServiceURI uri;
+        try {
+            uri = ServiceURI.create(serviceUrl);
+        } catch (IllegalArgumentException iae) {
+            log.error("Invalid service-url {} provided {}", serviceUrl, iae.getMessage(), iae);
+            throw new InvalidServiceURL(iae);
+        }
+
+        String[] hosts = uri.getServiceHosts();
+        List<InetSocketAddress> addresses = new ArrayList<>(hosts.length);
+        for (String host : hosts) {
+            String hostUrl = uri.getServiceScheme() + "://" + host;
+            try {
+                URI hostUri = new URI(hostUrl);
+                addresses.add(InetSocketAddress.createUnresolved(hostUri.getHost(), hostUri.getPort()));
+            } catch (URISyntaxException e) {
+                log.error("Invalid host provided {}", hostUrl, e.getMessage(), e);
+                throw new InvalidServiceURL(e);
+            }
+        }
+        this.addressList = addresses;
+        this.serviceUrl = serviceUrl;
+        this.serviceUri = uri;
+    }
+
+    private static int randomIndex(int numAddresses) {
+        return numAddresses == 1 ? 0 : PlatformDependent.threadLocalRandom().nextInt(numAddresses);
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ServiceNameResolver.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ServiceNameResolver.java
new file mode 100644
index 0000000000..4264b6101e
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ServiceNameResolver.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.net.InetSocketAddress;
+import java.net.URI;
+import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
+import org.apache.pulsar.common.net.ServiceURI;
+
+/**
+ * A service name resolver to resolve real socket address.
+ */
+public interface ServiceNameResolver {
+
+    /**
+     * Resolve pulsar service url.
+     *
+     * @return the socket address of a host resolved from the service url
+     */
+    InetSocketAddress resolveHost();
+
+    /**
+     * Resolve pulsar service url to the uri of a single host.
+     * @return the uri (service scheme, host name and port) of the resolved host
+     */
+    URI resolveHostUri();
+
+    /**
+     * Get service url.
+     *
+     * @return service url
+     */
+    String getServiceUrl();
+
+    /**
+     * Get service uri.
+     *
+     * @return service uri
+     */
+    ServiceURI getServiceUri();
+
+    /**
+     * Update service url.
+     *
+     * @param serviceUrl service url
+     */
+    void updateServiceUrl(String serviceUrl) throws InvalidServiceURL;
+
+}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarServiceNameResolverTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarServiceNameResolverTest.java
new file mode 100644
index 0000000000..878e240a1a
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarServiceNameResolverTest.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
+import org.apache.pulsar.common.net.ServiceURI;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test {@link PulsarServiceNameResolver}.
+ */
+public class PulsarServiceNameResolverTest {
+
+    private PulsarServiceNameResolver resolver;
+
+    @BeforeMethod
+    public void setup() {
+        this.resolver = new PulsarServiceNameResolver();
+        assertNull(resolver.getServiceUrl());
+        assertNull(resolver.getServiceUri());
+    }
+
+    @Test(expectedExceptions = IllegalStateException.class)
+    public void testResolveBeforeUpdateServiceUrl() {
+        resolver.resolveHost();
+    }
+
+    @Test(expectedExceptions = IllegalStateException.class)
+    public void testResolveUrlBeforeUpdateServiceUrl() throws Exception {
+        resolver.resolveHostUri();
+    }
+
+    @Test
+    public void testUpdateInvalidServiceUrl() {
+        String serviceUrl = "pulsar:///";
+        try {
+            resolver.updateServiceUrl(serviceUrl);
+            fail("Should fail to update service url if service url is invalid");
+        } catch (InvalidServiceURL isu) {
+            // expected
+        }
+        assertNull(resolver.getServiceUrl());
+        assertNull(resolver.getServiceUri());
+    }
+
+    @Test
+    public void testSimpleHostUrl() throws Exception {
+        String serviceUrl = "pulsar://host1:6650";
+        resolver.updateServiceUrl(serviceUrl);
+        assertEquals(serviceUrl, resolver.getServiceUrl());
+        assertEquals(ServiceURI.create(serviceUrl), resolver.getServiceUri());
+
+        InetSocketAddress expectedAddress = InetSocketAddress.createUnresolved("host1", 6650);
+        assertEquals(expectedAddress, resolver.resolveHost());
+        assertEquals(URI.create(serviceUrl), resolver.resolveHostUri());
+
+        String newServiceUrl = "pulsar://host2:6650";
+        resolver.updateServiceUrl(newServiceUrl);
+        assertEquals(newServiceUrl, resolver.getServiceUrl());
+        assertEquals(ServiceURI.create(newServiceUrl), resolver.getServiceUri());
+
+        InetSocketAddress newExpectedAddress = InetSocketAddress.createUnresolved("host2", 6650);
+        assertEquals(newExpectedAddress, resolver.resolveHost());
+        assertEquals(URI.create(newServiceUrl), resolver.resolveHostUri());
+    }
+
+    @Test
+    public void testMultipleHostsUrl() throws Exception {
+        String serviceUrl = "pulsar://host1:6650,host2:6650";
+        resolver.updateServiceUrl(serviceUrl);
+        assertEquals(serviceUrl, resolver.getServiceUrl());
+        assertEquals(ServiceURI.create(serviceUrl), resolver.getServiceUri());
+
+        Set<InetSocketAddress> expectedAddresses = new HashSet<>();
+        Set<URI> expectedHostUrls = new HashSet<>();
+        expectedAddresses.add(InetSocketAddress.createUnresolved("host1", 6650));
+        expectedAddresses.add(InetSocketAddress.createUnresolved("host2", 6650));
+        expectedHostUrls.add(URI.create("pulsar://host1:6650"));
+        expectedHostUrls.add(URI.create("pulsar://host2:6650"));
+
+        for (int i = 0; i < 10; i++) {
+            assertTrue(expectedAddresses.contains(resolver.resolveHost()));
+            assertTrue(expectedHostUrls.contains(resolver.resolveHostUri()));
+        }
+    }
+
+    @Test
+    public void testMultipleHostsTlsUrl() throws Exception {
+        String serviceUrl = "pulsar+ssl://host1:6651,host2:6651";
+        resolver.updateServiceUrl(serviceUrl);
+        assertEquals(serviceUrl, resolver.getServiceUrl());
+        assertEquals(ServiceURI.create(serviceUrl), resolver.getServiceUri());
+
+        Set<InetSocketAddress> expectedAddresses = new HashSet<>();
+        Set<URI> expectedHostUrls = new HashSet<>();
+        expectedAddresses.add(InetSocketAddress.createUnresolved("host1", 6651));
+        expectedAddresses.add(InetSocketAddress.createUnresolved("host2", 6651));
+        expectedHostUrls.add(URI.create("pulsar+ssl://host1:6651"));
+        expectedHostUrls.add(URI.create("pulsar+ssl://host2:6651"));
+
+        for (int i = 0; i < 10; i++) {
+            assertTrue(expectedAddresses.contains(resolver.resolveHost()));
+            assertTrue(expectedHostUrls.contains(resolver.resolveHostUri()));
+        }
+    }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/net/ServiceURI.java b/pulsar-common/src/main/java/org/apache/pulsar/common/net/ServiceURI.java
new file mode 100644
index 0000000000..f562b7d883
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/net/ServiceURI.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.net;
+
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.CharMatcher;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import java.net.URI;
+import java.util.List;
+import java.util.stream.Collectors;
+import lombok.AccessLevel;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * ServiceURI represents service uri within pulsar cluster.
+ *
+ * <p>This file is based on
+ * <a href="https://github.com/apache/bookkeeper/blob/master/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/net/ServiceURI.java">BookKeeper's ServiceURI</a>.
+ */
+@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
+@Getter
+@EqualsAndHashCode
+public class ServiceURI {
+
+    private static final String BINARY_SERVICE = "pulsar";
+    private static final String HTTP_SERVICE = "http";
+    private static final String HTTPS_SERVICE = "https";
+    private static final String SSL_SERVICE = "ssl";
+
+    private static final int BINARY_PORT = 6650;
+    private static final int BINARY_TLS_PORT = 6651;
+    private static final int HTTP_PORT = 8080;
+    private static final int HTTPS_PORT = 8443;
+
+    /**
+     * Create a service uri instance from a uri string.
+     *
+     * @param uriStr service uri string
+     * @return a service uri instance
+     * @throws NullPointerException if {@code uriStr} is null
+     * @throws IllegalArgumentException if the given string violates RFC&nbsp;2396
+     */
+    public static ServiceURI create(String uriStr) {
+        checkNotNull(uriStr, "service uri string is null");
+
+        // a service uri first should be a valid java.net.URI
+        URI uri = URI.create(uriStr);
+
+        return create(uri);
+    }
+
+    /**
+     * Create a service uri instance from a {@link URI} instance.
+     *
+     * @param uri {@link URI} instance
+     * @return a service uri instance
+     * @throws NullPointerException if {@code uri} is null
+     * @throws IllegalArgumentException if the uri has no authority component or contains an invalid host
+     */
+    public static ServiceURI create(URI uri) {
+        checkNotNull(uri, "service uri instance is null");
+
+        String serviceName;
+        final String[] serviceInfos;
+        String scheme = uri.getScheme();
+        if (null != scheme) {
+            scheme = scheme.toLowerCase();
+            final String serviceSep = "+";
+            String[] schemeParts = StringUtils.split(scheme, serviceSep);
+            serviceName = schemeParts[0];
+            serviceInfos = new String[schemeParts.length - 1];
+            System.arraycopy(schemeParts, 1, serviceInfos, 0, serviceInfos.length);
+        } else {
+            serviceName = null;
+            serviceInfos = new String[0];
+        }
+
+        String userAndHostInformation = uri.getAuthority();
+        checkArgument(!Strings.isNullOrEmpty(userAndHostInformation),
+            "authority component is missing in service uri : " + uri);
+
+        String serviceUser;
+        List<String> serviceHosts;
+        int atIndex = userAndHostInformation.indexOf('@');
+        Splitter splitter = Splitter.on(CharMatcher.anyOf(",;"));
+        if (atIndex > 0) {
+            serviceUser = userAndHostInformation.substring(0, atIndex);
+            serviceHosts = splitter.splitToList(userAndHostInformation.substring(atIndex + 1));
+        } else {
+            serviceUser = null;
+            serviceHosts = splitter.splitToList(userAndHostInformation);
+        }
+        serviceHosts = serviceHosts
+            .stream()
+            .map(host -> validateHostName(serviceName, serviceInfos, host))
+            .collect(Collectors.toList());
+
+        String servicePath = uri.getPath();
+        checkArgument(null != servicePath,
+            "service path component is missing in service uri : " + uri);
+
+        return new ServiceURI(
+            serviceName,
+            serviceInfos,
+            serviceUser,
+            serviceHosts.toArray(new String[serviceHosts.size()]),
+            servicePath,
+            uri);
+    }
+
+    private static String validateHostName(String serviceName,
+                                           String[] serviceInfos,
+                                           String hostname) {
+        String[] parts = hostname.split(":");
+        if (parts.length >= 3) {
+            throw new IllegalArgumentException("Invalid hostname : " + hostname);
+        } else if (parts.length == 2) {
+            try {
+                Integer.parseUnsignedInt(parts[1]);
+            } catch (NumberFormatException nfe) {
+                throw new IllegalArgumentException("Invalid hostname : " + hostname);
+            }
+            return hostname;
+        } else if (parts.length == 1) {
+            return hostname + ":" + getServicePort(serviceName, serviceInfos);
+        } else {
+            return hostname;
+        }
+    }
+
+    private final String serviceName;
+    private final String[] serviceInfos;
+    private final String serviceUser;
+    private final String[] serviceHosts;
+    private final String servicePath;
+    private final URI uri;
+
+    public String[] getServiceInfos() {
+        return serviceInfos;
+    }
+
+    public String[] getServiceHosts() {
+        return serviceHosts;
+    }
+
+    public String getServiceScheme() {
+        if (null == serviceName) {
+            return null;
+        } else {
+            if (serviceInfos.length == 0) {
+                return serviceName;
+            } else {
+                return serviceName + "+" + StringUtils.join(serviceInfos, '+');
+            }
+        }
+    }
+
+    private static int getServicePort(String serviceName, String[] serviceInfos) {
+        int port;
+        switch (serviceName.toLowerCase()) {
+            case BINARY_SERVICE:
+                if (serviceInfos.length == 0) {
+                    port = BINARY_PORT;
+                } else if (serviceInfos.length == 1 && serviceInfos[0].toLowerCase().equals(SSL_SERVICE)) {
+                    port = BINARY_TLS_PORT;
+                } else {
+                    throw new IllegalArgumentException("Invalid pulsar service : " + serviceName + "+" + serviceInfos);
+                }
+                break;
+            case HTTP_SERVICE:
+                port = HTTP_PORT;
+                break;
+            case HTTPS_SERVICE:
+                port = HTTPS_PORT;
+                break;
+            default:
+                throw new IllegalArgumentException("Invalid pulsar service : " + serviceName);
+        }
+        return port;
+    }
+
+}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/net/ServiceURITest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/net/ServiceURITest.java
new file mode 100644
index 0000000000..99921c685e
--- /dev/null
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/net/ServiceURITest.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.net;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
+
+import java.net.URI;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test {@link ServiceURI}.
+ *
+ * <p>Covers rejection of malformed service URIs, null/empty input, and parsing of the
+ * service name, service infos, user, hosts (single and multiple, with and without
+ * explicit ports) and path.
+ */
+public class ServiceURITest {
+
+    /**
+     * Parses {@code serviceUri} and verifies every parsed component.
+     *
+     * @param serviceUri           the service URI string to parse
+     * @param expectedServiceName  expected service name ({@code null} when the URI has no scheme)
+     * @param expectedServiceInfo  expected service infos
+     * @param expectedServiceUser  expected user ({@code null} when the URI has no user info)
+     * @param expectedServiceHosts expected {@code host:port} entries
+     * @param expectedServicePath  expected path
+     */
+    private static void assertServiceUri(
+        String serviceUri,
+        String expectedServiceName,
+        String[] expectedServiceInfo,
+        String expectedServiceUser,
+        String[] expectedServiceHosts,
+        String expectedServicePath) {
+
+        ServiceURI serviceURI = ServiceURI.create(serviceUri);
+
+        // TestNG's assertEquals takes (actual, expected); the JUnit-style assertArrayEquals
+        // from ArrayAsserts takes (expected, actual). Keeping each in its own order makes
+        // failure messages report the right value as "expected".
+        assertEquals(serviceURI.getServiceName(), expectedServiceName);
+        assertArrayEquals(expectedServiceInfo, serviceURI.getServiceInfos());
+        assertEquals(serviceURI.getServiceUser(), expectedServiceUser);
+        assertArrayEquals(expectedServiceHosts, serviceURI.getServiceHosts());
+        assertEquals(serviceURI.getServicePath(), expectedServicePath);
+    }
+
+    @Test
+    public void testInvalidServiceUris() {
+        String[] uris = new String[] {
+            "://localhost:6650",                // missing scheme
+            "pulsar:///",                       // missing authority
+            "pulsar://localhost:6650:6651/",    // invalid hostname pair
+            "pulsar://localhost:xyz/",          // invalid port
+            "pulsar://localhost:-6650/",        // negative port
+        };
+
+        for (String uri : uris) {
+            testInvalidServiceUri(uri);
+        }
+    }
+
+    @Test(expectedExceptions = NullPointerException.class)
+    public void testNullServiceUriString() {
+        ServiceURI.create((String) null);
+    }
+
+    @Test(expectedExceptions = NullPointerException.class)
+    public void testNullServiceUriInstance() {
+        ServiceURI.create((URI) null);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testEmptyServiceUriString() {
+        ServiceURI.create("");
+    }
+
+    /**
+     * Asserts that parsing {@code serviceUri} is rejected with an {@link IllegalArgumentException}.
+     */
+    private void testInvalidServiceUri(String serviceUri) {
+        try {
+            ServiceURI.create(serviceUri);
+            fail("Should fail to parse service uri : " + serviceUri);
+        } catch (IllegalArgumentException iae) {
+            // expected
+        }
+    }
+
+    @Test
+    public void testMissingServiceName() {
+        String serviceUri = "//localhost:6650/path/to/namespace";
+        assertServiceUri(
+            serviceUri,
+            null, new String[0], null, new String[] { "localhost:6650" }, "/path/to/namespace");
+    }
+
+    @Test
+    public void testEmptyPath() {
+        String serviceUri = "pulsar://localhost:6650";
+        assertServiceUri(
+            serviceUri,
+            "pulsar", new String[0], null, new String[] { "localhost:6650" }, "");
+    }
+
+    @Test
+    public void testRootPath() {
+        String serviceUri = "pulsar://localhost:6650/";
+        assertServiceUri(
+            serviceUri,
+            "pulsar", new String[0], null, new String[] { "localhost:6650" }, "/");
+    }
+
+    @Test
+    public void testUserInfo() {
+        String serviceUri = "pulsar://pulsaruser@localhost:6650/path/to/namespace";
+        assertServiceUri(
+            serviceUri,
+            "pulsar",
+            new String[0],
+            "pulsaruser",
+            new String[] { "localhost:6650" },
+            "/path/to/namespace");
+    }
+
+    @Test
+    public void testMultipleHostsSemiColon() {
+        String serviceUri = "pulsar://host1:6650;host2:6650;host3:6650/path/to/namespace";
+        assertServiceUri(
+            serviceUri,
+            "pulsar",
+            new String[0],
+            null,
+            new String[] { "host1:6650", "host2:6650", "host3:6650" },
+            "/path/to/namespace");
+    }
+
+    @Test
+    public void testMultipleHostsComma() {
+        String serviceUri = "pulsar://host1:6650,host2:6650,host3:6650/path/to/namespace";
+        assertServiceUri(
+            serviceUri,
+            "pulsar",
+            new String[0],
+            null,
+            new String[] { "host1:6650", "host2:6650", "host3:6650" },
+            "/path/to/namespace");
+    }
+
+    @Test
+    public void testMultipleHostsWithoutPulsarPorts() {
+        String serviceUri = "pulsar://host1,host2,host3/path/to/namespace";
+        assertServiceUri(
+            serviceUri,
+            "pulsar",
+            new String[0],
+            null,
+            new String[] { "host1:6650", "host2:6650", "host3:6650" },
+            "/path/to/namespace");
+    }
+
+    @Test
+    public void testMultipleHostsWithoutPulsarTlsPorts() {
+        String serviceUri = "pulsar+ssl://host1,host2,host3/path/to/namespace";
+        assertServiceUri(
+            serviceUri,
+            "pulsar",
+            new String[] { "ssl" },
+            null,
+            new String[] { "host1:6651", "host2:6651", "host3:6651" },
+            "/path/to/namespace");
+    }
+
+    @Test
+    public void testMultipleHostsWithoutHttpPorts() {
+        String serviceUri = "http://host1,host2,host3/path/to/namespace";
+        assertServiceUri(
+            serviceUri,
+            "http",
+            new String[0],
+            null,
+            new String[] { "host1:8080", "host2:8080", "host3:8080" },
+            "/path/to/namespace");
+    }
+
+    @Test
+    public void testMultipleHostsWithoutHttpsPorts() {
+        String serviceUri = "https://host1,host2,host3/path/to/namespace";
+        assertServiceUri(
+            serviceUri,
+            "https",
+            new String[0],
+            null,
+            new String[] { "host1:8443", "host2:8443", "host3:8443" },
+            "/path/to/namespace");
+    }
+
+    @Test
+    public void testMultipleHostsMixedPorts() {
+        String serviceUri = "pulsar://host1:6640,host2:6650,host3:6660/path/to/namespace";
+        assertServiceUri(
+            serviceUri,
+            "pulsar",
+            new String[0],
+            null,
+            new String[] { "host1:6640", "host2:6650", "host3:6660" },
+            "/path/to/namespace");
+    }
+
+    @Test
+    public void testMultipleHostsMixed() {
+        String serviceUri = "pulsar://host1:6640,host2,host3:6660/path/to/namespace";
+        assertServiceUri(
+            serviceUri,
+            "pulsar",
+            new String[0],
+            null,
+            new String[] { "host1:6640", "host2:6650", "host3:6660" },
+            "/path/to/namespace");
+    }
+
+    @Test
+    public void testUserInfoWithMultipleHosts() {
+        String serviceUri = "pulsar://pulsaruser@host1:6650;host2:6650;host3:6650/path/to/namespace";
+        assertServiceUri(
+            serviceUri,
+            "pulsar",
+            new String[0],
+            "pulsaruser",
+            new String[] { "host1:6650", "host2:6650", "host3:6650" },
+            "/path/to/namespace");
+    }
+
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services