You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ad...@apache.org on 2022/03/22 16:22:42 UTC

[nifi-minifi-cpp] branch main updated: MINIFICPP-1770 Add C2 NO_OPERATION response if property update is NoOp

This is an automated email from the ASF dual-hosted git repository.

adebreceni pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 4000ded  MINIFICPP-1770 Add C2 NO_OPERATION response if property update is NoOp
4000ded is described below

commit 4000dedab86fe171fbeb09a14d4f2099e3011cda
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Tue Mar 22 17:21:04 2022 +0100

    MINIFICPP-1770 Add C2 NO_OPERATION response if property update is NoOp
    
    Signed-off-by: Adam Debreceni <ad...@apache.org>
    
    This closes #1280
---
 extensions/coap/protocols/CoapC2Protocol.cpp       |  1 +
 .../http-curl/tests/C2PropertiesUpdateTests.cpp    |  3 +-
 extensions/http-curl/tests/HTTPHandlers.h          | 22 ++++++--
 libminifi/include/c2/C2Agent.h                     |  9 +++-
 libminifi/include/core/state/UpdateController.h    | 46 +---------------
 libminifi/src/Configure.cpp                        | 15 ++++--
 libminifi/src/c2/C2Agent.cpp                       | 63 +++++++++++++++-------
 libminifi/src/c2/HeartbeatJsonSerializer.cpp       |  2 +
 8 files changed, 88 insertions(+), 73 deletions(-)

