You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by cb...@apache.org on 2018/03/14 12:05:57 UTC
[incubator-openwhisk] branch master updated: Fix negative values
and blocking usage in Kafka lag monitoring. (#3434)
This is an automated email from the ASF dual-hosted git repository.
cbickel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 9c88922 Fix negative values and blocking usage in Kafka lag monitoring. (#3434)
9c88922 is described below
commit 9c889222f9706f7a7b40e3ccbca78aa03f2d6d2c
Author: Markus Thömmes <ma...@me.com>
AuthorDate: Wed Mar 14 13:05:54 2018 +0100
Fix negative values and blocking usage in Kafka lag monitoring. (#3434)
1. `endOffsets` might be eventually consistent to the locally stored offset value. Negative values need to be normalized to 0.
2. `endOffsets` can "block indefinitly" per documentation, so we need to make sure to protect the execution context against thread starvation.
---
.../connector/kafka/KafkaConsumerConnector.scala | 25 +++++++++++++---------
1 file changed, 15 insertions(+), 10 deletions(-)
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
index fc0954e..20d5635 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
@@ -25,14 +25,14 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.{RetriableException, WakeupException}
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import pureconfig.loadConfigOrThrow
-import whisk.common.{Logging, LoggingMarkers, MetricEmitter}
+import whisk.common.{Logging, LoggingMarkers, MetricEmitter, Scheduler}
import whisk.core.ConfigKeys
import whisk.core.connector.MessageConsumer
import scala.collection.JavaConversions.{iterableAsScalaIterable, seqAsJavaList}
-import scala.concurrent.duration._
-import scala.concurrent.{ExecutionContext, Future}
import scala.collection.JavaConverters._
+import scala.concurrent.duration._
+import scala.concurrent.{blocking, ExecutionContext, Future}
case class KafkaConsumerConfig(sessionTimeoutMs: Long, metricFlushIntervalS: Int)
@@ -147,13 +147,18 @@ class KafkaConsumerConnector(
@volatile private var consumer = getConsumer(getProps, Some(List(topic)))
- // Read current lag of the consumed topic, e.g. invoker queue
- // Since we use only one partition in kafka, it is defined 0
- actorSystem.scheduler.schedule(10.second, cfg.metricFlushIntervalS.second) {
- val topicAndPartition = Set(new TopicPartition(topic, 0))
- consumer.endOffsets(topicAndPartition.asJava).asScala.find(_._1.topic() == topic).map(_._2).foreach { endOffset =>
- val queueSize = endOffset - offset
- MetricEmitter.emitHistogramMetric(LoggingMarkers.KAFKA_QUEUE(topic), queueSize)
+ // Read current lag of the consumed topic, e.g. invoker queue
+ // Since we use only one partition in kafka, it is defined 0
+ Scheduler.scheduleWaitAtMost(cfg.metricFlushIntervalS.seconds, 10.seconds, "kafka-lag-monitor") { () =>
+ Future {
+ blocking {
+ val topicAndPartition = new TopicPartition(topic, 0)
+ consumer.endOffsets(Set(topicAndPartition).asJava).asScala.get(topicAndPartition).foreach { endOffset =>
+ // endOffset could lag behind the offset reported by the consumer internally resulting in negative numbers
+ val queueSize = (endOffset - offset).max(0)
+ MetricEmitter.emitHistogramMetric(LoggingMarkers.KAFKA_QUEUE(topic), queueSize)
+ }
+ }
}
}
}
--
To stop receiving notification emails like this one, please contact
cbickel@apache.org.