You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2019/06/21 06:41:52 UTC
[kafka] branch trunk updated: MINOR: Remove legacy
kafka.admin.AdminClient (#6947)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1233c96 MINOR: Remove legacy kafka.admin.AdminClient (#6947)
1233c96 is described below
commit 1233c963f81ebf09762754c7d4cde97c85ae364f
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Thu Jun 20 23:41:34 2019 -0700
MINOR: Remove legacy kafka.admin.AdminClient (#6947)
It has been deprecated since 0.11.0, it was never meant as a publicly
supported API and people should use
`org.apache.kafka.clients.admin.AdminClient` instead. Its presence
causes confusion and people still use them accidentally at times.
`BrokerApiVersionsCommand` uses one method that is not available
in `org.apache.kafka.clients.admin.AdminClient`, we inline it for now.
Reviewers: David Arthur <mu...@gmail.com>, Ismael Juma <is...@juma.me.uk>
---
core/src/main/scala/kafka/admin/AdminClient.scala | 493 ---------------------
.../kafka/admin/BrokerApiVersionsCommand.scala | 247 ++++++++++-
.../kafka/api/LegacyAdminClientTest.scala | 156 -------
3 files changed, 245 insertions(+), 651 deletions(-)
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
deleted file mode 100644
index c78a451..0000000
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ /dev/null
@@ -1,493 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package kafka.admin
-
-import java.io.IOException
-import java.nio.ByteBuffer
-import java.util.{Collections, Properties}
-import java.util.concurrent.atomic.AtomicInteger
-import java.util.concurrent.{ConcurrentLinkedQueue, Future, TimeUnit}
-
-import kafka.common.KafkaException
-import kafka.coordinator.group.GroupOverview
-import kafka.utils.Logging
-import org.apache.kafka.clients._
-import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, ConsumerProtocol, RequestFuture}
-import org.apache.kafka.common.config.ConfigDef.ValidString._
-import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
-import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
-import org.apache.kafka.common.errors.{AuthenticationException, TimeoutException}
-import org.apache.kafka.common.internals.ClusterResourceListeners
-import org.apache.kafka.common.message.{DescribeGroupsRequestData, DescribeGroupsResponseData, FindCoordinatorRequestData}
-
-import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.network.Selector
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests._
-import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion
-import org.apache.kafka.common.requests.OffsetFetchResponse
-import org.apache.kafka.common.utils.LogContext
-import org.apache.kafka.common.utils.{KafkaThread, Time}
-import org.apache.kafka.common.{Node, TopicPartition}
-
-import scala.collection.JavaConverters._
-import scala.util.{Failure, Success, Try}
-import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType
-
-/**
- * A Scala administrative client for Kafka which supports managing and inspecting topics, brokers,
- * and configurations. This client is deprecated, and will be replaced by org.apache.kafka.clients.admin.AdminClient.
- */
-@deprecated("This class is deprecated in favour of org.apache.kafka.clients.admin.AdminClient and it will be removed in " +
- "a future release.", since = "0.11.0")
-class AdminClient(val time: Time,
- val requestTimeoutMs: Int,
- val retryBackoffMs: Long,
- val client: ConsumerNetworkClient,
- val bootstrapBrokers: List[Node]) extends Logging {
-
- @volatile var running: Boolean = true
- val pendingFutures = new ConcurrentLinkedQueue[RequestFuture[ClientResponse]]()
-
- val networkThread = new KafkaThread("admin-client-network-thread", new Runnable {
- override def run() {
- try {
- while (running)
- client.poll(time.timer(Long.MaxValue))
- } catch {
- case t : Throwable =>
- error("admin-client-network-thread exited", t)
- } finally {
- pendingFutures.asScala.foreach { future =>
- try {
- future.raise(Errors.UNKNOWN_SERVER_ERROR)
- } catch {
- case _: IllegalStateException => // It is OK if the future has been completed
- }
- }
- pendingFutures.clear()
- }
- }
- }, true)
-
- networkThread.start()
-
- private def send(target: Node,
- api: ApiKeys,
- request: AbstractRequest.Builder[_ <: AbstractRequest]): AbstractResponse = {
- val future: RequestFuture[ClientResponse] = client.send(target, request)
- pendingFutures.add(future)
- future.awaitDone(Long.MaxValue, TimeUnit.MILLISECONDS)
- pendingFutures.remove(future)
- if (future.succeeded())
- future.value().responseBody()
- else
- throw future.exception()
- }
-
- private def sendAnyNode(api: ApiKeys, request: AbstractRequest.Builder[_ <: AbstractRequest]): AbstractResponse = {
- bootstrapBrokers.foreach { broker =>
- try {
- return send(broker, api, request)
- } catch {
- case e: AuthenticationException =>
- throw e
- case e: Exception =>
- debug(s"Request $api failed against node $broker", e)
- }
- }
- throw new RuntimeException(s"Request $api failed on brokers $bootstrapBrokers")
- }
-
- def findCoordinator(groupId: String, timeoutMs: Long = 0): Node = {
- val requestBuilder = new FindCoordinatorRequest.Builder(
- new FindCoordinatorRequestData()
- .setKeyType(CoordinatorType.GROUP.id)
- .setKey(groupId))
-
- def sendRequest: Try[FindCoordinatorResponse] =
- Try(sendAnyNode(ApiKeys.FIND_COORDINATOR, requestBuilder).asInstanceOf[FindCoordinatorResponse])
-
- val startTime = time.milliseconds
- var response = sendRequest
-
- while ((response.isFailure || response.get.error == Errors.COORDINATOR_NOT_AVAILABLE) &&
- (time.milliseconds - startTime < timeoutMs)) {
-
- Thread.sleep(retryBackoffMs)
- response = sendRequest
- }
-
- def timeoutException(cause: Throwable) =
- throw new TimeoutException("The consumer group command timed out while waiting for group to initialize: ", cause)
-
- response match {
- case Failure(exception) => throw timeoutException(exception)
- case Success(response) =>
- if (response.error == Errors.COORDINATOR_NOT_AVAILABLE)
- throw timeoutException(response.error.exception)
- response.error.maybeThrow()
- response.node
- }
- }
-
- def listGroups(node: Node): List[GroupOverview] = {
- val response = send(node, ApiKeys.LIST_GROUPS, new ListGroupsRequest.Builder()).asInstanceOf[ListGroupsResponse]
- response.error.maybeThrow()
- response.groups.asScala.map(group => GroupOverview(group.groupId, group.protocolType)).toList
- }
-
- def getApiVersions(node: Node): List[ApiVersion] = {
- val response = send(node, ApiKeys.API_VERSIONS, new ApiVersionsRequest.Builder()).asInstanceOf[ApiVersionsResponse]
- response.error.maybeThrow()
- response.apiVersions.asScala.toList
- }
-
- /**
- * Wait until there is a non-empty list of brokers in the cluster.
- */
- def awaitBrokers() {
- var nodes = List[Node]()
- do {
- nodes = findAllBrokers()
- if (nodes.isEmpty)
- Thread.sleep(50)
- } while (nodes.isEmpty)
- }
-
- def findAllBrokers(): List[Node] = {
- val request = MetadataRequest.Builder.allTopics()
- val response = sendAnyNode(ApiKeys.METADATA, request).asInstanceOf[MetadataResponse]
- val errors = response.errors
- if (!errors.isEmpty)
- debug(s"Metadata request contained errors: $errors")
- response.cluster.nodes.asScala.toList
- }
-
- def listAllGroups(): Map[Node, List[GroupOverview]] = {
- findAllBrokers().map { broker =>
- broker -> {
- try {
- listGroups(broker)
- } catch {
- case e: Exception =>
- debug(s"Failed to find groups from broker $broker", e)
- List[GroupOverview]()
- }
- }
- }.toMap
- }
-
- def listAllConsumerGroups(): Map[Node, List[GroupOverview]] = {
- listAllGroups().mapValues { groups =>
- groups.filter(isConsumerGroup)
- }
- }
-
- def listAllGroupsFlattened(): List[GroupOverview] = {
- listAllGroups().values.flatten.toList
- }
-
- def listAllConsumerGroupsFlattened(): List[GroupOverview] = {
- listAllGroupsFlattened().filter(isConsumerGroup)
- }
-
- private def isConsumerGroup(group: GroupOverview): Boolean = {
- // Consumer groups which are using group management use the "consumer" protocol type.
- // Consumer groups which are only using offset storage will have an empty protocol type.
- group.protocolType.isEmpty || group.protocolType == ConsumerProtocol.PROTOCOL_TYPE
- }
-
- def listGroupOffsets(groupId: String): Map[TopicPartition, Long] = {
- val coordinator = findCoordinator(groupId)
- val responseBody = send(coordinator, ApiKeys.OFFSET_FETCH, OffsetFetchRequest.Builder.allTopicPartitions(groupId))
- val response = responseBody.asInstanceOf[OffsetFetchResponse]
- if (response.hasError)
- throw response.error.exception
- response.maybeThrowFirstPartitionError()
- response.responseData.asScala.map { case (tp, partitionData) => (tp, partitionData.offset) }.toMap
- }
-
- def listAllBrokerVersionInfo(): Map[Node, Try[NodeApiVersions]] =
- findAllBrokers().map { broker =>
- broker -> Try[NodeApiVersions](new NodeApiVersions(getApiVersions(broker).asJava))
- }.toMap
-
- /**
- * Case class used to represent a consumer of a consumer group
- */
- case class ConsumerSummary(consumerId: String,
- clientId: String,
- host: String,
- assignment: List[TopicPartition])
-
- /**
- * Case class used to represent group metadata (including the group coordinator) for the DescribeGroup API
- */
- case class ConsumerGroupSummary(state: String,
- assignmentStrategy: String,
- consumers: Option[List[ConsumerSummary]],
- coordinator: Node)
-
- def describeConsumerGroupHandler(coordinator: Node, groupId: String): DescribeGroupsResponseData.DescribedGroup = {
- val responseBody = send(coordinator, ApiKeys.DESCRIBE_GROUPS,
- new DescribeGroupsRequest.Builder(new DescribeGroupsRequestData().setGroups(Collections.singletonList(groupId))))
- val response = responseBody.asInstanceOf[DescribeGroupsResponse]
- val metadata = response.data().groups().asScala.find(group => groupId.equals(group.groupId()))
- .getOrElse(throw new KafkaException(s"Response from broker contained no metadata for group $groupId"))
- metadata
- }
-
- def describeConsumerGroup(groupId: String, timeoutMs: Long = 0): ConsumerGroupSummary = {
-
- def isValidConsumerGroupResponse(metadata: DescribeGroupsResponseData.DescribedGroup): Boolean =
- metadata.errorCode() == Errors.NONE.code() && (metadata.groupState() == "Dead" ||
- metadata.groupState() == "Empty" || metadata.protocolType == ConsumerProtocol.PROTOCOL_TYPE)
-
- val startTime = time.milliseconds
- val coordinator = findCoordinator(groupId, timeoutMs)
- var metadata = describeConsumerGroupHandler(coordinator, groupId)
-
- while (!isValidConsumerGroupResponse(metadata) && time.milliseconds - startTime < timeoutMs) {
- debug(s"The consumer group response for group '$groupId' is invalid. Retrying the request as the group is initializing ...")
- Thread.sleep(retryBackoffMs)
- metadata = describeConsumerGroupHandler(coordinator, groupId)
- }
-
- if (!isValidConsumerGroupResponse(metadata))
- throw new TimeoutException("The consumer group command timed out while waiting for group to initialize")
-
- val consumers = metadata.members.asScala.map { consumer =>
- ConsumerSummary(consumer.memberId, consumer.clientId, consumer.clientHost, metadata.groupState() match {
- case "Stable" =>
- val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(consumer.memberAssignment))
- assignment.partitions.asScala.toList
- case _ =>
- List()
- })
- }.toList
-
- ConsumerGroupSummary(metadata.groupState(), metadata.protocolData(), Some(consumers), coordinator)
- }
-
- def deleteConsumerGroups(groups: List[String]): Map[String, Errors] = {
-
- def coordinatorLookup(group: String): Either[Node, Errors] = {
- try {
- Left(findCoordinator(group))
- } catch {
- case e: Throwable =>
- if (e.isInstanceOf[TimeoutException])
- Right(Errors.COORDINATOR_NOT_AVAILABLE)
- else
- Right(Errors.forException(e))
- }
- }
-
- var errors: Map[String, Errors] = Map()
- var groupsPerCoordinator: Map[Node, List[String]] = Map()
-
- groups.foreach { group =>
- coordinatorLookup(group) match {
- case Right(error) =>
- errors += group -> error
- case Left(coordinator) =>
- groupsPerCoordinator.get(coordinator) match {
- case Some(gList) =>
- val gListNew = group :: gList
- groupsPerCoordinator += coordinator -> gListNew
- case None =>
- groupsPerCoordinator += coordinator -> List(group)
- }
- }
- }
-
- groupsPerCoordinator.foreach { case (coordinator, groups) =>
- val responseBody = send(coordinator, ApiKeys.DELETE_GROUPS, new DeleteGroupsRequest.Builder(groups.toSet.asJava))
- val response = responseBody.asInstanceOf[DeleteGroupsResponse]
- groups.foreach {
- case group if response.hasError(group) => errors += group -> response.errors.get(group)
- case group => errors += group -> Errors.NONE
- }
- }
-
- errors
- }
-
- def close() {
- running = false
- try {
- client.close()
- } catch {
- case e: IOException =>
- error("Exception closing nioSelector:", e)
- }
- }
-
-}
-
-/*
- * CompositeFuture assumes that the future object in the futures list does not raise error
- */
-class CompositeFuture[T](time: Time,
- defaultResults: Map[TopicPartition, T],
- futures: List[RequestFuture[Map[TopicPartition, T]]]) extends Future[Map[TopicPartition, T]] {
-
- override def isCancelled = false
-
- override def cancel(interrupt: Boolean) = false
-
- override def get(): Map[TopicPartition, T] = {
- get(Long.MaxValue, TimeUnit.MILLISECONDS)
- }
-
- override def get(timeout: Long, unit: TimeUnit): Map[TopicPartition, T] = {
- val start: Long = time.milliseconds()
- val timeoutMs = unit.toMillis(timeout)
- var remaining: Long = timeoutMs
-
- val observedResults = futures.flatMap { future =>
- val elapsed = time.milliseconds() - start
- remaining = if (timeoutMs - elapsed > 0) timeoutMs - elapsed else 0L
-
- if (future.awaitDone(remaining, TimeUnit.MILLISECONDS)) future.value()
- else Map.empty[TopicPartition, T]
- }.toMap
-
- defaultResults ++ observedResults
- }
-
- override def isDone: Boolean = {
- futures.forall(_.isDone)
- }
-}
-
-@deprecated("This class is deprecated in favour of org.apache.kafka.clients.admin.AdminClient and it will be removed in " +
- "a future release.", since = "0.11.0")
-object AdminClient {
- val DefaultConnectionMaxIdleMs = 9 * 60 * 1000
- val DefaultRequestTimeoutMs = 5000
- val DefaultMaxInFlightRequestsPerConnection = 100
- val DefaultReconnectBackoffMs = 50
- val DefaultReconnectBackoffMax = 50
- val DefaultSendBufferBytes = 128 * 1024
- val DefaultReceiveBufferBytes = 32 * 1024
- val DefaultRetryBackoffMs = 100
-
- val AdminClientIdSequence = new AtomicInteger(1)
- val AdminConfigDef = {
- val config = new ConfigDef()
- .define(
- CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
- Type.LIST,
- Importance.HIGH,
- CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
- .define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG,
- Type.STRING,
- ClientDnsLookup.DEFAULT.toString,
- in(ClientDnsLookup.DEFAULT.toString,
- ClientDnsLookup.USE_ALL_DNS_IPS.toString,
- ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString),
- Importance.MEDIUM,
- CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
- .define(
- CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
- ConfigDef.Type.STRING,
- CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
- ConfigDef.Importance.MEDIUM,
- CommonClientConfigs.SECURITY_PROTOCOL_DOC)
- .define(
- CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG,
- ConfigDef.Type.INT,
- DefaultRequestTimeoutMs,
- ConfigDef.Importance.MEDIUM,
- CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC)
- .define(
- CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG,
- ConfigDef.Type.LONG,
- DefaultRetryBackoffMs,
- ConfigDef.Importance.MEDIUM,
- CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
- .withClientSslSupport()
- .withClientSaslSupport()
- config
- }
-
- class AdminConfig(originals: Map[_,_]) extends AbstractConfig(AdminConfigDef, originals.asJava, false)
-
- def createSimplePlaintext(brokerUrl: String): AdminClient = {
- val config = Map(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> brokerUrl)
- create(new AdminConfig(config))
- }
-
- def create(props: Properties): AdminClient = create(props.asScala.toMap)
-
- def create(props: Map[String, _]): AdminClient = create(new AdminConfig(props))
-
- def create(config: AdminConfig): AdminClient = {
- val clientId = "admin-" + AdminClientIdSequence.getAndIncrement()
- val logContext = new LogContext(s"[LegacyAdminClient clientId=$clientId] ")
- val time = Time.SYSTEM
- val metrics = new Metrics(time)
- val metadata = new Metadata(100L, 60 * 60 * 1000L, logContext,
- new ClusterResourceListeners)
- val channelBuilder = ClientUtils.createChannelBuilder(config, time)
- val requestTimeoutMs = config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG)
- val retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG)
-
- val brokerUrls = config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
- val clientDnsLookup = config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)
- val brokerAddresses = ClientUtils.parseAndValidateAddresses(brokerUrls, clientDnsLookup)
- metadata.bootstrap(brokerAddresses, time.milliseconds())
-
- val selector = new Selector(
- DefaultConnectionMaxIdleMs,
- metrics,
- time,
- "admin",
- channelBuilder,
- logContext)
-
- val networkClient = new NetworkClient(
- selector,
- metadata,
- clientId,
- DefaultMaxInFlightRequestsPerConnection,
- DefaultReconnectBackoffMs,
- DefaultReconnectBackoffMax,
- DefaultSendBufferBytes,
- DefaultReceiveBufferBytes,
- requestTimeoutMs,
- ClientDnsLookup.DEFAULT,
- time,
- true,
- new ApiVersions,
- logContext)
-
- val highLevelClient = new ConsumerNetworkClient(
- logContext,
- networkClient,
- metadata,
- time,
- retryBackoffMs,
- requestTimeoutMs,
- Integer.MAX_VALUE)
-
- new AdminClient(
- time,
- requestTimeoutMs,
- retryBackoffMs,
- highLevelClient,
- metadata.fetch.nodes.asScala.toList)
- }
-}
diff --git a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
index 88acf7f..4562f37 100644
--- a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
@@ -18,13 +18,32 @@
package kafka.admin
import java.io.PrintStream
+import java.io.IOException
import java.util.Properties
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit}
import kafka.utils.{CommandDefaultOptions, CommandLineUtils}
-import org.apache.kafka.clients.CommonClientConfigs
+import kafka.utils.Logging
import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.clients.{ApiVersions, ClientDnsLookup, ClientResponse, ClientUtils, CommonClientConfigs, Metadata, NetworkClient, NodeApiVersions}
+import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, RequestFuture}
+import org.apache.kafka.common.config.ConfigDef.ValidString._
+import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
+import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
+import org.apache.kafka.common.errors.AuthenticationException
+import org.apache.kafka.common.internals.ClusterResourceListeners
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.network.Selector
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion
+import org.apache.kafka.common.utils.LogContext
+import org.apache.kafka.common.utils.{KafkaThread, Time}
+import org.apache.kafka.common.Node
+import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ApiVersionsRequest, ApiVersionsResponse, MetadataRequest, MetadataResponse}
-import scala.util.{Failure, Success}
+import scala.collection.JavaConverters._
+import scala.util.{Failure, Success, Try}
/**
* A command for retrieving broker version information.
@@ -79,4 +98,228 @@ object BrokerApiVersionsCommand {
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt)
}
}
+
+ // org.apache.kafka.clients.admin.AdminClient doesn't currently expose a way to retrieve the supported api versions.
+ // We inline the bits we need from kafka.admin.AdminClient so that we can delete it.
+ private class AdminClient(val time: Time,
+ val requestTimeoutMs: Int,
+ val retryBackoffMs: Long,
+ val client: ConsumerNetworkClient,
+ val bootstrapBrokers: List[Node]) extends Logging {
+
+ @volatile var running: Boolean = true
+ val pendingFutures = new ConcurrentLinkedQueue[RequestFuture[ClientResponse]]()
+
+ val networkThread = new KafkaThread("admin-client-network-thread", new Runnable {
+ override def run() {
+ try {
+ while (running)
+ client.poll(time.timer(Long.MaxValue))
+ } catch {
+ case t : Throwable =>
+ error("admin-client-network-thread exited", t)
+ } finally {
+ pendingFutures.asScala.foreach { future =>
+ try {
+ future.raise(Errors.UNKNOWN_SERVER_ERROR)
+ } catch {
+ case _: IllegalStateException => // It is OK if the future has been completed
+ }
+ }
+ pendingFutures.clear()
+ }
+ }
+ }, true)
+
+ networkThread.start()
+
+ private def send(target: Node,
+ api: ApiKeys,
+ request: AbstractRequest.Builder[_ <: AbstractRequest]): AbstractResponse = {
+ val future: RequestFuture[ClientResponse] = client.send(target, request)
+ pendingFutures.add(future)
+ future.awaitDone(Long.MaxValue, TimeUnit.MILLISECONDS)
+ pendingFutures.remove(future)
+ if (future.succeeded())
+ future.value().responseBody()
+ else
+ throw future.exception()
+ }
+
+ private def sendAnyNode(api: ApiKeys, request: AbstractRequest.Builder[_ <: AbstractRequest]): AbstractResponse = {
+ bootstrapBrokers.foreach { broker =>
+ try {
+ return send(broker, api, request)
+ } catch {
+ case e: AuthenticationException =>
+ throw e
+ case e: Exception =>
+ debug(s"Request $api failed against node $broker", e)
+ }
+ }
+ throw new RuntimeException(s"Request $api failed on brokers $bootstrapBrokers")
+ }
+
+ private def getApiVersions(node: Node): List[ApiVersion] = {
+ val response = send(node, ApiKeys.API_VERSIONS, new ApiVersionsRequest.Builder()).asInstanceOf[ApiVersionsResponse]
+ response.error.maybeThrow()
+ response.apiVersions.asScala.toList
+ }
+
+ /**
+ * Wait until there is a non-empty list of brokers in the cluster.
+ */
+ def awaitBrokers() {
+ var nodes = List[Node]()
+ do {
+ nodes = findAllBrokers()
+ if (nodes.isEmpty)
+ Thread.sleep(50)
+ } while (nodes.isEmpty)
+ }
+
+ private def findAllBrokers(): List[Node] = {
+ val request = MetadataRequest.Builder.allTopics()
+ val response = sendAnyNode(ApiKeys.METADATA, request).asInstanceOf[MetadataResponse]
+ val errors = response.errors
+ if (!errors.isEmpty)
+ debug(s"Metadata request contained errors: $errors")
+ response.cluster.nodes.asScala.toList
+ }
+
+ def listAllBrokerVersionInfo(): Map[Node, Try[NodeApiVersions]] =
+ findAllBrokers().map { broker =>
+ broker -> Try[NodeApiVersions](new NodeApiVersions(getApiVersions(broker).asJava))
+ }.toMap
+
+ def close() {
+ running = false
+ try {
+ client.close()
+ } catch {
+ case e: IOException =>
+ error("Exception closing nioSelector:", e)
+ }
+ }
+
+ }
+
+ private object AdminClient {
+ val DefaultConnectionMaxIdleMs = 9 * 60 * 1000
+ val DefaultRequestTimeoutMs = 5000
+ val DefaultMaxInFlightRequestsPerConnection = 100
+ val DefaultReconnectBackoffMs = 50
+ val DefaultReconnectBackoffMax = 50
+ val DefaultSendBufferBytes = 128 * 1024
+ val DefaultReceiveBufferBytes = 32 * 1024
+ val DefaultRetryBackoffMs = 100
+
+ val AdminClientIdSequence = new AtomicInteger(1)
+ val AdminConfigDef = {
+ val config = new ConfigDef()
+ .define(
+ CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
+ Type.LIST,
+ Importance.HIGH,
+ CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
+ .define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG,
+ Type.STRING,
+ ClientDnsLookup.DEFAULT.toString,
+ in(ClientDnsLookup.DEFAULT.toString,
+ ClientDnsLookup.USE_ALL_DNS_IPS.toString,
+ ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString),
+ Importance.MEDIUM,
+ CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
+ .define(
+ CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
+ ConfigDef.Type.STRING,
+ CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
+ ConfigDef.Importance.MEDIUM,
+ CommonClientConfigs.SECURITY_PROTOCOL_DOC)
+ .define(
+ CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG,
+ ConfigDef.Type.INT,
+ DefaultRequestTimeoutMs,
+ ConfigDef.Importance.MEDIUM,
+ CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC)
+ .define(
+ CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG,
+ ConfigDef.Type.LONG,
+ DefaultRetryBackoffMs,
+ ConfigDef.Importance.MEDIUM,
+ CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
+ .withClientSslSupport()
+ .withClientSaslSupport()
+ config
+ }
+
+ class AdminConfig(originals: Map[_,_]) extends AbstractConfig(AdminConfigDef, originals.asJava, false)
+
+ def createSimplePlaintext(brokerUrl: String): AdminClient = {
+ val config = Map(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> brokerUrl)
+ create(new AdminConfig(config))
+ }
+
+ def create(props: Properties): AdminClient = create(props.asScala.toMap)
+
+ def create(props: Map[String, _]): AdminClient = create(new AdminConfig(props))
+
+ def create(config: AdminConfig): AdminClient = {
+ val clientId = "admin-" + AdminClientIdSequence.getAndIncrement()
+ val logContext = new LogContext(s"[LegacyAdminClient clientId=$clientId] ")
+ val time = Time.SYSTEM
+ val metrics = new Metrics(time)
+ val metadata = new Metadata(100L, 60 * 60 * 1000L, logContext,
+ new ClusterResourceListeners)
+ val channelBuilder = ClientUtils.createChannelBuilder(config, time)
+ val requestTimeoutMs = config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG)
+ val retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG)
+
+ val brokerUrls = config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+ val clientDnsLookup = config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)
+ val brokerAddresses = ClientUtils.parseAndValidateAddresses(brokerUrls, clientDnsLookup)
+ metadata.bootstrap(brokerAddresses, time.milliseconds())
+
+ val selector = new Selector(
+ DefaultConnectionMaxIdleMs,
+ metrics,
+ time,
+ "admin",
+ channelBuilder,
+ logContext)
+
+ val networkClient = new NetworkClient(
+ selector,
+ metadata,
+ clientId,
+ DefaultMaxInFlightRequestsPerConnection,
+ DefaultReconnectBackoffMs,
+ DefaultReconnectBackoffMax,
+ DefaultSendBufferBytes,
+ DefaultReceiveBufferBytes,
+ requestTimeoutMs,
+ ClientDnsLookup.DEFAULT,
+ time,
+ true,
+ new ApiVersions,
+ logContext)
+
+ val highLevelClient = new ConsumerNetworkClient(
+ logContext,
+ networkClient,
+ metadata,
+ time,
+ retryBackoffMs,
+ requestTimeoutMs,
+ Integer.MAX_VALUE)
+
+ new AdminClient(
+ time,
+ requestTimeoutMs,
+ retryBackoffMs,
+ highLevelClient,
+ metadata.fetch.nodes.asScala.toList)
+ }
+ }
+
}
diff --git a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
deleted file mode 100644
index b30b363..0000000
--- a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
+++ /dev/null
@@ -1,156 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.api
-
-import java.util.Collections
-
-import kafka.admin.AdminClient
-import kafka.server.KafkaConfig
-import java.lang.{Long => JLong}
-
-import kafka.utils.{Logging, TestUtils}
-import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
-import org.apache.kafka.clients.producer.ProducerConfig
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.protocol.ApiKeys
-import org.junit.{After, Before, Test}
-import org.junit.Assert._
-
-import scala.collection.JavaConverters._
-
-/**
- * Tests for the deprecated Scala AdminClient.
- */
-@deprecated("The Scala AdminClient has been deprecated in favour of org.apache.kafka.clients.admin.AdminClient",
- since = "0.11.0")
-class LegacyAdminClientTest extends IntegrationTestHarness with Logging {
-
- val producerCount = 1
- val consumerCount = 2
- val brokerCount = 3
- val groupId = "my-test"
- val clientId = "consumer-498"
-
- val topic = "topic"
- val part = 0
- val tp = new TopicPartition(topic, part)
- val part2 = 1
- val tp2 = new TopicPartition(topic, part2)
-
- var client: AdminClient = null
-
- // configure the servers and clients
- this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown
- this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
- this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
- this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "100") // set small enough session timeout
- this.serverConfig.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0") // do initial rebalance immediately
- this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
- this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
- this.consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId)
- this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
- this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
- this.consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "100")
-
- @Before
- override def setUp() {
- super.setUp()
- client = AdminClient.createSimplePlaintext(this.brokerList)
- createTopic(topic, 2, brokerCount)
- }
-
- @After
- override def tearDown() {
- client.close()
- super.tearDown()
- }
-
- @Test
- def testOffsetsForTimesWhenOffsetNotFound() {
- val consumer = createConsumer()
- assertNull(consumer.offsetsForTimes(Map(tp -> JLong.valueOf(0L)).asJava).get(tp))
- }
-
- @Test
- def testListGroups() {
- val consumer = createConsumer()
- subscribeAndWaitForAssignment(topic, consumer)
-
- val groups = client.listAllGroupsFlattened
- assertFalse(groups.isEmpty)
- val group = groups.head
- assertEquals(groupId, group.groupId)
- assertEquals("consumer", group.protocolType)
- }
-
- @Test
- def testListAllBrokerVersionInfo() {
- val consumer = createConsumer()
- subscribeAndWaitForAssignment(topic, consumer)
-
- val brokerVersionInfos = client.listAllBrokerVersionInfo
- val brokers = brokerList.split(",")
- assertEquals(brokers.size, brokerVersionInfos.size)
- for ((node, tryBrokerVersionInfo) <- brokerVersionInfos) {
- val hostStr = s"${node.host}:${node.port}"
- assertTrue(s"Unknown host:port pair $hostStr in brokerVersionInfos", brokers.contains(hostStr))
- val brokerVersionInfo = tryBrokerVersionInfo.get
- assertEquals(2, brokerVersionInfo.latestUsableVersion(ApiKeys.API_VERSIONS))
- }
- }
-
- @Test
- def testGetConsumerGroupSummary() {
- val consumer = createConsumer()
- subscribeAndWaitForAssignment(topic, consumer)
-
- val group = client.describeConsumerGroup(groupId)
- assertEquals("range", group.assignmentStrategy)
- assertEquals("Stable", group.state)
- assertFalse(group.consumers.isEmpty)
-
- val member = group.consumers.get.head
- assertEquals(clientId, member.clientId)
- assertFalse(member.host.isEmpty)
- assertFalse(member.consumerId.isEmpty)
- }
-
- @Test
- def testDescribeConsumerGroup() {
- val consumer = createConsumer()
- subscribeAndWaitForAssignment(topic, consumer)
-
- val consumerGroupSummary = client.describeConsumerGroup(groupId)
- assertEquals(1, consumerGroupSummary.consumers.get.size)
- assertEquals(List(tp, tp2), consumerGroupSummary.consumers.get.flatMap(_.assignment))
- }
-
- @Test
- def testDescribeConsumerGroupForNonExistentGroup() {
- val nonExistentGroup = "non" + groupId
- assertTrue("Expected empty ConsumerSummary list", client.describeConsumerGroup(nonExistentGroup).consumers.get.isEmpty)
- }
-
- private def subscribeAndWaitForAssignment(topic: String, consumer: KafkaConsumer[Array[Byte], Array[Byte]]) {
- consumer.subscribe(Collections.singletonList(topic))
- TestUtils.waitUntilTrue(() => {
- consumer.poll(0)
- !consumer.assignment.isEmpty
- }, "Expected non-empty assignment")
- }
-
-}