You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/04/06 05:48:21 UTC

[kafka] branch trunk updated: KAFKA-9784: Add OffsetFetch to group concurrency test (#8383)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3b8ed0a  KAFKA-9784: Add OffsetFetch to group concurrency test (#8383)
3b8ed0a is described below

commit 3b8ed0a194bd1c90917c01ffc659fc05d2683c94
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Sun Apr 5 22:47:38 2020 -0700

    KAFKA-9784: Add OffsetFetch to group concurrency test (#8383)
    
    As the title suggests, consumers first issue an OffsetFetch before starting normal processing. It therefore makes sense to add OffsetFetch to the group coordinator concurrency test suite to verify whether it introduces any blocking behavior.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 .../group/GroupCoordinatorConcurrencyTest.scala    | 35 ++++++++++++++++++----
 1 file changed, 29 insertions(+), 6 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
index 3bba9e1..50f0f5e 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.JoinGroupRequest
+import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetFetchResponse}
 import org.apache.kafka.common.utils.Time
 import org.easymock.EasyMock
 import org.junit.Assert._
@@ -53,13 +53,16 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
   private val allOperations = Seq(
       new JoinGroupOperation,
       new SyncGroupOperation,
+      new OffsetFetchOperation,
       new CommitOffsetsOperation,
       new HeartbeatOperation,
       new LeaveGroupOperation
-    )
+  )
+
   private val allOperationsWithTxn = Seq(
     new JoinGroupOperation,
     new SyncGroupOperation,
+    new OffsetFetchOperation,
     new CommitTxnOffsetsOperation,
     new CompleteTxnOperation,
     new HeartbeatOperation,
@@ -176,7 +179,7 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
   class SyncGroupOperation extends GroupOperation[SyncGroupCallbackParams, SyncGroupCallback] {
     override def responseCallback(responsePromise: Promise[SyncGroupCallbackParams]): SyncGroupCallback = {
       val callback: SyncGroupCallback = syncGroupResult =>
-        responsePromise.success(syncGroupResult.memberAssignment, syncGroupResult.error)
+        responsePromise.success(syncGroupResult.error, syncGroupResult.memberAssignment)
       callback
     }
     override def runWithCallback(member: GroupMember, responseCallback: SyncGroupCallback): Unit = {
@@ -189,8 +192,10 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
       }
     }
     override def awaitAndVerify(member: GroupMember): Unit = {
-       val result = await(member, DefaultSessionTimeout)
-       assertEquals(Errors.NONE, result._2)
+      val result = await(member, DefaultSessionTimeout)
+      assertEquals(Errors.NONE, result._1)
+      assertNotNull(result._2)
+      assertEquals(0, result._2.length)
     }
   }
 
@@ -209,6 +214,22 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
     }
   }
 
+  class OffsetFetchOperation extends GroupOperation[OffsetFetchCallbackParams, OffsetFetchCallback] {
+    override def responseCallback(responsePromise: Promise[OffsetFetchCallbackParams]): OffsetFetchCallback = {
+      val callback: OffsetFetchCallback = (error, offsets) => responsePromise.success(error, offsets)
+      callback
+    }
+    override def runWithCallback(member: GroupMember, responseCallback: OffsetFetchCallback): Unit = {
+      val (error, partitionData) = groupCoordinator.handleFetchOffsets(member.groupId, requireStable = true, None)
+      responseCallback(error, partitionData)
+    }
+    override def awaitAndVerify(member: GroupMember): Unit = {
+      val result = await(member, 500)
+      assertEquals(Errors.NONE, result._1)
+      assertEquals(Map.empty, result._2)
+    }
+  }
+
   class CommitOffsetsOperation extends GroupOperation[CommitOffsetCallbackParams, CommitOffsetCallback] {
     override def responseCallback(responsePromise: Promise[CommitOffsetCallbackParams]): CommitOffsetCallback = {
       val callback: CommitOffsetCallback = offsets => responsePromise.success(offsets)
@@ -290,10 +311,12 @@ object GroupCoordinatorConcurrencyTest {
 
   type JoinGroupCallbackParams = JoinGroupResult
   type JoinGroupCallback = JoinGroupResult => Unit
-  type SyncGroupCallbackParams = (Array[Byte], Errors)
+  type SyncGroupCallbackParams = (Errors, Array[Byte])
   type SyncGroupCallback = SyncGroupResult => Unit
   type HeartbeatCallbackParams = Errors
   type HeartbeatCallback = Errors => Unit
+  type OffsetFetchCallbackParams = (Errors, Map[TopicPartition, OffsetFetchResponse.PartitionData])
+  type OffsetFetchCallback = (Errors, Map[TopicPartition, OffsetFetchResponse.PartitionData]) => Unit
   type CommitOffsetCallbackParams = Map[TopicPartition, Errors]
   type CommitOffsetCallback = Map[TopicPartition, Errors] => Unit
   type LeaveGroupCallbackParams = LeaveGroupResult