You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2017/05/17 13:41:39 UTC
kafka git commit: KAFKA-5182: Reduce rebalance timeouts in request quota test
Repository: kafka
Updated Branches:
refs/heads/trunk c36b5b7f6 -> 8c7e66313
KAFKA-5182: Reduce rebalance timeouts in request quota test
Reduce rebalance and session timeouts for join requests to trigger throttling in the request quota test.
Author: Rajini Sivaram <ra...@googlemail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>, Damian Guy <da...@gmail.com>
Closes #3057 from rajinisivaram/KAFKA-5182-quotatest
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8c7e6631
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8c7e6631
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8c7e6631
Branch: refs/heads/trunk
Commit: 8c7e6631308ae986bd60df0b0761f68c777fadff
Parents: c36b5b7
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Wed May 17 09:41:19 2017 -0400
Committer: Rajini Sivaram <ra...@googlemail.com>
Committed: Wed May 17 09:41:19 2017 -0400
----------------------------------------------------------------------
.../unit/kafka/server/RequestQuotaTest.scala | 78 +++++++++-----------
1 file changed, 33 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/8c7e6631/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 4cf3e7d..1c496cd 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -57,13 +57,14 @@ class RequestQuotaTest extends BaseRequestTest {
properties.put(KafkaConfig.ControlledShutdownEnableProp, "false")
properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
+ properties.put(KafkaConfig.GroupMinSessionTimeoutMsProp, "100")
+ properties.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[RequestQuotaTest.TestAuthorizer].getName)
properties.put(KafkaConfig.PrincipalBuilderClassProp, classOf[RequestQuotaTest.TestPrincipalBuilder].getName)
}
@Before
override def setUp() {
-
RequestQuotaTest.principal = KafkaPrincipal.ANONYMOUS
super.setUp()
@@ -86,54 +87,41 @@ class RequestQuotaTest extends BaseRequestTest {
@After
override def tearDown() {
- try {
- executor.shutdownNow()
- } finally {
- super.tearDown()
- }
+ try executor.shutdownNow()
+ finally super.tearDown()
}
@Test
def testResponseThrottleTime() {
+ for (apiKey <- RequestQuotaTest.ClientActions)
+ submitTest(apiKey, () => checkRequestThrottleTime(apiKey))
- for (apiKey <- RequestQuotaTest.ClientActions) {
- val builder = requestBuilder(apiKey)
- submitTest(apiKey, () => {
- checkRequestThrottleTime(apiKey)
- })
- }
waitAndCheckResults()
}
@Test
def testUnthrottledClient() {
+ for (apiKey <- RequestQuotaTest.ClientActions)
+ submitTest(apiKey, () => checkUnthrottledClient(apiKey))
- for (apiKey <- RequestQuotaTest.ClientActions) {
- val builder = requestBuilder(apiKey)
- submitTest(apiKey, () => {
- checkUnthrottledClient(apiKey)
- })
- }
waitAndCheckResults()
}
@Test
def testExemptRequestTime() {
-
- for (apiKey <- RequestQuotaTest.ClusterActions) {
+ for (apiKey <- RequestQuotaTest.ClusterActions)
submitTest(apiKey, () => checkExemptRequestMetric(apiKey))
- }
+
waitAndCheckResults()
}
@Test
def testUnauthorizedThrottle() {
-
RequestQuotaTest.principal = RequestQuotaTest.UnauthorizedPrincipal
- for (apiKey <- ApiKeys.values) {
+ for (apiKey <- ApiKeys.values)
submitTest(apiKey, () => checkUnauthorizedRequestThrottle(apiKey))
- }
+
waitAndCheckResults()
}
@@ -171,19 +159,19 @@ class RequestQuotaTest extends BaseRequestTest {
private def requestBuilder(apiKey: ApiKeys): AbstractRequest.Builder[_ <: AbstractRequest] = {
apiKey match {
case ApiKeys.PRODUCE =>
- new requests.ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, 1, 5000,
+ new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, 1, 5000,
collection.mutable.Map(tp -> MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("test".getBytes))).asJava)
case ApiKeys.FETCH =>
- val partitionMap = new LinkedHashMap[TopicPartition, requests.FetchRequest.PartitionData]
- partitionMap.put(tp, new requests.FetchRequest.PartitionData(0, 0, 100))
- requests.FetchRequest.Builder.forConsumer(0, 0, partitionMap)
+ val partitionMap = new LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
+ partitionMap.put(tp, new FetchRequest.PartitionData(0, 0, 100))
+ FetchRequest.Builder.forConsumer(0, 0, partitionMap)
case ApiKeys.METADATA =>
- new requests.MetadataRequest.Builder(List(topic).asJava)
+ new MetadataRequest.Builder(List(topic).asJava)
case ApiKeys.LIST_OFFSETS =>
- requests.ListOffsetRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
+ ListOffsetRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
.setTargetTimes(Map(tp -> (0L: java.lang.Long)).asJava)
case ApiKeys.LEADER_AND_ISR =>
@@ -197,29 +185,29 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.UPDATE_METADATA_KEY =>
val partitionState = Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava
val securityProtocol = SecurityProtocol.PLAINTEXT
- val brokers = Set(new requests.UpdateMetadataRequest.Broker(brokerId,
- Seq(new requests.UpdateMetadataRequest.EndPoint("localhost", 0, securityProtocol,
+ val brokers = Set(new UpdateMetadataRequest.Broker(brokerId,
+ Seq(new UpdateMetadataRequest.EndPoint("localhost", 0, securityProtocol,
ListenerName.forSecurityProtocol(securityProtocol))).asJava, null)).asJava
- new requests.UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA_KEY.latestVersion, brokerId, Int.MaxValue, partitionState, brokers)
+ new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA_KEY.latestVersion, brokerId, Int.MaxValue, partitionState, brokers)
case ApiKeys.CONTROLLED_SHUTDOWN_KEY =>
- new requests.ControlledShutdownRequest.Builder(brokerId)
+ new ControlledShutdownRequest.Builder(brokerId)
case ApiKeys.OFFSET_COMMIT =>
- new requests.OffsetCommitRequest.Builder("test-group",
- Map(tp -> new requests.OffsetCommitRequest.PartitionData(0, "metadata")).asJava).
+ new OffsetCommitRequest.Builder("test-group",
+ Map(tp -> new OffsetCommitRequest.PartitionData(0, "metadata")).asJava).
setMemberId("").setGenerationId(1).setRetentionTime(1000)
case ApiKeys.OFFSET_FETCH =>
- new requests.OffsetFetchRequest.Builder("test-group", List(tp).asJava)
+ new OffsetFetchRequest.Builder("test-group", List(tp).asJava)
case ApiKeys.FIND_COORDINATOR =>
- new requests.FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, "test-group")
+ new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, "test-group")
case ApiKeys.JOIN_GROUP =>
- new JoinGroupRequest.Builder("test-join-group", 10000, "", "consumer",
- List( new JoinGroupRequest.ProtocolMetadata("consumer-range",ByteBuffer.wrap("test".getBytes()))).asJava)
- .setRebalanceTimeout(60000)
+ new JoinGroupRequest.Builder("test-join-group", 200, "", "consumer",
+ List(new JoinGroupRequest.ProtocolMetadata("consumer-range", ByteBuffer.wrap("test".getBytes()))).asJava)
+ .setRebalanceTimeout(100)
case ApiKeys.HEARTBEAT =>
new HeartbeatRequest.Builder("test-group", 1, "")
@@ -337,12 +325,12 @@ class RequestQuotaTest extends BaseRequestTest {
private def responseThrottleTime(apiKey: ApiKeys, response: Struct): Int = {
apiKey match {
case ApiKeys.PRODUCE => new ProduceResponse(response).getThrottleTime
- case ApiKeys.FETCH => new requests.FetchResponse(response).throttleTimeMs
+ case ApiKeys.FETCH => new FetchResponse(response).throttleTimeMs
case ApiKeys.LIST_OFFSETS => new ListOffsetResponse(response).throttleTimeMs
case ApiKeys.METADATA => new MetadataResponse(response).throttleTimeMs
- case ApiKeys.OFFSET_COMMIT => new requests.OffsetCommitResponse(response).throttleTimeMs
- case ApiKeys.OFFSET_FETCH => new requests.OffsetFetchResponse(response).throttleTimeMs
- case ApiKeys.FIND_COORDINATOR => new requests.FindCoordinatorResponse(response).throttleTimeMs
+ case ApiKeys.OFFSET_COMMIT => new OffsetCommitResponse(response).throttleTimeMs
+ case ApiKeys.OFFSET_FETCH => new OffsetFetchResponse(response).throttleTimeMs
+ case ApiKeys.FIND_COORDINATOR => new FindCoordinatorResponse(response).throttleTimeMs
case ApiKeys.JOIN_GROUP => new JoinGroupResponse(response).throttleTimeMs
case ApiKeys.HEARTBEAT => new HeartbeatResponse(response).throttleTimeMs
case ApiKeys.LEAVE_GROUP => new LeaveGroupResponse(response).throttleTimeMs