You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/04/06 05:48:21 UTC
[kafka] branch trunk updated: KAFKA-9784: Add OffsetFetch to group
concurrency test (#8383)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3b8ed0a KAFKA-9784: Add OffsetFetch to group concurrency test (#8383)
3b8ed0a is described below
commit 3b8ed0a194bd1c90917c01ffc659fc05d2683c94
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Sun Apr 5 22:47:38 2020 -0700
KAFKA-9784: Add OffsetFetch to group concurrency test (#8383)
As the title suggests, consumers first issue an OffsetFetch before starting normal processing. It therefore makes sense to add OffsetFetch to the group coordinator concurrency test suite, to verify whether it introduces any blocking behavior.
Reviewers: Guozhang Wang <wa...@gmail.com>
---
.../group/GroupCoordinatorConcurrencyTest.scala | 35 ++++++++++++++++++----
1 file changed, 29 insertions(+), 6 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
index 3bba9e1..50f0f5e 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.JoinGroupRequest
+import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetFetchResponse}
import org.apache.kafka.common.utils.Time
import org.easymock.EasyMock
import org.junit.Assert._
@@ -53,13 +53,16 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
private val allOperations = Seq(
new JoinGroupOperation,
new SyncGroupOperation,
+ new OffsetFetchOperation,
new CommitOffsetsOperation,
new HeartbeatOperation,
new LeaveGroupOperation
- )
+ )
+
private val allOperationsWithTxn = Seq(
new JoinGroupOperation,
new SyncGroupOperation,
+ new OffsetFetchOperation,
new CommitTxnOffsetsOperation,
new CompleteTxnOperation,
new HeartbeatOperation,
@@ -176,7 +179,7 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
class SyncGroupOperation extends GroupOperation[SyncGroupCallbackParams, SyncGroupCallback] {
override def responseCallback(responsePromise: Promise[SyncGroupCallbackParams]): SyncGroupCallback = {
val callback: SyncGroupCallback = syncGroupResult =>
- responsePromise.success(syncGroupResult.memberAssignment, syncGroupResult.error)
+ responsePromise.success(syncGroupResult.error, syncGroupResult.memberAssignment)
callback
}
override def runWithCallback(member: GroupMember, responseCallback: SyncGroupCallback): Unit = {
@@ -189,8 +192,10 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
}
}
override def awaitAndVerify(member: GroupMember): Unit = {
- val result = await(member, DefaultSessionTimeout)
- assertEquals(Errors.NONE, result._2)
+ val result = await(member, DefaultSessionTimeout)
+ assertEquals(Errors.NONE, result._1)
+ assertNotNull(result._2)
+ assertEquals(0, result._2.length)
}
}
@@ -209,6 +214,22 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
}
}
+ class OffsetFetchOperation extends GroupOperation[OffsetFetchCallbackParams, OffsetFetchCallback] {
+ override def responseCallback(responsePromise: Promise[OffsetFetchCallbackParams]): OffsetFetchCallback = {
+ val callback: OffsetFetchCallback = (error, offsets) => responsePromise.success(error, offsets)
+ callback
+ }
+ override def runWithCallback(member: GroupMember, responseCallback: OffsetFetchCallback): Unit = {
+ val (error, partitionData) = groupCoordinator.handleFetchOffsets(member.groupId, requireStable = true, None)
+ responseCallback(error, partitionData)
+ }
+ override def awaitAndVerify(member: GroupMember): Unit = {
+ val result = await(member, 500)
+ assertEquals(Errors.NONE, result._1)
+ assertEquals(Map.empty, result._2)
+ }
+ }
+
class CommitOffsetsOperation extends GroupOperation[CommitOffsetCallbackParams, CommitOffsetCallback] {
override def responseCallback(responsePromise: Promise[CommitOffsetCallbackParams]): CommitOffsetCallback = {
val callback: CommitOffsetCallback = offsets => responsePromise.success(offsets)
@@ -290,10 +311,12 @@ object GroupCoordinatorConcurrencyTest {
type JoinGroupCallbackParams = JoinGroupResult
type JoinGroupCallback = JoinGroupResult => Unit
- type SyncGroupCallbackParams = (Array[Byte], Errors)
+ type SyncGroupCallbackParams = (Errors, Array[Byte])
type SyncGroupCallback = SyncGroupResult => Unit
type HeartbeatCallbackParams = Errors
type HeartbeatCallback = Errors => Unit
+ type OffsetFetchCallbackParams = (Errors, Map[TopicPartition, OffsetFetchResponse.PartitionData])
+ type OffsetFetchCallback = (Errors, Map[TopicPartition, OffsetFetchResponse.PartitionData]) => Unit
type CommitOffsetCallbackParams = Map[TopicPartition, Errors]
type CommitOffsetCallback = Map[TopicPartition, Errors] => Unit
type LeaveGroupCallbackParams = LeaveGroupResult