You are viewing a plain-text version of this content; the hyperlink to its canonical version ("here") was lost in the conversion to plain text.
Posted to commits@nifi.apache.org by ab...@apache.org on 2020/10/26 10:37:38 UTC
[nifi-minifi-cpp] branch main updated: MINIFICPP-1387 - Speed up Identifier::to_string
This is an automated email from the ASF dual-hosted git repository.
aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new e22ede1 MINIFICPP-1387 - Speed up Identifier::to_string
e22ede1 is described below
commit e22ede1ec3673e8667022435601db2e533080f55
Author: Adam Debreceni <ad...@protonmail.com>
AuthorDate: Thu Oct 8 15:43:37 2020 +0200
MINIFICPP-1387 - Speed up Identifier::to_string
Signed-off-by: Arpad Boda <ab...@apache.org>
This closes #926
---
libminifi/include/core/Core.h | 3 +-
libminifi/include/core/FlowFile.h | 5 -
libminifi/include/core/ProcessSession.h | 30 +++-
libminifi/include/core/logging/Logger.h | 6 +
.../include/core/state/nodes/FlowInformation.h | 2 +-
libminifi/include/io/OutputStream.h | 6 +
libminifi/include/utils/Id.h | 13 +-
libminifi/include/utils/SmallString.h | 90 ++++++++++
libminifi/src/core/FlowFile.cpp | 2 +-
libminifi/src/core/ProcessSession.cpp | 199 +++++++++------------
libminifi/src/utils/Id.cpp | 45 ++++-
nanofi/src/api/nanofi.cpp | 2 +-
12 files changed, 260 insertions(+), 143 deletions(-)
diff --git a/libminifi/include/core/Core.h b/libminifi/include/core/Core.h
index 5dc8a39..6ca5303 100644
--- a/libminifi/include/core/Core.h
+++ b/libminifi/include/core/Core.h
@@ -193,9 +193,8 @@ class CoreComponent {
// unsigned const char *getUUID();
/**
* Return the UUID string
- * @param constant reference to the UUID str
*/
- std::string getUUIDStr() const {
+ utils::SmallString<36> getUUIDStr() const {
return uuid_.to_string();
}
diff --git a/libminifi/include/core/FlowFile.h b/libminifi/include/core/FlowFile.h
index 73c569d..687ac0b 100644
--- a/libminifi/include/core/FlowFile.h
+++ b/libminifi/include/core/FlowFile.h
@@ -220,11 +220,6 @@ class FlowFile : public CoreComponent, public ReferenceContainer {
*/
uint64_t getOffset() const;
- bool getUUID(utils::Identifier& other) {
- other = uuid_;
- return true;
- }
-
// Check whether it is still being penalized
bool isPenalized() const {
return penaltyExpiration_ms_ > 0 && penaltyExpiration_ms_ > utils::timeutils::getTimeMillis();
diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h
index c453abe..125439c 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -139,25 +139,37 @@ class ProcessSession : public ReferenceContainer {
ProcessSession &operator=(const ProcessSession &parent) = delete;
protected:
+ struct FlowFileUpdate {
+ std::shared_ptr<FlowFile> modified;
+ std::shared_ptr<FlowFile> snapshot;
+ };
+
// FlowFiles being modified by current process session
- std::map<std::string, std::shared_ptr<core::FlowFile>> _updatedFlowFiles;
- // Copy of the original FlowFiles being modified by current process session as above
- std::map<std::string, std::shared_ptr<core::FlowFile>> _flowFileSnapShots;
+ std::map<utils::Identifier, FlowFileUpdate> _updatedFlowFiles;
// FlowFiles being added by current process session
- std::map<std::string, std::shared_ptr<core::FlowFile>> _addedFlowFiles;
+ std::map<utils::Identifier, std::shared_ptr<core::FlowFile>> _addedFlowFiles;
// FlowFiles being deleted by current process session
- std::map<std::string, std::shared_ptr<core::FlowFile>> _deletedFlowFiles;
+ std::vector<std::shared_ptr<core::FlowFile>> _deletedFlowFiles;
// FlowFiles being transfered to the relationship
- std::map<std::string, Relationship> _transferRelationship;
+ std::map<utils::Identifier, Relationship> _transferRelationship;
// FlowFiles being cloned for multiple connections per relationship
- std::map<std::string, std::shared_ptr<core::FlowFile>> _clonedFlowFiles;
+ std::vector<std::shared_ptr<core::FlowFile>> _clonedFlowFiles;
private:
+ enum class RouteResult {
+ Ok_Routed,
+ Ok_AutoTerminated,
+ Ok_Deleted,
+ Error_NoRelationship
+ };
+
+ RouteResult routeFlowFile(const std::shared_ptr<FlowFile>& record);
+
void persistFlowFilesBeforeTransfer(
std::map<std::shared_ptr<Connectable>, std::vector<std::shared_ptr<core::FlowFile>>>& transactionMap,
- const std::map<std::string, std::shared_ptr<FlowFile>>& originalFlowFileSnapShots);
+ const std::map<utils::Identifier, FlowFileUpdate>& modifiedFlowFiles);
// Clone the flow file during transfer to multiple connections for a relationship
- std::shared_ptr<core::FlowFile> cloneDuringTransfer(std::shared_ptr<core::FlowFile> &parent);
+ std::shared_ptr<core::FlowFile> cloneDuringTransfer(const std::shared_ptr<core::FlowFile> &parent);
// ProcessContext
std::shared_ptr<ProcessContext> process_context_;
// Logger
diff --git a/libminifi/include/core/logging/Logger.h b/libminifi/include/core/logging/Logger.h
index e409df9..332151f 100644
--- a/libminifi/include/core/logging/Logger.h
+++ b/libminifi/include/core/logging/Logger.h
@@ -27,6 +27,7 @@
#include "spdlog/common.h"
#include "spdlog/logger.h"
+#include "utils/SmallString.h"
namespace org {
namespace apache {
@@ -53,6 +54,11 @@ inline char const* conditional_conversion(std::string const& str) {
return str.c_str();
}
+template<size_t N>  // SmallString overload, mirroring the std::string one: yields the null-terminated buffer so SmallString values can back %s log arguments
+inline char const* conditional_conversion(const utils::SmallString<N>& arr) {
+ return arr.c_str();
+}
+
template<typename T>
inline T conditional_conversion(T const& t) {
return t;
diff --git a/libminifi/include/core/state/nodes/FlowInformation.h b/libminifi/include/core/state/nodes/FlowInformation.h
index 0777f4a..ba3e5e3 100644
--- a/libminifi/include/core/state/nodes/FlowInformation.h
+++ b/libminifi/include/core/state/nodes/FlowInformation.h
@@ -200,7 +200,7 @@ class FlowInformation : public FlowMonitor {
SerializedResponseNode queueUUIDNode;
queueUUIDNode.name = "uuid";
- queueUUIDNode.value = queue.second->getUUIDStr();
+ queueUUIDNode.value = std::string{queue.second->getUUIDStr()};
SerializedResponseNode queuesize;
queuesize.name = "size";
diff --git a/libminifi/include/io/OutputStream.h b/libminifi/include/io/OutputStream.h
index 466ebd0..ebee70c 100644
--- a/libminifi/include/io/OutputStream.h
+++ b/libminifi/include/io/OutputStream.h
@@ -23,6 +23,7 @@
#include <string>
#include "Stream.h"
#include "utils/gsl.h"
+#include "utils/SmallString.h"
namespace org {
namespace apache {
@@ -68,6 +69,11 @@ class OutputStream : public virtual Stream {
**/
int write(const char* str, bool widen = false);
+ template<size_t N>  // SmallString overload: forwards the null-terminated text to write(const char*, bool) without building a std::string
+ int write(const utils::SmallString<N>& str, bool widen = false) {
+ return write(str.c_str(), widen);
+ }
+
/**
* writes sizeof(Integral) bytes to the stream
* @param value to write
diff --git a/libminifi/include/utils/Id.h b/libminifi/include/utils/Id.h
index 68dbf7a..bcd8119 100644
--- a/libminifi/include/utils/Id.h
+++ b/libminifi/include/utils/Id.h
@@ -23,6 +23,7 @@
#include <memory>
#include <string>
#include <thread>
+#include "SmallString.h"
#ifndef WIN32
class uuid;
@@ -53,6 +54,7 @@ namespace utils {
class Identifier {
friend struct IdentifierTestAccessor;
static constexpr const char* UUID_FORMAT_STRING = "%02hhx%02hhx%02hhx%02hhx-%02hhx%02hhx-%02hhx%02hhx-%02hhx%02hhx-%02hhx%02hhx%02hhx%02hhx%02hhx%02hhx";
+ static constexpr const char* hex_lut = "0123456789abcdef";
public:
using Data = std::array<uint8_t, 16>;
@@ -65,10 +67,19 @@ class Identifier {
bool operator!=(const Identifier& other) const;
bool operator==(const Identifier& other) const;
+ bool operator<(const Identifier& other) const;
bool isNil() const;
- std::string to_string() const;
+ // Numerous places query the string representation
+ // just to then forward the temporary to build logs,
+ // streams, or others. Dynamically allocating in these
+ // instances is wasteful as we immediately discard
+ // the result. The difference on the test machine is 8x,
+ // building the representation itself takes 10ns, while
+ // subsequently turning it into a std::string would take
+ // 70ns more.
+ SmallString<36> to_string() const;
static utils::optional<Identifier> parse(const std::string& str);
diff --git a/libminifi/include/utils/SmallString.h b/libminifi/include/utils/SmallString.h
new file mode 100644
index 0000000..a4c17c6
--- /dev/null
+++ b/libminifi/include/utils/SmallString.h
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <array>
+#include <ostream>
+#include <string>
+#include <utility>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+// Fixed-capacity string of at most N characters stored inline in a
+// std::array<char, N + 1> (no heap allocation); whoever fills it, e.g.
+// Identifier::to_string, must write the terminating '\0'. Conversions,
+// concatenation and comparisons all use C-string semantics.
+template<size_t N>
+class SmallString : public std::array<char, N + 1> {
+ public:
+  operator std::string() const {  // NOLINT
+    return {c_str()};
+  }
+  const char* c_str() const {
+    return this->data();
+  }
+
+  friend std::ostream &operator<<(std::ostream &out, const SmallString &str) {
+    return out << str.c_str();
+  }
+
+  friend std::string operator+(const std::string &lhs, const SmallString &rhs) {
+    return lhs + rhs.c_str();
+  }
+  friend std::string operator+(std::string &&lhs, const SmallString &rhs) {
+    return std::move(lhs) + rhs.c_str();
+  }
+  friend std::string operator+(const SmallString &lhs, const std::string &rhs) {
+    return lhs.c_str() + rhs;
+  }
+  friend std::string operator+(const SmallString &lhs, std::string &&rhs) {
+    return lhs.c_str() + std::move(rhs);
+  }
+
+  friend bool operator==(const std::string& lhs, const SmallString& rhs) {
+    return lhs == rhs.c_str();
+  }
+  friend bool operator==(const SmallString& lhs, const std::string& rhs) {
+    return lhs.c_str() == rhs;
+  }
+  // Compare up to the terminator only, consistent with the overloads above:
+  // bytes past '\0' are indeterminate in a default-initialized std::array<char>.
+  friend bool operator==(const SmallString& lhs, const SmallString& rhs) {
+    using traits = std::char_traits<char>;
+    const std::size_t length = traits::length(lhs.c_str());
+    return length == traits::length(rhs.c_str()) && traits::compare(lhs.c_str(), rhs.c_str(), length) == 0;
+  }
+
+  friend bool operator!=(const std::string& lhs, const SmallString& rhs) {
+    return !(lhs == rhs);
+  }
+  friend bool operator!=(const SmallString& lhs, const std::string& rhs) {
+    return !(lhs == rhs);
+  }
+  friend bool operator!=(const SmallString& lhs, const SmallString& rhs) {
+    return !(lhs == rhs);
+  }
+};
+
+} // namespace utils
+} // namespace minifi
+} // namespace nifi
+} // namespace apache
+} // namespace org
diff --git a/libminifi/src/core/FlowFile.cpp b/libminifi/src/core/FlowFile.cpp
index 53f834a..a6f3fb1 100644
--- a/libminifi/src/core/FlowFile.cpp
+++ b/libminifi/src/core/FlowFile.cpp
@@ -108,7 +108,7 @@ void FlowFile::setStashClaim(const std::string& key, const std::shared_ptr<Resou
if (hasStashClaim(key)) {
logger_->log_warn("Stashing content of record %s to existing key %s; "
"existing content will be overwritten",
- getUUIDStr().c_str(), key.c_str());
+ getUUIDStr(), key.c_str());
}
stashedContent_[key] = claim;
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index 6233f73..2abf4cf 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -66,10 +66,12 @@ ProcessSession::~ProcessSession() {
}
void ProcessSession::add(const std::shared_ptr<core::FlowFile> &record) {
- if (_updatedFlowFiles.find(record->getUUIDStr()) != _updatedFlowFiles.end()) {
+ utils::Identifier uuid;
+ record->getUUID(uuid);
+ if (_updatedFlowFiles.find(uuid) != _updatedFlowFiles.end()) {
throw Exception(ExceptionType::PROCESSOR_EXCEPTION, "Mustn't add file that was provided by this session");
}
- _addedFlowFiles[record->getUUIDStr()] = record;
+ _addedFlowFiles[uuid] = record;
record->setDeleted(false);
}
@@ -94,7 +96,9 @@ std::shared_ptr<core::FlowFile> ProcessSession::create(const std::shared_ptr<cor
parent->getlineageIdentifiers().push_back(parent->getUUIDStr());
}
- _addedFlowFiles[record->getUUIDStr()] = record;
+ utils::Identifier uuid;
+ record->getUUID(uuid);
+ _addedFlowFiles[uuid] = record;
logger_->log_debug("Create FlowFile with UUID %s", record->getUUIDStr());
std::stringstream details;
details << process_context_->getProcessorNode()->getName() << " creates flow record " << record->getUUIDStr();
@@ -119,14 +123,14 @@ std::shared_ptr<core::FlowFile> ProcessSession::clone(const std::shared_ptr<core
return record;
}
-std::shared_ptr<core::FlowFile> ProcessSession::cloneDuringTransfer(std::shared_ptr<core::FlowFile> &parent) {
+std::shared_ptr<core::FlowFile> ProcessSession::cloneDuringTransfer(const std::shared_ptr<core::FlowFile> &parent) {
auto record = std::make_shared<FlowFileRecord>();
auto flow_version = process_context_->getProcessorNode()->getFlowIdentifier();
if (flow_version != nullptr) {
record->setAttribute(SpecialFlowAttribute::FLOW_ID, flow_version->getFlowId());
}
- this->_clonedFlowFiles[record->getUUIDStr()] = record;
+ this->_clonedFlowFiles.push_back(record);
logger_->log_debug("Clone FlowFile with UUID %s during transfer", record->getUUIDStr());
// Copy attributes
for (const auto& attribute : parent->getAttributes()) {
@@ -155,19 +159,15 @@ std::shared_ptr<core::FlowFile> ProcessSession::cloneDuringTransfer(std::shared_
}
std::shared_ptr<core::FlowFile> ProcessSession::clone(const std::shared_ptr<core::FlowFile> &parent, int64_t offset, int64_t size) {
+ if ((uint64_t) (offset + size) > parent->getSize()) {
+ // Set offset and size
+ logger_->log_error("clone offset %" PRId64 " and size %" PRId64 " exceed parent size %" PRIu64, offset, size, parent->getSize());
+ return nullptr;
+ }
std::shared_ptr<core::FlowFile> record = this->create(parent);
if (record) {
logger_->log_debug("Cloned parent flow files %s to %s, with %u:%u", parent->getUUIDStr(), record->getUUIDStr(), offset, size);
if (parent->getResourceClaim()) {
- if ((uint64_t) (offset + size) > parent->getSize()) {
- // Set offset and size
- logger_->log_error("clone offset %" PRId64 " and size %" PRId64 " exceed parent size %" PRIu64, offset, size, parent->getSize());
- // Remove the Add FlowFile for the session
- auto it = this->_addedFlowFiles.find(record->getUUIDStr());
- if (it != this->_addedFlowFiles.end())
- this->_addedFlowFiles.erase(record->getUUIDStr());
- return nullptr;
- }
record->setOffset(parent->getOffset() + offset);
record->setSize(size);
// Copy Resource Claim
@@ -180,7 +180,7 @@ std::shared_ptr<core::FlowFile> ProcessSession::clone(const std::shared_ptr<core
void ProcessSession::remove(const std::shared_ptr<core::FlowFile> &flow) {
flow->setDeleted(true);
- _deletedFlowFiles[flow->getUUIDStr()] = flow;
+ _deletedFlowFiles.push_back(flow);
std::string reason = process_context_->getProcessorNode()->getName() + " drop flow record " + flow->getUUIDStr();
provenance_report_->drop(flow, reason);
}
@@ -207,7 +207,9 @@ void ProcessSession::penalize(const std::shared_ptr<core::FlowFile> &flow) {
void ProcessSession::transfer(const std::shared_ptr<core::FlowFile> &flow, Relationship relationship) {
logging::LOG_INFO(logger_) << "Transferring " << flow->getUUIDStr() << " from " << process_context_->getProcessorNode()->getName() << " to relationship " << relationship.getName();
- _transferRelationship[flow->getUUIDStr()] = relationship;
+ utils::Identifier uuid;
+ flow->getUUID(uuid);
+ _transferRelationship[uuid] = relationship;
flow->setDeleted(false);
}
@@ -629,46 +631,55 @@ void ProcessSession::restore(const std::string &key, const std::shared_ptr<core:
flow->clearStashClaim(key);
}
+// Routes one flow file by the relationship it was transferred to. Returns Ok_Deleted
+// for deleted flow files, Error_NoRelationship if transfer() was never called for it,
+// Ok_AutoTerminated if it was removed because its relationship is auto-terminated, else
+// Ok_Routed. Throws if a non-auto-terminated relationship has no connection or cloning fails.
+ProcessSession::RouteResult ProcessSession::routeFlowFile(const std::shared_ptr<FlowFile> &record) {
+  if (record->isDeleted()) {
+    return RouteResult::Ok_Deleted;
+  }
+  utils::Identifier uuid;
+  record->getUUID(uuid);
+  auto itRelationship = _transferRelationship.find(uuid);
+  if (itRelationship == _transferRelationship.end()) {
+    return RouteResult::Error_NoRelationship;
+  }
+  Relationship relationship = itRelationship->second;
+  std::set<std::shared_ptr<Connectable>> connections = process_context_->getProcessorNode()->getOutGoingConnections(relationship.getName());
+  if (connections.empty()) {
+    if (!process_context_->getProcessorNode()->isAutoTerminated(relationship)) {
+      // Not auto-terminated, so there must be a connection for this relationship
+      std::string message = "Connect empty for non auto terminated relationship " + relationship.getName();
+      throw Exception(PROCESS_SESSION_EXCEPTION, message);
+    }
+    // Auto-terminated: drop the flow file instead of routing it anywhere
+    logger_->log_debug("Flow file %s is auto terminated", record->getUUIDStr());
+    remove(record);
+    return RouteResult::Ok_AutoTerminated;
+  }
+  // The first connection gets the flow file itself, each further one gets a clone
+  for (auto itConnection = connections.begin(); itConnection != connections.end(); ++itConnection) {
+    std::shared_ptr<Connectable> connection = *itConnection;
+    if (itConnection == connections.begin()) {
+      record->setConnection(connection);
+      continue;
+    }
+    std::shared_ptr<core::FlowFile> cloneRecord = this->cloneDuringTransfer(record);
+    if (!cloneRecord) {
+      throw Exception(PROCESS_SESSION_EXCEPTION, "Can not clone the flow for transfer " + record->getUUIDStr());
+    }
+    cloneRecord->setConnection(connection);
+  }
+  return RouteResult::Ok_Routed;
+}
+
void ProcessSession::commit() {
try {
- // First we clone the flow record based on the transfered relationship for updated flow record
+ // First we clone the flow record based on the transferred relationship for updated flow record
for (auto && it : _updatedFlowFiles) {
- std::shared_ptr<core::FlowFile> record = it.second;
- if (record->isDeleted())
- continue;
- auto itRelationship = this->_transferRelationship.find(record->getUUIDStr());
- if (itRelationship != _transferRelationship.end()) {
- Relationship relationship = itRelationship->second;
- // Find the relationship, we need to find the connections for that relationship
- std::set<std::shared_ptr<Connectable>> connections = process_context_->getProcessorNode()->getOutGoingConnections(relationship.getName());
- if (connections.empty()) {
- // No connection
- if (!process_context_->getProcessorNode()->isAutoTerminated(relationship)) {
- // Not autoterminate, we should have the connect
- std::string message = "Connect empty for non auto terminated relationship " + relationship.getName();
- throw Exception(PROCESS_SESSION_EXCEPTION, message);
- } else {
- // Autoterminated
- remove(record);
- }
- } else {
- // We connections, clone the flow and assign the connection accordingly
- for (auto itConnection = connections.begin(); itConnection != connections.end(); ++itConnection) {
- std::shared_ptr<Connectable> connection = *itConnection;
- if (itConnection == connections.begin()) {
- // First connection which the flow need be routed to
- record->setConnection(connection);
- } else {
- // Clone the flow file and route to the connection
- std::shared_ptr<core::FlowFile> cloneRecord = this->cloneDuringTransfer(record);
- if (cloneRecord)
- cloneRecord->setConnection(connection);
- else
- throw Exception(PROCESS_SESSION_EXCEPTION, "Can not clone the flow for transfer " + record->getUUIDStr());
- }
- }
- }
- } else {
+ auto record = it.second.modified;
+ if (routeFlowFile(record) == RouteResult::Error_NoRelationship) {
// Can not find relationship for the flow
throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the updated flow " + record->getUUIDStr());
}
@@ -676,44 +687,8 @@ void ProcessSession::commit() {
// Do the same thing for added flow file
for (const auto& it : _addedFlowFiles) {
- std::shared_ptr<core::FlowFile> record = it.second;
- if (record->isDeleted())
- continue;
- auto itRelationship = this->_transferRelationship.find(record->getUUIDStr());
- if (itRelationship != _transferRelationship.end()) {
- Relationship relationship = itRelationship->second;
- // Find the relationship, we need to find the connections for that relationship
- std::set<std::shared_ptr<Connectable>> connections = process_context_->getProcessorNode()->getOutGoingConnections(relationship.getName());
- if (connections.empty()) {
- // No connection
- if (!process_context_->getProcessorNode()->isAutoTerminated(relationship)) {
- // Not autoterminate, we should have the connect
- std::string message = "Connect empty for non auto terminated relationship " + relationship.getName();
- throw Exception(PROCESS_SESSION_EXCEPTION, message);
- } else {
- logger_->log_debug("added flow file is auto terminated");
- // Auto-terminated
- remove(record);
- }
- } else {
- // We connections, clone the flow and assign the connection accordingly
- for (auto itConnection = connections.begin(); itConnection != connections.end(); ++itConnection) {
- std::shared_ptr<Connectable> connection(*itConnection);
- if (itConnection == connections.begin()) {
- // First connection which the flow need be routed to
- record->setConnection(connection);
- } else {
- // Clone the flow file and route to the connection
- std::shared_ptr<core::FlowFile> cloneRecord;
- cloneRecord = this->cloneDuringTransfer(record);
- if (cloneRecord)
- cloneRecord->setConnection(connection);
- else
- throw Exception(PROCESS_SESSION_EXCEPTION, "Can not clone the flow for transfer" + record->getUUIDStr());
- }
- }
- }
- } else {
+ auto record = it.second;
+ if (routeFlowFile(record) == RouteResult::Error_NoRelationship) {
// Can not find relationship for the flow
throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the added flow " + record->getUUIDStr());
}
@@ -724,7 +699,7 @@ void ProcessSession::commit() {
std::shared_ptr<Connectable> connection = nullptr;
// Complete process the added and update flow files for the session, send the flow file to its queue
for (const auto &it : _updatedFlowFiles) {
- std::shared_ptr<core::FlowFile> record = it.second;
+ auto record = it.second.modified;
logger_->log_trace("See %s in %s", record->getUUIDStr(), "_updatedFlowFiles");
if (record->isDeleted()) {
continue;
@@ -736,7 +711,7 @@ void ProcessSession::commit() {
}
}
for (const auto &it : _addedFlowFiles) {
- std::shared_ptr<core::FlowFile> record = it.second;
+ auto record = it.second;
logger_->log_trace("See %s in %s", record->getUUIDStr(), "_addedFlowFiles");
if (record->isDeleted()) {
continue;
@@ -747,8 +722,7 @@ void ProcessSession::commit() {
}
}
// Process the clone flow files
- for (const auto &it : _clonedFlowFiles) {
- std::shared_ptr<core::FlowFile> record = it.second;
+ for (const auto &record : _clonedFlowFiles) {
logger_->log_trace("See %s in %s", record->getUUIDStr(), "_clonedFlowFiles");
if (record->isDeleted()) {
continue;
@@ -759,8 +733,7 @@ void ProcessSession::commit() {
}
}
- for (const auto& it : _deletedFlowFiles) {
- auto record = it.second;
+ for (const auto& record : _deletedFlowFiles) {
if (!record->isDeleted()) {
continue;
}
@@ -772,7 +745,7 @@ void ProcessSession::commit() {
content_session_->commit();
- persistFlowFilesBeforeTransfer(connectionQueues, _flowFileSnapShots);
+ persistFlowFilesBeforeTransfer(connectionQueues, _updatedFlowFiles);
for (auto& cq : connectionQueues) {
auto connection = std::dynamic_pointer_cast<Connection>(cq.first);
@@ -790,7 +763,6 @@ void ProcessSession::commit() {
_addedFlowFiles.clear();
_clonedFlowFiles.clear();
_deletedFlowFiles.clear();
- _flowFileSnapShots.clear();
_transferRelationship.clear();
// persistent the provenance report
@@ -813,9 +785,9 @@ void ProcessSession::rollback() {
try {
// Requeue the snapshot of the flowfile back
for (const auto &it : _updatedFlowFiles) {
- auto flowFile = it.second;
+ auto flowFile = it.second.modified;
// restore flowFile to original state
- *flowFile = *_flowFileSnapShots.find(it.first)->second;
+ *flowFile = *it.second.snapshot;
logger_->log_debug("ProcessSession rollback for %s, record %s, to connection %s",
process_context_->getProcessorNode()->getName(),
flowFile->getUUIDStr(),
@@ -823,8 +795,8 @@ void ProcessSession::rollback() {
connectionQueues[flowFile->getConnection()].push_back(flowFile);
}
- for (const auto& it : _deletedFlowFiles) {
- it.second->setDeleted(false);
+ for (const auto& record : _deletedFlowFiles) {
+ record->setDeleted(false);
}
// put everything back where it came from
@@ -841,8 +813,6 @@ void ProcessSession::rollback() {
content_session_->rollback();
- _flowFileSnapShots.clear();
-
_clonedFlowFiles.clear();
_addedFlowFiles.clear();
_updatedFlowFiles.clear();
@@ -859,7 +829,7 @@ void ProcessSession::rollback() {
void ProcessSession::persistFlowFilesBeforeTransfer(
std::map<std::shared_ptr<Connectable>, std::vector<std::shared_ptr<core::FlowFile> > >& transactionMap,
- const std::map<std::string, std::shared_ptr<FlowFile>>& originalFlowFileSnapShots) {
+ const std::map<utils::Identifier, FlowFileUpdate>& modifiedFlowFiles) {
std::vector<std::pair<std::string, std::unique_ptr<io::BufferStream>>> flowData;
@@ -895,8 +865,10 @@ void ProcessSession::persistFlowFilesBeforeTransfer(
const bool shouldDropEmptyFiles = connection ? connection->getDropEmptyFlowFiles() : false;
auto& flows = transaction.second;
for (auto &ff : flows) {
- auto snapshotIt = originalFlowFileSnapShots.find(ff->getUUIDStr());
- auto original = snapshotIt != originalFlowFileSnapShots.end() ? snapshotIt->second : nullptr;
+ utils::Identifier uuid;
+ ff->getUUID(uuid);
+ auto snapshotIt = modifiedFlowFiles.find(uuid);
+ auto original = snapshotIt != modifiedFlowFiles.end() ? snapshotIt->second.snapshot : nullptr;
if (shouldDropEmptyFiles && ff->getSize() == 0) {
// the receiver promised to drop this FF, no need for it anymore
if (ff->isStored() && flowFileRepo->Delete(ff->getUUIDStr())) {
@@ -947,17 +919,16 @@ std::shared_ptr<core::FlowFile> ProcessSession::get() {
if (ret) {
// add the flow record to the current process session update map
ret->setDeleted(false);
- _updatedFlowFiles[ret->getUUIDStr()] = ret;
std::shared_ptr<FlowFile> snapshot = std::make_shared<FlowFileRecord>();
*snapshot = *ret;
+ logger_->log_debug("Create Snapshot FlowFile with UUID %s", snapshot->getUUIDStr());
+ utils::Identifier uuid;
+ ret->getUUID(uuid);
+ _updatedFlowFiles[uuid] = {ret, snapshot};
auto flow_version = process_context_->getProcessorNode()->getFlowIdentifier();
if (flow_version != nullptr) {
ret->setAttribute(SpecialFlowAttribute::FLOW_ID, flow_version->getFlowId());
}
- logger_->log_debug("Create Snapshot FlowFile with UUID %s", snapshot->getUUIDStr());
- // save a snapshot
- auto result = _flowFileSnapShots.emplace(snapshot->getUUIDStr(), std::move(snapshot));
- assert(result.second);
return ret;
}
current = std::static_pointer_cast<Connection>(process_context_->getProcessorNode()->pickIncomingConnection());
@@ -984,7 +955,7 @@ bool ProcessSession::outgoingConnectionsFull(const std::string& relationship) {
bool ProcessSession::existsFlowFileInRelationship(const Relationship &relationship) {
return std::any_of(_transferRelationship.begin(), _transferRelationship.end(),
- [&relationship](const std::map<std::string, Relationship>::value_type &key_value_pair) {
+ [&relationship](const std::map<utils::Identifier, Relationship>::value_type &key_value_pair) {
return relationship == key_value_pair.second;
});
}
diff --git a/libminifi/src/utils/Id.cpp b/libminifi/src/utils/Id.cpp
index 0264833..dc8132f 100644
--- a/libminifi/src/utils/Id.cpp
+++ b/libminifi/src/utils/Id.cpp
@@ -103,15 +103,42 @@ bool Identifier::operator==(const Identifier& other) const {
return data_ == other.data_;
}
-std::string Identifier::to_string() const {
- char uuidStr[37]{}; // 36+1 for the \0
- snprintf(uuidStr, sizeof(uuidStr), UUID_FORMAT_STRING,
- data_[0], data_[1], data_[2], data_[3],
- data_[4], data_[5],
- data_[6], data_[7],
- data_[8], data_[9],
- data_[10], data_[11], data_[12], data_[13], data_[14], data_[15]);
- return {uuidStr};
+bool Identifier::operator<(const Identifier &other) const {
+ return data_ < other.data_;  // lexicographic comparison of the raw UUID bytes (std::array operator<); gives Identifier the ordering std::map keys need
+}
+
+SmallString<36> Identifier::to_string() const {
+  // Canonical UUID text: the 16 bytes as 32 lowercase hex digits grouped
+  // 8-4-4-4-12 and joined by '-', i.e. 36 characters followed by '\0'.
+  // Writing into a fixed-size SmallString avoids any heap allocation.
+  SmallString<36> result;
+  // Every index 0..36 of `result` is written below, so it needs no zeroing.
+  int pos = 0;
+
+  // Emits one byte as two hex digits, high nibble first.
+  const auto appendByte = [&result, &pos](uint8_t value) {
+    result[pos++] = hex_lut[value >> 4];
+    result[pos++] = hex_lut[value & 0xf];
+  };
+
+  // Number of bytes in each hyphen-separated group; 4-2-2-2-6 bytes
+  // correspond to the 8-4-4-4-12 hex digits of the text form.
+  static constexpr int kGroupBytes[] = {4, 2, 2, 2, 6};
+
+  int byte = 0;
+  for (int group = 0; group < 5; ++group) {
+    if (group > 0) {
+      // separator between consecutive groups
+      result[pos++] = '-';
+    }
+    for (int i = 0; i < kGroupBytes[group]; ++i) {
+      appendByte(data_[byte++]);
+    }
+  }
+
+  // Here pos == 36 and byte == 16; terminate the string.
+  result[pos] = '\0';
+  return result;
+}
utils::optional<Identifier> Identifier::parse(const std::string &str) {
diff --git a/nanofi/src/api/nanofi.cpp b/nanofi/src/api/nanofi.cpp
index f021013..a53a27f 100644
--- a/nanofi/src/api/nanofi.cpp
+++ b/nanofi/src/api/nanofi.cpp
@@ -170,7 +170,7 @@ void clear_content_repo(const nifi_instance * instance) {
}
void get_proc_uuid_from_processor(standalone_processor * proc, char * uuid_target) {
- strcpy(uuid_target, proc->getUUIDStr().c_str());
+ strcpy(uuid_target, proc->getUUIDStr().data());
}
void get_proc_uuid_from_context(const processor_context * ctx, char * uuid_target) {