You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/03/13 10:53:47 UTC
[GitHub] markusthoemmes closed pull request #3423: Measure kafka queue in consumer with better exactitude
markusthoemmes closed pull request #3423: Measure kafka queue in consumer with better exactitude
URL: https://github.com/apache/incubator-openwhisk/pull/3423
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index 9470bd3c6e..402ae5d6cc 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -73,6 +73,7 @@ whisk {
// A low value improves latency performance but it is important to not set it too low
// as that will cause excessive busy-waiting.
fetch-max-wait-ms = 20
+ metric-flush-interval-s = 60
}
topics {
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
index 26a0f8635a..fc0954e316 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
@@ -21,6 +21,7 @@ import java.util.Properties
import akka.actor.ActorSystem
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.{RetriableException, WakeupException}
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import pureconfig.loadConfigOrThrow
@@ -33,7 +34,7 @@ import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.collection.JavaConverters._
-case class KafkaConsumerConfig(sessionTimeoutMs: Long)
+case class KafkaConsumerConfig(sessionTimeoutMs: Long, metricFlushIntervalS: Int)
class KafkaConsumerConnector(
kafkahost: String,
@@ -49,6 +50,10 @@ class KafkaConsumerConnector(
// logic, like the wakeup timer.
private val cfg = loadConfigOrThrow[KafkaConsumerConfig](ConfigKeys.kafkaConsumer)
+ // Currently consumed offset, is used to calculate the topic lag.
+ // It is updated from one thread in "peek", no concurrent data structure is necessary
+ private var offset: Long = 0
+
/**
* Long poll for messages. Method returns once message are available but no later than given
* duration.
@@ -62,7 +67,11 @@ class KafkaConsumerConnector(
val wakeUpTask = actorSystem.scheduler.scheduleOnce(cfg.sessionTimeoutMs.milliseconds + 1.second)(consumer.wakeup())
try {
- consumer.poll(duration.toMillis).map(r => (r.topic, r.partition, r.offset, r.value))
+ val response = consumer.poll(duration.toMillis).map(r => (r.topic, r.partition, r.offset, r.value))
+ response.lastOption.foreach {
+ case (_, _, newOffset, _) => offset = newOffset + 1
+ }
+ response
} catch {
// Happens if the peek hangs.
case _: WakeupException if retry > 0 =>
@@ -138,14 +147,13 @@ class KafkaConsumerConnector(
@volatile private var consumer = getConsumer(getProps, Some(List(topic)))
-// Read current lag of the consumed topic, e.g. invoker queue and
-// emit kamon histogram metric every 5 seconds
-// Since we use only one partition in kafka, it is defined 0 in the metric name
- actorSystem.scheduler.schedule(10.second, 5.second) {
- val queueSize = consumer.metrics.asScala
- .find(_._1.name() == s"$topic-0.records-lag")
- .map(_._2.value().toInt)
- .getOrElse(0)
- MetricEmitter.emitHistogramMetric(LoggingMarkers.KAFKA_QUEUE(topic), queueSize)
+ // Read current lag of the consumed topic, e.g. invoker queue
+ // Since we use only one partition in kafka, it is defined 0
+ actorSystem.scheduler.schedule(10.second, cfg.metricFlushIntervalS.second) {
+ val topicAndPartition = Set(new TopicPartition(topic, 0))
+ consumer.endOffsets(topicAndPartition.asJava).asScala.find(_._1.topic() == topic).map(_._2).foreach { endOffset =>
+ val queueSize = endOffset - offset
+ MetricEmitter.emitHistogramMetric(LoggingMarkers.KAFKA_QUEUE(topic), queueSize)
+ }
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services