You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2023/02/07 12:52:29 UTC

[nifi-minifi-cpp] 02/04: MINIFICPP-2016 Add session commit time metrics

This is an automated email from the ASF dual-hosted git repository.

martinzink pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 1181498b81f4c6ac18c19c9eb85773556a553e5e
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Tue Feb 7 11:13:54 2023 +0100

    MINIFICPP-2016 Add session commit time metrics
    
    Closes #1477
    
    Signed-off-by: Martin Zink <ma...@apache.org>
---
 METRICS.md                                         | 18 +++++-----
 .../cluster/checkers/PrometheusChecker.py          |  3 +-
 libminifi/include/core/ProcessorMetrics.h          |  5 +++
 libminifi/src/core/Processor.cpp                   |  8 +++--
 libminifi/src/core/ProcessorMetrics.cpp            | 20 +++++++++--
 libminifi/test/unit/MetricsTests.cpp               | 41 +++++++++++++++++++++-
 6 files changed, 81 insertions(+), 14 deletions(-)

diff --git a/METRICS.md b/METRICS.md
index bf6ed91e4..f6c87dbaf 100644
--- a/METRICS.md
+++ b/METRICS.md
@@ -182,14 +182,16 @@ Regular expressions can also be used for requesting multiple processor metrics a
 
 There are general metrics that are available for all processors. Besides these metrics, processors can implement additional metrics that are specific to that processor.
 
-| Metric name                            | Labels                                       | Description                                                                         |
-|----------------------------------------|----------------------------------------------|-------------------------------------------------------------------------------------|
-| onTrigger_invocations                  | metric_class, processor_name, processor_uuid | The number of processor onTrigger calls                                             |
-| average_onTrigger_runtime_milliseconds | metric_class, processor_name, processor_uuid | The average runtime in milliseconds of the last 10 onTrigger calls of the processor |
-| last_onTrigger_runtime_milliseconds    | metric_class, processor_name, processor_uuid | The runtime in milliseconds of the last onTrigger call of the processor             |
-| transferred_flow_files                 | metric_class, processor_name, processor_uuid | Number of flow files transferred to a relationship                                  |
-| transferred_bytes                      | metric_class, processor_name, processor_uuid | Number of bytes transferred to a relationship                                       |
-| transferred_to_\<relationship\>        | metric_class, processor_name, processor_uuid | Number of flow files transferred to a specific relationship                         |
+| Metric name                                 | Labels                                       | Description                                                                              |
+|---------------------------------------------|----------------------------------------------|------------------------------------------------------------------------------------------|
+| onTrigger_invocations                       | metric_class, processor_name, processor_uuid | The number of processor onTrigger calls                                                  |
+| average_onTrigger_runtime_milliseconds      | metric_class, processor_name, processor_uuid | The average runtime in milliseconds of the last 10 onTrigger calls of the processor      |
+| last_onTrigger_runtime_milliseconds         | metric_class, processor_name, processor_uuid | The runtime in milliseconds of the last onTrigger call of the processor                  |
+| average_session_commit_runtime_milliseconds | metric_class, processor_name, processor_uuid | The average runtime in milliseconds of the last 10 session commit calls of the processor |
+| last_session_commit_runtime_milliseconds    | metric_class, processor_name, processor_uuid | The runtime in milliseconds of the last session commit call of the processor             |
+| transferred_flow_files                      | metric_class, processor_name, processor_uuid | Number of flow files transferred to a relationship                                       |
+| transferred_bytes                           | metric_class, processor_name, processor_uuid | Number of bytes transferred to a relationship                                            |
+| transferred_to_\<relationship\>             | metric_class, processor_name, processor_uuid | Number of flow files transferred to a specific relationship                              |
 
 | Label          | Description                                                            |
 |----------------|------------------------------------------------------------------------|
diff --git a/docker/test/integration/cluster/checkers/PrometheusChecker.py b/docker/test/integration/cluster/checkers/PrometheusChecker.py
index 8df4bf565..461e9a5e8 100644
--- a/docker/test/integration/cluster/checkers/PrometheusChecker.py
+++ b/docker/test/integration/cluster/checkers/PrometheusChecker.py
@@ -55,7 +55,8 @@ class PrometheusChecker:
 
     def verify_general_processor_metrics(self, metric_class, processor_name):
         labels = {'processor_name': processor_name}
-        return self.verify_metrics_exist(['minifi_average_onTrigger_runtime_milliseconds', 'minifi_last_onTrigger_runtime_milliseconds'], metric_class, labels) and \
+        return self.verify_metrics_exist(['minifi_average_onTrigger_runtime_milliseconds', 'minifi_last_onTrigger_runtime_milliseconds',
+                                          'minifi_average_session_commit_runtime_milliseconds', 'minifi_last_session_commit_runtime_milliseconds'], metric_class, labels) and \
             self.verify_metrics_larger_than_zero(['minifi_onTrigger_invocations', 'minifi_transferred_flow_files', 'minifi_transferred_to_success', 'minifi_transferred_bytes'], metric_class, labels)
 
     def verify_getfile_metrics(self, metric_class, processor_name):
