You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/06/03 05:26:12 UTC

kafka git commit: KAFKA-5374; Set allow auto topic creation to false when requesting node information only

Repository: kafka
Updated Branches:
  refs/heads/trunk 6a5a908b1 -> f389b7157


KAFKA-5374; Set allow auto topic creation to false when requesting node information only

It avoids the need to handle protocol downgrades and it's safe (i.e. it will never cause
the auto creation of topics).

Author: Colin P. Mccabe <cm...@confluent.io>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #3220 from ijuma/kafka-5374-admin-client-metadata


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f389b715
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f389b715
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f389b715

Branch: refs/heads/trunk
Commit: f389b715707e4e53eaf6ce476a218a4a06c427ee
Parents: 6a5a908
Author: Colin P. Mccabe <cm...@confluent.io>
Authored: Sat Jun 3 06:26:06 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Sat Jun 3 06:26:16 2017 +0100

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |  1 +
 .../kafka/clients/admin/KafkaAdminClient.java   |  8 +++--
 .../client_compatibility_features_test.py       |  2 ++
 .../kafka/tools/ClientCompatibilityTest.java    | 35 ++++++++++++++++++++
 4 files changed, 44 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f389b715/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index ab4f15d..f83f6e7 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -162,6 +162,7 @@
 
   <subpackage name="tools">
     <allow pkg="org.apache.kafka.common"/>
+    <allow pkg="org.apache.kafka.clients.admin" />
     <allow pkg="org.apache.kafka.clients.producer" />
     <allow pkg="org.apache.kafka.clients.consumer" />
     <allow pkg="com.fasterxml.jackson" />

http://git-wip-us.apache.org/repos/asf/kafka/blob/f389b715/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 199b07a..da76032 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -281,8 +281,10 @@ public class KafkaAdminClient extends AdminClient {
         ApiVersions apiVersions = new ApiVersions();
 
         try {
+            // Since we only request node information, it's safe to pass true for allowAutoTopicCreation (and it
+            // simplifies communication with older brokers)
             metadata = new Metadata(config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
-                    config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG), false);
+                    config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG), true);
             List<MetricsReporter> reporters = config.getConfiguredInstances(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG,
                 MetricsReporter.class);
             Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
@@ -1222,7 +1224,9 @@ public class KafkaAdminClient extends AdminClient {
 
             @Override
             AbstractRequest.Builder createRequest(int timeoutMs) {
-                return new MetadataRequest.Builder(Collections.<String>emptyList(), false);
+                // Since this only requests node information, it's safe to pass true for allowAutoTopicCreation (and it
+                // simplifies communication with older brokers)
+                return new MetadataRequest.Builder(Collections.<String>emptyList(), true);
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/f389b715/tests/kafkatest/tests/client/client_compatibility_features_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/client_compatibility_features_test.py b/tests/kafkatest/tests/client/client_compatibility_features_test.py
index 1b6519d..1b32540 100644
--- a/tests/kafkatest/tests/client/client_compatibility_features_test.py
+++ b/tests/kafkatest/tests/client/client_compatibility_features_test.py
@@ -77,11 +77,13 @@ class ClientCompatibilityFeaturesTest(Test):
                "--offsets-for-times-supported %s "
                "--cluster-id-supported %s "
                "--expect-record-too-large-exception %s "
+               "--num-cluster-nodes %d "
                "--topic %s " % (self.zk.path.script("kafka-run-class.sh", node),
                                self.kafka.bootstrap_servers(),
                                features["offsets-for-times-supported"],
                                features["cluster-id-supported"],
                                features["expect-record-too-large-exception"],
+                               len(self.kafka.nodes),
                                self.topics.keys()[0]))
         results_dir = TestContext.results_dir(self.test_context, 0)
         os.makedirs(results_dir)

http://git-wip-us.apache.org/repos/asf/kafka/blob/f389b715/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
index 2a7d3e6..4fce8ce 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
@@ -20,6 +20,8 @@ import net.sourceforge.argparse4j.ArgumentParsers;
 import net.sourceforge.argparse4j.inf.ArgumentParser;
 import net.sourceforge.argparse4j.inf.ArgumentParserException;
 import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -31,6 +33,8 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.ClusterResource;
 import org.apache.kafka.common.ClusterResourceListener;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.RecordTooLargeException;
@@ -44,6 +48,7 @@ import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -69,6 +74,7 @@ public class ClientCompatibilityTest {
         final boolean offsetsForTimesSupported;
         final boolean expectClusterId;
         final boolean expectRecordTooLargeException;
+        final int numClusterNodes;
 
         TestConfig(Namespace res) {
             this.bootstrapServer = res.getString("bootstrapServer");
@@ -76,6 +82,7 @@ public class ClientCompatibilityTest {
             this.offsetsForTimesSupported = res.getBoolean("offsetsForTimesSupported");
             this.expectClusterId = res.getBoolean("clusterIdSupported");
             this.expectRecordTooLargeException = res.getBoolean("expectRecordTooLargeException");
+            this.numClusterNodes = res.getInt("numClusterNodes");
         }
     }
 
@@ -121,6 +128,14 @@ public class ClientCompatibilityTest {
             .help("True if we should expect a RecordTooLargeException when trying to read from a topic " +
                   "that contains a message that is bigger than " + ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG +
                   ".  This is pre-KIP-74 behavior.");
+        parser.addArgument("--num-cluster-nodes")
+            .action(store())
+            .required(true)
+            .type(Integer.class)
+            .dest("numClusterNodes")
+            .metavar("NUM_CLUSTER_NODES")
+            .help("The number of cluster nodes we should expect to see from the AdminClient.");
+
         Namespace res = null;
         try {
             res = parser.parseArgs(args);
@@ -183,6 +198,7 @@ public class ClientCompatibilityTest {
 
     void run() throws Exception {
         long prodTimeMs = Time.SYSTEM.milliseconds();
+        testAdminClient();
         testProduce();
         testConsume(prodTimeMs);
     }
@@ -202,6 +218,25 @@ public class ClientCompatibilityTest {
         producer.close();
     }
 
+    void testAdminClient() throws Exception {
+        Properties adminProps = new Properties();
+        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, testConfig.bootstrapServer);
+        try (AdminClient client = AdminClient.create(adminProps)) {
+            while (true) {
+                Collection<Node> nodes = client.describeCluster().nodes().get();
+                if (nodes.size() == testConfig.numClusterNodes) {
+                    break;
+                } else if (nodes.size() > testConfig.numClusterNodes) {
+                    throw new KafkaException("Expected to see " + testConfig.numClusterNodes +
+                        " nodes, but saw " + nodes.size());
+                }
+                Thread.sleep(1);
+                log.info("Saw only {} cluster nodes.  Waiting to see {}.",
+                    nodes.size(), testConfig.numClusterNodes);
+            }
+        }
+    }
+
     private static class OffsetsForTime {
         Map<TopicPartition, OffsetAndTimestamp> result;