You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/09/26 03:30:18 UTC
[kafka] branch trunk updated: MINOR: standardize rebalance related
logging for easy discovery & debugging (#9295)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new bd462df MINOR: standardize rebalance related logging for easy discovery & debugging (#9295)
bd462df is described below
commit bd462df20321ff5b75a7e3eae70634268582d90b
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Fri Sep 25 20:29:17 2020 -0700
MINOR: standardize rebalance related logging for easy discovery & debugging (#9295)
Some minor logging adjustments to standardize the grammar of rebalance-related messages and make it easy to query the logs for quick debugging results.
Reviewers: Guozhang Wang <wa...@gmail.com>
---
.../consumer/internals/AbstractCoordinator.java | 52 ++++++++++++----------
.../consumer/internals/ConsumerCoordinator.java | 2 +-
.../internals/StreamsPartitionAssignor.java | 4 +-
3 files changed, 32 insertions(+), 26 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 080d9c4..b021c91 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -444,7 +444,7 @@ public abstract class AbstractCoordinator implements Closeable {
stateSnapshot = this.state;
}
- if (generationSnapshot != Generation.NO_GENERATION && stateSnapshot == MemberState.STABLE) {
+ if (!generationSnapshot.equals(Generation.NO_GENERATION) && stateSnapshot == MemberState.STABLE) {
// Duplicate the buffer in case `onJoinComplete` does not complete and needs to be retried.
ByteBuffer memberAssignment = future.value().duplicate();
@@ -563,7 +563,7 @@ public abstract class AbstractCoordinator implements Closeable {
Errors error = joinResponse.error();
if (error == Errors.NONE) {
if (isProtocolTypeInconsistent(joinResponse.data().protocolType())) {
- log.error("JoinGroup failed due to inconsistent Protocol Type, received {} but expected {}",
+ log.error("JoinGroup failed: Inconsistent Protocol Type, received {} but expected {}",
joinResponse.data().protocolType(), protocolType());
future.raise(Errors.INCONSISTENT_GROUP_PROTOCOL);
} else {
@@ -598,11 +598,12 @@ public abstract class AbstractCoordinator implements Closeable {
}
}
} else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
- log.debug("Attempt to join group rejected since coordinator {} is loading the group.", coordinator());
+ log.info("JoinGroup failed: Coordinator {} is loading the group.", coordinator());
// backoff and retry
future.raise(error);
} else if (error == Errors.UNKNOWN_MEMBER_ID) {
- log.debug("Attempt to join group failed due to unknown member id with {}.", sentGeneration);
+ log.info("JoinGroup failed: {} Need to re-join the group. Sent generation was {}",
+ error.message(), sentGeneration);
// only need to reset the member id if generation has not been changed,
// then retry immediately
if (generationUnchanged())
@@ -613,13 +614,14 @@ public abstract class AbstractCoordinator implements Closeable {
|| error == Errors.NOT_COORDINATOR) {
// re-discover the coordinator and retry with backoff
markCoordinatorUnknown();
- log.debug("Attempt to join group failed due to obsolete coordinator information: {}", error.message());
+ log.info("JoinGroup failed: {} Marking coordinator unknown. Sent generation was {}",
+ error.message(), sentGeneration);
future.raise(error);
} else if (error == Errors.FENCED_INSTANCE_ID) {
// for join-group request, even if the generation has changed we would not expect the instance id
// gets fenced, and hence we always treat this as a fatal error
- log.error("Attempt to join group with generation {} failed because the group instance id {} has been fenced by another instance",
- rebalanceConfig.groupInstanceId, sentGeneration);
+ log.error("JoinGroup failed: The group instance id {} has been fenced by another instance. " +
+ "Sent generation was {}", rebalanceConfig.groupInstanceId, sentGeneration);
future.raise(error);
} else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL
|| error == Errors.INVALID_SESSION_TIMEOUT
@@ -627,7 +629,7 @@ public abstract class AbstractCoordinator implements Closeable {
|| error == Errors.GROUP_AUTHORIZATION_FAILED
|| error == Errors.GROUP_MAX_SIZE_REACHED) {
// log the error and re-throw the exception
- log.error("Attempt to join group failed due to fatal error: {}", error.message());
+ log.error("JoinGroup failed due to fatal error: {}", error.message());
if (error == Errors.GROUP_MAX_SIZE_REACHED) {
future.raise(new GroupMaxSizeReachedException("Consumer group " + rebalanceConfig.groupId +
" already has the configured maximum number of members."));
@@ -637,21 +639,22 @@ public abstract class AbstractCoordinator implements Closeable {
future.raise(error);
}
} else if (error == Errors.UNSUPPORTED_VERSION) {
- log.error("Attempt to join group failed due to unsupported version error. Please unset field group.instance.id and retry" +
- " to see if the problem resolves");
+ log.error("JoinGroup failed due to unsupported version error. Please unset field group.instance.id " +
+ "and retry to see if the problem resolves");
future.raise(error);
} else if (error == Errors.MEMBER_ID_REQUIRED) {
// Broker requires a concrete member id to be allowed to join the group. Update member id
// and send another join group request in next cycle.
String memberId = joinResponse.data().memberId();
- log.debug("Attempt to join group returned {} error. Will set the member id as {} and then rejoin", error, memberId);
+ log.debug("JoinGroup failed due to non-fatal error: {} Will set the member id as {} and then rejoin. " +
+ "Sent generation was {}", error, memberId, sentGeneration);
synchronized (AbstractCoordinator.this) {
AbstractCoordinator.this.generation = new Generation(OffsetCommitRequest.DEFAULT_GENERATION_ID, memberId, null);
}
future.raise(error);
} else {
// unexpected error, throw the exception
- log.error("Attempt to join group failed due to unexpected error: {}", error.message());
+ log.error("JoinGroup failed due to unexpected error: {}", error.message());
future.raise(new KafkaException("Unexpected error in join group response: " + error.message()));
}
}
@@ -732,7 +735,7 @@ public abstract class AbstractCoordinator implements Closeable {
sensors.syncSensor.record(response.requestLatencyMs());
synchronized (AbstractCoordinator.this) {
- if (generation != Generation.NO_GENERATION && state == MemberState.COMPLETING_REBALANCE) {
+ if (!generation.equals(Generation.NO_GENERATION) && state == MemberState.COMPLETING_REBALANCE) {
// check protocol name only if the generation is not reset
final String protocolName = syncResponse.data.protocolName();
final boolean protocolNameInconsistent = protocolName != null &&
@@ -755,8 +758,8 @@ public abstract class AbstractCoordinator implements Closeable {
future.complete(ByteBuffer.wrap(syncResponse.data.assignment()));
}
} else {
- log.info("Generation data was cleared by heartbeat thread as {} and state is now {} before " +
- "received SyncGroup response, marking this rebalance as failed and retry",
+ log.info("Generation data was cleared by heartbeat thread to {} and state is now {} before " +
+ "receiving SyncGroup response, marking this rebalance as failed and retry",
generation, state);
// use ILLEGAL_GENERATION error code to let it retry immediately
future.raise(Errors.ILLEGAL_GENERATION);
@@ -769,24 +772,27 @@ public abstract class AbstractCoordinator implements Closeable {
if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
- log.debug("SyncGroup failed because the group began another rebalance");
+ log.info("SyncGroup failed: The group began another rebalance. Need to re-join the group. " +
+ "Sent generation was {}", sentGeneration);
future.raise(error);
} else if (error == Errors.FENCED_INSTANCE_ID) {
// for sync-group request, even if the generation has changed we would not expect the instance id
// gets fenced, and hence we always treat this as a fatal error
- log.error("SyncGroup with {} failed because the group instance id {} has been fenced by another instance",
- sentGeneration, rebalanceConfig.groupInstanceId);
+ log.error("SyncGroup failed: The group instance id {} has been fenced by another instance. " +
+ "Sent generation was {}", rebalanceConfig.groupInstanceId, sentGeneration);
future.raise(error);
} else if (error == Errors.UNKNOWN_MEMBER_ID
|| error == Errors.ILLEGAL_GENERATION) {
- log.info("SyncGroup with {} failed: {}, would request re-join", sentGeneration, error.message());
+ log.info("SyncGroup failed: {} Need to re-join the group. Sent generation was {}",
+ error.message(), sentGeneration);
if (generationUnchanged())
resetGenerationOnResponseError(ApiKeys.SYNC_GROUP, error);
future.raise(error);
} else if (error == Errors.COORDINATOR_NOT_AVAILABLE
|| error == Errors.NOT_COORDINATOR) {
- log.debug("SyncGroup failed: {}, marking coordinator unknown", error.message());
+ log.info("SyncGroup failed: {} Marking coordinator unknown. Sent generation was {}",
+ error.message(), sentGeneration);
markCoordinatorUnknown();
future.raise(error);
} else {
@@ -1484,18 +1490,18 @@ public abstract class AbstractCoordinator implements Closeable {
* @return true if the two ids are matching.
*/
final boolean hasMatchingGenerationId(int generationId) {
- return generation != Generation.NO_GENERATION && generation.generationId == generationId;
+ return !generation.equals(Generation.NO_GENERATION) && generation.generationId == generationId;
}
final boolean hasUnknownGeneration() {
- return generation == Generation.NO_GENERATION;
+ return generation.equals(Generation.NO_GENERATION);
}
/**
* @return true if the current generation's member ID is valid, false otherwise
*/
final boolean hasValidMemberId() {
- return generation != Generation.NO_GENERATION && generation.hasMemberId();
+ return !hasUnknownGeneration() && generation.hasMemberId();
}
final synchronized void setNewGeneration(final Generation generation) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 9ab2901..80be7a9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -408,7 +408,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
firstException.compareAndSet(null, invokePartitionsRevoked(revokedPartitions));
// If revoked any partitions, need to re-join the group afterwards
- log.debug("Need to revoke partitions {} and re-join the group", revokedPartitions);
+ log.info("Need to revoke partitions {} and re-join the group", revokedPartitions);
requestRejoin();
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 37c0d88..d7df48f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -1058,14 +1058,14 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
if (!activeTasksRemovedPendingRevokation.isEmpty()) {
// TODO: once KAFKA-10078 is resolved we can leave it to the client to trigger this rebalance
- log.info("Requesting {} followup rebalance be scheduled immediately due to tasks changing ownership.", consumer);
+ log.info("Requesting followup rebalance be scheduled immediately by {} due to tasks changing ownership.", consumer);
info.setNextRebalanceTime(0L);
followupRebalanceRequiredForRevokedTasks = true;
// Don't bother to schedule a probing rebalance if an immediate one is already scheduled
shouldEncodeProbingRebalance = false;
} else if (shouldEncodeProbingRebalance) {
final long nextRebalanceTimeMs = time.milliseconds() + probingRebalanceIntervalMs();
- log.info("Requesting {} followup rebalance be scheduled for {} ms to probe for caught-up replica tasks.",
+ log.info("Requesting followup rebalance be scheduled by {} for {} ms to probe for caught-up replica tasks.",
consumer, nextRebalanceTimeMs);
info.setNextRebalanceTime(nextRebalanceTimeMs);
shouldEncodeProbingRebalance = false;