You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ma...@apache.org on 2018/03/13 10:53:48 UTC
[incubator-openwhisk] branch master updated: Measure kafka queue in
consumer with better exactitude. (#3423)
This is an automated email from the ASF dual-hosted git repository.
markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 24d10fb Measure kafka queue in consumer with better exactitude. (#3423)
24d10fb is described below
commit 24d10fb6825dbd6bdd8b7896b924143751c8b2e0
Author: Vadim Raskin <ra...@gmail.com>
AuthorDate: Tue Mar 13 11:53:45 2018 +0100
Measure kafka queue in consumer with better exactitude. (#3423)
Measure the kafka queue metric directly in the consumer by comparing the offsets, instead of relying on the built-in consumer metrics. This method provides more precision than the old approach; it was observed that the built-in metrics show a certain lag in the case of bursts.
Additionally, make the Kamon flush cadence configurable in application.conf.
---
common/scala/src/main/resources/application.conf | 1 +
.../connector/kafka/KafkaConsumerConnector.scala | 30 ++++++++++++++--------
2 files changed, 20 insertions(+), 11 deletions(-)
diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index 9470bd3..402ae5d 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -73,6 +73,7 @@ whisk {
// A low value improves latency performance but it is important to not set it too low
// as that will cause excessive busy-waiting.
fetch-max-wait-ms = 20
+ metric-flush-interval-s = 60
}
topics {
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
index 26a0f86..fc0954e 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
@@ -21,6 +21,7 @@ import java.util.Properties
import akka.actor.ActorSystem
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.{RetriableException, WakeupException}
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import pureconfig.loadConfigOrThrow
@@ -33,7 +34,7 @@ import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.collection.JavaConverters._
-case class KafkaConsumerConfig(sessionTimeoutMs: Long)
+case class KafkaConsumerConfig(sessionTimeoutMs: Long, metricFlushIntervalS: Int)
class KafkaConsumerConnector(
kafkahost: String,
@@ -49,6 +50,10 @@ class KafkaConsumerConnector(
// logic, like the wakeup timer.
private val cfg = loadConfigOrThrow[KafkaConsumerConfig](ConfigKeys.kafkaConsumer)
+ // Currently consumed offset, is used to calculate the topic lag.
+ // It is updated from one thread in "peek", no concurrent data structure is necessary
+ private var offset: Long = 0
+
/**
* Long poll for messages. Method returns once message are available but no later than given
* duration.
@@ -62,7 +67,11 @@ class KafkaConsumerConnector(
val wakeUpTask = actorSystem.scheduler.scheduleOnce(cfg.sessionTimeoutMs.milliseconds + 1.second)(consumer.wakeup())
try {
- consumer.poll(duration.toMillis).map(r => (r.topic, r.partition, r.offset, r.value))
+ val response = consumer.poll(duration.toMillis).map(r => (r.topic, r.partition, r.offset, r.value))
+ response.lastOption.foreach {
+ case (_, _, newOffset, _) => offset = newOffset + 1
+ }
+ response
} catch {
// Happens if the peek hangs.
case _: WakeupException if retry > 0 =>
@@ -138,14 +147,13 @@ class KafkaConsumerConnector(
@volatile private var consumer = getConsumer(getProps, Some(List(topic)))
-// Read current lag of the consumed topic, e.g. invoker queue and
-// emit kamon histogram metric every 5 seconds
-// Since we use only one partition in kafka, it is defined 0 in the metric name
- actorSystem.scheduler.schedule(10.second, 5.second) {
- val queueSize = consumer.metrics.asScala
- .find(_._1.name() == s"$topic-0.records-lag")
- .map(_._2.value().toInt)
- .getOrElse(0)
- MetricEmitter.emitHistogramMetric(LoggingMarkers.KAFKA_QUEUE(topic), queueSize)
+ // Read current lag of the consumed topic, e.g. invoker queue
+ // Since we use only one partition in kafka, it is defined 0
+ actorSystem.scheduler.schedule(10.second, cfg.metricFlushIntervalS.second) {
+ val topicAndPartition = Set(new TopicPartition(topic, 0))
+ consumer.endOffsets(topicAndPartition.asJava).asScala.find(_._1.topic() == topic).map(_._2).foreach { endOffset =>
+ val queueSize = endOffset - offset
+ MetricEmitter.emitHistogramMetric(LoggingMarkers.KAFKA_QUEUE(topic), queueSize)
+ }
}
}
--
To stop receiving notification emails like this one, please contact
markusthoemmes@apache.org.