You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/10/11 17:15:00 UTC

[jira] [Commented] (KAFKA-6863) Kafka clients should try to use multiple DNS resolved IP addresses if the first one fails

    [ https://issues.apache.org/jira/browse/KAFKA-6863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646780#comment-16646780 ] 

ASF GitHub Bot commented on KAFKA-6863:
---------------------------------------

rajinisivaram closed pull request #4987: KAFKA-6863 Kafka clients should try to use multiple DNS resolved IP
URL: https://github.com/apache/kafka/pull/4987
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
index fba220c8266..1661ea39695 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.clients;
 
 import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.network.ChannelBuilder;
@@ -27,8 +28,11 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -88,4 +92,27 @@ public static ChannelBuilder createChannelBuilder(AbstractConfig config) {
         return ChannelBuilders.clientChannelBuilder(securityProtocol, JaasContext.Type.CLIENT, config, null,
                 clientSaslMechanism, true);
     }
+
+    static List<InetAddress> resolve(String host, ClientDnsLookup clientDnsLookup) throws UnknownHostException {
+        InetAddress[] addresses = InetAddress.getAllByName(host);
+        if (ClientDnsLookup.USE_ALL_DNS_IPS == clientDnsLookup) {
+            return filterPreferredAddresses(addresses);
+        } else {
+            return Collections.singletonList(addresses[0]);
+        }
+    }
+
+    static List<InetAddress> filterPreferredAddresses(InetAddress[] allAddresses) {
+        List<InetAddress> preferredAddresses = new ArrayList<>();
+        Class<? extends InetAddress> clazz = null;
+        for (InetAddress address : allAddresses) {
+            if (clazz == null) {
+                clazz = address.getClass();
+            }
+            if (clazz.isInstance(address)) {
+                preferredAddresses.add(address);
+            }
+        }
+        return preferredAddresses;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index c3a2856f4ad..b697de73c85 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -18,9 +18,13 @@
 
 import java.util.concurrent.ThreadLocalRandom;
 
+import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.errors.AuthenticationException;
 
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -104,18 +108,24 @@ public boolean isConnecting(String id) {
      * Enter the connecting state for the given connection.
      * @param id the id of the connection
      * @param now the current time
+     * @throws UnknownHostException 
      */
-    public void connecting(String id, long now) {
+    public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) throws UnknownHostException {
         if (nodeState.containsKey(id)) {
-            NodeConnectionState node = nodeState.get(id);
-            node.lastConnectAttemptMs = now;
-            node.state = ConnectionState.CONNECTING;
+            NodeConnectionState connectionState = nodeState.get(id);
+            connectionState.lastConnectAttemptMs = now;
+            connectionState.state = ConnectionState.CONNECTING;
+            connectionState.moveToNextAddress();
         } else {
             nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now,
-                this.reconnectBackoffInitMs));
+                this.reconnectBackoffInitMs, ClientUtils.resolve(host, clientDnsLookup)));
         }
     }
 
+    public InetAddress currentAddress(String id) {
+        return nodeState.get(id).currentAddress();
+    }
+
     /**
      * Enter the disconnected state for the given node.
      * @param id the connection we have disconnected
@@ -334,9 +344,13 @@ private NodeConnectionState nodeState(String id) {
         long reconnectBackoffMs;
         // Connection is being throttled if current time < throttleUntilTimeMs.
         long throttleUntilTimeMs;
+        private final List<InetAddress> addresses;
+        private int index = 0;
 
-        public NodeConnectionState(ConnectionState state, long lastConnectAttempt, long reconnectBackoffMs) {
+        public NodeConnectionState(ConnectionState state, long lastConnectAttempt, long reconnectBackoffMs, 
+                List<InetAddress> addresses) {
             this.state = state;
+            this.addresses = addresses;
             this.authenticationException = null;
             this.lastConnectAttemptMs = lastConnectAttempt;
             this.failedAttempts = 0;
@@ -344,6 +358,17 @@ public NodeConnectionState(ConnectionState state, long lastConnectAttempt, long
             this.throttleUntilTimeMs = 0;
         }
 
+        public InetAddress currentAddress() {
+            return addresses.get(index);
+        }
+
+        /*
+         * implementing a ring buffer with the addresses
+         */
+        public void moveToNextAddress() {
+            index = (index + 1) % addresses.size();
+        }
+
         public String toString() {
             return "NodeState(" + state + ", " + lastConnectAttemptMs + ", " + failedAttempts + ", " + throttleUntilTimeMs + ")";
         }
diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index a08e5b10b3b..b179f0272e8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -93,6 +93,10 @@
                                                          + "elapses the client will resend the request if necessary or fail the request if "
                                                          + "retries are exhausted.";
 
+    public static final String CLIENT_DNS_LOOKUP_CONFIG = "client.dns.lookup";
+    public static final String CLIENT_DNS_LOOKUP_DOC = "<p>Controls how the client uses DNS lookups.</p><p>If set to <code>use_all_dns_ips</code> then, when the lookup returns multiple IP addresses for a hostname,"
+                                                        + " they will all be attempted to connect to before failing the connection. Applies to both bootstrap and advertised servers.</p>";
+
     /**
      * Postprocess the configuration so that exponential backoff is disabled when reconnect backoff
      * is explicitly configured but the maximum reconnect backoff is not explicitly configured.
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 8ec51ed309d..7ea05f69f3b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -19,6 +19,7 @@
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.metrics.Sensor;
@@ -44,6 +45,7 @@
 import org.slf4j.Logger;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -96,6 +98,8 @@
     /* time in ms to wait before retrying to create connection to a server */
     private final long reconnectBackoffMs;
 
+    private final ClientDnsLookup clientDnsLookup;
+
     private final Time time;
 
     /**
@@ -120,6 +124,7 @@ public NetworkClient(Selectable selector,
                          int socketSendBuffer,
                          int socketReceiveBuffer,
                          int defaultRequestTimeoutMs,
+                         ClientDnsLookup clientDnsLookup,
                          Time time,
                          boolean discoverBrokerVersions,
                          ApiVersions apiVersions,
@@ -134,6 +139,7 @@ public NetworkClient(Selectable selector,
              socketSendBuffer,
              socketReceiveBuffer,
              defaultRequestTimeoutMs,
+             clientDnsLookup,
              time,
              discoverBrokerVersions,
              apiVersions,
@@ -150,6 +156,7 @@ public NetworkClient(Selectable selector,
             int socketSendBuffer,
             int socketReceiveBuffer,
             int defaultRequestTimeoutMs,
+            ClientDnsLookup clientDnsLookup,
             Time time,
             boolean discoverBrokerVersions,
             ApiVersions apiVersions,
@@ -165,6 +172,7 @@ public NetworkClient(Selectable selector,
              socketSendBuffer,
              socketReceiveBuffer,
              defaultRequestTimeoutMs,
+             clientDnsLookup,
              time,
              discoverBrokerVersions,
              apiVersions,
@@ -181,6 +189,7 @@ public NetworkClient(Selectable selector,
                          int socketSendBuffer,
                          int socketReceiveBuffer,
                          int defaultRequestTimeoutMs,
+                         ClientDnsLookup clientDnsLookup,
                          Time time,
                          boolean discoverBrokerVersions,
                          ApiVersions apiVersions,
@@ -195,6 +204,7 @@ public NetworkClient(Selectable selector,
              socketSendBuffer,
              socketReceiveBuffer,
              defaultRequestTimeoutMs,
+             clientDnsLookup,
              time,
              discoverBrokerVersions,
              apiVersions,
@@ -212,6 +222,7 @@ private NetworkClient(MetadataUpdater metadataUpdater,
                           int socketSendBuffer,
                           int socketReceiveBuffer,
                           int defaultRequestTimeoutMs,
+                          ClientDnsLookup clientDnsLookup,
                           Time time,
                           boolean discoverBrokerVersions,
                           ApiVersions apiVersions,
@@ -243,6 +254,7 @@ private NetworkClient(MetadataUpdater metadataUpdater,
         this.apiVersions = apiVersions;
         this.throttleTimeSensor = throttleTimeSensor;
         this.log = logContext.logger(NetworkClient.class);
+        this.clientDnsLookup = clientDnsLookup;
     }
 
     /**
@@ -862,12 +874,13 @@ private static void correlate(RequestHeader requestHeader, ResponseHeader respon
     private void initiateConnect(Node node, long now) {
         String nodeConnectionId = node.idString();
         try {
-            log.debug("Initiating connection to node {}", node);
-            this.connectionStates.connecting(nodeConnectionId, now);
+            this.connectionStates.connecting(nodeConnectionId, now, node.host(), clientDnsLookup);
+            InetAddress address = this.connectionStates.currentAddress(nodeConnectionId);
+            log.debug("Initiating connection to node {} using address {}", node, address);
             selector.connect(nodeConnectionId,
-                             new InetSocketAddress(node.host(), node.port()),
-                             this.socketSendBuffer,
-                             this.socketReceiveBuffer);
+                    new InetSocketAddress(address, node.port()),
+                    this.socketSendBuffer,
+                    this.socketReceiveBuffer);
         } catch (IOException e) {
             /* attempt failed, we'll try again after the backoff */
             connectionStates.disconnected(nodeConnectionId, now);
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
index 058c491672a..a051de29fb7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
@@ -19,6 +19,7 @@
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
@@ -158,6 +159,12 @@
                                         in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString()),
                                         Importance.LOW,
                                         METRICS_RECORDING_LEVEL_DOC)
