You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2023/02/07 12:52:29 UTC
[nifi-minifi-cpp] 02/04: MINIFICPP-2016 Add session commit time metrics
This is an automated email from the ASF dual-hosted git repository.
martinzink pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 1181498b81f4c6ac18c19c9eb85773556a553e5e
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Tue Feb 7 11:13:54 2023 +0100
MINIFICPP-2016 Add session commit time metrics
Closes #1477
Signed-off-by: Martin Zink <ma...@apache.org>
---
METRICS.md | 18 +++++-----
.../cluster/checkers/PrometheusChecker.py | 3 +-
libminifi/include/core/ProcessorMetrics.h | 5 +++
libminifi/src/core/Processor.cpp | 8 +++--
libminifi/src/core/ProcessorMetrics.cpp | 20 +++++++++--
libminifi/test/unit/MetricsTests.cpp | 41 +++++++++++++++++++++-
6 files changed, 81 insertions(+), 14 deletions(-)
diff --git a/METRICS.md b/METRICS.md
index bf6ed91e4..f6c87dbaf 100644
--- a/METRICS.md
+++ b/METRICS.md
@@ -182,14 +182,16 @@ Regular expressions can also be used for requesting multiple processor metrics a
There are general metrics that are available for all processors. Besides these metrics, processors can implement additional metrics that are specific to that processor.
-| Metric name | Labels | Description |
-|----------------------------------------|----------------------------------------------|-------------------------------------------------------------------------------------|
-| onTrigger_invocations | metric_class, processor_name, processor_uuid | The number of processor onTrigger calls |
-| average_onTrigger_runtime_milliseconds | metric_class, processor_name, processor_uuid | The average runtime in milliseconds of the last 10 onTrigger calls of the processor |
-| last_onTrigger_runtime_milliseconds | metric_class, processor_name, processor_uuid | The runtime in milliseconds of the last onTrigger call of the processor |
-| transferred_flow_files | metric_class, processor_name, processor_uuid | Number of flow files transferred to a relationship |
-| transferred_bytes | metric_class, processor_name, processor_uuid | Number of bytes transferred to a relationship |
-| transferred_to_\<relationship\> | metric_class, processor_name, processor_uuid | Number of flow files transferred to a specific relationship |
+| Metric name | Labels | Description |
+|---------------------------------------------|----------------------------------------------|------------------------------------------------------------------------------------------|
+| onTrigger_invocations | metric_class, processor_name, processor_uuid | The number of processor onTrigger calls |
+| average_onTrigger_runtime_milliseconds | metric_class, processor_name, processor_uuid | The average runtime in milliseconds of the last 10 onTrigger calls of the processor |
+| last_onTrigger_runtime_milliseconds | metric_class, processor_name, processor_uuid | The runtime in milliseconds of the last onTrigger call of the processor |
+| average_session_commit_runtime_milliseconds | metric_class, processor_name, processor_uuid | The average runtime in milliseconds of the last 10 session commit calls of the processor |
+| last_session_commit_runtime_milliseconds | metric_class, processor_name, processor_uuid | The runtime in milliseconds of the last session commit call of the processor |
+| transferred_flow_files | metric_class, processor_name, processor_uuid | Number of flow files transferred to a relationship |
+| transferred_bytes | metric_class, processor_name, processor_uuid | Number of bytes transferred to a relationship |
+| transferred_to_\<relationship\> | metric_class, processor_name, processor_uuid | Number of flow files transferred to a specific relationship |
| Label | Description |
|----------------|------------------------------------------------------------------------|
diff --git a/docker/test/integration/cluster/checkers/PrometheusChecker.py b/docker/test/integration/cluster/checkers/PrometheusChecker.py
index 8df4bf565..461e9a5e8 100644
--- a/docker/test/integration/cluster/checkers/PrometheusChecker.py
+++ b/docker/test/integration/cluster/checkers/PrometheusChecker.py
@@ -55,7 +55,8 @@ class PrometheusChecker:
def verify_general_processor_metrics(self, metric_class, processor_name):
labels = {'processor_name': processor_name}
- return self.verify_metrics_exist(['minifi_average_onTrigger_runtime_milliseconds', 'minifi_last_onTrigger_runtime_milliseconds'], metric_class, labels) and \
+ return self.verify_metrics_exist(['minifi_average_onTrigger_runtime_milliseconds', 'minifi_last_onTrigger_runtime_milliseconds',
+ 'minifi_average_session_commit_runtime_milliseconds', 'minifi_last_session_commit_runtime_milliseconds'], metric_class, labels) and \
self.verify_metrics_larger_than_zero(['minifi_onTrigger_invocations', 'minifi_transferred_flow_files', 'minifi_transferred_to_success', 'minifi_transferred_bytes'], metric_class, labels)
def verify_getfile_metrics(self, metric_class, processor_name):
diff --git a/libminifi/include/core/ProcessorMetrics.h b/libminifi/include/core/ProcessorMetrics.h
index ae1aa90dc..63e1f7879 100644
--- a/libminifi/include/core/ProcessorMetrics.h
+++ b/libminifi/include/core/ProcessorMetrics.h
@@ -49,6 +49,10 @@ class ProcessorMetrics : public state::response::ResponseNode {
std::chrono::milliseconds getLastOnTriggerRuntime() const;
void addLastOnTriggerRuntime(std::chrono::milliseconds runtime);
+ std::chrono::milliseconds getAverageSessionCommitRuntime() const;
+ std::chrono::milliseconds getLastSessionCommitRuntime() const;
+ void addLastSessionCommitRuntime(std::chrono::milliseconds runtime);
+
std::atomic<size_t> iterations{0};
std::atomic<size_t> transferred_flow_files{0};
std::atomic<uint64_t> transferred_bytes{0};
@@ -80,6 +84,7 @@ class ProcessorMetrics : public state::response::ResponseNode {
std::unordered_map<std::string, size_t> transferred_relationships_;
const Processor& source_processor_;
Averager<std::chrono::milliseconds> on_trigger_runtime_averager_;
+ Averager<std::chrono::milliseconds> session_commit_runtime_averager_;
};
} // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index bd4705675..ffc186a96 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -187,10 +187,12 @@ void Processor::onTrigger(ProcessContext *context, ProcessSessionFactory *sessio
try {
// Call the virtual trigger function
- const auto start = std::chrono::steady_clock::now();
+ auto start = std::chrono::steady_clock::now();
onTrigger(context, session.get());
metrics_->addLastOnTriggerRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start));
+ start = std::chrono::steady_clock::now();
session->commit();
+ metrics_->addLastSessionCommitRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start));
} catch (const std::exception& exception) {
logger_->log_warn("Caught \"%s\" (%s) during Processor::onTrigger of processor: %s (%s)",
exception.what(), typeid(exception).name(), getUUIDStr(), getName());
@@ -210,10 +212,12 @@ void Processor::onTrigger(const std::shared_ptr<ProcessContext> &context, const
try {
// Call the virtual trigger function
- const auto start = std::chrono::steady_clock::now();
+ auto start = std::chrono::steady_clock::now();
onTrigger(context, session);
metrics_->addLastOnTriggerRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start));
+ start = std::chrono::steady_clock::now();
session->commit();
+ metrics_->addLastSessionCommitRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start));
} catch (std::exception &exception) {
logger_->log_warn("Caught \"%s\" (%s) during Processor::onTrigger of processor: %s (%s)",
exception.what(), typeid(exception).name(), getUUIDStr(), getName());
diff --git a/libminifi/src/core/ProcessorMetrics.cpp b/libminifi/src/core/ProcessorMetrics.cpp
index e243a0178..8cd054888 100644
--- a/libminifi/src/core/ProcessorMetrics.cpp
+++ b/libminifi/src/core/ProcessorMetrics.cpp
@@ -26,7 +26,8 @@ namespace org::apache::nifi::minifi::core {
ProcessorMetrics::ProcessorMetrics(const Processor& source_processor)
: source_processor_(source_processor),
- on_trigger_runtime_averager_(STORED_ON_TRIGGER_RUNTIME_COUNT) {
+ on_trigger_runtime_averager_(STORED_ON_TRIGGER_RUNTIME_COUNT),
+ session_commit_runtime_averager_(STORED_ON_TRIGGER_RUNTIME_COUNT) {
}
std::string ProcessorMetrics::getName() const {
@@ -46,6 +47,8 @@ std::vector<state::response::SerializedResponseNode> ProcessorMetrics::serialize
{.name = "OnTriggerInvocations", .value = static_cast<uint32_t>(iterations.load())},
{.name = "AverageOnTriggerRunTime", .value = static_cast<uint64_t>(getAverageOnTriggerRuntime().count())},
{.name = "LastOnTriggerRunTime", .value = static_cast<uint64_t>(getLastOnTriggerRuntime().count())},
+ {.name = "AverageSessionCommitRunTime", .value = static_cast<uint64_t>(getAverageSessionCommitRuntime().count())},
+ {.name = "LastSessionCommitRunTime", .value = static_cast<uint64_t>(getLastSessionCommitRuntime().count())},
{.name = "TransferredFlowFiles", .value = static_cast<uint32_t>(transferred_flow_files.load())},
{.name = "TransferredBytes", .value = transferred_bytes.load()}
}
@@ -73,6 +76,8 @@ std::vector<state::PublishedMetric> ProcessorMetrics::calculateMetrics() {
{"onTrigger_invocations", static_cast<double>(iterations.load()), getCommonLabels()},
{"average_onTrigger_runtime_milliseconds", static_cast<double>(getAverageOnTriggerRuntime().count()), getCommonLabels()},
{"last_onTrigger_runtime_milliseconds", static_cast<double>(getLastOnTriggerRuntime().count()), getCommonLabels()},
+ {"average_session_commit_runtime_milliseconds", static_cast<double>(getAverageSessionCommitRuntime().count()), getCommonLabels()},
+ {"last_session_commit_runtime_milliseconds", static_cast<double>(getLastSessionCommitRuntime().count()), getCommonLabels()},
{"transferred_flow_files", static_cast<double>(transferred_flow_files.load()), getCommonLabels()},
{"transferred_bytes", static_cast<double>(transferred_bytes.load()), getCommonLabels()}
};
@@ -105,14 +110,25 @@ std::chrono::milliseconds ProcessorMetrics::getLastOnTriggerRuntime() const {
return on_trigger_runtime_averager_.getLastValue();
}
+std::chrono::milliseconds ProcessorMetrics::getAverageSessionCommitRuntime() const {
+ return session_commit_runtime_averager_.getAverage();
+}
+
+void ProcessorMetrics::addLastSessionCommitRuntime(std::chrono::milliseconds runtime) {
+ session_commit_runtime_averager_.addValue(runtime);
+}
+
+std::chrono::milliseconds ProcessorMetrics::getLastSessionCommitRuntime() const {
+ return session_commit_runtime_averager_.getLastValue();
+}
template<typename ValueType>
requires Summable<ValueType> && DividableByInteger<ValueType>
ValueType ProcessorMetrics::Averager<ValueType>::getAverage() const {
+ std::lock_guard<std::mutex> lock(average_value_mutex_);
if (values_.empty()) {
return {};
}
- std::lock_guard<std::mutex> lock(average_value_mutex_);
return ranges::accumulate(values_, ValueType{}) / values_.size();
}
diff --git a/libminifi/test/unit/MetricsTests.cpp b/libminifi/test/unit/MetricsTests.cpp
index 20aeb7c6e..a73cecae5 100644
--- a/libminifi/test/unit/MetricsTests.cpp
+++ b/libminifi/test/unit/MetricsTests.cpp
@@ -209,7 +209,7 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") {
}
}
-TEST_CASE("Test ProcessorMetrics", "[ProcessorMetrics]") {
+TEST_CASE("Test on trigger runtime processor metrics", "[ProcessorMetrics]") {
DummyProcessor dummy_processor("dummy");
minifi::core::ProcessorMetrics metrics(dummy_processor);
@@ -248,4 +248,43 @@ TEST_CASE("Test ProcessorMetrics", "[ProcessorMetrics]") {
REQUIRE(metrics.getAverageOnTriggerRuntime() == 37ms);
}
+TEST_CASE("Test commit runtime processor metrics", "[ProcessorMetrics]") {
+ DummyProcessor dummy_processor("dummy");
+ minifi::core::ProcessorMetrics metrics(dummy_processor);
+
+ REQUIRE("DummyProcessorMetrics" == metrics.getName());
+
+ REQUIRE(metrics.getLastSessionCommitRuntime() == 0ms);
+ REQUIRE(metrics.getAverageSessionCommitRuntime() == 0ms);
+
+ metrics.addLastSessionCommitRuntime(10ms);
+ metrics.addLastSessionCommitRuntime(20ms);
+ metrics.addLastSessionCommitRuntime(30ms);
+
+ REQUIRE(metrics.getLastSessionCommitRuntime() == 30ms);
+ REQUIRE(metrics.getAverageSessionCommitRuntime() == 20ms);
+
+ for (auto i = 0; i < 7; ++i) {
+ metrics.addLastSessionCommitRuntime(50ms);
+ }
+ REQUIRE(metrics.getAverageSessionCommitRuntime() == 41ms);
+ REQUIRE(metrics.getLastSessionCommitRuntime() == 50ms);
+
+ for (auto i = 0; i < 3; ++i) {
+ metrics.addLastSessionCommitRuntime(50ms);
+ }
+ REQUIRE(metrics.getAverageSessionCommitRuntime() == 50ms);
+ REQUIRE(metrics.getLastSessionCommitRuntime() == 50ms);
+
+ for (auto i = 0; i < 10; ++i) {
+ metrics.addLastSessionCommitRuntime(40ms);
+ }
+ REQUIRE(metrics.getAverageSessionCommitRuntime() == 40ms);
+ REQUIRE(metrics.getLastSessionCommitRuntime() == 40ms);
+
+ metrics.addLastSessionCommitRuntime(10ms);
+ REQUIRE(metrics.getLastSessionCommitRuntime() == 10ms);
+ REQUIRE(metrics.getAverageSessionCommitRuntime() == 37ms);
+}
+
} // namespace org::apache::nifi::minifi::test