You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ma...@apache.org on 2018/02/28 11:56:34 UTC

[incubator-openwhisk] branch master updated: Emit metrics for Kafka Consumer lag. (#3361)

This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 99396c7  Emit metrics for Kafka Consumer lag. (#3361)
99396c7 is described below

commit 99396c78006daf96f5b32f020aa262c37ca766b5
Author: Vadim Raskin <ra...@gmail.com>
AuthorDate: Wed Feb 28 12:56:32 2018 +0100

    Emit metrics for Kafka Consumer lag. (#3361)
---
 common/scala/src/main/scala/whisk/common/Logging.scala     |  3 +++
 .../whisk/connector/kafka/KafkaConsumerConnector.scala     | 14 +++++++++++++-
 2 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/common/scala/src/main/scala/whisk/common/Logging.scala b/common/scala/src/main/scala/whisk/common/Logging.scala
index 37cc1e3..79ad731 100644
--- a/common/scala/src/main/scala/whisk/common/Logging.scala
+++ b/common/scala/src/main/scala/whisk/common/Logging.scala
@@ -265,6 +265,9 @@ object LoggingMarkers {
     LogMarkerToken(invoker, s"container_start_${containerState}", count)
   def INVOKER_KUBECTL_CMD(cmd: String) = LogMarkerToken(invoker, s"kubectl.$cmd", start)
 
+  // Kafka related markers
+  def KAFKA_QUEUE(topic: String) = LogMarkerToken(kafka, topic, count)
+
   /*
    * General markers
    */
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
index 51a27ad..26a0f86 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
@@ -24,13 +24,14 @@ import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.common.errors.{RetriableException, WakeupException}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import pureconfig.loadConfigOrThrow
-import whisk.common.Logging
+import whisk.common.{Logging, LoggingMarkers, MetricEmitter}
 import whisk.core.ConfigKeys
 import whisk.core.connector.MessageConsumer
 
 import scala.collection.JavaConversions.{iterableAsScalaIterable, seqAsJavaList}
 import scala.concurrent.duration._
 import scala.concurrent.{ExecutionContext, Future}
+import scala.collection.JavaConverters._
 
 case class KafkaConsumerConfig(sessionTimeoutMs: Long)
 
@@ -136,4 +137,15 @@ class KafkaConsumerConnector(
   }
 
   @volatile private var consumer = getConsumer(getProps, Some(List(topic)))
+
+//  Read the current lag of the consumed topic (e.g. the invoker queue) and emit it as a
+//  Kamon histogram metric every 5 seconds, after an initial delay of 10 seconds.
+//  Since only one Kafka partition is used, partition 0 is hard-coded in the metric name.
+  actorSystem.scheduler.schedule(10.second, 5.second) {
+    val queueSize = consumer.metrics.asScala
+      .find(_._1.name() == s"$topic-0.records-lag")
+      .map(_._2.value().toInt)
+      .getOrElse(0)
+    MetricEmitter.emitHistogramMetric(LoggingMarkers.KAFKA_QUEUE(topic), queueSize)
+  }
 }

-- 
To stop receiving notification emails like this one, please contact
markusthoemmes@apache.org.