+                                .define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG,
+                                        Type.STRING,
+                                        ClientDnsLookup.DEFAULT.toString(),
+                                        in(ClientDnsLookup.DEFAULT.toString(), ClientDnsLookup.USE_ALL_DNS_IPS.toString()),
+                                        Importance.MEDIUM,
+                                        CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
                                 // security support
                                 .define(SECURITY_PROTOCOL_CONFIG,
                                         Type.STRING,
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index ceebc58301c..86e14476625 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -21,6 +21,7 @@
 import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.StaleMetadataException;
@@ -45,6 +46,7 @@
 import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.AuthenticationException;
@@ -360,6 +362,7 @@ static KafkaAdminClient createInternal(AdminClientConfig config, TimeoutProcesso
                 config.getInt(AdminClientConfig.SEND_BUFFER_CONFIG),
                 config.getInt(AdminClientConfig.RECEIVE_BUFFER_CONFIG),
                 (int) TimeUnit.HOURS.toMillis(1),
+                ClientDnsLookup.forConfig(config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)),
                 time,
                 true,
                 apiVersions,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index ddd6e06c713..a1c9dc250aa 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -18,6 +18,7 @@
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
@@ -452,6 +453,12 @@
                                         in(IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT), IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT)),
                                         Importance.MEDIUM,
                                         ISOLATION_LEVEL_DOC)
+                                .define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG,
+                                        Type.STRING,
+                                        ClientDnsLookup.DEFAULT.toString(),
+                                        in(ClientDnsLookup.DEFAULT.toString(), ClientDnsLookup.USE_ALL_DNS_IPS.toString()),
+                                        Importance.MEDIUM,
+                                        CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
                                 // security support
                                 .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
                                         Type.STRING,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 04b8ec21591..e79ff07cff0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -18,6 +18,7 @@
 
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
@@ -35,6 +36,7 @@
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
@@ -730,6 +732,7 @@ private KafkaConsumer(ConsumerConfig config,
                     config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
                     config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
                     config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
+                    ClientDnsLookup.forConfig(config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)),
                     time,
                     true,
                     new ApiVersions(),
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 316b024e996..465e60f449c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -29,6 +29,7 @@
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
@@ -48,6 +49,7 @@
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.AuthenticationException;
@@ -447,6 +449,7 @@ Sender newSender(LogContext logContext, KafkaClient kafkaClient, Metadata metada
                 producerConfig.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                 producerConfig.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                 requestTimeoutMs,
+                ClientDnsLookup.forConfig(producerConfig.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)),
                 time,
                 true,
                 apiVersions,
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 6142519c4dc..f08159d2e37 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -19,6 +19,7 @@
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
 import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
@@ -346,7 +347,13 @@
                                         null,
                                         new ConfigDef.NonEmptyString(),
                                         Importance.LOW,
