You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bq...@apache.org on 2017/11/16 17:56:01 UTC
nifi-minifi-cpp git commit: MINIFICPP-305: remove Boost from
BuildTests
Repository: nifi-minifi-cpp
Updated Branches:
refs/heads/master e3085f39d -> 9b9c33543
MINIFICPP-305: remove Boost from BuildTests
MINIFICPP-293: Update repos
MINIFICPP-293: Fix some potential issues with decrement. MINIFICPP-295: remove new/delete char buffer
This closes #186.
Signed-off-by: Bin Qiu <be...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/9b9c3354
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/9b9c3354
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/9b9c3354
Branch: refs/heads/master
Commit: 9b9c3354330d525cfdd50f656ae42fc3da80764c
Parents: e3085f3
Author: Marc Parisi <ph...@apache.org>
Authored: Tue Nov 14 15:17:05 2017 -0500
Committer: Bin Qiu <be...@gmail.com>
Committed: Thu Nov 16 09:54:42 2017 -0800
----------------------------------------------------------------------
cmake/BuildTests.cmake | 4 +-
libminifi/include/core/ContentRepository.h | 7 +-
libminifi/include/core/ProcessSession.h | 90 ++---
.../core/repository/VolatileContentRepository.h | 1 +
libminifi/src/FlowFileRecord.cpp | 22 +-
libminifi/src/core/FlowFile.cpp | 1 +
libminifi/src/core/ProcessSession.cpp | 357 +++----------------
.../repository/VolatileContentRepository.cpp | 20 +-
8 files changed, 102 insertions(+), 400 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9b9c3354/cmake/BuildTests.cmake
----------------------------------------------------------------------
diff --git a/cmake/BuildTests.cmake b/cmake/BuildTests.cmake
index 7045c3d..00ba43b 100644
--- a/cmake/BuildTests.cmake
+++ b/cmake/BuildTests.cmake
@@ -55,7 +55,9 @@ function(createTests testName)
target_include_directories(${testName} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/libminifi/include/utils")
target_include_directories(${testName} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/libminifi/include/processors")
target_include_directories(${testName} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/libminifi/include/provenance")
- target_include_directories(${testName} BEFORE PRIVATE "${Boost_INCLUDE_DIRS}")
+ if (Boost_FOUND)
+ target_include_directories(${testName} BEFORE PRIVATE "${Boost_INCLUDE_DIRS}")
+ endif()
target_link_libraries(${testName} ${SPD_LIB} ${TEST_BASE_LIB})
target_link_libraries(${testName} ${CMAKE_THREAD_LIBS_INIT} ${OPENSSL_LIBRARIES} core-minifi minifi yaml-cpp c-library civetweb-cpp ${JSON_CPP_LIB})
if (Boost_FOUND)
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9b9c3354/libminifi/include/core/ContentRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ContentRepository.h b/libminifi/include/core/ContentRepository.h
index 1e7ac62..87ac757 100644
--- a/libminifi/include/core/ContentRepository.h
+++ b/libminifi/include/core/ContentRepository.h
@@ -63,10 +63,13 @@ class ContentRepository : public StreamManager<minifi::ResourceClaim> {
remove(streamId);
count_map_.erase(str);
return true;
+ } else {
+ return false;
}
+ } else {
+ remove(streamId);
+ return true;
}
-
- return false;
}
virtual uint32_t getStreamCount(const std::shared_ptr<minifi::ResourceClaim> &streamId) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9b9c3354/libminifi/include/core/ProcessSession.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h
index 3961511..30d0563 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -74,82 +74,43 @@ class ProcessSession {
// Create a new UUID FlowFile with no content resource claim and without parent
std::shared_ptr<core::FlowFile> create();
// Create a new UUID FlowFile with no content resource claim and inherit all attributes from parent
- std::shared_ptr<core::FlowFile> create(std::shared_ptr<core::FlowFile> &&parent);
+ //std::shared_ptr<core::FlowFile> create(std::shared_ptr<core::FlowFile> &&parent);
// Create a new UUID FlowFile with no content resource claim and inherit all attributes from parent
- std::shared_ptr<core::FlowFile> create(std::shared_ptr<core::FlowFile> &parent) {
- std::map<std::string, std::string> empty;
- std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getFlowFileRepository(), process_context_->getContentRepository(), empty);
-
- if (record) {
- _addedFlowFiles[record->getUUIDStr()] = record;
- logger_->log_debug("Create FlowFile with UUID %s", record->getUUIDStr().c_str());
- }
-
- if (record) {
- // Copy attributes
- std::map<std::string, std::string> parentAttributes = parent->getAttributes();
- std::map<std::string, std::string>::iterator it;
- for (it = parentAttributes.begin(); it != parentAttributes.end(); it++) {
- if (it->first == FlowAttributeKey(ALTERNATE_IDENTIFIER) || it->first == FlowAttributeKey(DISCARD_REASON) || it->first == FlowAttributeKey(UUID))
- // Do not copy special attributes from parent
- continue;
- record->setAttribute(it->first, it->second);
- }
- record->setLineageStartDate(parent->getlineageStartDate());
- record->setLineageIdentifiers(parent->getlineageIdentifiers());
- parent->getlineageIdentifiers().insert(parent->getUUIDStr());
- }
- return record;
- }
+ std::shared_ptr<core::FlowFile> create(const std::shared_ptr<core::FlowFile> &parent);
// Add a FlowFile to the session
- void add(std::shared_ptr<core::FlowFile> &flow);
+ void add(const std::shared_ptr<core::FlowFile> &flow);
// Clone a new UUID FlowFile from parent both for content resource claim and attributes
- std::shared_ptr<core::FlowFile> clone(std::shared_ptr<core::FlowFile> &parent);
+ std::shared_ptr<core::FlowFile> clone(const std::shared_ptr<core::FlowFile> &parent);
// Clone a new UUID FlowFile from parent for attributes and sub set of parent content resource claim
- std::shared_ptr<core::FlowFile> clone(std::shared_ptr<core::FlowFile> &parent, int64_t offset, int64_t size);
+ std::shared_ptr<core::FlowFile> clone(const std::shared_ptr<core::FlowFile> &parent, int64_t offset, int64_t size);
// Duplicate a FlowFile with the same UUID and all attributes and content resource claim for the roll back of the session
- std::shared_ptr<core::FlowFile> duplicate(std::shared_ptr<core::FlowFile> &original);
+ std::shared_ptr<core::FlowFile> duplicate(const std::shared_ptr<core::FlowFile> &original);
// Transfer the FlowFile to the relationship
- void transfer(std::shared_ptr<core::FlowFile> &flow, Relationship relationship);
- void transfer(std::shared_ptr<core::FlowFile> &&flow, Relationship relationship);
+ void transfer(const std::shared_ptr<core::FlowFile> &flow, Relationship relationship);
// Put Attribute
- void putAttribute(std::shared_ptr<core::FlowFile> &flow, std::string key, std::string value);
- void putAttribute(std::shared_ptr<core::FlowFile> &&flow, std::string key, std::string value);
+ void putAttribute(const std::shared_ptr<core::FlowFile> &flow, std::string key, std::string value);
// Remove Attribute
- void removeAttribute(std::shared_ptr<core::FlowFile> &flow, std::string key);
- void removeAttribute(std::shared_ptr<core::FlowFile> &&flow, std::string key);
+ void removeAttribute(const std::shared_ptr<core::FlowFile> &flow, std::string key);
// Remove Flow File
- void remove(std::shared_ptr<core::FlowFile> &flow);
- void remove(std::shared_ptr<core::FlowFile> &&flow);
+ void remove(const std::shared_ptr<core::FlowFile> &flow);
// Execute the given read callback against the content
- void read(std::shared_ptr<core::FlowFile> &flow, InputStreamCallback *callback);
- void read(std::shared_ptr<core::FlowFile> &&flow, InputStreamCallback *callback);
+ void read(const std::shared_ptr<core::FlowFile> &flow, InputStreamCallback *callback);
// Execute the given write callback against the content
- void write(std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback *callback);
- void write(std::shared_ptr<core::FlowFile> &&flow, OutputStreamCallback *callback);
+ void write(const std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback *callback);
// Execute the given write/append callback against the content
- void append(std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback *callback);
- void append(std::shared_ptr<core::FlowFile> &&flow, OutputStreamCallback *callback);
+ void append(const std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback *callback);
// Penalize the flow
- void penalize(std::shared_ptr<core::FlowFile> &flow);
- void penalize(std::shared_ptr<core::FlowFile> &&flow);
+ void penalize(const std::shared_ptr<core::FlowFile> &flow);
/**
* Imports a file from the data stream
* @param stream incoming data stream that contains the data to store into a file
* @param flow flow file
*/
- void importFrom(io::DataStream &stream, std::shared_ptr<core::FlowFile> &&flow);
+ void importFrom(io::DataStream &stream, const std::shared_ptr<core::FlowFile> &flow);
// import from the data source.
- void import(std::string source, std::shared_ptr<core::FlowFile> &flow,
- bool keepSource = true,
- uint64_t offset = 0);
- void import(std::string source, std::shared_ptr<core::FlowFile> &&flow,
- bool keepSource = true,
- uint64_t offset = 0);
- void import(std::string source, std::vector<std::shared_ptr<FlowFileRecord>> &flows,
- bool keepSource,
- uint64_t offset, char inputDelimiter);
+ void import(std::string source, const std::shared_ptr<core::FlowFile> &flow, bool keepSource = true, uint64_t offset = 0);
+ void import(std::string source, std::vector<std::shared_ptr<FlowFileRecord>> &flows, bool keepSource, uint64_t offset, char inputDelimiter);
/**
* Exports the data stream to a file
@@ -157,19 +118,16 @@ class ProcessSession {
* @param flow flow file
* @param bool whether or not to keep the content in the flow file
*/
- bool exportContent(const std::string &destination,
- std::shared_ptr<core::FlowFile> &flow,
- bool keepContent);
+ bool exportContent(const std::string &destination, const std::shared_ptr<core::FlowFile> &flow,
+ bool keepContent);
- bool exportContent(const std::string &destination,
- const std::string &tmpFileName,
- std::shared_ptr<core::FlowFile> &flow,
- bool keepContent);
+ bool exportContent(const std::string &destination, const std::string &tmpFileName, const std::shared_ptr<core::FlowFile> &flow,
+ bool keepContent);
// Stash the content to a key
- void stash(const std::string &key, std::shared_ptr<core::FlowFile> flow);
- // Restore content previously stashed to a key
- void restore(const std::string &key, std::shared_ptr<core::FlowFile> flow);
+ void stash(const std::string &key, const std::shared_ptr<core::FlowFile> &flow);
+ // Restore content previously stashed to a key
+ void restore(const std::string &key, const std::shared_ptr<core::FlowFile> &flow);
// Prevent default copy constructor and assignment operation
// Only support pass by reference or pointer
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9b9c3354/libminifi/include/core/repository/VolatileContentRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/repository/VolatileContentRepository.h b/libminifi/include/core/repository/VolatileContentRepository.h
index 623f7be..81e7a29 100644
--- a/libminifi/include/core/repository/VolatileContentRepository.h
+++ b/libminifi/include/core/repository/VolatileContentRepository.h
@@ -50,6 +50,7 @@ class VolatileContentRepository : public core::ContentRepository, public virtual
max_count_ = 15000;
}
virtual ~VolatileContentRepository() {
+ logger_->log_debug("Clearing repository");
if (!minimize_locking_) {
std::lock_guard<std::mutex> lock(map_mutex_);
for (const auto &item : master_list_) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9b9c3354/libminifi/src/FlowFileRecord.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp
index 4e22322..ec5b6e8 100644
--- a/libminifi/src/FlowFileRecord.cpp
+++ b/libminifi/src/FlowFileRecord.cpp
@@ -98,12 +98,15 @@ FlowFileRecord::FlowFileRecord(std::shared_ptr<core::Repository> flow_repository
}
FlowFileRecord::~FlowFileRecord() {
+ logger_->log_debug("Destroying flow file record, UUID %s", uuidStr_.c_str());
if (!snapshot_)
logger_->log_debug("Delete FlowFile UUID %s", uuidStr_.c_str());
else
logger_->log_debug("Delete SnapShot FlowFile UUID %s", uuidStr_.c_str());
if (claim_) {
releaseClaim(claim_);
+ } else {
+ logger_->log_debug("Claim is null ptr for %s", uuidStr_);
}
// Disown stash claims
@@ -113,16 +116,17 @@ FlowFileRecord::~FlowFileRecord() {
}
void FlowFileRecord::releaseClaim(std::shared_ptr<ResourceClaim> claim) {
- // Decrease the flow file record owned count for the resource claim
- claim_->decreaseFlowFileRecordOwnedCount();
- std::string value;
- if (claim_->getFlowFileRecordOwnedCount() <= 0) {
- // we cannot rely on the stored variable here since we
- if (flow_repository_ != nullptr && !flow_repository_->Get(uuidStr_, value)) {
- logger_->log_debug("Delete Resource Claim %s", claim_->getContentFullPath().c_str());
- content_repo_->remove(claim_);
- }
+ // Decrease the flow file record owned count for the resource claim
+ claim_->decreaseFlowFileRecordOwnedCount();
+ std::string value;
+ logger_->log_debug("Delete Resource Claim %s, %s, attempt %d", getUUIDStr(), claim_->getContentFullPath().c_str(), claim_->getFlowFileRecordOwnedCount());
+ if (claim_->getFlowFileRecordOwnedCount() <= 0) {
+ // we cannot rely on the stored variable here since we aren't guaranteed atomicity
+ if (flow_repository_ != nullptr && !flow_repository_->Get(uuidStr_, value)) {
+ logger_->log_debug("Delete Resource Claim %s", claim_->getContentFullPath().c_str());
+ content_repo_->remove(claim_);
}
+ }
}
bool FlowFileRecord::addKeyedAttribute(FlowAttribute key, std::string value) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9b9c3354/libminifi/src/core/FlowFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/FlowFile.cpp b/libminifi/src/core/FlowFile.cpp
index 3427559..c9d0d83 100644
--- a/libminifi/src/core/FlowFile.cpp
+++ b/libminifi/src/core/FlowFile.cpp
@@ -51,6 +51,7 @@ FlowFile::FlowFile()
}
FlowFile::~FlowFile() {
+ logger_->log_debug("Deleteting %s", getUUIDStr());
}
FlowFile& FlowFile::operator=(const FlowFile& other) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9b9c3354/libminifi/src/core/ProcessSession.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index 4eb41a3..2d8ee08 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -54,11 +54,11 @@ std::shared_ptr<core::FlowFile> ProcessSession::create() {
return record;
}
-void ProcessSession::add(std::shared_ptr<core::FlowFile> &record) {
+void ProcessSession::add(const std::shared_ptr<core::FlowFile> &record) {
_addedFlowFiles[record->getUUIDStr()] = record;
}
-std::shared_ptr<core::FlowFile> ProcessSession::create(std::shared_ptr<core::FlowFile> &&parent) {
+std::shared_ptr<core::FlowFile> ProcessSession::create(const std::shared_ptr<core::FlowFile> &parent) {
std::map<std::string, std::string> empty;
std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getFlowFileRepository(), process_context_->getContentRepository(), empty);
@@ -84,7 +84,7 @@ std::shared_ptr<core::FlowFile> ProcessSession::create(std::shared_ptr<core::Flo
return record;
}
-std::shared_ptr<core::FlowFile> ProcessSession::clone(std::shared_ptr<core::FlowFile> &parent) {
+std::shared_ptr<core::FlowFile> ProcessSession::clone(const std::shared_ptr<core::FlowFile> &parent) {
std::shared_ptr<core::FlowFile> record = this->create(parent);
if (record) {
// Copy Resource Claim
@@ -135,7 +135,7 @@ std::shared_ptr<core::FlowFile> ProcessSession::cloneDuringTransfer(std::shared_
return record;
}
-std::shared_ptr<core::FlowFile> ProcessSession::clone(std::shared_ptr<core::FlowFile> &parent, int64_t offset, int64_t size) {
+std::shared_ptr<core::FlowFile> ProcessSession::clone(const std::shared_ptr<core::FlowFile> &parent, int64_t offset, int64_t size) {
std::shared_ptr<core::FlowFile> record = this->create(parent);
if (record) {
if (parent->getResourceClaim()) {
@@ -162,67 +162,39 @@ std::shared_ptr<core::FlowFile> ProcessSession::clone(std::shared_ptr<core::Flow
return record;
}
-void ProcessSession::remove(std::shared_ptr<core::FlowFile> &flow) {
+void ProcessSession::remove(const std::shared_ptr<core::FlowFile> &flow) {
flow->setDeleted(true);
+ flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
+ logger_->log_debug("Auto terminated %s %d %s", flow->getResourceClaim()->getContentFullPath(), flow->getResourceClaim()->getFlowFileRecordOwnedCount(), flow->getUUIDStr());
process_context_->getFlowFileRepository()->Delete(flow->getUUIDStr());
_deletedFlowFiles[flow->getUUIDStr()] = flow;
std::string reason = process_context_->getProcessorNode()->getName() + " drop flow record " + flow->getUUIDStr();
provenance_report_->drop(flow, reason);
}
-void ProcessSession::remove(std::shared_ptr<core::FlowFile> &&flow) {
- flow->setDeleted(true);
- process_context_->getFlowFileRepository()->Delete(flow->getUUIDStr());
- _deletedFlowFiles[flow->getUUIDStr()] = flow;
- std::string reason = process_context_->getProcessorNode()->getName() + " drop flow record " + flow->getUUIDStr();
- provenance_report_->drop(flow, reason);
-}
-
-void ProcessSession::putAttribute(std::shared_ptr<core::FlowFile> &flow, std::string key, std::string value) {
+void ProcessSession::putAttribute(const std::shared_ptr<core::FlowFile> &flow, std::string key, std::string value) {
flow->setAttribute(key, value);
std::stringstream details;
details << process_context_->getProcessorNode()->getName() << " modify flow record " << flow->getUUIDStr() << " attribute " << key << ":" << value;
provenance_report_->modifyAttributes(flow, details.str());
}
-void ProcessSession::removeAttribute(std::shared_ptr<core::FlowFile> &flow, std::string key) {
+void ProcessSession::removeAttribute(const std::shared_ptr<core::FlowFile> &flow, std::string key) {
flow->removeAttribute(key);
std::stringstream details;
details << process_context_->getProcessorNode()->getName() << " remove flow record " << flow->getUUIDStr() << " attribute " + key;
provenance_report_->modifyAttributes(flow, details.str());
}
-void ProcessSession::putAttribute(std::shared_ptr<core::FlowFile> &&flow, std::string key, std::string value) {
- flow->setAttribute(key, value);
- std::stringstream details;
- details << process_context_->getProcessorNode()->getName() << " modify flow record " << flow->getUUIDStr() << " attribute " << key << ":" << value;
- provenance_report_->modifyAttributes(flow, details.str());
-}
-
-void ProcessSession::removeAttribute(std::shared_ptr<core::FlowFile> &&flow, std::string key) {
- flow->removeAttribute(key);
- std::stringstream details;
- details << process_context_->getProcessorNode()->getName() << " remove flow record " << flow->getUUIDStr() << " attribute " << key;
- provenance_report_->modifyAttributes(flow, details.str());
-}
-
-void ProcessSession::penalize(std::shared_ptr<core::FlowFile> &flow) {
- flow->setPenaltyExpiration(getTimeMillis() + process_context_->getProcessorNode()->getPenalizationPeriodMsec());
-}
-
-void ProcessSession::penalize(std::shared_ptr<core::FlowFile> &&flow) {
+void ProcessSession::penalize(const std::shared_ptr<core::FlowFile> &flow) {
flow->setPenaltyExpiration(getTimeMillis() + process_context_->getProcessorNode()->getPenalizationPeriodMsec());
}
-void ProcessSession::transfer(std::shared_ptr<core::FlowFile> &flow, Relationship relationship) {
+void ProcessSession::transfer(const std::shared_ptr<core::FlowFile> &flow, Relationship relationship) {
_transferRelationship[flow->getUUIDStr()] = relationship;
}
-void ProcessSession::transfer(std::shared_ptr<core::FlowFile> &&flow, Relationship relationship) {
- _transferRelationship[flow->getUUIDStr()] = relationship;
-}
-
-void ProcessSession::write(std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback *callback) {
+void ProcessSession::write(const std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback *callback) {
std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
try {
@@ -231,10 +203,12 @@ void ProcessSession::write(std::shared_ptr<core::FlowFile> &flow, OutputStreamCa
std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim);
// Call the callback to write the content
if (nullptr == stream) {
+ claim->decreaseFlowFileRecordOwnedCount();
rollback();
return;
}
if (callback->process(stream) < 0) {
+ claim->decreaseFlowFileRecordOwnedCount();
rollback();
return;
}
@@ -271,92 +245,7 @@ void ProcessSession::write(std::shared_ptr<core::FlowFile> &flow, OutputStreamCa
}
}
-void ProcessSession::write(std::shared_ptr<core::FlowFile> &&flow, OutputStreamCallback *callback) {
- std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
- try {
- uint64_t startTime = getTimeMillis();
- claim->increaseFlowFileRecordOwnedCount();
- std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim);
- if (nullptr == stream) {
- rollback();
- return;
- }
- // Call the callback to write the content
- if (callback->process(stream) < 0) {
- rollback();
- return;
- }
- flow->setSize(stream->getSize());
- flow->setOffset(0);
- std::shared_ptr<ResourceClaim> flow_claim = flow->getResourceClaim();
- if (flow_claim != nullptr) {
- // Remove the old claim
- flow_claim->decreaseFlowFileRecordOwnedCount();
- flow->clearResourceClaim();
- }
- flow->setResourceClaim(claim);
-
- std::stringstream details;
- details << process_context_->getProcessorNode()->getName() << " modify flow record content " << flow->getUUIDStr();
- uint64_t endTime = getTimeMillis();
- provenance_report_->modifyContent(flow, details.str(), endTime - startTime);
- } catch (std::exception &exception) {
- if (flow && flow->getResourceClaim() == claim) {
- flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
- flow->clearResourceClaim();
- }
- logger_->log_debug("Caught Exception %s", exception.what());
- throw;
- } catch (...) {
- if (flow && flow->getResourceClaim() == claim) {
- flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
- flow->clearResourceClaim();
- }
- logger_->log_debug("Caught Exception during process session write");
- throw;
- }
-}
-
-void ProcessSession::append(std::shared_ptr<core::FlowFile> &&flow, OutputStreamCallback *callback) {
- std::shared_ptr<ResourceClaim> claim = nullptr;
-
- if (flow->getResourceClaim() == nullptr) {
- // No existed claim for append, we need to create new claim
- return write(flow, callback);
- }
-
- claim = flow->getResourceClaim();
-
- try {
- uint64_t startTime = getTimeMillis();
- std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim);
- if (nullptr == stream) {
- rollback();
- return;
- }
- // Call the callback to write the content
- size_t oldPos = stream->getSize();
- stream->seek(oldPos + 1);
- if (callback->process(stream) < 0) {
- rollback();
- return;
- }
- uint64_t appendSize = stream->getSize() - oldPos;
- flow->setSize(flow->getSize() + appendSize);
- std::stringstream details;
- details << process_context_->getProcessorNode()->getName() << " modify flow record content " << flow->getUUIDStr();
- uint64_t endTime = getTimeMillis();
- provenance_report_->modifyContent(flow, details.str(), endTime - startTime);
- } catch (std::exception &exception) {
- logger_->log_debug("Caught Exception %s", exception.what());
- throw;
- } catch (...) {
- logger_->log_debug("Caught Exception during process session append");
- throw;
- }
-}
-
-void ProcessSession::append(std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback *callback) {
+void ProcessSession::append(const std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback *callback) {
std::shared_ptr<ResourceClaim> claim = nullptr;
if (flow->getResourceClaim() == nullptr) {
@@ -396,7 +285,7 @@ void ProcessSession::append(std::shared_ptr<core::FlowFile> &flow, OutputStreamC
}
}
-void ProcessSession::read(std::shared_ptr<core::FlowFile> &flow, InputStreamCallback *callback) {
+void ProcessSession::read(const std::shared_ptr<core::FlowFile> &flow, InputStreamCallback *callback) {
try {
std::shared_ptr<ResourceClaim> claim = nullptr;
@@ -429,46 +318,13 @@ void ProcessSession::read(std::shared_ptr<core::FlowFile> &flow, InputStreamCall
}
}
-void ProcessSession::read(std::shared_ptr<core::FlowFile> &&flow, InputStreamCallback *callback) {
- try {
- std::shared_ptr<ResourceClaim> claim = nullptr;
-
- if (flow->getResourceClaim() == nullptr) {
- // No existed claim for read, we throw exception
- throw Exception(FILE_OPERATION_EXCEPTION, "No Content Claim existed for read");
- }
-
- claim = flow->getResourceClaim();
- std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->read(claim);
-
- if (nullptr == stream) {
- logger_->log_info("claim does not exist");
- rollback();
- return;
- }
- stream->seek(flow->getOffset());
-
- if (callback->process(stream) < 0) {
- logger_->log_info("no data written from stream");
- rollback();
- return;
- }
- } catch (std::exception &exception) {
- logger_->log_debug("Caught Exception %s", exception.what());
- throw;
- } catch (...) {
- logger_->log_debug("Caught Exception during process session read");
- throw;
- }
-}
-
/**
* Imports a file from the data stream
* @param stream incoming data stream that contains the data to store into a file
* @param flow flow file
*
*/
-void ProcessSession::importFrom(io::DataStream &stream, std::shared_ptr<core::FlowFile> &&flow) {
+void ProcessSession::importFrom(io::DataStream &stream, const std::shared_ptr<core::FlowFile> &flow) {
std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
int max_read = getpagesize();
std::vector<uint8_t> charBuffer;
@@ -481,6 +337,7 @@ void ProcessSession::importFrom(io::DataStream &stream, std::shared_ptr<core::Fl
if (nullptr == content_stream) {
logger_->log_debug("Could not obtain claim for %s", claim->getContentFullPath());
+ claim->decreaseFlowFileRecordOwnedCount();
rollback();
return;
}
@@ -535,11 +392,11 @@ void ProcessSession::importFrom(io::DataStream &stream, std::shared_ptr<core::Fl
}
}
-void ProcessSession::import(std::string source, std::shared_ptr<core::FlowFile> &flow, bool keepSource, uint64_t offset) {
+void ProcessSession::import(std::string source, const std::shared_ptr<core::FlowFile> &flow, bool keepSource, uint64_t offset) {
std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
- char *buf = NULL;
- int size = 4096;
- buf = new char[size];
+ int size = getpagesize();
+ std::vector<uint8_t> charBuffer;
+ charBuffer.resize(size);
try {
// std::ofstream fs;
@@ -549,6 +406,7 @@ void ProcessSession::import(std::string source, std::shared_ptr<core::FlowFile>
claim->increaseFlowFileRecordOwnedCount();
std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim);
if (nullptr == stream) {
+ claim->decreaseFlowFileRecordOwnedCount();
rollback();
return;
}
@@ -557,14 +415,14 @@ void ProcessSession::import(std::string source, std::shared_ptr<core::FlowFile>
input.seekg(offset);
bool invalidWrite = false;
while (input.good()) {
- input.read(buf, size);
+ input.read(reinterpret_cast<char*>(charBuffer.data()), size);
if (input) {
- if (stream->write(reinterpret_cast<uint8_t*>(buf), size) < 0) {
+ if (stream->write(charBuffer.data(), size) < 0) {
invalidWrite = true;
break;
}
} else {
- if (stream->write(reinterpret_cast<uint8_t*>(buf), input.gcount()) < 0) {
+ if (stream->write(reinterpret_cast<uint8_t*>(charBuffer.data()), input.gcount()) < 0) {
invalidWrite = true;
break;
}
@@ -600,15 +458,12 @@ void ProcessSession::import(std::string source, std::shared_ptr<core::FlowFile>
} else {
throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
}
-
- delete[] buf;
} catch (std::exception &exception) {
if (flow && flow->getResourceClaim() == claim) {
flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
flow->clearResourceClaim();
}
logger_->log_debug("Caught Exception %s", exception.what());
- delete[] buf;
throw;
} catch (...) {
if (flow && flow->getResourceClaim() == claim) {
@@ -616,19 +471,16 @@ void ProcessSession::import(std::string source, std::shared_ptr<core::FlowFile>
flow->clearResourceClaim();
}
logger_->log_debug("Caught Exception during process session write");
- delete[] buf;
throw;
}
}
void ProcessSession::import(std::string source, std::vector<std::shared_ptr<FlowFileRecord>> &flows, bool keepSource, uint64_t offset, char inputDelimiter) {
std::shared_ptr<ResourceClaim> claim;
-
std::shared_ptr<FlowFileRecord> flowFile;
-
- char *buf = NULL;
- int size = 4096;
- buf = new char[size];
+ int size = getpagesize();
+ std::vector<char> charBuffer;
+ charBuffer.resize(size);
try {
// Open the input file and seek to the appropriate location.
@@ -642,9 +494,9 @@ void ProcessSession::import(std::string source, std::vector<std::shared_ptr<Flow
flowFile = std::static_pointer_cast<FlowFileRecord>(create());
claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
uint64_t startTime = getTimeMillis();
- input.getline(buf, size, inputDelimiter);
+ input.getline(charBuffer.data(), size, inputDelimiter);
- size_t bufsize = strlen(buf);
+ size_t bufsize = strlen(charBuffer.data());
std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim);
if (nullptr == stream) {
logger_->log_debug("Stream is null");
@@ -653,12 +505,12 @@ void ProcessSession::import(std::string source, std::vector<std::shared_ptr<Flow
}
if (input) {
- if (stream->write(reinterpret_cast<uint8_t*>(buf), bufsize) < 0) {
+ if (stream->write(reinterpret_cast<uint8_t*>(charBuffer.data()), bufsize) < 0) {
invalidWrite = true;
break;
}
} else {
- if (stream->write(reinterpret_cast<uint8_t*>(buf), input.gcount()) < 0) {
+ if (stream->write(reinterpret_cast<uint8_t*>(charBuffer.data()), input.gcount()) < 0) {
invalidWrite = true;
break;
}
@@ -696,15 +548,12 @@ void ProcessSession::import(std::string source, std::vector<std::shared_ptr<Flow
input.close();
throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
}
-
- delete[] buf;
} catch (std::exception &exception) {
if (flowFile && flowFile->getResourceClaim() == claim) {
flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
flowFile->clearResourceClaim();
}
logger_->log_debug("Caught Exception %s", exception.what());
- delete[] buf;
throw;
} catch (...) {
if (flowFile && flowFile->getResourceClaim() == claim) {
@@ -712,106 +561,12 @@ void ProcessSession::import(std::string source, std::vector<std::shared_ptr<Flow
flowFile->clearResourceClaim();
}
logger_->log_debug("Caught Exception during process session write");
- delete[] buf;
throw;
}
}
-void ProcessSession::import(std::string source, std::shared_ptr<core::FlowFile> &&flow, bool keepSource, uint64_t offset) {
- std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
- char *buf = NULL;
- int size = 4096;
- buf = new char[size];
-
- try {
- // std::ofstream fs;
- uint64_t startTime = getTimeMillis();
- std::ifstream input;
- input.open(source.c_str(), std::fstream::in | std::fstream::binary);
- claim->increaseFlowFileRecordOwnedCount();
- std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim);
- if (nullptr == stream) {
- rollback();
- return;
- }
- if (input.is_open()) {
- // Open the source file and stream to the flow file
- input.seekg(offset);
- int sizeWritten = 0;
- bool invalidWrite = false;
- while (input.good()) {
- input.read(buf, size);
- if (input) {
- if (stream->write(reinterpret_cast<uint8_t*>(buf), size) < 0) {
- invalidWrite = true;
- break;
- }
- } else {
- if (stream->write(reinterpret_cast<uint8_t*>(buf), input.gcount()) < 0) {
- invalidWrite = true;
- break;
- }
- }
- }
- if (!invalidWrite) {
- flow->setSize(stream->getSize());
- flow->setOffset(0);
- if (flow->getResourceClaim() != nullptr) {
- // Remove the old claim
- flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
- flow->clearResourceClaim();
- }
- flow->setResourceClaim(claim);
-
- logger_->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s", flow->getOffset(), flow->getSize(), flow->getResourceClaim()->getContentFullPath().c_str(),
- flow->getUUIDStr().c_str());
-
- stream->closeStream();
- input.close();
- if (!keepSource)
- std::remove(source.c_str());
- std::stringstream details;
- details << process_context_->getProcessorNode()->getName() << " modify flow record content " << flow->getUUIDStr();
- uint64_t endTime = getTimeMillis();
- provenance_report_->modifyContent(flow, details.str(), endTime - startTime);
- } else {
- stream->closeStream();
- input.close();
- throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
- }
- } else {
- throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
- }
-
- delete[] buf;
- } catch (std::exception &exception) {
- if (flow && flow->getResourceClaim() == claim) {
- flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
- flow->clearResourceClaim();
- }
- logger_->log_debug("Caught Exception %s", exception.what());
- delete[] buf;
- throw;
- } catch (...) {
- if (flow && flow->getResourceClaim() == claim) {
- flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
- flow->clearResourceClaim();
- }
- logger_->log_debug("Caught Exception during process session write");
- delete[] buf;
- throw;
- }
-}
-
-bool ProcessSession::exportContent(
- const std::string &destination,
- const std::string &tmpFile,
- std::shared_ptr<core::FlowFile> &flow,
- bool keepContent) {
- logger_->log_info(
- "Exporting content of %s to %s",
- flow->getUUIDStr().c_str(),
- destination.c_str());
+bool ProcessSession::exportContent(const std::string &destination, const std::string &tmpFile, const std::shared_ptr<core::FlowFile> &flow, bool keepContent) {
+ logger_->log_info("Exporting content of %s to %s", flow->getUUIDStr().c_str(), destination.c_str());
ProcessSessionReadCallback cb(tmpFile, destination, logger_);
read(flow, &cb);
@@ -822,18 +577,12 @@ bool ProcessSession::exportContent(
if (commit_ok) {
logger_->log_info("Commit OK.");
} else {
- logger_->log_error(
- "Commit of %s to %s failed!",
- flow->getUUIDStr().c_str(),
- destination.c_str());
+ logger_->log_error("Commit of %s to %s failed!", flow->getUUIDStr().c_str(), destination.c_str());
}
return commit_ok;
}
-bool ProcessSession::exportContent(
- const std::string &destination,
- std::shared_ptr<core::FlowFile> &flow,
- bool keepContent) {
+bool ProcessSession::exportContent(const std::string &destination, const std::shared_ptr<core::FlowFile> &flow, bool keepContent) {
char tmpFileUuidStr[37];
uuid_t tmpFileUuid;
id_generator_->generate(tmpFileUuid);
@@ -845,16 +594,13 @@ bool ProcessSession::exportContent(
return exportContent(destination, tmpFileName, flow, keepContent);
}
-void ProcessSession::stash(const std::string &key, std::shared_ptr<core::FlowFile> flow) {
- logger_->log_info(
- "Stashing content from %s to key %s",
- flow->getUUIDStr().c_str(), key.c_str());
+void ProcessSession::stash(const std::string &key, const std::shared_ptr<core::FlowFile> &flow) {
+ logger_->log_info("Stashing content from %s to key %s", flow->getUUIDStr().c_str(), key.c_str());
if (!flow->getResourceClaim()) {
- logger_->log_warn(
- "Attempted to stash content of record %s when "
- "there is no resource claim",
- flow->getUUIDStr().c_str());
+ logger_->log_warn("Attempted to stash content of record %s when "
+ "there is no resource claim",
+ flow->getUUIDStr().c_str());
return;
}
@@ -866,25 +612,20 @@ void ProcessSession::stash(const std::string &key, std::shared_ptr<core::FlowFil
flow->clearResourceClaim();
}
-void ProcessSession::restore(const std::string &key, std::shared_ptr<core::FlowFile> flow) {
- logger_->log_info(
- "Restoring content to %s from key %s",
- flow->getUUIDStr().c_str(), key.c_str());
+void ProcessSession::restore(const std::string &key, const std::shared_ptr<core::FlowFile> &flow) {
+ logger_->log_info("Restoring content to %s from key %s", flow->getUUIDStr().c_str(), key.c_str());
// Restore the claim
if (!flow->hasStashClaim(key)) {
- logger_->log_warn(
- "Requested restore to record %s from unknown key %s",
- flow->getUUIDStr().c_str(), key.c_str());
+ logger_->log_warn("Requested restore to record %s from unknown key %s", flow->getUUIDStr().c_str(), key.c_str());
return;
}
// Disown current claim if existing
if (flow->getResourceClaim()) {
- logger_->log_warn(
- "Restoring stashed content of record %s from key %s when there is "
- "existing content; existing content will be overwritten",
- flow->getUUIDStr().c_str(), key.c_str());
+ logger_->log_warn("Restoring stashed content of record %s from key %s when there is "
+ "existing content; existing content will be overwritten",
+ flow->getUUIDStr().c_str(), key.c_str());
flow->releaseClaim(flow->getResourceClaim());
}
@@ -958,7 +699,7 @@ void ProcessSession::commit() {
throw Exception(PROCESS_SESSION_EXCEPTION, message.c_str());
} else {
logger_->log_debug("added flow file is auto terminated");
- // Autoterminated
+ // Auto-terminated
remove(record);
}
} else {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9b9c3354/libminifi/src/core/repository/VolatileContentRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/repository/VolatileContentRepository.cpp b/libminifi/src/core/repository/VolatileContentRepository.cpp
index d3e696b..77eb1b2 100644
--- a/libminifi/src/core/repository/VolatileContentRepository.cpp
+++ b/libminifi/src/core/repository/VolatileContentRepository.cpp
@@ -95,7 +95,6 @@ std::shared_ptr<io::BaseStream> VolatileContentRepository::write(const std::shar
logger_->log_debug("Creating copy of atomic entry");
auto ent = claim_check->second->takeOwnership();
if (ent == nullptr) {
- logger_->log_debug("write returns nullptr");
return nullptr;
}
return std::make_shared<io::AtomicEntryStream<std::shared_ptr<minifi::ResourceClaim>>>(claim, ent);
@@ -104,11 +103,11 @@ std::shared_ptr<io::BaseStream> VolatileContentRepository::write(const std::shar
int size = 0;
if (__builtin_expect(minimize_locking_ == true, 1)) {
- logger_->log_debug("Minimize locking");
for (auto ent : value_vector_) {
if (ent->testAndSetKey(claim, nullptr, nullptr, resource_claim_comparator_)) {
std::lock_guard<std::mutex> lock(map_mutex_);
master_list_[claim->getContentFullPath()] = ent;
+ logger_->log_debug("Minimize locking, return stream for %s", claim->getContentFullPath());
return std::make_shared<io::AtomicEntryStream<std::shared_ptr<minifi::ResourceClaim>>>(claim, ent);
}
size++;
@@ -117,10 +116,8 @@ std::shared_ptr<io::BaseStream> VolatileContentRepository::write(const std::shar
std::lock_guard < std::mutex > lock(map_mutex_);
auto claim_check = master_list_.find(claim->getContentFullPath());
if (claim_check != master_list_.end()) {
- logger_->log_debug("Creating copy of atomic entry");
return std::make_shared < io::AtomicEntryStream<std::shared_ptr<minifi::ResourceClaim>>>(claim, claim_check->second);
} else {
- logger_->log_debug("Creating new atomic entry");
AtomicEntry<std::shared_ptr<minifi::ResourceClaim>> *ent = new AtomicEntry<std::shared_ptr<minifi::ResourceClaim>>(¤t_size_, &max_size_);
if (ent->testAndSetKey(claim, nullptr, nullptr, resource_claim_comparator_)) {
master_list_[claim->getContentFullPath()] = ent;
@@ -128,12 +125,11 @@ std::shared_ptr<io::BaseStream> VolatileContentRepository::write(const std::shar
}
}
}
- logger_->log_debug("write returns nullptr %d", size);
+ logger_->log_debug("Cannot write %s %d, returning nullptr to roll back session. Repo is either full or locked", claim->getContentFullPath(), size);
return nullptr;
}
bool VolatileContentRepository::exists(const std::shared_ptr<minifi::ResourceClaim> &claim) {
- logger_->log_debug("enter exists for %s", claim->getContentFullPath());
int size = 0;
{
std::lock_guard<std::mutex> lock(map_mutex_);
@@ -150,7 +146,6 @@ bool VolatileContentRepository::exists(const std::shared_ptr<minifi::ResourceCla
}
std::shared_ptr<io::BaseStream> VolatileContentRepository::read(const std::shared_ptr<minifi::ResourceClaim> &claim) {
- logger_->log_debug("enter read for %s", claim->getContentFullPath());
int size = 0;
{
std::lock_guard<std::mutex> lock(map_mutex_);
@@ -163,21 +158,19 @@ std::shared_ptr<io::BaseStream> VolatileContentRepository::read(const std::share
return std::make_shared<io::AtomicEntryStream<std::shared_ptr<minifi::ResourceClaim>>>(claim, ent);
}
}
- logger_->log_debug("enter read for %s after %d", claim->getContentFullPath(), size);
return nullptr;
}
bool VolatileContentRepository::remove(const std::shared_ptr<minifi::ResourceClaim> &claim) {
- logger_->log_debug("enter remove for %s, reducing %d", claim->getContentFullPath(), current_size_.load());
if (__builtin_expect(minimize_locking_ == true, 1)) {
std::lock_guard<std::mutex> lock(map_mutex_);
auto ent = master_list_.find(claim->getContentFullPath());
if (ent != master_list_.end()) {
+ auto ptr = ent->second;
// if we cannot remove the entry we will let the owner's destructor
// decrement the reference count and free it
master_list_.erase(claim->getContentFullPath());
- if (ent->second->freeValue(claim)) {
- logger_->log_debug("removed %s", claim->getContentFullPath());
+ if (ptr->freeValue(claim)) {
logger_->log_debug("Remove for %s, reduced to %d", claim->getContentFullPath(), current_size_.load());
return true;
} else {
@@ -192,12 +185,11 @@ bool VolatileContentRepository::remove(const std::shared_ptr<minifi::ResourceCla
delete master_list_[claim->getContentFullPath()];
master_list_.erase(claim->getContentFullPath());
current_size_ -= size;
- logger_->log_debug("Remove for %s, reduced to %d", claim->getContentFullPath(), current_size_.load());
+
return true;
}
- logger_->log_debug("Remove for %s, reduced to %d", claim->getContentFullPath(), current_size_.load());
- logger_->log_debug("could not remove %s", claim->getContentFullPath());
+ logger_->log_debug("Could not remove %s, may not exist", claim->getContentFullPath());
return false;
}