Posted to commits@kafka.apache.org by gu...@apache.org on 2020/09/26 03:30:18 UTC

[kafka] branch trunk updated: MINOR: standardize rebalance related logging for easy discovery & debugging (#9295)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bd462df  MINOR: standardize rebalance related logging for easy discovery & debugging (#9295)
bd462df is described below

commit bd462df20321ff5b75a7e3eae70634268582d90b
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Fri Sep 25 20:29:17 2020 -0700

    MINOR: standardize rebalance related logging for easy discovery & debugging (#9295)
    
    Some minor logging adjustments to standardize the grammar of rebalance-related messages and make it easier to query the logs for quick debugging results.
    
    Guozhang Wang <wa...@gmail.com>
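
For readers skimming the archive, a minimal, self-contained sketch of the standardized message grammar follows. It is not part of the commit; the class name and the sample reason/generation strings are made up. It only shows how every rebalance failure now leads with "<Request> failed:" and ends with "Sent generation was ...", so a single log query (for example, searching for "Group failed:") surfaces JoinGroup and SyncGroup problems together.

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Illustrative only: demonstrates the standardized
    // "<Request> failed: <reason> ... Sent generation was <generation>" grammar.
    public class RebalanceLogGrammarDemo {
        private static final Logger log = LoggerFactory.getLogger(RebalanceLogGrammarDemo.class);

        public static void main(String[] args) {
            String reason = "The coordinator is not aware of this member.";
            String sentGeneration = "Generation{generationId=5, memberId='consumer-1', protocol='range'}";

            // Both lines share the same leading grammar, so one query finds them both.
            log.info("JoinGroup failed: {} Need to re-join the group. Sent generation was {}",
                     reason, sentGeneration);
            log.info("SyncGroup failed: {} Marking coordinator unknown. Sent generation was {}",
                     reason, sentGeneration);
        }
    }
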
---
 .../consumer/internals/AbstractCoordinator.java    | 52 ++++++++++++----------
 .../consumer/internals/ConsumerCoordinator.java    |  2 +-
 .../internals/StreamsPartitionAssignor.java        |  4 +-
 3 files changed, 32 insertions(+), 26 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 080d9c4..b021c91 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -444,7 +444,7 @@ public abstract class AbstractCoordinator implements Closeable {
                     stateSnapshot = this.state;
                 }
 
-                if (generationSnapshot != Generation.NO_GENERATION && stateSnapshot == MemberState.STABLE) {
+                if (!generationSnapshot.equals(Generation.NO_GENERATION) && stateSnapshot == MemberState.STABLE) {
                     // Duplicate the buffer in case `onJoinComplete` does not complete and needs to be retried.
                     ByteBuffer memberAssignment = future.value().duplicate();
 
@@ -563,7 +563,7 @@ public abstract class AbstractCoordinator implements Closeable {
             Errors error = joinResponse.error();
             if (error == Errors.NONE) {
                 if (isProtocolTypeInconsistent(joinResponse.data().protocolType())) {
-                    log.error("JoinGroup failed due to inconsistent Protocol Type, received {} but expected {}",
+                    log.error("JoinGroup failed: Inconsistent Protocol Type, received {} but expected {}",
                         joinResponse.data().protocolType(), protocolType());
                     future.raise(Errors.INCONSISTENT_GROUP_PROTOCOL);
                 } else {
@@ -598,11 +598,12 @@ public abstract class AbstractCoordinator implements Closeable {
                     }
                 }
             } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
-                log.debug("Attempt to join group rejected since coordinator {} is loading the group.", coordinator());
+                log.info("JoinGroup failed: Coordinator {} is loading the group.", coordinator());
                 // backoff and retry
                 future.raise(error);
             } else if (error == Errors.UNKNOWN_MEMBER_ID) {
-                log.debug("Attempt to join group failed due to unknown member id with {}.", sentGeneration);
+                log.info("JoinGroup failed: {} Need to re-join the group. Sent generation was {}",
+                         error.message(), sentGeneration);
                 // only need to reset the member id if generation has not been changed,
                 // then retry immediately
                 if (generationUnchanged())
@@ -613,13 +614,14 @@ public abstract class AbstractCoordinator implements Closeable {
                     || error == Errors.NOT_COORDINATOR) {
                 // re-discover the coordinator and retry with backoff
                 markCoordinatorUnknown();
-                log.debug("Attempt to join group failed due to obsolete coordinator information: {}", error.message());
+                log.info("JoinGroup failed: {} Marking coordinator unknown. Sent generation was {}",
+                          error.message(), sentGeneration);
                 future.raise(error);
             } else if (error == Errors.FENCED_INSTANCE_ID) {
                 // for join-group request, even if the generation has changed we would not expect the instance id
                 // gets fenced, and hence we always treat this as a fatal error
-                log.error("Attempt to join group with generation {} failed because the group instance id {} has been fenced by another instance",
-                    rebalanceConfig.groupInstanceId, sentGeneration);
+                log.error("JoinGroup failed: The group instance id {} has been fenced by another instance. " +
+                              "Sent generation was {}", rebalanceConfig.groupInstanceId, sentGeneration);
                 future.raise(error);
             } else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL
                     || error == Errors.INVALID_SESSION_TIMEOUT
@@ -627,7 +629,7 @@ public abstract class AbstractCoordinator implements Closeable {
                     || error == Errors.GROUP_AUTHORIZATION_FAILED
                     || error == Errors.GROUP_MAX_SIZE_REACHED) {
                 // log the error and re-throw the exception
-                log.error("Attempt to join group failed due to fatal error: {}", error.message());
+                log.error("JoinGroup failed due to fatal error: {}", error.message());
                 if (error == Errors.GROUP_MAX_SIZE_REACHED) {
                     future.raise(new GroupMaxSizeReachedException("Consumer group " + rebalanceConfig.groupId +
                             " already has the configured maximum number of members."));
@@ -637,21 +639,22 @@ public abstract class AbstractCoordinator implements Closeable {
                     future.raise(error);
                 }
             } else if (error == Errors.UNSUPPORTED_VERSION) {
-                log.error("Attempt to join group failed due to unsupported version error. Please unset field group.instance.id and retry" +
-                        " to see if the problem resolves");
+                log.error("JoinGroup failed due to unsupported version error. Please unset field group.instance.id " +
+                          "and retry to see if the problem resolves");
                 future.raise(error);
             } else if (error == Errors.MEMBER_ID_REQUIRED) {
                 // Broker requires a concrete member id to be allowed to join the group. Update member id
                 // and send another join group request in next cycle.
                 String memberId = joinResponse.data().memberId();
-                log.debug("Attempt to join group returned {} error. Will set the member id as {} and then rejoin", error, memberId);
+                log.debug("JoinGroup failed due to non-fatal error: {} Will set the member id as {} and then rejoin. " +
+                              "Sent generation was  {}", error, memberId, sentGeneration);
                 synchronized (AbstractCoordinator.this) {
                     AbstractCoordinator.this.generation = new Generation(OffsetCommitRequest.DEFAULT_GENERATION_ID, memberId, null);
                 }
                 future.raise(error);
             } else {
                 // unexpected error, throw the exception
-                log.error("Attempt to join group failed due to unexpected error: {}", error.message());
+                log.error("JoinGroup failed due to unexpected error: {}", error.message());
                 future.raise(new KafkaException("Unexpected error in join group response: " + error.message()));
             }
         }
@@ -732,7 +735,7 @@ public abstract class AbstractCoordinator implements Closeable {
                     sensors.syncSensor.record(response.requestLatencyMs());
 
                     synchronized (AbstractCoordinator.this) {
-                        if (generation != Generation.NO_GENERATION && state == MemberState.COMPLETING_REBALANCE) {
+                        if (!generation.equals(Generation.NO_GENERATION) && state == MemberState.COMPLETING_REBALANCE) {
                             // check protocol name only if the generation is not reset
                             final String protocolName = syncResponse.data.protocolName();
                             final boolean protocolNameInconsistent = protocolName != null &&
@@ -755,8 +758,8 @@ public abstract class AbstractCoordinator implements Closeable {
                                 future.complete(ByteBuffer.wrap(syncResponse.data.assignment()));
                             }
                         } else {
-                            log.info("Generation data was cleared by heartbeat thread as {} and state is now {} before " +
-                                "received SyncGroup response, marking this rebalance as failed and retry",
+                            log.info("Generation data was cleared by heartbeat thread to {} and state is now {} before " +
+                                "receiving SyncGroup response, marking this rebalance as failed and retry",
                                 generation, state);
                             // use ILLEGAL_GENERATION error code to let it retry immediately
                             future.raise(Errors.ILLEGAL_GENERATION);
@@ -769,24 +772,27 @@ public abstract class AbstractCoordinator implements Closeable {
                 if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                     future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
                 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
-                    log.debug("SyncGroup failed because the group began another rebalance");
+                    log.info("SyncGroup failed: The group began another rebalance. Need to re-join the group. " +
+                                 "Sent generation was {}", sentGeneration);
                     future.raise(error);
                 } else if (error == Errors.FENCED_INSTANCE_ID) {
                     // for sync-group request, even if the generation has changed we would not expect the instance id
                     // gets fenced, and hence we always treat this as a fatal error
-                    log.error("SyncGroup with {} failed because the group instance id {} has been fenced by another instance",
-                        sentGeneration, rebalanceConfig.groupInstanceId);
+                    log.error("SyncGroup failed: The group instance id {} has been fenced by another instance. " +
+                        "Sent generation was {}", rebalanceConfig.groupInstanceId, sentGeneration);
                     future.raise(error);
                 } else if (error == Errors.UNKNOWN_MEMBER_ID
                         || error == Errors.ILLEGAL_GENERATION) {
-                    log.info("SyncGroup with {} failed: {}, would request re-join", sentGeneration, error.message());
+                    log.info("SyncGroup failed: {} Need to re-join the group. Sent generation was {}",
+                            error.message(), sentGeneration);
                     if (generationUnchanged())
                         resetGenerationOnResponseError(ApiKeys.SYNC_GROUP, error);
 
                     future.raise(error);
                 } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
                         || error == Errors.NOT_COORDINATOR) {
-                    log.debug("SyncGroup failed: {}, marking coordinator unknown", error.message());
+                    log.info("SyncGroup failed: {} Marking coordinator unknown. Sent generation was {}",
+                             error.message(), sentGeneration);
                     markCoordinatorUnknown();
                     future.raise(error);
                 } else {
@@ -1484,18 +1490,18 @@ public abstract class AbstractCoordinator implements Closeable {
      * @return true if the two ids are matching.
      */
     final boolean hasMatchingGenerationId(int generationId) {
-        return generation != Generation.NO_GENERATION && generation.generationId == generationId;
+        return !generation.equals(Generation.NO_GENERATION) && generation.generationId == generationId;
     }
 
     final boolean hasUnknownGeneration() {
-        return generation == Generation.NO_GENERATION;
+        return generation.equals(Generation.NO_GENERATION);
     }
 
     /**
      * @return true if the current generation's member ID is valid, false otherwise
      */
     final boolean hasValidMemberId() {
-        return generation != Generation.NO_GENERATION && generation.hasMemberId();
+        return !hasUnknownGeneration() && generation.hasMemberId();
     }
 
     final synchronized void setNewGeneration(final Generation generation) {
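
The hunks above also switch the NO_GENERATION checks from reference equality (== / !=) to equals(). A hypothetical, heavily simplified stand-in for the Generation class is sketched below purely to illustrate that difference; it is not Kafka's real Generation, but it shows why two logically empty generations can be distinct objects and therefore should be compared by value.

    import java.util.Objects;

    // Hypothetical, simplified stand-in for the real Generation class.
    public class GenerationEqualityDemo {
        static final class Generation {
            static final Generation NO_GENERATION = new Generation(-1, "");

            final int generationId;
            final String memberId;

            Generation(int generationId, String memberId) {
                this.generationId = generationId;
                this.memberId = memberId;
            }

            @Override
            public boolean equals(Object o) {
                if (this == o) return true;
                if (!(o instanceof Generation)) return false;
                Generation other = (Generation) o;
                return generationId == other.generationId && memberId.equals(other.memberId);
            }

            @Override
            public int hashCode() {
                return Objects.hash(generationId, memberId);
            }
        }

        public static void main(String[] args) {
            // A fresh instance that carries the same "no generation" values.
            Generation reset = new Generation(-1, "");
            System.out.println(reset == Generation.NO_GENERATION);      // false: different objects
            System.out.println(reset.equals(Generation.NO_GENERATION)); // true: equal by value
        }
    }
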
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 9ab2901..80be7a9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -408,7 +408,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 firstException.compareAndSet(null, invokePartitionsRevoked(revokedPartitions));
 
                 // If revoked any partitions, need to re-join the group afterwards
-                log.debug("Need to revoke partitions {} and re-join the group", revokedPartitions);
+                log.info("Need to revoke partitions {} and re-join the group", revokedPartitions);
                 requestRejoin();
             }
         }
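
The ConsumerCoordinator change only promotes the partition-revocation message from DEBUG to INFO. As a hedged usage note, the sketch below (not from the commit; the topic, group id, and bootstrap address are placeholders) uses the standard ConsumerRebalanceListener callbacks, which run on the same revocation path the new INFO line reports, as a way to correlate application-side handling with these logs.

    import java.time.Duration;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    // Placeholder topic/group/bootstrap values; illustration only.
    public class RevocationListenerDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "demo-group");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("demo-topic"), new ConsumerRebalanceListener() {
                    @Override
                    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                        // Runs on the same rebalance path that now logs the revocation at INFO.
                        System.out.println("Revoked: " + partitions);
                    }

                    @Override
                    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                        System.out.println("Assigned: " + partitions);
                    }
                });
                consumer.poll(Duration.ofSeconds(1)); // join the group so rebalance callbacks can fire
            }
        }
    }
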
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 37c0d88..d7df48f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -1058,14 +1058,14 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
 
             if (!activeTasksRemovedPendingRevokation.isEmpty()) {
                 // TODO: once KAFKA-10078 is resolved we can leave it to the client to trigger this rebalance
-                log.info("Requesting {} followup rebalance be scheduled immediately due to tasks changing ownership.", consumer);
+                log.info("Requesting followup rebalance be scheduled immediately by {} due to tasks changing ownership.", consumer);
                 info.setNextRebalanceTime(0L);
                 followupRebalanceRequiredForRevokedTasks = true;
                 // Don't bother to schedule a probing rebalance if an immediate one is already scheduled
                 shouldEncodeProbingRebalance = false;
             } else if (shouldEncodeProbingRebalance) {
                 final long nextRebalanceTimeMs = time.milliseconds() + probingRebalanceIntervalMs();
-                log.info("Requesting {} followup rebalance be scheduled for {} ms to probe for caught-up replica tasks.",
+                log.info("Requesting followup rebalance be scheduled by {} for {} ms to probe for caught-up replica tasks.",
                         consumer, nextRebalanceTimeMs);
                 info.setNextRebalanceTime(nextRebalanceTimeMs);
                 shouldEncodeProbingRebalance = false;
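
Finally, the StreamsPartitionAssignor hunk schedules either an immediate followup rebalance (next rebalance time 0) or a probing rebalance one probing interval in the future. A minimal sketch of that scheduling arithmetic follows; it uses System.currentTimeMillis() in place of Kafka's Time abstraction, and the 10-minute interval is only an assumed stand-in for the value the assignor reads from its probing rebalance interval configuration.

    // Minimal sketch of the scheduling logic in the hunk above; not the real assignor code.
    public class ProbingRebalanceScheduleDemo {
        public static void main(String[] args) {
            long probingRebalanceIntervalMs = 10 * 60 * 1000L; // assumed interval, for illustration
            boolean tasksChangedOwnership = false;             // toggle to compare the two branches

            final long nextRebalanceTimeMs;
            if (tasksChangedOwnership) {
                // Tasks are pending revocation: request an immediate followup rebalance.
                nextRebalanceTimeMs = 0L;
            } else {
                // Otherwise schedule a probing rebalance to look for caught-up replica tasks.
                nextRebalanceTimeMs = System.currentTimeMillis() + probingRebalanceIntervalMs;
            }
            System.out.println("nextRebalanceTimeMs = " + nextRebalanceTimeMs);
        }
    }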