You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/02/28 11:56:33 UTC

[GitHub] markusthoemmes closed pull request #3361: Monitor kafka queue for all consumers

markusthoemmes closed pull request #3361: Monitor kafka queue for all consumers
URL: https://github.com/apache/incubator-openwhisk/pull/3361
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request
(one opened from a fork) once it is merged, the diff is reproduced
below for the sake of provenance:

diff --git a/common/scala/src/main/scala/whisk/common/Logging.scala b/common/scala/src/main/scala/whisk/common/Logging.scala
index 37cc1e3cc6..79ad731091 100644
--- a/common/scala/src/main/scala/whisk/common/Logging.scala
+++ b/common/scala/src/main/scala/whisk/common/Logging.scala
@@ -265,6 +265,9 @@ object LoggingMarkers {
     LogMarkerToken(invoker, s"container_start_${containerState}", count)
   def INVOKER_KUBECTL_CMD(cmd: String) = LogMarkerToken(invoker, s"kubectl.$cmd", start)
 
+  // Kafka related markers
+  def KAFKA_QUEUE(topic: String) = LogMarkerToken(kafka, topic, count)
+
   /*
    * General markers
    */
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
index 51a27adf41..26a0f8635a 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
@@ -24,13 +24,14 @@ import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.common.errors.{RetriableException, WakeupException}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import pureconfig.loadConfigOrThrow
-import whisk.common.Logging
+import whisk.common.{Logging, LoggingMarkers, MetricEmitter}
 import whisk.core.ConfigKeys
 import whisk.core.connector.MessageConsumer
 
 import scala.collection.JavaConversions.{iterableAsScalaIterable, seqAsJavaList}
 import scala.concurrent.duration._
 import scala.concurrent.{ExecutionContext, Future}
+import scala.collection.JavaConverters._
 
 case class KafkaConsumerConfig(sessionTimeoutMs: Long)
 
@@ -136,4 +137,15 @@ class KafkaConsumerConnector(
   }
 
   @volatile private var consumer = getConsumer(getProps, Some(List(topic)))
+
+  // Read the current lag of the consumed topic (e.g. an invoker queue) and
+  // emit it as a Kamon histogram metric every 5 seconds, after an initial 10-second delay.
+  // Since we use only one partition in Kafka, partition 0 is hard-coded in the metric name.
+  actorSystem.scheduler.schedule(10.second, 5.second) {
+    val queueSize = consumer.metrics.asScala
+      .find(_._1.name() == s"$topic-0.records-lag")
+      .map(_._2.value().toInt)
+      .getOrElse(0)
+    MetricEmitter.emitHistogramMetric(LoggingMarkers.KAFKA_QUEUE(topic), queueSize)
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services