You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/01/02 08:45:20 UTC

[GitHub] merlimat closed pull request #930: Issue #929: Perform async DNS resolution each time attempting connect?

merlimat closed pull request #930: Issue #929: Perform async DNS resolution each time attempting connect?
URL: https://github.com/apache/incubator-pulsar/pull/930
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
new file mode 100644
index 000000000..0447b780e
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.net.InetAddress;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
+
+public class ConnectionPoolTest extends MockedPulsarServiceBaseTest {
+
+    String serviceUrl = "pulsar://non-existing-dns-name:" + BROKER_PORT;
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+    }
+
+    @AfterClass
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testSingleIpAddress() throws Exception {
+        ClientConfiguration conf = new ClientConfiguration();
+        EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, new DefaultThreadFactory("test"));
+        ConnectionPool pool = Mockito.spy(new ConnectionPool(conf, eventLoop));
+        PulsarClientImpl client = new PulsarClientImpl(serviceUrl, conf, eventLoop, pool);
+
+        List<InetAddress> result = Lists.newArrayList();
+        result.add(InetAddress.getByName("127.0.0.1"));
+        Mockito.when(pool.resolveName("non-existing-dns-name")).thenReturn(CompletableFuture.completedFuture(result));
+
+        client.createProducer("persistent://sample/standalone/ns/my-topic");
+
+        client.close();
+    }
+
+    @Test
+    public void testDoubleIpAddress() throws Exception {
+        String serviceUrl = "pulsar://non-existing-dns-name:" + BROKER_PORT;
+
+        ClientConfiguration conf = new ClientConfiguration();
+        EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, new DefaultThreadFactory("test"));
+        ConnectionPool pool = Mockito.spy(new ConnectionPool(conf, eventLoop));
+        PulsarClientImpl client = new PulsarClientImpl(serviceUrl, conf, eventLoop, pool);
+
+        List<InetAddress> result = Lists.newArrayList();
+
+        // Add a non existent IP to the response to check that we're trying the 2nd address as well
+        result.add(InetAddress.getByName("127.0.0.99"));
+        result.add(InetAddress.getByName("127.0.0.1"));
+        Mockito.when(pool.resolveName("non-existing-dns-name")).thenReturn(CompletableFuture.completedFuture(result));
+
+        // Create producer should succeed by trying the 2nd IP
+        client.createProducer("persistent://sample/standalone/ns/my-topic");
+        client.close();
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
index 9b4e8cd04..8f9801efb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
@@ -20,14 +20,18 @@
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 import java.lang.reflect.Field;
-import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -38,6 +42,7 @@
 
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.ClientConfiguration;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageBuilder;
@@ -45,23 +50,11 @@
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConfiguration;
 import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.impl.ClientCnx;
-import org.apache.pulsar.client.impl.ConsumerImpl;
-import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.client.impl.MessageImpl;
-import org.apache.pulsar.client.impl.ProducerImpl;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
 import org.apache.pulsar.common.api.Commands;
-import org.apache.pulsar.common.api.DoubleByteBuf;
 import org.apache.pulsar.common.api.Commands.ChecksumType;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata.Builder;
