Posted to commits@kafka.apache.org by ij...@apache.org on 2017/06/12 21:28:03 UTC

kafka git commit: KAFKA-5404; Add more AdminClient checks to ClientCompatibilityTest

Repository: kafka
Updated Branches:
  refs/heads/trunk 9d6f0f40c -> 7d1ef63be


KAFKA-5404; Add more AdminClient checks to ClientCompatibilityTest

Author: Colin P. Mccabe <cm...@confluent.io>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #3263 from cmccabe/KAFKA-5404


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7d1ef63b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7d1ef63b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7d1ef63b

Branch: refs/heads/trunk
Commit: 7d1ef63bec459bcf05b46c1542488283db02a6e2
Parents: 9d6f0f4
Author: Colin P. Mccabe <cm...@confluent.io>
Authored: Mon Jun 12 22:27:58 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Mon Jun 12 22:27:58 2017 +0100

----------------------------------------------------------------------
 .../client_compatibility_features_test.py       |  16 +--
 .../kafka/tools/ClientCompatibilityTest.java    | 121 +++++++++++++++++--
 2 files changed, 119 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
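The new checks exercise two AdminClient APIs that only newer brokers support: createTopics (requires a 0.10.1+ broker) and describeAcls (requires a 0.11.0+ broker). The following is a minimal standalone sketch of the probe pattern added in the diff below; the class name, bootstrap address, and console output are illustrative assumptions rather than values taken from the test itself.

    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.common.acl.AclBindingFilter;
    import org.apache.kafka.common.errors.SecurityDisabledException;
    import org.apache.kafka.common.errors.UnsupportedVersionException;

    // Minimal sketch (not the test itself): probe an AdminClient API and treat
    // UnsupportedVersionException as "this broker does not support the feature".
    public class AdminClientProbeSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // assumed address
            try (AdminClient client = AdminClient.create(props)) {
                // createTopics probe: brokers older than 0.10.1 reject the request.
                try {
                    client.createTopics(Collections.singleton(
                        new NewTopic("newtopic", 1, (short) 1))).all().get();
                    System.out.println("createTopics supported");
                } catch (ExecutionException e) {
                    if (e.getCause() instanceof UnsupportedVersionException)
                        System.out.println("createTopics not supported");
                    else
                        throw e;
                }
                // describeAcls probe: brokers older than 0.11.0 reject the request.
                // A 0.11.0+ broker without an authorizer answers with
                // SecurityDisabledException, which still proves the API exists.
                try {
                    client.describeAcls(AclBindingFilter.ANY).values().get();
                    System.out.println("describeAcls supported");
                } catch (ExecutionException e) {
                    if (e.getCause() instanceof SecurityDisabledException)
                        System.out.println("describeAcls supported (no authorizer configured)");
                    else if (e.getCause() instanceof UnsupportedVersionException)
                        System.out.println("describeAcls not supported");
                    else
                        throw e;
                }
            }
        }
    }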


http://git-wip-us.apache.org/repos/asf/kafka/blob/7d1ef63b/tests/kafkatest/tests/client/client_compatibility_features_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/client_compatibility_features_test.py b/tests/kafkatest/tests/client/client_compatibility_features_test.py
index 978b72e..a10c376 100644
--- a/tests/kafkatest/tests/client/client_compatibility_features_test.py
+++ b/tests/kafkatest/tests/client/client_compatibility_features_test.py
@@ -23,18 +23,24 @@ from ducktape.tests.test import TestContext
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
 from ducktape.tests.test import Test
-from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, V_0_10_1_0, KafkaVersion
+from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, V_0_11_0_0, V_0_10_1_0, KafkaVersion
 
 def get_broker_features(broker_version):
     features = {}
     if (broker_version < V_0_10_1_0):
+        features["create-topics-supported"] = False
         features["offsets-for-times-supported"] = False
         features["cluster-id-supported"] = False
         features["expect-record-too-large-exception"] = True
     else:
+        features["create-topics-supported"] = True
         features["offsets-for-times-supported"] = True
         features["cluster-id-supported"] = True
         features["expect-record-too-large-exception"] = False
+    if (broker_version < V_0_11_0_0):
+        features["describe-acls-supported"] = False
+    else:
+        features["describe-acls-supported"] = True
     return features
 
 def run_command(node, cmd, ssh_log_file):
@@ -74,17 +80,13 @@ class ClientCompatibilityFeaturesTest(Test):
         node = self.zk.nodes[0]
         cmd = ("%s org.apache.kafka.tools.ClientCompatibilityTest "
                "--bootstrap-server %s "
-               "--offsets-for-times-supported %s "
-               "--cluster-id-supported %s "
-               "--expect-record-too-large-exception %s "
                "--num-cluster-nodes %d "
                "--topic %s " % (self.zk.path.script("kafka-run-class.sh", node),
                                self.kafka.bootstrap_servers(),
-                               features["offsets-for-times-supported"],
-                               features["cluster-id-supported"],
-                               features["expect-record-too-large-exception"],
                                len(self.kafka.nodes),
                                self.topics.keys()[0]))
+        for k, v in features.iteritems():
+            cmd = cmd + ("--%s %s " % (k, v))
         results_dir = TestContext.results_dir(self.test_context, 0)
         os.makedirs(results_dir)
         ssh_log_file = "%s/%s" % (results_dir, "client_compatibility_test_output.txt")
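
For reference, with a 0.11.0 broker the loop above turns the features dict into one "--<feature> <True|False>" flag pair per entry. The hypothetical sketch below (host, node count, and topic name are placeholders) shows roughly the argument vector the tool then receives, matching the flags registered in ClientCompatibilityTest.java further down:

    // Hypothetical driver, illustrating the flags the test generates for a
    // 0.11.0 broker; the host, node count, and topic name are placeholders.
    public class InvokeCompatibilityTest {
        public static void main(String[] unused) throws Exception {
            String[] args = {
                "--bootstrap-server", "worker1:9092",
                "--num-cluster-nodes", "3",
                "--topic", "test_topic",
                "--offsets-for-times-supported", "True",
                "--cluster-id-supported", "True",
                "--expect-record-too-large-exception", "False",
                "--create-topics-supported", "True",
                "--describe-acls-supported", "True",
            };
            org.apache.kafka.tools.ClientCompatibilityTest.main(args);
        }
    }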

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d1ef63b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
index 4fce8ce..431b53b 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
@@ -22,6 +22,8 @@ import net.sourceforge.argparse4j.inf.ArgumentParserException;
 import net.sourceforge.argparse4j.inf.Namespace;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicListing;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -37,7 +39,10 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.SecurityDisabledException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
@@ -49,6 +54,7 @@ import org.slf4j.LoggerFactory;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -56,6 +62,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
 import static net.sourceforge.argparse4j.impl.Arguments.store;
@@ -75,6 +82,8 @@ public class ClientCompatibilityTest {
         final boolean expectClusterId;
         final boolean expectRecordTooLargeException;
         final int numClusterNodes;
+        final boolean createTopicsSupported;
+        final boolean describeAclsSupported;
 
         TestConfig(Namespace res) {
             this.bootstrapServer = res.getString("bootstrapServer");
@@ -83,6 +92,8 @@ public class ClientCompatibilityTest {
             this.expectClusterId = res.getBoolean("clusterIdSupported");
             this.expectRecordTooLargeException = res.getBoolean("expectRecordTooLargeException");
             this.numClusterNodes = res.getInt("numClusterNodes");
+            this.createTopicsSupported = res.getBoolean("createTopicsSupported");
+            this.describeAclsSupported = res.getBoolean("describeAclsSupported");
         }
     }
 
@@ -135,6 +146,20 @@ public class ClientCompatibilityTest {
             .dest("numClusterNodes")
             .metavar("NUM_CLUSTER_NODES")
             .help("The number of cluster nodes we should expect to see from the AdminClient.");
+        parser.addArgument("--create-topics-supported")
+            .action(store())
+            .required(true)
+            .type(Boolean.class)
+            .dest("createTopicsSupported")
+            .metavar("CREATE_TOPICS_SUPPORTED")
+            .help("Whether we should be able to create topics via the AdminClient.");
+        parser.addArgument("--describe-acls-supported")
+            .action(store())
+            .required(true)
+            .type(Boolean.class)
+            .dest("describeAclsSupported")
+            .metavar("DESCRIBE_ACLS_SUPPORTED")
+            .help("Whether describeAcls is supported in the AdminClient.");
 
         Namespace res = null;
         try {
@@ -196,7 +221,7 @@ public class ClientCompatibilityTest {
         this.message2 = buf2.array();
     }
 
-    void run() throws Exception {
+    void run() throws Throwable {
         long prodTimeMs = Time.SYSTEM.milliseconds();
         testAdminClient();
         testProduce();
@@ -218,10 +243,10 @@ public class ClientCompatibilityTest {
         producer.close();
     }
 
-    void testAdminClient() throws Exception {
+    void testAdminClient() throws Throwable {
         Properties adminProps = new Properties();
         adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, testConfig.bootstrapServer);
-        try (AdminClient client = AdminClient.create(adminProps)) {
+        try (final AdminClient client = AdminClient.create(adminProps)) {
             while (true) {
                 Collection<Node> nodes = client.describeCluster().nodes().get();
                 if (nodes.size() == testConfig.numClusterNodes) {
@@ -234,6 +259,63 @@ public class ClientCompatibilityTest {
                 log.info("Saw only {} cluster nodes.  Waiting to see {}.",
                     nodes.size(), testConfig.numClusterNodes);
             }
+            tryFeature("createTopics", testConfig.createTopicsSupported,
+                new Invoker() {
+                    @Override
+                    public void invoke() throws Throwable {
+                        try {
+                            client.createTopics(Collections.singleton(
+                                new NewTopic("newtopic", 1, (short) 1))).all().get();
+                        } catch (ExecutionException e) {
+                            throw e.getCause();
+                        }
+                    }
+                },
+                new ResultTester() {
+                    @Override
+                    public void test() throws Throwable {
+                        while (true) {
+                            try {
+                                client.describeTopics(Collections.singleton("newtopic")).all().get();
+                                break;
+                            } catch (ExecutionException e) {
+                                if (e.getCause() instanceof UnknownTopicOrPartitionException)
+                                    continue;
+                                throw e;
+                            }
+                        }
+                    }
+                });
+            while (true) {
+                Collection<TopicListing> listings = client.listTopics().listings().get();
+                if (!testConfig.createTopicsSupported)
+                    break;
+                boolean foundNewTopic = false;
+                for (TopicListing listing : listings) {
+                    if (listing.name().equals("newtopic")) {
+                        if (listing.internal())
+                            throw new KafkaException("Did not expect newtopic to be an internal topic.");
+                        foundNewTopic = true;
+                    }
+                }
+                if (foundNewTopic)
+                    break;
+                Thread.sleep(1);
+                log.info("Did not see newtopic.  Retrying listTopics...");
+            }
+            tryFeature("describeAclsSupported", testConfig.describeAclsSupported,
+                new Invoker() {
+                    @Override
+                    public void invoke() throws Throwable {
+                        try {
+                            client.describeAcls(AclBindingFilter.ANY).values().get();
+                        } catch (ExecutionException e) {
+                            if (e.getCause() instanceof SecurityDisabledException)
+                                return;
+                            throw e.getCause();
+                        }
+                    }
+                });
         }
     }
 
@@ -282,7 +364,7 @@ public class ClientCompatibilityTest {
         }
     }
 
-    public void testConsume(final long prodTimeMs) throws Exception {
+    public void testConsume(final long prodTimeMs) throws Throwable {
         Properties consumerProps = new Properties();
         consumerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, testConfig.bootstrapServer);
         consumerProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 512);
@@ -301,15 +383,15 @@ public class ClientCompatibilityTest {
         }
         final OffsetsForTime offsetsForTime = new OffsetsForTime();
         tryFeature("offsetsForTimes", testConfig.offsetsForTimesSupported,
-                new Runnable() {
+                new Invoker() {
                     @Override
-                    public void run() {
+                    public void invoke() {
                         offsetsForTime.result = consumer.offsetsForTimes(timestampsToSearch);
                     }
                 },
-                new Runnable() {
+                new ResultTester() {
                     @Override
-                    public void run() {
+                    public void test() {
                         log.info("offsetsForTime = {}", offsetsForTime.result);
                     }
                 });
@@ -393,9 +475,26 @@ public class ClientCompatibilityTest {
         log.info("Closed consumer.");
     }
 
-    private void tryFeature(String featureName, boolean supported, Runnable invoker, Runnable resultTester) {
+    private interface Invoker {
+        void invoke() throws Throwable;
+    }
+
+    private interface ResultTester {
+        void test() throws Throwable;
+    }
+
+    private void tryFeature(String featureName, boolean supported, Invoker invoker) throws Throwable {
+        tryFeature(featureName, supported, invoker, new ResultTester() {
+                @Override
+                public void test() {
+                }
+            });
+    }
+
+    private void tryFeature(String featureName, boolean supported, Invoker invoker, ResultTester resultTester)
+            throws Throwable {
         try {
-            invoker.run();
+            invoker.invoke();
             log.info("Successfully used feature {}", featureName);
         } catch (UnsupportedVersionException e) {
             log.info("Got UnsupportedVersionException when attempting to use feature {}", featureName);
@@ -407,6 +506,6 @@ public class ClientCompatibilityTest {
         if (!supported) {
             throw new RuntimeException("Did not expect " + featureName + " to be supported, but it was.");
         }
-        resultTester.run();
+        resultTester.test();
     }
 }