You are viewing a plain-text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text version.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/03/13 10:53:47 UTC

[GitHub] markusthoemmes closed pull request #3423: Measure kafka queue in consumer with better exactitude

markusthoemmes closed pull request #3423: Measure kafka queue in consumer with better exactitude
URL: https://github.com/apache/incubator-openwhisk/pull/3423
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub does not otherwise show it once the pull request is merged:

diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index 9470bd3c6e..402ae5d6cc 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -73,6 +73,7 @@ whisk {
             // A low value improves latency performance but it is important to not set it too low
             // as that will cause excessive busy-waiting.
             fetch-max-wait-ms = 20
+            metric-flush-interval-s = 60
         }
 
         topics {
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
index 26a0f8635a..fc0954e316 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
@@ -21,6 +21,7 @@ import java.util.Properties
 
 import akka.actor.ActorSystem
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.{RetriableException, WakeupException}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import pureconfig.loadConfigOrThrow
@@ -33,7 +34,7 @@ import scala.concurrent.duration._
 import scala.concurrent.{ExecutionContext, Future}
 import scala.collection.JavaConverters._
 
-case class KafkaConsumerConfig(sessionTimeoutMs: Long)
+case class KafkaConsumerConfig(sessionTimeoutMs: Long, metricFlushIntervalS: Int)
 
 class KafkaConsumerConnector(
   kafkahost: String,
@@ -49,6 +50,10 @@ class KafkaConsumerConnector(
   // logic, like the wakeup timer.
   private val cfg = loadConfigOrThrow[KafkaConsumerConfig](ConfigKeys.kafkaConsumer)
 
+  // Currently consumed offset, is used to calculate the topic lag.
+  // It is updated from one thread in "peek", no concurrent data structure is necessary
+  private var offset: Long = 0
+
   /**
    * Long poll for messages. Method returns once message are available but no later than given
    * duration.
@@ -62,7 +67,11 @@ class KafkaConsumerConnector(
     val wakeUpTask = actorSystem.scheduler.scheduleOnce(cfg.sessionTimeoutMs.milliseconds + 1.second)(consumer.wakeup())
 
     try {
-      consumer.poll(duration.toMillis).map(r => (r.topic, r.partition, r.offset, r.value))
+      val response = consumer.poll(duration.toMillis).map(r => (r.topic, r.partition, r.offset, r.value))
+      response.lastOption.foreach {
+        case (_, _, newOffset, _) => offset = newOffset + 1
+      }
+      response
     } catch {
       // Happens if the peek hangs.
       case _: WakeupException if retry > 0 =>
@@ -138,14 +147,13 @@ class KafkaConsumerConnector(
 
   @volatile private var consumer = getConsumer(getProps, Some(List(topic)))
 
-//  Read current lag of the consumed topic, e.g. invoker queue and
-//  emit kamon histogram metric every 5 seconds
-//  Since we use only one partition in kafka, it is defined 0 in the metric name
-  actorSystem.scheduler.schedule(10.second, 5.second) {
-    val queueSize = consumer.metrics.asScala
-      .find(_._1.name() == s"$topic-0.records-lag")
-      .map(_._2.value().toInt)
-      .getOrElse(0)
-    MetricEmitter.emitHistogramMetric(LoggingMarkers.KAFKA_QUEUE(topic), queueSize)
+  //  Read current lag of the consumed topic, e.g. invoker queue
+  //  Since we use only one partition in kafka, it is defined 0
+  actorSystem.scheduler.schedule(10.second, cfg.metricFlushIntervalS.second) {
+    val topicAndPartition = Set(new TopicPartition(topic, 0))
+    consumer.endOffsets(topicAndPartition.asJava).asScala.find(_._1.topic() == topic).map(_._2).foreach { endOffset =>
+      val queueSize = endOffset - offset
+      MetricEmitter.emitHistogramMetric(LoggingMarkers.KAFKA_QUEUE(topic), queueSize)
+    }
   }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log in to GitHub and use
the URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services