You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/08/06 23:16:28 UTC

kafka git commit: KAFKA-2400: Expose heartbeat interval in KafkaConsumer configuration

Repository: kafka
Updated Branches:
  refs/heads/trunk 9b1c52d52 -> 006b45c7e


KAFKA-2400: Expose heartbeat interval in KafkaConsumer configuration

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Guozhang Wang

Closes #116 from hachikuji/KAFKA-2400 and squashes the following commits:

3c1b1dd [Jason Gustafson] KAFKA-2400; expose heartbeat interval in KafkaConsumer configuration


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/006b45c7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/006b45c7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/006b45c7

Branch: refs/heads/trunk
Commit: 006b45c7e5b94e70b3a4200f4646042f3557c48e
Parents: 9b1c52d
Author: Jason Gustafson <ja...@confluent.io>
Authored: Thu Aug 6 14:17:30 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Aug 6 14:17:30 2015 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/ConsumerConfig.java  | 11 +++++++++
 .../kafka/clients/consumer/KafkaConsumer.java   |  1 +
 .../clients/consumer/internals/Coordinator.java |  3 ++-
 .../clients/consumer/internals/Heartbeat.java   | 24 ++++++++++----------
 .../consumer/internals/CoordinatorTest.java     |  2 ++
 .../consumer/internals/HeartbeatTest.java       |  7 +++---
 .../kafka/api/ConsumerBounceTest.scala          |  3 ++-
 .../integration/kafka/api/ConsumerTest.scala    |  3 +++
 8 files changed, 37 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/006b45c7/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 8adabb6..d35b421 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -51,6 +51,12 @@ public class ConsumerConfig extends AbstractConfig {
     private static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to detect failures when using Kafka's group management facilities.";
 
     /**
+     * <code>heartbeat.interval.ms</code>
+     */
+    public static final String HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms";
+    private static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than <code>session.timeout.ms</code>, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.";
+
+    /**
      * <code>bootstrap.servers</code>
      */
     public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
@@ -171,6 +177,11 @@ public class ConsumerConfig extends AbstractConfig {
                                         30000,
                                         Importance.HIGH,
                                         SESSION_TIMEOUT_MS_DOC)
+                                .define(HEARTBEAT_INTERVAL_MS_CONFIG,
+                                        Type.INT,
+                                        3000,
+                                        Importance.HIGH,
+                                        HEARTBEAT_INTERVAL_MS_DOC)
                                 .define(PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                                         Type.STRING,
                                         "range",

http://git-wip-us.apache.org/repos/asf/kafka/blob/006b45c7/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index c57bba0..ed99e9b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -540,6 +540,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             this.coordinator = new Coordinator(this.client,
                     config.getString(ConsumerConfig.GROUP_ID_CONFIG),
                     config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
+                    config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
                     config.getString(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
                     this.subscriptions,
                     metrics,

http://git-wip-us.apache.org/repos/asf/kafka/blob/006b45c7/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
index cd5cdc3..70442aa 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
@@ -84,6 +84,7 @@ public final class Coordinator {
     public Coordinator(ConsumerNetworkClient client,
                        String groupId,
                        int sessionTimeoutMs,
+                       int heartbeatIntervalMs,
                        String assignmentStrategy,
                        SubscriptionState subscriptions,
                        Metrics metrics,
@@ -103,7 +104,7 @@ public final class Coordinator {
         this.subscriptions = subscriptions;
         this.sessionTimeoutMs = sessionTimeoutMs;
         this.assignmentStrategy = assignmentStrategy;
-        this.heartbeat = new Heartbeat(this.sessionTimeoutMs, time.milliseconds());
+        this.heartbeat = new Heartbeat(this.sessionTimeoutMs, heartbeatIntervalMs, time.milliseconds());
         this.heartbeatTask = new HeartbeatTask();
         this.sensors = new CoordinatorMetrics(metrics, metricGrpPrefix, metricTags);
         this.requestTimeoutMs = requestTimeoutMs;

http://git-wip-us.apache.org/repos/asf/kafka/blob/006b45c7/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
index 6da8936..79e17e2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
@@ -16,20 +16,21 @@ package org.apache.kafka.clients.consumer.internals;
  * A helper class for managing the heartbeat to the coordinator
  */
 public final class Heartbeat {
-    
-    /* The number of heartbeats to attempt to complete per session timeout interval.
-     * so, e.g., with a session timeout of 3 seconds we would attempt a heartbeat
-     * once per second.
-     */
-    public final static int HEARTBEATS_PER_SESSION_INTERVAL = 3;
-
     private final long timeout;
+    private final long interval;
+
     private long lastHeartbeatSend;
     private long lastHeartbeatReceive;
     private long lastSessionReset;
 
-    public Heartbeat(long timeout, long now) {
+    public Heartbeat(long timeout,
+                     long interval,
+                     long now) {
+        if (interval >= timeout)
+            throw new IllegalArgumentException("Heartbeat must be set lower than the session timeout");
+
         this.timeout = timeout;
+        this.interval = interval;
         this.lastSessionReset = now;
     }
 
@@ -52,11 +53,10 @@ public final class Heartbeat {
     public long timeToNextHeartbeat(long now) {
         long timeSinceLastHeartbeat = now - Math.max(lastHeartbeatSend, lastSessionReset);
 
-        long hbInterval = timeout / HEARTBEATS_PER_SESSION_INTERVAL;
-        if (timeSinceLastHeartbeat > hbInterval)
+        if (timeSinceLastHeartbeat > interval)
             return 0;
         else
-            return hbInterval - timeSinceLastHeartbeat;
+            return interval - timeSinceLastHeartbeat;
     }
 
     public boolean sessionTimeoutExpired(long now) {
@@ -64,7 +64,7 @@ public final class Heartbeat {
     }
 
     public long interval() {
-        return timeout / HEARTBEATS_PER_SESSION_INTERVAL;
+        return interval;
     }
 
     public void resetSessionTimeout(long now) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/006b45c7/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
index ca832be..a23b8e7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
@@ -59,6 +59,7 @@ public class CoordinatorTest {
     private String groupId = "test-group";
     private TopicPartition tp = new TopicPartition(topicName, 0);
     private int sessionTimeoutMs = 10;
+    private int heartbeatIntervalMs = 2;
     private long retryBackoffMs = 100;
     private long requestTimeoutMs = 5000;
     private String rebalanceStrategy = "not-matter";
@@ -89,6 +90,7 @@ public class CoordinatorTest {
         this.coordinator = new Coordinator(consumerClient,
                 groupId,
                 sessionTimeoutMs,
+                heartbeatIntervalMs,
                 rebalanceStrategy,
                 subscriptions,
                 metrics,

http://git-wip-us.apache.org/repos/asf/kafka/blob/006b45c7/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
index b587e14..75e68cc 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
@@ -27,20 +27,21 @@ import static org.junit.Assert.assertTrue;
 public class HeartbeatTest {
 
     private long timeout = 300L;
+    private long interval = 100L;
     private MockTime time = new MockTime();
-    private Heartbeat heartbeat = new Heartbeat(timeout, -1L);
+    private Heartbeat heartbeat = new Heartbeat(timeout, interval, -1L);
 
     @Test
     public void testShouldHeartbeat() {
         heartbeat.sentHeartbeat(time.milliseconds());
-        time.sleep((long) ((float) timeout / Heartbeat.HEARTBEATS_PER_SESSION_INTERVAL * 1.1));
+        time.sleep((long) ((float) interval * 1.1));
         assertTrue(heartbeat.shouldHeartbeat(time.milliseconds()));
     }
 
     @Test
     public void testShouldNotHeartbeat() {
         heartbeat.sentHeartbeat(time.milliseconds());
-        time.sleep(timeout / (2 * Heartbeat.HEARTBEATS_PER_SESSION_INTERVAL));
+        time.sleep(interval / 2);
         assertFalse(heartbeat.shouldHeartbeat(time.milliseconds()));
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/006b45c7/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 93f9468..1d07391 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -43,7 +43,8 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
   this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
   this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
   this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
-  this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "20")
+  this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100")
+  this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30")
   this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
 
   override def generateConfigs() = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/006b45c7/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
index 4ea49f2..f9e22ba 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
@@ -207,6 +207,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
   def testPartitionReassignmentCallback() {
     val callback = new TestConsumerReassignmentCallback()
     this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to avoid slow test
+    this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30");
     val consumer0 = new KafkaConsumer(this.consumerConfig, callback, new ByteArrayDeserializer(), new ByteArrayDeserializer())
     consumer0.subscribe(topic)
         
@@ -238,6 +239,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
   def testUnsubscribeTopic() {
     val callback = new TestConsumerReassignmentCallback()
     this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to avoid slow test
+    this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30");
     val consumer0 = new KafkaConsumer(this.consumerConfig, callback, new ByteArrayDeserializer(), new ByteArrayDeserializer())
 
     try {
@@ -267,6 +269,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
 
   def testPauseStateNotPreservedByRebalance() {
     this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to avoid slow test
+    this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30");
     val consumer0 = new KafkaConsumer(this.consumerConfig, null, new ByteArrayDeserializer(), new ByteArrayDeserializer())
 
     sendRecords(5)