Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/09/01 21:20:24 UTC

[GitHub] [kafka] vvcephei commented on a change in pull request #8834: KAFKA-10134: Enable heartbeat during PrepareRebalance and Depend On State For Poll Timeout

vvcephei commented on a change in pull request #8834:
URL: https://github.com/apache/kafka/pull/8834#discussion_r481373305



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -446,14 +453,15 @@ boolean joinGroupIfNeeded(final Timer timer) {
                     resetJoinGroupFuture();
                     needsJoinPrepare = true;
                 } else {
-                    log.info("Generation data was cleared by heartbeat thread. Initiating rejoin.");
+                    log.info("Generation data was cleared by heartbeat thread to {} and state is now {} before " +
+                         "the rebalance callback is triggered, marking this rebalance as failed and retry",
+                         generation, state);
                     resetStateAndRejoin();

Review comment:
       Should we also reset the generation here? With the new condition above, we may now enter this block if generation is _not_ `NO_GENERATION`. I'm not sure if we want to have the generation set to some value but state set to `UNJOINED` and `rejoinNeeded := true`.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -446,14 +453,15 @@ boolean joinGroupIfNeeded(final Timer timer) {
                     resetJoinGroupFuture();
                     needsJoinPrepare = true;
                 } else {
-                    log.info("Generation data was cleared by heartbeat thread. Initiating rejoin.");
+                    log.info("Generation data was cleared by heartbeat thread to {} and state is now {} before " +
+                         "the rebalance callback is triggered, marking this rebalance as failed and retry",
+                         generation, state);
                     resetStateAndRejoin();
                     resetJoinGroupFuture();
-                    return false;
                 }
             } else {
                 final RuntimeException exception = future.exception();
-                log.info("Join group failed with {}", exception.toString());
+                log.info("Rebalance failed with {}", exception.toString());

Review comment:
       Different question: can we report the exception as the "cause", rather than logging only its `toString()`?
   ```suggestion
                   log.info("Rebalance failed.", exception);
   ```
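
With SLF4J, a `Throwable` passed as the final argument is logged together with its stack trace, which is what the suggestion above relies on. The following is a minimal JDK-only sketch of what is lost when only `toString()` is logged; the class and method names (`ExceptionDetailSketch`, `summaryOnly`, `fullDetail`) are hypothetical and not part of Kafka or SLF4J.

```java
// Hypothetical illustration; not Kafka code.
class ExceptionDetailSketch {

    // What "{}" with exception.toString() records: class name and message only.
    static String summaryOnly(Throwable t) {
        return t.toString();
    }

    // Roughly what a logger records when handed the Throwable itself:
    // the stack trace plus every "Caused by:" entry in the cause chain.
    static String fullDetail(Throwable t) {
        java.io.StringWriter buffer = new java.io.StringWriter();
        t.printStackTrace(new java.io.PrintWriter(buffer, true));
        return buffer.toString();
    }

    public static void main(String[] args) {
        RuntimeException failure = new RuntimeException(
                "join failed", new IllegalStateException("coordinator unavailable"));
        System.out.println(summaryOnly(failure));
        System.out.println(fullDetail(failure));
    }
}
```

The summary form drops the nested cause entirely, so a rebalance failure wrapped in another exception would be hard to diagnose from the log line alone.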

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -433,7 +440,7 @@ boolean joinGroupIfNeeded(final Timer timer) {
                     generationSnapshot = this.generation;
                 }
 
-                if (generationSnapshot != Generation.NO_GENERATION) {
+                if (generationSnapshot != Generation.NO_GENERATION && state == MemberState.STABLE) {

Review comment:
       Since the state can also be set from the heartbeat thread, do you think it would be a good idea to also take a "stateSnapshot" inside the synchronized block at L439, so that the state and generation are consistent with respect to each other?
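
A minimal sketch of the snapshot idea, assuming a heavily simplified stand-in for the coordinator. `CoordinatorSketch`, `Snapshot`, `onJoinComplete`, `reset`, and the two-value `MemberState` enum are hypothetical and do not mirror the real `AbstractCoordinator`. The point is only that both fields are read under the same lock that writers hold, so the pair cannot mix an old generation with a new state.

```java
// Hypothetical, simplified stand-in; not Kafka's AbstractCoordinator.
class CoordinatorSketch {
    enum MemberState { UNJOINED, STABLE }

    static final int NO_GENERATION_ID = -1;

    // Immutable pair capturing both fields at one instant.
    static final class Snapshot {
        final int generationId;
        final MemberState state;

        Snapshot(int generationId, MemberState state) {
            this.generationId = generationId;
            this.state = state;
        }
    }

    private int generationId = NO_GENERATION_ID;
    private MemberState state = MemberState.UNJOINED;

    // Writers (e.g. a heartbeat thread) change both fields under the lock.
    synchronized void onJoinComplete(int newGenerationId) {
        generationId = newGenerationId;
        state = MemberState.STABLE;
    }

    synchronized void reset() {
        generationId = NO_GENERATION_ID;
        state = MemberState.UNJOINED;
    }

    // Both fields are read under the same lock, so they agree with each other.
    synchronized Snapshot snapshot() {
        return new Snapshot(generationId, state);
    }

    // The check uses only the snapshot, never the live fields.
    boolean joinedAndStable() {
        Snapshot s = snapshot();
        return s.generationId != NO_GENERATION_ID && s.state == MemberState.STABLE;
    }
}
```

Reading `state` outside the lock after snapshotting only the generation would leave a window in which another thread resets both fields between the two reads.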

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -652,10 +644,10 @@ public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> fut
             } else if (error == Errors.MEMBER_ID_REQUIRED) {
                 // Broker requires a concrete member id to be allowed to join the group. Update member id
                 // and send another join group request in next cycle.
+                String memberId = joinResponse.data().memberId();
+                log.debug("Attempt to join group returned {} error. Will set the member id as {} and then rejoin", error, memberId);
                 synchronized (AbstractCoordinator.this) {
-                    AbstractCoordinator.this.generation = new Generation(OffsetCommitRequest.DEFAULT_GENERATION_ID,
-                            joinResponse.data().memberId(), null);
-                    AbstractCoordinator.this.resetStateAndRejoin();

Review comment:
       We don't need to reset the state here anymore?

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
##########
@@ -66,12 +73,16 @@ void sentHeartbeat(long now) {
         heartbeatInFlight = true;
         update(now);
         heartbeatTimer.reset(rebalanceConfig.heartbeatIntervalMs);
+
+        log.trace("Sending heartbeat request with {}ms remaining on timer", heartbeatTimer.remainingMs());

Review comment:
       Should this be guarded by `isTraceEnabled()`, so that `remainingMs()` is not computed when trace logging is off?
   
   ```suggestion
           if (log.isTraceEnabled()) {
               log.trace("Sending heartbeat request with {}ms remaining on timer", heartbeatTimer.remainingMs());
           }
   ```
   
   (also below)
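
The reason a guard can help is that Java evaluates method arguments before the call, so the argument expression runs even when the logger then discards the message. Below is a minimal sketch of that effect, assuming a hypothetical `TinyLogger` and `CountingTimer` that stand in for the real logger and timer; neither is a Kafka or SLF4J type.

```java
// Hypothetical illustration; not Kafka code.
class TraceGuardSketch {

    // Minimal logger stand-in with a fixed trace setting.
    static final class TinyLogger {
        private final boolean traceEnabled;

        TinyLogger(boolean traceEnabled) {
            this.traceEnabled = traceEnabled;
        }

        boolean isTraceEnabled() {
            return traceEnabled;
        }

        void trace(String format, Object arg) {
            if (traceEnabled) {
                System.out.println(format.replace("{}", String.valueOf(arg)));
            }
        }
    }

    // Timer stand-in that counts how often remainingMs() is computed.
    static final class CountingTimer {
        int calls = 0;

        long remainingMs() {
            calls++;
            return 3000L;
        }
    }

    // Unguarded: the argument is evaluated before trace() decides to drop it.
    static int callsWithoutGuard(TinyLogger log) {
        CountingTimer timer = new CountingTimer();
        log.trace("Sending heartbeat request with {}ms remaining on timer", timer.remainingMs());
        return timer.calls;
    }

    // Guarded: the argument is evaluated only when trace logging is on.
    static int callsWithGuard(TinyLogger log) {
        CountingTimer timer = new CountingTimer();
        if (log.isTraceEnabled()) {
            log.trace("Sending heartbeat request with {}ms remaining on timer", timer.remainingMs());
        }
        return timer.calls;
    }
}
```

Whether the saving matters in practice depends on how expensive the argument expression is; for a cheap getter the guard mostly adds noise.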
