Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/09 16:25:54 UTC

[GitHub] [kafka] abbccdda commented on a change in pull request #8834: MINOR: Do not disable heartbeat during Rebalance

abbccdda commented on a change in pull request #8834:
URL: https://github.com/apache/kafka/pull/8834#discussion_r437110527



##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
##########
@@ -604,6 +605,25 @@ public void testSyncGroupIllegalGenerationResponseWithOldGeneration() throws Int
         assertEquals(newGen, coordinator.generation());
     }
 
+    @Test
+    public void testHeartbeatSentWhenRebalancing() throws Exception {
+        setupCoordinator();
+        joinGroup();
+
+        final AbstractCoordinator.Generation currGen = coordinator.generation();
+
+        coordinator.setNewState(AbstractCoordinator.MemberState.REBALANCING);
+
+        // the heartbeat thread should be sent out during a rebalance

Review comment:
       Remove `thread`: it is the heartbeat that is sent out, not the heartbeat thread.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -652,9 +651,10 @@ public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> fut
             } else if (error == Errors.MEMBER_ID_REQUIRED) {
                 // Broker requires a concrete member id to be allowed to join the group. Update member id
                 // and send another join group request in next cycle.
+                String memberId = joinResponse.data().memberId();
+                log.debug("Attempt to join group returned {} error. Will set the member id as {} and then rejoin", error, memberId);

Review comment:
       There might be a slight performance gain if we just log "Attempt to join group and receive member id required error." instead of passing in the error as a parameter.

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -287,7 +287,7 @@ class GroupCoordinator(val brokerId: Int,
 
           group.currentState match {
             case PreparingRebalance =>
-              updateMemberAndRebalance(group, member, protocols, responseCallback)
+              updateMemberAndRebalance(group, member, protocols, s"Member ${member.memberId} joining group during ${group.currentState}", responseCallback)

Review comment:
       Just to confirm, this file only has logging changes, right?

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -1069,6 +1069,13 @@ private HeartbeatResponseHandler(final Generation generation) {
         public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
             sensors.heartbeatSensor.record(response.requestLatencyMs());
             Errors error = heartbeatResponse.error();
+
+            if (state != MemberState.STABLE) {

Review comment:
       IMHO we should still handle fatal exceptions here, such as `FencedInstanceIdException`.
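
To illustrate the ordering I have in mind, here is a minimal sketch, not the actual `AbstractCoordinator` code. Fatal errors are checked before the `state != STABLE` early return, so a fenced member still fails fast during a rebalance. The class `HeartbeatErrorSketch`, its enums, the `Action` values, and the choice of which errors count as fatal are all hypothetical stand-ins for this example.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical, simplified stand-in for the heartbeat response handling discussed above.
final class HeartbeatErrorSketch {
    enum MemberState { UNJOINED, REBALANCING, STABLE }
    enum ErrorCode { NONE, REBALANCE_IN_PROGRESS, ILLEGAL_GENERATION, FENCED_INSTANCE_ID }
    enum Action { COMPLETE, IGNORE, REQUEST_REJOIN, FAIL_FATALLY }

    // Assumption: only the fenced-instance error is treated as fatal in this sketch.
    private static final Set<ErrorCode> FATAL = EnumSet.of(ErrorCode.FENCED_INSTANCE_ID);

    static Action handle(MemberState state, ErrorCode error) {
        // Fatal errors are surfaced first, regardless of the member state.
        if (FATAL.contains(error)) {
            return Action.FAIL_FATALLY;
        }
        if (error == ErrorCode.NONE) {
            return Action.COMPLETE;
        }
        // Outside STABLE, a rebalance is already under way, so non-fatal errors are ignored.
        if (state != MemberState.STABLE) {
            return Action.IGNORE;
        }
        return Action.REQUEST_REJOIN;
    }
}
```

With this ordering, `handle(MemberState.REBALANCING, ErrorCode.FENCED_INSTANCE_ID)` fails fatally, while `handle(MemberState.REBALANCING, ErrorCode.ILLEGAL_GENERATION)` is ignored.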

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -528,7 +528,6 @@ public void onFailure(RuntimeException e) {
                 }
 
                 private void recordRebalanceFailure() {
-                    state = MemberState.UNJOINED;

Review comment:
       Commenting here for lack of a better place: on L485 we have this logic:
   ```
   if (joinFuture == null) {
       // fence off the heartbeat thread explicitly so that it cannot interfere with the join group.
       // Note that this must come after the call to onJoinPrepare since we must be able to continue
       // sending heartbeats if that callback takes some time.
       disableHeartbeatThread();
   ```
   Since we now want the heartbeat thread to keep working during a rebalance, could disabling the heartbeat here be a bit dangerous? Maybe we could also check the member state here to decide whether to disable it.
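
A rough sketch of the kind of state check I mean, assuming the decision can be made from the member state alone. `HeartbeatGateSketch`, `shouldDisableHeartbeat`, `prepareJoin`, and the rule itself (keep heartbeating while `REBALANCING`) are hypothetical illustrations, not code from this PR.

```java
// Hypothetical sketch: gate the heartbeat fencing on the current member state.
final class HeartbeatGateSketch {
    enum MemberState { UNJOINED, REBALANCING, STABLE }

    private boolean heartbeatEnabled = true;

    // Assumption: a member that is already mid-rebalance keeps its heartbeat running.
    static boolean shouldDisableHeartbeat(MemberState state) {
        return state != MemberState.REBALANCING;
    }

    // Stand-in for the "joinFuture == null" branch quoted above.
    void prepareJoin(MemberState state) {
        if (shouldDisableHeartbeat(state)) {
            heartbeatEnabled = false;
        }
    }

    boolean isHeartbeatEnabled() {
        return heartbeatEnabled;
    }
}
```

Whether `REBALANCING` is the right (or only) state to exempt is exactly the open question in this comment.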
   

##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
##########
@@ -604,6 +605,25 @@ public void testSyncGroupIllegalGenerationResponseWithOldGeneration() throws Int
         assertEquals(newGen, coordinator.generation());
     }
 
+    @Test
+    public void testHeartbeatSentWhenRebalancing() throws Exception {
+        setupCoordinator();
+        joinGroup();
+
+        final AbstractCoordinator.Generation currGen = coordinator.generation();
+
+        coordinator.setNewState(AbstractCoordinator.MemberState.REBALANCING);
+
+        // the heartbeat thread should be sent out during a rebalance
+        mockTime.sleep(HEARTBEAT_INTERVAL_MS);
+        TestUtils.waitForCondition(() -> !mockClient.requests().isEmpty(), 2000,
+                "The heartbeat request was not sent");
+        assertTrue(coordinator.heartbeat().hasInflight());
+
+        mockClient.respond(heartbeatResponse(Errors.REBALANCE_IN_PROGRESS));

Review comment:
       Why do we need to respond?

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -528,7 +528,6 @@ public void onFailure(RuntimeException e) {
                 }
 
                 private void recordRebalanceFailure() {

Review comment:
       Do we still need this private function then?



