You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/06/12 21:28:03 UTC
kafka git commit: KAFKA-5404;
Add more AdminClient checks to ClientCompatibilityTest
Repository: kafka
Updated Branches:
refs/heads/trunk 9d6f0f40c -> 7d1ef63be
KAFKA-5404; Add more AdminClient checks to ClientCompatibilityTest
Author: Colin P. Mccabe <cm...@confluent.io>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #3263 from cmccabe/KAFKA-5404
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7d1ef63b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7d1ef63b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7d1ef63b
Branch: refs/heads/trunk
Commit: 7d1ef63bec459bcf05b46c1542488283db02a6e2
Parents: 9d6f0f4
Author: Colin P. Mccabe <cm...@confluent.io>
Authored: Mon Jun 12 22:27:58 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Mon Jun 12 22:27:58 2017 +0100
----------------------------------------------------------------------
.../client_compatibility_features_test.py | 16 +--
.../kafka/tools/ClientCompatibilityTest.java | 121 +++++++++++++++++--
2 files changed, 119 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d1ef63b/tests/kafkatest/tests/client/client_compatibility_features_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/client_compatibility_features_test.py b/tests/kafkatest/tests/client/client_compatibility_features_test.py
index 978b72e..a10c376 100644
--- a/tests/kafkatest/tests/client/client_compatibility_features_test.py
+++ b/tests/kafkatest/tests/client/client_compatibility_features_test.py
@@ -23,18 +23,24 @@ from ducktape.tests.test import TestContext
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from ducktape.tests.test import Test
-from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, V_0_10_1_0, KafkaVersion
+from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, V_0_11_0_0, V_0_10_1_0, KafkaVersion
def get_broker_features(broker_version):
features = {}
if (broker_version < V_0_10_1_0):
+ features["create-topics-supported"] = False
features["offsets-for-times-supported"] = False
features["cluster-id-supported"] = False
features["expect-record-too-large-exception"] = True
else:
+ features["create-topics-supported"] = True
features["offsets-for-times-supported"] = True
features["cluster-id-supported"] = True
features["expect-record-too-large-exception"] = False
+ if (broker_version < V_0_11_0_0):
+ features["describe-acls-supported"] = False
+ else:
+ features["describe-acls-supported"] = True
return features
def run_command(node, cmd, ssh_log_file):
@@ -74,17 +80,13 @@ class ClientCompatibilityFeaturesTest(Test):
node = self.zk.nodes[0]
cmd = ("%s org.apache.kafka.tools.ClientCompatibilityTest "
"--bootstrap-server %s "
- "--offsets-for-times-supported %s "
- "--cluster-id-supported %s "
- "--expect-record-too-large-exception %s "
"--num-cluster-nodes %d "
"--topic %s " % (self.zk.path.script("kafka-run-class.sh", node),
self.kafka.bootstrap_servers(),
- features["offsets-for-times-supported"],
- features["cluster-id-supported"],
- features["expect-record-too-large-exception"],
len(self.kafka.nodes),
self.topics.keys()[0]))
+ for k, v in features.iteritems():
+ cmd = cmd + ("--%s %s " % (k, v))
results_dir = TestContext.results_dir(self.test_context, 0)
os.makedirs(results_dir)
ssh_log_file = "%s/%s" % (results_dir, "client_compatibility_test_output.txt")
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d1ef63b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
index 4fce8ce..431b53b 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
@@ -22,6 +22,8 @@ import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -37,7 +39,10 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.SecurityDisabledException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
@@ -49,6 +54,7 @@ import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
@@ -56,6 +62,7 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Properties;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import static net.sourceforge.argparse4j.impl.Arguments.store;
@@ -75,6 +82,8 @@ public class ClientCompatibilityTest {
final boolean expectClusterId;
final boolean expectRecordTooLargeException;
final int numClusterNodes;
+ final boolean createTopicsSupported;
+ final boolean describeAclsSupported;
TestConfig(Namespace res) {
this.bootstrapServer = res.getString("bootstrapServer");
@@ -83,6 +92,8 @@ public class ClientCompatibilityTest {
this.expectClusterId = res.getBoolean("clusterIdSupported");
this.expectRecordTooLargeException = res.getBoolean("expectRecordTooLargeException");
this.numClusterNodes = res.getInt("numClusterNodes");
+ this.createTopicsSupported = res.getBoolean("createTopicsSupported");
+ this.describeAclsSupported = res.getBoolean("describeAclsSupported");
}
}
@@ -135,6 +146,20 @@ public class ClientCompatibilityTest {
.dest("numClusterNodes")
.metavar("NUM_CLUSTER_NODES")
.help("The number of cluster nodes we should expect to see from the AdminClient.");
+ parser.addArgument("--create-topics-supported")
+ .action(store())
+ .required(true)
+ .type(Boolean.class)
+ .dest("createTopicsSupported")
+ .metavar("CREATE_TOPICS_SUPPORTED")
+ .help("Whether we should be able to create topics via the AdminClient.");
+ parser.addArgument("--describe-acls-supported")
+ .action(store())
+ .required(true)
+ .type(Boolean.class)
+ .dest("describeAclsSupported")
+ .metavar("DESCRIBE_ACLS_SUPPORTED")
+ .help("Whether describeAcls is supported in the AdminClient.");
Namespace res = null;
try {
@@ -196,7 +221,7 @@ public class ClientCompatibilityTest {
this.message2 = buf2.array();
}
- void run() throws Exception {
+ void run() throws Throwable {
long prodTimeMs = Time.SYSTEM.milliseconds();
testAdminClient();
testProduce();
@@ -218,10 +243,10 @@ public class ClientCompatibilityTest {
producer.close();
}
- void testAdminClient() throws Exception {
+ void testAdminClient() throws Throwable {
Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, testConfig.bootstrapServer);
- try (AdminClient client = AdminClient.create(adminProps)) {
+ try (final AdminClient client = AdminClient.create(adminProps)) {
while (true) {
Collection<Node> nodes = client.describeCluster().nodes().get();
if (nodes.size() == testConfig.numClusterNodes) {
@@ -234,6 +259,63 @@ public class ClientCompatibilityTest {
log.info("Saw only {} cluster nodes. Waiting to see {}.",
nodes.size(), testConfig.numClusterNodes);
}
+ tryFeature("createTopics", testConfig.createTopicsSupported,
+ new Invoker() {
+ @Override
+ public void invoke() throws Throwable {
+ try {
+ client.createTopics(Collections.singleton(
+ new NewTopic("newtopic", 1, (short) 1))).all().get();
+ } catch (ExecutionException e) {
+ throw e.getCause();
+ }
+ }
+ },
+ new ResultTester() {
+ @Override
+ public void test() throws Throwable {
+ while (true) {
+ try {
+ client.describeTopics(Collections.singleton("newtopic")).all().get();
+ break;
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof UnknownTopicOrPartitionException)
+ continue;
+ throw e;
+ }
+ }
+ }
+ });
+ while (true) {
+ Collection<TopicListing> listings = client.listTopics().descriptions().get();
+ if (!testConfig.createTopicsSupported)
+ break;
+ boolean foundNewTopic = false;
+ for (TopicListing listing : listings) {
+ if (listing.name().equals("newtopic")) {
+ if (listing.internal())
+ throw new KafkaException("Did not expect newtopic to be an internal topic.");
+ foundNewTopic = true;
+ }
+ }
+ if (foundNewTopic)
+ break;
+ Thread.sleep(1);
+ log.info("Did not see newtopic. Retrying listTopics...");
+ }
+ tryFeature("describeAclsSupported", testConfig.describeAclsSupported,
+ new Invoker() {
+ @Override
+ public void invoke() throws Throwable {
+ try {
+ client.describeAcls(AclBindingFilter.ANY).all().get();
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof SecurityDisabledException)
+ return;
+ throw e.getCause();
+ }
+ }
+ });
}
}
@@ -282,7 +364,7 @@ public class ClientCompatibilityTest {
}
}
- public void testConsume(final long prodTimeMs) throws Exception {
+ public void testConsume(final long prodTimeMs) throws Throwable {
Properties consumerProps = new Properties();
consumerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, testConfig.bootstrapServer);
consumerProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 512);
@@ -301,15 +383,15 @@ public class ClientCompatibilityTest {
}
final OffsetsForTime offsetsForTime = new OffsetsForTime();
tryFeature("offsetsForTimes", testConfig.offsetsForTimesSupported,
- new Runnable() {
+ new Invoker() {
@Override
- public void run() {
+ public void invoke() {
offsetsForTime.result = consumer.offsetsForTimes(timestampsToSearch);
}
},
- new Runnable() {
+ new ResultTester() {
@Override
- public void run() {
+ public void test() {
log.info("offsetsForTime = {}", offsetsForTime.result);
}
});
@@ -393,9 +475,26 @@ public class ClientCompatibilityTest {
log.info("Closed consumer.");
}
- private void tryFeature(String featureName, boolean supported, Runnable invoker, Runnable resultTester) {
+ private interface Invoker {
+ void invoke() throws Throwable;
+ }
+
+ private interface ResultTester {
+ void test() throws Throwable;
+ }
+
+ private void tryFeature(String featureName, boolean supported, Invoker invoker) throws Throwable {
+ tryFeature(featureName, supported, invoker, new ResultTester() {
+ @Override
+ public void test() {
+ }
+ });
+ }
+
+ private void tryFeature(String featureName, boolean supported, Invoker invoker, ResultTester resultTester)
+ throws Throwable {
try {
- invoker.run();
+ invoker.invoke();
log.info("Successfully used feature {}", featureName);
} catch (UnsupportedVersionException e) {
log.info("Got UnsupportedVersionException when attempting to use feature {}", featureName);
@@ -407,6 +506,6 @@ public class ClientCompatibilityTest {
if (!supported) {
throw new RuntimeException("Did not expect " + featureName + " to be supported, but it was.");
}
- resultTester.run();
+ resultTester.test();
}
}