You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yi...@apache.org on 2022/09/17 22:59:33 UTC

[hudi] branch master updated: [HUDI-4873] Report number of messages to be processed via metrics (#6271)

This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5cbd1d83ba [HUDI-4873] Report number of messages to be processed via metrics (#6271)
5cbd1d83ba is described below

commit 5cbd1d83bacda48a07808c10638375b0263c2bfd
Author: Volodymyr Burenin <vb...@gmail.com>
AuthorDate: Sat Sep 17 17:59:25 2022 -0500

    [HUDI-4873] Report number of messages to be processed via metrics (#6271)
    
    Co-authored-by: Volodymyr Burenin <vo...@cloudkitchens.com>
    Co-authored-by: Y Ethan Guo <et...@gmail.com>
---
 .../hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java    | 6 ++++++
 .../main/java/org/apache/hudi/utilities/sources/KafkaSource.java    | 2 ++
 2 files changed, 8 insertions(+)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java
index d361179a1d..2475e92f96 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java
@@ -101,6 +101,12 @@ public class HoodieDeltaStreamerMetrics implements Serializable {
     }
   }
 
+  public void updateDeltaStreamerKafkaMessageInCount(long totalNewMsgCount) {
+    if (config.isMetricsOn()) {
+      Metrics.registerGauge(getMetricsName("deltastreamer", "kafkaMessageInCount"), totalNewMsgCount);
+    }
+  }
+
   public long getDurationInMs(long ctxDuration) {
     return ctxDuration / 1000000;
   }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
index 6f2377fc7c..5561356cab 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
@@ -57,8 +57,10 @@ abstract class KafkaSource<T> extends Source<JavaRDD<T>> {
       long totalNewMsgs = KafkaOffsetGen.CheckpointUtils.totalNewMessages(offsetRanges);
       LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
       if (totalNewMsgs <= 0) {
+        metrics.updateDeltaStreamerKafkaMessageInCount(0);
         return new InputBatch<>(Option.empty(), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
       }
+      metrics.updateDeltaStreamerKafkaMessageInCount(totalNewMsgs);
       JavaRDD<T> newDataRDD = toRDD(offsetRanges);
       return new InputBatch<>(Option.of(newDataRDD), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
     } catch (org.apache.kafka.common.errors.TimeoutException e) {