You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ma...@apache.org on 2018/02/28 11:56:34 UTC
[incubator-openwhisk] branch master updated: Emit metrics for Kafka
Consumer lag. (#3361)
This is an automated email from the ASF dual-hosted git repository.
markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 99396c7 Emit metrics for Kafka Consumer lag. (#3361)
99396c7 is described below
commit 99396c78006daf96f5b32f020aa262c37ca766b5
Author: Vadim Raskin <ra...@gmail.com>
AuthorDate: Wed Feb 28 12:56:32 2018 +0100
Emit metrics for Kafka Consumer lag. (#3361)
---
common/scala/src/main/scala/whisk/common/Logging.scala | 3 +++
.../whisk/connector/kafka/KafkaConsumerConnector.scala | 14 +++++++++++++-
2 files changed, 16 insertions(+), 1 deletion(-)
diff --git a/common/scala/src/main/scala/whisk/common/Logging.scala b/common/scala/src/main/scala/whisk/common/Logging.scala
index 37cc1e3..79ad731 100644
--- a/common/scala/src/main/scala/whisk/common/Logging.scala
+++ b/common/scala/src/main/scala/whisk/common/Logging.scala
@@ -265,6 +265,9 @@ object LoggingMarkers {
LogMarkerToken(invoker, s"container_start_${containerState}", count)
def INVOKER_KUBECTL_CMD(cmd: String) = LogMarkerToken(invoker, s"kubectl.$cmd", start)
+ // Kafka related markers
+ def KAFKA_QUEUE(topic: String) = LogMarkerToken(kafka, topic, count)
+
/*
* General markers
*/
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
index 51a27ad..26a0f86 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
@@ -24,13 +24,14 @@ import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.errors.{RetriableException, WakeupException}
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import pureconfig.loadConfigOrThrow
-import whisk.common.Logging
+import whisk.common.{Logging, LoggingMarkers, MetricEmitter}
import whisk.core.ConfigKeys
import whisk.core.connector.MessageConsumer
import scala.collection.JavaConversions.{iterableAsScalaIterable, seqAsJavaList}
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
+import scala.collection.JavaConverters._
case class KafkaConsumerConfig(sessionTimeoutMs: Long)
@@ -136,4 +137,15 @@ class KafkaConsumerConnector(
}
@volatile private var consumer = getConsumer(getProps, Some(List(topic)))
+
+// Read current lag of the consumed topic, e.g. invoker queue and
+// emit kamon histogram metric every 5 seconds
+// Since we use only one partition in kafka, it is defined 0 in the metric name
+ actorSystem.scheduler.schedule(10.second, 5.second) {
+ val queueSize = consumer.metrics.asScala
+ .find(_._1.name() == s"$topic-0.records-lag")
+ .map(_._2.value().toInt)
+ .getOrElse(0)
+ MetricEmitter.emitHistogramMetric(LoggingMarkers.KAFKA_QUEUE(topic), queueSize)
+ }
}
--
To stop receiving notification emails like this one, please contact
markusthoemmes@apache.org.