You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/06/29 03:45:41 UTC

[kafka] branch trunk updated: MINOR: Ensure heartbeat last poll time always updated (#5308)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 919409b  MINOR: Ensure heartbeat last poll time always updated (#5308)
919409b is described below

commit 919409bb2331fbd60cd3d425fe7d9384d66967fa
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Thu Jun 28 20:45:29 2018 -0700

    MINOR: Ensure heartbeat last poll time always updated (#5308)
    
    We need to ensure that the last poll time is always updated when the user calls poll(Duration). This patch fixes a bug in the new KIP-266 timeout behavior which would cause this update to be skipped if the coordinator could not be found while the consumer was in an active group.
    
    Note that I've also fixed some type inconsistencies for various timeouts.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 .../kafka/clients/consumer/KafkaConsumer.java      | 10 ++--
 .../consumer/internals/AbstractCoordinator.java    | 20 +++++++-
 .../consumer/internals/ConsumerCoordinator.java    | 11 ++--
 .../clients/consumer/internals/Heartbeat.java      | 32 ++++++------
 .../java/org/apache/kafka/clients/MockClient.java  |  2 +-
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  5 +-
 .../internals/ConsumerCoordinatorTest.java         | 58 +++++++++++++++++-----
 .../clients/consumer/internals/HeartbeatTest.java  | 14 +++---
 8 files changed, 106 insertions(+), 46 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 550bece..f722408 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -25,6 +25,7 @@ import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
 import org.apache.kafka.clients.consumer.internals.ConsumerMetrics;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
 import org.apache.kafka.clients.consumer.internals.Fetcher;
+import org.apache.kafka.clients.consumer.internals.Heartbeat;
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
@@ -745,12 +746,15 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             this.assignors = config.getConfiguredInstances(
                     ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                     PartitionAssignor.class);
+
+            int maxPollIntervalMs = config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);
+            int sessionTimeoutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
             this.coordinator = new ConsumerCoordinator(logContext,
                     this.client,
                     groupId,
-                    config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
-                    config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
-                    heartbeatIntervalMs,
+                    maxPollIntervalMs,
+                    sessionTimeoutMs,
+                    new Heartbeat(sessionTimeoutMs, heartbeatIntervalMs, maxPollIntervalMs, retryBackoffMs),
                     assignors,
                     this.metadata,
                     this.subscriptions,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 8f68138..b5c7a66 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -129,7 +129,7 @@ public abstract class AbstractCoordinator implements Closeable {
                                String groupId,
                                int rebalanceTimeoutMs,
                                int sessionTimeoutMs,
-                               int heartbeatIntervalMs,
+                               Heartbeat heartbeat,
                                Metrics metrics,
                                String metricGrpPrefix,
                                Time time,
@@ -142,11 +142,27 @@ public abstract class AbstractCoordinator implements Closeable {
         this.rebalanceTimeoutMs = rebalanceTimeoutMs;
         this.sessionTimeoutMs = sessionTimeoutMs;
         this.leaveGroupOnClose = leaveGroupOnClose;
-        this.heartbeat = new Heartbeat(sessionTimeoutMs, heartbeatIntervalMs, rebalanceTimeoutMs, retryBackoffMs);
+        this.heartbeat = heartbeat;
         this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix);
         this.retryBackoffMs = retryBackoffMs;
     }
 
+    public AbstractCoordinator(LogContext logContext,
+                               ConsumerNetworkClient client,
+                               String groupId,
+                               int rebalanceTimeoutMs,
+                               int sessionTimeoutMs,
+                               int heartbeatIntervalMs,
+                               Metrics metrics,
+                               String metricGrpPrefix,
+                               Time time,
+                               long retryBackoffMs,
+                               boolean leaveGroupOnClose) {
+        this(logContext, client, groupId, rebalanceTimeoutMs, sessionTimeoutMs,
+                new Heartbeat(sessionTimeoutMs, heartbeatIntervalMs, rebalanceTimeoutMs, retryBackoffMs),
+                metrics, metricGrpPrefix, time, retryBackoffMs, leaveGroupOnClose);
+    }
+
     /**
      * Unique identifier for the class of supported protocols (e.g. "consumer" or "connect").
      * @return Non-null protocol type name
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index b484e11..060e404 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -118,7 +118,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                                String groupId,
                                int rebalanceTimeoutMs,
                                int sessionTimeoutMs,
-                               int heartbeatIntervalMs,
+                               Heartbeat heartbeat,
                                List<PartitionAssignor> assignors,
                                Metadata metadata,
                                SubscriptionState subscriptions,
@@ -136,7 +136,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
               groupId,
               rebalanceTimeoutMs,
               sessionTimeoutMs,
-              heartbeatIntervalMs,
+              heartbeat,
               metrics,
               metricGrpPrefix,
               time,
@@ -306,13 +306,16 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         invokeCompletedOffsetCommitCallbacks();
 
         if (subscriptions.partitionsAutoAssigned()) {
+            // Always update the heartbeat last poll time so that the heartbeat thread does not leave the
+            // group proactively due to application inactivity even if (say) the coordinator cannot be found.
+            pollHeartbeat(currentTime);
+
             if (coordinatorUnknown()) {
                 if (!ensureCoordinatorReady(remainingTimeAtLeastZero(timeoutMs, elapsed))) {
                     return false;
                 }
                 currentTime = time.milliseconds();
                 elapsed = currentTime - startTime;
-
             }
 
             if (rejoinNeededOrPending()) {
@@ -333,8 +336,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
                 currentTime = time.milliseconds();
             }
-
-            pollHeartbeat(currentTime);
         } else {
             // For manually assigned partitions, if there are no ready nodes, await metadata.
             // If connections to all nodes fail, wakeups triggered while attempting to send fetch
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
index 38a7c78..01d7810 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
@@ -20,9 +20,9 @@ package org.apache.kafka.clients.consumer.internals;
  * A helper class for managing the heartbeat to the coordinator
  */
 public final class Heartbeat {
-    private final long sessionTimeout;
-    private final long heartbeatInterval;
-    private final long maxPollInterval;
+    private final int sessionTimeoutMs;
+    private final int heartbeatIntervalMs;
+    private final int maxPollIntervalMs;
     private final long retryBackoffMs;
 
     private volatile long lastHeartbeatSend; // volatile since it is read by metrics
@@ -31,16 +31,16 @@ public final class Heartbeat {
     private long lastPoll;
     private boolean heartbeatFailed;
 
-    public Heartbeat(long sessionTimeout,
-                     long heartbeatInterval,
-                     long maxPollInterval,
+    public Heartbeat(int sessionTimeoutMs,
+                     int heartbeatIntervalMs,
+                     int maxPollIntervalMs,
                      long retryBackoffMs) {
-        if (heartbeatInterval >= sessionTimeout)
+        if (heartbeatIntervalMs >= sessionTimeoutMs)
             throw new IllegalArgumentException("Heartbeat must be set lower than the session timeout");
 
-        this.sessionTimeout = sessionTimeout;
-        this.heartbeatInterval = heartbeatInterval;
-        this.maxPollInterval = maxPollInterval;
+        this.sessionTimeoutMs = sessionTimeoutMs;
+        this.heartbeatIntervalMs = heartbeatIntervalMs;
+        this.maxPollIntervalMs = maxPollIntervalMs;
         this.retryBackoffMs = retryBackoffMs;
     }
 
@@ -75,7 +75,7 @@ public final class Heartbeat {
         if (heartbeatFailed)
             delayToNextHeartbeat = retryBackoffMs;
         else
-            delayToNextHeartbeat = heartbeatInterval;
+            delayToNextHeartbeat = heartbeatIntervalMs;
 
         if (timeSinceLastHeartbeat > delayToNextHeartbeat)
             return 0;
@@ -84,11 +84,11 @@ public final class Heartbeat {
     }
 
     public boolean sessionTimeoutExpired(long now) {
-        return now - Math.max(lastSessionReset, lastHeartbeatReceive) > sessionTimeout;
+        return now - Math.max(lastSessionReset, lastHeartbeatReceive) > sessionTimeoutMs;
     }
 
     public long interval() {
-        return heartbeatInterval;
+        return heartbeatIntervalMs;
     }
 
     public void resetTimeouts(long now) {
@@ -98,7 +98,11 @@ public final class Heartbeat {
     }
 
     public boolean pollTimeoutExpired(long now) {
-        return now - lastPoll > maxPollInterval;
+        return now - lastPoll > maxPollIntervalMs;
+    }
+
+    public long lastPollTime() {
+        return lastPoll;
     }
 
 }
\ No newline at end of file
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index e82b0dd..0f64f13 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -491,7 +491,7 @@ public class MockClient implements KafkaClient {
     }
 
     public boolean hasPendingResponses() {
-        return !responses.isEmpty();
+        return !responses.isEmpty() || !futureResponses.isEmpty();
     }
 
     @Override
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 316404b..c7cfeb0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.clients.consumer.internals.ConsumerMetrics;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
 import org.apache.kafka.clients.consumer.internals.Fetcher;
+import org.apache.kafka.clients.consumer.internals.Heartbeat;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.common.Cluster;
@@ -1772,13 +1773,15 @@ public class KafkaConsumerTest {
         LogContext loggerFactory = new LogContext();
         ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(loggerFactory, client, metadata, time,
                 retryBackoffMs, requestTimeoutMs, heartbeatIntervalMs);
+
+        Heartbeat heartbeat = new Heartbeat(sessionTimeoutMs, heartbeatIntervalMs, rebalanceTimeoutMs, retryBackoffMs);
         ConsumerCoordinator consumerCoordinator = new ConsumerCoordinator(
                 loggerFactory,
                 consumerClient,
                 groupId,
                 rebalanceTimeoutMs,
                 sessionTimeoutMs,
-                heartbeatIntervalMs,
+                heartbeat,
                 assignors,
                 metadata,
                 subscriptions,
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index bd0038d..7c2638c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -90,17 +90,20 @@ import static org.junit.Assert.fail;
 
 public class ConsumerCoordinatorTest {
 
-    private String topic1 = "test1";
-    private String topic2 = "test2";
-    private String groupId = "test-group";
-    private TopicPartition t1p = new TopicPartition(topic1, 0);
-    private TopicPartition t2p = new TopicPartition(topic2, 0);
-    private int rebalanceTimeoutMs = 60000;
-    private int sessionTimeoutMs = 10000;
-    private int heartbeatIntervalMs = 5000;
-    private long retryBackoffMs = 100;
-    private int autoCommitIntervalMs = 2000;
-    private int requestTimeoutMs = 30000;
+    private final String topic1 = "test1";
+    private final String topic2 = "test2";
+    private final TopicPartition t1p = new TopicPartition(topic1, 0);
+    private final TopicPartition t2p = new TopicPartition(topic2, 0);
+    private final String groupId = "test-group";
+    private final int rebalanceTimeoutMs = 60000;
+    private final int sessionTimeoutMs = 10000;
+    private final int heartbeatIntervalMs = 5000;
+    private final long retryBackoffMs = 100;
+    private final int autoCommitIntervalMs = 2000;
+    private final int requestTimeoutMs = 30000;
+    private final Heartbeat heartbeat = new Heartbeat(sessionTimeoutMs, heartbeatIntervalMs,
+            rebalanceTimeoutMs, retryBackoffMs);
+
     private MockPartitionAssignor partitionAssignor = new MockPartitionAssignor();
     private List<PartitionAssignor> assignors = Collections.<PartitionAssignor>singletonList(partitionAssignor);
     private MockTime time;
@@ -141,6 +144,7 @@ public class ConsumerCoordinatorTest {
     @After
     public void teardown() {
         this.metrics.close();
+        this.coordinator.close(0);
     }
 
     @Test
@@ -580,6 +584,35 @@ public class ConsumerCoordinatorTest {
     }
 
     @Test
+    public void testUpdateLastHeartbeatPollWhenCoordinatorUnknown() throws Exception {
+        // If we are part of an active group and we cannot find the coordinator, we should nevertheless
+        // continue to update the last poll time so that we do not expire the consumer
+        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
+
+        // Join the group, but signal a coordinator change after the first heartbeat
+        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+        client.prepareResponse(heartbeatResponse(Errors.NOT_COORDINATOR));
+
+        coordinator.poll(Long.MAX_VALUE);
+        time.sleep(heartbeatIntervalMs);
+
+        // Await the first heartbeat which forces us to find a new coordinator
+        TestUtils.waitForCondition(() -> !client.hasPendingResponses(),
+                "Failed to observe expected heartbeat from background thread");
+
+        assertTrue(coordinator.coordinatorUnknown());
+        assertFalse(coordinator.poll(0));
+        assertEquals(time.milliseconds(), heartbeat.lastPollTime());
+
+        time.sleep(rebalanceTimeoutMs - 1);
+        assertFalse(heartbeat.pollTimeoutExpired(time.milliseconds()));
+    }
+
+    @Test
     public void testPatternJoinGroupFollower() {
         final String consumerId = "consumer";
 
@@ -1695,7 +1728,6 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testHeartbeatThreadClose() throws Exception {
-        groupId = "testCloseTimeoutWithHeartbeatThread";
         ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true);
         coordinator.ensureActiveGroup();
         time.sleep(heartbeatIntervalMs + 100);
@@ -1831,7 +1863,7 @@ public class ConsumerCoordinatorTest {
                 groupId,
                 rebalanceTimeoutMs,
                 sessionTimeoutMs,
-                heartbeatIntervalMs,
+                heartbeat,
                 assignors,
                 metadata,
                 subscriptions,
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
index 06cdae7..7db7820 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
@@ -26,24 +26,24 @@ import static org.junit.Assert.assertTrue;
 
 public class HeartbeatTest {
 
-    private long timeout = 300L;
-    private long interval = 100L;
-    private long maxPollInterval = 900L;
-    private long retryBackoff = 10L;
+    private int sessionTimeoutMs = 300;
+    private int heartbeatIntervalMs = 100;
+    private int maxPollIntervalMs = 900;
+    private long retryBackoffMs = 10L;
     private MockTime time = new MockTime();
-    private Heartbeat heartbeat = new Heartbeat(timeout, interval, maxPollInterval, retryBackoff);
+    private Heartbeat heartbeat = new Heartbeat(sessionTimeoutMs, heartbeatIntervalMs, maxPollIntervalMs, retryBackoffMs);
 
     @Test
     public void testShouldHeartbeat() {
         heartbeat.sentHeartbeat(time.milliseconds());
-        time.sleep((long) ((float) interval * 1.1));
+        time.sleep((long) ((float) heartbeatIntervalMs * 1.1));
         assertTrue(heartbeat.shouldHeartbeat(time.milliseconds()));
     }
 
     @Test
     public void testShouldNotHeartbeat() {
         heartbeat.sentHeartbeat(time.milliseconds());
-        time.sleep(interval / 2);
+        time.sleep(heartbeatIntervalMs / 2);
         assertFalse(heartbeat.shouldHeartbeat(time.milliseconds()));
     }