You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/06/20 20:56:14 UTC

[kafka] branch trunk updated: KAFKA-8569: integrate warning message under static membership (#6972)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 03d61eb  KAFKA-8569: integrate warning message under static membership (#6972)
03d61eb is described below

commit 03d61ebfb93aab53b0b0ecdfc77175d18a58e861
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Thu Jun 20 13:56:00 2019 -0700

    KAFKA-8569: integrate warning message under static membership (#6972)
    
    Static members never leave the group, so the heartbeat thread could potentially log a flood of warning messages. The solution is to log the warning only when we are on dynamic membership.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 .../kafka/clients/consumer/KafkaConsumer.java      |  2 +-
 .../consumer/internals/AbstractCoordinator.java    | 23 +++++++++++-----------
 .../internals/ConsumerCoordinatorTest.java         |  6 +++---
 .../runtime/distributed/DistributedHerder.java     |  2 +-
 .../runtime/distributed/WorkerGroupMember.java     |  4 ++--
 .../runtime/distributed/DistributedHerderTest.java |  2 +-
 6 files changed, 20 insertions(+), 19 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 79afafa..fa5cc99 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1053,7 +1053,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             fetcher.clearBufferedDataForUnassignedPartitions(Collections.emptySet());
             this.subscriptions.unsubscribe();
             if (this.coordinator != null)
-                this.coordinator.maybeLeaveGroup();
+                this.coordinator.maybeLeaveGroup("the consumer unsubscribed from all topics");
             log.info("Unsubscribed all topics or patterns and assigned partitions");
         } finally {
             release();
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index efe6cb7..5a5444e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -823,7 +823,7 @@ public abstract class AbstractCoordinator implements Closeable {
             // needs this lock to complete and terminate after close flag is set.
             synchronized (this) {
                 if (rebalanceConfig.leaveGroupOnClose) {
-                    maybeLeaveGroup();
+                    maybeLeaveGroup("the consumer is being closed");
                 }
 
                 // At this point, there may be pending commits (async commits or sync commits that were
@@ -840,8 +840,9 @@ public abstract class AbstractCoordinator implements Closeable {
 
     /**
      * Leave the current group and reset local generation/memberId.
+     * @param leaveReason reason to attempt leaving the group
      */
-    public synchronized void maybeLeaveGroup() {
+    public synchronized void maybeLeaveGroup(String leaveReason) {
         // Starting from 2.3, only dynamic members will send LeaveGroupRequest to the broker,
         // consumer with valid group.instance.id is viewed as static member that never sends LeaveGroup,
         // and the membership expiration is only controlled by session timeout.
@@ -849,7 +850,8 @@ public abstract class AbstractCoordinator implements Closeable {
                 state != MemberState.UNJOINED && generation.hasMemberId()) {
             // this is a minimal effort attempt to leave the group. we do not
             // attempt any resending if the request fails or times out.
-            log.info("Member {} sending LeaveGroup request to coordinator {}", generation.memberId, coordinator);
+            log.info("Member {} sending LeaveGroup request to coordinator {} due to {}",
+                     generation.memberId, coordinator, leaveReason);
             LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder(new LeaveGroupRequestData()
                     .setGroupId(rebalanceConfig.groupId).setMemberId(generation.memberId));
             client.send(coordinator, request)
@@ -1088,14 +1090,13 @@ public abstract class AbstractCoordinator implements Closeable {
                             markCoordinatorUnknown();
                         } else if (heartbeat.pollTimeoutExpired(now)) {
                             // the poll timeout has expired, which means that the foreground thread has stalled
-                            // in between calls to poll(), so we explicitly leave the group.
-                            log.warn("This member will leave the group because consumer poll timeout has expired. This " +
-                                    "means the time between subsequent calls to poll() was longer than the configured " +
-                                    "max.poll.interval.ms, which typically implies that the poll loop is spending too " +
-                                    "much time processing messages. You can address this either by increasing " +
-                                    "max.poll.interval.ms or by reducing the maximum size of batches returned in poll() " +
-                                    "with max.poll.records.");
-                            maybeLeaveGroup();
+                            // in between calls to poll().
+                            String leaveReason = "consumer poll timeout has expired. This means the time between subsequent calls to poll() " +
+                                                    "was longer than the configured max.poll.interval.ms, which typically implies that " +
+                                                    "the poll loop is spending too much time processing messages. " +
+                                                    "You can address this either by increasing max.poll.interval.ms or by reducing " +
+                                                    "the maximum size of batches returned in poll() with max.poll.records.";
+                            maybeLeaveGroup(leaveReason);
                         } else if (!heartbeat.shouldHeartbeat(now)) {
                             // poll again after waiting for the retry backoff in case the heartbeat failed or the
                             // coordinator disconnected
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index f4b87d2..1dc1780 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -835,7 +835,7 @@ public class ConsumerCoordinatorTest {
                         leaveRequest.data().groupId().equals(groupId);
             }
         }, new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code())));
-        coordinator.maybeLeaveGroup();
+        coordinator.maybeLeaveGroup("test maybe leave group");
         assertTrue(received.get());
 
         AbstractCoordinator.Generation generation = coordinator.generation();
@@ -873,7 +873,7 @@ public class ConsumerCoordinatorTest {
             }
         }, new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code())));
 
-        coordinator.maybeLeaveGroup();
+        coordinator.maybeLeaveGroup("pending member leaves");
         assertTrue(received.get());
     }
 
@@ -1509,7 +1509,7 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(new LeaveGroupResponse(new LeaveGroupResponseData()
                 .setErrorCode(Errors.NONE.code())));
         subscriptions.unsubscribe();
-        coordinator.maybeLeaveGroup();
+        coordinator.maybeLeaveGroup("test commit after leave");
         subscriptions.assignFromUser(singleton(t1p));
 
         // the client should not reuse generation/memberId from auto-subscribed generation
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 8e59d87..b1a41c0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -977,7 +977,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
             // in case reading the log takes too long, leave the group to ensure a quick rebalance (although by default we should be out of the group already)
             // and back off to avoid a tight loop of rejoin-attempt-to-catch-up-leave
             log.warn("Didn't reach end of config log quickly enough", e);
-            member.maybeLeaveGroup();
+            member.maybeLeaveGroup("taking too long to read the log");
             backoff(workerUnsyncBackoffMs);
             return false;
         }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index 94cf97d..4819db5 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -183,8 +183,8 @@ public class WorkerGroupMember {
         coordinator.requestRejoin();
     }
 
-    public void maybeLeaveGroup() {
-        coordinator.maybeLeaveGroup();
+    public void maybeLeaveGroup(String leaveReason) {
+        coordinator.maybeLeaveGroup(leaveReason);
     }
 
     public String ownerUrl(String connector) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index b03ddf3..f6cbfd6 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -1442,7 +1442,7 @@ public class DistributedHerderTest {
         // Reading to end of log times out
         configBackingStore.refresh(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
         EasyMock.expectLastCall().andThrow(new TimeoutException());
-        member.maybeLeaveGroup();
+        member.maybeLeaveGroup("test join leader catch up fails");
         EasyMock.expectLastCall();
         PowerMock.expectPrivate(herder, "backoff", DistributedConfig.WORKER_UNSYNC_BACKOFF_MS_DEFAULT);
         member.requestRejoin();