You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/06/06 16:33:08 UTC

[5/9] nifi-minifi-cpp git commit: MINIFI-331: Apply formatter with increased line length to source

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/ProcessGroup.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp
index c33800d..9e6778c 100644
--- a/libminifi/src/core/ProcessGroup.cpp
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -37,8 +37,7 @@ namespace nifi {
 namespace minifi {
 namespace core {
 
-ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid,
-                           ProcessGroup *parent)
+ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid, ProcessGroup *parent)
     : logger_(logging::LoggerFactory<ProcessGroup>::getLogger()),
       name_(name),
       type_(type),
@@ -60,8 +59,7 @@ ProcessGroup::~ProcessGroup() {
     connection->drain();
   }
 
-  for (std::set<ProcessGroup *>::iterator it = child_process_groups_.begin();
-      it != child_process_groups_.end(); ++it) {
+  for (std::set<ProcessGroup *>::iterator it = child_process_groups_.begin(); it != child_process_groups_.end(); ++it) {
     ProcessGroup *processGroup(*it);
     delete processGroup;
   }
@@ -78,8 +76,7 @@ void ProcessGroup::addProcessor(std::shared_ptr<Processor> processor) {
   if (processors_.find(processor) == processors_.end()) {
     // We do not have the same processor in this process group yet
     processors_.insert(processor);
-    logger_->log_info("Add processor %s into process group %s",
-                      processor->getName().c_str(), name_.c_str());
+    logger_->log_info("Add processor %s into process group %s", processor->getName().c_str(), name_.c_str());
   }
 }
 
@@ -89,8 +86,7 @@ void ProcessGroup::removeProcessor(std::shared_ptr<Processor> processor) {
   if (processors_.find(processor) != processors_.end()) {
     // We do have the same processor in this process group yet
     processors_.erase(processor);
-    logger_->log_info("Remove processor %s from process group %s",
-                      processor->getName().c_str(), name_.c_str());
+    logger_->log_info("Remove processor %s from process group %s", processor->getName().c_str(), name_.c_str());
   }
 }
 
@@ -100,8 +96,7 @@ void ProcessGroup::addProcessGroup(ProcessGroup *child) {
   if (child_process_groups_.find(child) == child_process_groups_.end()) {
     // We do not have the same child process group in this process group yet
     child_process_groups_.insert(child);
-    logger_->log_info("Add child process group %s into process group %s",
-                      child->getName().c_str(), name_.c_str());
+    logger_->log_info("Add child process group %s into process group %s", child->getName().c_str(), name_.c_str());
   }
 }
 
@@ -111,13 +106,11 @@ void ProcessGroup::removeProcessGroup(ProcessGroup *child) {
   if (child_process_groups_.find(child) != child_process_groups_.end()) {
     // We do have the same child process group in this process group yet
     child_process_groups_.erase(child);
-    logger_->log_info("Remove child process group %s from process group %s",
-                      child->getName().c_str(), name_.c_str());
+    logger_->log_info("Remove child process group %s from process group %s", child->getName().c_str(), name_.c_str());
   }
 }
 
-void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler,
-                                   EventDrivenSchedulingAgent *eventScheduler) {
+void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler, EventDrivenSchedulingAgent *eventScheduler) {
   std::lock_guard<std::recursive_mutex> lock(mutex_);
 
   try {
@@ -125,8 +118,7 @@ void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler,
     for (auto processor : processors_) {
       logger_->log_debug("Starting %s", processor->getName().c_str());
 
-      if (!processor->isRunning()
-          && processor->getScheduledState() != DISABLED) {
+      if (!processor->isRunning() && processor->getScheduledState() != DISABLED) {
         if (processor->getSchedulingStrategy() == TIMER_DRIVEN)
           timeScheduler->schedule(processor);
         else if (processor->getSchedulingStrategy() == EVENT_DRIVEN)
@@ -141,20 +133,17 @@ void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler,
     logger_->log_debug("Caught Exception %s", exception.what());
     throw;
   } catch (...) {
-    logger_->log_debug(
-        "Caught Exception during process group start processing");
+    logger_->log_debug("Caught Exception during process group start processing");
     throw;
   }
 }
 
-void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler,
-                                  EventDrivenSchedulingAgent *eventScheduler) {
+void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler, EventDrivenSchedulingAgent *eventScheduler) {
   std::lock_guard<std::recursive_mutex> lock(mutex_);
 
   try {
     // Stop all the processor node, input and output ports
-    for (std::set<std::shared_ptr<Processor> >::iterator it =
-        processors_.begin(); it != processors_.end(); ++it) {
+    for (std::set<std::shared_ptr<Processor> >::iterator it = processors_.begin(); it != processors_.end(); ++it) {
       std::shared_ptr<Processor> processor(*it);
       if (processor->getSchedulingStrategy() == TIMER_DRIVEN)
         timeScheduler->unschedule(processor);
@@ -162,8 +151,7 @@ void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler,
         eventScheduler->unschedule(processor);
     }
 
-    for (std::set<ProcessGroup *>::iterator it = child_process_groups_.begin();
-        it != child_process_groups_.end(); ++it) {
+    for (std::set<ProcessGroup *>::iterator it = child_process_groups_.begin(); it != child_process_groups_.end(); ++it) {
       ProcessGroup *processGroup(*it);
       processGroup->stopProcessing(timeScheduler, eventScheduler);
     }
@@ -195,8 +183,7 @@ std::shared_ptr<Processor> ProcessGroup::findProcessor(uuid_t uuid) {
     }
   }
   for (auto processGroup : child_process_groups_) {
-    logger_->log_info("find processor child %s",
-                      processGroup->getName().c_str());
+    logger_->log_info("find processor child %s", processGroup->getName().c_str());
     std::shared_ptr<Processor> processor = processGroup->findProcessor(uuid);
     if (processor)
       return processor;
@@ -204,9 +191,7 @@ std::shared_ptr<Processor> ProcessGroup::findProcessor(uuid_t uuid) {
   return ret;
 }
 
-void ProcessGroup::addControllerService(
-    const std::string &nodeId,
-    std::shared_ptr<core::controller::ControllerServiceNode> &node) {
+void ProcessGroup::addControllerService(const std::string &nodeId, std::shared_ptr<core::controller::ControllerServiceNode> &node) {
   controller_service_map_.put(nodeId, node);
 }
 
@@ -215,13 +200,11 @@ void ProcessGroup::addControllerService(
  * @param node node identifier
  * @return controller service node, if it exists.
  */
-std::shared_ptr<core::controller::ControllerServiceNode> ProcessGroup::findControllerService(
-    const std::string &nodeId) {
+std::shared_ptr<core::controller::ControllerServiceNode> ProcessGroup::findControllerService(const std::string &nodeId) {
   return controller_service_map_.getControllerServiceNode(nodeId);
 }
 
-std::shared_ptr<Processor> ProcessGroup::findProcessor(
-    const std::string &processorName) {
+std::shared_ptr<Processor> ProcessGroup::findProcessor(const std::string &processorName) {
   std::lock_guard<std::recursive_mutex> lock(mutex_);
   std::shared_ptr<Processor> ret = NULL;
   for (auto processor : processors_) {
@@ -230,17 +213,14 @@ std::shared_ptr<Processor> ProcessGroup::findProcessor(
       return processor;
   }
   for (auto processGroup : child_process_groups_) {
-    std::shared_ptr<Processor> processor = processGroup->findProcessor(
-        processorName);
+    std::shared_ptr<Processor> processor = processGroup->findProcessor(processorName);
     if (processor)
       return processor;
   }
   return ret;
 }
 
-void ProcessGroup::updatePropertyValue(std::string processorName,
-                                       std::string propertyName,
-                                       std::string propertyValue) {
+void ProcessGroup::updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue) {
   std::lock_guard<std::recursive_mutex> lock(mutex_);
   for (auto processor : processors_) {
     if (processor->getName() == processorName) {
@@ -248,14 +228,12 @@ void ProcessGroup::updatePropertyValue(std::string processorName,
     }
   }
   for (auto processGroup : child_process_groups_) {
-    processGroup->updatePropertyValue(processorName, propertyName,
-                                      propertyValue);
+    processGroup->updatePropertyValue(processorName, propertyName, propertyValue);
   }
   return;
 }
 
-void ProcessGroup::getConnections(
-    std::map<std::string, std::shared_ptr<Connection>> &connectionMap) {
+void ProcessGroup::getConnections(std::map<std::string, std::shared_ptr<Connection>> &connectionMap) {
   for (auto connection : connections_) {
     connectionMap[connection->getUUIDStr()] = connection;
   }
@@ -270,8 +248,7 @@ void ProcessGroup::addConnection(std::shared_ptr<Connection> connection) {
   if (connections_.find(connection) == connections_.end()) {
     // We do not have the same connection in this process group yet
     connections_.insert(connection);
-    logger_->log_info("Add connection %s into process group %s",
-                      connection->getName().c_str(), name_.c_str());
+    logger_->log_info("Add connection %s into process group %s", connection->getName().c_str(), name_.c_str());
     uuid_t sourceUUID;
     std::shared_ptr<Processor> source = NULL;
     connection->getSourceUUID(sourceUUID);
@@ -293,8 +270,7 @@ void ProcessGroup::removeConnection(std::shared_ptr<Connection> connection) {
   if (connections_.find(connection) != connections_.end()) {
     // We do not have the same connection in this process group yet
     connections_.erase(connection);
-    logger_->log_info("Remove connection %s into process group %s",
-                      connection->getName().c_str(), name_.c_str());
+    logger_->log_info("Remove connection %s into process group %s", connection->getName().c_str(), name_.c_str());
     uuid_t sourceUUID;
     std::shared_ptr<Processor> source = NULL;
     connection->getSourceUUID(sourceUUID);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/ProcessSession.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index 70de3f6..037660f 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -38,40 +38,31 @@ namespace core {
 
 std::shared_ptr<core::FlowFile> ProcessSession::create() {
   std::map<std::string, std::string> empty;
-  std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(
-      process_context_->getProvenanceRepository(), empty);
+  std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(), empty);
 
   _addedFlowFiles[record->getUUIDStr()] = record;
-  logger_->log_debug("Create FlowFile with UUID %s",
-                     record->getUUIDStr().c_str());
-  std::string details = process_context_->getProcessorNode().getName()
-      + " creates flow record " + record->getUUIDStr();
+  logger_->log_debug("Create FlowFile with UUID %s", record->getUUIDStr().c_str());
+  std::string details = process_context_->getProcessorNode().getName() + " creates flow record " + record->getUUIDStr();
   provenance_report_->create(record, details);
 
   return record;
 }
 
-std::shared_ptr<core::FlowFile> ProcessSession::create(
-    std::shared_ptr<core::FlowFile> &&parent) {
+std::shared_ptr<core::FlowFile> ProcessSession::create(std::shared_ptr<core::FlowFile> &&parent) {
   std::map<std::string, std::string> empty;
-  std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(
-      process_context_->getProvenanceRepository(), empty);
+  std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(), empty);
 
   if (record) {
     _addedFlowFiles[record->getUUIDStr()] = record;
-    logger_->log_debug("Create FlowFile with UUID %s",
-                       record->getUUIDStr().c_str());
+    logger_->log_debug("Create FlowFile with UUID %s", record->getUUIDStr().c_str());
   }
 
   if (record) {
     // Copy attributes
-    std::map<std::string, std::string> parentAttributes =
-        parent->getAttributes();
+    std::map<std::string, std::string> parentAttributes = parent->getAttributes();
     std::map<std::string, std::string>::iterator it;
     for (it = parentAttributes.begin(); it != parentAttributes.end(); it++) {
-      if (it->first == FlowAttributeKey(ALTERNATE_IDENTIFIER)
-          || it->first == FlowAttributeKey(DISCARD_REASON)
-          || it->first == FlowAttributeKey(UUID))
+      if (it->first == FlowAttributeKey(ALTERNATE_IDENTIFIER) || it->first == FlowAttributeKey(DISCARD_REASON) || it->first == FlowAttributeKey(UUID))
         // Do not copy special attributes from parent
         continue;
       record->setAttribute(it->first, it->second);
@@ -83,8 +74,7 @@ std::shared_ptr<core::FlowFile> ProcessSession::create(
   return record;
 }
 
-std::shared_ptr<core::FlowFile> ProcessSession::clone(
-    std::shared_ptr<core::FlowFile> &parent) {
+std::shared_ptr<core::FlowFile> ProcessSession::clone(std::shared_ptr<core::FlowFile> &parent) {
   std::shared_ptr<core::FlowFile> record = this->create(parent);
   if (record) {
     // Copy Resource Claim
@@ -100,24 +90,18 @@ std::shared_ptr<core::FlowFile> ProcessSession::clone(
   return record;
 }
 
-std::shared_ptr<core::FlowFile> ProcessSession::cloneDuringTransfer(
-    std::shared_ptr<core::FlowFile> &parent) {
+std::shared_ptr<core::FlowFile> ProcessSession::cloneDuringTransfer(std::shared_ptr<core::FlowFile> &parent) {
   std::map<std::string, std::string> empty;
-  std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(
-      process_context_->getProvenanceRepository(), empty);
+  std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(), empty);
 
   if (record) {
     this->_clonedFlowFiles[record->getUUIDStr()] = record;
-    logger_->log_debug("Clone FlowFile with UUID %s during transfer",
-                       record->getUUIDStr().c_str());
+    logger_->log_debug("Clone FlowFile with UUID %s during transfer", record->getUUIDStr().c_str());
     // Copy attributes
-    std::map<std::string, std::string> parentAttributes =
-        parent->getAttributes();
+    std::map<std::string, std::string> parentAttributes = parent->getAttributes();
     std::map<std::string, std::string>::iterator it;
     for (it = parentAttributes.begin(); it != parentAttributes.end(); it++) {
-      if (it->first == FlowAttributeKey(ALTERNATE_IDENTIFIER)
-          || it->first == FlowAttributeKey(DISCARD_REASON)
-          || it->first == FlowAttributeKey(UUID))
+      if (it->first == FlowAttributeKey(ALTERNATE_IDENTIFIER) || it->first == FlowAttributeKey(DISCARD_REASON) || it->first == FlowAttributeKey(UUID))
         // Do not copy special attributes from parent
         continue;
       record->setAttribute(it->first, it->second);
@@ -141,18 +125,15 @@ std::shared_ptr<core::FlowFile> ProcessSession::cloneDuringTransfer(
   return record;
 }
 
-std::shared_ptr<core::FlowFile> ProcessSession::clone(
-    std::shared_ptr<core::FlowFile> &parent, int64_t offset, int64_t size) {
+std::shared_ptr<core::FlowFile> ProcessSession::clone(std::shared_ptr<core::FlowFile> &parent, int64_t offset, int64_t size) {
   std::shared_ptr<core::FlowFile> record = this->create(parent);
   if (record) {
     if (parent->getResourceClaim()) {
       if ((offset + size) > parent->getSize()) {
         // Set offset and size
-        logger_->log_error("clone offset %d and size %d exceed parent size %d",
-                           offset, size, parent->getSize());
+        logger_->log_error("clone offset %d and size %d exceed parent size %d", offset, size, parent->getSize());
         // Remove the Add FlowFile for the session
-        std::map<std::string, std::shared_ptr<core::FlowFile> >::iterator it =
-            this->_addedFlowFiles.find(record->getUUIDStr());
+        std::map<std::string, std::shared_ptr<core::FlowFile> >::iterator it = this->_addedFlowFiles.find(record->getUUIDStr());
         if (it != this->_addedFlowFiles.end())
           this->_addedFlowFiles.erase(record->getUUIDStr());
         return nullptr;
@@ -174,85 +155,65 @@ std::shared_ptr<core::FlowFile> ProcessSession::clone(
 void ProcessSession::remove(std::shared_ptr<core::FlowFile> &flow) {
   flow->setDeleted(true);
   _deletedFlowFiles[flow->getUUIDStr()] = flow;
-  std::string reason = process_context_->getProcessorNode().getName()
-      + " drop flow record " + flow->getUUIDStr();
+  std::string reason = process_context_->getProcessorNode().getName() + " drop flow record " + flow->getUUIDStr();
   provenance_report_->drop(flow, reason);
 }
 
 void ProcessSession::remove(std::shared_ptr<core::FlowFile> &&flow) {
   flow->setDeleted(true);
   _deletedFlowFiles[flow->getUUIDStr()] = flow;
-  std::string reason = process_context_->getProcessorNode().getName()
-      + " drop flow record " + flow->getUUIDStr();
+  std::string reason = process_context_->getProcessorNode().getName() + " drop flow record " + flow->getUUIDStr();
   provenance_report_->drop(flow, reason);
 }
 
-void ProcessSession::putAttribute(std::shared_ptr<core::FlowFile> &flow,
-                                  std::string key, std::string value) {
+void ProcessSession::putAttribute(std::shared_ptr<core::FlowFile> &flow, std::string key, std::string value) {
   flow->setAttribute(key, value);
-  std::string details = process_context_->getProcessorNode().getName()
-      + " modify flow record " + flow->getUUIDStr() + " attribute " + key + ":"
-      + value;
+  std::string details = process_context_->getProcessorNode().getName() + " modify flow record " + flow->getUUIDStr() + " attribute " + key + ":" + value;
   provenance_report_->modifyAttributes(flow, details);
 }
 
-void ProcessSession::removeAttribute(std::shared_ptr<core::FlowFile> &flow,
-                                     std::string key) {
+void ProcessSession::removeAttribute(std::shared_ptr<core::FlowFile> &flow, std::string key) {
   flow->removeAttribute(key);
-  std::string details = process_context_->getProcessorNode().getName()
-      + " remove flow record " + flow->getUUIDStr() + " attribute " + key;
+  std::string details = process_context_->getProcessorNode().getName() + " remove flow record " + flow->getUUIDStr() + " attribute " + key;
   provenance_report_->modifyAttributes(flow, details);
 }
 
-void ProcessSession::putAttribute(std::shared_ptr<core::FlowFile> &&flow,
-                                  std::string key, std::string value) {
+void ProcessSession::putAttribute(std::shared_ptr<core::FlowFile> &&flow, std::string key, std::string value) {
   flow->setAttribute(key, value);
-  std::string details = process_context_->getProcessorNode().getName()
-      + " modify flow record " + flow->getUUIDStr() + " attribute " + key + ":"
-      + value;
+  std::string details = process_context_->getProcessorNode().getName() + " modify flow record " + flow->getUUIDStr() + " attribute " + key + ":" + value;
   provenance_report_->modifyAttributes(flow, details);
 }
 
-void ProcessSession::removeAttribute(std::shared_ptr<core::FlowFile> &&flow,
-                                     std::string key) {
+void ProcessSession::removeAttribute(std::shared_ptr<core::FlowFile> &&flow, std::string key) {
   flow->removeAttribute(key);
-  std::string details = process_context_->getProcessorNode().getName()
-      + " remove flow record " + flow->getUUIDStr() + " attribute " + key;
+  std::string details = process_context_->getProcessorNode().getName() + " remove flow record " + flow->getUUIDStr() + " attribute " + key;
   provenance_report_->modifyAttributes(flow, details);
 }
 
 void ProcessSession::penalize(std::shared_ptr<core::FlowFile> &flow) {
-  flow->setPenaltyExpiration(
-      getTimeMillis()
-          + process_context_->getProcessorNode().getPenalizationPeriodMsec());
+  flow->setPenaltyExpiration(getTimeMillis() + process_context_->getProcessorNode().getPenalizationPeriodMsec());
 }
 
 void ProcessSession::penalize(std::shared_ptr<core::FlowFile> &&flow) {
-  flow->setPenaltyExpiration(
-      getTimeMillis()
-          + process_context_->getProcessorNode().getPenalizationPeriodMsec());
+  flow->setPenaltyExpiration(getTimeMillis() + process_context_->getProcessorNode().getPenalizationPeriodMsec());
 }
 
-void ProcessSession::transfer(std::shared_ptr<core::FlowFile> &flow,
-                              Relationship relationship) {
+void ProcessSession::transfer(std::shared_ptr<core::FlowFile> &flow, Relationship relationship) {
   _transferRelationship[flow->getUUIDStr()] = relationship;
 }
 
-void ProcessSession::transfer(std::shared_ptr<core::FlowFile> &&flow,
-                              Relationship relationship) {
+void ProcessSession::transfer(std::shared_ptr<core::FlowFile> &&flow, Relationship relationship) {
   _transferRelationship[flow->getUUIDStr()] = relationship;
 }
 
-void ProcessSession::write(std::shared_ptr<core::FlowFile> &flow,
-                           OutputStreamCallback *callback) {
+void ProcessSession::write(std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback *callback) {
   std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(
   DEFAULT_CONTENT_DIRECTORY);
 
   try {
     std::ofstream fs;
     uint64_t startTime = getTimeMillis();
-    fs.open(claim->getContentFullPath().c_str(),
-            std::fstream::out | std::fstream::binary | std::fstream::trunc);
+    fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
     if (fs.is_open()) {
       // Call the callback to write the content
       callback->process(&fs);
@@ -271,8 +232,7 @@ void ProcessSession::write(std::shared_ptr<core::FlowFile> &flow,
          logger_->log_debug("Write offset %d length %d into content %s for FlowFile UUID %s",
          flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
         fs.close();
-        std::string details = process_context_->getProcessorNode().getName()
-            + " modify flow record content " + flow->getUUIDStr();
+        std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flow->getUUIDStr();
         uint64_t endTime = getTimeMillis();
         provenance_report_->modifyContent(flow, details, endTime - startTime);
       } else {
@@ -299,14 +259,12 @@ void ProcessSession::write(std::shared_ptr<core::FlowFile> &flow,
   }
 }
 
-void ProcessSession::write(std::shared_ptr<core::FlowFile> &&flow,
-                           OutputStreamCallback *callback) {
+void ProcessSession::write(std::shared_ptr<core::FlowFile> &&flow, OutputStreamCallback *callback) {
   std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>();
   try {
     std::ofstream fs;
     uint64_t startTime = getTimeMillis();
-    fs.open(claim->getContentFullPath().c_str(),
-            std::fstream::out | std::fstream::binary | std::fstream::trunc);
+    fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
     if (fs.is_open()) {
       // Call the callback to write the content
       callback->process(&fs);
@@ -325,8 +283,7 @@ void ProcessSession::write(std::shared_ptr<core::FlowFile> &&flow,
          logger_->log_debug("Write offset %d length %d into content %s for FlowFile UUID %s",
          flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
         fs.close();
-        std::string details = process_context_->getProcessorNode().getName()
-            + " modify flow record content " + flow->getUUIDStr();
+        std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flow->getUUIDStr();
         uint64_t endTime = getTimeMillis();
         provenance_report_->modifyContent(flow, details, endTime - startTime);
       } else {
@@ -353,8 +310,7 @@ void ProcessSession::write(std::shared_ptr<core::FlowFile> &&flow,
   }
 }
 
-void ProcessSession::append(std::shared_ptr<core::FlowFile> &&flow,
-                            OutputStreamCallback *callback) {
+void ProcessSession::append(std::shared_ptr<core::FlowFile> &&flow, OutputStreamCallback *callback) {
   std::shared_ptr<ResourceClaim> claim = nullptr;
 
   if (flow->getResourceClaim() == nullptr) {
@@ -367,8 +323,7 @@ void ProcessSession::append(std::shared_ptr<core::FlowFile> &&flow,
   try {
     std::ofstream fs;
     uint64_t startTime = getTimeMillis();
-    fs.open(claim->getContentFullPath().c_str(),
-            std::fstream::out | std::fstream::binary | std::fstream::app);
+    fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::app);
     if (fs.is_open()) {
       // Call the callback to write the content
       std::streampos oldPos = fs.tellp();
@@ -380,8 +335,7 @@ void ProcessSession::append(std::shared_ptr<core::FlowFile> &&flow,
          logger_->log_debug("Append offset %d extra length %d to new size %d into content %s for FlowFile UUID %s",
          flow->_offset, appendSize, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
         fs.close();
-        std::string details = process_context_->getProcessorNode().getName()
-            + " modify flow record content " + flow->getUUIDStr();
+        std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flow->getUUIDStr();
         uint64_t endTime = getTimeMillis();
         provenance_report_->modifyContent(flow, details, endTime - startTime);
       } else {
@@ -400,8 +354,7 @@ void ProcessSession::append(std::shared_ptr<core::FlowFile> &&flow,
   }
 }
 
-void ProcessSession::append(std::shared_ptr<core::FlowFile> &flow,
-                            OutputStreamCallback *callback) {
+void ProcessSession::append(std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback *callback) {
   std::shared_ptr<ResourceClaim> claim = nullptr;
 
   if (flow->getResourceClaim() == nullptr) {
@@ -414,8 +367,7 @@ void ProcessSession::append(std::shared_ptr<core::FlowFile> &flow,
   try {
     std::ofstream fs;
     uint64_t startTime = getTimeMillis();
-    fs.open(claim->getContentFullPath().c_str(),
-            std::fstream::out | std::fstream::binary | std::fstream::app);
+    fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::app);
     if (fs.is_open()) {
       // Call the callback to write the content
       std::streampos oldPos = fs.tellp();
@@ -427,8 +379,7 @@ void ProcessSession::append(std::shared_ptr<core::FlowFile> &flow,
          logger_->log_debug("Append offset %d extra length %d to new size %d into content %s for FlowFile UUID %s",
          flow->_offset, appendSize, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
         fs.close();
-        std::string details = process_context_->getProcessorNode().getName()
-            + " modify flow record content " + flow->getUUIDStr();
+        std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flow->getUUIDStr();
         uint64_t endTime = getTimeMillis();
         provenance_report_->modifyContent(flow, details, endTime - startTime);
       } else {
@@ -447,21 +398,18 @@ void ProcessSession::append(std::shared_ptr<core::FlowFile> &flow,
   }
 }
 
-void ProcessSession::read(std::shared_ptr<core::FlowFile> &flow,
-                          InputStreamCallback *callback) {
+void ProcessSession::read(std::shared_ptr<core::FlowFile> &flow, InputStreamCallback *callback) {
   try {
     std::shared_ptr<ResourceClaim> claim = nullptr;
 
     if (flow->getResourceClaim() == nullptr) {
       // No existed claim for read, we throw exception
-      throw Exception(FILE_OPERATION_EXCEPTION,
-                      "No Content Claim existed for read");
+      throw Exception(FILE_OPERATION_EXCEPTION, "No Content Claim existed for read");
     }
 
     claim = flow->getResourceClaim();
     std::ifstream fs;
-    fs.open(claim->getContentFullPath().c_str(),
-            std::fstream::in | std::fstream::binary);
+    fs.open(claim->getContentFullPath().c_str(), std::fstream::in | std::fstream::binary);
     if (fs.is_open()) {
       fs.seekg(flow->getOffset(), fs.beg);
 
@@ -487,21 +435,18 @@ void ProcessSession::read(std::shared_ptr<core::FlowFile> &flow,
   }
 }
 
-void ProcessSession::read(std::shared_ptr<core::FlowFile> &&flow,
-                          InputStreamCallback *callback) {
+void ProcessSession::read(std::shared_ptr<core::FlowFile> &&flow, InputStreamCallback *callback) {
   try {
     std::shared_ptr<ResourceClaim> claim = nullptr;
 
     if (flow->getResourceClaim() == nullptr) {
       // No existed claim for read, we throw exception
-      throw Exception(FILE_OPERATION_EXCEPTION,
-                      "No Content Claim existed for read");
+      throw Exception(FILE_OPERATION_EXCEPTION, "No Content Claim existed for read");
     }
 
     claim = flow->getResourceClaim();
     std::ifstream fs;
-    fs.open(claim->getContentFullPath().c_str(),
-            std::fstream::in | std::fstream::binary);
+    fs.open(claim->getContentFullPath().c_str(), std::fstream::in | std::fstream::binary);
     if (fs.is_open()) {
       fs.seekg(flow->getOffset(), fs.beg);
 
@@ -533,8 +478,7 @@ void ProcessSession::read(std::shared_ptr<core::FlowFile> &&flow,
  * @param flow flow file
  *
  */
-void ProcessSession::importFrom(io::DataStream &stream,
-                                std::shared_ptr<core::FlowFile> &&flow) {
+void ProcessSession::importFrom(io::DataStream &stream, std::shared_ptr<core::FlowFile> &&flow) {
   std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>();
 
   int max_read = getpagesize();
@@ -544,8 +488,7 @@ void ProcessSession::importFrom(io::DataStream &stream,
   try {
     std::ofstream fs;
     uint64_t startTime = getTimeMillis();
-    fs.open(claim->getContentFullPath().c_str(),
-            std::fstream::out | std::fstream::binary | std::fstream::trunc);
+    fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
 
     if (fs.is_open()) {
       size_t position = 0;
@@ -576,15 +519,11 @@ void ProcessSession::importFrom(io::DataStream &stream,
         flow->setResourceClaim(claim);
         claim->increaseFlowFileRecordOwnedCount();
 
-        logger_->log_debug(
-            "Import offset %d length %d into content %s for FlowFile UUID %s",
-            flow->getOffset(), flow->getSize(),
-            flow->getResourceClaim()->getContentFullPath().c_str(),
-            flow->getUUIDStr().c_str());
+        logger_->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s", flow->getOffset(), flow->getSize(), flow->getResourceClaim()->getContentFullPath().c_str(),
+                           flow->getUUIDStr().c_str());
 
         fs.close();
-        std::string details = process_context_->getProcessorNode().getName()
-            + " modify flow record content " + flow->getUUIDStr();
+        std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flow->getUUIDStr();
         uint64_t endTime = getTimeMillis();
         provenance_report_->modifyContent(flow, details, endTime - startTime);
       } else {
@@ -611,9 +550,9 @@ void ProcessSession::importFrom(io::DataStream &stream,
   }
 }
 
-void ProcessSession::import(std::string source,
-                            std::shared_ptr<core::FlowFile> &flow,
-                            bool keepSource, uint64_t offset) {
+void ProcessSession::import(std::string source, std::shared_ptr<core::FlowFile> &flow,
+bool keepSource,
+                            uint64_t offset) {
   std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>();
   char *buf = NULL;
   int size = 4096;
@@ -622,8 +561,7 @@ void ProcessSession::import(std::string source,
   try {
     std::ofstream fs;
     uint64_t startTime = getTimeMillis();
-    fs.open(claim->getContentFullPath().c_str(),
-            std::fstream::out | std::fstream::binary | std::fstream::trunc);
+    fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
     std::ifstream input;
     input.open(source.c_str(), std::fstream::in | std::fstream::binary);
 
@@ -649,18 +587,14 @@ void ProcessSession::import(std::string source,
         flow->setResourceClaim(claim);
         claim->increaseFlowFileRecordOwnedCount();
 
-        logger_->log_debug(
-            "Import offset %d length %d into content %s for FlowFile UUID %s",
-            flow->getOffset(), flow->getSize(),
-            flow->getResourceClaim()->getContentFullPath().c_str(),
-            flow->getUUIDStr().c_str());
+        logger_->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s", flow->getOffset(), flow->getSize(), flow->getResourceClaim()->getContentFullPath().c_str(),
+                           flow->getUUIDStr().c_str());
 
         fs.close();
         input.close();
         if (!keepSource)
           std::remove(source.c_str());
-        std::string details = process_context_->getProcessorNode().getName()
-            + " modify flow record content " + flow->getUUIDStr();
+        std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flow->getUUIDStr();
         uint64_t endTime = getTimeMillis();
         provenance_report_->modifyContent(flow, details, endTime - startTime);
       } else {
@@ -692,9 +626,9 @@ void ProcessSession::import(std::string source,
   }
 }
 
-void ProcessSession::import(std::string source,
-                            std::shared_ptr<core::FlowFile> &&flow,
-                            bool keepSource, uint64_t offset) {
+void ProcessSession::import(std::string source, std::shared_ptr<core::FlowFile> &&flow,
+bool keepSource,
+                            uint64_t offset) {
   std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>();
 
   char *buf = NULL;
@@ -704,8 +638,7 @@ void ProcessSession::import(std::string source,
   try {
     std::ofstream fs;
     uint64_t startTime = getTimeMillis();
-    fs.open(claim->getContentFullPath().c_str(),
-            std::fstream::out | std::fstream::binary | std::fstream::trunc);
+    fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
     std::ifstream input;
     input.open(source.c_str(), std::fstream::in | std::fstream::binary);
 
@@ -731,18 +664,14 @@ void ProcessSession::import(std::string source,
         flow->setResourceClaim(claim);
         claim->increaseFlowFileRecordOwnedCount();
 
-        logger_->log_debug(
-            "Import offset %d length %d into content %s for FlowFile UUID %s",
-            flow->getOffset(), flow->getSize(),
-            flow->getResourceClaim()->getContentFullPath().c_str(),
-            flow->getUUIDStr().c_str());
+        logger_->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s", flow->getOffset(), flow->getSize(), flow->getResourceClaim()->getContentFullPath().c_str(),
+                           flow->getUUIDStr().c_str());
 
         fs.close();
         input.close();
         if (!keepSource)
           std::remove(source.c_str());
-        std::string details = process_context_->getProcessorNode().getName()
-            + " modify flow record content " + flow->getUUIDStr();
+        std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flow->getUUIDStr();
         uint64_t endTime = getTimeMillis();
         provenance_report_->modifyContent(flow, details, endTime - startTime);
       } else {
@@ -781,21 +710,16 @@ void ProcessSession::commit() {
       std::shared_ptr<core::FlowFile> record = it.second;
       if (record->isDeleted())
         continue;
-      std::map<std::string, Relationship>::iterator itRelationship = this
-          ->_transferRelationship.find(record->getUUIDStr());
+      std::map<std::string, Relationship>::iterator itRelationship = this->_transferRelationship.find(record->getUUIDStr());
       if (itRelationship != _transferRelationship.end()) {
         Relationship relationship = itRelationship->second;
         // Find the relationship, we need to find the connections for that relationship
-        std::set<std::shared_ptr<Connectable>> connections = process_context_
-            ->getProcessorNode().getOutGoingConnections(relationship.getName());
+        std::set<std::shared_ptr<Connectable>> connections = process_context_->getProcessorNode().getOutGoingConnections(relationship.getName());
         if (connections.empty()) {
           // No connection
-          if (!process_context_->getProcessorNode().isAutoTerminated(
-              relationship)) {
+          if (!process_context_->getProcessorNode().isAutoTerminated(relationship)) {
             // Not autoterminate, we should have the connect
-            std::string message =
-                "Connect empty for non auto terminated relationship"
-                    + relationship.getName();
+            std::string message = "Connect empty for non auto terminated relationship" + relationship.getName();
             throw Exception(PROCESS_SESSION_EXCEPTION, message.c_str());
           } else {
             // Autoterminated
@@ -803,9 +727,7 @@ void ProcessSession::commit() {
           }
         } else {
           // We connections, clone the flow and assign the connection accordingly
-          for (std::set<std::shared_ptr<Connectable>>::iterator itConnection =
-              connections.begin(); itConnection != connections.end();
-              ++itConnection) {
+          for (std::set<std::shared_ptr<Connectable>>::iterator itConnection = connections.begin(); itConnection != connections.end(); ++itConnection) {
             std::shared_ptr<Connectable> connection = *itConnection;
             if (itConnection == connections.begin()) {
               // First connection which the flow need be routed to
@@ -817,15 +739,13 @@ void ProcessSession::commit() {
               if (cloneRecord)
                 cloneRecord->setConnection(connection);
               else
-                throw Exception(PROCESS_SESSION_EXCEPTION,
-                                "Can not clone the flow for transfer");
+                throw Exception(PROCESS_SESSION_EXCEPTION, "Can not clone the flow for transfer");
             }
           }
         }
       } else {
         // Can not find relationship for the flow
-        throw Exception(PROCESS_SESSION_EXCEPTION,
-                        "Can not find the transfer relationship for the flow");
+        throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the flow");
       }
     }
 
@@ -834,21 +754,16 @@ void ProcessSession::commit() {
       std::shared_ptr<core::FlowFile> record = it.second;
       if (record->isDeleted())
         continue;
-      std::map<std::string, Relationship>::iterator itRelationship = this
-          ->_transferRelationship.find(record->getUUIDStr());
+      std::map<std::string, Relationship>::iterator itRelationship = this->_transferRelationship.find(record->getUUIDStr());
       if (itRelationship != _transferRelationship.end()) {
         Relationship relationship = itRelationship->second;
         // Find the relationship, we need to find the connections for that relationship
-        std::set<std::shared_ptr<Connectable>> connections = process_context_
-            ->getProcessorNode().getOutGoingConnections(relationship.getName());
+        std::set<std::shared_ptr<Connectable>> connections = process_context_->getProcessorNode().getOutGoingConnections(relationship.getName());
         if (connections.empty()) {
           // No connection
-          if (!process_context_->getProcessorNode().isAutoTerminated(
-              relationship)) {
+          if (!process_context_->getProcessorNode().isAutoTerminated(relationship)) {
             // Not autoterminate, we should have the connect
-            std::string message =
-                "Connect empty for non auto terminated relationship "
-                    + relationship.getName();
+            std::string message = "Connect empty for non auto terminated relationship " + relationship.getName();
             throw Exception(PROCESS_SESSION_EXCEPTION, message.c_str());
           } else {
             // Autoterminated
@@ -856,9 +771,7 @@ void ProcessSession::commit() {
           }
         } else {
           // We connections, clone the flow and assign the connection accordingly
-          for (std::set<std::shared_ptr<Connectable>>::iterator itConnection =
-              connections.begin(); itConnection != connections.end();
-              ++itConnection) {
+          for (std::set<std::shared_ptr<Connectable>>::iterator itConnection = connections.begin(); itConnection != connections.end(); ++itConnection) {
             std::shared_ptr<Connectable> connection(*itConnection);
             if (itConnection == connections.begin()) {
               // First connection which the flow need be routed to
@@ -870,15 +783,13 @@ void ProcessSession::commit() {
               if (cloneRecord)
                 cloneRecord->setConnection(connection);
               else
-                throw Exception(PROCESS_SESSION_EXCEPTION,
-                                "Can not clone the flow for transfer");
+                throw Exception(PROCESS_SESSION_EXCEPTION, "Can not clone the flow for transfer");
             }
           }
         }
       } else {
         // Can not find relationship for the flow
-        throw Exception(PROCESS_SESSION_EXCEPTION,
-                        "Can not find the transfer relationship for the flow");
+        throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the flow");
       }
     }
 
@@ -890,8 +801,7 @@ void ProcessSession::commit() {
         continue;
       }
 
-      connection = std::static_pointer_cast<Connection>(
-          record->getConnection());
+      connection = std::static_pointer_cast<Connection>(record->getConnection());
       if ((connection) != nullptr)
         connection->put(record);
     }
@@ -900,8 +810,7 @@ void ProcessSession::commit() {
       if (record->isDeleted()) {
         continue;
       }
-      connection = std::static_pointer_cast<Connection>(
-          record->getConnection());
+      connection = std::static_pointer_cast<Connection>(record->getConnection());
       if ((connection) != nullptr)
         connection->put(record);
     }
@@ -911,8 +820,7 @@ void ProcessSession::commit() {
       if (record->isDeleted()) {
         continue;
       }
-      connection = std::static_pointer_cast<Connection>(
-          record->getConnection());
+      connection = std::static_pointer_cast<Connection>(record->getConnection());
       if ((connection) != nullptr)
         connection->put(record);
     }
@@ -925,8 +833,7 @@ void ProcessSession::commit() {
     _originalFlowFiles.clear();
     // persistent the provenance report
     this->provenance_report_->commit();
-    logger_->log_trace("ProcessSession committed for %s",
-                       process_context_->getProcessorNode().getName().c_str());
+    logger_->log_trace("ProcessSession committed for %s", process_context_->getProcessorNode().getName().c_str());
   } catch (std::exception &exception) {
     logger_->log_debug("Caught Exception %s", exception.what());
     throw;
@@ -942,11 +849,9 @@ void ProcessSession::rollback() {
     // Requeue the snapshot of the flowfile back
     for (const auto &it : _originalFlowFiles) {
       std::shared_ptr<core::FlowFile> record = it.second;
-      connection = std::static_pointer_cast<Connection>(
-          record->getOriginalConnection());
+      connection = std::static_pointer_cast<Connection>(record->getOriginalConnection());
       if ((connection) != nullptr) {
-        std::shared_ptr<FlowFileRecord> flowf = std::static_pointer_cast<
-            FlowFileRecord>(record);
+        std::shared_ptr<FlowFileRecord> flowf = std::static_pointer_cast<FlowFileRecord>(record);
         flowf->setSnapShot(false);
         connection->put(record);
       }
@@ -957,8 +862,7 @@ void ProcessSession::rollback() {
     _addedFlowFiles.clear();
     _updatedFlowFiles.clear();
     _deletedFlowFiles.clear();
-    logger_->log_trace("ProcessSession rollback for %s",
-                       process_context_->getProcessorNode().getName().c_str());
+    logger_->log_trace("ProcessSession rollback for %s", process_context_->getProcessorNode().getName().c_str());
   } catch (std::exception &exception) {
     logger_->log_debug("Caught Exception %s", exception.what());
     throw;
@@ -969,25 +873,21 @@ void ProcessSession::rollback() {
 }
 
 std::shared_ptr<core::FlowFile> ProcessSession::get() {
-  std::shared_ptr<Connectable> first = process_context_->getProcessorNode()
-      .getNextIncomingConnection();
+  std::shared_ptr<Connectable> first = process_context_->getProcessorNode().getNextIncomingConnection();
 
   if (first == NULL)
     return NULL;
 
-  std::shared_ptr<Connection> current = std::static_pointer_cast<Connection>(
-      first);
+  std::shared_ptr<Connection> current = std::static_pointer_cast<Connection>(first);
 
   do {
     std::set<std::shared_ptr<core::FlowFile> > expired;
     std::shared_ptr<core::FlowFile> ret = current->poll(expired);
     if (expired.size() > 0) {
       // Remove expired flow record
-      for (std::set<std::shared_ptr<core::FlowFile> >::iterator it = expired
-          .begin(); it != expired.end(); ++it) {
+      for (std::set<std::shared_ptr<core::FlowFile> >::iterator it = expired.begin(); it != expired.end(); ++it) {
         std::shared_ptr<core::FlowFile> record = *it;
-        std::string details = process_context_->getProcessorNode().getName()
-            + " expire flow record " + record->getUUIDStr();
+        std::string details = process_context_->getProcessorNode().getName() + " expire flow record " + record->getUUIDStr();
         provenance_report_->expire(record, details);
       }
     }
@@ -996,19 +896,15 @@ std::shared_ptr<core::FlowFile> ProcessSession::get() {
       ret->setDeleted(false);
       _updatedFlowFiles[ret->getUUIDStr()] = ret;
       std::map<std::string, std::string> empty;
-      std::shared_ptr<core::FlowFile> snapshot =
-          std::make_shared<FlowFileRecord>(
-              process_context_->getProvenanceRepository(), empty);
-      logger_->log_debug("Create Snapshot FlowFile with UUID %s",
-                         snapshot->getUUIDStr().c_str());
+      std::shared_ptr<core::FlowFile> snapshot = std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(), empty);
+      logger_->log_debug("Create Snapshot FlowFile with UUID %s", snapshot->getUUIDStr().c_str());
       snapshot = ret;
 //      snapshot->duplicate(ret);
       // save a snapshot
       _originalFlowFiles[snapshot->getUUIDStr()] = snapshot;
       return ret;
     }
-    current = std::static_pointer_cast<Connection>(
-        process_context_->getProcessorNode().getNextIncomingConnection());
+    current = std::static_pointer_cast<Connection>(process_context_->getProcessorNode().getNextIncomingConnection());
   } while (current != NULL && current != first);
 
   return NULL;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/Processor.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index dbeb46f..7b07638 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -45,8 +45,9 @@ namespace minifi {
 namespace core {
 
 Processor::Processor(std::string name, uuid_t uuid)
-    : Connectable(name, uuid), ConfigurableComponent(),
-    logger_(logging::LoggerFactory<Processor>::getLogger()) {
+    : Connectable(name, uuid),
+      ConfigurableComponent(),
+      logger_(logging::LoggerFactory<Processor>::getLogger()) {
   has_work_.store(false);
   // Setup the default values
   state_ = DISABLED;
@@ -61,8 +62,7 @@ Processor::Processor(std::string name, uuid_t uuid)
   active_tasks_ = 0;
   yield_expiration_ = 0;
   incoming_connections_Iter = this->_incomingConnections.begin();
-  logger_->log_info("Processor %s created UUID %s", name_.c_str(),
-                    uuidStr_.c_str());
+  logger_->log_info("Processor %s created UUID %s", name_.c_str(), uuidStr_.c_str());
 }
 
 bool Processor::isRunning() {
@@ -77,12 +77,10 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) {
   bool ret = false;
 
   if (isRunning()) {
-    logger_->log_info("Can not add connection while the process %s is running",
-                      name_.c_str());
+    logger_->log_info("Can not add connection while the process %s is running", name_.c_str());
     return false;
   }
-  std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(
-      conn);
+  std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
   std::lock_guard<std::mutex> lock(mutex_);
 
   uuid_t srcUUID;
@@ -101,9 +99,7 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) {
     if (_incomingConnections.find(connection) == _incomingConnections.end()) {
       _incomingConnections.insert(connection);
       connection->setDestination(shared_from_this());
-      logger_->log_info(
-          "Add connection %s into Processor %s incoming connection",
-          connection->getName().c_str(), name_.c_str());
+      logger_->log_info("Add connection %s into Processor %s incoming connection", connection->getName().c_str(), name_.c_str());
       incoming_connections_Iter = this->_incomingConnections.begin();
       ret = true;
     }
@@ -122,9 +118,7 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) {
         existedConnection.insert(connection);
         connection->setSource(shared_from_this());
         out_going_connections_[relationship] = existedConnection;
-        logger_->log_info(
-            "Add connection %s into Processor %s outgoing connection for relationship %s",
-            connection->getName().c_str(), name_.c_str(), relationship.c_str());
+        logger_->log_info("Add connection %s into Processor %s outgoing connection for relationship %s", connection->getName().c_str(), name_.c_str(), relationship.c_str());
         ret = true;
       }
     } else {
@@ -133,9 +127,7 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) {
       newConnection.insert(connection);
       connection->setSource(shared_from_this());
       out_going_connections_[relationship] = newConnection;
-      logger_->log_info(
-          "Add connection %s into Processor %s outgoing connection for relationship %s",
-          connection->getName().c_str(), name_.c_str(), relationship.c_str());
+      logger_->log_info("Add connection %s into Processor %s outgoing connection for relationship %s", connection->getName().c_str(), name_.c_str(), relationship.c_str());
       ret = true;
     }
   }
@@ -145,9 +137,7 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) {
 
 void Processor::removeConnection(std::shared_ptr<Connectable> conn) {
   if (isRunning()) {
-    logger_->log_info(
-        "Can not remove connection while the process %s is running",
-        name_.c_str());
+    logger_->log_info("Can not remove connection while the process %s is running", name_.c_str());
     return;
   }
 
@@ -156,8 +146,7 @@ void Processor::removeConnection(std::shared_ptr<Connectable> conn) {
   uuid_t srcUUID;
   uuid_t destUUID;
 
-  std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(
-      conn);
+  std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
 
   connection->getSourceUUID(srcUUID);
   connection->getDestinationUUID(destUUID);
@@ -167,9 +156,7 @@ void Processor::removeConnection(std::shared_ptr<Connectable> conn) {
     if (_incomingConnections.find(connection) != _incomingConnections.end()) {
       _incomingConnections.erase(connection);
       connection->setDestination(NULL);
-      logger_->log_info(
-          "Remove connection %s into Processor %s incoming connection",
-          connection->getName().c_str(), name_.c_str());
+      logger_->log_info("Remove connection %s into Processor %s incoming connection", connection->getName().c_str(), name_.c_str());
       incoming_connections_Iter = this->_incomingConnections.begin();
     }
   }
@@ -181,13 +168,10 @@ void Processor::removeConnection(std::shared_ptr<Connectable> conn) {
     if (it == out_going_connections_.end()) {
       return;
     } else {
-      if (out_going_connections_[relationship].find(connection)
-          != out_going_connections_[relationship].end()) {
+      if (out_going_connections_[relationship].find(connection) != out_going_connections_[relationship].end()) {
         out_going_connections_[relationship].erase(connection);
         connection->setSource(NULL);
-        logger_->log_info(
-            "Remove connection %s into Processor %s outgoing connection for relationship %s",
-            connection->getName().c_str(), name_.c_str(), relationship.c_str());
+        logger_->log_info("Remove connection %s into Processor %s outgoing connection for relationship %s", connection->getName().c_str(), name_.c_str(), relationship.c_str());
       }
     }
   }
@@ -200,8 +184,7 @@ bool Processor::flowFilesQueued() {
     return false;
 
   for (auto &&conn : _incomingConnections) {
-    std::shared_ptr<Connection> connection =
-        std::static_pointer_cast<Connection>(conn);
+    std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
     if (connection->getQueueSize() > 0)
       return true;
   }
@@ -216,8 +199,7 @@ bool Processor::flowFilesOutGoingFull() {
     // We already has connection for this relationship
     std::set<std::shared_ptr<Connectable>> existedConnection = connection.second;
     for (const auto conn : existedConnection) {
-      std::shared_ptr<Connection> connection = std::static_pointer_cast<
-          Connection>(conn);
+      std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
       if (connection->isFull())
         return true;
     }
@@ -226,8 +208,7 @@ bool Processor::flowFilesOutGoingFull() {
   return false;
 }
 
-void Processor::onTrigger(ProcessContext *context,
-                          ProcessSessionFactory *sessionFactory) {
+void Processor::onTrigger(ProcessContext *context, ProcessSessionFactory *sessionFactory) {
   auto session = sessionFactory->createSession();
 
   try {
@@ -251,17 +232,15 @@ bool Processor::isWorkAvailable() {
 
   try {
     for (const auto &conn : _incomingConnections) {
-      std::shared_ptr<Connection> connection = std::static_pointer_cast<
-          Connection>(conn);
+      std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
       if (connection->getQueueSize() > 0) {
         hasWork = true;
         break;
       }
     }
   } catch (...) {
-    logger_->log_error(
-        "Caught an exception while checking if work is available;"
-        " unless it was positively determined that work is available, assuming NO work is available!");
+    logger_->log_error("Caught an exception while checking if work is available;"
+                       " unless it was positively determined that work is available, assuming NO work is available!");
   }
 
   return hasWork;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/ProcessorNode.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessorNode.cpp b/libminifi/src/core/ProcessorNode.cpp
index bf39738..05f31a0 100644
--- a/libminifi/src/core/ProcessorNode.cpp
+++ b/libminifi/src/core/ProcessorNode.cpp
@@ -25,7 +25,8 @@ namespace core {
 
 ProcessorNode::ProcessorNode(const std::shared_ptr<Connectable> processor)
     : processor_(processor),
-      Connectable(processor->getName(), 0), ConfigurableComponent() {
+      Connectable(processor->getName(), 0),
+      ConfigurableComponent() {
   uuid_t copy;
   processor->getUUID(copy);
   setUUID(copy);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/RepositoryFactory.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/RepositoryFactory.cpp b/libminifi/src/core/RepositoryFactory.cpp
index 45ad980..cf18601 100644
--- a/libminifi/src/core/RepositoryFactory.cpp
+++ b/libminifi/src/core/RepositoryFactory.cpp
@@ -41,12 +41,10 @@ namespace core {
 class FlowFileRepository;
 #endif
 
-std::shared_ptr<core::Repository> createRepository(
-    const std::string configuration_class_name, bool fail_safe, const std::string repo_name) {
+std::shared_ptr<core::Repository> createRepository(const std::string configuration_class_name, bool fail_safe, const std::string repo_name) {
   std::shared_ptr<core::Repository> return_obj = nullptr;
   std::string class_name_lc = configuration_class_name;
-  std::transform(class_name_lc.begin(), class_name_lc.end(),
-                 class_name_lc.begin(), ::tolower);
+  std::transform(class_name_lc.begin(), class_name_lc.end(), class_name_lc.begin(), ::tolower);
   try {
     std::shared_ptr<core::Repository> return_obj = nullptr;
     if (class_name_lc == "flowfilerepository") {
@@ -65,21 +63,17 @@ std::shared_ptr<core::Repository> createRepository(
       return return_obj;
     }
     if (fail_safe) {
-      return std::make_shared<core::Repository>("fail_safe", "fail_safe", 1, 1,
-                                                1);
+      return std::make_shared<core::Repository>("fail_safe", "fail_safe", 1, 1, 1);
     } else {
-      throw std::runtime_error(
-          "Support for the provided configuration class could not be found");
+      throw std::runtime_error("Support for the provided configuration class could not be found");
     }
   } catch (const std::runtime_error &r) {
     if (fail_safe) {
-      return std::make_shared<core::Repository>("fail_safe", "fail_safe", 1, 1,
-                                                1);
+      return std::make_shared<core::Repository>("fail_safe", "fail_safe", 1, 1, 1);
     }
   }
 
-  throw std::runtime_error(
-      "Support for the provided configuration class could not be found");
+  throw std::runtime_error("Support for the provided configuration class could not be found");
 }
 
 } /* namespace core */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/controller/ControllerServiceNode.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/controller/ControllerServiceNode.cpp b/libminifi/src/core/controller/ControllerServiceNode.cpp
index 12e3653..3097574 100644
--- a/libminifi/src/core/controller/ControllerServiceNode.cpp
+++ b/libminifi/src/core/controller/ControllerServiceNode.cpp
@@ -39,8 +39,6 @@ std::vector<std::shared_ptr<ConfigurableComponent> > &ControllerServiceNode::get
   return linked_components_;
 }
 
-
-
 } /* namespace controller */
 } /* namespace core */
 } /* namespace minifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/controller/ControllerServiceProvider.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/controller/ControllerServiceProvider.cpp b/libminifi/src/core/controller/ControllerServiceProvider.cpp
index da5c6a1..942c299 100644
--- a/libminifi/src/core/controller/ControllerServiceProvider.cpp
+++ b/libminifi/src/core/controller/ControllerServiceProvider.cpp
@@ -32,8 +32,7 @@ namespace controller {
  * @return the ControllerService that is registered with the given
  * identifier
  */
-std::shared_ptr<ControllerService> ControllerServiceProvider::getControllerService(
-    const std::string &identifier) {
+std::shared_ptr<ControllerService> ControllerServiceProvider::getControllerService(const std::string &identifier) {
   auto service = controller_map_->getControllerServiceNode(identifier);
   if (service != nullptr) {
     return service->getControllerServiceImplementation();

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/controller/StandardControllerServiceNode.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/controller/StandardControllerServiceNode.cpp b/libminifi/src/core/controller/StandardControllerServiceNode.cpp
index 26804f6..5c4aa70 100644
--- a/libminifi/src/core/controller/StandardControllerServiceNode.cpp
+++ b/libminifi/src/core/controller/StandardControllerServiceNode.cpp
@@ -31,8 +31,7 @@ std::shared_ptr<core::ProcessGroup> &StandardControllerServiceNode::getProcessGr
   return process_group_;
 }
 
-void StandardControllerServiceNode::setProcessGroup(
-    std::shared_ptr<ProcessGroup> &processGroup) {
+void StandardControllerServiceNode::setProcessGroup(std::shared_ptr<ProcessGroup> &processGroup) {
   std::lock_guard<std::mutex> lock(mutex_);
   process_group_ = processGroup;
 }
@@ -44,8 +43,7 @@ bool StandardControllerServiceNode::enable() {
   if (getProperty(property.getName(), property)) {
     active = true;
     for (auto linked_service : property.getValues()) {
-      std::shared_ptr<ControllerServiceNode> csNode = provider
-          ->getControllerServiceNode(linked_service);
+      std::shared_ptr<ControllerServiceNode> csNode = provider->getControllerServiceNode(linked_service);
       if (nullptr != csNode) {
         std::lock_guard<std::mutex> lock(mutex_);
         linked_controller_services_.push_back(csNode);
@@ -53,8 +51,7 @@ bool StandardControllerServiceNode::enable() {
     }
   } else {
   }
-  std::shared_ptr<ControllerService> impl =
-      getControllerServiceImplementation();
+  std::shared_ptr<ControllerService> impl = getControllerServiceImplementation();
   if (nullptr != impl) {
     impl->onEnable();
   }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/logging/LoggerConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/logging/LoggerConfiguration.cpp b/libminifi/src/core/logging/LoggerConfiguration.cpp
index 4bb6615..89e6a1b 100644
--- a/libminifi/src/core/logging/LoggerConfiguration.cpp
+++ b/libminifi/src/core/logging/LoggerConfiguration.cpp
@@ -42,25 +42,26 @@ namespace logging {
 
 const char* LoggerConfiguration::spdlog_default_pattern = "[%Y-%m-%d %H:%M:%S.%e] [%n] [%l] %v";
 
-std::vector< std::string > LoggerProperties::get_keys_of_type(const std::string &type) {
+std::vector<std::string> LoggerProperties::get_keys_of_type(const std::string &type) {
   std::vector<std::string> appenders;
   std::string prefix = type + ".";
   for (auto const & entry : properties_) {
-    if (entry.first.rfind(prefix, 0) == 0 &&
-      entry.first.find(".", prefix.length() + 1) == std::string::npos) {
+    if (entry.first.rfind(prefix, 0) == 0 && entry.first.find(".", prefix.length() + 1) == std::string::npos) {
       appenders.push_back(entry.first);
     }
   }
   return appenders;
 }
 
-LoggerConfiguration::LoggerConfiguration() : root_namespace_(create_default_root()), loggers(std::vector<std::shared_ptr<LoggerImpl>>()),
-                                             formatter_(std::make_shared<spdlog::pattern_formatter>(spdlog_default_pattern)) {
+LoggerConfiguration::LoggerConfiguration()
+    : root_namespace_(create_default_root()),
+      loggers(std::vector<std::shared_ptr<LoggerImpl>>()),
+      formatter_(std::make_shared<spdlog::pattern_formatter>(spdlog_default_pattern)) {
   logger_ = std::shared_ptr<LoggerImpl>(new LoggerImpl(core::getClassName<LoggerConfiguration>(), get_logger(nullptr, root_namespace_, core::getClassName<LoggerConfiguration>(), formatter_)));
   loggers.push_back(logger_);
 }
 
-void LoggerConfiguration::initialize(const std::shared_ptr<LoggerProperties> &logger_properties)  {
+void LoggerConfiguration::initialize(const std::shared_ptr<LoggerProperties> &logger_properties) {
   std::lock_guard<std::mutex> lock(mutex);
   root_namespace_ = initialize_namespaces(logger_properties);
   std::string spdlog_pattern;
@@ -90,8 +91,7 @@ std::shared_ptr<Logger> LoggerConfiguration::getLogger(const std::string &name)
   return result;
 }
 
-std::shared_ptr<internal::LoggerNamespace> LoggerConfiguration::
-     initialize_namespaces(const std::shared_ptr<LoggerProperties> &logger_properties) {
+std::shared_ptr<internal::LoggerNamespace> LoggerConfiguration::initialize_namespaces(const std::shared_ptr<LoggerProperties> &logger_properties) {
   std::map<std::string, std::shared_ptr<spdlog::sinks::sink>> sink_map = logger_properties->initial_sinks();
 
   std::string appender_type = "appender";
@@ -117,16 +117,18 @@ std::shared_ptr<internal::LoggerNamespace> LoggerConfiguration::
         try {
           max_files = std::stoi(max_files_str);
         } catch (const std::invalid_argument &ia) {
-        } catch (const std::out_of_range &oor) {}
+        } catch (const std::out_of_range &oor) {
+        }
       }
 
-      int max_file_size = 5*1024*1024;
+      int max_file_size = 5 * 1024 * 1024;
       std::string max_file_size_str = "";
       if (logger_properties->get(appender_key + ".max_file_size", max_file_size_str)) {
         try {
           max_file_size = std::stoi(max_file_size_str);
         } catch (const std::invalid_argument &ia) {
-        } catch (const std::out_of_range &oor) {}
+        } catch (const std::out_of_range &oor) {
+        }
       }
       sink_map[appender_name] = std::make_shared<spdlog::sinks::rotating_file_sink_mt>(file_name, max_file_size, max_files);
     } else if ("stdout" == appender_type) {
@@ -170,8 +172,7 @@ std::shared_ptr<internal::LoggerNamespace> LoggerConfiguration::
     }
     std::shared_ptr<internal::LoggerNamespace> current_namespace = root_namespace;
     if (logger_key != "logger.root") {
-      for (auto const & name : utils::StringUtils::split(logger_key.substr(logger_type.length() + 1,
-          logger_key.length() - logger_type.length()), "::")) {
+      for (auto const & name : utils::StringUtils::split(logger_key.substr(logger_type.length() + 1, logger_key.length() - logger_type.length()), "::")) {
         auto child_pair = current_namespace->children.find(name);
         std::shared_ptr<internal::LoggerNamespace> child;
         if (child_pair == current_namespace->children.end()) {
@@ -190,8 +191,8 @@ std::shared_ptr<internal::LoggerNamespace> LoggerConfiguration::
   return root_namespace;
 }
 
-std::shared_ptr<spdlog::logger> LoggerConfiguration::get_logger(std::shared_ptr<Logger> logger, const std::shared_ptr<internal::LoggerNamespace> &root_namespace,
-                                                                const std::string &name, std::shared_ptr<spdlog::formatter> formatter, bool remove_if_present) {
+std::shared_ptr<spdlog::logger> LoggerConfiguration::get_logger(std::shared_ptr<Logger> logger, const std::shared_ptr<internal::LoggerNamespace> &root_namespace, const std::string &name,
+                                                                std::shared_ptr<spdlog::formatter> formatter, bool remove_if_present) {
   std::shared_ptr<spdlog::logger> spdlogger = spdlog::get(name);
   if (spdlogger) {
     if (remove_if_present) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp b/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp
index 3d21683..e46f740 100644
--- a/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp
+++ b/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp
@@ -50,11 +50,8 @@ const char *SiteToSiteProvenanceReportingTask::ProvenanceAppStr = "MiNiFi Flow";
 void SiteToSiteProvenanceReportingTask::initialize() {
 }
 
-void SiteToSiteProvenanceReportingTask::getJsonReport(
-    core::ProcessContext *context, core::ProcessSession *session,
-    std::vector<std::shared_ptr<provenance::ProvenanceEventRecord>> &records,
-    std::string &report) {
-
+void SiteToSiteProvenanceReportingTask::getJsonReport(core::ProcessContext *context, core::ProcessSession *session, std::vector<std::shared_ptr<provenance::ProvenanceEventRecord>> &records,
+                                                      std::string &report) {
   Json::Value array;
   for (auto record : records) {
     Json::Value recordJson;
@@ -62,9 +59,7 @@ void SiteToSiteProvenanceReportingTask::getJsonReport(
     Json::Value parentUuidJson;
     Json::Value childUuidJson;
     recordJson["eventId"] = record->getEventId().c_str();
-    recordJson["eventType"] =
-        provenance::ProvenanceEventRecord::ProvenanceEventTypeStr[record
-            ->getEventType()];
+    recordJson["eventType"] = provenance::ProvenanceEventRecord::ProvenanceEventTypeStr[record->getEventType()];
     recordJson["timestampMillis"] = record->getEventTime();
     recordJson["durationMillis"] = record->getEventDuration();
     recordJson["lineageStart"] = record->getlineageStartDate();
@@ -91,10 +86,8 @@ void SiteToSiteProvenanceReportingTask::getJsonReport(
     }
     recordJson["childIds"] = childUuidJson;
     recordJson["transitUri"] = record->getTransitUri().c_str();
-    recordJson["remoteIdentifier"] = record->getSourceSystemFlowFileIdentifier()
-        .c_str();
-    recordJson["alternateIdentifier"] = record->getAlternateIdentifierUri()
-        .c_str();
+    recordJson["remoteIdentifier"] = record->getSourceSystemFlowFileIdentifier().c_str();
+    recordJson["alternateIdentifier"] = record->getAlternateIdentifierUri().c_str();
     recordJson["application"] = ProvenanceAppStr;
     array.append(recordJson);
   }
@@ -103,14 +96,10 @@ void SiteToSiteProvenanceReportingTask::getJsonReport(
   report = writer.write(array);
 }
 
-void SiteToSiteProvenanceReportingTask::onSchedule(
-    core::ProcessContext *context,
-    core::ProcessSessionFactory *sessionFactory) {
+void SiteToSiteProvenanceReportingTask::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
 }
 
-void SiteToSiteProvenanceReportingTask::onTrigger(
-    core::ProcessContext *context, core::ProcessSession *session) {
-
+void SiteToSiteProvenanceReportingTask::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
   std::unique_ptr<Site2SiteClientProtocol> protocol_ = getNextProtocol(true);
 
   if (!protocol_) {
@@ -121,18 +110,14 @@ void SiteToSiteProvenanceReportingTask::onTrigger(
   if (!protocol_->bootstrap()) {
     // bootstrap the client protocol if needeed
     context->yield();
-    std::shared_ptr<Processor> processor = std::static_pointer_cast<Processor>(
-        context->getProcessorNode().getProcessor());
-    logger_->log_error("Site2Site bootstrap failed yield period %d peer ",
-                       processor->getYieldPeriodMsec());
+    std::shared_ptr<Processor> processor = std::static_pointer_cast<Processor>(context->getProcessorNode().getProcessor());
+    logger_->log_error("Site2Site bootstrap failed yield period %d peer ", processor->getYieldPeriodMsec());
     returnProtocol(std::move(protocol_));
     return;
   }
 
   std::vector<std::shared_ptr<provenance::ProvenanceEventRecord>> records;
-  std::shared_ptr<provenance::ProvenanceRepository> repo =
-      std::static_pointer_cast<provenance::ProvenanceRepository>(
-          context->getProvenanceRepository());
+  std::shared_ptr<provenance::ProvenanceRepository> repo = std::static_pointer_cast<provenance::ProvenanceRepository>(context->getProvenanceRepository());
   repo->getProvenanceRecord(records, batch_size_);
   if (records.size() <= 0) {
     returnProtocol(std::move(protocol_));

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/repository/FlowFileRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/repository/FlowFileRepository.cpp b/libminifi/src/core/repository/FlowFileRepository.cpp
index 5f62f83..e6d561a 100644
--- a/libminifi/src/core/repository/FlowFileRepository.cpp
+++ b/libminifi/src/core/repository/FlowFileRepository.cpp
@@ -40,24 +40,19 @@ void FlowFileRepository::run() {
       leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
 
       for (it->SeekToFirst(); it->Valid(); it->Next()) {
-        std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<
-            FlowFileRecord>(shared_from_this());
+        std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this());
         std::string key = it->key().ToString();
-        if (eventRead->DeSerialize(
-            reinterpret_cast<const uint8_t *>(it->value().data()),
-            it->value().size())) {
+        if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(it->value().data()), it->value().size())) {
           if ((curTime - eventRead->getEventTime()) > max_partition_millis_)
             purgeList.push_back(key);
         } else {
-          logger_->log_debug("NiFi %s retrieve event %s fail", name_.c_str(),
-                             key.c_str());
+          logger_->log_debug("NiFi %s retrieve event %s fail", name_.c_str(), key.c_str());
           purgeList.push_back(key);
         }
       }
       delete it;
       for (auto eventId : purgeList) {
-        logger_->log_info("Repository Repo %s Purge %s", name_.c_str(),
-                          eventId.c_str());
+        logger_->log_info("Repository Repo %s Purge %s", name_.c_str(), eventId.c_str());
         Delete(eventId);
       }
     }
@@ -74,19 +69,14 @@ void FlowFileRepository::loadComponent() {
   leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
 
   for (it->SeekToFirst(); it->Valid(); it->Next()) {
-    std::shared_ptr<FlowFileRecord> eventRead =
-        std::make_shared<FlowFileRecord>(shared_from_this());
+    std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this());
     std::string key = it->key().ToString();
-    if (eventRead->DeSerialize(
-        reinterpret_cast<const uint8_t *>(it->value().data()),
-        it->value().size())) {
+    if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(it->value().data()), it->value().size())) {
       auto search = connectionMap.find(eventRead->getConnectionUuid());
       if (search != connectionMap.end()) {
         // we find the connection for the persistent flowfile, create the flowfile and enqueue that
-        std::shared_ptr<core::FlowFile> flow_file_ref =
-            std::static_pointer_cast<core::FlowFile>(eventRead);
-        std::shared_ptr<FlowFileRecord> record =
-            std::make_shared<FlowFileRecord>(shared_from_this(), flow_file_ref);
+        std::shared_ptr<core::FlowFile> flow_file_ref = std::static_pointer_cast<core::FlowFile>(eventRead);
+        std::shared_ptr<FlowFileRecord> record = std::make_shared<FlowFileRecord>(shared_from_this(), flow_file_ref);
         // set store to repo to true so that we do need to persistent again in enqueue
         record->setStoredToRepository(true);
         search->second->put(record);
@@ -105,8 +95,7 @@ void FlowFileRepository::loadComponent() {
   std::vector<std::string>::iterator itPurge;
   for (itPurge = purgeList.begin(); itPurge != purgeList.end(); itPurge++) {
     std::string eventId = *itPurge;
-    logger_->log_info("Repository Repo %s Purge %s", name_.c_str(),
-                      eventId.c_str());
+    logger_->log_info("Repository Repo %s Purge %s", name_.c_str(), eventId.c_str());
     Delete(eventId);
   }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/repository/VolatileRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/repository/VolatileRepository.cpp b/libminifi/src/core/repository/VolatileRepository.cpp
index db036f8..a7e3a51 100644
--- a/libminifi/src/core/repository/VolatileRepository.cpp
+++ b/libminifi/src/core/repository/VolatileRepository.cpp
@@ -28,8 +28,7 @@ namespace minifi {
 namespace core {
 namespace repository {
 
-const char *VolatileRepository::volatile_repo_max_count =
-    "max.count";
+const char *VolatileRepository::volatile_repo_max_count = "max.count";
 
 void VolatileRepository::run() {
   repo_full_ = false;
@@ -45,9 +44,7 @@ void VolatileRepository::purge() {
       RepoValue value;
       if (ent->getValue(value)) {
         current_size_ -= value.size();
-        logger_->log_info("VolatileRepository -- purge %s %d %d %d",
-                          value.getKey(), current_size_.load(), max_size_,
-                          current_index_.load());
+        logger_->log_info("VolatileRepository -- purge %s %d %d %d", value.getKey(), current_size_.load(), max_size_, current_index_.load());
       }
       if (current_size_ < max_size_)
         break;