diff --git a/extensions/coap/protocols/CoapC2Protocol.cpp b/extensions/coap/protocols/CoapC2Protocol.cpp
index 1d1f396..04e6df9 100644
--- a/extensions/coap/protocols/CoapC2Protocol.cpp
+++ b/extensions/coap/protocols/CoapC2Protocol.cpp
@@ -65,6 +65,7 @@ int CoapProtocol::writeAcknowledgement(io::BaseStream *stream, const minifi::c2:
     case state::UpdateState::INITIATE:
     case state::UpdateState::FULLY_APPLIED:
     case state::UpdateState::READ_COMPLETE:
+    case state::UpdateState::NO_OPERATION:
       payloadState = 0;
       break;
     case state::UpdateState::NOT_APPLIED:
diff --git a/extensions/http-curl/tests/C2PropertiesUpdateTests.cpp b/extensions/http-curl/tests/C2PropertiesUpdateTests.cpp
index 36812d0..e802695 100644
--- a/extensions/http-curl/tests/C2PropertiesUpdateTests.cpp
+++ b/extensions/http-curl/tests/C2PropertiesUpdateTests.cpp
@@ -43,7 +43,6 @@ class C2HeartbeatHandler : public ServerAwareHandler {
                       "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
                 response_->length());
       mg_printf(conn, "%s", response_->c_str());
-      response_.reset();
     } else {
       mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
                       "text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
@@ -150,6 +149,8 @@ int main() {
 
   VerifyPropertyUpdate harness([&] {
     assert(utils::verifyEventHappenedInPollTime(3s, [&] {return ack_handler.isAcknowledged("79");}));
+    assert(utils::verifyEventHappenedInPollTime(3s, [&] {return ack_handler.getApplyCount("FULLY_APPLIED") == 1;}));
+    assert(utils::verifyEventHappenedInPollTime(3s, [&] {return ack_handler.getApplyCount("NO_OPERATION") > 0;}));
     // update operation acknowledged
     {
       // verify final log levels
diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h
index 6f9e193..25ebbc0 100644
--- a/extensions/http-curl/tests/HTTPHandlers.h
+++ b/extensions/http-curl/tests/HTTPHandlers.h
@@ -792,20 +792,36 @@ class C2AcknowledgeHandler : public ServerAwareHandler {
     rapidjson::Document root;
     root.Parse(req.data(), req.size());
     if (root.IsObject() && root.HasMember("operationId")) {
-      std::lock_guard<std::mutex> guard(mtx_);
+      std::lock_guard<std::mutex> guard(ack_operations_mtx_);
       acknowledged_operations_.insert(root["operationId"].GetString());
     }
+
+    if (root.IsObject() && root.HasMember("operationState")) {
+      if (root["operationState"].IsObject() && root["operationState"].HasMember("state")) {
+        std::lock_guard<std::mutex> guard(apply_count_mtx_);
+        auto result_state = root["operationState"]["state"].GetString();
+        ++apply_count_[result_state];
+      }
+    }
+
     mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
                     "text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
     return true;
   }
 
   bool isAcknowledged(const std::string& operation_id) const {
-    std::lock_guard<std::mutex> guard(mtx_);
+    std::lock_guard<std::mutex> guard(ack_operations_mtx_);
     return acknowledged_operations_.count(operation_id) > 0;
   }
 
+  uint32_t getApplyCount(const std::string& result_state) const {
+    std::lock_guard<std::mutex> guard(apply_count_mtx_);
+    return apply_count_.find(result_state) != apply_count_.end() ? apply_count_.at(result_state) : 0;
+  }
+
  private:
-  mutable std::mutex mtx_;
+  mutable std::mutex ack_operations_mtx_;
+  mutable std::mutex apply_count_mtx_;
   std::set<std::string> acknowledged_operations_;
+  std::unordered_map<std::string, uint32_t> apply_count_;
 };
diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h
index 0d4e28a..0aa4934 100644
--- a/libminifi/include/c2/C2Agent.h
+++ b/libminifi/include/c2/C2Agent.h
@@ -150,10 +150,16 @@ class C2Agent : public state::UpdateController {
    */
   void handle_describe(const C2ContentResponse &resp);
 
+
+  enum class UpdateResult {
+    NO_UPDATE,
+    UPDATE_SUCCESSFUL,
+    UPDATE_FAILED
+  };
   /**
    * Updates a property
    */
-  bool update_property(const std::string &property_name, const std::string &property_value,  PropertyChangeLifetime lifetime);
+  UpdateResult update_property(const std::string &property_name, const std::string &property_value,  PropertyChangeLifetime lifetime);
 
   void handle_transfer(const C2ContentResponse &resp);
 
@@ -169,6 +175,7 @@ class C2Agent : public state::UpdateController {
   utils::TaskRescheduleInfo consume();
 
   bool handleConfigurationUpdate(const C2ContentResponse &resp);
+  void handlePropertyUpdate(const C2ContentResponse &resp);
 
   std::optional<std::string> resolveFlowUrl(const std::string& url) const;
 
diff --git a/libminifi/include/core/state/UpdateController.h b/libminifi/include/core/state/UpdateController.h
index 4fee93b..8364140 100644
--- a/libminifi/include/core/state/UpdateController.h
+++ b/libminifi/include/core/state/UpdateController.h
@@ -40,7 +40,8 @@ enum class UpdateState {
   NOT_APPLIED,
   SET_ERROR,
   READ_ERROR,
-  NESTED  // multiple updates embedded into one
+  NESTED,  // multiple updates embedded into one
+  NO_OPERATION
 };
 
 /**
@@ -102,49 +103,6 @@ class Update {
   UpdateStatus status_;
 };
 
-/**
- * Justification and Purpose: Update Runner reflects the post execution functors that determine if
- * a given function that is running within a thread pool worker needs to end.
- *
- * Design: Simply implements isFinished and isCancelled, which it receives by way of the AfterExecute
- * class.
- */
-class UpdateRunner : public utils::AfterExecute<Update> {
- public:
-  explicit UpdateRunner(std::atomic<bool> &running, const int64_t &delay)
-      : running_(&running),
-        delay_(delay) {
-  }
-
-  UpdateRunner(const UpdateRunner &other) = delete;
-  UpdateRunner(UpdateRunner &&other) = delete;
-
-  ~UpdateRunner() = default;
-
-  UpdateRunner& operator=(const UpdateRunner &other) = delete;
-  UpdateRunner& operator=(UpdateRunner &&other) = delete;
-
-  virtual bool isFinished(const Update &result) {
-    if ((result.getStatus().getState() == UpdateState::FULLY_APPLIED || result.getStatus().getState() == UpdateState::READ_COMPLETE) && *running_) {
-      return false;
-    } else {
-      return true;
-    }
-  }
-  virtual bool isCancelled(const Update& /*result*/) {
-    return !*running_;
-  }
-
-  virtual std::chrono::milliseconds wait_time() {
-    return delay_;
-  }
-
- protected:
-  std::atomic<bool> *running_;
-
-  std::chrono::milliseconds delay_;
-};
-
 class Pausable {
  public:
   virtual ~Pausable() = default;
diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp
index 55b66f0..2aa42e0 100644
--- a/libminifi/src/Configure.cpp
+++ b/libminifi/src/Configure.cpp
@@ -30,6 +30,14 @@ namespace nifi {
 namespace minifi {
 
 bool Configure::get(const std::string& key, std::string& value) const {
+  static constexpr std::string_view log_prefix = "nifi.log.";
+  if (utils::StringUtils::startsWith(key, log_prefix)) {
+    if (logger_properties_) {
+      return logger_properties_->getString(key.substr(log_prefix.length()), value);
+    }
+    return false;
+  }
+
   bool found = getString(key, value);
   if (decryptor_ && found && isEncrypted(key)) {
     value = decryptor_->decrypt(value);
@@ -54,12 +62,11 @@ bool Configure::get(const std::string& key, const std::string& alternate_key, st
 }
 
 std::optional<std::string> Configure::get(const std::string& key) const {
-  auto value = getString(key);
-  if (decryptor_ && value && isEncrypted(key)) {
-    return decryptor_->decrypt(*value);
-  } else {
+  std::string value;
+  if (get(key, value)) {
     return value;
   }
+  return std::nullopt;
 }
 
 bool Configure::isEncrypted(const std::string& key) const {
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index a45379a..c39bebc 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -313,6 +313,9 @@ void C2Agent::extractPayload(const C2Payload &resp) {
     case state::UpdateState::READ_ERROR:
       logger_->log_debug("Received read error event from protocol");
       break;
+    case state::UpdateState::NO_OPERATION:
+      logger_->log_debug("Received no operation event from protocol");
+      break;
     default:
       logger_->log_error("Received unknown event (%d) from protocol", static_cast<int>(resp.getStatus().getState()));
       break;
@@ -601,23 +604,7 @@ void C2Agent::handle_update(const C2ContentResponse &resp) {
       break;
     }
     case UpdateOperand::PROPERTIES: {
-      state::UpdateState result = state::UpdateState::FULLY_APPLIED;
-      for (auto entry : resp.operation_arguments) {
-        bool persist = (
-            entry.second.getAnnotation("persist")
-            | utils::map(&AnnotatedValue::to_string)
-            | utils::flatMap(utils::StringUtils::toBool)).value_or(false);
-        PropertyChangeLifetime lifetime = persist ? PropertyChangeLifetime::PERSISTENT : PropertyChangeLifetime::TRANSIENT;
-        if (!update_property(entry.first, entry.second.to_string(), lifetime)) {
-          result = state::UpdateState::PARTIALLY_APPLIED;
-        }
-      }
-      // apply changes and persist properties requested to be persisted
-      if (!configuration_->commitChanges()) {
-        result = state::UpdateState::PARTIALLY_APPLIED;
-      }
-      C2Payload response(Operation::ACKNOWLEDGE, result, resp.ident, true);
-      enqueue_c2_response(std::move(response));
+      handlePropertyUpdate(resp);
       break;
     } case UpdateOperand::C2: {
       // prior configuration options were already in place. thus
@@ -643,15 +630,51 @@ void C2Agent::handle_update(const C2ContentResponse &resp) {
   }
 }
 
+void C2Agent::handlePropertyUpdate(const C2ContentResponse &resp) {
+  state::UpdateState result = state::UpdateState::NO_OPERATION;
+  auto changeUpdateState = [&result](UpdateResult update_result) {
+    if (result == state::UpdateState::NO_OPERATION) {
+      if (update_result == UpdateResult::UPDATE_SUCCESSFUL) {
+        result = state::UpdateState::FULLY_APPLIED;
+      } else if (update_result == UpdateResult::UPDATE_FAILED) {
+        result = state::UpdateState::PARTIALLY_APPLIED;
+      }
+    } else if (result == state::UpdateState::FULLY_APPLIED && update_result == UpdateResult::UPDATE_FAILED) {
+      result = state::UpdateState::PARTIALLY_APPLIED;
+    }
+  };
+
+  for (auto entry : resp.operation_arguments) {
+    bool persist = (
+        entry.second.getAnnotation("persist")
+        | utils::map(&AnnotatedValue::to_string)
+        | utils::flatMap(utils::StringUtils::toBool)).value_or(false);
+    PropertyChangeLifetime lifetime = persist ? PropertyChangeLifetime::PERSISTENT : PropertyChangeLifetime::TRANSIENT;
+    changeUpdateState(update_property(entry.first, entry.second.to_string(), lifetime));
+  }
+  // apply changes and persist properties requested to be persisted
+  if (result != state::UpdateState::NO_OPERATION && !configuration_->commitChanges()) {
+    result = state::UpdateState::PARTIALLY_APPLIED;
+  }
+  C2Payload response(Operation::ACKNOWLEDGE, result, resp.ident, true);
+  enqueue_c2_response(std::move(response));
+}
+
 /**
  * Updates a property
  */
-bool C2Agent::update_property(const std::string &property_name, const std::string &property_value, PropertyChangeLifetime lifetime) {
+C2Agent::UpdateResult C2Agent::update_property(const std::string &property_name, const std::string &property_value, PropertyChangeLifetime lifetime) {
   if (update_service_ && !update_service_->canUpdate(property_name)) {
-    return false;
+    return UpdateResult::UPDATE_FAILED;
+  }
+
+  std::string value;
+  if (configuration_->get(property_name, value) && value == property_value) {
+    return UpdateResult::NO_UPDATE;
   }
+
   configuration_->set(property_name, property_value, lifetime);
-  return true;
+  return UpdateResult::UPDATE_SUCCESSFUL;
 }
 
 C2Payload C2Agent::bundleDebugInfo(std::map<std::string, std::unique_ptr<io::InputStream>>& files) {
diff --git a/libminifi/src/c2/HeartbeatJsonSerializer.cpp b/libminifi/src/c2/HeartbeatJsonSerializer.cpp
index aa4b1e5..7ec3ce4 100644
--- a/libminifi/src/c2/HeartbeatJsonSerializer.cpp
+++ b/libminifi/src/c2/HeartbeatJsonSerializer.cpp
@@ -47,6 +47,8 @@ static void serializeOperationInfo(rapidjson::Value& target, const C2Payload& pa
         return "PARTIALLY_APPLIED";
       case state::UpdateState::READ_ERROR:
         return "OPERATION_NOT_UNDERSTOOD";
+      case state::UpdateState::NO_OPERATION:
+        return "NO_OPERATION";
       case state::UpdateState::SET_ERROR:
       default:
         return "NOT_APPLIED";