You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ad...@apache.org on 2022/03/22 16:22:42 UTC
[nifi-minifi-cpp] branch main updated: MINIFICPP-1770 Add C2 NO_OPERATION response if property update is NoOp
This is an automated email from the ASF dual-hosted git repository.
adebreceni pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 4000ded MINIFICPP-1770 Add C2 NO_OPERATION response if property update is NoOp
4000ded is described below
commit 4000dedab86fe171fbeb09a14d4f2099e3011cda
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Tue Mar 22 17:21:04 2022 +0100
MINIFICPP-1770 Add C2 NO_OPERATION response if property update is NoOp
Signed-off-by: Adam Debreceni <ad...@apache.org>
This closes #1280
---
extensions/coap/protocols/CoapC2Protocol.cpp | 1 +
.../http-curl/tests/C2PropertiesUpdateTests.cpp | 3 +-
extensions/http-curl/tests/HTTPHandlers.h | 22 ++++++--
libminifi/include/c2/C2Agent.h | 9 +++-
libminifi/include/core/state/UpdateController.h | 46 +---------------
libminifi/src/Configure.cpp | 15 ++++--
libminifi/src/c2/C2Agent.cpp | 63 +++++++++++++++-------
libminifi/src/c2/HeartbeatJsonSerializer.cpp | 2 +
8 files changed, 88 insertions(+), 73 deletions(-)
diff --git a/extensions/coap/protocols/CoapC2Protocol.cpp b/extensions/coap/protocols/CoapC2Protocol.cpp
index 1d1f396..04e6df9 100644
--- a/extensions/coap/protocols/CoapC2Protocol.cpp
+++ b/extensions/coap/protocols/CoapC2Protocol.cpp
@@ -65,6 +65,7 @@ int CoapProtocol::writeAcknowledgement(io::BaseStream *stream, const minifi::c2:
case state::UpdateState::INITIATE:
case state::UpdateState::FULLY_APPLIED:
case state::UpdateState::READ_COMPLETE:
+ case state::UpdateState::NO_OPERATION:
payloadState = 0;
break;
case state::UpdateState::NOT_APPLIED:
diff --git a/extensions/http-curl/tests/C2PropertiesUpdateTests.cpp b/extensions/http-curl/tests/C2PropertiesUpdateTests.cpp
index 36812d0..e802695 100644
--- a/extensions/http-curl/tests/C2PropertiesUpdateTests.cpp
+++ b/extensions/http-curl/tests/C2PropertiesUpdateTests.cpp
@@ -43,7 +43,6 @@ class C2HeartbeatHandler : public ServerAwareHandler {
"text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
response_->length());
mg_printf(conn, "%s", response_->c_str());
- response_.reset();
} else {
mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
"text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
@@ -150,6 +149,8 @@ int main() {
VerifyPropertyUpdate harness([&] {
assert(utils::verifyEventHappenedInPollTime(3s, [&] {return ack_handler.isAcknowledged("79");}));
+ assert(utils::verifyEventHappenedInPollTime(3s, [&] {return ack_handler.getApplyCount("FULLY_APPLIED") == 1;}));
+ assert(utils::verifyEventHappenedInPollTime(3s, [&] {return ack_handler.getApplyCount("NO_OPERATION") > 0;}));
// update operation acknowledged
{
// verify final log levels
diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h
index 6f9e193..25ebbc0 100644
--- a/extensions/http-curl/tests/HTTPHandlers.h
+++ b/extensions/http-curl/tests/HTTPHandlers.h
@@ -792,20 +792,36 @@ class C2AcknowledgeHandler : public ServerAwareHandler {
rapidjson::Document root;
root.Parse(req.data(), req.size());
if (root.IsObject() && root.HasMember("operationId")) {
- std::lock_guard<std::mutex> guard(mtx_);
+ std::lock_guard<std::mutex> guard(ack_operations_mtx_);
acknowledged_operations_.insert(root["operationId"].GetString());
}
+
+ if (root.IsObject() && root.HasMember("operationState")) {
+ if (root["operationState"].IsObject() && root["operationState"].HasMember("state")) {
+ std::lock_guard<std::mutex> guard(apply_count_mtx_);
+ auto result_state = root["operationState"]["state"].GetString();
+ ++apply_count_[result_state];
+ }
+ }
+
mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
"text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
return true;
}
bool isAcknowledged(const std::string& operation_id) const {
- std::lock_guard<std::mutex> guard(mtx_);
+ std::lock_guard<std::mutex> guard(ack_operations_mtx_);
return acknowledged_operations_.count(operation_id) > 0;
}
+ uint32_t getApplyCount(const std::string& result_state) const {  // Returns how many acknowledgements carried this operationState "state" string; 0 if none was recorded.
+ std::lock_guard<std::mutex> guard(apply_count_mtx_);  // apply_count_ is also modified under this mutex by the request handler
+ return apply_count_.find(result_state) != apply_count_.end() ? apply_count_.at(result_state) : 0;  // find() guards at(), which would throw for a missing key
+ }
+
private:
- mutable std::mutex mtx_;
+ mutable std::mutex ack_operations_mtx_;
+ mutable std::mutex apply_count_mtx_;
std::set<std::string> acknowledged_operations_;
+ std::unordered_map<std::string, uint32_t> apply_count_;
};
diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h
index 0d4e28a..0aa4934 100644
--- a/libminifi/include/c2/C2Agent.h
+++ b/libminifi/include/c2/C2Agent.h
@@ -150,10 +150,16 @@ class C2Agent : public state::UpdateController {
*/
void handle_describe(const C2ContentResponse &resp);
+
+ enum class UpdateResult {  // Outcome of a single update_property() call.
+ NO_UPDATE,  // stored value already equals the requested value; nothing was set
+ UPDATE_SUCCESSFUL,  // the new value was set on the configuration
+ UPDATE_FAILED  // the update service reported that the property cannot be updated
+ };
/**
* Updates a property
*/
- bool update_property(const std::string &property_name, const std::string &property_value, PropertyChangeLifetime lifetime);
+ UpdateResult update_property(const std::string &property_name, const std::string &property_value, PropertyChangeLifetime lifetime);
void handle_transfer(const C2ContentResponse &resp);
@@ -169,6 +175,7 @@ class C2Agent : public state::UpdateController {
utils::TaskRescheduleInfo consume();
bool handleConfigurationUpdate(const C2ContentResponse &resp);
+ void handlePropertyUpdate(const C2ContentResponse &resp);
std::optional<std::string> resolveFlowUrl(const std::string& url) const;
diff --git a/libminifi/include/core/state/UpdateController.h b/libminifi/include/core/state/UpdateController.h
index 4fee93b..8364140 100644
--- a/libminifi/include/core/state/UpdateController.h
+++ b/libminifi/include/core/state/UpdateController.h
@@ -40,7 +40,8 @@ enum class UpdateState {
NOT_APPLIED,
SET_ERROR,
READ_ERROR,
- NESTED // multiple updates embedded into one
+ NESTED, // multiple updates embedded into one
+ NO_OPERATION
};
/**
@@ -102,49 +103,6 @@ class Update {
UpdateStatus status_;
};
-/**
- * Justification and Purpose: Update Runner reflects the post execution functors that determine if
- * a given function that is running within a thread pool worker needs to end.
- *
- * Design: Simply implements isFinished and isCancelled, which it receives by way of the AfterExecute
- * class.
- */
-class UpdateRunner : public utils::AfterExecute<Update> {
- public:
- explicit UpdateRunner(std::atomic<bool> &running, const int64_t &delay)
- : running_(&running),
- delay_(delay) {
- }
-
- UpdateRunner(const UpdateRunner &other) = delete;
- UpdateRunner(UpdateRunner &&other) = delete;
-
- ~UpdateRunner() = default;
-
- UpdateRunner& operator=(const UpdateRunner &other) = delete;
- UpdateRunner& operator=(UpdateRunner &&other) = delete;
-
- virtual bool isFinished(const Update &result) {
- if ((result.getStatus().getState() == UpdateState::FULLY_APPLIED || result.getStatus().getState() == UpdateState::READ_COMPLETE) && *running_) {
- return false;
- } else {
- return true;
- }
- }
- virtual bool isCancelled(const Update& /*result*/) {
- return !*running_;
- }
-
- virtual std::chrono::milliseconds wait_time() {
- return delay_;
- }
-
- protected:
- std::atomic<bool> *running_;
-
- std::chrono::milliseconds delay_;
-};
-
class Pausable {
public:
virtual ~Pausable() = default;
diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp
index 55b66f0..2aa42e0 100644
--- a/libminifi/src/Configure.cpp
+++ b/libminifi/src/Configure.cpp
@@ -30,6 +30,14 @@ namespace nifi {
namespace minifi {
bool Configure::get(const std::string& key, std::string& value) const {
+ static constexpr std::string_view log_prefix = "nifi.log.";
+ if (utils::StringUtils::startsWith(key, log_prefix)) {
+ if (logger_properties_) {
+ return logger_properties_->getString(key.substr(log_prefix.length()), value);
+ }
+ return false;
+ }
+
bool found = getString(key, value);
if (decryptor_ && found && isEncrypted(key)) {
value = decryptor_->decrypt(value);
@@ -54,12 +62,11 @@ bool Configure::get(const std::string& key, const std::string& alternate_key, st
}
std::optional<std::string> Configure::get(const std::string& key) const {
- auto value = getString(key);
- if (decryptor_ && value && isEncrypted(key)) {
- return decryptor_->decrypt(*value);
- } else {
+ std::string value;
+ if (get(key, value)) {  // delegates to get(key, value), so "nifi.log." routing and decryption live in one place
return value;
}
+ return std::nullopt;  // key not found
}
bool Configure::isEncrypted(const std::string& key) const {
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index a45379a..c39bebc 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -313,6 +313,9 @@ void C2Agent::extractPayload(const C2Payload &resp) {
case state::UpdateState::READ_ERROR:
logger_->log_debug("Received read error event from protocol");
break;
+ case state::UpdateState::NO_OPERATION:
+ logger_->log_debug("Received no operation event from protocol");
+ break;
default:
logger_->log_error("Received unknown event (%d) from protocol", static_cast<int>(resp.getStatus().getState()));
break;
@@ -601,23 +604,7 @@ void C2Agent::handle_update(const C2ContentResponse &resp) {
break;
}
case UpdateOperand::PROPERTIES: {
- state::UpdateState result = state::UpdateState::FULLY_APPLIED;
- for (auto entry : resp.operation_arguments) {
- bool persist = (
- entry.second.getAnnotation("persist")
- | utils::map(&AnnotatedValue::to_string)
- | utils::flatMap(utils::StringUtils::toBool)).value_or(false);
- PropertyChangeLifetime lifetime = persist ? PropertyChangeLifetime::PERSISTENT : PropertyChangeLifetime::TRANSIENT;
- if (!update_property(entry.first, entry.second.to_string(), lifetime)) {
- result = state::UpdateState::PARTIALLY_APPLIED;
- }
- }
- // apply changes and persist properties requested to be persisted
- if (!configuration_->commitChanges()) {
- result = state::UpdateState::PARTIALLY_APPLIED;
- }
- C2Payload response(Operation::ACKNOWLEDGE, result, resp.ident, true);
- enqueue_c2_response(std::move(response));
+ handlePropertyUpdate(resp);
break;
} case UpdateOperand::C2: {
// prior configuration options were already in place. thus
@@ -643,15 +630,51 @@ void C2Agent::handle_update(const C2ContentResponse &resp) {
}
}
+void C2Agent::handlePropertyUpdate(const C2ContentResponse &resp) {  // Applies every property in resp.operation_arguments and enqueues one ACKNOWLEDGE carrying the aggregated state.
+ state::UpdateState result = state::UpdateState::NO_OPERATION;  // stays NO_OPERATION unless some property is changed or rejected
+ auto changeUpdateState = [&result](UpdateResult update_result) {  // folds one per-property outcome into the aggregate `result`
+ if (result == state::UpdateState::NO_OPERATION) {
+ if (update_result == UpdateResult::UPDATE_SUCCESSFUL) {
+ result = state::UpdateState::FULLY_APPLIED;
+ } else if (update_result == UpdateResult::UPDATE_FAILED) {
+ result = state::UpdateState::PARTIALLY_APPLIED;  // NOTE(review): a request in which every property fails also ends up PARTIALLY_APPLIED
+ }
+ } else if (result == state::UpdateState::FULLY_APPLIED && update_result == UpdateResult::UPDATE_FAILED) {  // a failure after earlier successes downgrades the aggregate
+ result = state::UpdateState::PARTIALLY_APPLIED;  // once PARTIALLY_APPLIED, no later outcome changes the aggregate
+ }
+ };
+
+ for (auto entry : resp.operation_arguments) {  // NOTE(review): iterates by value, so each argument entry is copied
+ bool persist = (  // "persist" annotation converted to bool; false when the chain yields no value
+ entry.second.getAnnotation("persist")
+ | utils::map(&AnnotatedValue::to_string)
+ | utils::flatMap(utils::StringUtils::toBool)).value_or(false);
+ PropertyChangeLifetime lifetime = persist ? PropertyChangeLifetime::PERSISTENT : PropertyChangeLifetime::TRANSIENT;
+ changeUpdateState(update_property(entry.first, entry.second.to_string(), lifetime));
+ }
+ // apply changes and persist properties requested to be persisted
+ if (result != state::UpdateState::NO_OPERATION && !configuration_->commitChanges()) {  // commit is skipped when nothing changed; a failed commit downgrades the result
+ result = state::UpdateState::PARTIALLY_APPLIED;
+ }
+ C2Payload response(Operation::ACKNOWLEDGE, result, resp.ident, true);  // acknowledgement is tied to the request through resp.ident
+ enqueue_c2_response(std::move(response));
+}
+
/**
* Updates a property
*/
-bool C2Agent::update_property(const std::string &property_name, const std::string &property_value, PropertyChangeLifetime lifetime) {
+C2Agent::UpdateResult C2Agent::update_property(const std::string &property_name, const std::string &property_value, PropertyChangeLifetime lifetime) {  // Sets one property and reports whether it changed, was already equal, or was rejected.
if (update_service_ && !update_service_->canUpdate(property_name)) {
- return false;
+ return UpdateResult::UPDATE_FAILED;  // the update service does not allow changing this property
+ }
+
+ std::string value;
+ if (configuration_->get(property_name, value) && value == property_value) {  // current value already matches the request
+ return UpdateResult::NO_UPDATE;  // NOTE(review): returns before set(), so the requested lifetime is not applied in this case
}
+
configuration_->set(property_name, property_value, lifetime);
- return true;
+ return UpdateResult::UPDATE_SUCCESSFUL;
}
C2Payload C2Agent::bundleDebugInfo(std::map<std::string, std::unique_ptr<io::InputStream>>& files) {
diff --git a/libminifi/src/c2/HeartbeatJsonSerializer.cpp b/libminifi/src/c2/HeartbeatJsonSerializer.cpp
index aa4b1e5..7ec3ce4 100644
--- a/libminifi/src/c2/HeartbeatJsonSerializer.cpp
+++ b/libminifi/src/c2/HeartbeatJsonSerializer.cpp
@@ -47,6 +47,8 @@ static void serializeOperationInfo(rapidjson::Value& target, const C2Payload& pa
return "PARTIALLY_APPLIED";
case state::UpdateState::READ_ERROR:
return "OPERATION_NOT_UNDERSTOOD";
+ case state::UpdateState::NO_OPERATION:
+ return "NO_OPERATION";
case state::UpdateState::SET_ERROR:
default:
return "NOT_APPLIED";