You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/10/13 20:39:46 UTC
[kafka] branch trunk updated: KAFKA-6195: Resolve DNS aliases in
bootstrap.server (KIP-235) (#4485)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a947fe8 KAFKA-6195: Resolve DNS aliases in bootstrap.server (KIP-235) (#4485)
a947fe8 is described below
commit a947fe8da8646058335ff76b9a744462a3337a63
Author: jonathanskrzypek <jo...@gs.com>
AuthorDate: Sat Oct 13 21:39:35 2018 +0100
KAFKA-6195: Resolve DNS aliases in bootstrap.server (KIP-235) (#4485)
Adds `client.dns.lookup=resolve_canonical_bootstrap_servers_only` option to perform full dns resolution of bootstrap addresses
Reviewers: Colin Patrick McCabe <co...@cmccabe.xyz>, Sriharsha Chintalapani <sr...@apache.org>, Edoardo Comar <ec...@uk.ibm.com>, Mickael Maison <mi...@gmail.com>, Manikumar Reddy <ma...@gmail.com>, Rajini Sivaram <ra...@googlemail.com>
---
.../config => clients}/ClientDnsLookup.java | 5 ++-
.../java/org/apache/kafka/clients/ClientUtils.java | 32 +++++++++++----
.../kafka/clients/ClusterConnectionStates.java | 1 -
.../apache/kafka/clients/CommonClientConfigs.java | 8 ++--
.../org/apache/kafka/clients/NetworkClient.java | 1 -
.../kafka/clients/admin/AdminClientConfig.java | 16 ++++++--
.../kafka/clients/admin/KafkaAdminClient.java | 8 ++--
.../kafka/clients/consumer/ConsumerConfig.java | 21 ++++++----
.../kafka/clients/consumer/KafkaConsumer.java | 11 ++---
.../kafka/clients/producer/KafkaProducer.java | 11 +++--
.../kafka/clients/producer/ProducerConfig.java | 21 ++++++----
.../org/apache/kafka/clients/ClientUtilsTest.java | 47 ++++++++++++++++------
.../kafka/clients/ClusterConnectionStatesTest.java | 1 -
.../apache/kafka/clients/NetworkClientTest.java | 1 -
.../clients/consumer/internals/FetcherTest.java | 4 +-
.../clients/producer/internals/SenderTest.java | 2 +-
.../apache/kafka/connect/runtime/WorkerConfig.java | 21 ++++++----
.../runtime/distributed/WorkerGroupMember.java | 6 ++-
core/src/main/scala/kafka/admin/AdminClient.scala | 13 +++++-
.../controller/ControllerChannelManager.scala | 2 -
.../TransactionMarkerChannelManager.scala | 1 -
core/src/main/scala/kafka/server/KafkaServer.scala | 3 +-
.../kafka/server/ReplicaFetcherBlockingSend.scala | 1 -
.../kafka/tools/ReplicaVerificationTool.scala | 1 -
.../trogdor/workload/ConnectionStressWorker.java | 8 ++--
25 files changed, 158 insertions(+), 88 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ClientDnsLookup.java b/clients/src/main/java/org/apache/kafka/clients/ClientDnsLookup.java
similarity index 88%
rename from clients/src/main/java/org/apache/kafka/common/config/ClientDnsLookup.java
rename to clients/src/main/java/org/apache/kafka/clients/ClientDnsLookup.java
index 4a013b9..96d47c3 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ClientDnsLookup.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientDnsLookup.java
@@ -14,14 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.common.config;
+package org.apache.kafka.clients;
import java.util.Locale;
public enum ClientDnsLookup {
DEFAULT("default"),
- USE_ALL_DNS_IPS("use_all_dns_ips");
+ USE_ALL_DNS_IPS("use_all_dns_ips"),
+ RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY("resolve_canonical_bootstrap_servers_only");
private String clientDnsLookup;
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
index 1661ea3..dce5f3f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
@@ -17,7 +17,6 @@
package org.apache.kafka.clients;
import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ClientDnsLookup;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.network.ChannelBuilder;
@@ -42,10 +41,12 @@ import static org.apache.kafka.common.utils.Utils.getPort;
public final class ClientUtils {
private static final Logger log = LoggerFactory.getLogger(ClientUtils.class);
- private ClientUtils() {}
+ private ClientUtils() {
+ }
- public static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls) {
+ public static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls, String clientDnsLookup) {
List<InetSocketAddress> addresses = new ArrayList<>();
+ ClientDnsLookup clientDnsLookupBehaviour = ClientDnsLookup.forConfig(clientDnsLookup);
for (String url : urls) {
if (url != null && !url.isEmpty()) {
try {
@@ -54,15 +55,30 @@ public final class ClientUtils {
if (host == null || port == null)
throw new ConfigException("Invalid url in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
- InetSocketAddress address = new InetSocketAddress(host, port);
-
- if (address.isUnresolved()) {
- log.warn("Removing server {} from {} as DNS resolution failed for {}", url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, host);
+ if (clientDnsLookupBehaviour == ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY) {
+ InetAddress[] inetAddresses = InetAddress.getAllByName(host);
+ for (InetAddress inetAddress : inetAddresses) {
+ String resolvedCanonicalName = inetAddress.getCanonicalHostName();
+ InetSocketAddress address = new InetSocketAddress(resolvedCanonicalName, port);
+ if (address.isUnresolved()) {
+ log.warn("Couldn't resolve server {} from {} as DNS resolution of the canonical hostname {} failed for {}", url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, resolvedCanonicalName, host);
+ } else {
+ addresses.add(address);
+ }
+ }
} else {
- addresses.add(address);
+ InetSocketAddress address = new InetSocketAddress(host, port);
+ if (address.isUnresolved()) {
+ log.warn("Couldn't resolve server {} from {} as DNS resolution failed for {}", url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, host);
+ } else {
+ addresses.add(address);
+ }
}
+
} catch (IllegalArgumentException e) {
throw new ConfigException("Invalid port in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
+ } catch (UnknownHostException e) {
+ throw new ConfigException("Unknown host in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
}
}
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index b697de7..f198533 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -18,7 +18,6 @@ package org.apache.kafka.clients;
import java.util.concurrent.ThreadLocalRandom;
-import org.apache.kafka.common.config.ClientDnsLookup;
import org.apache.kafka.common.errors.AuthenticationException;
import java.net.InetAddress;
diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index b179f02..c8e2357 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -41,6 +41,11 @@ public class CommonClientConfigs {
+ "discover the full cluster membership (which may change dynamically), this list need not contain the full set of "
+ "servers (you may want more than one, though, in case a server is down).";
+ public static final String CLIENT_DNS_LOOKUP_CONFIG = "client.dns.lookup";
+ public static final String CLIENT_DNS_LOOKUP_DOC = "<p>Controls how the client uses DNS lookups.</p><p>If set to <code>use_all_dns_ips</code> then, when the lookup returns multiple IP addresses for a hostname,"
+ + " they will all be attempted to connect to before failing the connection. Applies to both bootstrap and advertised servers.</p>"
+ + "<p>If the value is <code>resolve_canonical_bootstrap_servers_only</code> each entry will be resolved and expanded into a list of canonical names.</p>";
+
public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";
public static final String METADATA_MAX_AGE_DOC = "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.";
@@ -93,9 +98,6 @@ public class CommonClientConfigs {
+ "elapses the client will resend the request if necessary or fail the request if "
+ "retries are exhausted.";
- public static final String CLIENT_DNS_LOOKUP_CONFIG = "client.dns.lookup";
- public static final String CLIENT_DNS_LOOKUP_DOC = "<p>Controls how the client uses DNS lookups.</p><p>If set to <code>use_all_dns_ips</code> then, when the lookup returns multiple IP addresses for a hostname,"
- + " they will all be attempted to connect to before failing the connection. Applies to both bootstrap and advertised servers.</p>";
/**
* Postprocess the configuration so that exponential backoff is disabled when reconnect backoff
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 7ea05f6..c6f0c0b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -19,7 +19,6 @@ package org.apache.kafka.clients;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.config.ClientDnsLookup;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.metrics.Sensor;
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
index a051de2..47c76ac 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
@@ -17,9 +17,9 @@
package org.apache.kafka.clients.admin;
+import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ClientDnsLookup;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
@@ -44,6 +44,12 @@ public class AdminClientConfig extends AbstractConfig {
private static final String BOOTSTRAP_SERVERS_DOC = CommonClientConfigs.BOOTSTRAP_SERVERS_DOC;
/**
+ * <code>client.dns.lookup</code>
+ */
+ public static final String CLIENT_DNS_LOOKUP_CONFIG = CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG;
+ private static final String CLIENT_DNS_LOOKUP_DOC = CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC;
+
+ /**
* <code>reconnect.backoff.ms</code>
*/
public static final String RECONNECT_BACKOFF_MS_CONFIG = CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG;
@@ -159,12 +165,14 @@ public class AdminClientConfig extends AbstractConfig {
in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString()),
Importance.LOW,
METRICS_RECORDING_LEVEL_DOC)
- .define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG,
+ .define(CLIENT_DNS_LOOKUP_CONFIG,
Type.STRING,
ClientDnsLookup.DEFAULT.toString(),
- in(ClientDnsLookup.DEFAULT.toString(), ClientDnsLookup.USE_ALL_DNS_IPS.toString()),
+ in(ClientDnsLookup.DEFAULT.toString(),
+ ClientDnsLookup.USE_ALL_DNS_IPS.toString(),
+ ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()),
Importance.MEDIUM,
- CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
+ CLIENT_DNS_LOOKUP_DOC)
// security support
.define(SECURITY_PROTOCOL_CONFIG,
Type.STRING,
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 86e1447..c8418c1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -18,10 +18,10 @@
package org.apache.kafka.clients.admin;
import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.ClientUtils;
-import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.StaleMetadataException;
@@ -46,7 +46,6 @@ import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.common.config.ClientDnsLookup;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.AuthenticationException;
@@ -362,7 +361,7 @@ public class KafkaAdminClient extends AdminClient {
config.getInt(AdminClientConfig.SEND_BUFFER_CONFIG),
config.getInt(AdminClientConfig.RECEIVE_BUFFER_CONFIG),
(int) TimeUnit.HOURS.toMillis(1),
- ClientDnsLookup.forConfig(config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)),
+ ClientDnsLookup.forConfig(config.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG)),
time,
true,
apiVersions,
@@ -414,7 +413,8 @@ public class KafkaAdminClient extends AdminClient {
this.time = time;
this.metadataManager = metadataManager;
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
- config.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG));
+ config.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+ config.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG));
metadataManager.update(Cluster.bootstrap(addresses), time.milliseconds());
this.metrics = metrics;
this.client = client;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index a1c9dc2..795a762 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -16,9 +16,9 @@
*/
package org.apache.kafka.clients.consumer;
+import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ClientDnsLookup;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
@@ -90,6 +90,9 @@ public class ConsumerConfig extends AbstractConfig {
*/
public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+ /** <code>client.dns.lookup</code> */
+ public static final String CLIENT_DNS_LOOKUP_CONFIG = CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG;
+
/**
* <code>enable.auto.commit</code>
*/
@@ -258,7 +261,7 @@ public class ConsumerConfig extends AbstractConfig {
" return the LSO";
public static final String DEFAULT_ISOLATION_LEVEL = IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT);
-
+
static {
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
Type.LIST,
@@ -266,6 +269,14 @@ public class ConsumerConfig extends AbstractConfig {
new ConfigDef.NonNullValidator(),
Importance.HIGH,
CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
+ .define(CLIENT_DNS_LOOKUP_CONFIG,
+ Type.STRING,
+ ClientDnsLookup.DEFAULT.toString(),
+ in(ClientDnsLookup.DEFAULT.toString(),
+ ClientDnsLookup.USE_ALL_DNS_IPS.toString(),
+ ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()),
+ Importance.MEDIUM,
+ CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
.define(GROUP_ID_CONFIG, Type.STRING, "", Importance.HIGH, GROUP_ID_DOC)
.define(SESSION_TIMEOUT_MS_CONFIG,
Type.INT,
@@ -453,12 +464,6 @@ public class ConsumerConfig extends AbstractConfig {
in(IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT), IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT)),
Importance.MEDIUM,
ISOLATION_LEVEL_DOC)
- .define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG,
- Type.STRING,
- ClientDnsLookup.DEFAULT.toString(),
- in(ClientDnsLookup.DEFAULT.toString(), ClientDnsLookup.USE_ALL_DNS_IPS.toString()),
- Importance.MEDIUM,
- CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
// security support
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
Type.STRING,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index e79ff07..4061373 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -17,8 +17,8 @@
package org.apache.kafka.clients.consumer;
import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientUtils;
-import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
@@ -36,7 +36,6 @@ import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.config.ClientDnsLookup;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
@@ -710,8 +709,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keyDeserializer, valueDeserializer, reporters, interceptorList);
this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG),
true, false, clusterResourceListeners);
- List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
- this.metadata.update(Cluster.bootstrap(addresses), Collections.emptySet(), 0);
+ List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
+ config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
+ config.getString(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG));
+ this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), 0);
String metricGrpPrefix = "consumer";
ConsumerMetrics metricsRegistry = new ConsumerMetrics(metricsTags.keySet(), "consumer");
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
@@ -732,7 +733,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
- ClientDnsLookup.forConfig(config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)),
+ ClientDnsLookup.forConfig(config.getString(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG)),
time,
true,
new ApiVersions(),
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 465e60f..c68a014 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -28,8 +28,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientUtils;
-import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
@@ -49,7 +49,6 @@ import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.config.ClientDnsLookup;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.AuthenticationException;
@@ -407,7 +406,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
apiVersions,
transactionManager,
new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));
- List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
+ List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
+ config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
+ config.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG));
if (metadata != null) {
this.metadata = metadata;
} else {
@@ -449,7 +450,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
producerConfig.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
producerConfig.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
requestTimeoutMs,
- ClientDnsLookup.forConfig(producerConfig.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)),
+ ClientDnsLookup.forConfig(producerConfig.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG)),
time,
true,
apiVersions,
@@ -496,7 +497,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
}
private static TransactionManager configureTransactionState(ProducerConfig config, LogContext logContext, Logger log) {
+
TransactionManager transactionManager = null;
+
boolean userConfiguredIdempotence = false;
if (config.originals().containsKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG))
userConfiguredIdempotence = true;
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index f08159d..c63477d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -16,10 +16,10 @@
*/
package org.apache.kafka.clients.producer;
+import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ClientDnsLookup;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
@@ -52,6 +52,9 @@ public class ProducerConfig extends AbstractConfig {
/** <code>bootstrap.servers</code> */
public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+ /** <code>client.dns.lookup</code> */
+ public static final String CLIENT_DNS_LOOKUP_CONFIG = CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG;
+
/** <code>metadata.max.age.ms</code> */
public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG;
private static final String METADATA_MAX_AGE_DOC = CommonClientConfigs.METADATA_MAX_AGE_DOC;
@@ -239,6 +242,14 @@ public class ProducerConfig extends AbstractConfig {
static {
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Collections.emptyList(), new ConfigDef.NonNullValidator(), Importance.HIGH, CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
+ .define(CLIENT_DNS_LOOKUP_CONFIG,
+ Type.STRING,
+ ClientDnsLookup.DEFAULT.toString(),
+ in(ClientDnsLookup.DEFAULT.toString(),
+ ClientDnsLookup.USE_ALL_DNS_IPS.toString(),
+ ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()),
+ Importance.MEDIUM,
+ CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
.define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
.define(RETRIES_CONFIG, Type.INT, Integer.MAX_VALUE, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC)
.define(ACKS_CONFIG,
@@ -347,13 +358,7 @@ public class ProducerConfig extends AbstractConfig {
null,
new ConfigDef.NonEmptyString(),
Importance.LOW,
- TRANSACTIONAL_ID_DOC)
- .define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG,
- Type.STRING,
- ClientDnsLookup.DEFAULT.toString(),
- in(ClientDnsLookup.DEFAULT.toString(), ClientDnsLookup.USE_ALL_DNS_IPS.toString()),
- Importance.MEDIUM,
- CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC);
+ TRANSACTIONAL_ID_DOC);
}
@Override
diff --git a/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
index bea464f..35f52a9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.clients;
-import org.apache.kafka.common.config.ClientDnsLookup;
import org.apache.kafka.common.config.ConfigException;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -31,27 +30,46 @@ import java.util.List;
public class ClientUtilsTest {
+
@Test
- public void testParseAndValidateAddresses() {
- check("127.0.0.1:8000");
- check("mydomain.com:8080");
- check("[::1]:8000");
- check("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:1234", "mydomain.com:10000");
- List<InetSocketAddress> validatedAddresses = check("some.invalid.hostname.foo.bar.local:9999", "mydomain.com:10000");
+ public void testParseAndValidateAddresses() throws UnknownHostException {
+ checkWithoutLookup("127.0.0.1:8000");
+ checkWithoutLookup("localhost:8080");
+ checkWithoutLookup("[::1]:8000");
+ checkWithoutLookup("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:1234", "localhost:10000");
+ List<InetSocketAddress> validatedAddresses = checkWithoutLookup("localhost:10000");
assertEquals(1, validatedAddresses.size());
InetSocketAddress onlyAddress = validatedAddresses.get(0);
- assertEquals("mydomain.com", onlyAddress.getHostName());
+ assertEquals("localhost", onlyAddress.getHostName());
assertEquals(10000, onlyAddress.getPort());
}
+ @Test
+ public void testParseAndValidateAddressesWithReverseLookup() {
+ checkWithoutLookup("127.0.0.1:8000");
+ checkWithoutLookup("localhost:8080");
+ checkWithoutLookup("[::1]:8000");
+ checkWithoutLookup("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:1234", "localhost:10000");
+ List<InetSocketAddress> validatedAddresses = checkWithLookup(Arrays.asList("example.com:10000"));
+ assertEquals(2, validatedAddresses.size());
+ InetSocketAddress address = validatedAddresses.get(0);
+ assertEquals("93.184.216.34", address.getHostName());
+ assertEquals(10000, address.getPort());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInvalidConfig() {
+ ClientUtils.parseAndValidateAddresses(Arrays.asList("localhost:10000"), "random.value");
+ }
+
@Test(expected = ConfigException.class)
public void testNoPort() {
- check("127.0.0.1");
+ checkWithoutLookup("127.0.0.1");
}
@Test(expected = ConfigException.class)
public void testOnlyBadHostname() {
- check("some.invalid.hostname.foo.bar.local:9999");
+ checkWithoutLookup("some.invalid.hostname.foo.bar.local:9999");
}
@Test
@@ -87,7 +105,12 @@ public class ClientUtilsTest {
assertEquals(2, ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.USE_ALL_DNS_IPS).size());
}
- private List<InetSocketAddress> check(String... url) {
- return ClientUtils.parseAndValidateAddresses(Arrays.asList(url));
+ private List<InetSocketAddress> checkWithoutLookup(String... url) {
+ return ClientUtils.parseAndValidateAddresses(Arrays.asList(url), ClientDnsLookup.DEFAULT.toString());
+ }
+
+ private List<InetSocketAddress> checkWithLookup(List<String> url) {
+ return ClientUtils.parseAndValidateAddresses(url, ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString());
}
+
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
index d4a2a55..23edaa9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
@@ -27,7 +27,6 @@ import static org.junit.Assert.assertTrue;
import java.net.InetAddress;
import java.net.UnknownHostException;
-import org.apache.kafka.common.config.ClientDnsLookup;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.utils.MockTime;
import org.junit.Before;
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index aaf827f..8abe9a40 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.clients;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.config.ClientDnsLookup;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.CommonFields;
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 42deee7..94d8d5b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.FetchSessionHandler;
@@ -36,7 +37,6 @@ import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.config.ClientDnsLookup;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.SerializationException;
@@ -2735,7 +2735,7 @@ public class FetcherTest {
String topicName2 = "topic2";
TopicPartition t2p0 = new TopicPartition(topicName2, 0);
// Expect a metadata refresh.
- metadata.update(Cluster.bootstrap(ClientUtils.parseAndValidateAddresses(Collections.singletonList("1.1.1.1:1111"))),
+ metadata.update(Cluster.bootstrap(ClientUtils.parseAndValidateAddresses(Collections.singletonList("1.1.1.1:1111"), ClientDnsLookup.DEFAULT.toString())),
Collections.<String>emptySet(),
time.milliseconds());
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 6d4d78c..23ca2ae 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
@@ -44,7 +45,6 @@ import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.config.ClientDnsLookup;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index aac9fb2..be3a709 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -16,9 +16,9 @@
*/
package org.apache.kafka.connect.runtime;
+import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ClientDnsLookup;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
@@ -61,6 +61,9 @@ public class WorkerConfig extends AbstractConfig {
+ "than one, though, in case a server is down).";
public static final String BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092";
+ public static final String CLIENT_DNS_LOOKUP_CONFIG = CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG;
+ public static final String CLIENT_DNS_LOOKUP_DOC = CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC;
+
public static final String KEY_CONVERTER_CLASS_CONFIG = "key.converter";
public static final String KEY_CONVERTER_CLASS_DOC =
"Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka." +
@@ -223,6 +226,14 @@ public class WorkerConfig extends AbstractConfig {
return new ConfigDef()
.define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, BOOTSTRAP_SERVERS_DEFAULT,
Importance.HIGH, BOOTSTRAP_SERVERS_DOC)
+ .define(CLIENT_DNS_LOOKUP_CONFIG,
+ Type.STRING,
+ ClientDnsLookup.DEFAULT.toString(),
+ in(ClientDnsLookup.DEFAULT.toString(),
+ ClientDnsLookup.USE_ALL_DNS_IPS.toString(),
+ ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()),
+ Importance.MEDIUM,
+ CLIENT_DNS_LOOKUP_DOC)
.define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS,
Importance.HIGH, KEY_CONVERTER_CLASS_DOC)
.define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS,
@@ -278,13 +289,7 @@ public class WorkerConfig extends AbstractConfig {
Collections.emptyList(),
Importance.LOW, CONFIG_PROVIDERS_DOC)
.define(REST_EXTENSION_CLASSES_CONFIG, Type.LIST, "",
- Importance.LOW, REST_EXTENSION_CLASSES_DOC)
- .define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG,
- Type.STRING,
- ClientDnsLookup.DEFAULT.toString(),
- in(ClientDnsLookup.DEFAULT.toString(), ClientDnsLookup.USE_ALL_DNS_IPS.toString()),
- Importance.MEDIUM,
- CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC);
+ Importance.LOW, REST_EXTENSION_CLASSES_DOC);
}
private void logInternalConverterDeprecationWarnings(Map<String, String> props) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index de8e8b2..5725ff5 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -17,6 +17,7 @@
package org.apache.kafka.connect.runtime.distributed;
import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.Metadata;
@@ -24,7 +25,6 @@ import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.config.ClientDnsLookup;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@@ -96,7 +96,9 @@ public class WorkerGroupMember {
this.metrics = new Metrics(metricConfig, reporters, time);
this.retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG);
this.metadata = new Metadata(retryBackoffMs, config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG), true);
- List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
+ List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
+ config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),
+ config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), 0);
String metricGrpPrefix = "connect";
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 5876b6e..aaa0903 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -23,6 +23,7 @@ import kafka.coordinator.group.GroupOverview
import kafka.utils.Logging
import org.apache.kafka.clients._
import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, ConsumerProtocol, RequestFuture}
+import org.apache.kafka.common.config.ConfigDef.ValidString._
import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
import org.apache.kafka.common.errors.{AuthenticationException, TimeoutException}
@@ -39,7 +40,6 @@ import org.apache.kafka.common.{Cluster, Node, TopicPartition}
import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}
-import org.apache.kafka.common.config.ClientDnsLookup
/**
* A Scala administrative client for Kafka which supports managing and inspecting topics, brokers,
@@ -386,6 +386,14 @@ object AdminClient {
Type.LIST,
Importance.HIGH,
CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
+ .define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG,
+ Type.STRING,
+ ClientDnsLookup.DEFAULT.toString,
+ in(ClientDnsLookup.DEFAULT.toString,
+ ClientDnsLookup.USE_ALL_DNS_IPS.toString,
+ ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString),
+ Importance.MEDIUM,
+ CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
.define(
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
ConfigDef.Type.STRING,
@@ -429,7 +437,8 @@ object AdminClient {
val retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG)
val brokerUrls = config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
- val brokerAddresses = ClientUtils.parseAndValidateAddresses(brokerUrls)
+ val clientDnsLookup = config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)
+ val brokerAddresses = ClientUtils.parseAndValidateAddresses(brokerUrls, clientDnsLookup)
val bootstrapCluster = Cluster.bootstrap(brokerAddresses)
metadata.update(bootstrapCluster, Collections.emptySet(), 0)
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 86b6f94..7002219 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -39,8 +39,6 @@ import org.apache.kafka.common.{KafkaException, Node, TopicPartition}
import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap
import scala.collection.{Set, mutable}
-import org.apache.kafka.common.config.ClientDnsLookup
-
object ControllerChannelManager {
val QueueSizeMetricName = "QueueSize"
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index d0e765c..bd25d94 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -36,7 +36,6 @@ import java.util.concurrent.{BlockingQueue, ConcurrentHashMap, LinkedBlockingQue
import collection.JavaConverters._
import scala.collection.{concurrent, immutable}
-import org.apache.kafka.common.config.ClientDnsLookup
object TransactionMarkerChannelManager {
def apply(config: KafkaConfig,
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 841ea82..bef0663 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -37,7 +37,7 @@ import kafka.security.CredentialProvider
import kafka.security.auth.Authorizer
import kafka.utils._
import kafka.zk.{BrokerInfo, KafkaZkClient}
-import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient, NetworkClientUtils}
+import org.apache.kafka.clients.{ApiVersions, ClientDnsLookup, ManualMetadataUpdater, NetworkClient, NetworkClientUtils}
import org.apache.kafka.common.internals.ClusterResourceListeners
import org.apache.kafka.common.metrics.{JmxReporter, Metrics, _}
import org.apache.kafka.common.network._
@@ -51,7 +51,6 @@ import org.apache.kafka.common.{ClusterResource, Node}
import scala.collection.JavaConverters._
import scala.collection.{Map, Seq, mutable}
-import org.apache.kafka.common.config.ClientDnsLookup
object KafkaServer {
// Copy the subset of properties that are relevant to Logs
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala b/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
index d15fdae..4e642f3 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
@@ -30,7 +30,6 @@ import org.apache.kafka.common.Node
import org.apache.kafka.common.requests.AbstractRequest.Builder
import scala.collection.JavaConverters._
-import org.apache.kafka.common.config.ClientDnsLookup
trait BlockingSend {
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index 7871015..1f87d7a 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -43,7 +43,6 @@ import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.{Node, TopicPartition}
import scala.collection.JavaConverters._
-import org.apache.kafka.common.config.ClientDnsLookup
/**
* For verifying the consistency among replicas.
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
index 74ab234..9f15696 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
@@ -20,8 +20,8 @@ package org.apache.kafka.trogdor.workload;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientUtils;
-import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.ManualMetadataUpdater;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.NetworkClientUtils;
@@ -30,7 +30,6 @@ import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
-import org.apache.kafka.common.config.ClientDnsLookup;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ChannelBuilder;
@@ -127,7 +126,8 @@ public class ConnectionStressWorker implements TaskWorker {
WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), spec.commonClientConf());
AdminClientConfig conf = new AdminClientConfig(props);
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
- conf.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG));
+ conf.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+ conf.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG));
ManualMetadataUpdater updater = new ManualMetadataUpdater(Cluster.bootstrap(addresses).nodes());
while (true) {
if (doneFuture.isDone()) {
@@ -182,7 +182,7 @@ public class ConnectionStressWorker implements TaskWorker {
4096,
4096,
1000,
- ClientDnsLookup.forConfig(conf.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)),
+ ClientDnsLookup.forConfig(conf.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG)),
Time.SYSTEM,
false,
new ApiVersions(),