Posted to commits@openwhisk.apache.org by ra...@apache.org on 2017/07/01 13:20:35 UTC

[incubator-openwhisk] branch master updated: Adjust kafka's maximum poll period. (#2450)

This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new b1c34ed  Adjust kafka's maximum poll period. (#2450)
b1c34ed is described below

commit b1c34ed587efebc79539850e7065cf3e760b3c7d
Author: Markus Thömmes <ma...@me.com>
AuthorDate: Sat Jul 1 15:20:32 2017 +0200

    Adjust kafka's maximum poll period. (#2450)
    
    Kafka allows setting the maximum period between two polls. Because actions may run for up to 5 minutes, a fully loaded invoker can take 5 minutes before its consumer polls again. To prevent rebalances, this value is adjusted accordingly.
    
    Bumping Kafka's version is required because the setting is exposed only in 0.10.1+.
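
A minimal sketch of what the new setting means at the Kafka client level, assuming kafka-clients 0.10.1+ on the classpath (the constant does not exist in 0.10.0.x); the broker address and group id below are illustrative, not taken from this commit:

    import java.util.Properties

    import scala.concurrent.duration.DurationInt

    import org.apache.kafka.clients.consumer.ConsumerConfig

    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // illustrative broker
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "invokers")
    // Allow up to 5 minutes between two poll() calls before the broker
    // considers the consumer dead and rebalances the group.
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 5.minutes.toMillis.toString)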
---
 ansible/group_vars/all                                    |  2 +-
 common/scala/build.gradle                                 |  2 +-
 .../whisk/connector/kafka/KafkaConsumerConnector.scala    | 11 ++++++-----
 .../src/main/scala/whisk/core/invoker/Invoker.scala       |  4 ++--
 tests/src/test/scala/services/KafkaConnectorTests.scala   | 15 +++++++++------
 5 files changed, 19 insertions(+), 15 deletions(-)

diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 621d48a..5fa8eed 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -121,7 +121,7 @@ registry:
   confdir: "{{ config_root_dir }}/registry"
 
 kafka:
-  version: 0.10.0.1
+  version: 0.10.2.1
   port: 9092
   ras:
     port: 9093
diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index 3cf8fd5..6c89ddd 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -30,7 +30,7 @@ dependencies {
     compile 'commons-logging:commons-logging:1.2'
     compile 'commons-collections:commons-collections:3.2.2'
     compile 'org.apache.zookeeper:zookeeper:3.4.6'
-    compile 'org.apache.kafka:kafka-clients:0.10.0.0'
+    compile 'org.apache.kafka:kafka-clients:0.10.2.1'
     compile 'org.apache.httpcomponents:httpclient:4.4.1'
     compile 'com.github.ben-manes.caffeine:caffeine:2.4.0'
     compile 'com.google.code.findbugs:jsr305:3.0.2'
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
index ea1ff11..be73345 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
@@ -24,7 +24,6 @@ import scala.collection.JavaConversions.seqAsJavaList
 import scala.concurrent.duration.Duration
 import scala.concurrent.duration.DurationInt
 import scala.concurrent.duration.FiniteDuration
-import scala.language.postfixOps
 import scala.util.Try
 
 import org.apache.kafka.clients.consumer.CommitFailedException
@@ -41,8 +40,9 @@ class KafkaConsumerConnector(
     topic: String,
     override val maxPeek: Int = Int.MaxValue,
     readeos: Boolean = true,
-    sessionTimeout: FiniteDuration = 30 seconds,
-    autoCommitInterval: FiniteDuration = 10 seconds)(
+    sessionTimeout: FiniteDuration = 30.seconds,
+    autoCommitInterval: FiniteDuration = 10.seconds,
+    maxPollInterval: FiniteDuration = 5.minutes)(
         implicit logging: Logging)
     extends MessageConsumer {
 
@@ -52,7 +52,7 @@ class KafkaConsumerConnector(
      *
      * @param duration the maximum duration for the long poll
      */
-    override def peek(duration: Duration = 500 milliseconds) = {
+    override def peek(duration: Duration = 500.milliseconds) = {
         val records = consumer.poll(duration.toMillis)
         records map { r => (r.topic, r.partition, r.offset, r.value) }
     }
@@ -102,6 +102,7 @@ class KafkaConsumerConnector(
         props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval.toMillis.toString)
         props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPeek.toString)
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if (!readeos) "latest" else "earliest")
+        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval.toMillis.toString)
 
         // This value controls the server-side wait time which affects polling latency.
         // A low value improves latency performance but it is important to not set it too low
@@ -116,7 +117,7 @@ class KafkaConsumerConnector(
         val keyDeserializer = new ByteArrayDeserializer
         val valueDeserializer = new ByteArrayDeserializer
         val consumer = new KafkaConsumer(props, keyDeserializer, valueDeserializer)
-        topics map { consumer.subscribe(_) }
+        topics foreach { consumer.subscribe(_) }
         consumer
     }
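
A hedged usage sketch of the widened constructor; the host, group, and topic values are placeholders, and the implicit Logging (assumed here to live in whisk.common, which this diff does not show) is supplied by the enclosing component in practice:

    import scala.concurrent.duration.DurationInt

    import whisk.common.Logging // assumed location; not shown in this diff
    import whisk.connector.kafka.KafkaConsumerConnector

    implicit val logging: Logging = ??? // provided by the surrounding component in practice
    // maxPollInterval is forwarded to the consumer's max.poll.interval.ms.
    val consumer = new KafkaConsumerConnector(
        "localhost:9092", // placeholder Kafka host
        "invokers",       // consumer group id
        "invoker0",       // topic to subscribe to
        maxPollInterval = 6.minutes)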
 
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index 1ecfb20..58592e7 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -52,6 +52,7 @@ import whisk.core.connector.PingMessage
 import scala.util.Try
 import whisk.core.connector.MessageProducer
 import org.apache.kafka.common.errors.RecordTooLargeException
+import whisk.core.entity.TimeLimit
 
 /**
  * A kafka message handler that invokes actions as directed by message on topic "/actions/invoke".
@@ -484,9 +485,8 @@ object Invoker {
         }
 
         val topic = s"invoker${invokerInstance.toInt}"
-        val groupid = "invokers"
         val maxdepth = ContainerPool.getDefaultMaxActive(config)
-        val consumer = new KafkaConsumerConnector(config.kafkaHost, groupid, topic, maxdepth)
+        val consumer = new KafkaConsumerConnector(config.kafkaHost, "invokers", topic, maxdepth, maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)
         val producer = new KafkaProducerConnector(config.kafkaHost, ec)
         val dispatcher = new Dispatcher(consumer, 500 milliseconds, 2 * maxdepth, actorSystem)
 
diff --git a/tests/src/test/scala/services/KafkaConnectorTests.scala b/tests/src/test/scala/services/KafkaConnectorTests.scala
index 2a7fc4d..6e6ae7d 100644
--- a/tests/src/test/scala/services/KafkaConnectorTests.scala
+++ b/tests/src/test/scala/services/KafkaConnectorTests.scala
@@ -59,8 +59,9 @@ class KafkaConnectorTests
     val groupid = "kafkatest"
     val topic = "Dinosaurs"
     val sessionTimeout = 10 seconds
+    val maxPollInterval = 10 seconds
     val producer = new KafkaProducerConnector(config.kafkaHost, ec)
-    val consumer = new TestKafkaConsumerConnector(config.kafkaHost, groupid, topic, sessionTimeout = sessionTimeout)
+    val consumer = new TestKafkaConsumerConnector(config.kafkaHost, groupid, topic, sessionTimeout = sessionTimeout, maxPollInterval = maxPollInterval)
 
     override def afterAll() {
         producer.close()
@@ -108,7 +109,7 @@ class KafkaConnectorTests
             received.last should be(message.serialize)
 
             if (i < 2) {
-                Thread.sleep((sessionTimeout + 1.second).toMillis)
+                Thread.sleep((maxPollInterval + 1.second).toMillis)
                 a[CommitFailedException] should be thrownBy {
                     consumer.commit() // sleep should cause commit to fail
                 }
@@ -130,9 +131,9 @@ class KafkaConnectorTests
 
         // Send message while commit throws exception -> Message will not be processed
         consumer.commitFails = true
-        retry(stream.toString should include("failed to commit to kafka: commit failed"), 50, Some(100 millisecond))
+        retry(stream.toString should include("failed to commit to kafka:"), 50, Some(100 millisecond))
         Await.result(producer.send(topic, message), 10 seconds)
-        retry(stream.toString should include("failed to commit to kafka: commit failed"), 50, Some(100 millisecond))
+        retry(stream.toString should include("failed to commit to kafka:"), 50, Some(100 millisecond))
 
         // Send message again -> No commit exception -> Should work again
         consumer.commitFails = false
@@ -150,11 +151,13 @@ class TestKafkaConsumerConnector(
     kafkahost: String,
     groupid: String,
     topic: String,
-    sessionTimeout: FiniteDuration)(implicit logging: Logging) extends KafkaConsumerConnector(kafkahost, groupid, topic, sessionTimeout = sessionTimeout) {
+    sessionTimeout: FiniteDuration,
+    maxPollInterval: FiniteDuration)(implicit logging: Logging) extends KafkaConsumerConnector(
+    kafkahost, groupid, topic, sessionTimeout = sessionTimeout, maxPollInterval = maxPollInterval) {
 
     override def commit() = {
         if (commitFails) {
-            throw new CommitFailedException("commit failed")
+            throw new CommitFailedException()
         } else {
             super.commit()
         }
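
The test now constructs CommitFailedException without a message and matches only the stable log prefix, consistent with newer kafka-clients exposing only a no-argument constructor with a canned message. A small sketch of handling it, with a hypothetical consumer in scope:

    import org.apache.kafka.clients.consumer.CommitFailedException

    try consumer.commit()
    catch {
        case e: CommitFailedException =>
            // The exception's message text is fixed by the client library, so
            // log assertions should match only the stable prefix.
            println(s"failed to commit to kafka: ${e.getMessage}")
    }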

-- 
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].