You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/08/02 15:24:01 UTC

[GitHub] dubee closed pull request #3934: Adjust Kafka's config to not contain any more warnings.

dubee closed pull request #3934: Adjust Kafka's config to not contain any more warnings.
URL: https://github.com/apache/incubator-openwhisk/pull/3934
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index b12c62ef1b..d9e557dafd 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -58,6 +58,10 @@ whisk {
     kafka {
         replication-factor = 1
 
+        // Used to control the cadence of the consumer lag check interval
+        consumer-lag-check-interval = 60 seconds
+
+        // The following settings are passed "raw" to the respective Kafka client. Dashes are replaced by dots.
         common {
             security-protocol = PLAINTEXT
             ssl-endpoint-identification-algorithm = "" // restores pre-kafka 2.0.0 default
@@ -81,7 +85,6 @@ whisk {
             // A low value improves latency performance but it is important to not set it too low
             // as that will cause excessive busy-waiting.
             fetch-max-wait-ms = 20
-            metric-flush-interval-s = 60
         }
 
         topics {
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
index bb05720fc0..bc88907528 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
@@ -34,7 +34,7 @@ import scala.concurrent.duration._
 import scala.concurrent.{blocking, ExecutionContext, Future}
 import scala.util.Failure
 
-case class KafkaConsumerConfig(sessionTimeoutMs: Long, metricFlushIntervalS: Int)
+case class KafkaConsumerConfig(sessionTimeoutMs: Long)
 
 class KafkaConsumerConnector(
   kafkahost: String,
@@ -174,7 +174,10 @@ class KafkaConsumerConnector(
 
   // Read current lag of the consumed topic, e.g. invoker queue
   // Since we use only one partition in kafka, it is defined 0
-  Scheduler.scheduleWaitAtMost(cfg.metricFlushIntervalS.seconds, 10.seconds, "kafka-lag-monitor") { () =>
+  Scheduler.scheduleWaitAtMost(
+    interval = loadConfigOrThrow[KafkaConfig](ConfigKeys.kafka).consumerLagCheckInterval,
+    initialDelay = 10.seconds,
+    name = "kafka-lag-monitor") { () =>
     Future {
       blocking {
         if (offset > 0) {
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
index b7373aa27c..ac652a3886 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
@@ -31,7 +31,7 @@ import scala.collection.JavaConverters._
 import scala.concurrent.duration._
 import scala.util.{Failure, Success, Try}
 
-case class KafkaConfig(replicationFactor: Short)
+case class KafkaConfig(replicationFactor: Short, consumerLagCheckInterval: FiniteDuration)
 
 /**
  * A Kafka based implementation of MessagingProvider


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services