You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by cb...@apache.org on 2018/03/14 12:05:57 UTC

[incubator-openwhisk] branch master updated: Fix negative values and blocking usage in Kafka lag monitoring. (#3434)

This is an automated email from the ASF dual-hosted git repository.

cbickel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c88922  Fix negative values and blocking usage in Kafka lag monitoring. (#3434)
9c88922 is described below

commit 9c889222f9706f7a7b40e3ccbca78aa03f2d6d2c
Author: Markus Thömmes <ma...@me.com>
AuthorDate: Wed Mar 14 13:05:54 2018 +0100

    Fix negative values and blocking usage in Kafka lag monitoring. (#3434)
    
    1. `endOffsets` is only eventually consistent with the locally stored offset value, so the computed lag can temporarily be negative. Negative values need to be normalized to 0.
    2. `endOffsets` can "block indefinitely" per documentation, so we need to make sure to protect the execution context against thread starvation.
---
 .../connector/kafka/KafkaConsumerConnector.scala   | 25 +++++++++++++---------
 1 file changed, 15 insertions(+), 10 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
index fc0954e..20d5635 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
@@ -25,14 +25,14 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.{RetriableException, WakeupException}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import pureconfig.loadConfigOrThrow
-import whisk.common.{Logging, LoggingMarkers, MetricEmitter}
+import whisk.common.{Logging, LoggingMarkers, MetricEmitter, Scheduler}
 import whisk.core.ConfigKeys
 import whisk.core.connector.MessageConsumer
 
 import scala.collection.JavaConversions.{iterableAsScalaIterable, seqAsJavaList}
-import scala.concurrent.duration._
-import scala.concurrent.{ExecutionContext, Future}
 import scala.collection.JavaConverters._
+import scala.concurrent.duration._
+import scala.concurrent.{blocking, ExecutionContext, Future}
 
 case class KafkaConsumerConfig(sessionTimeoutMs: Long, metricFlushIntervalS: Int)
 
@@ -147,13 +147,18 @@ class KafkaConsumerConnector(
 
   @volatile private var consumer = getConsumer(getProps, Some(List(topic)))
 
-  //  Read current lag of the consumed topic, e.g. invoker queue
-  //  Since we use only one partition in kafka, it is defined 0
-  actorSystem.scheduler.schedule(10.second, cfg.metricFlushIntervalS.second) {
-    val topicAndPartition = Set(new TopicPartition(topic, 0))
-    consumer.endOffsets(topicAndPartition.asJava).asScala.find(_._1.topic() == topic).map(_._2).foreach { endOffset =>
-      val queueSize = endOffset - offset
-      MetricEmitter.emitHistogramMetric(LoggingMarkers.KAFKA_QUEUE(topic), queueSize)
+  // Read current lag of the consumed topic, e.g. invoker queue
+  // Since we use only one partition in kafka, it is defined 0
+  Scheduler.scheduleWaitAtMost(cfg.metricFlushIntervalS.seconds, 10.seconds, "kafka-lag-monitor") { () =>
+    Future {
+      blocking {
+        val topicAndPartition = new TopicPartition(topic, 0)
+        consumer.endOffsets(Set(topicAndPartition).asJava).asScala.get(topicAndPartition).foreach { endOffset =>
+          // endOffset could lag behind the offset reported by the consumer internally resulting in negative numbers
+          val queueSize = (endOffset - offset).max(0)
+          MetricEmitter.emitHistogramMetric(LoggingMarkers.KAFKA_QUEUE(topic), queueSize)
+        }
+      }
     }
   }
 }

-- 
To stop receiving notification emails like this one, please contact
cbickel@apache.org.