Posted to commits@druid.apache.org by gi...@apache.org on 2018/11/28 10:11:20 UTC

[incubator-druid] branch master updated: emit maxLag/avgLag in KafkaSupervisor (#6587)

This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new c5405bb  emit maxLag/avgLag in KafkaSupervisor (#6587)
c5405bb is described below

commit c5405bb5929b117b31ad0f4c4ba8c6faf38beb63
Author: Mingming Qiu <cs...@gmail.com>
AuthorDate: Wed Nov 28 18:11:14 2018 +0800

    emit maxLag/avgLag in KafkaSupervisor (#6587)
    
    * emit maxLag/totalLag/avgLag in KafkaSupervisor
    
    * modify ingest/kafka/totalLag to ingest/kafka/lag for backwards compatibility
---
 docs/content/operations/metrics.md                 |  2 ++
 .../src/main/resources/defaultMetrics.json         |  6 ++++++
 .../indexing/kafka/supervisor/KafkaSupervisor.java | 22 ++++++++++++++++------
 3 files changed, 24 insertions(+), 6 deletions(-)

diff --git a/docs/content/operations/metrics.md b/docs/content/operations/metrics.md
index e327023..d99d3e5 100644
--- a/docs/content/operations/metrics.md
+++ b/docs/content/operations/metrics.md
@@ -149,6 +149,8 @@ emission period.|
 |`ingest/sink/count`|Number of sinks not handoffed.|dataSource, taskId, taskType.|1~3|
 |`ingest/events/messageGap`|Time gap between the data time in event and current system time.|dataSource, taskId, taskType.|Greater than 0, depends on the time carried in event |
 |`ingest/kafka/lag`|Applicable for Kafka Indexing Service. Total lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|dataSource.|Greater than 0, should not be a very high number |
+|`ingest/kafka/maxLag`|Applicable for Kafka Indexing Service. Max lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|dataSource.|Greater than 0, should not be a very high number |
+|`ingest/kafka/avgLag`|Applicable for Kafka Indexing Service. Average lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|dataSource.|Greater than 0, should not be a very high number |
 
 
 Note: If the JVM does not support CPU time measurement for the current thread, ingest/merge/cpu and ingest/persists/cpu will be 0. 
diff --git a/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json b/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json
index 553e3a2..e28639e 100644
--- a/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json
+++ b/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json
@@ -99,6 +99,12 @@
   "ingest/kafka/lag": [
     "dataSource"
   ],
+  "ingest/kafka/maxLag": [
+    "dataSource"
+  ],
+  "ingest/kafka/avgLag": [
+    "dataSource"
+  ],
   "task/run/time": [
     "dataSource"
   ],
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 73fd54c..6992b8d 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -2349,14 +2349,24 @@ public class KafkaSupervisor implements Supervisor
           );
         }
 
-        long lag = getLagPerPartition(highestCurrentOffsets)
-            .values()
-            .stream()
-            .mapToLong(x -> Math.max(x, 0))
-            .sum();
+        Map<Integer, Long> partitionLags = getLagPerPartition(highestCurrentOffsets);
+        long maxLag = 0, totalLag = 0, avgLag;
+        for (long lag : partitionLags.values()) {
+          if (lag > maxLag) {
+            maxLag = lag;
+          }
+          totalLag += lag;
+        }
+        avgLag = partitionLags.size() == 0 ? 0 : totalLag / partitionLags.size();
 
         emitter.emit(
-            ServiceMetricEvent.builder().setDimension("dataSource", dataSource).build("ingest/kafka/lag", lag)
+            ServiceMetricEvent.builder().setDimension("dataSource", dataSource).build("ingest/kafka/lag", totalLag)
+        );
+        emitter.emit(
+            ServiceMetricEvent.builder().setDimension("dataSource", dataSource).build("ingest/kafka/maxLag", maxLag)
+        );
+        emitter.emit(
+            ServiceMetricEvent.builder().setDimension("dataSource", dataSource).build("ingest/kafka/avgLag", avgLag)
         );
       }
       catch (Exception e) {
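
For readers who want to try the aggregation outside of Druid, the loop added in the KafkaSupervisor hunk above can be sketched as a standalone class. This is only an illustration under stated assumptions: the class name LagSummary and its fields are hypothetical and are not part of the commit, and the per-partition lag map is supplied by hand rather than by getLagPerPartition. Note that the removed stream clamped each value with Math.max(x, 0), while the new loop adds lag values as they are; the sketch mirrors the new loop.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the commit: mirrors the loop added to KafkaSupervisor.
public class LagSummary {
  final long totalLag;
  final long maxLag;
  final long avgLag;

  LagSummary(long totalLag, long maxLag, long avgLag) {
    this.totalLag = totalLag;
    this.maxLag = maxLag;
    this.avgLag = avgLag;
  }

  // Aggregates per-partition lag (partition id -> lag) into total, max and average.
  static LagSummary of(Map<Integer, Long> partitionLags) {
    long maxLag = 0;
    long totalLag = 0;
    for (long lag : partitionLags.values()) {
      if (lag > maxLag) {
        maxLag = lag;
      }
      // As in the patched code, values are summed without clamping negatives to 0.
      totalLag += lag;
    }
    // Integer division, so the average is truncated; an empty map yields 0.
    long avgLag = partitionLags.isEmpty() ? 0 : totalLag / partitionLags.size();
    return new LagSummary(totalLag, maxLag, avgLag);
  }

  public static void main(String[] args) {
    Map<Integer, Long> lags = new HashMap<>();
    lags.put(0, 10L);
    lags.put(1, 30L);
    lags.put(2, 5L);
    LagSummary s = LagSummary.of(lags);
    // prints "total=45 max=30 avg=15"
    System.out.println("total=" + s.totalLag + " max=" + s.maxLag + " avg=" + s.avgLag);
  }
}
```

In the commit itself these three values are emitted as ingest/kafka/lag, ingest/kafka/maxLag and ingest/kafka/avgLag respectively, each with the dataSource dimension.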

