Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/08/22 15:56:00 UTC

[GitHub] [nifi-minifi-cpp] lordgamez opened a new pull request, #1400: MINIFICPP-1848 Create a generic solution for processor metrics

lordgamez opened a new pull request, #1400:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1400

   Generate common metrics for all MiNiFi processors and make them available for publishing through the C2 and Prometheus interfaces.
   
   https://issues.apache.org/jira/browse/MINIFICPP-1848
   
   -------------------------------------
   Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced
        in the commit message?
   
   - [ ] Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically main)?
   
   - [ ] Is your initial contribution a single, squashed commit?
   
   ### For code changes:
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [ ] If applicable, have you updated the LICENSE file?
   - [ ] If applicable, have you updated the NOTICE file?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI results for build issues and submit an update to your PR as soon as possible.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] fgerlits commented on a diff in pull request #1400: MINIFICPP-1848 Create a generic solution for processor metrics

Posted by GitBox <gi...@apache.org>.
fgerlits commented on code in PR #1400:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1400#discussion_r971799510


##########
libminifi/test/unit/MetricsTests.cpp:
##########
@@ -203,3 +208,32 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") {
     REQUIRE("0" == size.value);
   }
 }
+
+TEST_CASE("Test ProcessorMetrics", "[ProcessorMetrics]") {
+  DummyProcessor dummy_processor("dummy");
+  minifi::core::ProcessorMetrics metrics(dummy_processor);
+
+  REQUIRE("DummyProcessorMetrics" == metrics.getName());
+
+  REQUIRE(metrics.getLastOnTriggerRuntime() == 0ms);
+  REQUIRE(metrics.getAverageOnTriggerRuntime() == 0ms);
+
+  metrics.addLastOnTriggerRuntime(10ms);
+  metrics.addLastOnTriggerRuntime(20ms);
+  metrics.addLastOnTriggerRuntime(30ms);
+
+  REQUIRE(metrics.getLastOnTriggerRuntime() == 30ms);
+  REQUIRE(metrics.getAverageOnTriggerRuntime() == 20ms);
+
+  for (auto i = 0; i < 10; ++i) {
+    metrics.addLastOnTriggerRuntime(50ms);
+  }
+
+  REQUIRE(metrics.getAverageOnTriggerRuntime() == 50ms);
+  REQUIRE(metrics.getLastOnTriggerRuntime() == 50ms);
+
+  metrics.addLastOnTriggerRuntime(10ms);
+  REQUIRE(metrics.getLastOnTriggerRuntime() == 10ms);
+}

Review Comment:
   we could add
   ```c++
     REQUIRE(metrics.getAverageOnTriggerRuntime() == 46ms);
   ```
   at the end, as well
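Here is where the suggested value comes from. Assume `STORED_ON_TRIGGER_RUNTIME_COUNT` is 10; its value is not shown in this hunk. The ten 50ms samples fill the window, and the final 10ms sample evicts one of them, leaving (9 * 50 + 10) / 10 = 46ms. The sketch below replays the test's sequence against a simple `std::deque`-based window. `RollingWindow` is a hypothetical stand-in, not the PR's `Averager`.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <deque>
#include <initializer_list>
#include <numeric>

using namespace std::chrono_literals;

// Hypothetical stand-in for the PR's Averager: keeps only the most recent `window` samples.
class RollingWindow {
 public:
  explicit RollingWindow(std::size_t window) : window_(window) {
    assert(window_ > 0);
  }

  void add(std::chrono::milliseconds value) {
    if (samples_.size() == window_) {
      samples_.pop_front();  // evict the oldest sample
    }
    samples_.push_back(value);
  }

  std::chrono::milliseconds average() const {
    if (samples_.empty()) {
      return 0ms;
    }
    const auto sum = std::accumulate(samples_.begin(), samples_.end(), 0ms);
    return sum / static_cast<std::chrono::milliseconds::rep>(samples_.size());
  }

  std::chrono::milliseconds last() const {
    return samples_.empty() ? 0ms : samples_.back();
  }

 private:
  std::size_t window_;
  std::deque<std::chrono::milliseconds> samples_;
};

// Replays the calls from the test case, with the window size assumed to be 10.
inline RollingWindow replayTestSequence() {
  RollingWindow window(10);
  for (auto runtime : {10ms, 20ms, 30ms}) {
    window.add(runtime);  // average is 20ms after these three
  }
  for (int i = 0; i < 10; ++i) {
    window.add(50ms);  // the three early samples are evicted; window is all 50ms
  }
  window.add(10ms);  // evicts one 50ms sample: nine 50ms + one 10ms
  return window;
}
```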





[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1400: MINIFICPP-1848 Create a generic solution for processor metrics

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1400:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1400#discussion_r993313911


##########
libminifi/include/utils/Averager.h:
##########
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <vector>
+#include <mutex>
+
+namespace org::apache::nifi::minifi::utils {
+
+template<typename T>
+concept Summable = requires(T x) { x + x; };  // NOLINT(readability/braces)
+
+template<typename T>
+concept DividableByInteger = requires(T x, uint32_t divisor) { x / divisor; };  // NOLINT(readability/braces)
+
+template<typename ValueType>
+requires Summable<ValueType> && DividableByInteger<ValueType>
+class Averager {
+ public:
+  explicit Averager(uint32_t sample_size) : SAMPLE_SIZE_(sample_size) {
+    values_.reserve(SAMPLE_SIZE_);
+  }
+
+  ValueType getAverage() const;
+  ValueType getLastValue() const;
+  void addValue(ValueType runtime);
+
+ private:
+  const uint32_t SAMPLE_SIZE_;
+  mutable std::mutex average_value_mutex_;
+  uint32_t next_average_index_ = 0;
+  std::vector<ValueType> values_;
+};
+
+template<typename ValueType>
+requires Summable<ValueType> && DividableByInteger<ValueType>
+ValueType Averager<ValueType>::getAverage() const {
+  if (values_.empty()) {
+    return {};
+  }
+  ValueType sum{};
+  std::lock_guard<std::mutex> lock(average_value_mutex_);
+  for (const auto& value : values_) {
+    sum += value;
+  }
+  return sum / values_.size();
+}
+
+template<typename ValueType>
+requires Summable<ValueType> && DividableByInteger<ValueType>
+void Averager<ValueType>::addValue(ValueType runtime) {
+  std::lock_guard<std::mutex> lock(average_value_mutex_);
+  if (values_.size() < SAMPLE_SIZE_) {
+    values_.push_back(runtime);
+  } else {
+    if (next_average_index_ >= values_.size()) {
+      next_average_index_ = 0;
+    }
+    values_[next_average_index_] = runtime;
+    ++next_average_index_;
+  }
+}
+
+template<typename ValueType>
+requires Summable<ValueType> && DividableByInteger<ValueType>
+ValueType Averager<ValueType>::getLastValue() const {
+  std::lock_guard<std::mutex> lock(average_value_mutex_);
+  if (values_.empty()) {
+    return {};
+  } else if (values_.size() < SAMPLE_SIZE_) {
+    return values_[values_.size() - 1];
+  } else {
+    return values_[next_average_index_ - 1];

Review Comment:
   Wouldn't this cause an underflow if we query `getLastValue` right after the last value has been pushed to the end of the `values_` vector? (`values_.size() == SAMPLE_SIZE_ && next_average_index_ == 0`)
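The failure mode is unsigned wrap-around. Right after the last `push_back` fills the vector, `next_average_index_` is still 0, so `next_average_index_ - 1` on a `uint32_t` evaluates to 4294967295 and the subscript reads far out of bounds. Below is a minimal sketch of a wrap-safe alternative. It assumes `next` stays in `[0, size]`, as it does with the lazy reset in the quoted `addValue`. The helper names are hypothetical.

```cpp
#include <cstdint>
#include <limits>

// The expression from the diff, for comparison: wraps around when next == 0.
constexpr uint32_t naiveLastIndex(uint32_t next) {
  return next - 1;
}

// Index of the most recently written slot in a full ring of `size` elements,
// where `next` is the slot the next write would target (0 <= next <= size).
// Precondition: size > 0. Adding size before subtracting 1 avoids going below zero.
constexpr uint32_t lastWrittenIndex(uint32_t next, uint32_t size) {
  return (next + size - 1) % size;
}

static_assert(naiveLastIndex(0) == std::numeric_limits<uint32_t>::max());
static_assert(lastWrittenIndex(0, 10) == 9);
```

The modulo also maps `next == size` back to the last slot, so the lazy reset in `addValue` needs no special case here.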





[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1400: MINIFICPP-1848 Create a generic solution for processor metrics

Posted by GitBox <gi...@apache.org>.
lordgamez commented on code in PR #1400:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1400#discussion_r997095999


##########
libminifi/include/utils/Averager.h:
##########
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <vector>
+#include <mutex>
+
+namespace org::apache::nifi::minifi::utils {
+
+template<typename T>
+concept Summable = requires(T x) { x + x; };  // NOLINT(readability/braces)
+
+template<typename T>
+concept DividableByInteger = requires(T x, uint32_t divisor) { x / divisor; };  // NOLINT(readability/braces)
+
+template<typename ValueType>
+requires Summable<ValueType> && DividableByInteger<ValueType>
+class Averager {
+ public:
+  explicit Averager(uint32_t sample_size) : SAMPLE_SIZE_(sample_size) {
+    values_.reserve(SAMPLE_SIZE_);
+  }
+
+  ValueType getAverage() const;
+  ValueType getLastValue() const;
+  void addValue(ValueType runtime);
+
+ private:
+  const uint32_t SAMPLE_SIZE_;
+  mutable std::mutex average_value_mutex_;
+  uint32_t next_average_index_ = 0;
+  std::vector<ValueType> values_;
+};
+
+template<typename ValueType>
+requires Summable<ValueType> && DividableByInteger<ValueType>
+ValueType Averager<ValueType>::getAverage() const {
+  if (values_.empty()) {
+    return {};
+  }
+  ValueType sum{};
+  std::lock_guard<std::mutex> lock(average_value_mutex_);
+  for (const auto& value : values_) {

Review Comment:
   Added `accumulate` usage in c164ede1b258f2d320e0313456650a253da5de5e
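For reference, here is a sketch of an `accumulate`-based `getAverage`. It is written against a simplified stand-in class (hypothetical name `LockedSamples`), not the code in the referenced commit. It also takes the lock before the `empty()` check. In the quoted version, `values_.empty()` is read before the `lock_guard` is constructed, which can race with a concurrent `addValue`.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>
#include <numeric>
#include <vector>

// Simplified stand-in for the quoted Averager, showing only the averaging path.
template<typename ValueType>
class LockedSamples {
 public:
  void add(ValueType value) {
    std::lock_guard<std::mutex> lock(mutex_);
    values_.push_back(value);
  }

  ValueType getAverage() const {
    // Lock first, so the emptiness check and the summation see the same state.
    std::lock_guard<std::mutex> lock(mutex_);
    if (values_.empty()) {
      return {};
    }
    // std::accumulate folds with operator+, starting from a value-initialized ValueType.
    const ValueType sum = std::accumulate(values_.begin(), values_.end(), ValueType{});
    return sum / static_cast<uint32_t>(values_.size());
  }

 private:
  mutable std::mutex mutex_;
  std::vector<ValueType> values_;
};
```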



##########
libminifi/include/utils/Averager.h:
##########
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <vector>
+#include <mutex>
+
+namespace org::apache::nifi::minifi::utils {
+
+template<typename T>
+concept Summable = requires(T x) { x + x; };  // NOLINT(readability/braces)
+
+template<typename T>
+concept DividableByInteger = requires(T x, uint32_t divisor) { x / divisor; };  // NOLINT(readability/braces)
+
+template<typename ValueType>
+requires Summable<ValueType> && DividableByInteger<ValueType>
+class Averager {
+ public:
+  explicit Averager(uint32_t sample_size) : SAMPLE_SIZE_(sample_size) {
+    values_.reserve(SAMPLE_SIZE_);
+  }
+
+  ValueType getAverage() const;
+  ValueType getLastValue() const;
+  void addValue(ValueType runtime);
+
+ private:
+  const uint32_t SAMPLE_SIZE_;
+  mutable std::mutex average_value_mutex_;
+  uint32_t next_average_index_ = 0;
+  std::vector<ValueType> values_;
+};
+
+template<typename ValueType>
+requires Summable<ValueType> && DividableByInteger<ValueType>
+ValueType Averager<ValueType>::getAverage() const {
+  if (values_.empty()) {
+    return {};
+  }
+  ValueType sum{};
+  std::lock_guard<std::mutex> lock(average_value_mutex_);
+  for (const auto& value : values_) {
+    sum += value;
+  }
+  return sum / values_.size();
+}
+
+template<typename ValueType>
+requires Summable<ValueType> && DividableByInteger<ValueType>
+void Averager<ValueType>::addValue(ValueType runtime) {
+  std::lock_guard<std::mutex> lock(average_value_mutex_);
+  if (values_.size() < SAMPLE_SIZE_) {
+    values_.push_back(runtime);
+  } else {
+    if (next_average_index_ >= values_.size()) {
+      next_average_index_ = 0;
+    }
+    values_[next_average_index_] = runtime;
+    ++next_average_index_;
+  }
+}
+
+template<typename ValueType>
+requires Summable<ValueType> && DividableByInteger<ValueType>
+ValueType Averager<ValueType>::getLastValue() const {
+  std::lock_guard<std::mutex> lock(average_value_mutex_);
+  if (values_.empty()) {
+    return {};
+  } else if (values_.size() < SAMPLE_SIZE_) {
+    return values_[values_.size() - 1];
+  } else {
+    return values_[next_average_index_ - 1];

Review Comment:
   Good catch, fixed in c164ede1b258f2d320e0313456650a253da5de5e and added some tests for this case.



##########
libminifi/src/core/ProcessorMetrics.cpp:
##########
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "core/ProcessorMetrics.h"
+
+#include "core/Processor.h"
+#include "utils/gsl.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::core {
+
+ProcessorMetrics::ProcessorMetrics(const Processor& source_processor)
+    : source_processor_(source_processor),
+      on_trigger_runtime_averager_(STORED_ON_TRIGGER_RUNTIME_COUNT) {
+}
+
+std::string ProcessorMetrics::getName() const {
+  return source_processor_.getProcessorType() + "Metrics";
+}
+
+std::unordered_map<std::string, std::string> ProcessorMetrics::getCommonLabels() const {
+  return {{"metric_class", getName()}, {"processor_name", source_processor_.getName()}, {"processor_uuid", source_processor_.getUUIDStr()}};
+}
+
+std::vector<state::response::SerializedResponseNode> ProcessorMetrics::serialize() {
+  std::vector<state::response::SerializedResponseNode> resp;
+
+  state::response::SerializedResponseNode root_node {
+    .name = source_processor_.getUUIDStr(),
+    .children = {
+      {.name = "OnTriggerInvocations", .value = static_cast<uint32_t>(iterations.load())},
+      {.name = "AverageOnTriggerRunTime", .value = static_cast<uint64_t>(getAverageOnTriggerRuntime().count())},
+      {.name = "LastOnTriggerRunTime", .value = static_cast<uint64_t>(getLastOnTriggerRuntime().count())},
+      {.name = "TransferredFlowFiles", .value = static_cast<uint32_t>(transferred_flow_files.load())},
+      {.name = "TransferredBytes", .value = transferred_bytes.load()}
+    }
+  };
+
+  for (const auto& [relationship, count] : transferred_relationships_) {

Review Comment:
   Good point, added lock in c164ede1b258f2d320e0313456650a253da5de5e
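One common shape for such a fix is sketched below; it is not necessarily what the commit does. Every access to the relationship map is guarded by the same mutex, and the map is copied under the lock so serialization can run without holding it. `RelationshipTransferCounts` is a hypothetical name.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical, trimmed-down holder for per-relationship transfer counts.
class RelationshipTransferCounts {
 public:
  using Snapshot = std::vector<std::pair<std::string, uint32_t>>;

  void increment(const std::string& relationship) {
    std::lock_guard<std::mutex> lock(mutex_);
    ++counts_[relationship];  // operator[] inserts a zero count for new names
  }

  // Copies the map while holding the lock; the caller can then build
  // serialized nodes from the copy without touching shared state.
  Snapshot snapshot() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return Snapshot(counts_.begin(), counts_.end());
  }

 private:
  mutable std::mutex mutex_;
  std::unordered_map<std::string, uint32_t> counts_;
};
```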





[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1400: MINIFICPP-1848 Create a generic solution for processor metrics

Posted by GitBox <gi...@apache.org>.
lordgamez commented on code in PR #1400:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1400#discussion_r973074156


##########
libminifi/include/core/Processor.h:
##########
@@ -29,19 +29,36 @@
 #include <unordered_set>
 #include <unordered_map>
 #include <utility>
+#include <vector>
 
 #include "ConfigurableComponent.h"
 #include "Connectable.h"
 #include "Core.h"
 #include "core/Annotation.h"
 #include "Scheduling.h"
 #include "utils/TimeUtil.h"
+#include "core/state/nodes/MetricsBase.h"
+#include "utils/gsl.h"
+
+#if WIN32
+#define ADD_GET_PROCESSOR_NAME \
+  std::string getProcessorType() const override { \
+    return org::apache::nifi::minifi::utils::StringUtils::split(__FUNCDNAME__, "@")[1]; \
+  }
+#else
+#define ADD_GET_PROCESSOR_NAME \
+  std::string getProcessorType() const override { \
+    auto splitted = org::apache::nifi::minifi::utils::StringUtils::split(__PRETTY_FUNCTION__, "::"); \
+    return splitted[splitted.size() - 2]; \
+  }
+#endif

Review Comment:
   Good idea, updated in d6d350753ceb89fbcdf66e9b62d161417531b8d8





[GitHub] [nifi-minifi-cpp] fgerlits commented on a diff in pull request #1400: MINIFICPP-1848 Create a generic solution for processor metrics

Posted by GitBox <gi...@apache.org>.
fgerlits commented on code in PR #1400:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1400#discussion_r973022579


##########
libminifi/include/core/Processor.h:
##########
@@ -29,19 +29,36 @@
 #include <unordered_set>
 #include <unordered_map>
 #include <utility>
+#include <vector>
 
 #include "ConfigurableComponent.h"
 #include "Connectable.h"
 #include "Core.h"
 #include "core/Annotation.h"
 #include "Scheduling.h"
 #include "utils/TimeUtil.h"
+#include "core/state/nodes/MetricsBase.h"
+#include "utils/gsl.h"
+
+#if WIN32
+#define ADD_GET_PROCESSOR_NAME \
+  std::string getProcessorType() const override { \
+    return org::apache::nifi::minifi::utils::StringUtils::split(__FUNCDNAME__, "@")[1]; \
+  }
+#else
+#define ADD_GET_PROCESSOR_NAME \
+  std::string getProcessorType() const override { \
+    auto splitted = org::apache::nifi::minifi::utils::StringUtils::split(__PRETTY_FUNCTION__, "::"); \
+    return splitted[splitted.size() - 2]; \
+  }
+#endif

Review Comment:
   We already have `minifi::core::getClassName()`, so we'd just need to call `getClassName<decltype(*this)>()`, split the result at the `::`s and use the last part.
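Below is a sketch of the last step of that suggestion: taking the part after the final `::` of a fully qualified name. `lastNameComponent` is a hypothetical helper, and the exact string `getClassName` returns is an assumption here. Note that `decltype(*this)` names a reference type (`const T&` inside a `const` member function). Depending on how `getClassName` is written, `std::remove_cvref_t<decltype(*this)>` may therefore be needed. A plain `rfind` would also misbehave on names with template arguments such as `Foo<std::string>`.

```cpp
#include <string_view>

// Returns the part of a qualified name after the last "::", e.g.
// "org::apache::nifi::minifi::processors::GetFile" -> "GetFile".
// Names without "::" are returned unchanged. The result views into the input,
// so copy it into a std::string if the input does not outlive it.
constexpr std::string_view lastNameComponent(std::string_view qualified_name) {
  const auto pos = qualified_name.rfind("::");
  if (pos == std::string_view::npos) {
    return qualified_name;
  }
  return qualified_name.substr(pos + 2);
}
```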





[GitHub] [nifi-minifi-cpp] szaszm commented on a diff in pull request #1400: MINIFICPP-1848 Create a generic solution for processor metrics

Posted by GitBox <gi...@apache.org>.
szaszm commented on code in PR #1400:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1400#discussion_r987983117


##########
extensions/standard-processors/processors/GetFile.cpp:
##########
@@ -248,8 +246,9 @@ bool GetFile::fileMatchesRequestCriteria(std::string fullName, std::string name,
     return false;
   }
 
-  metrics_->input_bytes_ += file_size;
-  metrics_->accepted_files_++;
+  auto getfile_metrics = static_cast<GetFileMetrics*>(metrics_.get());

Review Comment:
   Let's use `auto*` (or even better, `auto* const`, if you like using const pointers to mutable objects, like me), to make it clear that this is a pointer.
   ```suggestion
     auto* const getfile_metrics = static_cast<GetFileMetrics*>(metrics_.get());
   ```
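This illustrates what `auto* const` provides: the pointer itself cannot be reseated, while the metrics object it points to stays mutable. The types below are hypothetical stand-ins for the real metrics classes, and `metrics_` is assumed to be a smart pointer to the base class.

```cpp
#include <cassert>
#include <memory>
#include <type_traits>

// Hypothetical stand-ins for the real metrics types.
struct BaseMetrics {
  virtual ~BaseMetrics() = default;
};
struct FileMetrics : BaseMetrics {
  unsigned accepted_files = 0;
};

// Precondition: `metrics` actually points to a FileMetrics (static_cast does not check this).
inline unsigned recordAcceptedFile(const std::unique_ptr<BaseMetrics>& metrics) {
  assert(metrics != nullptr);
  auto* const file_metrics = static_cast<FileMetrics*>(metrics.get());
  static_assert(std::is_same_v<decltype(file_metrics), FileMetrics* const>);
  ++file_metrics->accepted_files;  // OK: the pointee is not const
  // file_metrics = nullptr;       // would not compile: the pointer itself is const
  return file_metrics->accepted_files;
}
```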



##########
libminifi/src/core/ProcessSession.cpp:
##########
@@ -779,6 +779,11 @@ ProcessSession::RouteResult ProcessSession::routeFlowFile(const std::shared_ptr<
       }
     }
   }
+  if (metrics_) {
+    metrics_->transferred_bytes += record->getSize();
+    ++metrics_->transferred_flow_files;
+    metrics_->incrementRelationshipTransferCount(relationship.getName());
+  }

Review Comment:
   Consider doing this on commit instead, so that we only count actions that have actually happened.
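Below is a minimal sketch of the commit-time approach. It assumes the session can buffer per-transfer stats and flush them from its commit path. All names are hypothetical; in the PR, the counters live on `ProcessorMetrics` and are atomics. Counting only at commit means a rolled-back session leaves no trace in the metrics. As written, a flow file routed more than once within the same session would be counted once per route.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>

// Hypothetical, simplified metrics target.
struct TransferTotals {
  uint64_t transferred_bytes = 0;
  uint32_t transferred_flow_files = 0;
  std::unordered_map<std::string, uint32_t> per_relationship;
};

// Buffers transfer stats while the session routes flow files and publishes
// them only on commit; a rollback simply drops the pending batch.
class PendingTransferStats {
 public:
  void recordRoute(const std::string& relationship, uint64_t size) {
    pending_bytes_ += size;
    ++pending_flow_files_;
    ++pending_per_relationship_[relationship];
  }

  void commitTo(TransferTotals& totals) {
    totals.transferred_bytes += pending_bytes_;
    totals.transferred_flow_files += pending_flow_files_;
    for (const auto& [relationship, count] : pending_per_relationship_) {
      totals.per_relationship[relationship] += count;
    }
    reset();
  }

  void rollback() { reset(); }

 private:
  void reset() {
    pending_bytes_ = 0;
    pending_flow_files_ = 0;
    pending_per_relationship_.clear();
  }

  uint64_t pending_bytes_ = 0;
  uint32_t pending_flow_files_ = 0;
  std::unordered_map<std::string, uint32_t> pending_per_relationship_;
};
```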



##########
libminifi/include/utils/Averager.h:
##########
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <vector>
+#include <mutex>
+
+namespace org::apache::nifi::minifi::utils {
+
+template<typename T>
+concept Summable = requires(T x) { x + x; };  // NOLINT(readability/braces)
+
+template<typename T>
+concept DividableByInteger = requires(T x, uint32_t divisor) { x / divisor; };  // NOLINT(readability/braces)
+
+template<typename ValueType>
+requires Summable<ValueType> && DividableByInteger<ValueType>
+class Averager {
+ public:
+  explicit Averager(uint32_t sample_size) : SAMPLE_SIZE_(sample_size) {
+  }
+
+  ValueType getAverage() const;
+  ValueType getLastValue() const;
+  void addValue(ValueType runtime);
+
+ private:
+  const uint32_t SAMPLE_SIZE_;
+  mutable std::mutex average_value_mutex_;
+  uint32_t next_average_index_ = 0;
+  std::vector<ValueType> values_;
+};
+
+template<typename ValueType>
+requires Summable<ValueType> && DividableByInteger<ValueType>
+ValueType Averager<ValueType>::getAverage() const {
+  if (values_.empty()) {
+    return {};
+  }
+  ValueType sum{};
+  std::lock_guard<std::mutex> lock(average_value_mutex_);
+  for (const auto& value : values_) {
+    sum += value;
+  }
+  return sum / values_.size();
+}
+
+template<typename ValueType>
+requires Summable<ValueType> && DividableByInteger<ValueType>
+void Averager<ValueType>::addValue(ValueType runtime) {
+  std::lock_guard<std::mutex> lock(average_value_mutex_);
+  if (values_.size() < SAMPLE_SIZE_) {
+    values_.push_back(runtime);
+  } else {

Review Comment:
   I suggest restructuring in a way that limits the number of allocations to at most one, and removes one of the branches.



##########
libminifi/src/core/ProcessorMetrics.cpp:
##########
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "core/ProcessorMetrics.h"
+
+#include "core/Processor.h"
+#include "utils/gsl.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::core {
+
+ProcessorMetrics::ProcessorMetrics(const Processor& source_processor)
+    : source_processor_(source_processor),
+      on_trigger_runtime_averager_(STORED_ON_TRIGGER_RUNTIME_COUNT) {
+}
+
+std::string ProcessorMetrics::getName() const {
+  return source_processor_.getProcessorType() + "Metrics";
+}
+
+std::unordered_map<std::string, std::string> ProcessorMetrics::getCommonLabels() const {
+  return {{"metric_class", getName()}, {"processor_name", source_processor_.getName()}, {"processor_uuid", source_processor_.getUUIDStr()}};
+}
+
+std::vector<state::response::SerializedResponseNode> ProcessorMetrics::serialize() {
+  std::vector<state::response::SerializedResponseNode> resp;
+
+  state::response::SerializedResponseNode root_node;
+  root_node.name = source_processor_.getUUIDStr();
+
+  state::response::SerializedResponseNode iter;

Review Comment:
   Prefer using a simpler, single initializer syntax wherever possible, like here: https://github.com/apache/nifi-minifi-cpp/blob/main/libminifi/include/core/state/nodes/AgentInformation.h#L619



##########
METRICS.md:
##########
@@ -103,34 +112,6 @@ RepositoryMetrics is a system level metric that reports metrics for the register
 |--------------------------|-----------------------------------------------------------------|
 | repository_name          | Name of the reported repository                                 |
 
-### GetFileMetrics
-
-Processor level metric that reports metrics for the GetFile processor if defined in the flow configuration
-
-| Metric name           | Labels                         | Description                                    |
-|-----------------------|--------------------------------|------------------------------------------------|
-| onTrigger_invocations | processor_name, processor_uuid | Number of times the processor was triggered    |

Review Comment:
   This onTrigger_invocations metric seems to be missing from the new structure. Is this intentional? Why?
   
   edit: It's still referenced in code, but not in the docs. Maybe we just need it back in the docs.



##########
libminifi/include/utils/Averager.h:
##########
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <vector>
+#include <mutex>
+
+namespace org::apache::nifi::minifi::utils {
+
+template<typename T>
+concept Summable = requires(T x) { x + x; };  // NOLINT(readability/braces)
+
+template<typename T>
+concept DividableByInteger = requires(T x, uint32_t divisor) { x / divisor; };  // NOLINT(readability/braces)
+
+template<typename ValueType>
+requires Summable<ValueType> && DividableByInteger<ValueType>
+class Averager {

Review Comment:
   This class is too specific to be considered a general utility. I would rewrite it as a ring buffer, and make "average" a separate function somewhere else. It would also improve readability, because nobody knows at first glance what an "averager" is, but many people know ring buffers and know what the average of a collection means.
   
   If you don't want to rewrite it, please move it close to the usage, into an anonymous namespace.





[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1400: MINIFICPP-1848 Create a generic solution for processor metrics

Posted by GitBox <gi...@apache.org>.
lordgamez commented on code in PR #1400:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1400#discussion_r991343857


##########
METRICS.md:
##########
@@ -103,34 +112,6 @@ RepositoryMetrics is a system level metric that reports metrics for the register
 |--------------------------|-----------------------------------------------------------------|
 | repository_name          | Name of the reported repository                                 |
 
-### GetFileMetrics
-
-Processor level metric that reports metrics for the GetFile processor if defined in the flow configuration
-
-| Metric name           | Labels                         | Description                                    |
-|-----------------------|--------------------------------|------------------------------------------------|
-| onTrigger_invocations | processor_name, processor_uuid | Number of times the processor was triggered    |

Review Comment:
   Updated in 0da9263bee800563b495e40aba966a32fd1b1cb0



##########
extensions/standard-processors/processors/GetFile.cpp:
##########
@@ -248,8 +246,9 @@ bool GetFile::fileMatchesRequestCriteria(std::string fullName, std::string name,
     return false;
   }
 
-  metrics_->input_bytes_ += file_size;
-  metrics_->accepted_files_++;
+  auto getfile_metrics = static_cast<GetFileMetrics*>(metrics_.get());

Review Comment:
   Updated in 0da9263bee800563b495e40aba966a32fd1b1cb0



##########
libminifi/src/core/ProcessSession.cpp:
##########
@@ -779,6 +779,11 @@ ProcessSession::RouteResult ProcessSession::routeFlowFile(const std::shared_ptr<
       }
     }
   }
+  if (metrics_) {
+    metrics_->transferred_bytes += record->getSize();
+    ++metrics_->transferred_flow_files;
+    metrics_->incrementRelationshipTransferCount(relationship.getName());
+  }

Review Comment:
   Updated in 0da9263bee800563b495e40aba966a32fd1b1cb0



##########
libminifi/include/utils/Averager.h:
##########
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <vector>
+#include <mutex>
+
+namespace org::apache::nifi::minifi::utils {
+
+template<typename T>
+concept Summable = requires(T x) { x + x; };  // NOLINT(readability/braces)
+
+template<typename T>
+concept DividableByInteger = requires(T x, uint32_t divisor) { x / divisor; };  // NOLINT(readability/braces)
+
+template<typename ValueType>
+requires Summable<ValueType> && DividableByInteger<ValueType>
+class Averager {

Review Comment:
   We should discuss this with @fgerlits too, as extracting it into a generic utility was his suggestion. I thought it was not specific to this use case and could be reused later, since it can calculate the average of any numeric or time values. I think the name `Averager`, together with the `sample_size` constructor argument, should be descriptive enough for users, but I'm open to discussing this at the team level.



##########
libminifi/include/utils/Averager.h:
##########
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <vector>
+#include <mutex>
+
+namespace org::apache::nifi::minifi::utils {
+
+template<typename T>
+concept Summable = requires(T x) { x + x; };  // NOLINT(readability/braces)
+
+template<typename T>
+concept DividableByInteger = requires(T x, uint32_t divisor) { x / divisor; };  // NOLINT(readability/braces)
+
+template<typename ValueType>
+requires Summable<ValueType> && DividableByInteger<ValueType>
+class Averager {
+ public:
+  explicit Averager(uint32_t sample_size) : SAMPLE_SIZE_(sample_size) {
+  }
+
+  ValueType getAverage() const;
+  ValueType getLastValue() const;
+  void addValue(ValueType runtime);
+
+ private:
+  const uint32_t SAMPLE_SIZE_;
+  mutable std::mutex average_value_mutex_;
+  uint32_t next_average_index_ = 0;
+  std::vector<ValueType> values_;
+};
+
+template<typename ValueType>
+requires Summable<ValueType> && DividableByInteger<ValueType>
+ValueType Averager<ValueType>::getAverage() const {
+  if (values_.empty()) {
+    return {};
+  }
+  ValueType sum{};
+  std::lock_guard<std::mutex> lock(average_value_mutex_);
+  for (const auto& value : values_) {
+    sum += value;
+  }
+  return sum / values_.size();
+}
+
+template<typename ValueType>
+requires Summable<ValueType> && DividableByInteger<ValueType>
+void Averager<ValueType>::addValue(ValueType runtime) {
+  std::lock_guard<std::mutex> lock(average_value_mutex_);
+  if (values_.size() < SAMPLE_SIZE_) {
+    values_.push_back(runtime);
+  } else {

Review Comment:
   I added a `reserve` call in 0da9263bee800563b495e40aba966a32fd1b1cb0, but we cannot pre-fill the vector with elements beforehand, because calculating the average requires the number of elements actually stored in it, which is retrieved with `size()`.
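The difference between reserving and pre-filling can be illustrated with a small sketch (`averageOf` is a hypothetical helper, not part of the PR): `reserve` only allocates capacity, so `size()` keeps counting the samples actually added, whereas pre-filling makes `size()` report the whole window and drags the average toward the default values.

```cpp
#include <vector>

// Mean of the elements currently stored in the vector. size() is the divisor,
// so only samples that were actually recorded are counted.
double averageOf(const std::vector<double>& values) {
  if (values.empty()) {
    return 0.0;
  }
  double sum = 0.0;
  for (double value : values) {
    sum += value;
  }
  return sum / static_cast<double>(values.size());
}
```

With `reserve(10)` followed by the two samples 4.0 and 6.0 the result is 5.0; with a vector pre-filled to ten zeros and the same two samples written into its first slots, the result drops to 1.0.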



##########
libminifi/src/core/ProcessorMetrics.cpp:
##########
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "core/ProcessorMetrics.h"
+
+#include "core/Processor.h"
+#include "utils/gsl.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::core {
+
+ProcessorMetrics::ProcessorMetrics(const Processor& source_processor)
+    : source_processor_(source_processor),
+      on_trigger_runtime_averager_(STORED_ON_TRIGGER_RUNTIME_COUNT) {
+}
+
+std::string ProcessorMetrics::getName() const {
+  return source_processor_.getProcessorType() + "Metrics";
+}
+
+std::unordered_map<std::string, std::string> ProcessorMetrics::getCommonLabels() const {
+  return {{"metric_class", getName()}, {"processor_name", source_processor_.getName()}, {"processor_uuid", source_processor_.getUUIDStr()}};
+}
+
+std::vector<state::response::SerializedResponseNode> ProcessorMetrics::serialize() {
+  std::vector<state::response::SerializedResponseNode> resp;
+
+  state::response::SerializedResponseNode root_node;
+  root_node.name = source_processor_.getUUIDStr();
+
+  state::response::SerializedResponseNode iter;

Review Comment:
   Updated in 0da9263bee800563b495e40aba966a32fd1b1cb0



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1400: MINIFICPP-1848 Create a generic solution for processor metrics

Posted by GitBox <gi...@apache.org>.
lordgamez commented on code in PR #1400:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1400#discussion_r997094523


##########
libminifi/include/utils/Averager.h:
##########
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <vector>
+#include <mutex>
+
+namespace org::apache::nifi::minifi::utils {
+
+template<typename T>
+concept Summable = requires(T x) { x + x; };  // NOLINT(readability/braces)
+
+template<typename T>
+concept DividableByInteger = requires(T x, uint32_t divisor) { x / divisor; };  // NOLINT(readability/braces)
+
+template<typename ValueType>
+requires Summable<ValueType> && DividableByInteger<ValueType>
+class Averager {

Review Comment:
   Moved `Averager` closer to usage in 0bc6bda36d41dd7582bd0780220d2ff4303e50e8



[GitHub] [nifi-minifi-cpp] fgerlits commented on a diff in pull request #1400: MINIFICPP-1848 Create a generic solution for processor metrics

Posted by GitBox <gi...@apache.org>.
fgerlits commented on code in PR #1400:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1400#discussion_r992061194


##########
libminifi/include/utils/Averager.h:
##########
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <vector>
+#include <mutex>
+
+namespace org::apache::nifi::minifi::utils {
+
+template<typename T>
+concept Summable = requires(T x) { x + x; };  // NOLINT(readability/braces)
+
+template<typename T>
+concept DividableByInteger = requires(T x, uint32_t divisor) { x / divisor; };  // NOLINT(readability/braces)
+
+template<typename ValueType>
+requires Summable<ValueType> && DividableByInteger<ValueType>
+class Averager {

Review Comment:
   Yes, let's discuss this in person. I think separating `Averager` into a circular buffer and an averaging function that works on the buffer is a good idea for further refactoring, but it is also fairly generic and reusable as it is.
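One way the suggested split could look, as a sketch only: `RingBuffer` and `average` are hypothetical names (neither is part of MiNiFi), and locking is left to the caller because the buffer itself is not synchronized.

```cpp
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical fixed-capacity ring buffer: it only stores values and knows
// nothing about averaging. It is not thread-safe; callers must synchronize.
template<typename T>
class RingBuffer {
 public:
  explicit RingBuffer(std::size_t capacity) : capacity_(capacity) {
    values_.reserve(capacity_);
  }

  void push(T value) {
    if (capacity_ == 0) {
      return;  // a zero-capacity buffer stores nothing
    }
    if (values_.size() < capacity_) {
      values_.push_back(std::move(value));  // not full yet: grow
    } else {
      values_[next_] = std::move(value);  // full: overwrite the oldest element
    }
    next_ = (next_ + 1) % capacity_;
  }

  // Stored elements in storage order (not insertion order once the buffer wraps).
  const std::vector<T>& values() const { return values_; }

 private:
  std::size_t capacity_;
  std::size_t next_ = 0;
  std::vector<T> values_;
};

// Hypothetical averaging function that works on any RingBuffer<T>, using only + and /.
template<typename T>
T average(const RingBuffer<T>& buffer) {
  const auto& values = buffer.values();
  if (values.empty()) {
    return T{};
  }
  T sum{};
  for (const auto& value : values) {
    sum = sum + value;
  }
  return sum / static_cast<int64_t>(values.size());
}
```

Keeping storage and arithmetic apart would let the buffer be reused for other window statistics (minimum, maximum, last value) without touching the averaging code.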



[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1400: MINIFICPP-1848 Create a generic solution for processor metrics

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1400:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1400#discussion_r993301197


##########
libminifi/include/utils/Averager.h:
##########
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <vector>
+#include <mutex>
+
+namespace org::apache::nifi::minifi::utils {
+
+template<typename T>
+concept Summable = requires(T x) { x + x; };  // NOLINT(readability/braces)
+
+template<typename T>
+concept DividableByInteger = requires(T x, uint32_t divisor) { x / divisor; };  // NOLINT(readability/braces)
+
+template<typename ValueType>
+requires Summable<ValueType> && DividableByInteger<ValueType>
+class Averager {
+ public:
+  explicit Averager(uint32_t sample_size) : SAMPLE_SIZE_(sample_size) {
+    values_.reserve(SAMPLE_SIZE_);
+  }
+
+  ValueType getAverage() const;
+  ValueType getLastValue() const;
+  void addValue(ValueType runtime);
+
+ private:
+  const uint32_t SAMPLE_SIZE_;
+  mutable std::mutex average_value_mutex_;
+  uint32_t next_average_index_ = 0;
+  std::vector<ValueType> values_;
+};
+
+template<typename ValueType>
+requires Summable<ValueType> && DividableByInteger<ValueType>
+ValueType Averager<ValueType>::getAverage() const {
+  if (values_.empty()) {
+    return {};
+  }
+  ValueType sum{};
+  std::lock_guard<std::mutex> lock(average_value_mutex_);
+  for (const auto& value : values_) {

Review Comment:
   could `std::accumulate` work here?
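`std::accumulate` (from `<numeric>`) should work, since it only needs `operator+` and a starting value. A minimal sketch of the averaging step, assuming the samples live in a `std::vector` guarded by a mutex; `lockedAverage` is a hypothetical free function, not the PR's method. It also performs the emptiness check under the lock, whereas the diff above reads `values_.empty()` before acquiring it.

```cpp
#include <cstdint>
#include <mutex>
#include <numeric>
#include <vector>

// Hypothetical free-function version of the averaging step (not the merged code).
template<typename ValueType>
ValueType lockedAverage(const std::vector<ValueType>& values, std::mutex& mutex) {
  // Lock before touching the vector at all, including the emptiness check.
  std::lock_guard<std::mutex> lock(mutex);
  if (values.empty()) {
    return ValueType{};
  }
  // std::accumulate folds the range with operator+, starting from ValueType{}.
  const ValueType sum = std::accumulate(values.begin(), values.end(), ValueType{});
  return sum / static_cast<int64_t>(values.size());
}
```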



[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1400: MINIFICPP-1848 Create a generic solution for processor metrics

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1400:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1400#discussion_r993320857


##########
libminifi/src/core/ProcessorMetrics.cpp:
##########
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "core/ProcessorMetrics.h"
+
+#include "core/Processor.h"
+#include "utils/gsl.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::core {
+
+ProcessorMetrics::ProcessorMetrics(const Processor& source_processor)
+    : source_processor_(source_processor),
+      on_trigger_runtime_averager_(STORED_ON_TRIGGER_RUNTIME_COUNT) {
+}
+
+std::string ProcessorMetrics::getName() const {
+  return source_processor_.getProcessorType() + "Metrics";
+}
+
+std::unordered_map<std::string, std::string> ProcessorMetrics::getCommonLabels() const {
+  return {{"metric_class", getName()}, {"processor_name", source_processor_.getName()}, {"processor_uuid", source_processor_.getUUIDStr()}};
+}
+
+std::vector<state::response::SerializedResponseNode> ProcessorMetrics::serialize() {
+  std::vector<state::response::SerializedResponseNode> resp;
+
+  state::response::SerializedResponseNode root_node {
+    .name = source_processor_.getUUIDStr(),
+    .children = {
+      {.name = "OnTriggerInvocations", .value = static_cast<uint32_t>(iterations.load())},
+      {.name = "AverageOnTriggerRunTime", .value = static_cast<uint64_t>(getAverageOnTriggerRuntime().count())},
+      {.name = "LastOnTriggerRunTime", .value = static_cast<uint64_t>(getLastOnTriggerRuntime().count())},
+      {.name = "TransferredFlowFiles", .value = static_cast<uint32_t>(transferred_flow_files.load())},
+      {.name = "TransferredBytes", .value = transferred_bytes.load()}
+    }
+  };
+
+  for (const auto& [relationship, count] : transferred_relationships_) {

Review Comment:
   is it safe to access `transferred_relationships_` here without holding the `transferred_relationships_mutex_`?
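A common pattern here is to copy the map while holding the mutex and serialize the copy afterwards. A minimal sketch; `RelationshipCounters` is a hypothetical stand-in, and the pair-based output replaces MiNiFi's `SerializedResponseNode` for brevity.

```cpp
#include <cstdint>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical stand-in for the per-relationship transfer counters.
class RelationshipCounters {
 public:
  void increment(const std::string& relationship) {
    std::lock_guard<std::mutex> lock(mutex_);
    ++counts_[relationship];
  }

  // Copy the map while holding the mutex so the snapshot is consistent.
  std::unordered_map<std::string, uint32_t> snapshot() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return counts_;
  }

  // Serialization iterates over the copy, so the mutex is not held in this loop.
  std::vector<std::pair<std::string, uint32_t>> serialize() const {
    std::vector<std::pair<std::string, uint32_t>> nodes;
    for (const auto& [relationship, count] : snapshot()) {
      nodes.emplace_back(relationship, count);
    }
    return nodes;
  }

 private:
  mutable std::mutex mutex_;
  std::unordered_map<std::string, uint32_t> counts_;
};
```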



[GitHub] [nifi-minifi-cpp] fgerlits commented on a diff in pull request #1400: MINIFICPP-1848 Create a generic solution for processor metrics

Posted by GitBox <gi...@apache.org>.
fgerlits commented on code in PR #1400:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1400#discussion_r1001738406


##########
docker/test/integration/minifi/core/PrometheusChecker.py:
##########
@@ -0,0 +1,90 @@
+import time
+from prometheus_api_client import PrometheusConnect
+
+
+class PrometheusChecker:
+    def __init__(self):
+        self.prometheus_client = PrometheusConnect(url="http://localhost:9090", disable_ssl=True)
+
+    def wait_for_metric_class_on_prometheus(self, metric_class, timeout_seconds):
+        start_time = time.perf_counter()
+        while (time.perf_counter() - start_time) < timeout_seconds:
+            if self.verify_metric_class(metric_class):
+                return True
+            time.sleep(1)
+        return False
+
+    def wait_for_processor_metric_on_prometheus(self, metric_class, timeout_seconds, processor_name):
+        start_time = time.perf_counter()
+        while (time.perf_counter() - start_time) < timeout_seconds:
+            if self.verify_processor_metric(metric_class, processor_name):
+                return True
+            time.sleep(1)
+        return False
+
+    def verify_processor_metric(self, metric_class, processor_name):
+        if metric_class == "GetFileMetrics":
+            return self.verify_getfile_metrics(metric_class, processor_name)
+        else:
+            return self.verify_general_processor_metrics(metric_class, processor_name)
+
+    def verify_metric_class(self, metric_class):
+        if metric_class == "RepositoryMetrics":
+            return self.verify_repository_metrics()
+        elif metric_class == "QueueMetrics":
+            return self.verify_queue_metrics()
+        elif metric_class == "FlowInformation":
+            return self.verify_flow_information_metrics()
+        elif metric_class == "DeviceInfoNode":
+            return self.verify_device_info_node_metrics()
+        else:
+            raise Exception("Metric class '%s' verification is not implemented" % metric_class)
+
+    def verify_repository_metrics(self):
+        label_list = [{'repository_name': 'provenance'}, {'repository_name': 'flowfile'}]
+        for labels in label_list:
+            if not self.verify_metrics_exist(['minifi_is_running', 'minifi_is_full', 'minifi_repository_size'], 'RepositoryMetrics', labels):
+                return False
+        return True

Review Comment:
   very minor, but I think
   ```suggestion
           return all((self.verify_metrics_exist(['minifi_is_running', 'minifi_is_full', 'minifi_repository_size'], 'RepositoryMetrics', labels) for labels in label_list))
   ```
   would be more readable
   
   (same in the `verify_metrics_exist` and `verify_metrics_larger_than_zero` functions below)



##########
libminifi/src/core/ProcessSession.cpp:
##########
@@ -740,7 +740,7 @@ void ProcessSession::restore(const std::string &key, const std::shared_ptr<core:
   flow->clearStashClaim(key);
 }
 
-ProcessSession::RouteResult ProcessSession::routeFlowFile(const std::shared_ptr<FlowFile> &record) {
+ProcessSession::RouteResult ProcessSession::routeFlowFile(const std::shared_ptr<FlowFile> &record, std::unordered_map<std::string, TransferMetrics>& transfers) {

Review Comment:
   I think it would be slightly nicer to make the second parameter a `std::function<void(const FlowFile&)> record_callback` and move the definition of the lambda to `commit()`.
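A simplified sketch of the suggested shape, in which `routeFlowFile` only reports each routed record through a callback and `commit()` defines the aggregating lambda. `FlowFile`, `TransferMetrics` and their fields here are hypothetical minimal stand-ins, not MiNiFi's real types, and the actual routing logic is omitted.

```cpp
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical minimal stand-ins; MiNiFi's real FlowFile and TransferMetrics differ.
struct FlowFile {
  uint64_t size = 0;
  std::string relationship;
};

struct TransferMetrics {
  uint64_t transfer_count = 0;
  uint64_t transfer_size = 0;
};

// routeFlowFile reports each routed record through the callback and no longer
// needs to know how transfers are aggregated (routing itself is omitted here).
void routeFlowFile(const std::shared_ptr<FlowFile>& record,
                   const std::function<void(const FlowFile&)>& record_callback) {
  record_callback(*record);
}

// commit() owns the aggregation state and defines the lambda locally.
std::unordered_map<std::string, TransferMetrics> commit(const std::vector<std::shared_ptr<FlowFile>>& records) {
  std::unordered_map<std::string, TransferMetrics> transfers;
  const std::function<void(const FlowFile&)> record_callback = [&transfers](const FlowFile& flow_file) {
    auto& metrics = transfers[flow_file.relationship];
    ++metrics.transfer_count;
    metrics.transfer_size += flow_file.size;
  };
  for (const auto& record : records) {
    routeFlowFile(record, record_callback);
  }
  return transfers;
}
```

Passing a `std::function` keeps `routeFlowFile` independent of how the caller aggregates transfers, at the cost of one type-erased call per routed flow file.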



[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1400: MINIFICPP-1848 Create a generic solution for processor metrics

Posted by GitBox <gi...@apache.org>.
lordgamez commented on code in PR #1400:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1400#discussion_r972927409


##########
METRICS.md:
##########
@@ -153,3 +132,40 @@ DeviceInfoNode is a system level metric that reports metrics about the system re
 | connection_name | Name of the connection defined in the flow configuration     |
 | component_uuid  | UUID of the component                                        |
 | component_name  | Name of the component                                        |
+
+## Processor Metrics
+
+Processor level metrics can be accessed for any processor provided by MiNiFi. These metrics correspond to the name of the processor appended by the "Metrics" suffix (e.g. GetFileMetrics, TailFileMetrics, etc.).
+
+### General Metrics
+
+There are general metrics that are available for all processors. Besides these metrics, processors can implement additional metrics that are specific to that processor.
+
+| Metric name                            | Labels                                       | Description                                                                         |
+|----------------------------------------|----------------------------------------------|-------------------------------------------------------------------------------------|
+| average_onTrigger_runtime_milliseconds | metric_class, processor_name, processor_uuid | The average runtime in milliseconds of the last 10 onTrigger calls of the processor |
+| last_onTrigger_runtime_milliseconds    | metric_class, processor_name, processor_uuid | The runtime in milliseconds of the last onTrigger call of the processor             |
+| transferred_flow_files                 | metric_class, processor_name, processor_uuid | Number of flow files transferred to a relationship                                  |
+| transferred_bytes                      | metric_class, processor_name, processor_uuid | Number of bytes transferred to a relationship                                       |
+| transferred_to_\<relationship\>        | metric_class, processor_name, processor_uuid | Number of flow files transferred to a specific relationship                         |
+
+| Label          | Description                                                    |
+|----------------|----------------------------------------------------------------|
+| metric_class   | Class name to filter for this metric, set to GetFileMetrics    |

Review Comment:
   Fixed in 6da42b94abc78aaa874eaba0258bf07c49d21660



##########
docker/test/integration/minifi/core/PrometheusChecker.py:
##########
@@ -0,0 +1,90 @@
+import time
+from prometheus_api_client import PrometheusConnect
+
+
+class PrometheusChecker:
+    def __init__(self):
+        self.prometheus_client = PrometheusConnect(url="http://localhost:9090")

Review Comment:
   It must have been removed by mistake; added it back in 6da42b94abc78aaa874eaba0258bf07c49d21660



##########
docker/test/integration/minifi/core/PrometheusChecker.py:
##########
@@ -0,0 +1,90 @@
+import time
+from prometheus_api_client import PrometheusConnect
+
+
+class PrometheusChecker:
+    def __init__(self):
+        self.prometheus_client = PrometheusConnect(url="http://localhost:9090")
+
+    def wait_for_metric_class_on_prometheus(self, metric_class, timeout_seconds):
+        start_time = time.perf_counter()
+        while (time.perf_counter() - start_time) < timeout_seconds:
+            if self.verify_metric_class(metric_class):
+                return True
+            time.sleep(1)
+        return False
+
+    def wait_for_processor_metric_on_prometheus(self, metric_class, timeout_seconds, processor_name):
+        start_time = time.perf_counter()
+        while (time.perf_counter() - start_time) < timeout_seconds:
+            if self.verify_processor_metric(metric_class, processor_name):
+                return True
+            time.sleep(1)
+        return False
+
+    def verify_processor_metric(self, metric_class, processor_name):
+        if metric_class == "GetFileMetrics":
+            return self.verify_getfile_metrics(metric_class, processor_name)
+        else:
+            return self.verify_general_processor_metrics(metric_class, processor_name)
+
+    def verify_metric_class(self, metric_class):
+        if metric_class == "RepositoryMetrics":
+            return self.verify_repository_metrics()
+        elif metric_class == "QueueMetrics":
+            return self.verify_queue_metrics()
+        elif metric_class == "FlowInformation":
+            return self.verify_flow_information_metrics()
+        elif metric_class == "DeviceInfoNode":
+            return self.verify_device_info_node_metrics()
+        else:
+            raise Exception("Metric class '%s' verification is not implemented" % metric_class)
+
+    def verify_repository_metrics(self):
+        label_list = [{'repository_name': 'provenance'}, {'repository_name': 'flowfile'}]
+        for labels in label_list:
+            if not self.verify_metrics_exist(['minifi_is_running', 'minifi_is_full', 'minifi_repository_size'], 'RepositoryMetrics', labels):
+                return False
+        return True
+
+    def verify_queue_metrics(self):
+        return self.verify_metrics_exist(['minifi_queue_data_size', 'minifi_queue_data_size_max', 'minifi_queue_size', 'minifi_queue_size_max'], 'QueueMetrics')
+
+    def verify_general_processor_metrics(self, metric_class, processor_name):
+        labels = {'processor_name': processor_name}
+        return self.verify_metrics_exist(['minifi_average_onTrigger_runtime_milliseconds', 'minifi_last_onTrigger_runtime_milliseconds'], metric_class, labels) and \
+            self.verify_metrics_larger_than_zero(['minifi_onTrigger_invocations', 'minifi_transferred_flow_files', 'minifi_transferred_to_success', 'minifi_transferred_bytes'], metric_class, labels)
+
+    def verify_getfile_metrics(self, metric_class, processor_name):
+        labels = {'processor_name': processor_name}
+        return self.verify_general_processor_metrics(metric_class, processor_name) and \
+            self.verify_metrics_exist(['minifi_input_bytes', 'minifi_accepted_files'], metric_class, labels)
+
+    def verify_flow_information_metrics(self):
+        return self.verify_metrics_exist(['minifi_queue_data_size', 'minifi_queue_data_size_max', 'minifi_queue_size', 'minifi_queue_size_max'], 'FlowInformation') and \
+            self.verify_metric_exists('minifi_is_running', 'FlowInformation', {'component_name': 'FlowController'})
+
+    def verify_device_info_node_metrics(self):
+        return self.verify_metrics_exist(['minifi_physical_mem', 'minifi_memory_usage', 'minifi_cpu_utilization'], 'DeviceInfoNode')
+
+    def verify_metric_exists(self, metric_name, metric_class, labels={}):
+        labels['metric_class'] = metric_class
+        return len(self.prometheus_client.get_current_metric_value(metric_name=metric_name, label_config=labels)) > 0
+
+    def verify_metrics_exist(self, metric_names, metric_class, labels={}):
+        for metric_name in metric_names:
+            if not self.verify_metric_exists(metric_name, metric_class, labels):
+                return False
+        return True
+
+    def verify_metric_larger_than_zero(self, metric_name, metric_class, labels={}):
+        labels['metric_class'] = metric_class
+        result = self.prometheus_client.get_current_metric_value(metric_name=metric_name, label_config=labels)
+        print(result)

Review Comment:
   Yes, good catch, removed in 6da42b94abc78aaa874eaba0258bf07c49d21660



##########
extensions/standard-processors/processors/GetFile.h:
##########
@@ -48,70 +48,46 @@ struct GetFileRequest {
   std::string inputDirectory;
 };
 
-class GetFileMetrics : public state::response::ResponseNode {
+class GetFileMetrics : public core::ProcessorMetrics {
  public:
-  explicit GetFileMetrics(const CoreComponent& source_component)
-    : state::response::ResponseNode("GetFileMetrics"),
-      source_component_(source_component) {
-  }
-
-  std::string getName() const override {
-    return core::Connectable::getName();
+  explicit GetFileMetrics(const core::Processor& source_processor)
+    : core::ProcessorMetrics(source_processor) {
   }
 
   std::vector<state::response::SerializedResponseNode> serialize() override {
-    std::vector<state::response::SerializedResponseNode> resp;
-
-    state::response::SerializedResponseNode root_node;
-    root_node.name = source_component_.getUUIDStr();
-
-    state::response::SerializedResponseNode iter;
-    iter.name = "OnTriggerInvocations";
-    iter.value = (uint32_t)iterations_.load();
+    auto resp = core::ProcessorMetrics::serialize();
+    auto& root_node = resp[0];
 
-    root_node.children.push_back(iter);
+    state::response::SerializedResponseNode accepted_files_node;
+    accepted_files_node.name = "AcceptedFiles";
+    accepted_files_node.value = (uint32_t)accepted_files.load();
 
-    state::response::SerializedResponseNode accepted_files;
-    accepted_files.name = "AcceptedFiles";
-    accepted_files.value = (uint32_t)accepted_files_.load();
+    root_node.children.push_back(accepted_files_node);
 
-    root_node.children.push_back(accepted_files);
+    state::response::SerializedResponseNode input_bytes_node;
+    input_bytes_node.name = "InputBytes";
+    input_bytes_node.value = (uint32_t)input_bytes.load();

Review Comment:
   Good point, updated in 6da42b94abc78aaa874eaba0258bf07c49d21660



##########
extensions/standard-processors/tests/unit/ProcessorTests.cpp:
##########
@@ -822,3 +822,11 @@ TEST_CASE("isSingleThreaded - two threads for a single threaded processor", "[is
   REQUIRE(LogTestController::getInstance().contains("[warning] Processor myProc can not be run in parallel, its "
                                                     "\"max concurrent tasks\" value is too high. It was set to 1 from 2."));
 }
+
+TEST_CASE("Test getProcessorName", "[getProcessorName]") {

Review Comment:
   Fixed in 6da42b94abc78aaa874eaba0258bf07c49d21660



##########
libminifi/include/core/ProcessSession.h:
##########
@@ -37,6 +37,7 @@
 #include "WeakReference.h"
 #include "provenance/Provenance.h"
 #include "utils/gsl.h"
+#include "Processor.h"

Review Comment:
   Good idea, moved in 6da42b94abc78aaa874eaba0258bf07c49d21660



##########
libminifi/include/core/Processor.h:
##########
@@ -29,19 +29,36 @@
 #include <unordered_set>
 #include <unordered_map>
 #include <utility>
+#include <vector>
 
 #include "ConfigurableComponent.h"
 #include "Connectable.h"
 #include "Core.h"
 #include "core/Annotation.h"
 #include "Scheduling.h"
 #include "utils/TimeUtil.h"
+#include "core/state/nodes/MetricsBase.h"
+#include "utils/gsl.h"
+
+#if WIN32

Review Comment:
   Fixed in 6da42b94abc78aaa874eaba0258bf07c49d21660



##########
libminifi/include/core/Processor.h:
##########
@@ -29,19 +29,36 @@
 #include <unordered_set>
 #include <unordered_map>
 #include <utility>
+#include <vector>
 
 #include "ConfigurableComponent.h"
 #include "Connectable.h"
 #include "Core.h"
 #include "core/Annotation.h"
 #include "Scheduling.h"
 #include "utils/TimeUtil.h"
+#include "core/state/nodes/MetricsBase.h"
+#include "utils/gsl.h"
+
+#if WIN32
+#define ADD_GET_PROCESSOR_NAME \
+  std::string getProcessorType() const override { \
+    return org::apache::nifi::minifi::utils::StringUtils::split(__FUNCDNAME__, "@")[1]; \
+  }
+#else
+#define ADD_GET_PROCESSOR_NAME \
+  std::string getProcessorType() const override { \
+    auto splitted = org::apache::nifi::minifi::utils::StringUtils::split(__PRETTY_FUNCTION__, "::"); \
+    return splitted[splitted.size() - 2]; \
+  }
+#endif

Review Comment:
   I was trying to use that previously, but it seemed more complicated and there was no really good option for it on Windows. Boost's demangling function would be the best option for this, but I wouldn't want to introduce Boost as a strict dependency. I think this version is simpler for our use case.
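For reference, the non-Windows branch of `ADD_GET_PROCESSOR_NAME` takes the second-to-last `::`-separated component of `__PRETTY_FUNCTION__`. A minimal sketch of the same extraction as a standalone, testable function; `classNameFromSignature` is a hypothetical name, and the exact `__PRETTY_FUNCTION__` format is compiler-specific (GCC and Clang).

```cpp
#include <string>
#include <string_view>

// Hypothetical helper mirroring the non-Windows ADD_GET_PROCESSOR_NAME logic:
// return the second-to-last "::"-separated component of a signature such as
//   "std::string org::apache::nifi::minifi::processors::GetFile::getProcessorType() const"
std::string classNameFromSignature(std::string_view signature) {
  const auto last = signature.rfind("::");  // separator before the method name
  if (last == std::string_view::npos || last == 0) {
    return {};
  }
  const auto previous = signature.rfind("::", last - 1);  // separator before the class name
  const auto start = (previous == std::string_view::npos) ? 0 : previous + 2;
  return std::string(signature.substr(start, last - start));
}
```

As with the macro, a class without an enclosing namespace whose method's return type contains `::` (for example `std::string GetFile::getProcessorType() const`) would yield `string GetFile`; the Windows `__FUNCDNAME__` branch is not covered here.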



[GitHub] [nifi-minifi-cpp] szaszm closed pull request #1400: MINIFICPP-1848 Create a generic solution for processor metrics

Posted by GitBox <gi...@apache.org>.
szaszm closed pull request #1400: MINIFICPP-1848 Create a generic solution for processor metrics
URL: https://github.com/apache/nifi-minifi-cpp/pull/1400




[GitHub] [nifi-minifi-cpp] fgerlits commented on a diff in pull request #1400: MINIFICPP-1848 Create a generic solution for processor metrics

Posted by GitBox <gi...@apache.org>.
fgerlits commented on code in PR #1400:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1400#discussion_r970642085


##########
METRICS.md:
##########
@@ -153,3 +132,40 @@ DeviceInfoNode is a system level metric that reports metrics about the system re
 | connection_name | Name of the connection defined in the flow configuration     |
 | component_uuid  | UUID of the component                                        |
 | component_name  | Name of the component                                        |
+
+## Processor Metrics
+
+Processor level metrics can be accessed for any processor provided by MiNiFi. These metrics correspond to the name of the processor appended by the "Metrics" suffix (e.g. GetFileMetrics, TailFileMetrics, etc.).
+
+### General Metrics
+
+There are general metrics that are available for all processors. Besides these metrics, processors can implement additional metrics that are specific to that processor.
+
+| Metric name                            | Labels                                       | Description                                                                         |
+|----------------------------------------|----------------------------------------------|-------------------------------------------------------------------------------------|
+| average_onTrigger_runtime_milliseconds | metric_class, processor_name, processor_uuid | The average runtime in milliseconds of the last 10 onTrigger calls of the processor |
+| last_onTrigger_runtime_milliseconds    | metric_class, processor_name, processor_uuid | The runtime in milliseconds of the last onTrigger call of the processor             |
+| transferred_flow_files                 | metric_class, processor_name, processor_uuid | Number of flow files transferred to a relationship                                  |
+| transferred_bytes                      | metric_class, processor_name, processor_uuid | Number of bytes transferred to a relationship                                       |
+| transferred_to_\<relationship\>        | metric_class, processor_name, processor_uuid | Number of flow files transferred to a specific relationship                         |
+
+| Label          | Description                                                    |
+|----------------|----------------------------------------------------------------|
+| metric_class   | Class name to filter for this metric, set to GetFileMetrics    |

Review Comment:
   I think this should be `<processor type>Metrics` instead of `GetFileMetrics`



##########
extensions/standard-processors/processors/GetFile.h:
##########
@@ -48,70 +48,46 @@ struct GetFileRequest {
   std::string inputDirectory;
 };
 
-class GetFileMetrics : public state::response::ResponseNode {
+class GetFileMetrics : public core::ProcessorMetrics {
  public:
-  explicit GetFileMetrics(const CoreComponent& source_component)
-    : state::response::ResponseNode("GetFileMetrics"),
-      source_component_(source_component) {
-  }
-
-  std::string getName() const override {
-    return core::Connectable::getName();
+  explicit GetFileMetrics(const core::Processor& source_processor)
+    : core::ProcessorMetrics(source_processor) {
   }
 
   std::vector<state::response::SerializedResponseNode> serialize() override {
-    std::vector<state::response::SerializedResponseNode> resp;
-
-    state::response::SerializedResponseNode root_node;
-    root_node.name = source_component_.getUUIDStr();
-
-    state::response::SerializedResponseNode iter;
-    iter.name = "OnTriggerInvocations";
-    iter.value = (uint32_t)iterations_.load();
+    auto resp = core::ProcessorMetrics::serialize();
+    auto& root_node = resp[0];
 
-    root_node.children.push_back(iter);
+    state::response::SerializedResponseNode accepted_files_node;
+    accepted_files_node.name = "AcceptedFiles";
+    accepted_files_node.value = (uint32_t)accepted_files.load();
 
-    state::response::SerializedResponseNode accepted_files;
-    accepted_files.name = "AcceptedFiles";
-    accepted_files.value = (uint32_t)accepted_files_.load();
+    root_node.children.push_back(accepted_files_node);
 
-    root_node.children.push_back(accepted_files);
+    state::response::SerializedResponseNode input_bytes_node;
+    input_bytes_node.name = "InputBytes";
+    input_bytes_node.value = (uint32_t)input_bytes.load();

Review Comment:
   I don't think we can assume that `input_bytes` is less than 4 GB.  Can we change the type of this ValueNode to `uint64_t`?
   
   Also, I would change the types of `accepted_files` and `input_bytes` to what we want them to be (probably `uint32_t` and `uint64_t`) instead of making them `size_t` and then casting.
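A minimal sketch of the suggested type change, assuming the counters simply become atomics of the width they are serialized as. `GetFileCounters` is a hypothetical stand-in, not the actual `GetFileMetrics` class. The constants show what the current `(uint32_t)` cast does to a byte count above 4 GiB: it wraps modulo 2^32.

```c++
#include <atomic>
#include <cstdint>

// Hypothetical stand-in for the GetFileMetrics counters: each counter is declared
// with the width it is serialized as, so serialize() needs no narrowing cast.
struct GetFileCounters {
  std::atomic<uint32_t> accepted_files{0};
  std::atomic<uint64_t> input_bytes{0};
};

// What a narrowing cast does to a byte count above 4 GiB: the value wraps modulo 2^32.
constexpr uint64_t five_gb = 5'000'000'000ULL;
constexpr uint32_t truncated_bytes = static_cast<uint32_t>(five_gb);
static_assert(truncated_bytes == 705'032'704U, "5'000'000'000 mod 2^32");
```

With `input_bytes` declared as `uint64_t`, the ValueNode can be assigned `input_bytes.load()` directly.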



##########
extensions/standard-processors/tests/unit/ProcessorTests.cpp:
##########
@@ -822,3 +822,11 @@ TEST_CASE("isSingleThreaded - two threads for a single threaded processor", "[is
   REQUIRE(LogTestController::getInstance().contains("[warning] Processor myProc can not be run in parallel, its "
                                                     "\"max concurrent tasks\" value is too high. It was set to 1 from 2."));
 }
+
+TEST_CASE("Test getProcessorName", "[getProcessorName]") {

Review Comment:
   this looks like a typo:
   ```suggestion
   TEST_CASE("Test getProcessorType", "[getProcessorType]") {
   ```



##########
libminifi/include/core/Processor.h:
##########
@@ -29,19 +29,36 @@
 #include <unordered_set>
 #include <unordered_map>
 #include <utility>
+#include <vector>
 
 #include "ConfigurableComponent.h"
 #include "Connectable.h"
 #include "Core.h"
 #include "core/Annotation.h"
 #include "Scheduling.h"
 #include "utils/TimeUtil.h"
+#include "core/state/nodes/MetricsBase.h"
+#include "utils/gsl.h"
+
+#if WIN32

Review Comment:
   I'm not sure this doesn't work, but we use `#ifdef WIN32` everywhere else
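A small sketch of why `#ifdef` is the more robust form, using a hypothetical macro name. `#if WIN32` does work when the macro is undefined (it evaluates to 0) or defined to a number (GCC-style `-DWIN32` defines it as 1). It fails, however, when the macro is defined with an empty body, which `#ifdef` handles fine.

```c++
// A macro defined with an empty body, as `#define WIN32` or `-DWIN32=` would produce.
// HYPOTHETICAL_EMPTY_FLAG is an illustrative name only.
#define HYPOTHETICAL_EMPTY_FLAG

#ifdef HYPOTHETICAL_EMPTY_FLAG  // only asks "is it defined?", so an empty body is fine
constexpr bool kFlagDetected = true;
#else
constexpr bool kFlagDetected = false;
#endif

// By contrast, `#if HYPOTHETICAL_EMPTY_FLAG` would expand to `#if` with no expression
// and fail to compile.
```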



##########
libminifi/include/core/Processor.h:
##########
@@ -62,10 +79,41 @@ constexpr std::chrono::nanoseconds MINIMUM_SCHEDULING_NANOS{30000};
 
 #define BUILDING_DLL 1
 
-class Processor : public Connectable, public ConfigurableComponent {
+class Processor;
+
+class ProcessorMetrics : public state::response::ResponseNode {
+ public:
+  explicit ProcessorMetrics(const Processor& source_processor);
+
+  [[nodiscard]] std::string getName() const override;
+
+  std::vector<state::response::SerializedResponseNode> serialize() override;
+  std::vector<state::PublishedMetric> calculateMetrics() override;
+  void incrementRelationshipTransferCount(const std::string& relationship);
+  std::chrono::milliseconds getAverageOnTriggerRuntime() const;
+  std::chrono::milliseconds getLastOnTriggerRuntime() const;
+  void addLastOnTriggerRuntime(std::chrono::milliseconds runtime);
+
+  std::atomic<size_t> iterations{0};
+  std::atomic<size_t> transferred_flow_files{0};
+  std::atomic<uint64_t> transferred_bytes{0};
+
+ protected:
+  [[nodiscard]] std::unordered_map<std::string, std::string> getCommonLabels() const;
+  static const uint8_t STORED_ON_TRIGGER_RUNTIME_COUNT = 10;
+
+  std::mutex relationship_mutex_;

Review Comment:
   I would rename the mutex to make it clearer what it guards:
   ```suggestion
     std::mutex transferred_relationships_mutex_;
   ```
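A minimal sketch of the pattern the rename points at, assuming the mutex guards the per-relationship counter map. `RelationshipTransferCounts` is a hypothetical class, not the PR's code. Here every access to `transferred_relationships_` goes through the lock, including the copy a `serialize()`-style caller would iterate over.

```c++
#include <cstdint>
#include <mutex>
#include <string>
#include <unordered_map>

// Hypothetical class: the mutex name states exactly which member it guards.
class RelationshipTransferCounts {
 public:
  void increment(const std::string& relationship) {
    std::lock_guard<std::mutex> lock(transferred_relationships_mutex_);
    ++transferred_relationships_[relationship];
  }

  // Copy under the lock so callers can iterate without holding the mutex.
  std::unordered_map<std::string, uint64_t> snapshot() const {
    std::lock_guard<std::mutex> lock(transferred_relationships_mutex_);
    return transferred_relationships_;
  }

 private:
  mutable std::mutex transferred_relationships_mutex_;
  std::unordered_map<std::string, uint64_t> transferred_relationships_;
};
```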



##########
libminifi/test/unit/ProcessSessionTests.cpp:
##########
@@ -28,22 +28,6 @@
 
 namespace {
 
-class DummyProcessor : public minifi::core::Processor {
-  using minifi::core::Processor::Processor;
-
- public:
-  static constexpr const char* Description = "A processor that does nothing.";
-  static auto properties() { return std::array<core::Property, 0>{}; }
-  static auto relationships() { return std::array<core::Relationship, 0>{}; }
-  static constexpr bool SupportsDynamicProperties = false;
-  static constexpr bool SupportsDynamicRelationships = false;
-  static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_ALLOWED;
-  static constexpr bool IsSingleThreaded = false;
-  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
-};
-
-REGISTER_RESOURCE(DummyProcessor, Processor);

Review Comment:
   This is a good idea.  There are two more identical copies of this dummy processor, in `ContentRepositoryDependentTests.h` and `GCPCredentialsControllerServiceTests.cpp`; can you get rid of them, too, please?



##########
docker/test/integration/minifi/core/PrometheusChecker.py:
##########
@@ -0,0 +1,90 @@
+import time
+from prometheus_api_client import PrometheusConnect
+
+
+class PrometheusChecker:
+    def __init__(self):
+        self.prometheus_client = PrometheusConnect(url="http://localhost:9090")

Review Comment:
   There used to be a `disable_ssl=True` here; are we connecting using SSL now?



##########
libminifi/include/core/ProcessSession.h:
##########
@@ -37,6 +37,7 @@
 #include "WeakReference.h"
 #include "provenance/Provenance.h"
 #include "utils/gsl.h"
+#include "Processor.h"

Review Comment:
   I would move `ProcessorMetrics` to its own files (both `.h` and `.cpp`) and only include `ProcessorMetrics.h` instead of all of `Processor.h`.
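A single-file sketch of the proposed split, assuming `ProcessorMetrics` only stores a reference to its processor. In that case the header needs just a forward declaration, and only the `.cpp` has to see the full `Processor`. All names in the `sketch` namespace are simplified, hypothetical stand-ins for the real MiNiFi classes.

```c++
#include <string>

namespace sketch {

// --- ProcessorMetrics.h (sketch): a forward declaration is enough, because the
// class only stores a reference and has no inline code that touches Processor.
class Processor;

class ProcessorMetrics {
 public:
  explicit ProcessorMetrics(const Processor& source_processor)
      : source_processor_(source_processor) {}
  std::string getName() const;  // defined where Processor is a complete type

 private:
  const Processor& source_processor_;
};

// --- Processor.h (sketch): a heavily simplified stand-in for core::Processor.
class Processor {
 public:
  virtual ~Processor() = default;
  virtual std::string getProcessorType() const = 0;
};

// --- ProcessorMetrics.cpp (sketch): includes Processor.h, so this call compiles here.
inline std::string ProcessorMetrics::getName() const {
  return source_processor_.getProcessorType() + "Metrics";
}

// Hypothetical processor used only for illustration.
class DemoGetFile : public Processor {
 public:
  std::string getProcessorType() const override { return "GetFile"; }
};

}  // namespace sketch
```

`ProcessSession.h` would then include only the metrics header, and the `<processor type>Metrics` naming stays in one place.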



##########
libminifi/src/core/Processor.cpp:
##########
@@ -40,9 +41,133 @@ using namespace std::literals::chrono_literals;
 
 namespace org::apache::nifi::minifi::core {
 
-Processor::Processor(const std::string& name)
+ProcessorMetrics::ProcessorMetrics(const Processor& source_processor)
+    : source_processor_(source_processor) {
+  on_trigger_runtimes_.reserve(STORED_ON_TRIGGER_RUNTIME_COUNT);

Review Comment:
   `on_trigger_runtimes_`, together with `next_average_index_`, the mutex, and the functions using them, look like they could be packaged in a useful and reusable `Averager` class.



##########
docker/test/integration/minifi/core/PrometheusChecker.py:
##########
@@ -0,0 +1,90 @@
+import time
+from prometheus_api_client import PrometheusConnect
+
+
+class PrometheusChecker:
+    def __init__(self):
+        self.prometheus_client = PrometheusConnect(url="http://localhost:9090")
+
+    def wait_for_metric_class_on_prometheus(self, metric_class, timeout_seconds):
+        start_time = time.perf_counter()
+        while (time.perf_counter() - start_time) < timeout_seconds:
+            if self.verify_metric_class(metric_class):
+                return True
+            time.sleep(1)
+        return False
+
+    def wait_for_processor_metric_on_prometheus(self, metric_class, timeout_seconds, processor_name):
+        start_time = time.perf_counter()
+        while (time.perf_counter() - start_time) < timeout_seconds:
+            if self.verify_processor_metric(metric_class, processor_name):
+                return True
+            time.sleep(1)
+        return False
+
+    def verify_processor_metric(self, metric_class, processor_name):
+        if metric_class == "GetFileMetrics":
+            return self.verify_getfile_metrics(metric_class, processor_name)
+        else:
+            return self.verify_general_processor_metrics(metric_class, processor_name)
+
+    def verify_metric_class(self, metric_class):
+        if metric_class == "RepositoryMetrics":
+            return self.verify_repository_metrics()
+        elif metric_class == "QueueMetrics":
+            return self.verify_queue_metrics()
+        elif metric_class == "FlowInformation":
+            return self.verify_flow_information_metrics()
+        elif metric_class == "DeviceInfoNode":
+            return self.verify_device_info_node_metrics()
+        else:
+            raise Exception("Metric class '%s' verification is not implemented" % metric_class)
+
+    def verify_repository_metrics(self):
+        label_list = [{'repository_name': 'provenance'}, {'repository_name': 'flowfile'}]
+        for labels in label_list:
+            if not self.verify_metrics_exist(['minifi_is_running', 'minifi_is_full', 'minifi_repository_size'], 'RepositoryMetrics', labels):
+                return False
+        return True
+
+    def verify_queue_metrics(self):
+        return self.verify_metrics_exist(['minifi_queue_data_size', 'minifi_queue_data_size_max', 'minifi_queue_size', 'minifi_queue_size_max'], 'QueueMetrics')
+
+    def verify_general_processor_metrics(self, metric_class, processor_name):
+        labels = {'processor_name': processor_name}
+        return self.verify_metrics_exist(['minifi_average_onTrigger_runtime_milliseconds', 'minifi_last_onTrigger_runtime_milliseconds'], metric_class, labels) and \
+            self.verify_metrics_larger_than_zero(['minifi_onTrigger_invocations', 'minifi_transferred_flow_files', 'minifi_transferred_to_success', 'minifi_transferred_bytes'], metric_class, labels)
+
+    def verify_getfile_metrics(self, metric_class, processor_name):
+        labels = {'processor_name': processor_name}
+        return self.verify_general_processor_metrics(metric_class, processor_name) and \
+            self.verify_metrics_exist(['minifi_input_bytes', 'minifi_accepted_files'], metric_class, labels)
+
+    def verify_flow_information_metrics(self):
+        return self.verify_metrics_exist(['minifi_queue_data_size', 'minifi_queue_data_size_max', 'minifi_queue_size', 'minifi_queue_size_max'], 'FlowInformation') and \
+            self.verify_metric_exists('minifi_is_running', 'FlowInformation', {'component_name': 'FlowController'})
+
+    def verify_device_info_node_metrics(self):
+        return self.verify_metrics_exist(['minifi_physical_mem', 'minifi_memory_usage', 'minifi_cpu_utilization'], 'DeviceInfoNode')
+
+    def verify_metric_exists(self, metric_name, metric_class, labels={}):
+        labels['metric_class'] = metric_class
+        return len(self.prometheus_client.get_current_metric_value(metric_name=metric_name, label_config=labels)) > 0
+
+    def verify_metrics_exist(self, metric_names, metric_class, labels={}):
+        for metric_name in metric_names:
+            if not self.verify_metric_exists(metric_name, metric_class, labels):
+                return False
+        return True
+
+    def verify_metric_larger_than_zero(self, metric_name, metric_class, labels={}):
+        labels['metric_class'] = metric_class
+        result = self.prometheus_client.get_current_metric_value(metric_name=metric_name, label_config=labels)
+        print(result)

Review Comment:
   is this a debug log left here by accident?



##########
libminifi/include/core/Processor.h:
##########
@@ -29,19 +29,36 @@
 #include <unordered_set>
 #include <unordered_map>
 #include <utility>
+#include <vector>
 
 #include "ConfigurableComponent.h"
 #include "Connectable.h"
 #include "Core.h"
 #include "core/Annotation.h"
 #include "Scheduling.h"
 #include "utils/TimeUtil.h"
+#include "core/state/nodes/MetricsBase.h"
+#include "utils/gsl.h"
+
+#if WIN32
+#define ADD_GET_PROCESSOR_NAME \
+  std::string getProcessorType() const override { \
+    return org::apache::nifi::minifi::utils::StringUtils::split(__FUNCDNAME__, "@")[1]; \
+  }
+#else
+#define ADD_GET_PROCESSOR_NAME \
+  std::string getProcessorType() const override { \
+    auto splitted = org::apache::nifi::minifi::utils::StringUtils::split(__PRETTY_FUNCTION__, "::"); \
+    return splitted[splitted.size() - 2]; \
+  }
+#endif

Review Comment:
   I think using `typeid(*this).name()` would be better.  We would still need compiler-specific code, but it could be hidden in a `utils::demangleClassName()` function.
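A sketch of what such a helper might look like, assuming GCC/Clang's `abi::__cxa_demangle` from `<cxxabi.h>` for Itanium-mangled names. The MSVC branch is an untested assumption: it relies on `type_info::name()` already returning a readable `class ns::Name` string. `demangleClassName` and the `demo` classes are hypothetical.

```c++
#include <cstdlib>
#include <initializer_list>
#include <memory>
#include <string>
#include <typeinfo>
#if defined(__GNUG__)
#include <cxxabi.h>
#endif

// Hypothetical helper in the spirit of the suggested utils::demangleClassName():
// turns a type_info::name() string into the unqualified class name.
inline std::string demangleClassName(const char* name) {
#if defined(__GNUG__)
  // GCC and Clang return Itanium-mangled names, e.g. "N4demo7GetFileE".
  struct FreeDeleter { void operator()(char* p) const { std::free(p); } };
  int status = 0;
  std::unique_ptr<char, FreeDeleter> demangled{abi::__cxa_demangle(name, nullptr, nullptr, &status)};
  std::string full = (status == 0 && demangled) ? demangled.get() : name;
#else
  // Assumption: MSVC already returns a readable name, e.g. "class demo::GetFile".
  std::string full = name;
  for (const std::string prefix : {"class ", "struct "}) {
    if (full.rfind(prefix, 0) == 0) {
      full.erase(0, prefix.size());
      break;
    }
  }
#endif
  // Drop namespaces: "demo::GetFile" -> "GetFile" (not meant for template names).
  const auto pos = full.rfind("::");
  return pos == std::string::npos ? full : full.substr(pos + 2);
}

namespace demo {
class Processor {
 public:
  virtual ~Processor() = default;
  // typeid(*this) on a polymorphic object reports the dynamic type, so a single
  // base-class implementation serves every subclass without a per-class macro.
  std::string getProcessorType() const { return demangleClassName(typeid(*this).name()); }
};

class GetFile : public Processor {};
}  // namespace demo
```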



##########
libminifi/src/core/Processor.cpp:
##########
@@ -40,9 +41,133 @@ using namespace std::literals::chrono_literals;
 
 namespace org::apache::nifi::minifi::core {
 
-Processor::Processor(const std::string& name)
+ProcessorMetrics::ProcessorMetrics(const Processor& source_processor)
+    : source_processor_(source_processor) {
+  on_trigger_runtimes_.reserve(STORED_ON_TRIGGER_RUNTIME_COUNT);
+}
+
+std::string ProcessorMetrics::getName() const {
+  return source_processor_.getProcessorType() + "Metrics";
+}
+
+std::unordered_map<std::string, std::string> ProcessorMetrics::getCommonLabels() const {
+  return {{"metric_class", getName()}, {"processor_name", source_processor_.getName()}, {"processor_uuid", source_processor_.getUUIDStr()}};
+}
+
+std::vector<state::response::SerializedResponseNode> ProcessorMetrics::serialize() {
+  std::vector<state::response::SerializedResponseNode> resp;
+
+  state::response::SerializedResponseNode root_node;
+  root_node.name = source_processor_.getUUIDStr();
+
+  state::response::SerializedResponseNode iter;
+  iter.name = "OnTriggerInvocations";
+  iter.value = static_cast<uint32_t>(iterations.load());
+
+  root_node.children.push_back(iter);
+
+  state::response::SerializedResponseNode average_ontrigger_runtime_node;
+  average_ontrigger_runtime_node.name = "AverageOnTriggerRunTime";
+  average_ontrigger_runtime_node.value = static_cast<uint64_t>(getAverageOnTriggerRuntime().count());
+
+  root_node.children.push_back(average_ontrigger_runtime_node);
+
+  state::response::SerializedResponseNode last_ontrigger_runtime_node;
+  last_ontrigger_runtime_node.name = "LastOnTriggerRunTime";
+  last_ontrigger_runtime_node.value = static_cast<uint64_t>(getLastOnTriggerRuntime().count());
+
+  root_node.children.push_back(last_ontrigger_runtime_node);
+
+  state::response::SerializedResponseNode transferred_flow_files_node;
+  transferred_flow_files_node.name = "TransferredFlowFiles";
+  transferred_flow_files_node.value = static_cast<uint32_t>(transferred_flow_files.load());
+
+  root_node.children.push_back(transferred_flow_files_node);
+
+  for (const auto& [relationship, count] : transferred_relationships_) {
+    state::response::SerializedResponseNode transferred_to_relationship_node;
+    transferred_to_relationship_node.name = std::string("TransferredTo").append(1, toupper(relationship[0])).append(relationship.substr(1));
+    transferred_to_relationship_node.value = static_cast<uint32_t>(count);
+
+    root_node.children.push_back(transferred_to_relationship_node);
+  }

Review Comment:
   This may be overly paranoid, but I would add a `gsl_Expects(relationship.size() > 0)` before line 89.
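A standalone sketch of the node-name construction with the precondition made explicit. `transferredToNodeName` is a hypothetical helper, and a plain exception stands in for `gsl_Expects` so the sketch has no GSL dependency. On an empty name, `relationship[0]` itself is valid (it yields `'\0'`), but `relationship.substr(1)` throws `std::out_of_range`. The sketch also routes the character through `unsigned char` before `std::toupper`.

```c++
#include <cctype>
#include <stdexcept>
#include <string>

// Hypothetical helper mirroring the node-name construction in serialize():
// "success" -> "TransferredToSuccess".
inline std::string transferredToNodeName(const std::string& relationship) {
  // Explicit precondition in the spirit of gsl_Expects(relationship.size() > 0):
  // for an empty name, relationship.substr(1) would throw std::out_of_range.
  if (relationship.empty()) {
    throw std::invalid_argument("relationship name must not be empty");
  }
  std::string name = "TransferredTo";
  // std::toupper requires a value representable as unsigned char (or EOF),
  // so the char is converted through unsigned char first.
  name += static_cast<char>(std::toupper(static_cast<unsigned char>(relationship[0])));
  name.append(relationship, 1, std::string::npos);
  return name;
}
```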



##########
libminifi/test/unit/MetricsTests.cpp:
##########
@@ -203,3 +208,32 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") {
     REQUIRE("0" == size.value);
   }
 }
+
+TEST_CASE("Test ProcessorMetrics", "[ProcessorMetrics]") {
+  DummyProcessor dummy_processor("dummy");
+  minifi::core::ProcessorMetrics metrics(dummy_processor);
+
+  REQUIRE("DummyProcessorMetrics" == metrics.getName());
+
+  REQUIRE(metrics.getLastOnTriggerRuntime() == 0ms);
+  REQUIRE(metrics.getAverageOnTriggerRuntime() == 0ms);
+
+  metrics.addLastOnTriggerRuntime(10ms);
+  metrics.addLastOnTriggerRuntime(20ms);
+  metrics.addLastOnTriggerRuntime(30ms);
+
+  REQUIRE(metrics.getLastOnTriggerRuntime() == 30ms);
+  REQUIRE(metrics.getAverageOnTriggerRuntime() == 20ms);
+
+  for (auto i = 0; i < 10; ++i) {
+    metrics.addLastOnTriggerRuntime(50ms);
+  }
+
+  REQUIRE(metrics.getAverageOnTriggerRuntime() == 50ms);
+  REQUIRE(metrics.getLastOnTriggerRuntime() == 50ms);
+
+  metrics.addLastOnTriggerRuntime(10ms);
+  REQUIRE(metrics.getLastOnTriggerRuntime() == 10ms);
+}

Review Comment:
   we could add
   ```c++
     REQUIRE(metrics.getAverageOnTriggerRuntime() == 46ms);
   ```
   at the end, as well
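The 46 ms figure follows from the 10-sample window (`STORED_ON_TRIGGER_RUNTIME_COUNT = 10`). After the ten 50 ms runs the window is full, and the final 10 ms run evicts one 50 ms sample, while the last runtime is simply 10 ms:

```latex
\text{average} = \frac{9 \cdot 50\,\text{ms} + 1 \cdot 10\,\text{ms}}{10} = \frac{460\,\text{ms}}{10} = 46\,\text{ms}
```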





[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1400: MINIFICPP-1848 Create a generic solution for processor metrics

Posted by GitBox <gi...@apache.org>.
lordgamez commented on code in PR #1400:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1400#discussion_r972927586


##########
libminifi/src/core/Processor.cpp:
##########
@@ -40,9 +41,133 @@ using namespace std::literals::chrono_literals;
 
 namespace org::apache::nifi::minifi::core {
 
-Processor::Processor(const std::string& name)
+ProcessorMetrics::ProcessorMetrics(const Processor& source_processor)
+    : source_processor_(source_processor) {
+  on_trigger_runtimes_.reserve(STORED_ON_TRIGGER_RUNTIME_COUNT);

Review Comment:
   Good idea, added `Averager.h` as a utility in 6da42b94abc78aaa874eaba0258bf07c49d21660



##########
libminifi/include/core/Processor.h:
##########
@@ -62,10 +79,41 @@ constexpr std::chrono::nanoseconds MINIMUM_SCHEDULING_NANOS{30000};
 
 #define BUILDING_DLL 1
 
-class Processor : public Connectable, public ConfigurableComponent {
+class Processor;
+
+class ProcessorMetrics : public state::response::ResponseNode {
+ public:
+  explicit ProcessorMetrics(const Processor& source_processor);
+
+  [[nodiscard]] std::string getName() const override;
+
+  std::vector<state::response::SerializedResponseNode> serialize() override;
+  std::vector<state::PublishedMetric> calculateMetrics() override;
+  void incrementRelationshipTransferCount(const std::string& relationship);
+  std::chrono::milliseconds getAverageOnTriggerRuntime() const;
+  std::chrono::milliseconds getLastOnTriggerRuntime() const;
+  void addLastOnTriggerRuntime(std::chrono::milliseconds runtime);
+
+  std::atomic<size_t> iterations{0};
+  std::atomic<size_t> transferred_flow_files{0};
+  std::atomic<uint64_t> transferred_bytes{0};
+
+ protected:
+  [[nodiscard]] std::unordered_map<std::string, std::string> getCommonLabels() const;
+  static const uint8_t STORED_ON_TRIGGER_RUNTIME_COUNT = 10;
+
+  std::mutex relationship_mutex_;

Review Comment:
   Renamed in 6da42b94abc78aaa874eaba0258bf07c49d21660



##########
libminifi/test/unit/ProcessSessionTests.cpp:
##########
@@ -28,22 +28,6 @@
 
 namespace {
 
-class DummyProcessor : public minifi::core::Processor {
-  using minifi::core::Processor::Processor;
-
- public:
-  static constexpr const char* Description = "A processor that does nothing.";
-  static auto properties() { return std::array<core::Property, 0>{}; }
-  static auto relationships() { return std::array<core::Relationship, 0>{}; }
-  static constexpr bool SupportsDynamicProperties = false;
-  static constexpr bool SupportsDynamicRelationships = false;
-  static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_ALLOWED;
-  static constexpr bool IsSingleThreaded = false;
-  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
-};
-
-REGISTER_RESOURCE(DummyProcessor, Processor);

Review Comment:
   Updated in 6da42b94abc78aaa874eaba0258bf07c49d21660



##########
libminifi/test/unit/MetricsTests.cpp:
##########
@@ -203,3 +208,32 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") {
     REQUIRE("0" == size.value);
   }
 }
+
+TEST_CASE("Test ProcessorMetrics", "[ProcessorMetrics]") {
+  DummyProcessor dummy_processor("dummy");
+  minifi::core::ProcessorMetrics metrics(dummy_processor);
+
+  REQUIRE("DummyProcessorMetrics" == metrics.getName());
+
+  REQUIRE(metrics.getLastOnTriggerRuntime() == 0ms);
+  REQUIRE(metrics.getAverageOnTriggerRuntime() == 0ms);
+
+  metrics.addLastOnTriggerRuntime(10ms);
+  metrics.addLastOnTriggerRuntime(20ms);
+  metrics.addLastOnTriggerRuntime(30ms);
+
+  REQUIRE(metrics.getLastOnTriggerRuntime() == 30ms);
+  REQUIRE(metrics.getAverageOnTriggerRuntime() == 20ms);
+
+  for (auto i = 0; i < 10; ++i) {
+    metrics.addLastOnTriggerRuntime(50ms);
+  }
+
+  REQUIRE(metrics.getAverageOnTriggerRuntime() == 50ms);
+  REQUIRE(metrics.getLastOnTriggerRuntime() == 50ms);
+
+  metrics.addLastOnTriggerRuntime(10ms);
+  REQUIRE(metrics.getLastOnTriggerRuntime() == 10ms);
+}

Review Comment:
   Added average check in 6da42b94abc78aaa874eaba0258bf07c49d21660



##########
libminifi/src/core/Processor.cpp:
##########
@@ -40,9 +41,133 @@ using namespace std::literals::chrono_literals;
 
 namespace org::apache::nifi::minifi::core {
 
-Processor::Processor(const std::string& name)
+ProcessorMetrics::ProcessorMetrics(const Processor& source_processor)
+    : source_processor_(source_processor) {
+  on_trigger_runtimes_.reserve(STORED_ON_TRIGGER_RUNTIME_COUNT);
+}
+
+std::string ProcessorMetrics::getName() const {
+  return source_processor_.getProcessorType() + "Metrics";
+}
+
+std::unordered_map<std::string, std::string> ProcessorMetrics::getCommonLabels() const {
+  return {{"metric_class", getName()}, {"processor_name", source_processor_.getName()}, {"processor_uuid", source_processor_.getUUIDStr()}};
+}
+
+std::vector<state::response::SerializedResponseNode> ProcessorMetrics::serialize() {
+  std::vector<state::response::SerializedResponseNode> resp;
+
+  state::response::SerializedResponseNode root_node;
+  root_node.name = source_processor_.getUUIDStr();
+
+  state::response::SerializedResponseNode iter;
+  iter.name = "OnTriggerInvocations";
+  iter.value = static_cast<uint32_t>(iterations.load());
+
+  root_node.children.push_back(iter);
+
+  state::response::SerializedResponseNode average_ontrigger_runtime_node;
+  average_ontrigger_runtime_node.name = "AverageOnTriggerRunTime";
+  average_ontrigger_runtime_node.value = static_cast<uint64_t>(getAverageOnTriggerRuntime().count());
+
+  root_node.children.push_back(average_ontrigger_runtime_node);
+
+  state::response::SerializedResponseNode last_ontrigger_runtime_node;
+  last_ontrigger_runtime_node.name = "LastOnTriggerRunTime";
+  last_ontrigger_runtime_node.value = static_cast<uint64_t>(getLastOnTriggerRuntime().count());
+
+  root_node.children.push_back(last_ontrigger_runtime_node);
+
+  state::response::SerializedResponseNode transferred_flow_files_node;
+  transferred_flow_files_node.name = "TransferredFlowFiles";
+  transferred_flow_files_node.value = static_cast<uint32_t>(transferred_flow_files.load());
+
+  root_node.children.push_back(transferred_flow_files_node);
+
+  for (const auto& [relationship, count] : transferred_relationships_) {
+    state::response::SerializedResponseNode transferred_to_relationship_node;
+    transferred_to_relationship_node.name = std::string("TransferredTo").append(1, toupper(relationship[0])).append(relationship.substr(1));
+    transferred_to_relationship_node.value = static_cast<uint32_t>(count);
+
+    root_node.children.push_back(transferred_to_relationship_node);
+  }

Review Comment:
   Added check in 6da42b94abc78aaa874eaba0258bf07c49d21660





[GitHub] [nifi-minifi-cpp] fgerlits commented on a diff in pull request #1400: MINIFICPP-1848 Create a generic solution for processor metrics

Posted by GitBox <gi...@apache.org>.
fgerlits commented on code in PR #1400:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1400#discussion_r973026879


##########
libminifi/include/utils/Averager.h:
##########
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <vector>
+#include <mutex>
+
+namespace org::apache::nifi::minifi::utils {
+
+template<typename T>
+concept Summable = requires(T x) { x + x; };  // NOLINT(readability/braces)

Review Comment:
   We also need to be able to divide a `T` by an integer.  I think using something like `std::is_arithmetic_v` would be OK.





[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1400: MINIFICPP-1848 Create a generic solution for processor metrics

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1400:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1400#discussion_r993313911


##########
libminifi/include/utils/Averager.h:
##########
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <vector>
+#include <mutex>
+
+namespace org::apache::nifi::minifi::utils {
+
+template<typename T>
+concept Summable = requires(T x) { x + x; };  // NOLINT(readability/braces)
+
+template<typename T>
+concept DividableByInteger = requires(T x, uint32_t divisor) { x / divisor; };  // NOLINT(readability/braces)
+
+template<typename ValueType>
+requires Summable<ValueType> && DividableByInteger<ValueType>
+class Averager {
+ public:
+  explicit Averager(uint32_t sample_size) : SAMPLE_SIZE_(sample_size) {
+    values_.reserve(SAMPLE_SIZE_);
+  }
+
+  ValueType getAverage() const;
+  ValueType getLastValue() const;
+  void addValue(ValueType runtime);
+
+ private:
+  const uint32_t SAMPLE_SIZE_;
+  mutable std::mutex average_value_mutex_;
+  uint32_t next_average_index_ = 0;
+  std::vector<ValueType> values_;
+};
+
+template<typename ValueType>
+requires Summable<ValueType> && DividableByInteger<ValueType>
+ValueType Averager<ValueType>::getAverage() const {
+  if (values_.empty()) {
+    return {};
+  }
+  ValueType sum{};
+  std::lock_guard<std::mutex> lock(average_value_mutex_);
+  for (const auto& value : values_) {
+    sum += value;
+  }
+  return sum / values_.size();
+}
+
+template<typename ValueType>
+requires Summable<ValueType> && DividableByInteger<ValueType>
+void Averager<ValueType>::addValue(ValueType runtime) {
+  std::lock_guard<std::mutex> lock(average_value_mutex_);
+  if (values_.size() < SAMPLE_SIZE_) {
+    values_.push_back(runtime);
+  } else {
+    if (next_average_index_ >= values_.size()) {
+      next_average_index_ = 0;
+    }
+    values_[next_average_index_] = runtime;
+    ++next_average_index_;
+  }
+}
+
+template<typename ValueType>
+requires Summable<ValueType> && DividableByInteger<ValueType>
+ValueType Averager<ValueType>::getLastValue() const {
+  std::lock_guard<std::mutex> lock(average_value_mutex_);
+  if (values_.empty()) {
+    return {};
+  } else if (values_.size() < SAMPLE_SIZE_) {
+    return values_[values_.size() - 1];
+  } else {
+    return values_[next_average_index_ - 1];

Review Comment:
   Wouldn't this cause ~underflow~ out-of-bounds indexing if we query `getLastValue` right after the last value has been pushed to the end of the `values_` vector (`values_.size() == SAMPLE_SIZE_ && next_average_index_ == 0`)?
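In the case described above, `next_average_index_` is still 0 because `addValue` only advances it once it starts overwriting. Since it is a `uint32_t`, `next_average_index_ - 1` wraps around to a huge index. The following is a minimal sketch of one index computation that avoids the wrap. It assumes the ring-buffer layout of the quoted `addValue`, where the buffer is filled by `push_back` and then overwritten starting at index 0. The helper name `lastWrittenIndex` is hypothetical.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helper: index of the most recently written element of a full
// ring buffer with `size` slots, given `next_index`, the counterpart of
// next_average_index_ in the quoted addValue. There it is 0 right after the
// buffer has been filled by push_back, and otherwise in [1, size].
inline std::size_t lastWrittenIndex(std::size_t size, std::size_t next_index) {
  assert(size > 0 && next_index <= size);
  // Adding size before subtracting 1 keeps the unsigned arithmetic from
  // wrapping when next_index == 0; the modulo maps next_index == size to size - 1.
  return (next_index + size - 1) % size;
}
```

With this helper, the full-buffer branch of `getLastValue` could return `values_[lastWrittenIndex(values_.size(), next_average_index_)]`.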



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1400: MINIFICPP-1848 Create a generic solution for processor metrics

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1400:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1400#discussion_r993315426


##########
libminifi/include/utils/Averager.h:
##########
@@ -0,0 +1,91 @@
+template<typename ValueType>
+requires Summable<ValueType> && DividableByInteger<ValueType>
+ValueType Averager<ValueType>::getLastValue() const {
+  std::lock_guard<std::mutex> lock(average_value_mutex_);
+  if (values_.empty()) {
+    return {};
+  } else if (values_.size() < SAMPLE_SIZE_) {
+    return values_[values_.size() - 1];
+  } else {
+    return values_[next_average_index_ - 1];

Review Comment:
   Initializing `next_average_index_` to `SAMPLE_SIZE_` instead of 0 could solve this.
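Here is a minimal sketch of the suggested initialization, trimmed to the members involved. The concepts from the diff are omitted, so this is a stand-in and not the PR's final code. When `next_average_index_` starts at `SAMPLE_SIZE_`, the full-buffer branch of `getLastValue` always sees an index in [1, `SAMPLE_SIZE_`].

The sketch also makes two further assumed changes:
- `getAverage` takes the lock before its `values_.empty()` check, because the quoted version reads `values_` unlocked while `addValue` may be pushing to it.
- The sum is divided by a signed count, so that a negative integer sum is not converted to unsigned.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>
#include <vector>

template<typename ValueType>
class Averager {
 public:
  // next_average_index_ starts at the sample size, as suggested in the review.
  explicit Averager(uint32_t sample_size)
      : SAMPLE_SIZE_(sample_size), next_average_index_(sample_size) {
    assert(sample_size > 0);
    values_.reserve(SAMPLE_SIZE_);
  }

  void addValue(ValueType value) {
    std::lock_guard<std::mutex> lock(average_value_mutex_);
    if (values_.size() < SAMPLE_SIZE_) {
      values_.push_back(value);
    } else {
      if (next_average_index_ >= values_.size()) {
        next_average_index_ = 0;
      }
      values_[next_average_index_] = value;
      ++next_average_index_;
    }
  }

  ValueType getLastValue() const {
    std::lock_guard<std::mutex> lock(average_value_mutex_);
    if (values_.empty()) {
      return {};
    } else if (values_.size() < SAMPLE_SIZE_) {
      return values_.back();
    }
    // Full buffer: next_average_index_ is in [1, SAMPLE_SIZE_], never 0,
    // so subtracting 1 cannot wrap around.
    return values_[next_average_index_ - 1];
  }

  ValueType getAverage() const {
    // The emptiness check is done under the lock as well.
    std::lock_guard<std::mutex> lock(average_value_mutex_);
    if (values_.empty()) {
      return {};
    }
    ValueType sum{};
    for (const auto& value : values_) {
      sum += value;
    }
    // Signed divisor: an int sum divided by an unsigned count would become unsigned.
    return sum / static_cast<int64_t>(values_.size());
  }

 private:
  const uint32_t SAMPLE_SIZE_;
  mutable std::mutex average_value_mutex_;
  uint32_t next_average_index_;
  std::vector<ValueType> values_;
};
```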





[GitHub] [nifi-minifi-cpp] lordgamez commented on pull request #1400: MINIFICPP-1848 Create a generic solution for processor metrics

Posted by GitBox <gi...@apache.org>.
lordgamez commented on PR #1400:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1400#issuecomment-1282249188

   Auto-terminated relationships were not counted in the transfer metrics. Commit 08d82ed85efdb7589db7b2b0837528d66bb5c370 fixes this by changing how the transfer metrics are calculated.
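The following is a hypothetical sketch of per-relationship transfer counting in which auto-terminated relationships are recorded like any other. Only the `std::unordered_map<std::string, TransferMetrics>&` parameter shape comes from the `routeFlowFile` signature change in this PR. The `TransferMetrics` fields, `recordTransfer`, and `routeSketch` are assumptions and not the actual MiNiFi code.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>

// Hypothetical per-relationship counters; the field names are assumptions.
struct TransferMetrics {
  uint64_t transfer_count = 0;
  uint64_t transfer_size = 0;
};

// Adds one flow file of the given size to the relationship's counters,
// creating zeroed counters on first use.
inline void recordTransfer(std::unordered_map<std::string, TransferMetrics>& transfers,
                           const std::string& relationship, uint64_t flow_file_size) {
  auto& metrics = transfers[relationship];
  ++metrics.transfer_count;
  metrics.transfer_size += flow_file_size;
}

// Hypothetical routing step: the transfer is recorded before the
// auto-termination check, so dropped flow files are counted as well.
// Returns true if the flow file would be queued to an outgoing connection.
inline bool routeSketch(std::unordered_map<std::string, TransferMetrics>& transfers,
                        const std::string& relationship, uint64_t flow_file_size,
                        bool auto_terminated) {
  recordTransfer(transfers, relationship, flow_file_size);
  return !auto_terminated;
}
```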




[GitHub] [nifi-minifi-cpp] fgerlits commented on a diff in pull request #1400: MINIFICPP-1848 Create a generic solution for processor metrics

Posted by GitBox <gi...@apache.org>.
fgerlits commented on code in PR #1400:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1400#discussion_r973026879


##########
libminifi/include/utils/Averager.h:
##########
@@ -0,0 +1,83 @@
+#pragma once
+
+#include <vector>
+#include <mutex>
+
+namespace org::apache::nifi::minifi::utils {
+
+template<typename T>
+concept Summable = requires(T x) { x + x; };  // NOLINT(readability/braces)

Review Comment:
   We also need to be able to divide a `T` by an integer. I think something like `std::is_arithmetic_v` would be OK.





[GitHub] [nifi-minifi-cpp] fgerlits commented on a diff in pull request #1400: MINIFICPP-1848 Create a generic solution for processor metrics

Posted by GitBox <gi...@apache.org>.
fgerlits commented on code in PR #1400:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1400#discussion_r992061194


##########
libminifi/include/utils/Averager.h:
##########
@@ -0,0 +1,90 @@
+#pragma once
+
+#include <vector>
+#include <mutex>
+
+namespace org::apache::nifi::minifi::utils {
+
+template<typename T>
+concept Summable = requires(T x) { x + x; };  // NOLINT(readability/braces)
+
+template<typename T>
+concept DividableByInteger = requires(T x, uint32_t divisor) { x / divisor; };  // NOLINT(readability/braces)
+
+template<typename ValueType>
+requires Summable<ValueType> && DividableByInteger<ValueType>
+class Averager {

Review Comment:
   Yes, let's discuss this in person. Separating `Averager` further into a circular buffer and an averaging function that works on the buffer would be a good refactoring idea, but the class is already fairly generic and reusable as it is.
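Here is a minimal sketch of the refactoring floated above. It splits the storage into a fixed-capacity ring buffer and moves the averaging into a free function. `RingBuffer`, `push`, `last`, `values`, and `average` are hypothetical names. Locking is left out and would live in a thin wrapper.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <numeric>
#include <utility>
#include <vector>

// Hypothetical fixed-capacity ring buffer: once full, each push overwrites
// the oldest element. Not thread-safe on its own.
template<typename T>
class RingBuffer {
 public:
  explicit RingBuffer(std::size_t capacity) : capacity_(capacity) {
    assert(capacity > 0);
    values_.reserve(capacity);
  }

  void push(T value) {
    if (values_.size() < capacity_) {
      last_ = values_.size();
      values_.push_back(std::move(value));
    } else {
      last_ = next_;
      values_[next_] = std::move(value);
      next_ = (next_ + 1) % capacity_;
    }
  }

  bool empty() const { return values_.empty(); }

  const T& last() const {
    assert(!empty());
    return values_[last_];
  }

  const std::vector<T>& values() const { return values_; }

 private:
  std::size_t capacity_;
  std::size_t next_ = 0;  // slot overwritten by the next push once full
  std::size_t last_ = 0;  // slot written by the most recent push
  std::vector<T> values_;
};

// Averaging kept separate from storage; it only needs the buffered values.
template<typename T>
T average(const std::vector<T>& values) {
  if (values.empty()) {
    return T{};
  }
  T sum = std::accumulate(values.begin(), values.end(), T{});
  return sum / static_cast<int64_t>(values.size());
}
```

Keeping the index bookkeeping in the buffer means `last()` never has to derive the previous slot from the next one, which sidesteps the off-by-one discussed earlier in this thread.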





[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1400: MINIFICPP-1848 Create a generic solution for processor metrics

Posted by GitBox <gi...@apache.org>.
lordgamez commented on code in PR #1400:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1400#discussion_r1011672106


##########
docker/test/integration/minifi/core/PrometheusChecker.py:
##########
@@ -0,0 +1,90 @@
+import time
+from prometheus_api_client import PrometheusConnect
+
+
+class PrometheusChecker:
+    def __init__(self):
+        self.prometheus_client = PrometheusConnect(url="http://localhost:9090", disable_ssl=True)
+
+    def wait_for_metric_class_on_prometheus(self, metric_class, timeout_seconds):
+        start_time = time.perf_counter()
+        while (time.perf_counter() - start_time) < timeout_seconds:
+            if self.verify_metric_class(metric_class):
+                return True
+            time.sleep(1)
+        return False
+
+    def wait_for_processor_metric_on_prometheus(self, metric_class, timeout_seconds, processor_name):
+        start_time = time.perf_counter()
+        while (time.perf_counter() - start_time) < timeout_seconds:
+            if self.verify_processor_metric(metric_class, processor_name):
+                return True
+            time.sleep(1)
+        return False
+
+    def verify_processor_metric(self, metric_class, processor_name):
+        if metric_class == "GetFileMetrics":
+            return self.verify_getfile_metrics(metric_class, processor_name)
+        else:
+            return self.verify_general_processor_metrics(metric_class, processor_name)
+
+    def verify_metric_class(self, metric_class):
+        if metric_class == "RepositoryMetrics":
+            return self.verify_repository_metrics()
+        elif metric_class == "QueueMetrics":
+            return self.verify_queue_metrics()
+        elif metric_class == "FlowInformation":
+            return self.verify_flow_information_metrics()
+        elif metric_class == "DeviceInfoNode":
+            return self.verify_device_info_node_metrics()
+        else:
+            raise Exception("Metric class '%s' verification is not implemented" % metric_class)
+
+    def verify_repository_metrics(self):
+        label_list = [{'repository_name': 'provenance'}, {'repository_name': 'flowfile'}]
+        for labels in label_list:
+            if not self.verify_metrics_exist(['minifi_is_running', 'minifi_is_full', 'minifi_repository_size'], 'RepositoryMetrics', labels):
+                return False
+        return True

Review Comment:
   Updated in 5031ed6685c20f0d79617c32943b70bda2755eb0



##########
libminifi/src/core/ProcessSession.cpp:
##########
@@ -740,7 +740,7 @@ void ProcessSession::restore(const std::string &key, const std::shared_ptr<core:
   flow->clearStashClaim(key);
 }
 
-ProcessSession::RouteResult ProcessSession::routeFlowFile(const std::shared_ptr<FlowFile> &record) {
+ProcessSession::RouteResult ProcessSession::routeFlowFile(const std::shared_ptr<FlowFile> &record, std::unordered_map<std::string, TransferMetrics>& transfers) {

Review Comment:
   Updated in 5031ed6685c20f0d79617c32943b70bda2755eb0


