You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/03/03 20:04:02 UTC
kafka git commit: KAFKA-2857;
Retry querying the consumer group while initializing
Repository: kafka
Updated Branches:
refs/heads/trunk a3c45b0c9 -> 573a6f398
KAFKA-2857; Retry querying the consumer group while initializing
This applies to new-consumer-based groups and avoids failures in scenarios where a user issues a `--describe` query while the group is still initializing.
Example: The following could occur for a newly created group.
```
kafkakafka:~/workspace/kafka$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group g
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).
Error: Executing consumer group command failed due to The group coordinator is not available.
```
With this PR, the group is queried repeatedly at fixed intervals within a preset (and configurable) timeout, `group-init-timeout`, to circumvent unfortunate situations like the one above.
Author: Vahid Hashemian <va...@us.ibm.com>
Reviewers: Jason Gustafson <ja...@confluent.io>
Closes #2538 from vahidhashemian/KAFKA-2857
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/573a6f39
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/573a6f39
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/573a6f39
Branch: refs/heads/trunk
Commit: 573a6f39863061a6f38a0aca35f11470c3e8538e
Parents: a3c45b0
Author: Vahid Hashemian <va...@us.ibm.com>
Authored: Fri Mar 3 11:22:42 2017 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri Mar 3 12:01:38 2017 -0800
----------------------------------------------------------------------
.../main/scala/kafka/admin/AdminClient.scala | 48 +++++--
.../kafka/admin/ConsumerGroupCommand.scala | 17 ++-
.../main/scala/kafka/tools/StreamsResetter.java | 2 +-
.../kafka/admin/DescribeConsumerGroupTest.scala | 133 ++++++++-----------
.../integration/ResetIntegrationTest.java | 2 +-
5 files changed, 110 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/573a6f39/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 7cfc91a..4b28460 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -16,18 +16,21 @@ import java.nio.ByteBuffer
import java.util.{Collections, Properties}
import java.util.concurrent.atomic.AtomicInteger
-import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion
import kafka.common.KafkaException
import kafka.coordinator.GroupOverview
import kafka.utils.Logging
+
import org.apache.kafka.clients._
import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, ConsumerProtocol, RequestFuture}
import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
+import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.Selector
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests._
+import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion
+import org.apache.kafka.common.requests.DescribeGroupsResponse.GroupMetadata
import org.apache.kafka.common.requests.OffsetFetchResponse
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{Cluster, Node, TopicPartition}
@@ -37,6 +40,7 @@ import scala.util.Try
class AdminClient(val time: Time,
val requestTimeoutMs: Int,
+ val retryBackoffMs: Int,
val client: ConsumerNetworkClient,
val bootstrapBrokers: List[Node]) extends Logging {
@@ -66,9 +70,19 @@ class AdminClient(val time: Time,
throw new RuntimeException(s"Request $api failed on brokers $bootstrapBrokers")
}
- def findCoordinator(groupId: String): Node = {
+ def findCoordinator(groupId: String, timeoutMs: Long = 0): Node = {
+ val startTime = time.milliseconds
val requestBuilder = new GroupCoordinatorRequest.Builder(groupId)
- val response = sendAnyNode(ApiKeys.GROUP_COORDINATOR, requestBuilder).asInstanceOf[GroupCoordinatorResponse]
+ var response = sendAnyNode(ApiKeys.GROUP_COORDINATOR, requestBuilder).asInstanceOf[GroupCoordinatorResponse]
+
+ while (response.error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE && time.milliseconds - startTime < timeoutMs) {
+ Thread.sleep(retryBackoffMs)
+ response = sendAnyNode(ApiKeys.GROUP_COORDINATOR, requestBuilder).asInstanceOf[GroupCoordinatorResponse]
+ }
+
+ if (response.error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
+ throw new TimeoutException("The consumer group command timed out while waiting for group to initialize: ", response.error.exception)
+
response.error.maybeThrow()
response.node
}
@@ -165,18 +179,34 @@ class AdminClient(val time: Time,
consumers: Option[List[ConsumerSummary]],
coordinator: Node)
- def describeConsumerGroup(groupId: String): ConsumerGroupSummary = {
- val coordinator = findCoordinator(groupId)
+ def describeConsumerGroupHandler(coordinator: Node, groupId: String): GroupMetadata = {
val responseBody = send(coordinator, ApiKeys.DESCRIBE_GROUPS,
new DescribeGroupsRequest.Builder(Collections.singletonList(groupId)))
val response = responseBody.asInstanceOf[DescribeGroupsResponse]
val metadata = response.groups.get(groupId)
if (metadata == null)
throw new KafkaException(s"Response from broker contained no metadata for group $groupId")
- if (metadata.state != "Dead" && metadata.state != "Empty" && metadata.protocolType != ConsumerProtocol.PROTOCOL_TYPE)
- throw new IllegalArgumentException(s"Consumer Group $groupId with protocol type '${metadata.protocolType}' is not a valid consumer group")
+ metadata
+ }
+
+ def describeConsumerGroup(groupId: String, timeoutMs: Long = 0): ConsumerGroupSummary = {
+
+ def isValidConsumerGroupResponse(metadata: DescribeGroupsResponse.GroupMetadata): Boolean =
+ metadata.error == Errors.NONE && (metadata.state == "Dead" || metadata.state == "Empty" || metadata.protocolType == ConsumerProtocol.PROTOCOL_TYPE)
+
+ val startTime = time.milliseconds
+ val coordinator = findCoordinator(groupId, timeoutMs)
+ var metadata = describeConsumerGroupHandler(coordinator, groupId)
+
+ while (!isValidConsumerGroupResponse(metadata) && time.milliseconds - startTime < timeoutMs) {
+ debug(s"The consumer group response for group '$groupId' is invalid. Retrying the request as the group is initializing ...")
+ Thread.sleep(retryBackoffMs)
+ metadata = describeConsumerGroupHandler(coordinator, groupId)
+ }
+
+ if (!isValidConsumerGroupResponse(metadata))
+ throw new TimeoutException("The consumer group command timed out while waiting for group to initialize")
- metadata.error.maybeThrow()
val consumers = metadata.members.asScala.map { consumer =>
ConsumerSummary(consumer.memberId, consumer.clientId, consumer.clientHost, metadata.state match {
case "Stable" =>
@@ -204,6 +234,7 @@ object AdminClient {
val DefaultSendBufferBytes = 128 * 1024
val DefaultReceiveBufferBytes = 32 * 1024
val DefaultRetryBackoffMs = 100
+
val AdminClientIdSequence = new AtomicInteger(1)
val AdminConfigDef = {
val config = new ConfigDef()
@@ -274,6 +305,7 @@ object AdminClient {
new AdminClient(
time,
DefaultRequestTimeoutMs,
+ DefaultRetryBackoffMs,
highLevelClient,
bootstrapCluster.nodes.asScala.toList)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/573a6f39/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 11f4f89..caad62a 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -396,7 +396,7 @@ object ConsumerGroupCommand extends Logging {
}
protected def collectGroupAssignment(group: String): (Option[String], Option[Seq[PartitionAssignmentState]]) = {
- val consumerGroupSummary = adminClient.describeConsumerGroup(group)
+ val consumerGroupSummary = adminClient.describeConsumerGroup(group, opts.options.valueOf(opts.timeoutMsOpt))
(Some(consumerGroupSummary.state),
consumerGroupSummary.consumers match {
case None =>
@@ -502,7 +502,11 @@ object ConsumerGroupCommand extends Logging {
"for every consumer group. For instance --topic t1" + nl +
"WARNING: Group deletion only works for old ZK-based consumer groups, and one has to use it carefully to only delete groups that are not active."
val NewConsumerDoc = "Use new consumer. This is the default."
+ val TimeoutMsDoc = "The timeout that can be set for some use cases. For example, it can be used when describing the group " +
+ "to specify the maximum amount of time in milliseconds to wait before the group stabilizes (when the group is just created, " +
+ "or is going through some changes)."
val CommandConfigDoc = "Property file containing configs to be passed to Admin Client and Consumer."
+
val parser = new OptionParser
val zkConnectOpt = parser.accepts("zookeeper", ZkConnectDoc)
.withRequiredArg
@@ -524,6 +528,11 @@ object ConsumerGroupCommand extends Logging {
val describeOpt = parser.accepts("describe", DescribeDoc)
val deleteOpt = parser.accepts("delete", DeleteDoc)
val newConsumerOpt = parser.accepts("new-consumer", NewConsumerDoc)
+ val timeoutMsOpt = parser.accepts("timeout", TimeoutMsDoc)
+ .withRequiredArg
+ .describedAs("timeout (ms)")
+ .ofType(classOf[Long])
+ .defaultsTo(5000)
val commandConfigOpt = parser.accepts("command-config", CommandConfigDoc)
.withRequiredArg
.describedAs("command config property file")
@@ -531,11 +540,15 @@ object ConsumerGroupCommand extends Logging {
val options = parser.parse(args : _*)
val useOldConsumer = options.has(zkConnectOpt)
+ val describeOptPresent = options.has(describeOpt)
val allConsumerGroupLevelOpts: Set[OptionSpec[_]] = Set(listOpt, describeOpt, deleteOpt)
def checkArgs() {
// check required args
+ if (options.has(timeoutMsOpt) && (!describeOptPresent || useOldConsumer))
+ debug(s"Option '$timeoutMsOpt' is applicable only when both '$bootstrapServerOpt' and '$describeOpt' are used.")
+
if (useOldConsumer) {
if (options.has(bootstrapServerOpt))
CommandLineUtils.printUsageAndDie(parser, s"Option '$bootstrapServerOpt' is not valid with '$zkConnectOpt'.")
@@ -550,7 +563,7 @@ object ConsumerGroupCommand extends Logging {
"committed offset for that group expires.")
}
- if (options.has(describeOpt))
+ if (describeOptPresent)
CommandLineUtils.checkRequiredArgs(parser, options, groupOpt)
if (options.has(deleteOpt) && !options.has(groupOpt) && !options.has(topicOpt))
CommandLineUtils.printUsageAndDie(parser, "Option %s either takes %s, %s, or both".format(deleteOpt, groupOpt, topicOpt))
http://git-wip-us.apache.org/repos/asf/kafka/blob/573a6f39/core/src/main/scala/kafka/tools/StreamsResetter.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index a61c092..83166cd 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -91,7 +91,7 @@ public class StreamsResetter {
adminClient = AdminClient.createSimplePlaintext(options.valueOf(bootstrapServerOption));
final String groupId = options.valueOf(applicationIdOption);
- if (!adminClient.describeConsumerGroup(groupId).consumers().get().isEmpty()) {
+ if (!adminClient.describeConsumerGroup(groupId, 0).consumers().get().isEmpty()) {
throw new IllegalStateException("Consumer group '" + groupId + "' is still active. " +
"Make sure to stop all running application instances before running the reset tool.");
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/573a6f39/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
index 8e10a87..905d113 100644
--- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -23,6 +23,7 @@ import java.util.Collections
import java.util.Properties
import org.easymock.EasyMock
+import org.junit.Assert._
import org.junit.Before
import org.junit.Test
@@ -35,10 +36,11 @@ import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
+import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException
+import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.errors.WakeupException
import org.apache.kafka.common.serialization.StringDeserializer
-import org.apache.kafka.clients.consumer.KafkaConsumer
class DescribeConsumerGroupTest extends KafkaServerTestHarness {
@@ -179,21 +181,8 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = new KafkaConsumerGroupService(opts)
- TestUtils.waitUntilTrue(() => {
- try {
- val (state, assignments) = consumerGroupCommand.describeGroup()
- println(state == Some("Dead") && assignments == Some(List()))
- state == Some("Dead") && assignments == Some(List())
- } catch {
- case _: GroupCoordinatorNotAvailableException | _: IllegalArgumentException =>
- // Do nothing while the group initializes
- false
- case e: Throwable =>
- e.printStackTrace()
- throw e
- }
- }, "Expected the state to be 'Dead' with no members in the group.")
-
+ val (state, assignments) = consumerGroupCommand.describeGroup()
+ assertTrue("Expected the state to be 'Dead' with no members in the group.", state == Some("Dead") && assignments == Some(List()))
consumerGroupCommand.close()
}
@@ -207,21 +196,13 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
val consumerGroupCommand = new KafkaConsumerGroupService(opts)
TestUtils.waitUntilTrue(() => {
- try {
- val (state, assignments) = consumerGroupCommand.describeGroup()
- state == Some("Stable") &&
- assignments.isDefined &&
- assignments.get.count(_.group == group) == 1 &&
- assignments.get.filter(_.group == group).head.consumerId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
- assignments.get.filter(_.group == group).head.clientId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
- assignments.get.filter(_.group == group).head.host.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE)
- } catch {
- case _: GroupCoordinatorNotAvailableException | _: IllegalArgumentException =>
- // Do nothing while the group initializes
- false
- case e: Throwable =>
- throw e
- }
+ val (state, assignments) = consumerGroupCommand.describeGroup()
+ state == Some("Stable") &&
+ assignments.isDefined &&
+ assignments.get.count(_.group == group) == 1 &&
+ assignments.get.filter(_.group == group).head.consumerId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
+ assignments.get.filter(_.group == group).head.clientId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
+ assignments.get.filter(_.group == group).head.host.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE)
}, "Expected a 'Stable' group status, rows and valid values for consumer id / client id / host columns in describe group results.")
consumerGroupCommand.close()
@@ -237,40 +218,24 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
val consumerGroupCommand = new KafkaConsumerGroupService(opts)
TestUtils.waitUntilTrue(() => {
- try {
- val (state, _) = consumerGroupCommand.describeGroup()
- state == Some("Stable")
- } catch {
- case _: GroupCoordinatorNotAvailableException | _: IllegalArgumentException =>
- // Do nothing while the group initializes
- false
- case e: Throwable =>
- throw e
- }
+ val (state, _) = consumerGroupCommand.describeGroup()
+ state == Some("Stable")
}, "Expected the group to initially become stable.")
// stop the consumer so the group has no active member anymore
executor.shutdown()
TestUtils.waitUntilTrue(() => {
- try {
- val (state, assignments) = consumerGroupCommand.describeGroup()
- state == Some("Empty") &&
- assignments.isDefined &&
- assignments.get.count(_.group == group) == 1 &&
- assignments.get.filter(_.group == group).head.consumerId.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE) && // the member should be gone
- assignments.get.filter(_.group == group).head.clientId.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
- assignments.get.filter(_.group == group).head.host.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE)
- } catch {
- case _: GroupCoordinatorNotAvailableException | _: IllegalArgumentException =>
- // Do nothing while the group initializes
- false
- case e: Throwable =>
- throw e
- } finally {
- consumerGroupCommand.close()
- }
+ val (state, assignments) = consumerGroupCommand.describeGroup()
+ state == Some("Empty") &&
+ assignments.isDefined &&
+ assignments.get.count(_.group == group) == 1 &&
+ assignments.get.filter(_.group == group).head.consumerId.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE) && // the member should be gone
+ assignments.get.filter(_.group == group).head.clientId.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
+ assignments.get.filter(_.group == group).head.host.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE)
}, "Expected no active member in describe group results.")
+
+ consumerGroupCommand.close()
}
@Test
@@ -283,20 +248,12 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
val consumerGroupCommand = new KafkaConsumerGroupService(opts)
TestUtils.waitUntilTrue(() => {
- try {
- val (state, assignments) = consumerGroupCommand.describeGroup()
- state == Some("Stable") &&
- assignments.isDefined &&
- assignments.get.count(_.group == group) == 2 &&
- assignments.get.count{ x => x.group == group && x.partition.isDefined} == 1 &&
- assignments.get.count{ x => x.group == group && !x.partition.isDefined} == 1
- } catch {
- case _: GroupCoordinatorNotAvailableException | _: IllegalArgumentException =>
- // Do nothing while the group initializes
- false
- case e: Throwable =>
- throw e
- }
+ val (state, assignments) = consumerGroupCommand.describeGroup()
+ state == Some("Stable") &&
+ assignments.isDefined &&
+ assignments.get.count(_.group == group) == 2 &&
+ assignments.get.count{ x => x.group == group && x.partition.isDefined} == 1 &&
+ assignments.get.count{ x => x.group == group && !x.partition.isDefined} == 1
}, "Expected rows for consumers with no assigned partitions in describe group results.")
consumerGroupCommand.close()
@@ -315,24 +272,40 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
val consumerGroupCommand = new KafkaConsumerGroupService(opts)
TestUtils.waitUntilTrue(() => {
- try {
val (state, assignments) = consumerGroupCommand.describeGroup()
state == Some("Stable") &&
assignments.isDefined &&
assignments.get.count(_.group == group) == 2 &&
assignments.get.count{ x => x.group == group && x.partition.isDefined} == 2 &&
assignments.get.count{ x => x.group == group && !x.partition.isDefined} == 0
- } catch {
- case _: GroupCoordinatorNotAvailableException | _: IllegalArgumentException =>
- // Do nothing while the group initializes
- false
- case e: Throwable =>
- throw e
- }
}, "Expected two rows (one row per consumer) in describe group results.")
consumerGroupCommand.close()
}
+
+ @Test
+ def testDescribeGroupWithNewConsumerWithShortInitializationTimeout() {
+ // run one consumer in the group consuming from a single-partition topic
+ val executor = new ConsumerGroupExecutor(brokerList, 1, group, topic)
+
+ // set the group initialization timeout too low for the group to stabilize
+ val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", "group", "--timeout", "10")
+ val opts = new ConsumerGroupCommandOptions(cgcArgs)
+ val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+
+ try {
+ val (state, assignments) = consumerGroupCommand.describeGroup()
+ fail("The consumer group command should fail due to low initialization timeout")
+ } catch {
+ case e: TimeoutException =>
+ // OK
+ case e: Throwable =>
+ fail("An unexpected exception occurred: " + e.getMessage)
+ throw e
+ } finally {
+ consumerGroupCommand.close()
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/573a6f39/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 3248b2a..4804bfb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -421,7 +421,7 @@ public class ResetIntegrationTest {
private class WaitUntilConsumerGroupGotClosed implements TestCondition {
@Override
public boolean conditionMet() {
- return adminClient.describeConsumerGroup(APP_ID + testNo).consumers().get().isEmpty();
+ return adminClient.describeConsumerGroup(APP_ID + testNo, 0).consumers().get().isEmpty();
}
}