You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2020/05/18 21:04:09 UTC
[nifi-minifi-cpp] branch master updated: MINIFICPP-1226 improve C2
heartbeat performance and simplify C2Payload code
This is an automated email from the ASF dual-hosted git repository.
aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new 9f74c44 MINIFICPP-1226 improve C2 heartbeat performance and simplify C2Payload code
9f74c44 is described below
commit 9f74c444bd91ceb88d29b35525d6a73c26e83649
Author: Marton Szasz <sz...@gmail.com>
AuthorDate: Fri May 15 17:07:10 2020 +0200
MINIFICPP-1226 improve C2 heartbeat performance and simplify C2Payload code
Signed-off-by: Arpad Boda <ab...@apache.org>
This closes #787
---
libminifi/include/c2/C2Payload.h | 185 ++++++---------------
.../include/core/state/nodes/AgentInformation.h | 10 +-
libminifi/src/c2/C2Agent.cpp | 3 +
libminifi/src/c2/C2Payload.cpp | 105 ++----------
4 files changed, 72 insertions(+), 231 deletions(-)
diff --git a/libminifi/include/c2/C2Payload.h b/libminifi/include/c2/C2Payload.h
index 974750a..0ac34a4 100644
--- a/libminifi/include/c2/C2Payload.h
+++ b/libminifi/include/c2/C2Payload.h
@@ -21,6 +21,7 @@
#include <memory>
#include <string>
#include <map>
+#include <limits>
#include "../core/state/Value.h"
#include "core/state/UpdateController.h"
@@ -53,50 +54,36 @@ enum Direction {
RECEIVE
};
-class C2ContentResponse {
- public:
- C2ContentResponse(Operation op);
-
- C2ContentResponse(const C2ContentResponse &other);
-
- C2ContentResponse(const C2ContentResponse &&other);
+struct C2ContentResponse {
+ explicit C2ContentResponse(Operation op)
+ :op{ op }
+ {}
- C2ContentResponse & operator=(const C2ContentResponse &&other);
+ C2ContentResponse(const C2ContentResponse&) = default;
+ C2ContentResponse(C2ContentResponse&&) = default;
+ C2ContentResponse& operator=(const C2ContentResponse&) = default;
+ C2ContentResponse& operator=(C2ContentResponse&&) = default;
- C2ContentResponse & operator=(const C2ContentResponse &other);
-
- inline bool operator==(const C2ContentResponse &rhs) const {
- if (op != rhs.op)
- return false;
- if (required != rhs.required)
- return false;
- if (ident != rhs.ident)
- return false;
- if (name != rhs.name)
- return false;
- if (operation_arguments != rhs.operation_arguments)
- return false;
- return true;
+ bool operator==(const C2ContentResponse &other) const {
+ return std::tie(this->op, this->required, this->ident, this->name, this->operation_arguments)
+ == std::tie(other.op, other.required, other.ident, other.name, other.operation_arguments);
}
- inline bool operator!=(const C2ContentResponse &rhs) const {
- return !(*this == rhs);
- }
+ bool operator!=(const C2ContentResponse &rhs) const { return !(*this == rhs); }
Operation op;
// determines if the operation is required
- bool required;
+ bool required{ false };
// identifier
std::string ident;
// delay before running
- uint32_t delay;
+ uint32_t delay{ 0 };
// max time before this response will no longer be honored.
- uint64_t ttl;
+ uint64_t ttl{ (std::numeric_limits<uint64_t>::max)() };
// name applied to commands
std::string name;
// commands that correspond with the operation.
std::map<std::string, state::response::ValueNode> operation_arguments;
-// std::vector<std::string> content;
};
/**
@@ -108,160 +95,98 @@ class C2ContentResponse {
*/
class C2Payload : public state::Update {
public:
- virtual ~C2Payload() {
-
- }
-
- C2Payload(Operation op, const std::string &identifier, bool resp = false, bool isRaw = false);
-
- C2Payload(Operation op, state::UpdateState state,const std::string &identifier, bool resp = false, bool isRaw = false);
-
- C2Payload(Operation op, bool resp = false, bool isRaw = false);
-
+ C2Payload(Operation op, std::string identifier, bool resp = false, bool isRaw = false);
+ C2Payload(Operation op, state::UpdateState state, std::string identifier, bool resp = false, bool isRaw = false);
+ explicit C2Payload(Operation op, bool resp = false, bool isRaw = false);
C2Payload(Operation op, state::UpdateState state, bool resp = false, bool isRaw = false);
- C2Payload(const C2Payload &other) = default;
-
- C2Payload(C2Payload &&other) = default;
+ C2Payload(const C2Payload&) = default;
+ C2Payload(C2Payload&&) = default;
+ C2Payload &operator=(const C2Payload&) = default;
+ C2Payload &operator=(C2Payload&&) = default;
- void setIdentifier(const std::string &ident);
+ ~C2Payload() override = default;
- std::string getIdentifier() const;
+ void setIdentifier(std::string ident) { ident_ = std::move(ident); }
+ std::string getIdentifier() const { return ident_; }
- void setLabel(const std::string label) {
- label_ = label;
- }
-
- std::string getLabel() const {
- return label_;
- }
+ void setLabel(std::string label) { label_ = std::move(label); }
+ std::string getLabel() const { return label_; }
/**
* Gets the operation for this payload. May be nested or a single operation.
*/
- Operation getOperation() const;
+ Operation getOperation() const noexcept { return op_; }
/**
* Validate the payload, if necessary and/or possible.
*/
- virtual bool validate();
+ bool validate() override { return true; }
/**
* Get content responses from this payload.
*/
- const std::vector<C2ContentResponse> &getContent() const;
+ const std::vector<C2ContentResponse> &getContent() const noexcept { return content_; }
/**
* Add a content response to this payload.
*/
- void addContent(const C2ContentResponse &&content, bool collapsible = true);
+ void addContent(C2ContentResponse&&, bool collapsible = true);
/**
* Determines if this object contains raw data.
*/
- bool isRaw() const;
+ bool isRaw() const noexcept { return raw_; }
/**
* Sets raw data within this object.
*/
- void setRawData(const std::string &data);
-
- /**
- * Sets raw data from a vector within this object.
- */
- void setRawData(const std::vector<char> &data);
-
- /**
- * Sets raw data from a vector of uint8_t within this object.
- */
- void setRawData(const std::vector<uint8_t> &data);
+ void setRawData(const std::string&);
+ void setRawData(const std::vector<char>&);
+ void setRawData(const std::vector<uint8_t>&);
/**
* Returns raw data.
*/
- std::vector<char> getRawData() const;
+ std::vector<char> getRawData() const { return raw_data_; }
/**
* Add a nested payload.
* @param payload payload to move into this object.
*/
- void addPayload(const C2Payload &&payload);
+ void addPayload(C2Payload &&payload);
- bool isCollapsible() const {
- return is_collapsible_;
- }
+ bool isCollapsible() const noexcept { return is_collapsible_; }
+ void setCollapsible(bool is_collapsible) noexcept { is_collapsible_ = is_collapsible; }
- void setCollapsible(bool is_collapsible) {
- is_collapsible_ = is_collapsible;
- }
-
- bool isContainer() const {
- return is_container_;
- }
+ bool isContainer() const noexcept { return is_container_; }
+ void setContainer(bool is_container) noexcept { is_container_ = is_container; }
- void setContainer(bool is_container) {
- is_container_ = is_container;
- }
/**
* Get nested payloads.
*/
- const std::vector<C2Payload> &getNestedPayloads() const;
-
- C2Payload &operator=(C2Payload &&other) = default;
- C2Payload &operator=(const C2Payload &other) = default;
-
- inline bool operator==(const C2Payload &rhs) const {
- if (op_ != rhs.op_) {
- return false;
- }
- if (ident_ != rhs.ident_) {
- return false;
- }
- if (label_ != rhs.label_) {
- return false;
- }
- if (payloads_ != rhs.payloads_) {
- return false;
- }
- if (content_ != rhs.content_) {
- return false;
- }
- if (raw_ != rhs.raw_) {
- return false;
- }
- if (raw_data_ != rhs.raw_data_) {
- return false;
- }
- return true;
- }
+ const std::vector<C2Payload> &getNestedPayloads() const noexcept { return payloads_; }
- inline bool operator!=(const C2Payload &rhs) const {
- return !(*this == rhs);
- }
+ void reservePayloads(size_t new_capacity) { payloads_.reserve(new_capacity); }
- protected:
+ bool operator==(const C2Payload &other) const {
+ return std::tie(this->op_, this->ident_, this->label_, this->payloads_, this->content_, this->raw_, this->raw_data_)
+ == std::tie(other.op_, other.ident_, other.label_, other.payloads_, other.content_, other.raw_, other.raw_data_);
+ }
- // identifier for this payload.
- std::string ident_;
+ bool operator!=(const C2Payload &rhs) const { return !(*this == rhs); }
+ protected:
+ std::string ident_; // identifier for this payload.
std::string label_;
-
std::vector<C2Payload> payloads_;
-
std::vector<C2ContentResponse> content_;
-
Operation op_;
-
- bool raw_;
-
+ bool raw_{ false };
std::vector<char> raw_data_;
-
- bool isResponse;
-
- bool is_container_;
-
- bool is_collapsible_;
-
+ bool isResponse{ false };
+ bool is_container_{ false };
+ bool is_collapsible_{ true };
};
} /* namesapce c2 */
diff --git a/libminifi/include/core/state/nodes/AgentInformation.h b/libminifi/include/core/state/nodes/AgentInformation.h
index 4e163e5..9dacd6f 100644
--- a/libminifi/include/core/state/nodes/AgentInformation.h
+++ b/libminifi/include/core/state/nodes/AgentInformation.h
@@ -669,16 +669,10 @@ protected:
}
std::vector<SerializedResponseNode> getAgentManifest() const {
- std::vector<SerializedResponseNode> serialized;
- AgentManifest manifest("manifest");
-
SerializedResponseNode agentManifest;
agentManifest.name = "agentManifest";
- for (auto &ser : manifest.serialize()) {
- agentManifest.children.push_back(std::move(ser));
- }
- serialized.push_back(agentManifest);
- return serialized;
+ agentManifest.children = AgentManifest{"manifest"}.serialize();
+ return std::vector<SerializedResponseNode>{ agentManifest };
}
std::vector<SerializedResponseNode> getAgentStatus() const {
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index b71b8f7..3347da8 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -316,6 +316,7 @@ void C2Agent::performHeartBeat() {
metrics = reporter->getHeartbeatNodes(false);
}
+ payload.reservePayloads(metrics.size());
for (const auto& metric : metrics) {
C2Payload child_metric_payload(Operation::HEARTBEAT);
child_metric_payload.setLabel(metric->getName());
@@ -336,6 +337,8 @@ void C2Agent::performHeartBeat() {
}
void C2Agent::serializeMetrics(C2Payload &metric_payload, const std::string &name, const std::vector<state::response::SerializedResponseNode> &metrics, bool is_container, bool is_collapsible) {
+ const auto payloads = std::count_if(begin(metrics), end(metrics), [](const state::response::SerializedResponseNode& metric) { return !metric.children.empty(); });
+ metric_payload.reservePayloads(metric_payload.getNestedPayloads().size() + payloads);
for (const auto &metric : metrics) {
if (metric.children.size() > 0) {
C2Payload child_metric_payload(metric_payload.getOperation());
diff --git a/libminifi/src/c2/C2Payload.cpp b/libminifi/src/c2/C2Payload.cpp
index 61487ee..868c6ef 100644
--- a/libminifi/src/c2/C2Payload.cpp
+++ b/libminifi/src/c2/C2Payload.cpp
@@ -27,107 +27,34 @@ namespace nifi {
namespace minifi {
namespace c2 {
-C2ContentResponse::C2ContentResponse(Operation op)
- : op(op),
- required(false),
- delay(0),
- ttl(-1) {
+C2Payload::C2Payload(Operation op, std::string identifier, bool resp, bool isRaw)
+ : C2Payload(op, state::UpdateState::FULLY_APPLIED, std::move(identifier), resp, isRaw) {
}
-C2ContentResponse::C2ContentResponse(const C2ContentResponse &other)
- : op(other.op),
- required(other.required),
- delay(other.delay),
- ttl(other.ttl),
- name(other.name),
- ident(other.ident),
- operation_arguments(other.operation_arguments) {
-}
-
-C2ContentResponse::C2ContentResponse(const C2ContentResponse &&other)
- : op(other.op),
- required(other.required),
- delay(std::move(other.delay)),
- ttl(std::move(other.ttl)),
- ident(std::move(other.ident)),
- name(std::move(other.name)),
- operation_arguments(std::move(other.operation_arguments)) {
-}
-
-C2ContentResponse &C2ContentResponse::operator=(const C2ContentResponse &&other) {
- op = other.op;
- required = other.required;
- delay = std::move(other.delay);
- ttl = std::move(other.ttl);
- name = std::move(other.name);
- ident = std::move(other.ident);
- operation_arguments = std::move(other.operation_arguments);
- return *this;
-}
-
-C2ContentResponse &C2ContentResponse::operator=(const C2ContentResponse &other) {
- op = other.op;
- required = other.required;
- delay = other.delay;
- ttl = other.ttl;
- name = other.name;
- operation_arguments = other.operation_arguments;
- return *this;
-}
-
-C2Payload::C2Payload(Operation op, state::UpdateState state, const std::string &identifier, bool resp, bool isRaw)
+C2Payload::C2Payload(Operation op, state::UpdateState state, std::string identifier, bool resp, bool isRaw)
: state::Update(state::UpdateStatus(state, 0)),
op_(op),
raw_(isRaw),
- ident_(identifier),
- isResponse(resp),
- is_container_(false),
- is_collapsible_(true) {
+ ident_(std::move(identifier)),
+ isResponse(resp) {
}
-C2Payload::C2Payload(Operation op, const std::string &identifier, bool resp, bool isRaw)
- : C2Payload(op, state::UpdateState::FULLY_APPLIED, identifier, resp, isRaw) {
-}
C2Payload::C2Payload(Operation op, bool resp, bool isRaw)
: state::Update(state::UpdateStatus(state::UpdateState::INITIATE, 0)),
op_(op),
raw_(isRaw),
- isResponse(resp),
- is_container_(false),
- is_collapsible_(true) {
+ isResponse(resp) {
}
C2Payload::C2Payload(Operation op, state::UpdateState state, bool resp, bool isRaw)
: state::Update(state::UpdateStatus(state, 0)),
op_(op),
raw_(isRaw),
- isResponse(resp),
- is_container_(false),
- is_collapsible_(true) {
-}
-
-void C2Payload::setIdentifier(const std::string &ident) {
- ident_ = ident;
-}
-
-std::string C2Payload::getIdentifier() const {
- return ident_;
-}
-
-Operation C2Payload::getOperation() const {
- return op_;
-}
-
-bool C2Payload::validate() {
- return true;
-}
-
-const std::vector<C2ContentResponse> &C2Payload::getContent() const {
- return content_;
+ isResponse(resp) {
}
-void C2Payload::addContent(const C2ContentResponse &&content, bool collapsible) {
+void C2Payload::addContent(C2ContentResponse &&content, bool collapsible) {
if (collapsible) {
for (auto &existing_content : content_) {
if (existing_content.name == content.name) {
@@ -139,34 +66,26 @@ void C2Payload::addContent(const C2ContentResponse &&content, bool collapsible)
content_.push_back(std::move(content));
}
-bool C2Payload::isRaw() const {
- return raw_;
-}
-
void C2Payload::setRawData(const std::string &data) {
+ raw_data_.reserve(raw_data_.size() + data.size());
raw_data_.insert(std::end(raw_data_), std::begin(data), std::end(data));
}
void C2Payload::setRawData(const std::vector<char> &data) {
+ raw_data_.reserve(raw_data_.size() + data.size());
raw_data_.insert(std::end(raw_data_), std::begin(data), std::end(data));
}
void C2Payload::setRawData(const std::vector<uint8_t> &data) {
+ raw_data_.reserve(raw_data_.size() + data.size());
std::transform(std::begin(data), std::end(data), std::back_inserter(raw_data_), [](uint8_t c) {
return static_cast<char>(c);
});
}
-std::vector<char> C2Payload::getRawData() const {
- return raw_data_;
-}
-
-void C2Payload::addPayload(const C2Payload &&payload) {
+void C2Payload::addPayload(C2Payload &&payload) {
payloads_.push_back(std::move(payload));
}
-const std::vector<C2Payload> &C2Payload::getNestedPayloads() const {
- return payloads_;
-}
} /* namespace c2 */
} /* namespace minifi */