You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by sl...@apache.org on 2018/07/30 13:37:46 UTC

[incubator-openwhisk] branch master updated: Fix max.poll.interval.ms setting of KafkaConsumer. (#3912)

This is an automated email from the ASF dual-hosted git repository.

slange pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 19fda99  Fix max.poll.interval.ms setting of KafkaConsumer. (#3912)
19fda99 is described below

commit 19fda995857353f278c2c8eadc4958fae46b29ed
Author: Markus Thömmes <ma...@me.com>
AuthorDate: Mon Jul 30 15:37:43 2018 +0200

    Fix max.poll.interval.ms setting of KafkaConsumer. (#3912)
    
    * Fix max.poll.interval.ms setting of KafkaConsumer: the `max.poll.interval.ms` setting needs to be high enough that it is not exceeded even when every action in the system runs for the maximum duration of its respective action timeout, ideally with enough additional slack to account for slow image pulls or very slow docker runs.
    * The name of the setting was also misspelled, so we have been running on the default value. This change also logs a warning if the configuration contains unknown keys.
---
 common/scala/src/main/resources/application.conf     |  7 ++++++-
 .../connector/kafka/KafkaConsumerConnector.scala     |  2 ++
 .../connector/kafka/KafkaMessagingProvider.scala     | 20 ++++++++++++++++++++
 .../connector/kafka/KafkaProducerConnector.scala     |  3 +++
 .../test/scala/services/KafkaConnectorTests.scala    |  1 +
 5 files changed, 32 insertions(+), 1 deletion(-)

diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index 86e5213..0c0e4e4 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -72,7 +72,12 @@ whisk {
             heartbeat-interval-ms = 10000
             enable-auto-commit = false
             auto-offset-reset = earliest
-            max-poll-interval = 360000
+
+            // request-timeout-ms always needs to be larger than max-poll-interval-ms per
+            // https://kafka.apache.org/documentation/#upgrade_1010_notable
+            max-poll-interval-ms = 1800000 // 30 minutes
+            request-timeout-ms = 1860000 // 31 minutes
+
             // This value controls the server-side wait time which affects polling latency.
             // A low value improves latency performance but it is important to not set it too low
             // as that will cause excessive busy-waiting.
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
index eeec4a4..7111573 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
@@ -127,6 +127,8 @@ class KafkaConsumerConnector(
       configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon)) ++
       configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaConsumer))
 
+    verifyConfig(config, ConsumerConfig.configNames().asScala.toSet)
+
     val consumer = new KafkaConsumer(config, new ByteArrayDeserializer, new ByteArrayDeserializer)
     consumer.subscribe(Seq(topic).asJavaCollection)
     consumer
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
index 6843cbc..b7373aa 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
@@ -101,4 +101,24 @@ object KafkaConfiguration {
   def configMapToKafkaConfig(configMap: Map[String, String]): Map[String, String] = configMap.map {
     case (key, value) => configToKafkaKey(key) -> value
   }
+
+  /**
+   * Logs a single warning listing all unknown configuration keys and returns false if at least one key is unknown.
+   *
+   * @param config the config to be checked
+   * @param validKeys known valid keys to configure
+   * @return true if all configuration keys are known, false if at least one is unknown
+   */
+  def verifyConfig(config: Map[String, String], validKeys: Set[String])(implicit logging: Logging): Boolean = {
+    val passedKeys = config.keySet
+    val knownKeys = validKeys intersect passedKeys
+    val unknownKeys = passedKeys -- knownKeys
+
+    if (unknownKeys.nonEmpty) {
+      logging.warn(this, s"potential misconfiguration, unknown settings: ${unknownKeys.mkString(",")}")
+      false
+    } else {
+      true
+    }
+  }
 }
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
index f82acaf..aea6b3c 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
@@ -29,6 +29,7 @@ import whisk.core.ConfigKeys
 import whisk.core.connector.{Message, MessageProducer}
 import whisk.core.entity.UUIDs
 
+import scala.collection.JavaConverters._
 import scala.concurrent.duration._
 import scala.concurrent.{blocking, ExecutionContext, Future, Promise}
 import scala.util.{Failure, Success}
@@ -96,6 +97,8 @@ class KafkaProducerConnector(kafkahosts: String, id: String = UUIDs.randomUUID()
       configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon)) ++
       configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaProducer))
 
+    verifyConfig(config, ProducerConfig.configNames().asScala.toSet)
+
     new KafkaProducer(config, new StringSerializer, new StringSerializer)
   }
 
diff --git a/tests/src/test/scala/services/KafkaConnectorTests.scala b/tests/src/test/scala/services/KafkaConnectorTests.scala
index 57e2e56..297b1d4 100644
--- a/tests/src/test/scala/services/KafkaConnectorTests.scala
+++ b/tests/src/test/scala/services/KafkaConnectorTests.scala
@@ -55,6 +55,7 @@ class KafkaConnectorTests
   val groupid = "kafkatest"
   val topic = "KafkaConnectorTestTopic"
   val maxPollInterval = 10.seconds
+  System.setProperty("whisk.kafka.consumer.max-poll-interval-ms", maxPollInterval.toMillis.toString)
 
   // Need to overwrite replication factor for tests that shut down and start
   // Kafka instances intentionally. These tests will fail if there is more than