You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/06/06 16:33:08 UTC
[5/9] nifi-minifi-cpp git commit: MINIFI-331: Apply formatter with increased line length to source
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/ProcessGroup.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp
index c33800d..9e6778c 100644
--- a/libminifi/src/core/ProcessGroup.cpp
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -37,8 +37,7 @@ namespace nifi {
namespace minifi {
namespace core {
-ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid,
- ProcessGroup *parent)
+ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid, ProcessGroup *parent)
: logger_(logging::LoggerFactory<ProcessGroup>::getLogger()),
name_(name),
type_(type),
@@ -60,8 +59,7 @@ ProcessGroup::~ProcessGroup() {
connection->drain();
}
- for (std::set<ProcessGroup *>::iterator it = child_process_groups_.begin();
- it != child_process_groups_.end(); ++it) {
+ for (std::set<ProcessGroup *>::iterator it = child_process_groups_.begin(); it != child_process_groups_.end(); ++it) {
ProcessGroup *processGroup(*it);
delete processGroup;
}
@@ -78,8 +76,7 @@ void ProcessGroup::addProcessor(std::shared_ptr<Processor> processor) {
if (processors_.find(processor) == processors_.end()) {
// We do not have the same processor in this process group yet
processors_.insert(processor);
- logger_->log_info("Add processor %s into process group %s",
- processor->getName().c_str(), name_.c_str());
+ logger_->log_info("Add processor %s into process group %s", processor->getName().c_str(), name_.c_str());
}
}
@@ -89,8 +86,7 @@ void ProcessGroup::removeProcessor(std::shared_ptr<Processor> processor) {
if (processors_.find(processor) != processors_.end()) {
// We do have the same processor in this process group yet
processors_.erase(processor);
- logger_->log_info("Remove processor %s from process group %s",
- processor->getName().c_str(), name_.c_str());
+ logger_->log_info("Remove processor %s from process group %s", processor->getName().c_str(), name_.c_str());
}
}
@@ -100,8 +96,7 @@ void ProcessGroup::addProcessGroup(ProcessGroup *child) {
if (child_process_groups_.find(child) == child_process_groups_.end()) {
// We do not have the same child process group in this process group yet
child_process_groups_.insert(child);
- logger_->log_info("Add child process group %s into process group %s",
- child->getName().c_str(), name_.c_str());
+ logger_->log_info("Add child process group %s into process group %s", child->getName().c_str(), name_.c_str());
}
}
@@ -111,13 +106,11 @@ void ProcessGroup::removeProcessGroup(ProcessGroup *child) {
if (child_process_groups_.find(child) != child_process_groups_.end()) {
// We do have the same child process group in this process group yet
child_process_groups_.erase(child);
- logger_->log_info("Remove child process group %s from process group %s",
- child->getName().c_str(), name_.c_str());
+ logger_->log_info("Remove child process group %s from process group %s", child->getName().c_str(), name_.c_str());
}
}
-void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler,
- EventDrivenSchedulingAgent *eventScheduler) {
+void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler, EventDrivenSchedulingAgent *eventScheduler) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
try {
@@ -125,8 +118,7 @@ void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler,
for (auto processor : processors_) {
logger_->log_debug("Starting %s", processor->getName().c_str());
- if (!processor->isRunning()
- && processor->getScheduledState() != DISABLED) {
+ if (!processor->isRunning() && processor->getScheduledState() != DISABLED) {
if (processor->getSchedulingStrategy() == TIMER_DRIVEN)
timeScheduler->schedule(processor);
else if (processor->getSchedulingStrategy() == EVENT_DRIVEN)
@@ -141,20 +133,17 @@ void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler,
logger_->log_debug("Caught Exception %s", exception.what());
throw;
} catch (...) {
- logger_->log_debug(
- "Caught Exception during process group start processing");
+ logger_->log_debug("Caught Exception during process group start processing");
throw;
}
}
-void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler,
- EventDrivenSchedulingAgent *eventScheduler) {
+void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler, EventDrivenSchedulingAgent *eventScheduler) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
try {
// Stop all the processor node, input and output ports
- for (std::set<std::shared_ptr<Processor> >::iterator it =
- processors_.begin(); it != processors_.end(); ++it) {
+ for (std::set<std::shared_ptr<Processor> >::iterator it = processors_.begin(); it != processors_.end(); ++it) {
std::shared_ptr<Processor> processor(*it);
if (processor->getSchedulingStrategy() == TIMER_DRIVEN)
timeScheduler->unschedule(processor);
@@ -162,8 +151,7 @@ void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler,
eventScheduler->unschedule(processor);
}
- for (std::set<ProcessGroup *>::iterator it = child_process_groups_.begin();
- it != child_process_groups_.end(); ++it) {
+ for (std::set<ProcessGroup *>::iterator it = child_process_groups_.begin(); it != child_process_groups_.end(); ++it) {
ProcessGroup *processGroup(*it);
processGroup->stopProcessing(timeScheduler, eventScheduler);
}
@@ -195,8 +183,7 @@ std::shared_ptr<Processor> ProcessGroup::findProcessor(uuid_t uuid) {
}
}
for (auto processGroup : child_process_groups_) {
- logger_->log_info("find processor child %s",
- processGroup->getName().c_str());
+ logger_->log_info("find processor child %s", processGroup->getName().c_str());
std::shared_ptr<Processor> processor = processGroup->findProcessor(uuid);
if (processor)
return processor;
@@ -204,9 +191,7 @@ std::shared_ptr<Processor> ProcessGroup::findProcessor(uuid_t uuid) {
return ret;
}
-void ProcessGroup::addControllerService(
- const std::string &nodeId,
- std::shared_ptr<core::controller::ControllerServiceNode> &node) {
+void ProcessGroup::addControllerService(const std::string &nodeId, std::shared_ptr<core::controller::ControllerServiceNode> &node) {
controller_service_map_.put(nodeId, node);
}
@@ -215,13 +200,11 @@ void ProcessGroup::addControllerService(
* @param node node identifier
* @return controller service node, if it exists.
*/
-std::shared_ptr<core::controller::ControllerServiceNode> ProcessGroup::findControllerService(
- const std::string &nodeId) {
+std::shared_ptr<core::controller::ControllerServiceNode> ProcessGroup::findControllerService(const std::string &nodeId) {
return controller_service_map_.getControllerServiceNode(nodeId);
}
-std::shared_ptr<Processor> ProcessGroup::findProcessor(
- const std::string &processorName) {
+std::shared_ptr<Processor> ProcessGroup::findProcessor(const std::string &processorName) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
std::shared_ptr<Processor> ret = NULL;
for (auto processor : processors_) {
@@ -230,17 +213,14 @@ std::shared_ptr<Processor> ProcessGroup::findProcessor(
return processor;
}
for (auto processGroup : child_process_groups_) {
- std::shared_ptr<Processor> processor = processGroup->findProcessor(
- processorName);
+ std::shared_ptr<Processor> processor = processGroup->findProcessor(processorName);
if (processor)
return processor;
}
return ret;
}
-void ProcessGroup::updatePropertyValue(std::string processorName,
- std::string propertyName,
- std::string propertyValue) {
+void ProcessGroup::updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
for (auto processor : processors_) {
if (processor->getName() == processorName) {
@@ -248,14 +228,12 @@ void ProcessGroup::updatePropertyValue(std::string processorName,
}
}
for (auto processGroup : child_process_groups_) {
- processGroup->updatePropertyValue(processorName, propertyName,
- propertyValue);
+ processGroup->updatePropertyValue(processorName, propertyName, propertyValue);
}
return;
}
-void ProcessGroup::getConnections(
- std::map<std::string, std::shared_ptr<Connection>> &connectionMap) {
+void ProcessGroup::getConnections(std::map<std::string, std::shared_ptr<Connection>> &connectionMap) {
for (auto connection : connections_) {
connectionMap[connection->getUUIDStr()] = connection;
}
@@ -270,8 +248,7 @@ void ProcessGroup::addConnection(std::shared_ptr<Connection> connection) {
if (connections_.find(connection) == connections_.end()) {
// We do not have the same connection in this process group yet
connections_.insert(connection);
- logger_->log_info("Add connection %s into process group %s",
- connection->getName().c_str(), name_.c_str());
+ logger_->log_info("Add connection %s into process group %s", connection->getName().c_str(), name_.c_str());
uuid_t sourceUUID;
std::shared_ptr<Processor> source = NULL;
connection->getSourceUUID(sourceUUID);
@@ -293,8 +270,7 @@ void ProcessGroup::removeConnection(std::shared_ptr<Connection> connection) {
if (connections_.find(connection) != connections_.end()) {
// We do not have the same connection in this process group yet
connections_.erase(connection);
- logger_->log_info("Remove connection %s into process group %s",
- connection->getName().c_str(), name_.c_str());
+ logger_->log_info("Remove connection %s into process group %s", connection->getName().c_str(), name_.c_str());
uuid_t sourceUUID;
std::shared_ptr<Processor> source = NULL;
connection->getSourceUUID(sourceUUID);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/ProcessSession.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index 70de3f6..037660f 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -38,40 +38,31 @@ namespace core {
std::shared_ptr<core::FlowFile> ProcessSession::create() {
std::map<std::string, std::string> empty;
- std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(
- process_context_->getProvenanceRepository(), empty);
+ std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(), empty);
_addedFlowFiles[record->getUUIDStr()] = record;
- logger_->log_debug("Create FlowFile with UUID %s",
- record->getUUIDStr().c_str());
- std::string details = process_context_->getProcessorNode().getName()
- + " creates flow record " + record->getUUIDStr();
+ logger_->log_debug("Create FlowFile with UUID %s", record->getUUIDStr().c_str());
+ std::string details = process_context_->getProcessorNode().getName() + " creates flow record " + record->getUUIDStr();
provenance_report_->create(record, details);
return record;
}
-std::shared_ptr<core::FlowFile> ProcessSession::create(
- std::shared_ptr<core::FlowFile> &&parent) {
+std::shared_ptr<core::FlowFile> ProcessSession::create(std::shared_ptr<core::FlowFile> &&parent) {
std::map<std::string, std::string> empty;
- std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(
- process_context_->getProvenanceRepository(), empty);
+ std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(), empty);
if (record) {
_addedFlowFiles[record->getUUIDStr()] = record;
- logger_->log_debug("Create FlowFile with UUID %s",
- record->getUUIDStr().c_str());
+ logger_->log_debug("Create FlowFile with UUID %s", record->getUUIDStr().c_str());
}
if (record) {
// Copy attributes
- std::map<std::string, std::string> parentAttributes =
- parent->getAttributes();
+ std::map<std::string, std::string> parentAttributes = parent->getAttributes();
std::map<std::string, std::string>::iterator it;
for (it = parentAttributes.begin(); it != parentAttributes.end(); it++) {
- if (it->first == FlowAttributeKey(ALTERNATE_IDENTIFIER)
- || it->first == FlowAttributeKey(DISCARD_REASON)
- || it->first == FlowAttributeKey(UUID))
+ if (it->first == FlowAttributeKey(ALTERNATE_IDENTIFIER) || it->first == FlowAttributeKey(DISCARD_REASON) || it->first == FlowAttributeKey(UUID))
// Do not copy special attributes from parent
continue;
record->setAttribute(it->first, it->second);
@@ -83,8 +74,7 @@ std::shared_ptr<core::FlowFile> ProcessSession::create(
return record;
}
-std::shared_ptr<core::FlowFile> ProcessSession::clone(
- std::shared_ptr<core::FlowFile> &parent) {
+std::shared_ptr<core::FlowFile> ProcessSession::clone(std::shared_ptr<core::FlowFile> &parent) {
std::shared_ptr<core::FlowFile> record = this->create(parent);
if (record) {
// Copy Resource Claim
@@ -100,24 +90,18 @@ std::shared_ptr<core::FlowFile> ProcessSession::clone(
return record;
}
-std::shared_ptr<core::FlowFile> ProcessSession::cloneDuringTransfer(
- std::shared_ptr<core::FlowFile> &parent) {
+std::shared_ptr<core::FlowFile> ProcessSession::cloneDuringTransfer(std::shared_ptr<core::FlowFile> &parent) {
std::map<std::string, std::string> empty;
- std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(
- process_context_->getProvenanceRepository(), empty);
+ std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(), empty);
if (record) {
this->_clonedFlowFiles[record->getUUIDStr()] = record;
- logger_->log_debug("Clone FlowFile with UUID %s during transfer",
- record->getUUIDStr().c_str());
+ logger_->log_debug("Clone FlowFile with UUID %s during transfer", record->getUUIDStr().c_str());
// Copy attributes
- std::map<std::string, std::string> parentAttributes =
- parent->getAttributes();
+ std::map<std::string, std::string> parentAttributes = parent->getAttributes();
std::map<std::string, std::string>::iterator it;
for (it = parentAttributes.begin(); it != parentAttributes.end(); it++) {
- if (it->first == FlowAttributeKey(ALTERNATE_IDENTIFIER)
- || it->first == FlowAttributeKey(DISCARD_REASON)
- || it->first == FlowAttributeKey(UUID))
+ if (it->first == FlowAttributeKey(ALTERNATE_IDENTIFIER) || it->first == FlowAttributeKey(DISCARD_REASON) || it->first == FlowAttributeKey(UUID))
// Do not copy special attributes from parent
continue;
record->setAttribute(it->first, it->second);
@@ -141,18 +125,15 @@ std::shared_ptr<core::FlowFile> ProcessSession::cloneDuringTransfer(
return record;
}
-std::shared_ptr<core::FlowFile> ProcessSession::clone(
- std::shared_ptr<core::FlowFile> &parent, int64_t offset, int64_t size) {
+std::shared_ptr<core::FlowFile> ProcessSession::clone(std::shared_ptr<core::FlowFile> &parent, int64_t offset, int64_t size) {
std::shared_ptr<core::FlowFile> record = this->create(parent);
if (record) {
if (parent->getResourceClaim()) {
if ((offset + size) > parent->getSize()) {
// Set offset and size
- logger_->log_error("clone offset %d and size %d exceed parent size %d",
- offset, size, parent->getSize());
+ logger_->log_error("clone offset %d and size %d exceed parent size %d", offset, size, parent->getSize());
// Remove the Add FlowFile for the session
- std::map<std::string, std::shared_ptr<core::FlowFile> >::iterator it =
- this->_addedFlowFiles.find(record->getUUIDStr());
+ std::map<std::string, std::shared_ptr<core::FlowFile> >::iterator it = this->_addedFlowFiles.find(record->getUUIDStr());
if (it != this->_addedFlowFiles.end())
this->_addedFlowFiles.erase(record->getUUIDStr());
return nullptr;
@@ -174,85 +155,65 @@ std::shared_ptr<core::FlowFile> ProcessSession::clone(
void ProcessSession::remove(std::shared_ptr<core::FlowFile> &flow) {
flow->setDeleted(true);
_deletedFlowFiles[flow->getUUIDStr()] = flow;
- std::string reason = process_context_->getProcessorNode().getName()
- + " drop flow record " + flow->getUUIDStr();
+ std::string reason = process_context_->getProcessorNode().getName() + " drop flow record " + flow->getUUIDStr();
provenance_report_->drop(flow, reason);
}
void ProcessSession::remove(std::shared_ptr<core::FlowFile> &&flow) {
flow->setDeleted(true);
_deletedFlowFiles[flow->getUUIDStr()] = flow;
- std::string reason = process_context_->getProcessorNode().getName()
- + " drop flow record " + flow->getUUIDStr();
+ std::string reason = process_context_->getProcessorNode().getName() + " drop flow record " + flow->getUUIDStr();
provenance_report_->drop(flow, reason);
}
-void ProcessSession::putAttribute(std::shared_ptr<core::FlowFile> &flow,
- std::string key, std::string value) {
+void ProcessSession::putAttribute(std::shared_ptr<core::FlowFile> &flow, std::string key, std::string value) {
flow->setAttribute(key, value);
- std::string details = process_context_->getProcessorNode().getName()
- + " modify flow record " + flow->getUUIDStr() + " attribute " + key + ":"
- + value;
+ std::string details = process_context_->getProcessorNode().getName() + " modify flow record " + flow->getUUIDStr() + " attribute " + key + ":" + value;
provenance_report_->modifyAttributes(flow, details);
}
-void ProcessSession::removeAttribute(std::shared_ptr<core::FlowFile> &flow,
- std::string key) {
+void ProcessSession::removeAttribute(std::shared_ptr<core::FlowFile> &flow, std::string key) {
flow->removeAttribute(key);
- std::string details = process_context_->getProcessorNode().getName()
- + " remove flow record " + flow->getUUIDStr() + " attribute " + key;
+ std::string details = process_context_->getProcessorNode().getName() + " remove flow record " + flow->getUUIDStr() + " attribute " + key;
provenance_report_->modifyAttributes(flow, details);
}
-void ProcessSession::putAttribute(std::shared_ptr<core::FlowFile> &&flow,
- std::string key, std::string value) {
+void ProcessSession::putAttribute(std::shared_ptr<core::FlowFile> &&flow, std::string key, std::string value) {
flow->setAttribute(key, value);
- std::string details = process_context_->getProcessorNode().getName()
- + " modify flow record " + flow->getUUIDStr() + " attribute " + key + ":"
- + value;
+ std::string details = process_context_->getProcessorNode().getName() + " modify flow record " + flow->getUUIDStr() + " attribute " + key + ":" + value;
provenance_report_->modifyAttributes(flow, details);
}
-void ProcessSession::removeAttribute(std::shared_ptr<core::FlowFile> &&flow,
- std::string key) {
+void ProcessSession::removeAttribute(std::shared_ptr<core::FlowFile> &&flow, std::string key) {
flow->removeAttribute(key);
- std::string details = process_context_->getProcessorNode().getName()
- + " remove flow record " + flow->getUUIDStr() + " attribute " + key;
+ std::string details = process_context_->getProcessorNode().getName() + " remove flow record " + flow->getUUIDStr() + " attribute " + key;
provenance_report_->modifyAttributes(flow, details);
}
void ProcessSession::penalize(std::shared_ptr<core::FlowFile> &flow) {
- flow->setPenaltyExpiration(
- getTimeMillis()
- + process_context_->getProcessorNode().getPenalizationPeriodMsec());
+ flow->setPenaltyExpiration(getTimeMillis() + process_context_->getProcessorNode().getPenalizationPeriodMsec());
}
void ProcessSession::penalize(std::shared_ptr<core::FlowFile> &&flow) {
- flow->setPenaltyExpiration(
- getTimeMillis()
- + process_context_->getProcessorNode().getPenalizationPeriodMsec());
+ flow->setPenaltyExpiration(getTimeMillis() + process_context_->getProcessorNode().getPenalizationPeriodMsec());
}
-void ProcessSession::transfer(std::shared_ptr<core::FlowFile> &flow,
- Relationship relationship) {
+void ProcessSession::transfer(std::shared_ptr<core::FlowFile> &flow, Relationship relationship) {
_transferRelationship[flow->getUUIDStr()] = relationship;
}
-void ProcessSession::transfer(std::shared_ptr<core::FlowFile> &&flow,
- Relationship relationship) {
+void ProcessSession::transfer(std::shared_ptr<core::FlowFile> &&flow, Relationship relationship) {
_transferRelationship[flow->getUUIDStr()] = relationship;
}
-void ProcessSession::write(std::shared_ptr<core::FlowFile> &flow,
- OutputStreamCallback *callback) {
+void ProcessSession::write(std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback *callback) {
std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(
DEFAULT_CONTENT_DIRECTORY);
try {
std::ofstream fs;
uint64_t startTime = getTimeMillis();
- fs.open(claim->getContentFullPath().c_str(),
- std::fstream::out | std::fstream::binary | std::fstream::trunc);
+ fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
if (fs.is_open()) {
// Call the callback to write the content
callback->process(&fs);
@@ -271,8 +232,7 @@ void ProcessSession::write(std::shared_ptr<core::FlowFile> &flow,
logger_->log_debug("Write offset %d length %d into content %s for FlowFile UUID %s",
flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
fs.close();
- std::string details = process_context_->getProcessorNode().getName()
- + " modify flow record content " + flow->getUUIDStr();
+ std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flow->getUUIDStr();
uint64_t endTime = getTimeMillis();
provenance_report_->modifyContent(flow, details, endTime - startTime);
} else {
@@ -299,14 +259,12 @@ void ProcessSession::write(std::shared_ptr<core::FlowFile> &flow,
}
}
-void ProcessSession::write(std::shared_ptr<core::FlowFile> &&flow,
- OutputStreamCallback *callback) {
+void ProcessSession::write(std::shared_ptr<core::FlowFile> &&flow, OutputStreamCallback *callback) {
std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>();
try {
std::ofstream fs;
uint64_t startTime = getTimeMillis();
- fs.open(claim->getContentFullPath().c_str(),
- std::fstream::out | std::fstream::binary | std::fstream::trunc);
+ fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
if (fs.is_open()) {
// Call the callback to write the content
callback->process(&fs);
@@ -325,8 +283,7 @@ void ProcessSession::write(std::shared_ptr<core::FlowFile> &&flow,
logger_->log_debug("Write offset %d length %d into content %s for FlowFile UUID %s",
flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
fs.close();
- std::string details = process_context_->getProcessorNode().getName()
- + " modify flow record content " + flow->getUUIDStr();
+ std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flow->getUUIDStr();
uint64_t endTime = getTimeMillis();
provenance_report_->modifyContent(flow, details, endTime - startTime);
} else {
@@ -353,8 +310,7 @@ void ProcessSession::write(std::shared_ptr<core::FlowFile> &&flow,
}
}
-void ProcessSession::append(std::shared_ptr<core::FlowFile> &&flow,
- OutputStreamCallback *callback) {
+void ProcessSession::append(std::shared_ptr<core::FlowFile> &&flow, OutputStreamCallback *callback) {
std::shared_ptr<ResourceClaim> claim = nullptr;
if (flow->getResourceClaim() == nullptr) {
@@ -367,8 +323,7 @@ void ProcessSession::append(std::shared_ptr<core::FlowFile> &&flow,
try {
std::ofstream fs;
uint64_t startTime = getTimeMillis();
- fs.open(claim->getContentFullPath().c_str(),
- std::fstream::out | std::fstream::binary | std::fstream::app);
+ fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::app);
if (fs.is_open()) {
// Call the callback to write the content
std::streampos oldPos = fs.tellp();
@@ -380,8 +335,7 @@ void ProcessSession::append(std::shared_ptr<core::FlowFile> &&flow,
logger_->log_debug("Append offset %d extra length %d to new size %d into content %s for FlowFile UUID %s",
flow->_offset, appendSize, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
fs.close();
- std::string details = process_context_->getProcessorNode().getName()
- + " modify flow record content " + flow->getUUIDStr();
+ std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flow->getUUIDStr();
uint64_t endTime = getTimeMillis();
provenance_report_->modifyContent(flow, details, endTime - startTime);
} else {
@@ -400,8 +354,7 @@ void ProcessSession::append(std::shared_ptr<core::FlowFile> &&flow,
}
}
-void ProcessSession::append(std::shared_ptr<core::FlowFile> &flow,
- OutputStreamCallback *callback) {
+void ProcessSession::append(std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback *callback) {
std::shared_ptr<ResourceClaim> claim = nullptr;
if (flow->getResourceClaim() == nullptr) {
@@ -414,8 +367,7 @@ void ProcessSession::append(std::shared_ptr<core::FlowFile> &flow,
try {
std::ofstream fs;
uint64_t startTime = getTimeMillis();
- fs.open(claim->getContentFullPath().c_str(),
- std::fstream::out | std::fstream::binary | std::fstream::app);
+ fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::app);
if (fs.is_open()) {
// Call the callback to write the content
std::streampos oldPos = fs.tellp();
@@ -427,8 +379,7 @@ void ProcessSession::append(std::shared_ptr<core::FlowFile> &flow,
logger_->log_debug("Append offset %d extra length %d to new size %d into content %s for FlowFile UUID %s",
flow->_offset, appendSize, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
fs.close();
- std::string details = process_context_->getProcessorNode().getName()
- + " modify flow record content " + flow->getUUIDStr();
+ std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flow->getUUIDStr();
uint64_t endTime = getTimeMillis();
provenance_report_->modifyContent(flow, details, endTime - startTime);
} else {
@@ -447,21 +398,18 @@ void ProcessSession::append(std::shared_ptr<core::FlowFile> &flow,
}
}
-void ProcessSession::read(std::shared_ptr<core::FlowFile> &flow,
- InputStreamCallback *callback) {
+void ProcessSession::read(std::shared_ptr<core::FlowFile> &flow, InputStreamCallback *callback) {
try {
std::shared_ptr<ResourceClaim> claim = nullptr;
if (flow->getResourceClaim() == nullptr) {
// No existed claim for read, we throw exception
- throw Exception(FILE_OPERATION_EXCEPTION,
- "No Content Claim existed for read");
+ throw Exception(FILE_OPERATION_EXCEPTION, "No Content Claim existed for read");
}
claim = flow->getResourceClaim();
std::ifstream fs;
- fs.open(claim->getContentFullPath().c_str(),
- std::fstream::in | std::fstream::binary);
+ fs.open(claim->getContentFullPath().c_str(), std::fstream::in | std::fstream::binary);
if (fs.is_open()) {
fs.seekg(flow->getOffset(), fs.beg);
@@ -487,21 +435,18 @@ void ProcessSession::read(std::shared_ptr<core::FlowFile> &flow,
}
}
-void ProcessSession::read(std::shared_ptr<core::FlowFile> &&flow,
- InputStreamCallback *callback) {
+void ProcessSession::read(std::shared_ptr<core::FlowFile> &&flow, InputStreamCallback *callback) {
try {
std::shared_ptr<ResourceClaim> claim = nullptr;
if (flow->getResourceClaim() == nullptr) {
// No existed claim for read, we throw exception
- throw Exception(FILE_OPERATION_EXCEPTION,
- "No Content Claim existed for read");
+ throw Exception(FILE_OPERATION_EXCEPTION, "No Content Claim existed for read");
}
claim = flow->getResourceClaim();
std::ifstream fs;
- fs.open(claim->getContentFullPath().c_str(),
- std::fstream::in | std::fstream::binary);
+ fs.open(claim->getContentFullPath().c_str(), std::fstream::in | std::fstream::binary);
if (fs.is_open()) {
fs.seekg(flow->getOffset(), fs.beg);
@@ -533,8 +478,7 @@ void ProcessSession::read(std::shared_ptr<core::FlowFile> &&flow,
* @param flow flow file
*
*/
-void ProcessSession::importFrom(io::DataStream &stream,
- std::shared_ptr<core::FlowFile> &&flow) {
+void ProcessSession::importFrom(io::DataStream &stream, std::shared_ptr<core::FlowFile> &&flow) {
std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>();
int max_read = getpagesize();
@@ -544,8 +488,7 @@ void ProcessSession::importFrom(io::DataStream &stream,
try {
std::ofstream fs;
uint64_t startTime = getTimeMillis();
- fs.open(claim->getContentFullPath().c_str(),
- std::fstream::out | std::fstream::binary | std::fstream::trunc);
+ fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
if (fs.is_open()) {
size_t position = 0;
@@ -576,15 +519,11 @@ void ProcessSession::importFrom(io::DataStream &stream,
flow->setResourceClaim(claim);
claim->increaseFlowFileRecordOwnedCount();
- logger_->log_debug(
- "Import offset %d length %d into content %s for FlowFile UUID %s",
- flow->getOffset(), flow->getSize(),
- flow->getResourceClaim()->getContentFullPath().c_str(),
- flow->getUUIDStr().c_str());
+ logger_->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s", flow->getOffset(), flow->getSize(), flow->getResourceClaim()->getContentFullPath().c_str(),
+ flow->getUUIDStr().c_str());
fs.close();
- std::string details = process_context_->getProcessorNode().getName()
- + " modify flow record content " + flow->getUUIDStr();
+ std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flow->getUUIDStr();
uint64_t endTime = getTimeMillis();
provenance_report_->modifyContent(flow, details, endTime - startTime);
} else {
@@ -611,9 +550,9 @@ void ProcessSession::importFrom(io::DataStream &stream,
}
}
-void ProcessSession::import(std::string source,
- std::shared_ptr<core::FlowFile> &flow,
- bool keepSource, uint64_t offset) {
+void ProcessSession::import(std::string source, std::shared_ptr<core::FlowFile> &flow,
+bool keepSource,
+ uint64_t offset) {
std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>();
char *buf = NULL;
int size = 4096;
@@ -622,8 +561,7 @@ void ProcessSession::import(std::string source,
try {
std::ofstream fs;
uint64_t startTime = getTimeMillis();
- fs.open(claim->getContentFullPath().c_str(),
- std::fstream::out | std::fstream::binary | std::fstream::trunc);
+ fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
std::ifstream input;
input.open(source.c_str(), std::fstream::in | std::fstream::binary);
@@ -649,18 +587,14 @@ void ProcessSession::import(std::string source,
flow->setResourceClaim(claim);
claim->increaseFlowFileRecordOwnedCount();
- logger_->log_debug(
- "Import offset %d length %d into content %s for FlowFile UUID %s",
- flow->getOffset(), flow->getSize(),
- flow->getResourceClaim()->getContentFullPath().c_str(),
- flow->getUUIDStr().c_str());
+ logger_->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s", flow->getOffset(), flow->getSize(), flow->getResourceClaim()->getContentFullPath().c_str(),
+ flow->getUUIDStr().c_str());
fs.close();
input.close();
if (!keepSource)
std::remove(source.c_str());
- std::string details = process_context_->getProcessorNode().getName()
- + " modify flow record content " + flow->getUUIDStr();
+ std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flow->getUUIDStr();
uint64_t endTime = getTimeMillis();
provenance_report_->modifyContent(flow, details, endTime - startTime);
} else {
@@ -692,9 +626,9 @@ void ProcessSession::import(std::string source,
}
}
-void ProcessSession::import(std::string source,
- std::shared_ptr<core::FlowFile> &&flow,
- bool keepSource, uint64_t offset) {
+void ProcessSession::import(std::string source, std::shared_ptr<core::FlowFile> &&flow,
+bool keepSource,
+ uint64_t offset) {
std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>();
char *buf = NULL;
@@ -704,8 +638,7 @@ void ProcessSession::import(std::string source,
try {
std::ofstream fs;
uint64_t startTime = getTimeMillis();
- fs.open(claim->getContentFullPath().c_str(),
- std::fstream::out | std::fstream::binary | std::fstream::trunc);
+ fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
std::ifstream input;
input.open(source.c_str(), std::fstream::in | std::fstream::binary);
@@ -731,18 +664,14 @@ void ProcessSession::import(std::string source,
flow->setResourceClaim(claim);
claim->increaseFlowFileRecordOwnedCount();
- logger_->log_debug(
- "Import offset %d length %d into content %s for FlowFile UUID %s",
- flow->getOffset(), flow->getSize(),
- flow->getResourceClaim()->getContentFullPath().c_str(),
- flow->getUUIDStr().c_str());
+ logger_->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s", flow->getOffset(), flow->getSize(), flow->getResourceClaim()->getContentFullPath().c_str(),
+ flow->getUUIDStr().c_str());
fs.close();
input.close();
if (!keepSource)
std::remove(source.c_str());
- std::string details = process_context_->getProcessorNode().getName()
- + " modify flow record content " + flow->getUUIDStr();
+ std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flow->getUUIDStr();
uint64_t endTime = getTimeMillis();
provenance_report_->modifyContent(flow, details, endTime - startTime);
} else {
@@ -781,21 +710,16 @@ void ProcessSession::commit() {
std::shared_ptr<core::FlowFile> record = it.second;
if (record->isDeleted())
continue;
- std::map<std::string, Relationship>::iterator itRelationship = this
- ->_transferRelationship.find(record->getUUIDStr());
+ std::map<std::string, Relationship>::iterator itRelationship = this->_transferRelationship.find(record->getUUIDStr());
if (itRelationship != _transferRelationship.end()) {
Relationship relationship = itRelationship->second;
// Find the relationship, we need to find the connections for that relationship
- std::set<std::shared_ptr<Connectable>> connections = process_context_
- ->getProcessorNode().getOutGoingConnections(relationship.getName());
+ std::set<std::shared_ptr<Connectable>> connections = process_context_->getProcessorNode().getOutGoingConnections(relationship.getName());
if (connections.empty()) {
// No connection
- if (!process_context_->getProcessorNode().isAutoTerminated(
- relationship)) {
+ if (!process_context_->getProcessorNode().isAutoTerminated(relationship)) {
// Not autoterminate, we should have the connect
- std::string message =
- "Connect empty for non auto terminated relationship"
- + relationship.getName();
+ std::string message = "Connect empty for non auto terminated relationship" + relationship.getName();
throw Exception(PROCESS_SESSION_EXCEPTION, message.c_str());
} else {
// Autoterminated
@@ -803,9 +727,7 @@ void ProcessSession::commit() {
}
} else {
// We connections, clone the flow and assign the connection accordingly
- for (std::set<std::shared_ptr<Connectable>>::iterator itConnection =
- connections.begin(); itConnection != connections.end();
- ++itConnection) {
+ for (std::set<std::shared_ptr<Connectable>>::iterator itConnection = connections.begin(); itConnection != connections.end(); ++itConnection) {
std::shared_ptr<Connectable> connection = *itConnection;
if (itConnection == connections.begin()) {
// First connection which the flow need be routed to
@@ -817,15 +739,13 @@ void ProcessSession::commit() {
if (cloneRecord)
cloneRecord->setConnection(connection);
else
- throw Exception(PROCESS_SESSION_EXCEPTION,
- "Can not clone the flow for transfer");
+ throw Exception(PROCESS_SESSION_EXCEPTION, "Can not clone the flow for transfer");
}
}
}
} else {
// Can not find relationship for the flow
- throw Exception(PROCESS_SESSION_EXCEPTION,
- "Can not find the transfer relationship for the flow");
+ throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the flow");
}
}
@@ -834,21 +754,16 @@ void ProcessSession::commit() {
std::shared_ptr<core::FlowFile> record = it.second;
if (record->isDeleted())
continue;
- std::map<std::string, Relationship>::iterator itRelationship = this
- ->_transferRelationship.find(record->getUUIDStr());
+ std::map<std::string, Relationship>::iterator itRelationship = this->_transferRelationship.find(record->getUUIDStr());
if (itRelationship != _transferRelationship.end()) {
Relationship relationship = itRelationship->second;
// Find the relationship, we need to find the connections for that relationship
- std::set<std::shared_ptr<Connectable>> connections = process_context_
- ->getProcessorNode().getOutGoingConnections(relationship.getName());
+ std::set<std::shared_ptr<Connectable>> connections = process_context_->getProcessorNode().getOutGoingConnections(relationship.getName());
if (connections.empty()) {
// No connection
- if (!process_context_->getProcessorNode().isAutoTerminated(
- relationship)) {
+ if (!process_context_->getProcessorNode().isAutoTerminated(relationship)) {
// Not autoterminate, we should have the connect
- std::string message =
- "Connect empty for non auto terminated relationship "
- + relationship.getName();
+ std::string message = "Connect empty for non auto terminated relationship " + relationship.getName();
throw Exception(PROCESS_SESSION_EXCEPTION, message.c_str());
} else {
// Autoterminated
@@ -856,9 +771,7 @@ void ProcessSession::commit() {
}
} else {
// We connections, clone the flow and assign the connection accordingly
- for (std::set<std::shared_ptr<Connectable>>::iterator itConnection =
- connections.begin(); itConnection != connections.end();
- ++itConnection) {
+ for (std::set<std::shared_ptr<Connectable>>::iterator itConnection = connections.begin(); itConnection != connections.end(); ++itConnection) {
std::shared_ptr<Connectable> connection(*itConnection);
if (itConnection == connections.begin()) {
// First connection which the flow need be routed to
@@ -870,15 +783,13 @@ void ProcessSession::commit() {
if (cloneRecord)
cloneRecord->setConnection(connection);
else
- throw Exception(PROCESS_SESSION_EXCEPTION,
- "Can not clone the flow for transfer");
+ throw Exception(PROCESS_SESSION_EXCEPTION, "Can not clone the flow for transfer");
}
}
}
} else {
// Can not find relationship for the flow
- throw Exception(PROCESS_SESSION_EXCEPTION,
- "Can not find the transfer relationship for the flow");
+ throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the flow");
}
}
@@ -890,8 +801,7 @@ void ProcessSession::commit() {
continue;
}
- connection = std::static_pointer_cast<Connection>(
- record->getConnection());
+ connection = std::static_pointer_cast<Connection>(record->getConnection());
if ((connection) != nullptr)
connection->put(record);
}
@@ -900,8 +810,7 @@ void ProcessSession::commit() {
if (record->isDeleted()) {
continue;
}
- connection = std::static_pointer_cast<Connection>(
- record->getConnection());
+ connection = std::static_pointer_cast<Connection>(record->getConnection());
if ((connection) != nullptr)
connection->put(record);
}
@@ -911,8 +820,7 @@ void ProcessSession::commit() {
if (record->isDeleted()) {
continue;
}
- connection = std::static_pointer_cast<Connection>(
- record->getConnection());
+ connection = std::static_pointer_cast<Connection>(record->getConnection());
if ((connection) != nullptr)
connection->put(record);
}
@@ -925,8 +833,7 @@ void ProcessSession::commit() {
_originalFlowFiles.clear();
// persistent the provenance report
this->provenance_report_->commit();
- logger_->log_trace("ProcessSession committed for %s",
- process_context_->getProcessorNode().getName().c_str());
+ logger_->log_trace("ProcessSession committed for %s", process_context_->getProcessorNode().getName().c_str());
} catch (std::exception &exception) {
logger_->log_debug("Caught Exception %s", exception.what());
throw;
@@ -942,11 +849,9 @@ void ProcessSession::rollback() {
// Requeue the snapshot of the flowfile back
for (const auto &it : _originalFlowFiles) {
std::shared_ptr<core::FlowFile> record = it.second;
- connection = std::static_pointer_cast<Connection>(
- record->getOriginalConnection());
+ connection = std::static_pointer_cast<Connection>(record->getOriginalConnection());
if ((connection) != nullptr) {
- std::shared_ptr<FlowFileRecord> flowf = std::static_pointer_cast<
- FlowFileRecord>(record);
+ std::shared_ptr<FlowFileRecord> flowf = std::static_pointer_cast<FlowFileRecord>(record);
flowf->setSnapShot(false);
connection->put(record);
}
@@ -957,8 +862,7 @@ void ProcessSession::rollback() {
_addedFlowFiles.clear();
_updatedFlowFiles.clear();
_deletedFlowFiles.clear();
- logger_->log_trace("ProcessSession rollback for %s",
- process_context_->getProcessorNode().getName().c_str());
+ logger_->log_trace("ProcessSession rollback for %s", process_context_->getProcessorNode().getName().c_str());
} catch (std::exception &exception) {
logger_->log_debug("Caught Exception %s", exception.what());
throw;
@@ -969,25 +873,21 @@ void ProcessSession::rollback() {
}
std::shared_ptr<core::FlowFile> ProcessSession::get() {
- std::shared_ptr<Connectable> first = process_context_->getProcessorNode()
- .getNextIncomingConnection();
+ std::shared_ptr<Connectable> first = process_context_->getProcessorNode().getNextIncomingConnection();
if (first == NULL)
return NULL;
- std::shared_ptr<Connection> current = std::static_pointer_cast<Connection>(
- first);
+ std::shared_ptr<Connection> current = std::static_pointer_cast<Connection>(first);
do {
std::set<std::shared_ptr<core::FlowFile> > expired;
std::shared_ptr<core::FlowFile> ret = current->poll(expired);
if (expired.size() > 0) {
// Remove expired flow record
- for (std::set<std::shared_ptr<core::FlowFile> >::iterator it = expired
- .begin(); it != expired.end(); ++it) {
+ for (std::set<std::shared_ptr<core::FlowFile> >::iterator it = expired.begin(); it != expired.end(); ++it) {
std::shared_ptr<core::FlowFile> record = *it;
- std::string details = process_context_->getProcessorNode().getName()
- + " expire flow record " + record->getUUIDStr();
+ std::string details = process_context_->getProcessorNode().getName() + " expire flow record " + record->getUUIDStr();
provenance_report_->expire(record, details);
}
}
@@ -996,19 +896,15 @@ std::shared_ptr<core::FlowFile> ProcessSession::get() {
ret->setDeleted(false);
_updatedFlowFiles[ret->getUUIDStr()] = ret;
std::map<std::string, std::string> empty;
- std::shared_ptr<core::FlowFile> snapshot =
- std::make_shared<FlowFileRecord>(
- process_context_->getProvenanceRepository(), empty);
- logger_->log_debug("Create Snapshot FlowFile with UUID %s",
- snapshot->getUUIDStr().c_str());
+ std::shared_ptr<core::FlowFile> snapshot = std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(), empty);
+ logger_->log_debug("Create Snapshot FlowFile with UUID %s", snapshot->getUUIDStr().c_str());
snapshot = ret;
// snapshot->duplicate(ret);
// save a snapshot
_originalFlowFiles[snapshot->getUUIDStr()] = snapshot;
return ret;
}
- current = std::static_pointer_cast<Connection>(
- process_context_->getProcessorNode().getNextIncomingConnection());
+ current = std::static_pointer_cast<Connection>(process_context_->getProcessorNode().getNextIncomingConnection());
} while (current != NULL && current != first);
return NULL;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/Processor.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index dbeb46f..7b07638 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -45,8 +45,9 @@ namespace minifi {
namespace core {
Processor::Processor(std::string name, uuid_t uuid)
- : Connectable(name, uuid), ConfigurableComponent(),
- logger_(logging::LoggerFactory<Processor>::getLogger()) {
+ : Connectable(name, uuid),
+ ConfigurableComponent(),
+ logger_(logging::LoggerFactory<Processor>::getLogger()) {
has_work_.store(false);
// Setup the default values
state_ = DISABLED;
@@ -61,8 +62,7 @@ Processor::Processor(std::string name, uuid_t uuid)
active_tasks_ = 0;
yield_expiration_ = 0;
incoming_connections_Iter = this->_incomingConnections.begin();
- logger_->log_info("Processor %s created UUID %s", name_.c_str(),
- uuidStr_.c_str());
+ logger_->log_info("Processor %s created UUID %s", name_.c_str(), uuidStr_.c_str());
}
bool Processor::isRunning() {
@@ -77,12 +77,10 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) {
bool ret = false;
if (isRunning()) {
- logger_->log_info("Can not add connection while the process %s is running",
- name_.c_str());
+ logger_->log_info("Can not add connection while the process %s is running", name_.c_str());
return false;
}
- std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(
- conn);
+ std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
std::lock_guard<std::mutex> lock(mutex_);
uuid_t srcUUID;
@@ -101,9 +99,7 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) {
if (_incomingConnections.find(connection) == _incomingConnections.end()) {
_incomingConnections.insert(connection);
connection->setDestination(shared_from_this());
- logger_->log_info(
- "Add connection %s into Processor %s incoming connection",
- connection->getName().c_str(), name_.c_str());
+ logger_->log_info("Add connection %s into Processor %s incoming connection", connection->getName().c_str(), name_.c_str());
incoming_connections_Iter = this->_incomingConnections.begin();
ret = true;
}
@@ -122,9 +118,7 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) {
existedConnection.insert(connection);
connection->setSource(shared_from_this());
out_going_connections_[relationship] = existedConnection;
- logger_->log_info(
- "Add connection %s into Processor %s outgoing connection for relationship %s",
- connection->getName().c_str(), name_.c_str(), relationship.c_str());
+ logger_->log_info("Add connection %s into Processor %s outgoing connection for relationship %s", connection->getName().c_str(), name_.c_str(), relationship.c_str());
ret = true;
}
} else {
@@ -133,9 +127,7 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) {
newConnection.insert(connection);
connection->setSource(shared_from_this());
out_going_connections_[relationship] = newConnection;
- logger_->log_info(
- "Add connection %s into Processor %s outgoing connection for relationship %s",
- connection->getName().c_str(), name_.c_str(), relationship.c_str());
+ logger_->log_info("Add connection %s into Processor %s outgoing connection for relationship %s", connection->getName().c_str(), name_.c_str(), relationship.c_str());
ret = true;
}
}
@@ -145,9 +137,7 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) {
void Processor::removeConnection(std::shared_ptr<Connectable> conn) {
if (isRunning()) {
- logger_->log_info(
- "Can not remove connection while the process %s is running",
- name_.c_str());
+ logger_->log_info("Can not remove connection while the process %s is running", name_.c_str());
return;
}
@@ -156,8 +146,7 @@ void Processor::removeConnection(std::shared_ptr<Connectable> conn) {
uuid_t srcUUID;
uuid_t destUUID;
- std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(
- conn);
+ std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
connection->getSourceUUID(srcUUID);
connection->getDestinationUUID(destUUID);
@@ -167,9 +156,7 @@ void Processor::removeConnection(std::shared_ptr<Connectable> conn) {
if (_incomingConnections.find(connection) != _incomingConnections.end()) {
_incomingConnections.erase(connection);
connection->setDestination(NULL);
- logger_->log_info(
- "Remove connection %s into Processor %s incoming connection",
- connection->getName().c_str(), name_.c_str());
+ logger_->log_info("Remove connection %s into Processor %s incoming connection", connection->getName().c_str(), name_.c_str());
incoming_connections_Iter = this->_incomingConnections.begin();
}
}
@@ -181,13 +168,10 @@ void Processor::removeConnection(std::shared_ptr<Connectable> conn) {
if (it == out_going_connections_.end()) {
return;
} else {
- if (out_going_connections_[relationship].find(connection)
- != out_going_connections_[relationship].end()) {
+ if (out_going_connections_[relationship].find(connection) != out_going_connections_[relationship].end()) {
out_going_connections_[relationship].erase(connection);
connection->setSource(NULL);
- logger_->log_info(
- "Remove connection %s into Processor %s outgoing connection for relationship %s",
- connection->getName().c_str(), name_.c_str(), relationship.c_str());
+ logger_->log_info("Remove connection %s into Processor %s outgoing connection for relationship %s", connection->getName().c_str(), name_.c_str(), relationship.c_str());
}
}
}
@@ -200,8 +184,7 @@ bool Processor::flowFilesQueued() {
return false;
for (auto &&conn : _incomingConnections) {
- std::shared_ptr<Connection> connection =
- std::static_pointer_cast<Connection>(conn);
+ std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
if (connection->getQueueSize() > 0)
return true;
}
@@ -216,8 +199,7 @@ bool Processor::flowFilesOutGoingFull() {
// We already has connection for this relationship
std::set<std::shared_ptr<Connectable>> existedConnection = connection.second;
for (const auto conn : existedConnection) {
- std::shared_ptr<Connection> connection = std::static_pointer_cast<
- Connection>(conn);
+ std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
if (connection->isFull())
return true;
}
@@ -226,8 +208,7 @@ bool Processor::flowFilesOutGoingFull() {
return false;
}
-void Processor::onTrigger(ProcessContext *context,
- ProcessSessionFactory *sessionFactory) {
+void Processor::onTrigger(ProcessContext *context, ProcessSessionFactory *sessionFactory) {
auto session = sessionFactory->createSession();
try {
@@ -251,17 +232,15 @@ bool Processor::isWorkAvailable() {
try {
for (const auto &conn : _incomingConnections) {
- std::shared_ptr<Connection> connection = std::static_pointer_cast<
- Connection>(conn);
+ std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
if (connection->getQueueSize() > 0) {
hasWork = true;
break;
}
}
} catch (...) {
- logger_->log_error(
- "Caught an exception while checking if work is available;"
- " unless it was positively determined that work is available, assuming NO work is available!");
+ logger_->log_error("Caught an exception while checking if work is available;"
+ " unless it was positively determined that work is available, assuming NO work is available!");
}
return hasWork;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/ProcessorNode.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessorNode.cpp b/libminifi/src/core/ProcessorNode.cpp
index bf39738..05f31a0 100644
--- a/libminifi/src/core/ProcessorNode.cpp
+++ b/libminifi/src/core/ProcessorNode.cpp
@@ -25,7 +25,8 @@ namespace core {
ProcessorNode::ProcessorNode(const std::shared_ptr<Connectable> processor)
: processor_(processor),
- Connectable(processor->getName(), 0), ConfigurableComponent() {
+ Connectable(processor->getName(), 0),
+ ConfigurableComponent() {
uuid_t copy;
processor->getUUID(copy);
setUUID(copy);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/RepositoryFactory.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/RepositoryFactory.cpp b/libminifi/src/core/RepositoryFactory.cpp
index 45ad980..cf18601 100644
--- a/libminifi/src/core/RepositoryFactory.cpp
+++ b/libminifi/src/core/RepositoryFactory.cpp
@@ -41,12 +41,10 @@ namespace core {
class FlowFileRepository;
#endif
-std::shared_ptr<core::Repository> createRepository(
- const std::string configuration_class_name, bool fail_safe, const std::string repo_name) {
+std::shared_ptr<core::Repository> createRepository(const std::string configuration_class_name, bool fail_safe, const std::string repo_name) {
std::shared_ptr<core::Repository> return_obj = nullptr;
std::string class_name_lc = configuration_class_name;
- std::transform(class_name_lc.begin(), class_name_lc.end(),
- class_name_lc.begin(), ::tolower);
+ std::transform(class_name_lc.begin(), class_name_lc.end(), class_name_lc.begin(), ::tolower);
try {
std::shared_ptr<core::Repository> return_obj = nullptr;
if (class_name_lc == "flowfilerepository") {
@@ -65,21 +63,17 @@ std::shared_ptr<core::Repository> createRepository(
return return_obj;
}
if (fail_safe) {
- return std::make_shared<core::Repository>("fail_safe", "fail_safe", 1, 1,
- 1);
+ return std::make_shared<core::Repository>("fail_safe", "fail_safe", 1, 1, 1);
} else {
- throw std::runtime_error(
- "Support for the provided configuration class could not be found");
+ throw std::runtime_error("Support for the provided configuration class could not be found");
}
} catch (const std::runtime_error &r) {
if (fail_safe) {
- return std::make_shared<core::Repository>("fail_safe", "fail_safe", 1, 1,
- 1);
+ return std::make_shared<core::Repository>("fail_safe", "fail_safe", 1, 1, 1);
}
}
- throw std::runtime_error(
- "Support for the provided configuration class could not be found");
+ throw std::runtime_error("Support for the provided configuration class could not be found");
}
} /* namespace core */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/controller/ControllerServiceNode.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/controller/ControllerServiceNode.cpp b/libminifi/src/core/controller/ControllerServiceNode.cpp
index 12e3653..3097574 100644
--- a/libminifi/src/core/controller/ControllerServiceNode.cpp
+++ b/libminifi/src/core/controller/ControllerServiceNode.cpp
@@ -39,8 +39,6 @@ std::vector<std::shared_ptr<ConfigurableComponent> > &ControllerServiceNode::get
return linked_components_;
}
-
-
} /* namespace controller */
} /* namespace core */
} /* namespace minifi */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/controller/ControllerServiceProvider.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/controller/ControllerServiceProvider.cpp b/libminifi/src/core/controller/ControllerServiceProvider.cpp
index da5c6a1..942c299 100644
--- a/libminifi/src/core/controller/ControllerServiceProvider.cpp
+++ b/libminifi/src/core/controller/ControllerServiceProvider.cpp
@@ -32,8 +32,7 @@ namespace controller {
* @return the ControllerService that is registered with the given
* identifier
*/
-std::shared_ptr<ControllerService> ControllerServiceProvider::getControllerService(
- const std::string &identifier) {
+std::shared_ptr<ControllerService> ControllerServiceProvider::getControllerService(const std::string &identifier) {
auto service = controller_map_->getControllerServiceNode(identifier);
if (service != nullptr) {
return service->getControllerServiceImplementation();
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/controller/StandardControllerServiceNode.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/controller/StandardControllerServiceNode.cpp b/libminifi/src/core/controller/StandardControllerServiceNode.cpp
index 26804f6..5c4aa70 100644
--- a/libminifi/src/core/controller/StandardControllerServiceNode.cpp
+++ b/libminifi/src/core/controller/StandardControllerServiceNode.cpp
@@ -31,8 +31,7 @@ std::shared_ptr<core::ProcessGroup> &StandardControllerServiceNode::getProcessGr
return process_group_;
}
-void StandardControllerServiceNode::setProcessGroup(
- std::shared_ptr<ProcessGroup> &processGroup) {
+void StandardControllerServiceNode::setProcessGroup(std::shared_ptr<ProcessGroup> &processGroup) {
std::lock_guard<std::mutex> lock(mutex_);
process_group_ = processGroup;
}
@@ -44,8 +43,7 @@ bool StandardControllerServiceNode::enable() {
if (getProperty(property.getName(), property)) {
active = true;
for (auto linked_service : property.getValues()) {
- std::shared_ptr<ControllerServiceNode> csNode = provider
- ->getControllerServiceNode(linked_service);
+ std::shared_ptr<ControllerServiceNode> csNode = provider->getControllerServiceNode(linked_service);
if (nullptr != csNode) {
std::lock_guard<std::mutex> lock(mutex_);
linked_controller_services_.push_back(csNode);
@@ -53,8 +51,7 @@ bool StandardControllerServiceNode::enable() {
}
} else {
}
- std::shared_ptr<ControllerService> impl =
- getControllerServiceImplementation();
+ std::shared_ptr<ControllerService> impl = getControllerServiceImplementation();
if (nullptr != impl) {
impl->onEnable();
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/logging/LoggerConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/logging/LoggerConfiguration.cpp b/libminifi/src/core/logging/LoggerConfiguration.cpp
index 4bb6615..89e6a1b 100644
--- a/libminifi/src/core/logging/LoggerConfiguration.cpp
+++ b/libminifi/src/core/logging/LoggerConfiguration.cpp
@@ -42,25 +42,26 @@ namespace logging {
const char* LoggerConfiguration::spdlog_default_pattern = "[%Y-%m-%d %H:%M:%S.%e] [%n] [%l] %v";
-std::vector< std::string > LoggerProperties::get_keys_of_type(const std::string &type) {
+std::vector<std::string> LoggerProperties::get_keys_of_type(const std::string &type) {
std::vector<std::string> appenders;
std::string prefix = type + ".";
for (auto const & entry : properties_) {
- if (entry.first.rfind(prefix, 0) == 0 &&
- entry.first.find(".", prefix.length() + 1) == std::string::npos) {
+ if (entry.first.rfind(prefix, 0) == 0 && entry.first.find(".", prefix.length() + 1) == std::string::npos) {
appenders.push_back(entry.first);
}
}
return appenders;
}
-LoggerConfiguration::LoggerConfiguration() : root_namespace_(create_default_root()), loggers(std::vector<std::shared_ptr<LoggerImpl>>()),
- formatter_(std::make_shared<spdlog::pattern_formatter>(spdlog_default_pattern)) {
+LoggerConfiguration::LoggerConfiguration()
+ : root_namespace_(create_default_root()),
+ loggers(std::vector<std::shared_ptr<LoggerImpl>>()),
+ formatter_(std::make_shared<spdlog::pattern_formatter>(spdlog_default_pattern)) {
logger_ = std::shared_ptr<LoggerImpl>(new LoggerImpl(core::getClassName<LoggerConfiguration>(), get_logger(nullptr, root_namespace_, core::getClassName<LoggerConfiguration>(), formatter_)));
loggers.push_back(logger_);
}
-void LoggerConfiguration::initialize(const std::shared_ptr<LoggerProperties> &logger_properties) {
+void LoggerConfiguration::initialize(const std::shared_ptr<LoggerProperties> &logger_properties) {
std::lock_guard<std::mutex> lock(mutex);
root_namespace_ = initialize_namespaces(logger_properties);
std::string spdlog_pattern;
@@ -90,8 +91,7 @@ std::shared_ptr<Logger> LoggerConfiguration::getLogger(const std::string &name)
return result;
}
-std::shared_ptr<internal::LoggerNamespace> LoggerConfiguration::
- initialize_namespaces(const std::shared_ptr<LoggerProperties> &logger_properties) {
+std::shared_ptr<internal::LoggerNamespace> LoggerConfiguration::initialize_namespaces(const std::shared_ptr<LoggerProperties> &logger_properties) {
std::map<std::string, std::shared_ptr<spdlog::sinks::sink>> sink_map = logger_properties->initial_sinks();
std::string appender_type = "appender";
@@ -117,16 +117,18 @@ std::shared_ptr<internal::LoggerNamespace> LoggerConfiguration::
try {
max_files = std::stoi(max_files_str);
} catch (const std::invalid_argument &ia) {
- } catch (const std::out_of_range &oor) {}
+ } catch (const std::out_of_range &oor) {
+ }
}
- int max_file_size = 5*1024*1024;
+ int max_file_size = 5 * 1024 * 1024;
std::string max_file_size_str = "";
if (logger_properties->get(appender_key + ".max_file_size", max_file_size_str)) {
try {
max_file_size = std::stoi(max_file_size_str);
} catch (const std::invalid_argument &ia) {
- } catch (const std::out_of_range &oor) {}
+ } catch (const std::out_of_range &oor) {
+ }
}
sink_map[appender_name] = std::make_shared<spdlog::sinks::rotating_file_sink_mt>(file_name, max_file_size, max_files);
} else if ("stdout" == appender_type) {
@@ -170,8 +172,7 @@ std::shared_ptr<internal::LoggerNamespace> LoggerConfiguration::
}
std::shared_ptr<internal::LoggerNamespace> current_namespace = root_namespace;
if (logger_key != "logger.root") {
- for (auto const & name : utils::StringUtils::split(logger_key.substr(logger_type.length() + 1,
- logger_key.length() - logger_type.length()), "::")) {
+ for (auto const & name : utils::StringUtils::split(logger_key.substr(logger_type.length() + 1, logger_key.length() - logger_type.length()), "::")) {
auto child_pair = current_namespace->children.find(name);
std::shared_ptr<internal::LoggerNamespace> child;
if (child_pair == current_namespace->children.end()) {
@@ -190,8 +191,8 @@ std::shared_ptr<internal::LoggerNamespace> LoggerConfiguration::
return root_namespace;
}
-std::shared_ptr<spdlog::logger> LoggerConfiguration::get_logger(std::shared_ptr<Logger> logger, const std::shared_ptr<internal::LoggerNamespace> &root_namespace,
- const std::string &name, std::shared_ptr<spdlog::formatter> formatter, bool remove_if_present) {
+std::shared_ptr<spdlog::logger> LoggerConfiguration::get_logger(std::shared_ptr<Logger> logger, const std::shared_ptr<internal::LoggerNamespace> &root_namespace, const std::string &name,
+ std::shared_ptr<spdlog::formatter> formatter, bool remove_if_present) {
std::shared_ptr<spdlog::logger> spdlogger = spdlog::get(name);
if (spdlogger) {
if (remove_if_present) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp b/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp
index 3d21683..e46f740 100644
--- a/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp
+++ b/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp
@@ -50,11 +50,8 @@ const char *SiteToSiteProvenanceReportingTask::ProvenanceAppStr = "MiNiFi Flow";
void SiteToSiteProvenanceReportingTask::initialize() {
}
-void SiteToSiteProvenanceReportingTask::getJsonReport(
- core::ProcessContext *context, core::ProcessSession *session,
- std::vector<std::shared_ptr<provenance::ProvenanceEventRecord>> &records,
- std::string &report) {
-
+void SiteToSiteProvenanceReportingTask::getJsonReport(core::ProcessContext *context, core::ProcessSession *session, std::vector<std::shared_ptr<provenance::ProvenanceEventRecord>> &records,
+ std::string &report) {
Json::Value array;
for (auto record : records) {
Json::Value recordJson;
@@ -62,9 +59,7 @@ void SiteToSiteProvenanceReportingTask::getJsonReport(
Json::Value parentUuidJson;
Json::Value childUuidJson;
recordJson["eventId"] = record->getEventId().c_str();
- recordJson["eventType"] =
- provenance::ProvenanceEventRecord::ProvenanceEventTypeStr[record
- ->getEventType()];
+ recordJson["eventType"] = provenance::ProvenanceEventRecord::ProvenanceEventTypeStr[record->getEventType()];
recordJson["timestampMillis"] = record->getEventTime();
recordJson["durationMillis"] = record->getEventDuration();
recordJson["lineageStart"] = record->getlineageStartDate();
@@ -91,10 +86,8 @@ void SiteToSiteProvenanceReportingTask::getJsonReport(
}
recordJson["childIds"] = childUuidJson;
recordJson["transitUri"] = record->getTransitUri().c_str();
- recordJson["remoteIdentifier"] = record->getSourceSystemFlowFileIdentifier()
- .c_str();
- recordJson["alternateIdentifier"] = record->getAlternateIdentifierUri()
- .c_str();
+ recordJson["remoteIdentifier"] = record->getSourceSystemFlowFileIdentifier().c_str();
+ recordJson["alternateIdentifier"] = record->getAlternateIdentifierUri().c_str();
recordJson["application"] = ProvenanceAppStr;
array.append(recordJson);
}
@@ -103,14 +96,10 @@ void SiteToSiteProvenanceReportingTask::getJsonReport(
report = writer.write(array);
}
-void SiteToSiteProvenanceReportingTask::onSchedule(
- core::ProcessContext *context,
- core::ProcessSessionFactory *sessionFactory) {
+void SiteToSiteProvenanceReportingTask::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
}
-void SiteToSiteProvenanceReportingTask::onTrigger(
- core::ProcessContext *context, core::ProcessSession *session) {
-
+void SiteToSiteProvenanceReportingTask::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
std::unique_ptr<Site2SiteClientProtocol> protocol_ = getNextProtocol(true);
if (!protocol_) {
@@ -121,18 +110,14 @@ void SiteToSiteProvenanceReportingTask::onTrigger(
if (!protocol_->bootstrap()) {
// bootstrap the client protocol if needed
context->yield();
- std::shared_ptr<Processor> processor = std::static_pointer_cast<Processor>(
- context->getProcessorNode().getProcessor());
- logger_->log_error("Site2Site bootstrap failed yield period %d peer ",
- processor->getYieldPeriodMsec());
+ std::shared_ptr<Processor> processor = std::static_pointer_cast<Processor>(context->getProcessorNode().getProcessor());
+ logger_->log_error("Site2Site bootstrap failed yield period %d peer ", processor->getYieldPeriodMsec());
returnProtocol(std::move(protocol_));
return;
}
std::vector<std::shared_ptr<provenance::ProvenanceEventRecord>> records;
- std::shared_ptr<provenance::ProvenanceRepository> repo =
- std::static_pointer_cast<provenance::ProvenanceRepository>(
- context->getProvenanceRepository());
+ std::shared_ptr<provenance::ProvenanceRepository> repo = std::static_pointer_cast<provenance::ProvenanceRepository>(context->getProvenanceRepository());
repo->getProvenanceRecord(records, batch_size_);
if (records.size() <= 0) {
returnProtocol(std::move(protocol_));
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/repository/FlowFileRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/repository/FlowFileRepository.cpp b/libminifi/src/core/repository/FlowFileRepository.cpp
index 5f62f83..e6d561a 100644
--- a/libminifi/src/core/repository/FlowFileRepository.cpp
+++ b/libminifi/src/core/repository/FlowFileRepository.cpp
@@ -40,24 +40,19 @@ void FlowFileRepository::run() {
leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
for (it->SeekToFirst(); it->Valid(); it->Next()) {
- std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<
- FlowFileRecord>(shared_from_this());
+ std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this());
std::string key = it->key().ToString();
- if (eventRead->DeSerialize(
- reinterpret_cast<const uint8_t *>(it->value().data()),
- it->value().size())) {
+ if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(it->value().data()), it->value().size())) {
if ((curTime - eventRead->getEventTime()) > max_partition_millis_)
purgeList.push_back(key);
} else {
- logger_->log_debug("NiFi %s retrieve event %s fail", name_.c_str(),
- key.c_str());
+ logger_->log_debug("NiFi %s retrieve event %s fail", name_.c_str(), key.c_str());
purgeList.push_back(key);
}
}
delete it;
for (auto eventId : purgeList) {
- logger_->log_info("Repository Repo %s Purge %s", name_.c_str(),
- eventId.c_str());
+ logger_->log_info("Repository Repo %s Purge %s", name_.c_str(), eventId.c_str());
Delete(eventId);
}
}
@@ -74,19 +69,14 @@ void FlowFileRepository::loadComponent() {
leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
for (it->SeekToFirst(); it->Valid(); it->Next()) {
- std::shared_ptr<FlowFileRecord> eventRead =
- std::make_shared<FlowFileRecord>(shared_from_this());
+ std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this());
std::string key = it->key().ToString();
- if (eventRead->DeSerialize(
- reinterpret_cast<const uint8_t *>(it->value().data()),
- it->value().size())) {
+ if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(it->value().data()), it->value().size())) {
auto search = connectionMap.find(eventRead->getConnectionUuid());
if (search != connectionMap.end()) {
// we find the connection for the persistent flowfile, create the flowfile and enqueue that
- std::shared_ptr<core::FlowFile> flow_file_ref =
- std::static_pointer_cast<core::FlowFile>(eventRead);
- std::shared_ptr<FlowFileRecord> record =
- std::make_shared<FlowFileRecord>(shared_from_this(), flow_file_ref);
+ std::shared_ptr<core::FlowFile> flow_file_ref = std::static_pointer_cast<core::FlowFile>(eventRead);
+ std::shared_ptr<FlowFileRecord> record = std::make_shared<FlowFileRecord>(shared_from_this(), flow_file_ref);
// mark the record as already stored in the repository so that enqueue does not need to persist it again
record->setStoredToRepository(true);
search->second->put(record);
@@ -105,8 +95,7 @@ void FlowFileRepository::loadComponent() {
std::vector<std::string>::iterator itPurge;
for (itPurge = purgeList.begin(); itPurge != purgeList.end(); itPurge++) {
std::string eventId = *itPurge;
- logger_->log_info("Repository Repo %s Purge %s", name_.c_str(),
- eventId.c_str());
+ logger_->log_info("Repository Repo %s Purge %s", name_.c_str(), eventId.c_str());
Delete(eventId);
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/repository/VolatileRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/repository/VolatileRepository.cpp b/libminifi/src/core/repository/VolatileRepository.cpp
index db036f8..a7e3a51 100644
--- a/libminifi/src/core/repository/VolatileRepository.cpp
+++ b/libminifi/src/core/repository/VolatileRepository.cpp
@@ -28,8 +28,7 @@ namespace minifi {
namespace core {
namespace repository {
-const char *VolatileRepository::volatile_repo_max_count =
- "max.count";
+const char *VolatileRepository::volatile_repo_max_count = "max.count";
void VolatileRepository::run() {
repo_full_ = false;
@@ -45,9 +44,7 @@ void VolatileRepository::purge() {
RepoValue value;
if (ent->getValue(value)) {
current_size_ -= value.size();
- logger_->log_info("VolatileRepository -- purge %s %d %d %d",
- value.getKey(), current_size_.load(), max_size_,
- current_index_.load());
+ logger_->log_info("VolatileRepository -- purge %s %d %d %d", value.getKey(), current_size_.load(), max_size_, current_index_.load());
}
if (current_size_ < max_size_)
break;