diff --git a/libminifi/include/core/ProcessorMetrics.h b/libminifi/include/core/ProcessorMetrics.h
index ae1aa90dc..63e1f7879 100644
--- a/libminifi/include/core/ProcessorMetrics.h
+++ b/libminifi/include/core/ProcessorMetrics.h
@@ -49,6 +49,10 @@ class ProcessorMetrics : public state::response::ResponseNode {
   std::chrono::milliseconds getLastOnTriggerRuntime() const;
   void addLastOnTriggerRuntime(std::chrono::milliseconds runtime);
 
+  std::chrono::milliseconds getAverageSessionCommitRuntime() const;
+  std::chrono::milliseconds getLastSessionCommitRuntime() const;
+  void addLastSessionCommitRuntime(std::chrono::milliseconds runtime);
+
   std::atomic<size_t> iterations{0};
   std::atomic<size_t> transferred_flow_files{0};
   std::atomic<uint64_t> transferred_bytes{0};
@@ -80,6 +84,7 @@ class ProcessorMetrics : public state::response::ResponseNode {
   std::unordered_map<std::string, size_t> transferred_relationships_;
   const Processor& source_processor_;
   Averager<std::chrono::milliseconds> on_trigger_runtime_averager_;
+  Averager<std::chrono::milliseconds> session_commit_runtime_averager_;
 };
 
 }  // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index bd4705675..ffc186a96 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -187,10 +187,12 @@ void Processor::onTrigger(ProcessContext *context, ProcessSessionFactory *sessio
 
   try {
     // Call the virtual trigger function
-    const auto start = std::chrono::steady_clock::now();
+    auto start = std::chrono::steady_clock::now();
     onTrigger(context, session.get());
     metrics_->addLastOnTriggerRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start));
+    start = std::chrono::steady_clock::now();
     session->commit();
