You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/06/27 22:09:48 UTC
[kafka] branch trunk updated: KAFKA-7091;
AdminClient should handle FindCoordinatorResponse errors (#5278)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 51935ee KAFKA-7091; AdminClient should handle FindCoordinatorResponse errors (#5278)
51935ee is described below
commit 51935ee2e6dc473415b6708b60c9d47410876997
Author: Manikumar Reddy O <ma...@gmail.com>
AuthorDate: Thu Jun 28 03:39:44 2018 +0530
KAFKA-7091; AdminClient should handle FindCoordinatorResponse errors (#5278)
- Update KafkaAdminClient implementation to handle FindCoordinatorResponse errors
- Remove scala AdminClient usage from core and streams tests
Reviewers: Matthias J. Sax <ma...@confluent.io>, Jason Gustafson <ja...@confluent.io>
---
.../kafka/clients/admin/KafkaAdminClient.java | 28 +++++---
.../common/requests/FindCoordinatorResponse.java | 8 ++-
.../kafka/clients/admin/KafkaAdminClientTest.java | 12 ++++
.../kafka/api/AdminClientIntegrationTest.scala | 15 +---
.../kafka/api/AuthorizerIntegrationTest.scala | 25 +++----
.../integration/kafka/api/ConsumerBounceTest.scala | 83 ++++++++++++++++------
.../api/SaslSslAdminClientIntegrationTest.scala | 2 +
.../test/scala/unit/kafka/utils/TestUtils.scala | 15 +++-
.../integration/AbstractResetIntegrationTest.java | 21 ++----
9 files changed, 136 insertions(+), 73 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 62b6b6e..7e245d1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -2395,16 +2395,9 @@ public class KafkaAdminClient extends AdminClient {
@Override
void handleResponse(AbstractResponse abstractResponse) {
final FindCoordinatorResponse fcResponse = (FindCoordinatorResponse) abstractResponse;
- Errors error = fcResponse.error();
- if (error == Errors.COORDINATOR_NOT_AVAILABLE) {
- // Retry COORDINATOR_NOT_AVAILABLE, in case the error is temporary.
- throw error.exception();
- } else if (error != Errors.NONE) {
- // All other errors are immediate failures.
- KafkaFutureImpl<ConsumerGroupDescription> future = futures.get(groupId);
- future.completeExceptionally(error.exception());
+
+ if (handleFindCoordinatorError(fcResponse, futures.get(groupId)))
return;
- }
final long nowDescribeConsumerGroups = time.milliseconds();
final int nodeId = fcResponse.node().id();
@@ -2476,6 +2469,17 @@ public class KafkaAdminClient extends AdminClient {
return new DescribeConsumerGroupsResult(new HashMap<String, KafkaFuture<ConsumerGroupDescription>>(futures));
}
+ private boolean handleFindCoordinatorError(FindCoordinatorResponse response, KafkaFutureImpl<?> future) {
+ Errors error = response.error();
+ if (error.exception() instanceof RetriableException) {
+ throw error.exception();
+ } else if (response.hasError()) {
+ future.completeExceptionally(error.exception());
+ return true;
+ }
+ return false;
+ }
+
private final static class ListConsumerGroupsResults {
private final List<Throwable> errors;
private final HashMap<String, ConsumerGroupListing> listings;
@@ -2610,6 +2614,9 @@ public class KafkaAdminClient extends AdminClient {
void handleResponse(AbstractResponse abstractResponse) {
final FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse;
+ if (handleFindCoordinatorError(response, groupOffsetListingFuture))
+ return;
+
final long nowListConsumerGroupOffsets = time.milliseconds();
final int nodeId = response.node().id();
@@ -2696,6 +2703,9 @@ public class KafkaAdminClient extends AdminClient {
void handleResponse(AbstractResponse abstractResponse) {
final FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse;
+ if (handleFindCoordinatorError(response, futures.get(groupId)))
+ return;
+
final long nowDeleteConsumerGroups = time.milliseconds();
final int nodeId = response.node().id();
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
index 39726da..bc7f654 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
@@ -68,9 +68,11 @@ public class FindCoordinatorResponse extends AbstractResponse {
/**
* Possible error codes:
*
+ * COORDINATOR_LOAD_IN_PROGRESS (14)
* COORDINATOR_NOT_AVAILABLE (15)
- * NOT_COORDINATOR (16)
* GROUP_AUTHORIZATION_FAILED (30)
+ * INVALID_REQUEST (42)
+ * TRANSACTIONAL_ID_AUTHORIZATION_FAILED (53)
*/
@@ -107,6 +109,10 @@ public class FindCoordinatorResponse extends AbstractResponse {
return throttleTimeMs;
}
+ public boolean hasError() {
+ return this.error != Errors.NONE;
+ }
+
public Errors error() {
return error;
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 3566f83..8363079 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -37,6 +37,7 @@ import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.NotLeaderForPartitionException;
@@ -1071,6 +1072,10 @@ public class KafkaAdminClientTest {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().setNode(env.cluster().controller());
+ //Retriable FindCoordinatorResponse errors should be retried
+ env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode()));
+ env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode()));
+
env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
final Map<String, Errors> response = new HashMap<>();
@@ -1081,6 +1086,13 @@ public class KafkaAdminClientTest {
final KafkaFuture<Void> results = result.deletedGroups().get("group-0");
assertNull(results.get());
+
+ //should throw error for non-retriable errors
+ env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode()));
+
+ final DeleteConsumerGroupsResult errorResult = env.adminClient().deleteConsumerGroups(groupIds);
+ assertFutureError(errorResult.deletedGroups().get("group-0"), GroupAuthorizationException.class);
+
}
}
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index fe98fda..9055e68 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -29,12 +29,13 @@ import kafka.log.LogConfig
import kafka.server.{Defaults, KafkaConfig, KafkaServer}
import org.apache.kafka.clients.admin._
import kafka.utils.{Logging, TestUtils}
+import kafka.utils.TestUtils._
import kafka.utils.Implicits._
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.{ConsumerGroupState, KafkaFuture, TopicPartition, TopicPartitionReplica}
+import org.apache.kafka.common.{ConsumerGroupState, TopicPartition, TopicPartitionReplica}
import org.apache.kafka.common.acl._
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors._
@@ -125,18 +126,6 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
}, "timed out waiting for topics")
}
- def assertFutureExceptionTypeEquals(future: KafkaFuture[_], clazz: Class[_ <: Throwable]): Unit = {
- try {
- future.get()
- fail("Expected CompletableFuture.get to return an exception")
- } catch {
- case e: ExecutionException =>
- val cause = e.getCause()
- assertTrue("Expected an exception of type " + clazz.getName + "; got type " +
- cause.getClass().getName, clazz.isInstance(cause))
- }
- }
-
@Test
def testClose(): Unit = {
val client = AdminClient.create(createConfig())
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index a21affc..72b3b24 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -19,7 +19,6 @@ import java.util.regex.Pattern
import java.util.{ArrayList, Collections, Properties}
import java.time.Duration
-import kafka.admin.AdminClient
import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, ConsumerGroupService}
import kafka.common.TopicAndPartition
import kafka.log.LogConfig
@@ -27,7 +26,7 @@ import kafka.network.SocketServer
import kafka.security.auth._
import kafka.server.{BaseRequestTest, KafkaConfig}
import kafka.utils.TestUtils
-import org.apache.kafka.clients.admin.NewPartitions
+import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewPartitions}
import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
import org.apache.kafka.clients.producer._
@@ -1004,17 +1003,18 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
this.consumers.head.partitionsFor(topic)
}
- @Test(expected = classOf[GroupAuthorizationException])
+ @Test
def testDescribeGroupApiWithNoGroupAcl() {
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
- createAdminClient().describeConsumerGroup(group)
+ val result = createAdminClient().describeConsumerGroups(Seq(group).asJava)
+ TestUtils.assertFutureExceptionTypeEquals(result.describedGroups().get(group), classOf[GroupAuthorizationException])
}
@Test
def testDescribeGroupApiWithGroupDescribe() {
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), groupResource)
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
- createAdminClient().describeConsumerGroup(group)
+ createAdminClient().describeConsumerGroups(Seq(group).asJava).describedGroups().get(group).get()
}
@Test
@@ -1036,8 +1036,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Delete)), groupResource)
this.consumers.head.assign(List(tp).asJava)
this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5, "")).asJava)
- val result = createAdminClient().deleteConsumerGroups(List(group))
- assert(result.size == 1 && result.keySet.contains(group) && result.get(group).contains(Errors.NONE))
+ createAdminClient().deleteConsumerGroups(Seq(group).asJava).deletedGroups().get(group).get()
}
@Test
@@ -1046,14 +1045,14 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
this.consumers.head.assign(List(tp).asJava)
this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5, "")).asJava)
- val result = createAdminClient().deleteConsumerGroups(List(group))
- assert(result.size == 1 && result.keySet.contains(group) && result.get(group).contains(Errors.GROUP_AUTHORIZATION_FAILED))
+ val result = createAdminClient().deleteConsumerGroups(Seq(group).asJava)
+ TestUtils.assertFutureExceptionTypeEquals(result.deletedGroups().get(group), classOf[GroupAuthorizationException])
}
@Test
def testDeleteGroupApiWithNoDeleteGroupAcl2() {
- val result = createAdminClient().deleteConsumerGroups(List(group))
- assert(result.size == 1 && result.keySet.contains(group) && result.get(group).contains(Errors.GROUP_AUTHORIZATION_FAILED))
+ val result = createAdminClient().deleteConsumerGroups(Seq(group).asJava)
+ TestUtils.assertFutureExceptionTypeEquals(result.deletedGroups().get(group), classOf[GroupAuthorizationException])
}
@Test
@@ -1462,7 +1461,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
}
private def createAdminClient(): AdminClient = {
- val adminClient = AdminClient.createSimplePlaintext(brokerList)
+ val props = new Properties()
+ props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+ val adminClient = AdminClient.create(props)
adminClients += adminClient
adminClient
}
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index f200cc2..07cbf0c 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -14,28 +14,35 @@
package kafka.api
import java.util.concurrent._
-import java.util.{Collection, Collections}
+import java.util.{Collection, Collections, Properties}
-import kafka.admin.AdminClient
-import kafka.server.KafkaConfig
+import kafka.server.{BaseRequestTest, KafkaConfig}
import kafka.utils.{CoreUtils, Logging, ShutdownableThread, TestUtils}
import org.apache.kafka.clients.consumer._
-import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.requests.{FindCoordinatorRequest, FindCoordinatorResponse}
+import org.apache.kafka.common.security.auth.SecurityProtocol
import org.junit.Assert._
import org.junit.{After, Before, Ignore, Test}
import scala.collection.JavaConverters._
+import scala.collection.mutable.Buffer
/**
* Integration tests for the consumer that cover basic usage as well as server failures
*/
-class ConsumerBounceTest extends IntegrationTestHarness with Logging {
+class ConsumerBounceTest extends BaseRequestTest with Logging {
+
+ override def numBrokers: Int = 3
val producerCount = 1
val consumerCount = 2
- val serverCount = 3
+
+ val consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
+ val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
val topic = "topic"
val part = 0
@@ -45,13 +52,8 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
val gracefulCloseTimeMs = 1000
val executor = Executors.newScheduledThreadPool(2)
- // configure the servers and clients
- this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
- this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
- this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "10") // set small enough session timeout
- this.serverConfig.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
- this.serverConfig.setProperty(KafkaConfig.UncleanLeaderElectionEnableProp, "true")
- this.serverConfig.setProperty(KafkaConfig.AutoCreateTopicsEnableProp, "false")
+ val producerConfig = new Properties
+ val consumerConfig = new Properties
this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
@@ -59,8 +61,19 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000")
this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+ def serverConfig(): Properties = {
+ val properties = new Properties
+ properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
+ properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
+ properties.put(KafkaConfig.GroupMinSessionTimeoutMsProp, "10") // set small enough session timeout
+ properties.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
+ properties.put(KafkaConfig.UncleanLeaderElectionEnableProp, "true")
+ properties.put(KafkaConfig.AutoCreateTopicsEnableProp, "false")
+ properties
+ }
+
override def generateConfigs = {
- FixedPortTestUtils.createBrokerConfigs(serverCount, zkConnect, enableControlledShutdown = false)
+ FixedPortTestUtils.createBrokerConfigs(numBrokers, zkConnect, enableControlledShutdown = false)
.map(KafkaConfig.fromProps(_, serverConfig))
}
@@ -68,8 +81,26 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
override def setUp() {
super.setUp()
+ for (_ <- 0 until producerCount)
+ producers += createProducer
+
+ for (_ <- 0 until consumerCount)
+ consumers += createConsumer
+
// create the test topic with all the brokers as replicas
- createTopic(topic, 1, serverCount)
+ createTopic(topic, 1, numBrokers)
+ }
+
+ def createProducer: KafkaProducer[Array[Byte], Array[Byte]] = {
+ TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
+ securityProtocol = SecurityProtocol.PLAINTEXT,
+ props = Some(producerConfig))
+ }
+
+ def createConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = {
+ TestUtils.createConsumer(TestUtils.getBrokerListStrFromServers(servers),
+ securityProtocol = SecurityProtocol.PLAINTEXT,
+ props = Some(consumerConfig))
}
@After
@@ -78,6 +109,8 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
executor.shutdownNow()
// Wait for any active tasks to terminate to ensure consumer is not closed while being used from another thread
assertTrue("Executor did not terminate", executor.awaitTermination(5000, TimeUnit.MILLISECONDS))
+ producers.foreach(_.close())
+ consumers.foreach(_.close())
} finally {
super.tearDown()
}
@@ -173,7 +206,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
val consumer = this.consumers.head
consumer.subscribe(Collections.singleton(newtopic))
executor.schedule(new Runnable {
- def run() = createTopic(newtopic, numPartitions = serverCount, replicationFactor = serverCount)
+ def run() = createTopic(newtopic, numPartitions = numBrokers, replicationFactor = numBrokers)
}, 2, TimeUnit.SECONDS)
consumer.poll(0)
@@ -243,9 +276,8 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
val consumer1 = createConsumerAndReceive(dynamicGroup, false, numRecords)
val consumer2 = createConsumerAndReceive(manualGroup, true, numRecords)
- val adminClient = AdminClient.createSimplePlaintext(this.brokerList)
- killBroker(adminClient.findCoordinator(dynamicGroup).id)
- killBroker(adminClient.findCoordinator(manualGroup).id)
+ killBroker(findCoordinator(dynamicGroup))
+ killBroker(findCoordinator(manualGroup))
val future1 = submitCloseAndValidate(consumer1, Long.MaxValue, None, Some(gracefulCloseTimeMs))
val future2 = submitCloseAndValidate(consumer2, Long.MaxValue, None, Some(gracefulCloseTimeMs))
@@ -255,9 +287,16 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
restartDeadBrokers()
checkClosedState(dynamicGroup, 0)
checkClosedState(manualGroup, numRecords)
- adminClient.close()
}
+ private def findCoordinator(group: String) : Int = {
+ val request = new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, group).build()
+ val resp = connectAndSend(request, ApiKeys.FIND_COORDINATOR)
+ val response = FindCoordinatorResponse.parse(resp, ApiKeys.FIND_COORDINATOR.latestVersion())
+ response.node().id()
+ }
+
+
/**
* Consumer is closed while all brokers are unavailable. Cannot rebalance or commit offsets since
* there is no coordinator, but close should timeout and return. If close is invoked with a very
@@ -288,7 +327,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
@Test
def testCloseDuringRebalance() {
val topic = "closetest"
- createTopic(topic, 10, serverCount)
+ createTopic(topic, 10, numBrokers)
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000")
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
@@ -355,7 +394,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
private def createConsumer(groupId: String) : KafkaConsumer[Array[Byte], Array[Byte]] = {
this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
- val consumer = super.createConsumer
+ val consumer = createConsumer
consumers += consumer
consumer
}
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
index 9da6937..3b63613 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
@@ -18,6 +18,8 @@ import java.util
import kafka.security.auth.{All, Allow, Alter, AlterConfigs, Authorizer, ClusterAction, Create, Delete, Deny, Describe, Group, Operation, PermissionType, SimpleAclAuthorizer, Topic, Acl => AuthAcl, Resource => AuthResource}
import kafka.server.KafkaConfig
import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils}
+import kafka.utils.TestUtils._
+
import org.apache.kafka.clients.admin.{AdminClient, CreateAclsOptions, DeleteAclsOptions}
import org.apache.kafka.common.acl._
import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidRequestException}
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index db45196..7b68cc0 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -25,7 +25,7 @@ import java.nio.file.{Files, StandardOpenOption}
import java.security.cert.X509Certificate
import java.time.Duration
import java.util.{Collections, Properties}
-import java.util.concurrent.{Callable, Executors, TimeUnit}
+import java.util.concurrent.{Callable, ExecutionException, Executors, TimeUnit}
import javax.net.ssl.X509TrustManager
import kafka.api._
@@ -41,7 +41,7 @@ import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{AdminClient, AlterConfigsResult, Config, ConfigEntry}
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer, OffsetAndMetadata, RangeAssignor}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{KafkaFuture, TopicPartition}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.header.Header
import org.apache.kafka.common.internals.Topic
@@ -1374,4 +1374,15 @@ object TestUtils extends Logging {
(out.toString, err.toString)
}
+ def assertFutureExceptionTypeEquals(future: KafkaFuture[_], clazz: Class[_ <: Throwable]): Unit = {
+ try {
+ future.get()
+ fail("Expected CompletableFuture.get to return an exception")
+ } catch {
+ case e: ExecutionException =>
+ val cause = e.getCause()
+ assertTrue("Expected an exception of type " + clazz.getName + "; got type " +
+ cause.getClass().getName, clazz.isInstance(cause))
+ }
+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
index 249e2c3..64b23cb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
@@ -17,12 +17,12 @@
package org.apache.kafka.streams.integration;
import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.types.Password;
-import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
@@ -61,9 +61,9 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import kafka.admin.AdminClient;
import kafka.tools.StreamsResetter;
import static org.hamcrest.CoreMatchers.equalTo;
@@ -77,20 +77,15 @@ public abstract class AbstractResetIntegrationTest {
private static MockTime mockTime;
private static KafkaStreams streams;
private static AdminClient adminClient = null;
- private static KafkaAdminClient kafkaAdminClient = null;
abstract Map<String, Object> getClientSslConfig();
@AfterClass
public static void afterClassCleanup() {
if (adminClient != null) {
- adminClient.close();
+ adminClient.close(10, TimeUnit.SECONDS);
adminClient = null;
}
- if (kafkaAdminClient != null) {
- kafkaAdminClient.close(10, TimeUnit.SECONDS);
- kafkaAdminClient = null;
- }
}
private String appID = "abstract-reset-integration-test";
@@ -103,9 +98,6 @@ public abstract class AbstractResetIntegrationTest {
if (adminClient == null) {
adminClient = AdminClient.create(commonClientConfig);
}
- if (kafkaAdminClient == null) {
- kafkaAdminClient = (KafkaAdminClient) org.apache.kafka.clients.admin.AdminClient.create(commonClientConfig);
- }
boolean timeSet = false;
while (!timeSet) {
@@ -184,8 +176,9 @@ public abstract class AbstractResetIntegrationTest {
@Override
public boolean conditionMet() {
try {
- return adminClient.describeConsumerGroup(appID, 0).consumers().get().isEmpty();
- } catch (final TimeoutException e) {
+ ConsumerGroupDescription groupDescription = adminClient.describeConsumerGroups(Collections.singletonList(appID)).describedGroups().get(appID).get();
+ return groupDescription.members().isEmpty();
+ } catch (final ExecutionException | InterruptedException e) {
return false;
}
}