-import org.mockito.cglib.proxy.Enhancer;
-import org.mockito.cglib.proxy.MethodInterceptor;
-import org.mockito.cglib.proxy.MethodProxy;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -70,8 +63,6 @@
 import org.testng.annotations.Test;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.util.ResourceLeakDetector;
 
 public class MessageIdTest extends BrokerTestBase {
     private static final Logger log = LoggerFactory.getLogger(MessageIdTest.class);
@@ -264,24 +255,24 @@ public void partitionedProducerSend() throws PulsarClientException, PulsarAdminE
     /**
      * Verifies: different versions of broker-deployment (one broker understands Checksum and other
      * doesn't in that case remove checksum before sending to broker-2)
-     * 
+     *
      * client first produce message with checksum and then retries to send message due to connection unavailable. But this time, if
      * broker doesn't understand checksum: then client should remove checksum from the message before sending to broker.
-     * 
-     * 1. stop broker 
-     * 2. client compute checksum and add into message 
-     * 3. produce 2 messages and corrupt 1 message 
-     * 4. start broker with lower version (which doesn't support checksum) 
-     * 5. client reconnects to broker and due to incompatibility of version: removes checksum from message 
-     * 6. broker doesn't do checksum validation and persist message 
+     *
+     * 1. stop broker
+     * 2. client compute checksum and add into message
+     * 3. produce 2 messages and corrupt 1 message
+     * 4. start broker with lower version (which doesn't support checksum)
+     * 5. client reconnects to broker and due to incompatibility of version: removes checksum from message
+     * 6. broker doesn't do checksum validation and persist message
      * 7. client receives ack
-     * 
+     *
      * @throws Exception
      */
     @Test
     public void testChecksumVersionComptability() throws Exception {
         final String topicName = "persistent://prop/use/ns-abc/topic1";
-        
+
         // 1. producer connect
         ProducerImpl prod = (ProducerImpl) pulsarClient.createProducer(topicName);
         ProducerImpl producer = spy(prod);
@@ -302,7 +293,8 @@ public void testChecksumVersionComptability() throws Exception {
         // mock-value from brokerChecksumSupportedVersion
         ((PulsarClientImpl) pulsarClient).timer().stop();
 
-        ClientCnx mockClientCnx = spy(new ClientCnx((PulsarClientImpl) pulsarClient));
+        ClientCnx mockClientCnx = spy(
+                new ClientCnx(new ClientConfiguration(), ((PulsarClientImpl) pulsarClient).eventLoopGroup()));
         doReturn(producer.brokerChecksumSupportedVersion() - 1).when(mockClientCnx).getRemoteEndpointProtocolVersion();
         prod.setClientCnx(mockClientCnx);
 
@@ -366,7 +358,8 @@ public void testChecksumReconnection() throws Exception {
         ((PulsarClientImpl) pulsarClient).timer().stop();
 
         // set clientCnx mock to get non-checksum supported version
-        ClientCnx mockClientCnx = spy(new ClientCnx((PulsarClientImpl) pulsarClient));
+        ClientCnx mockClientCnx = spy(
+                new ClientCnx(new ClientConfiguration(), ((PulsarClientImpl) pulsarClient).eventLoopGroup()));
         doReturn(producer.brokerChecksumSupportedVersion() - 1).when(mockClientCnx).getRemoteEndpointProtocolVersion();
         prod.setClientCnx(mockClientCnx);
 
@@ -408,22 +401,22 @@ public void testChecksumReconnection() throws Exception {
         assertEquals(new String(msg.getData()), "message-3");
 
     }
-    
-    
+
+
     /**
      * Verifies: if message is corrupted before sending to broker and if broker gives checksum error: then
      * 1. Client-Producer recomputes checksum with modified data
      * 2. Retry message-send again
-     * 3. Broker verifies checksum 
+     * 3. Broker verifies checksum
      * 4. client receives send-ack success
-     * 
+     *
      * @throws Exception
      */
     @Test
     public void testCorruptMessageRemove() throws Exception {
 
         final String topicName = "persistent://prop/use/ns-abc/retry-topic";
-        
+
         ProducerConfiguration config = new ProducerConfiguration();
         config.setSendTimeout(10, TimeUnit.MINUTES);
         // 1. producer connect
@@ -489,25 +482,26 @@ public void testCorruptMessageRemove() throws Exception {
         assertFalse(producer.verifyLocalBufferIsNotCorrupted(op));
 
         assertEquals(producer.getPendingQueueSize(), 0);
-        
+
         // [2] test-recoverChecksumError functionality
         stopBroker();
         MessageImpl msg1 = (MessageImpl) MessageBuilder.create().setContent("message-1".getBytes()).build();
         future = producer.sendAsync(msg1);
-        ClientCnx cnx = spy(new ClientCnx((PulsarClientImpl)pulsarClient) {});
+        ClientCnx cnx = spy(
+                new ClientCnx(new ClientConfiguration(), ((PulsarClientImpl) pulsarClient).eventLoopGroup()));
         String exc = "broker is already stopped";
         // when client-try to recover checksum by resending to broker: throw exception as broker is stopped
         doThrow(new IllegalStateException(exc)).when(cnx).ctx();
         try {
-            producer.recoverChecksumError(cnx, 1);    
+            producer.recoverChecksumError(cnx, 1);
             fail("it should call : resendMessages() => which should throw above mocked exception");
         }catch(IllegalStateException e) {
             assertEquals(exc, e.getMessage());
         }
-        
+
         producer.close();
         consumer.close();
         producer = null; // clean reference of mocked producer
     }
-   
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index 857258077..9f5338d6a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -35,6 +35,7 @@
 import org.slf4j.LoggerFactory;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.resolver.InetSocketAddressResolver;
 
 public class BinaryProtoLookupService implements LookupService {
 
@@ -49,7 +50,10 @@ public BinaryProtoLookupService(PulsarClientImpl client, String serviceUrl, bool
         URI uri;
         try {
             uri = new URI(serviceUrl);
-            this.serviceAddress = new InetSocketAddress(uri.getHost(), uri.getPort());
+
+            // Don't attempt to resolve the hostname in DNS at this point. It will be done each time when attempting to
+            // connect
+            this.serviceAddress = InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
         } catch (Exception e) {
             log.error("Invalid service-url {} provided {}", serviceUrl, e.getMessage(), e);
             throw new PulsarClientException.InvalidServiceURL(e);
@@ -59,7 +63,8 @@ public BinaryProtoLookupService(PulsarClientImpl client, String serviceUrl, bool
     /**
      * Calls broker binaryProto-lookup api to find broker-service address which can serve a given topic.
      *
-     * @param destination: topic-name
+     * @param destination:
+     *            topic-name
      * @return broker-socket-address that serves given topic
      */
     public CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> getBroker(DestinationName destination) {
@@ -74,7 +79,6 @@ public BinaryProtoLookupService(PulsarClientImpl client, String serviceUrl, bool
         return getPartitionedTopicMetadata(serviceAddress, destination);
     }
 
-
     private CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> findBroker(InetSocketAddress socketAddress,
             boolean authoritative, DestinationName destination) {
         CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> addressFuture = new CompletableFuture<>();
@@ -93,7 +97,7 @@ public BinaryProtoLookupService(PulsarClientImpl client, String serviceUrl, bool
                         uri = new URI(serviceUrl);
                     }
 
-                    InetSocketAddress responseBrokerAddress = new InetSocketAddress(uri.getHost(), uri.getPort());
+                    InetSocketAddress responseBrokerAddress = InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
 
                     // (2) redirect to given address if response is: redirect
                     if (lookupDataResult.redirect) {
@@ -138,7 +142,6 @@ public BinaryProtoLookupService(PulsarClientImpl client, String serviceUrl, bool
         return addressFuture;
     }
 
-
     private CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(InetSocketAddress socketAddress,
             DestinationName destination) {
 
@@ -170,7 +173,7 @@ public BinaryProtoLookupService(PulsarClientImpl client, String serviceUrl, bool
     }
 
     public String getServiceUrl() {
-    	return serviceAddress.toString();
+        return serviceAddress.toString();
     }
 
     @Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 7b3c6294f..2eaa1d459 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -31,6 +31,7 @@
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.ClientConfiguration;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.BinaryProtoLookupService.LookupDataResult;
 import org.apache.pulsar.common.api.Commands;
@@ -87,14 +88,12 @@
         None, SentConnectFrame, Ready
     }
 
-    public ClientCnx(PulsarClientImpl pulsarClient) {
+    public ClientCnx(ClientConfiguration conf, EventLoopGroup eventLoopGroup) {
         super(30, TimeUnit.SECONDS);
-        this.pendingLookupRequestSemaphore = new Semaphore(pulsarClient.getConfiguration().getConcurrentLookupRequest(),
-                true);
-        this.authentication = pulsarClient.getConfiguration().getAuthentication();
-        this.eventLoopGroup = pulsarClient.eventLoopGroup();
-        this.maxNumberOfRejectedRequestPerConnection = pulsarClient.getConfiguration()
-                .getMaxNumberOfRejectedRequestPerConnection();
+        this.pendingLookupRequestSemaphore = new Semaphore(conf.getConcurrentLookupRequest(), true);
+        this.authentication = conf.getAuthentication();
+        this.eventLoopGroup = eventLoopGroup;
+        this.maxNumberOfRejectedRequestPerConnection = conf.getMaxNumberOfRejectedRequestPerConnection();
         this.state = State.None;
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 3319dc2c6..9c465f746 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -21,8 +21,11 @@
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.security.cert.X509Certificate;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -35,8 +38,11 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelException;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
@@ -47,6 +53,9 @@
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslContextBuilder;
 import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+import io.netty.resolver.dns.DnsNameResolver;
+import io.netty.resolver.dns.DnsNameResolverBuilder;
+import io.netty.util.concurrent.Future;
 
 public class ConnectionPool implements Closeable {
     private final ConcurrentHashMap<InetSocketAddress, ConcurrentMap<Integer, CompletableFuture<ClientCnx>>> pool;
@@ -55,12 +64,14 @@
     private final EventLoopGroup eventLoopGroup;
     private final int maxConnectionsPerHosts;
 
+    private final DnsNameResolver dnsResolver;
+
     private static final int MaxMessageSize = 5 * 1024 * 1024;
     public static final String TLS_HANDLER = "tls";
 
-    public ConnectionPool(final PulsarClientImpl client, EventLoopGroup eventLoopGroup) {
+    public ConnectionPool(ClientConfiguration conf, EventLoopGroup eventLoopGroup) {
         this.eventLoopGroup = eventLoopGroup;
-        this.maxConnectionsPerHosts = client.getConfiguration().getConnectionsPerBroker();
+        this.maxConnectionsPerHosts = conf.getConnectionsPerBroker();
 
         pool = new ConcurrentHashMap<>();
         bootstrap = new Bootstrap();
@@ -68,27 +79,26 @@ public ConnectionPool(final PulsarClientImpl client, EventLoopGroup eventLoopGro
         bootstrap.channel(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup));
 
         bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);
-        bootstrap.option(ChannelOption.TCP_NODELAY, client.getConfiguration().isUseTcpNoDelay());
+        bootstrap.option(ChannelOption.TCP_NODELAY, conf.isUseTcpNoDelay());
         bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
         bootstrap.handler(new ChannelInitializer<SocketChannel>() {
             public void initChannel(SocketChannel ch) throws Exception {
-                ClientConfiguration clientConfig = client.getConfiguration();
-                if (clientConfig.isUseTls()) {
+                if (conf.isUseTls()) {
                     SslContextBuilder builder = SslContextBuilder.forClient();
-                    if (clientConfig.isTlsAllowInsecureConnection()) {
+                    if (conf.isTlsAllowInsecureConnection()) {
                         builder.trustManager(InsecureTrustManagerFactory.INSTANCE);
                     } else {
-                        if (clientConfig.getTlsTrustCertsFilePath().isEmpty()) {
+                        if (conf.getTlsTrustCertsFilePath().isEmpty()) {
                             // Use system default
                             builder.trustManager((File) null);
                         } else {
-                            File trustCertCollection = new File(clientConfig.getTlsTrustCertsFilePath());
+                            File trustCertCollection = new File(conf.getTlsTrustCertsFilePath());
                             builder.trustManager(trustCertCollection);
                         }
                     }
 
                     // Set client certificate if available
-                    AuthenticationDataProvider authData = clientConfig.getAuthentication().getAuthData();
+                    AuthenticationDataProvider authData = conf.getAuthentication().getAuthData();
                     if (authData.hasDataForTls()) {
                         builder.keyManager(authData.getTlsPrivateKey(),
                                 (X509Certificate[]) authData.getTlsCertificates());
@@ -98,9 +108,12 @@ public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc()));
                 }
                 ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(MaxMessageSize, 0, 4, 0, 4));
-                ch.pipeline().addLast("handler", new ClientCnx(client));
+                ch.pipeline().addLast("handler", new ClientCnx(conf, eventLoopGroup));
             }
         });
+
+        this.dnsResolver = new DnsNameResolverBuilder(eventLoopGroup.next()).traceEnabled(true)
+                .channelType(EventLoopUtil.getDatagramChannelClass(eventLoopGroup)).build();
     }
 
     private static final Random random = new Random();
@@ -114,18 +127,20 @@ public void initChannel(SocketChannel ch) throws Exception {
      * <p>
      * The connection can either be created or be coming from the pool itself.
      * <p>
-     * When specifying multiple addresses, the logicalAddress is used as a tag for the broker,
-     * while the physicalAddress is where the connection is actually happening.
+     * When specifying multiple addresses, the logicalAddress is used as a tag for the broker, while the physicalAddress
+     * is where the connection is actually happening.
      * <p>
-     * These two addresses can be different when the client is forced to connect through
-     * a proxy layer. Essentially, the pool is using the logical address as a way to
-     * decide whether to reuse a particular connection.
+     * These two addresses can be different when the client is forced to connect through a proxy layer. Essentially, the
+     * pool is using the logical address as a way to decide whether to reuse a particular connection.
      *
-     * @param logicalAddress the address to use as the broker tag
-     * @param physicalAddress the real address where the TCP connection should be made
+     * @param logicalAddress
+     *            the address to use as the broker tag
+     * @param physicalAddress
+     *            the real address where the TCP connection should be made
      * @return a future that will produce the ClientCnx object
      */
-    public CompletableFuture<ClientCnx> getConnection(InetSocketAddress logicalAddress, InetSocketAddress physicalAddress) {
+    public CompletableFuture<ClientCnx> getConnection(InetSocketAddress logicalAddress,
+            InetSocketAddress physicalAddress) {
         if (maxConnectionsPerHosts == 0) {
             // Disable pooling
             return createConnection(logicalAddress, physicalAddress, -1);
@@ -146,17 +161,10 @@ public void initChannel(SocketChannel ch) throws Exception {
         final CompletableFuture<ClientCnx> cnxFuture = new CompletableFuture<ClientCnx>();
 
         // Trigger async connect to broker
-        bootstrap.connect(physicalAddress).addListener((ChannelFuture future) -> {
-            if (!future.isSuccess()) {
-                log.warn("Failed to open connection to {} : {}", physicalAddress, future.cause().getClass().getSimpleName());
-                cnxFuture.completeExceptionally(new PulsarClientException(future.cause()));
-                cleanupConnection(logicalAddress, connectionKey, cnxFuture);
-                return;
-            }
+        createConnection(physicalAddress).thenAccept(channel -> {
+            log.info("[{}] Connected to server", channel);
 
-            log.info("[{}] Connected to server", future.channel());
-
-            future.channel().closeFuture().addListener(v -> {
+            channel.closeFuture().addListener(v -> {
                 // Remove connection from pool when it gets closed
                 if (log.isDebugEnabled()) {
                     log.debug("Removing closed connection from pool: {}", v);
@@ -166,10 +174,10 @@ public void initChannel(SocketChannel ch) throws Exception {
 
             // We are connected to broker, but need to wait until the connect/connected handshake is
             // complete
-            final ClientCnx cnx = (ClientCnx) future.channel().pipeline().get("handler");
-            if (!future.channel().isActive() || cnx == null) {
+            final ClientCnx cnx = (ClientCnx) channel.pipeline().get("handler");
+            if (!channel.isActive() || cnx == null) {
                 if (log.isDebugEnabled()) {
-                    log.debug("[{}] Connection was already closed by the time we got notified", future.channel());
+                    log.debug("[{}] Connection was already closed by the time we got notified", channel);
                 }
                 cnxFuture.completeExceptionally(new ChannelException("Connection already closed"));
                 return;
@@ -195,14 +203,92 @@ public void initChannel(SocketChannel ch) throws Exception {
                 cnx.ctx().close();
                 return null;
             });
+        }).exceptionally(exception -> {
+            log.warn("Failed to open connection to {} : {}", physicalAddress, exception.getClass().getSimpleName());
+            cnxFuture.completeExceptionally(new PulsarClientException(exception));
+            cleanupConnection(logicalAddress, connectionKey, cnxFuture);
+            return null;
         });
 
         return cnxFuture;
     }
 
+    /**
+     * Resolve DNS asynchronously and attempt to connect to any IP address returned by DNS server
+     */
+    private CompletableFuture<Channel> createConnection(InetSocketAddress unresolvedAddress) {
+        String hostname = unresolvedAddress.getHostString();
+        int port = unresolvedAddress.getPort();
+
+        // Resolve DNS --> Attempt to connect to all IP addresses until once succeeds
+        return resolveName(hostname)
+                .thenCompose(inetAddresses -> connectToResolvedAddresses(inetAddresses.iterator(), port));
+    }
+
+    /**
+     * Try to connect to a sequence of IP addresses until a successfull connection can be made, or fail if no address is
+     * working
+     */
+    private CompletableFuture<Channel> connectToResolvedAddresses(Iterator<InetAddress> unresolvedAddresses, int port) {
+        CompletableFuture<Channel> future = new CompletableFuture<>();
+
+        connectToAddress(unresolvedAddresses.next(), port).thenAccept(channel -> {
+            // Successfully connected to server
+            future.complete(channel);
+        }).exceptionally(exception -> {
+            if (unresolvedAddresses.hasNext()) {
+                // Try next IP address
+                connectToResolvedAddresses(unresolvedAddresses, port).thenAccept(channel -> {
+                    future.complete(channel);
+                }).exceptionally(ex -> {
+                    // This is already unwinding the recursive call
+                    future.completeExceptionally(ex);
+                    return null;
+                });
+            } else {
+                // Failed to connect to any IP address
+                future.completeExceptionally(exception);
+            }
+            return null;
+        });
+
+        return future;
+    }
+
+    @VisibleForTesting
+    CompletableFuture<List<InetAddress>> resolveName(String hostname) {
+        CompletableFuture<List<InetAddress>> future = new CompletableFuture<>();
+        dnsResolver.resolveAll(hostname).addListener((Future<List<InetAddress>> resolveFuture) -> {
+            if (resolveFuture.isSuccess()) {
+                future.complete(resolveFuture.get());
+            } else {
+                future.completeExceptionally(resolveFuture.cause());
+            }
+        });
+        return future;
+    }
+
+    /**
+     * Attempt to establish a TCP connection to an already resolved single IP address
+     */
+    private CompletableFuture<Channel> connectToAddress(InetAddress ipAddress, int port) {
+        CompletableFuture<Channel> future = new CompletableFuture<>();
+
+        bootstrap.connect(ipAddress, port).addListener((ChannelFuture channelFuture) -> {
+            if (channelFuture.isSuccess()) {
+                future.complete(channelFuture.channel());
+            } else {
+                future.completeExceptionally(channelFuture.cause());
+            }
+        });
+
+        return future;
+    }
+
     @Override
     public void close() throws IOException {
         eventLoopGroup.shutdownGracefully();
+        dnsResolver.close();
     }
 
     private void cleanupConnection(InetSocketAddress address, int connectionKey,
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index a22630ee4..8c12ab167 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -70,7 +70,7 @@ public HttpLookupService(String serviceUrl, ClientConfiguration conf, EventLoopG
                     uri = new URI(serviceUrl);
                 }
 
-                InetSocketAddress brokerAddress = new InetSocketAddress(uri.getHost(), uri.getPort());
+                InetSocketAddress brokerAddress = InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
                 return CompletableFuture.completedFuture(Pair.of(brokerAddress, brokerAddress));
             } catch (Exception e) {
                 // Failed to parse url
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 88f0b54e9..9f108211e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static org.apache.commons.lang3.StringUtils.isBlank;
+
 import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -54,8 +56,6 @@
 import io.netty.util.Timer;
 import io.netty.util.concurrent.DefaultThreadFactory;
 
-import static org.apache.commons.lang3.StringUtils.isBlank;
-
 public class PulsarClientImpl implements PulsarClient {
 
     private static final Logger log = LoggerFactory.getLogger(PulsarClientImpl.class);
@@ -86,13 +86,19 @@ public PulsarClientImpl(String serviceUrl, ClientConfiguration conf) throws Puls
 
     public PulsarClientImpl(String serviceUrl, ClientConfiguration conf, EventLoopGroup eventLoopGroup)
             throws PulsarClientException {
+        this(serviceUrl, conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup));
+    }
+
+    public PulsarClientImpl(String serviceUrl, ClientConfiguration conf, EventLoopGroup eventLoopGroup,
+            ConnectionPool cnxPool)
+            throws PulsarClientException {
         if (isBlank(serviceUrl) || conf == null || eventLoopGroup == null) {
             throw new PulsarClientException.InvalidConfigurationException("Invalid client configuration");
         }
         this.eventLoopGroup = eventLoopGroup;
         this.conf = conf;
         conf.getAuthentication().start();
-        cnxPool = new ConnectionPool(this, eventLoopGroup);
+        this.cnxPool = cnxPool;
         if (serviceUrl.startsWith("http")) {
             lookup = new HttpLookupService(serviceUrl, conf, eventLoopGroup);
         } else {
@@ -439,7 +445,7 @@ public ConnectionPool getCnxPool() {
         return cnxPool;
     }
 
-    EventLoopGroup eventLoopGroup() {
+    public EventLoopGroup eventLoopGroup() {
         return eventLoopGroup;
     }
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/EventLoopUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/EventLoopUtil.java
index 0a447c705..addacbebd 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/EventLoopUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/EventLoopUtil.java
@@ -24,13 +24,16 @@
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.epoll.Epoll;
 import io.netty.channel.epoll.EpollChannelOption;
+import io.netty.channel.epoll.EpollDatagramChannel;
 import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.channel.epoll.EpollMode;
 import io.netty.channel.epoll.EpollServerSocketChannel;
 import io.netty.channel.epoll.EpollSocketChannel;
 import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.DatagramChannel;
 import io.netty.channel.socket.ServerSocketChannel;
 import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioDatagramChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
 
@@ -70,6 +73,14 @@ public static EventLoopGroup newEventLoopGroup(int nThreads, ThreadFactory threa
         }
     }
 
+    public static Class<? extends DatagramChannel> getDatagramChannelClass(EventLoopGroup eventLoopGroup) {
+        if (eventLoopGroup instanceof EpollEventLoopGroup) {
+            return EpollDatagramChannel.class;
+        } else {
+            return NioDatagramChannel.class;
+        }
+    }
+
     public static void enableTriggeredMode(ServerBootstrap bootstrap) {
         if (Epoll.isAvailable()) {
             bootstrap.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index 2400324ed..9dc876fc0 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -97,7 +97,7 @@ private void performLookup(long clientRequestId, String topic, String brokerServ
             return;
         }
 
-        InetSocketAddress addr = new InetSocketAddress(brokerURI.getHost(), brokerURI.getPort());
+        InetSocketAddress addr = InetSocketAddress.createUnresolved(brokerURI.getHost(), brokerURI.getPort());
         if (log.isDebugEnabled()) {
             log.debug("Getting connections to '{}'", addr);
         }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services