+    metrics_->addLastSessionCommitRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start));
   } catch (const std::exception& exception) {
     logger_->log_warn("Caught \"%s\" (%s) during Processor::onTrigger of processor: %s (%s)",
         exception.what(), typeid(exception).name(), getUUIDStr(), getName());
@@ -210,10 +212,12 @@ void Processor::onTrigger(const std::shared_ptr<ProcessContext> &context, const
 
   try {
     // Call the virtual trigger function
-    const auto start = std::chrono::steady_clock::now();
+    auto start = std::chrono::steady_clock::now();
     onTrigger(context, session);
     metrics_->addLastOnTriggerRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start));
+    start = std::chrono::steady_clock::now();
     session->commit();
+    metrics_->addLastSessionCommitRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start));
   } catch (std::exception &exception) {
     logger_->log_warn("Caught \"%s\" (%s) during Processor::onTrigger of processor: %s (%s)",
         exception.what(), typeid(exception).name(), getUUIDStr(), getName());
diff --git a/libminifi/src/core/ProcessorMetrics.cpp b/libminifi/src/core/ProcessorMetrics.cpp
index e243a0178..8cd054888 100644
--- a/libminifi/src/core/ProcessorMetrics.cpp
+++ b/libminifi/src/core/ProcessorMetrics.cpp
@@ -26,7 +26,8 @@ namespace org::apache::nifi::minifi::core {
 
 ProcessorMetrics::ProcessorMetrics(const Processor& source_processor)
     : source_processor_(source_processor),
-      on_trigger_runtime_averager_(STORED_ON_TRIGGER_RUNTIME_COUNT) {
+      on_trigger_runtime_averager_(STORED_ON_TRIGGER_RUNTIME_COUNT),
+      session_commit_runtime_averager_(STORED_ON_TRIGGER_RUNTIME_COUNT) {
 }
 
 std::string ProcessorMetrics::getName() const {
@@ -46,6 +47,8 @@ std::vector<state::response::SerializedResponseNode> ProcessorMetrics::serialize
       {.name = "OnTriggerInvocations", .value = static_cast<uint32_t>(iterations.load())},
       {.name = "AverageOnTriggerRunTime", .value = static_cast<uint64_t>(getAverageOnTriggerRuntime().count())},
       {.name = "LastOnTriggerRunTime", .value = static_cast<uint64_t>(getLastOnTriggerRuntime().count())},
+      {.name = "AverageSessionCommitRunTime", .value = static_cast<uint64_t>(getAverageSessionCommitRuntime().count())},
+      {.name = "LastSessionCommitRunTime", .value = static_cast<uint64_t>(getLastSessionCommitRuntime().count())},
       {.name = "TransferredFlowFiles", .value = static_cast<uint32_t>(transferred_flow_files.load())},
       {.name = "TransferredBytes", .value = transferred_bytes.load()}
     }
@@ -73,6 +76,8 @@ std::vector<state::PublishedMetric> ProcessorMetrics::calculateMetrics() {
     {"onTrigger_invocations", static_cast<double>(iterations.load()), getCommonLabels()},
     {"average_onTrigger_runtime_milliseconds", static_cast<double>(getAverageOnTriggerRuntime().count()), getCommonLabels()},
     {"last_onTrigger_runtime_milliseconds", static_cast<double>(getLastOnTriggerRuntime().count()), getCommonLabels()},
+    {"average_session_commit_runtime_milliseconds", static_cast<double>(getAverageSessionCommitRuntime().count()), getCommonLabels()},
+    {"last_session_commit_runtime_milliseconds", static_cast<double>(getLastSessionCommitRuntime().count()), getCommonLabels()},
     {"transferred_flow_files", static_cast<double>(transferred_flow_files.load()), getCommonLabels()},
     {"transferred_bytes", static_cast<double>(transferred_bytes.load()), getCommonLabels()}
   };
@@ -105,14 +110,25 @@ std::chrono::milliseconds ProcessorMetrics::getLastOnTriggerRuntime() const {
   return on_trigger_runtime_averager_.getLastValue();
 }
 
+std::chrono::milliseconds ProcessorMetrics::getAverageSessionCommitRuntime() const {
+  return session_commit_runtime_averager_.getAverage();
+}
+
+void ProcessorMetrics::addLastSessionCommitRuntime(std::chrono::milliseconds runtime) {
+  session_commit_runtime_averager_.addValue(runtime);
+}
+
+std::chrono::milliseconds ProcessorMetrics::getLastSessionCommitRuntime() const {
+  return session_commit_runtime_averager_.getLastValue();
+}
 
 template<typename ValueType>
 requires Summable<ValueType> && DividableByInteger<ValueType>
 ValueType ProcessorMetrics::Averager<ValueType>::getAverage() const {
+  std::lock_guard<std::mutex> lock(average_value_mutex_);
   if (values_.empty()) {
     return {};
   }
-  std::lock_guard<std::mutex> lock(average_value_mutex_);
   return ranges::accumulate(values_, ValueType{}) / values_.size();
 }
 
diff --git a/libminifi/test/unit/MetricsTests.cpp b/libminifi/test/unit/MetricsTests.cpp
index 20aeb7c6e..a73cecae5 100644
--- a/libminifi/test/unit/MetricsTests.cpp
+++ b/libminifi/test/unit/MetricsTests.cpp
@@ -209,7 +209,7 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") {
   }
 }
 
-TEST_CASE("Test ProcessorMetrics", "[ProcessorMetrics]") {
+TEST_CASE("Test on trigger runtime processor metrics", "[ProcessorMetrics]") {
   DummyProcessor dummy_processor("dummy");
   minifi::core::ProcessorMetrics metrics(dummy_processor);
 
@@ -248,4 +248,43 @@ TEST_CASE("Test ProcessorMetrics", "[ProcessorMetrics]") {
   REQUIRE(metrics.getAverageOnTriggerRuntime() == 37ms);
 }
 
+TEST_CASE("Test commit runtime processor metrics", "[ProcessorMetrics]") {
+  DummyProcessor dummy_processor("dummy");
+  minifi::core::ProcessorMetrics metrics(dummy_processor);
+
+  REQUIRE("DummyProcessorMetrics" == metrics.getName());
+
+  REQUIRE(metrics.getLastSessionCommitRuntime() == 0ms);
+  REQUIRE(metrics.getAverageSessionCommitRuntime() == 0ms);
+
+  metrics.addLastSessionCommitRuntime(10ms);
+  metrics.addLastSessionCommitRuntime(20ms);
+  metrics.addLastSessionCommitRuntime(30ms);
+
+  REQUIRE(metrics.getLastSessionCommitRuntime() == 30ms);
+  REQUIRE(metrics.getAverageSessionCommitRuntime() == 20ms);
+
+  for (auto i = 0; i < 7; ++i) {
+    metrics.addLastSessionCommitRuntime(50ms);
+  }
+  REQUIRE(metrics.getAverageSessionCommitRuntime() == 41ms);
+  REQUIRE(metrics.getLastSessionCommitRuntime() == 50ms);
+
+  for (auto i = 0; i < 3; ++i) {
+    metrics.addLastSessionCommitRuntime(50ms);
+  }
+  REQUIRE(metrics.getAverageSessionCommitRuntime() == 50ms);
+  REQUIRE(metrics.getLastSessionCommitRuntime() == 50ms);
+
+  for (auto i = 0; i < 10; ++i) {
+    metrics.addLastSessionCommitRuntime(40ms);
+  }
+  REQUIRE(metrics.getAverageSessionCommitRuntime() == 40ms);
+  REQUIRE(metrics.getLastSessionCommitRuntime() == 40ms);
+
+  metrics.addLastSessionCommitRuntime(10ms);
+  REQUIRE(metrics.getLastSessionCommitRuntime() == 10ms);
+  REQUIRE(metrics.getAverageSessionCommitRuntime() == 37ms);
+}
+
 }  // namespace org::apache::nifi::minifi::test