-                                        TRANSACTIONAL_ID_DOC);
+                                        TRANSACTIONAL_ID_DOC)
+                                .define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG,
+                                        Type.STRING,
+                                        ClientDnsLookup.DEFAULT.toString(),
+                                        in(ClientDnsLookup.DEFAULT.toString(), ClientDnsLookup.USE_ALL_DNS_IPS.toString()),
+                                        Importance.MEDIUM,
+                                        CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC);
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ClientDnsLookup.java b/clients/src/main/java/org/apache/kafka/common/config/ClientDnsLookup.java
new file mode 100644
index 00000000000..4a013b96ff8
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/config/ClientDnsLookup.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config;
+
+import java.util.Locale;
+
+public enum ClientDnsLookup {
+
+    DEFAULT("default"),
+    USE_ALL_DNS_IPS("use_all_dns_ips");
+
+    private String clientDnsLookup;
+
+    ClientDnsLookup(String clientDnsLookup) {
+        this.clientDnsLookup = clientDnsLookup;
+    }
+
+    @Override
+    public String toString() {
+        return clientDnsLookup;
+    }
+
+    public static ClientDnsLookup forConfig(String config) {
+        return ClientDnsLookup.valueOf(config.toUpperCase(Locale.ROOT));
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
index d19b0be4be4..bea464f14a5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
@@ -16,11 +16,16 @@
  */
 package org.apache.kafka.clients;
 
+import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.config.ConfigException;
 import org.junit.Test;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
 import java.util.Arrays;
 import java.util.List;
 
@@ -49,6 +54,39 @@ public void testOnlyBadHostname() {
         check("some.invalid.hostname.foo.bar.local:9999");
     }
 
+    @Test
+    public void testFilterPreferredAddresses() throws UnknownHostException {
+        InetAddress ipv4 = InetAddress.getByName("192.0.0.1");
+        InetAddress ipv6 = InetAddress.getByName("::1");
+
+        InetAddress[] ipv4First = new InetAddress[]{ipv4, ipv6, ipv4};
+        List<InetAddress> result = ClientUtils.filterPreferredAddresses(ipv4First);
+        assertTrue(result.contains(ipv4));
+        assertFalse(result.contains(ipv6));
+        assertEquals(2, result.size());
+
+        InetAddress[] ipv6First = new InetAddress[]{ipv6, ipv4, ipv4};
+        result = ClientUtils.filterPreferredAddresses(ipv6First);
+        assertTrue(result.contains(ipv6));
+        assertFalse(result.contains(ipv4));
+        assertEquals(1, result.size());
+    }
+
+    @Test(expected = UnknownHostException.class)
+    public void testResolveUnknownHostException() throws UnknownHostException {
+        ClientUtils.resolve("some.invalid.hostname.foo.bar.local", ClientDnsLookup.DEFAULT);
+    }
+
+    @Test
+    public void testResolveDnsLookup() throws UnknownHostException {
+        assertEquals(1, ClientUtils.resolve("localhost", ClientDnsLookup.DEFAULT).size());
+    }
+
+    @Test
+    public void testResolveDnsLookupAllIps() throws UnknownHostException {
+        assertEquals(2, ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.USE_ALL_DNS_IPS).size());
+    }
+
     private List<InetSocketAddress> check(String... url) {
         return ClientUtils.parseAndValidateAddresses(Arrays.asList(url));
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
index 37155ce8ef8..d4a2a55dd4b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
@@ -19,9 +19,15 @@
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.utils.MockTime;
 import org.junit.Before;
@@ -35,6 +41,7 @@
     private final double reconnectBackoffJitter = 0.2;
     private final String nodeId1 = "1001";
     private final String nodeId2 = "2002";
+    private final String hostTwoIps = "kafka.apache.org";
 
     private ClusterConnectionStates connectionStates;
 
@@ -44,11 +51,11 @@ public void setup() {
     }
 
     @Test
-    public void testClusterConnectionStateChanges() {
+    public void testClusterConnectionStateChanges() throws UnknownHostException {
         assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds()));
 
         // Start connecting to Node and check state
-        connectionStates.connecting(nodeId1, time.milliseconds());
+        connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
         assertEquals(connectionStates.connectionState(nodeId1), ConnectionState.CONNECTING);
         assertTrue(connectionStates.isConnecting(nodeId1));
         assertFalse(connectionStates.isReady(nodeId1, time.milliseconds()));
@@ -88,7 +95,7 @@ public void testClusterConnectionStateChanges() {
     }
 
     @Test
-    public void testMultipleNodeConnectionStates() {
+    public void testMultipleNodeConnectionStates() throws UnknownHostException {
         // Check initial state, allowed to connect to all nodes, but no nodes shown as ready
         assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds()));
         assertTrue(connectionStates.canConnect(nodeId2, time.milliseconds()));
@@ -96,7 +103,7 @@ public void testMultipleNodeConnectionStates() {
 
         // Start connecting one node and check that the pool only shows ready nodes after
         // successful connect
-        connectionStates.connecting(nodeId2, time.milliseconds());
+        connectionStates.connecting(nodeId2, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
         assertFalse(connectionStates.hasReadyNodes(time.milliseconds()));
         time.sleep(1000);
         connectionStates.ready(nodeId2);
@@ -104,7 +111,7 @@ public void testMultipleNodeConnectionStates() {
 
         // Connect second node and check that both are shown as ready, pool should immediately
         // show ready nodes, since node2 is already connected
-        connectionStates.connecting(nodeId1, time.milliseconds());
+        connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
         assertTrue(connectionStates.hasReadyNodes(time.milliseconds()));
         time.sleep(1000);
         connectionStates.ready(nodeId1);
@@ -126,9 +133,9 @@ public void testMultipleNodeConnectionStates() {
     }
 
     @Test
-    public void testAuthorizationFailed() {
+    public void testAuthorizationFailed() throws UnknownHostException {
         // Try connecting
-        connectionStates.connecting(nodeId1, time.milliseconds());
+        connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
 
         time.sleep(100);
 
@@ -147,8 +154,8 @@ public void testAuthorizationFailed() {
     }
 
     @Test
-    public void testRemoveNode() {
-        connectionStates.connecting(nodeId1, time.milliseconds());
+    public void testRemoveNode() throws UnknownHostException {
+        connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
         time.sleep(1000);
         connectionStates.ready(nodeId1);
         time.sleep(10000);
@@ -162,9 +169,9 @@ public void testRemoveNode() {
     }
 
     @Test
-    public void testMaxReconnectBackoff() {
+    public void testMaxReconnectBackoff() throws UnknownHostException {
         long effectiveMaxReconnectBackoff = Math.round(reconnectBackoffMax * (1 + reconnectBackoffJitter));
-        connectionStates.connecting(nodeId1, time.milliseconds());
+        connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
         time.sleep(1000);
         connectionStates.disconnected(nodeId1, time.milliseconds());
 
@@ -175,14 +182,14 @@ public void testMaxReconnectBackoff() {
             assertFalse(connectionStates.canConnect(nodeId1, time.milliseconds()));
             time.sleep(reconnectBackoff + 1);
             assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds()));
-            connectionStates.connecting(nodeId1, time.milliseconds());
+            connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
             time.sleep(10);
             connectionStates.disconnected(nodeId1, time.milliseconds());
         }
     }
 
     @Test
-    public void testExponentialReconnectBackoff() {
+    public void testExponentialReconnectBackoff() throws UnknownHostException {
         // Calculate fixed components for backoff process
         final int reconnectBackoffExpBase = 2;
         double reconnectBackoffMaxExp = Math.log(reconnectBackoffMax / (double) Math.max(reconnectBackoffMs, 1))
@@ -190,7 +197,7 @@ public void testExponentialReconnectBackoff() {
 
         // Run through 10 disconnects and check that reconnect backoff value is within expected range for every attempt
         for (int i = 0; i < 10; i++) {
-            connectionStates.connecting(nodeId1, time.milliseconds());
+            connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
             connectionStates.disconnected(nodeId1, time.milliseconds());
             // Calculate expected backoff value without jitter
             long expectedBackoff = Math.round(Math.pow(reconnectBackoffExpBase, Math.min(i, reconnectBackoffMaxExp))
@@ -202,8 +209,8 @@ public void testExponentialReconnectBackoff() {
     }
 
     @Test
-    public void testThrottled() {
-        connectionStates.connecting(nodeId1, time.milliseconds());
+    public void testThrottled() throws UnknownHostException {
+        connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
         time.sleep(1000);
         connectionStates.ready(nodeId1);
         time.sleep(10000);
@@ -226,4 +233,47 @@ public void testThrottled() {
         assertEquals(connectionStates.connectionDelay(nodeId1, time.milliseconds()),
             connectionStates.pollDelayMs(nodeId1, time.milliseconds()));
     }
+
+    @Test
+    public void testSingleIPWithDefault() throws UnknownHostException {
+        connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
+        InetAddress currAddress = connectionStates.currentAddress(nodeId1);
+        connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
+        assertSame(currAddress, connectionStates.currentAddress(nodeId1));
+    }
+
+    @Test
+    public void testSingleIPWithUseAll() throws UnknownHostException {
+        assertEquals(1, ClientUtils.resolve("localhost", ClientDnsLookup.USE_ALL_DNS_IPS).size());
+
+        connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.USE_ALL_DNS_IPS);
+        InetAddress currAddress = connectionStates.currentAddress(nodeId1);
+        connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.USE_ALL_DNS_IPS);
+        assertSame(currAddress, connectionStates.currentAddress(nodeId1));
+    }
+
+    @Test
+    public void testMultipleIPsWithDefault() throws UnknownHostException {
+        assertEquals(2, ClientUtils.resolve(hostTwoIps, ClientDnsLookup.USE_ALL_DNS_IPS).size());
+
+        connectionStates.connecting(nodeId1, time.milliseconds(), hostTwoIps, ClientDnsLookup.DEFAULT);
+        InetAddress currAddress = connectionStates.currentAddress(nodeId1);
+        connectionStates.connecting(nodeId1, time.milliseconds(), hostTwoIps, ClientDnsLookup.DEFAULT);
+        assertSame(currAddress, connectionStates.currentAddress(nodeId1));
+    }
+
+    @Test
+    public void testMultipleIPsWithUseAll() throws UnknownHostException {
+        assertEquals(2, ClientUtils.resolve(hostTwoIps, ClientDnsLookup.USE_ALL_DNS_IPS).size());
+
+        connectionStates.connecting(nodeId1, time.milliseconds(), hostTwoIps, ClientDnsLookup.USE_ALL_DNS_IPS);
+        InetAddress addr1 = connectionStates.currentAddress(nodeId1);
+        connectionStates.connecting(nodeId1, time.milliseconds(), hostTwoIps, ClientDnsLookup.USE_ALL_DNS_IPS);
+        InetAddress addr2 = connectionStates.currentAddress(nodeId1);
+        assertNotSame(addr1, addr2);
+
+        connectionStates.connecting(nodeId1, time.milliseconds(), hostTwoIps, ClientDnsLookup.USE_ALL_DNS_IPS);
+        InetAddress addr3 = connectionStates.currentAddress(nodeId1);
+        assertSame(addr1, addr3);
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 2876570b2bf..aaf827feee3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -19,6 +19,7 @@
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.network.NetworkReceive;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.CommonFields;
@@ -70,19 +71,20 @@
     private NetworkClient createNetworkClient(long reconnectBackoffMaxMs) {
         return new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
                 reconnectBackoffMsTest, reconnectBackoffMaxMs, 64 * 1024, 64 * 1024,
-                defaultRequestTimeoutMs, time, true, new ApiVersions(), new LogContext());
+                defaultRequestTimeoutMs, ClientDnsLookup.DEFAULT, time, true, new ApiVersions(), new LogContext());
     }
 
     private NetworkClient createNetworkClientWithStaticNodes() {
         return new NetworkClient(selector, new ManualMetadataUpdater(Arrays.asList(node)),
                 "mock-static", Integer.MAX_VALUE, 0, 0, 64 * 1024, 64 * 1024, defaultRequestTimeoutMs,
-                time, true, new ApiVersions(), new LogContext());
+                ClientDnsLookup.DEFAULT, time, true, new ApiVersions(), new LogContext());
     }
 
     private NetworkClient createNetworkClientWithNoVersionDiscovery() {
         return new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
                 reconnectBackoffMsTest, reconnectBackoffMaxMsTest,
-                64 * 1024, 64 * 1024, defaultRequestTimeoutMs, time, false, new ApiVersions(), new LogContext());
+                64 * 1024, 64 * 1024, defaultRequestTimeoutMs, 
+                ClientDnsLookup.DEFAULT, time, false, new ApiVersions(), new LogContext());
     }
 
     @Before
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 42f6beb16c2..42deee7c204 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -36,6 +36,7 @@
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.SerializationException;
@@ -1547,7 +1548,7 @@ public void testQuotaMetrics() {
         Cluster cluster = TestUtils.singletonCluster("test", 1);
         Node node = cluster.nodes().get(0);
         NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
-                1000, 1000, 64 * 1024, 64 * 1024, 1000,
+                1000, 1000, 64 * 1024, 64 * 1024, 1000,  ClientDnsLookup.DEFAULT,
                 time, true, new ApiVersions(), throttleTimeSensor, new LogContext());
 
         short apiVersionsResponseVersion = ApiKeys.API_VERSIONS.latestVersion();
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index f2a34f6c89c..6d4d78cb70f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -44,6 +44,7 @@
 import org.apache.kafka.common.MetricNameTemplate;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.NetworkException;
 import org.apache.kafka.common.errors.OutOfOrderSequenceException;
@@ -262,7 +263,7 @@ public void testQuotaMetrics() throws Exception {
         Cluster cluster = TestUtils.singletonCluster("test", 1);
         Node node = cluster.nodes().get(0);
         NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
-                1000, 1000, 64 * 1024, 64 * 1024, 1000,
+                1000, 1000, 64 * 1024, 64 * 1024, 1000,  ClientDnsLookup.DEFAULT,
                 time, true, new ApiVersions(), throttleTimeSensor, logContext);
 
         short apiVersionsResponseVersion = ApiKeys.API_VERSIONS.latestVersion();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index c5baaa427cb..aac9fb24441 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -18,6 +18,7 @@
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
@@ -277,7 +278,13 @@ protected static ConfigDef baseConfigDef() {
                         Collections.emptyList(),
                         Importance.LOW, CONFIG_PROVIDERS_DOC)
                 .define(REST_EXTENSION_CLASSES_CONFIG, Type.LIST, "",
-                        Importance.LOW, REST_EXTENSION_CLASSES_DOC);
+                        Importance.LOW, REST_EXTENSION_CLASSES_DOC)
+                .define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG,
+                        Type.STRING,
+                        ClientDnsLookup.DEFAULT.toString(),
+                        in(ClientDnsLookup.DEFAULT.toString(), ClientDnsLookup.USE_ALL_DNS_IPS.toString()),
+                        Importance.MEDIUM,
+                        CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC);    
     }
 
     private void logInternalConverterDeprecationWarnings(Map<String, String> props) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index aeed060d83e..de8e8b27b74 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -24,6 +24,7 @@
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -109,6 +110,7 @@ public WorkerGroupMember(DistributedConfig config,
                     config.getInt(CommonClientConfigs.SEND_BUFFER_CONFIG),
                     config.getInt(CommonClientConfigs.RECEIVE_BUFFER_CONFIG),
                     config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG),
+                    ClientDnsLookup.forConfig(config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)),
                     time,
                     true,
                     new ApiVersions(),
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 239844d2f59..5876b6ec460 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -39,6 +39,7 @@ import org.apache.kafka.common.{Cluster, Node, TopicPartition}
 
 import scala.collection.JavaConverters._
 import scala.util.{Failure, Success, Try}
+import org.apache.kafka.common.config.ClientDnsLookup
 
 /**
   * A Scala administrative client for Kafka which supports managing and inspecting topics, brokers,
@@ -452,6 +453,7 @@ object AdminClient {
       DefaultSendBufferBytes,
       DefaultReceiveBufferBytes,
       requestTimeoutMs,
+      ClientDnsLookup.DEFAULT,
       time,
       true,
       new ApiVersions,
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index ecf6fbf33f1..86b6f94bdd3 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -39,6 +39,7 @@ import org.apache.kafka.common.{KafkaException, Node, TopicPartition}
 import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 import scala.collection.{Set, mutable}
+import org.apache.kafka.common.config.ClientDnsLookup
 
 
 object ControllerChannelManager {
@@ -140,6 +141,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
         Selectable.USE_DEFAULT_BUFFER_SIZE,
         Selectable.USE_DEFAULT_BUFFER_SIZE,
         config.requestTimeoutMs,
+        ClientDnsLookup.DEFAULT,
         time,
         false,
         new ApiVersions,
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index f8b56e8e008..d0e765c5a41 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -36,6 +36,7 @@ import java.util.concurrent.{BlockingQueue, ConcurrentHashMap, LinkedBlockingQue
 
 import collection.JavaConverters._
 import scala.collection.{concurrent, immutable}
+import org.apache.kafka.common.config.ClientDnsLookup
 
 object TransactionMarkerChannelManager {
   def apply(config: KafkaConfig,
@@ -74,6 +75,7 @@ object TransactionMarkerChannelManager {
       Selectable.USE_DEFAULT_BUFFER_SIZE,
       config.socketReceiveBufferBytes,
       config.requestTimeoutMs,
+      ClientDnsLookup.DEFAULT,
       time,
       false,
       new ApiVersions,
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 0d13d9e0730..841ea8272fa 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -51,6 +51,7 @@ import org.apache.kafka.common.{ClusterResource, Node}
 
 import scala.collection.JavaConverters._
 import scala.collection.{Map, Seq, mutable}
+import org.apache.kafka.common.config.ClientDnsLookup
 
 object KafkaServer {
   // Copy the subset of properties that are relevant to Logs
@@ -443,6 +444,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
           Selectable.USE_DEFAULT_BUFFER_SIZE,
           Selectable.USE_DEFAULT_BUFFER_SIZE,
           config.requestTimeoutMs,
+          ClientDnsLookup.DEFAULT,
           time,
           false,
           new ApiVersions,
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala b/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
index 4c7adfbe9cd..d15fdae6b97 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.common.Node
 import org.apache.kafka.common.requests.AbstractRequest.Builder
 
 import scala.collection.JavaConverters._
+import org.apache.kafka.common.config.ClientDnsLookup
 
 trait BlockingSend {
 
@@ -79,6 +80,7 @@ class ReplicaFetcherBlockingSend(sourceBroker: BrokerEndPoint,
       Selectable.USE_DEFAULT_BUFFER_SIZE,
       brokerConfig.replicaSocketReceiveBufferBytes,
       brokerConfig.requestTimeoutMs,
+      ClientDnsLookup.DEFAULT,
       time,
       false,
       new ApiVersions,
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index 4758764be65..7871015f3fb 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -43,6 +43,7 @@ import org.apache.kafka.common.utils.{LogContext, Time}
 import org.apache.kafka.common.{Node, TopicPartition}
 
 import scala.collection.JavaConverters._
+import org.apache.kafka.common.config.ClientDnsLookup
 
 /**
  * For verifying the consistency among replicas.
@@ -472,6 +473,7 @@ private class ReplicaFetcherBlockingSend(sourceNode: Node,
       Selectable.USE_DEFAULT_BUFFER_SIZE,
       consumerConfig.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
       consumerConfig.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
+      ClientDnsLookup.DEFAULT,
       time,
       false,
       new ApiVersions,
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
index 82d1d6c49ac..74ab234cdbe 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
@@ -21,6 +21,7 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.ManualMetadataUpdater;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.NetworkClientUtils;
@@ -29,6 +30,7 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.network.ChannelBuilder;
@@ -180,6 +182,7 @@ private boolean attemptConnection(AdminClientConfig conf,
                                     4096,
                                     4096,
                                     1000,
+                                    ClientDnsLookup.forConfig(conf.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)),
                                     Time.SYSTEM,
                                     false,
                                     new ApiVersions(),


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Kafka clients should try to use multiple DNS resolved IP addresses if the first one fails
> -----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6863
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6863
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 1.0.0, 1.1.0
>            Reporter: Edoardo Comar
>            Assignee: Edoardo Comar
>            Priority: Major
>             Fix For: 2.1.0
>
>
> Currently Kafka clients resolve a symbolic hostname using
>   {{new InetSocketAddress(String hostname, int port)}}
> which only picks one IP address even if the DNS has multiple records for the hostname, as it calls
>  {{InetAddress.getAllByName(host)[0]}}
> For some environments where the hostnames are mapped by the DNS to multiple IPs, e.g. in clouds where the IPs point to the external load balancers, it would be preferable that the client, on failing to connect to one of the IPs, would try the other ones before giving up the connection.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)