You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/06/06 16:33:07 UTC

[4/9] nifi-minifi-cpp git commit: MINIFI-331: Apply formatter with increased line length to source

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/yaml/YamlConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp
index 7bf4f58..a11db2b 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -29,8 +29,7 @@ namespace nifi {
 namespace minifi {
 namespace core {
 
-core::ProcessGroup *YamlConfiguration::parseRootProcessGroupYaml(
-    YAML::Node rootFlowNode) {
+core::ProcessGroup *YamlConfiguration::parseRootProcessGroupYaml(YAML::Node rootFlowNode) {
   uuid_t uuid;
 
   checkRequiredField(&rootFlowNode, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
@@ -38,18 +37,15 @@ core::ProcessGroup *YamlConfiguration::parseRootProcessGroupYaml(
   std::string id = getOrGenerateId(&rootFlowNode);
   uuid_parse(id.c_str(), uuid);
 
-  logger_->log_debug("parseRootProcessGroup: id => [%s], name => [%s]", id,
-                     flowName);
-  std::unique_ptr<core::ProcessGroup> group =
-      FlowConfiguration::createRootProcessGroup(flowName, uuid);
+  logger_->log_debug("parseRootProcessGroup: id => [%s], name => [%s]", id, flowName);
+  std::unique_ptr<core::ProcessGroup> group = FlowConfiguration::createRootProcessGroup(flowName, uuid);
 
   this->name_ = flowName;
 
   return group.release();
 }
 
-void YamlConfiguration::parseProcessorNodeYaml(
-    YAML::Node processorsNode, core::ProcessGroup * parentGroup) {
+void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode, core::ProcessGroup * parentGroup) {
   int64_t schedulingPeriod = -1;
   int64_t penalizationPeriod = -1;
   int64_t yieldPeriod = -1;
@@ -65,8 +61,7 @@ void YamlConfiguration::parseProcessorNodeYaml(
   if (processorsNode) {
     if (processorsNode.IsSequence()) {
       // Evaluate sequence of processors
-      for (YAML::const_iterator iter = processorsNode.begin();
-          iter != processorsNode.end(); ++iter) {
+      for (YAML::const_iterator iter = processorsNode.begin(); iter != processorsNode.end(); ++iter) {
         core::ProcessorConfig procCfg;
         YAML::Node procNode = iter->as<YAML::Node>();
 
@@ -74,28 +69,23 @@ void YamlConfiguration::parseProcessorNodeYaml(
         procCfg.name = procNode["name"].as<std::string>();
         procCfg.id = getOrGenerateId(&procNode);
         uuid_parse(procCfg.id.c_str(), uuid);
-        logger_->log_debug("parseProcessorNode: name => [%s] id => [%s]",
-                           procCfg.name, procCfg.id);
+        logger_->log_debug("parseProcessorNode: name => [%s] id => [%s]", procCfg.name, procCfg.id);
         checkRequiredField(&procNode, "class", CONFIG_YAML_PROCESSORS_KEY);
         procCfg.javaClass = procNode["class"].as<std::string>();
-        logger_->log_debug("parseProcessorNode: class => [%s]",
-                           procCfg.javaClass);
+        logger_->log_debug("parseProcessorNode: class => [%s]", procCfg.javaClass);
 
         // Determine the processor name only from the Java class
         int lastOfIdx = procCfg.javaClass.find_last_of(".");
         if (lastOfIdx != std::string::npos) {
           lastOfIdx++;  // if a value is found, increment to move beyond the .
           int nameLength = procCfg.javaClass.length() - lastOfIdx;
-          std::string processorName = procCfg.javaClass.substr(lastOfIdx,
-                                                               nameLength);
+          std::string processorName = procCfg.javaClass.substr(lastOfIdx, nameLength);
           processor = this->createProcessor(processorName, uuid);
         }
 
         if (!processor) {
-          logger_->log_error("Could not create a processor %s with id %s",
-                             procCfg.name, procCfg.id);
-          throw std::invalid_argument(
-              "Could not create processor " + procCfg.name);
+          logger_->log_error("Could not create a processor %s with id %s", procCfg.name, procCfg.id);
+          throw std::invalid_argument("Could not create processor " + procCfg.name);
         }
         processor->setName(procCfg.name);
 
@@ -131,11 +121,8 @@ void YamlConfiguration::parseProcessorNodeYaml(
         if (procNode["auto-terminated relationships list"]) {
           YAML::Node autoTerminatedSequence = procNode["auto-terminated relationships list"];
           std::vector<std::string> rawAutoTerminatedRelationshipValues;
-          if (autoTerminatedSequence.IsSequence()
-              && !autoTerminatedSequence.IsNull()
-              && autoTerminatedSequence.size() > 0) {
-            for (YAML::const_iterator relIter = autoTerminatedSequence.begin();
-                 relIter != autoTerminatedSequence.end(); ++relIter) {
+          if (autoTerminatedSequence.IsSequence() && !autoTerminatedSequence.IsNull() && autoTerminatedSequence.size() > 0) {
+            for (YAML::const_iterator relIter = autoTerminatedSequence.begin(); relIter != autoTerminatedSequence.end(); ++relIter) {
               std::string autoTerminatedRel = relIter->as<std::string>();
               rawAutoTerminatedRelationshipValues.push_back(autoTerminatedRel);
             }
@@ -151,20 +138,17 @@ void YamlConfiguration::parseProcessorNodeYaml(
 
         // Take care of scheduling
         core::TimeUnit unit;
-        if (core::Property::StringToTime(procCfg.schedulingPeriod, schedulingPeriod, unit)
-            && core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) {
+        if (core::Property::StringToTime(procCfg.schedulingPeriod, schedulingPeriod, unit) && core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) {
           logger_->log_debug("convert: parseProcessorNode: schedulingPeriod => [%d] ns", schedulingPeriod);
           processor->setSchedulingPeriodNano(schedulingPeriod);
         }
 
-        if (core::Property::StringToTime(procCfg.penalizationPeriod, penalizationPeriod, unit)
-            && core::Property::ConvertTimeUnitToMS(penalizationPeriod, unit, penalizationPeriod)) {
+        if (core::Property::StringToTime(procCfg.penalizationPeriod, penalizationPeriod, unit) && core::Property::ConvertTimeUnitToMS(penalizationPeriod, unit, penalizationPeriod)) {
           logger_->log_debug("convert: parseProcessorNode: penalizationPeriod => [%d] ms", penalizationPeriod);
           processor->setPenalizationPeriodMsec(penalizationPeriod);
         }
 
-        if (core::Property::StringToTime(procCfg.yieldPeriod, yieldPeriod, unit)
-            && core::Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod)) {
+        if (core::Property::StringToTime(procCfg.yieldPeriod, yieldPeriod, unit) && core::Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod)) {
           logger_->log_debug("convert: parseProcessorNode: yieldPeriod => [%d] ms", yieldPeriod);
           processor->setYieldPeriodMsec(yieldPeriod);
         }
@@ -174,16 +158,13 @@ void YamlConfiguration::parseProcessorNodeYaml(
 
         if (procCfg.schedulingStrategy == "TIMER_DRIVEN") {
           processor->setSchedulingStrategy(core::TIMER_DRIVEN);
-          logger_->log_debug("setting scheduling strategy as %s",
-                             procCfg.schedulingStrategy);
+          logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy);
         } else if (procCfg.schedulingStrategy == "EVENT_DRIVEN") {
           processor->setSchedulingStrategy(core::EVENT_DRIVEN);
-          logger_->log_debug("setting scheduling strategy as %s",
-                             procCfg.schedulingStrategy);
+          logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy);
         } else {
           processor->setSchedulingStrategy(core::CRON_DRIVEN);
-          logger_->log_debug("setting scheduling strategy as %s",
-                             procCfg.schedulingStrategy);
+          logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy);
         }
 
         int64_t maxConcurrentTasks;
@@ -209,18 +190,15 @@ void YamlConfiguration::parseProcessorNodeYaml(
         parentGroup->addProcessor(processor);
       }
     } else {
-      throw new std::invalid_argument(
-          "Cannot instantiate a MiNiFi instance without a defined Processors configuration node.");
+      throw new std::invalid_argument("Cannot instantiate a MiNiFi instance without a defined Processors configuration node.");
     }
   } else {
-    throw new std::invalid_argument(
-        "Cannot instantiate a MiNiFi instance without a defined "
-        "Processors configuration node.");
+    throw new std::invalid_argument("Cannot instantiate a MiNiFi instance without a defined "
+                                    "Processors configuration node.");
   }
 }
 
-void YamlConfiguration::parseRemoteProcessGroupYaml(
-    YAML::Node *rpgNode, core::ProcessGroup * parentGroup) {
+void YamlConfiguration::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, core::ProcessGroup * parentGroup) {
   uuid_t uuid;
   std::string id;
 
@@ -231,8 +209,7 @@ void YamlConfiguration::parseRemoteProcessGroupYaml(
 
   if (rpgNode) {
     if (rpgNode->IsSequence()) {
-      for (YAML::const_iterator iter = rpgNode->begin(); iter != rpgNode->end();
-          ++iter) {
+      for (YAML::const_iterator iter = rpgNode->begin(); iter != rpgNode->end(); ++iter) {
         YAML::Node currRpgNode = iter->as<YAML::Node>();
 
         checkRequiredField(&currRpgNode, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
@@ -258,12 +235,8 @@ void YamlConfiguration::parseRemoteProcessGroupYaml(
           std::string yieldPeriod = currRpgNode["yield period"].as<std::string>();
           logger_->log_debug("parseRemoteProcessGroupYaml: yield period => [%s]", yieldPeriod);
 
-          if (core::Property::StringToTime(yieldPeriod, yieldPeriodValue, unit)
-              && core::Property::ConvertTimeUnitToMS(yieldPeriodValue, unit,
-                                                     yieldPeriodValue) && group) {
-            logger_->log_debug(
-                "parseRemoteProcessGroupYaml: yieldPeriod => [%d] ms",
-                yieldPeriodValue);
+          if (core::Property::StringToTime(yieldPeriod, yieldPeriodValue, unit) && core::Property::ConvertTimeUnitToMS(yieldPeriodValue, unit, yieldPeriodValue) && group) {
+            logger_->log_debug("parseRemoteProcessGroupYaml: yieldPeriod => [%d] ms", yieldPeriodValue);
             group->setYieldPeriodMsec(yieldPeriodValue);
           }
         }
@@ -272,12 +245,8 @@ void YamlConfiguration::parseRemoteProcessGroupYaml(
           std::string timeout = currRpgNode["timeout"].as<std::string>();
           logger_->log_debug("parseRemoteProcessGroupYaml: timeout => [%s]", timeout);
 
-          if (core::Property::StringToTime(timeout, timeoutValue, unit)
-              && core::Property::ConvertTimeUnitToMS(timeoutValue, unit,
-                                                     timeoutValue) && group) {
-            logger_->log_debug(
-                "parseRemoteProcessGroupYaml: timeoutValue => [%d] ms",
-                timeoutValue);
+          if (core::Property::StringToTime(timeout, timeoutValue, unit) && core::Property::ConvertTimeUnitToMS(timeoutValue, unit, timeoutValue) && group) {
+            logger_->log_debug("parseRemoteProcessGroupYaml: timeoutValue => [%d] ms", timeoutValue);
             group->setTimeOut(timeoutValue);
           }
         }
@@ -288,8 +257,7 @@ void YamlConfiguration::parseRemoteProcessGroupYaml(
         checkRequiredField(&currRpgNode, "Input Ports", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
         YAML::Node inputPorts = currRpgNode["Input Ports"].as<YAML::Node>();
         if (inputPorts && inputPorts.IsSequence()) {
-          for (YAML::const_iterator portIter = inputPorts.begin();
-              portIter != inputPorts.end(); ++portIter) {
+          for (YAML::const_iterator portIter = inputPorts.begin(); portIter != inputPorts.end(); ++portIter) {
             logger_->log_debug("Got a current port, iterating...");
 
             YAML::Node currPort = portIter->as<YAML::Node>();
@@ -299,8 +267,7 @@ void YamlConfiguration::parseRemoteProcessGroupYaml(
         }
         YAML::Node outputPorts = currRpgNode["Output Ports"].as<YAML::Node>();
         if (outputPorts && outputPorts.IsSequence()) {
-          for (YAML::const_iterator portIter = outputPorts.begin();
-              portIter != outputPorts.end(); ++portIter) {
+          for (YAML::const_iterator portIter = outputPorts.begin(); portIter != outputPorts.end(); ++portIter) {
             logger_->log_debug("Got a current port, iterating...");
 
             YAML::Node currPort = portIter->as<YAML::Node>();
@@ -313,8 +280,7 @@ void YamlConfiguration::parseRemoteProcessGroupYaml(
   }
 }
 
-void YamlConfiguration::parseProvenanceReportingYaml(
-    YAML::Node *reportNode, core::ProcessGroup * parentGroup) {
+void YamlConfiguration::parseProvenanceReportingYaml(YAML::Node *reportNode, core::ProcessGroup * parentGroup) {
   uuid_t port_uuid;
   int64_t schedulingPeriod = -1;
 
@@ -330,9 +296,7 @@ void YamlConfiguration::parseProvenanceReportingYaml(
 
   std::shared_ptr<core::Processor> processor = nullptr;
   processor = createProvenanceReportTask();
-  std::shared_ptr<core::reporting::SiteToSiteProvenanceReportingTask> reportTask =
-      std::static_pointer_cast<
-          core::reporting::SiteToSiteProvenanceReportingTask>(processor);
+  std::shared_ptr<core::reporting::SiteToSiteProvenanceReportingTask> reportTask = std::static_pointer_cast<core::reporting::SiteToSiteProvenanceReportingTask>(processor);
 
   YAML::Node node = reportNode->as<YAML::Node>();
 
@@ -354,21 +318,16 @@ void YamlConfiguration::parseProvenanceReportingYaml(
   processor->setScheduledState(core::RUNNING);
 
   core::TimeUnit unit;
-  if (core::Property::StringToTime(schedulingPeriodStr, schedulingPeriod, unit)
-      && core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit,
-                                             schedulingPeriod)) {
-    logger_->log_debug("ProvenanceReportingTask schedulingPeriod %d ns",
-                       schedulingPeriod);
+  if (core::Property::StringToTime(schedulingPeriodStr, schedulingPeriod, unit) && core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) {
+    logger_->log_debug("ProvenanceReportingTask schedulingPeriod %d ns", schedulingPeriod);
     processor->setSchedulingPeriodNano(schedulingPeriod);
   }
 
   if (schedulingStrategyStr == "TIMER_DRIVEN") {
     processor->setSchedulingStrategy(core::TIMER_DRIVEN);
-    logger_->log_debug("ProvenanceReportingTask scheduling strategy %s",
-                       schedulingStrategyStr);
+    logger_->log_debug("ProvenanceReportingTask scheduling strategy %s", schedulingStrategyStr);
   } else {
-    throw std::invalid_argument(
-        "Invalid scheduling strategy " + schedulingStrategyStr);
+    throw std::invalid_argument("Invalid scheduling strategy " + schedulingStrategyStr);
   }
 
   reportTask->setHost(hostStr);
@@ -387,19 +346,18 @@ void YamlConfiguration::parseProvenanceReportingYaml(
   }
 }
 
-void YamlConfiguration::parseControllerServices(
-    YAML::Node *controllerServicesNode) {
+void YamlConfiguration::parseControllerServices(YAML::Node *controllerServicesNode) {
   if (!IsNullOrEmpty(controllerServicesNode)) {
     if (controllerServicesNode->IsSequence()) {
       for (auto iter : *controllerServicesNode) {
         YAML::Node controllerServiceNode = iter.as<YAML::Node>();
         try {
           checkRequiredField(&controllerServiceNode, "name",
-                             CONFIG_YAML_CONTROLLER_SERVICES_KEY);
+          CONFIG_YAML_CONTROLLER_SERVICES_KEY);
           checkRequiredField(&controllerServiceNode, "id",
-                             CONFIG_YAML_CONTROLLER_SERVICES_KEY);
+          CONFIG_YAML_CONTROLLER_SERVICES_KEY);
           checkRequiredField(&controllerServiceNode, "class",
-                             CONFIG_YAML_CONTROLLER_SERVICES_KEY);
+          CONFIG_YAML_CONTROLLER_SERVICES_KEY);
 
           auto name = controllerServiceNode["name"].as<std::string>();
           auto id = controllerServiceNode["id"].as<std::string>();
@@ -407,42 +365,28 @@ void YamlConfiguration::parseControllerServices(
 
           uuid_t uuid;
           uuid_parse(id.c_str(), uuid);
-          auto controller_service_node = createControllerService(type, name,
-                                                                 uuid);
+          auto controller_service_node = createControllerService(type, name, uuid);
           if (nullptr != controller_service_node) {
-            logger_->log_debug(
-                "Created Controller Service with UUID %s and name %s", id,
-                name);
+            logger_->log_debug("Created Controller Service with UUID %s and name %s", id, name);
             controller_service_node->initialize();
             YAML::Node propertiesNode = controllerServiceNode["Properties"];
             // we should propogate propertiets to the node and to the implementation
-            parsePropertiesNodeYaml(
-                &propertiesNode,
-                std::static_pointer_cast<core::ConfigurableComponent>(
-                    controller_service_node));
-            if (controller_service_node->getControllerServiceImplementation()
-                != nullptr) {
-              parsePropertiesNodeYaml(
-                  &propertiesNode,
-                  std::static_pointer_cast<core::ConfigurableComponent>(
-                      controller_service_node
-                          ->getControllerServiceImplementation()));
+            parsePropertiesNodeYaml(&propertiesNode, std::static_pointer_cast<core::ConfigurableComponent>(controller_service_node));
+            if (controller_service_node->getControllerServiceImplementation() != nullptr) {
+              parsePropertiesNodeYaml(&propertiesNode, std::static_pointer_cast<core::ConfigurableComponent>(controller_service_node->getControllerServiceImplementation()));
             }
           }
           controller_services_->put(id, controller_service_node);
           controller_services_->put(name, controller_service_node);
         } catch (YAML::InvalidNode &in) {
-          throw Exception(
-              ExceptionType::GENERAL_EXCEPTION,
-              "Name, id, and class must be specified for controller services");
+          throw Exception(ExceptionType::GENERAL_EXCEPTION, "Name, id, and class must be specified for controller services");
         }
       }
     }
   }
 }
 
-void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode,
-                                            core::ProcessGroup *parent) {
+void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::ProcessGroup *parent) {
   if (!parent) {
     logger_->log_error("parseProcessNode: no parent group was provided");
     return;
@@ -450,8 +394,7 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode,
 
   if (connectionsNode) {
     if (connectionsNode->IsSequence()) {
-      for (YAML::const_iterator iter = connectionsNode->begin();
-          iter != connectionsNode->end(); ++iter) {
+      for (YAML::const_iterator iter = connectionsNode->begin(); iter != connectionsNode->end(); ++iter) {
         YAML::Node connectionNode = iter->as<YAML::Node>();
         std::shared_ptr<minifi::Connection> connection = nullptr;
 
@@ -462,15 +405,13 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode,
         std::string id = getOrGenerateId(&connectionNode);
         uuid_parse(id.c_str(), uuid);
         connection = this->createConnection(name, uuid);
-        logger_->log_debug("Created connection with UUID %s and name %s", id,
-                           name);
+        logger_->log_debug("Created connection with UUID %s and name %s", id, name);
 
         // Configure connection source
         checkRequiredField(&connectionNode, "source relationship name", CONFIG_YAML_CONNECTIONS_KEY);
         auto rawRelationship = connectionNode["source relationship name"].as<std::string>();
         core::Relationship relationship(rawRelationship, "");
-        logger_->log_debug("parseConnection: relationship => [%s]",
-                           rawRelationship);
+        logger_->log_debug("parseConnection: relationship => [%s]", rawRelationship);
         if (connection) {
           connection->setRelationship(relationship);
         }
@@ -481,7 +422,8 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode,
           std::string connectionSrcProcId = connectionNode["source id"].as<std::string>();
           uuid_parse(connectionSrcProcId.c_str(), srcUUID);
           logger_->log_debug("Using 'source id' to match source with same id for "
-                                 "connection '%s': source id => [%s]", name, connectionSrcProcId);
+                             "connection '%s': source id => [%s]",
+                             name, connectionSrcProcId);
         } else {
           // if we don't have a source id, try to resolve using source name. config schema v2 will make this unnecessary
           checkRequiredField(&connectionNode, "source name", CONFIG_YAML_CONNECTIONS_KEY);
@@ -491,20 +433,20 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode,
             // the source name is a remote port id, so use that as the source id
             uuid_copy(srcUUID, tmpUUID);
             logger_->log_debug("Using 'source name' containing a remote port id to match the source for "
-                                   "connection '%s': source name => [%s]", name, connectionSrcProcName);
+                               "connection '%s': source name => [%s]",
+                               name, connectionSrcProcName);
           } else {
             // lastly, look the processor up by name
             auto srcProcessor = parent->findProcessor(connectionSrcProcName);
             if (NULL != srcProcessor) {
               srcProcessor->getUUID(srcUUID);
               logger_->log_debug("Using 'source name' to match source with same name for "
-                                     "connection '%s': source name => [%s]", name, connectionSrcProcName);
+                                 "connection '%s': source name => [%s]",
+                                 name, connectionSrcProcName);
             } else {
               // we ran out of ways to discover the source processor
-              logger_->log_error(
-                  "Could not locate a source with name %s to create a connection", connectionSrcProcName);
-              throw std::invalid_argument(
-                  "Could not locate a source with name " + connectionSrcProcName + " to create a connection ");
+              logger_->log_error("Could not locate a source with name %s to create a connection", connectionSrcProcName);
+              throw std::invalid_argument("Could not locate a source with name " + connectionSrcProcName + " to create a connection ");
             }
           }
         }
@@ -516,7 +458,8 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode,
           std::string connectionDestProcId = connectionNode["destination id"].as<std::string>();
           uuid_parse(connectionDestProcId.c_str(), destUUID);
           logger_->log_debug("Using 'destination id' to match destination with same id for "
-                                 "connection '%s': destination id => [%s]", name, connectionDestProcId);
+                             "connection '%s': destination id => [%s]",
+                             name, connectionDestProcId);
         } else {
           // we use the same logic as above for resolving the source processor
           // for looking up the destination processor in absence of a processor id
@@ -524,24 +467,24 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode,
           std::string connectionDestProcName = connectionNode["destination name"].as<std::string>();
           uuid_t tmpUUID;
           if (!uuid_parse(connectionDestProcName.c_str(), tmpUUID) &&
-              NULL != parent->findProcessor(tmpUUID)) {
+          NULL != parent->findProcessor(tmpUUID)) {
             // the destination name is a remote port id, so use that as the dest id
             uuid_copy(destUUID, tmpUUID);
             logger_->log_debug("Using 'destination name' containing a remote port id to match the destination for "
-                                   "connection '%s': destination name => [%s]", name, connectionDestProcName);
+                               "connection '%s': destination name => [%s]",
+                               name, connectionDestProcName);
           } else {
             // look the processor up by name
             auto destProcessor = parent->findProcessor(connectionDestProcName);
             if (NULL != destProcessor) {
               destProcessor->getUUID(destUUID);
               logger_->log_debug("Using 'destination name' to match destination with same name for "
-                                     "connection '%s': destination name => [%s]", name, connectionDestProcName);
+                                 "connection '%s': destination name => [%s]",
+                                 name, connectionDestProcName);
             } else {
               // we ran out of ways to discover the destination processor
-              logger_->log_error(
-                  "Could not locate a destination with name %s to create a connection", connectionDestProcName);
-              throw std::invalid_argument(
-                  "Could not locate a destination with name " + connectionDestProcName + " to create a connection");
+              logger_->log_error("Could not locate a destination with name %s to create a connection", connectionDestProcName);
+              throw std::invalid_argument("Could not locate a destination with name " + connectionDestProcName + " to create a connection");
             }
           }
         }
@@ -555,9 +498,7 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode,
   }
 }
 
-void YamlConfiguration::parsePortYaml(YAML::Node *portNode,
-                                      core::ProcessGroup *parent,
-                                      TransferDirection direction) {
+void YamlConfiguration::parsePortYaml(YAML::Node *portNode, core::ProcessGroup *parent, TransferDirection direction) {
   uuid_t uuid;
   std::shared_ptr<core::Processor> processor = NULL;
   std::shared_ptr<minifi::RemoteProcessorGroupPort> port = NULL;
@@ -572,16 +513,13 @@ void YamlConfiguration::parsePortYaml(YAML::Node *portNode,
   // Check for required fields
   checkRequiredField(&inputPortsObj, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
   auto nameStr = inputPortsObj["name"].as<std::string>();
-  checkRequiredField(
-      &inputPortsObj,
-      "id",
-      CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY,
-      "The field 'id' is required for "
-          "the port named '" + nameStr
-          + "' in the YAML Config. If this port "
-              "is an input port for a NiFi Remote Process Group, the port "
-              "id should match the corresponding id specified in the NiFi configuration. "
-              "This is a UUID of the format XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX.");
+  checkRequiredField(&inputPortsObj, "id",
+  CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY,
+                     "The field 'id' is required for "
+                         "the port named '" + nameStr + "' in the YAML Config. If this port "
+                         "is an input port for a NiFi Remote Process Group, the port "
+                         "id should match the corresponding id specified in the NiFi configuration. "
+                         "This is a UUID of the format XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX.");
   auto portId = inputPortsObj["id"].as<std::string>();
   uuid_parse(portId.c_str(), uuid);
 
@@ -597,9 +535,7 @@ void YamlConfiguration::parsePortYaml(YAML::Node *portNode,
   // handle port properties
   YAML::Node nodeVal = portNode->as<YAML::Node>();
   YAML::Node propertiesNode = nodeVal["Properties"];
-  parsePropertiesNodeYaml(
-      &propertiesNode,
-      std::static_pointer_cast<core::ConfigurableComponent>(processor));
+  parsePropertiesNodeYaml(&propertiesNode, std::static_pointer_cast<core::ConfigurableComponent>(processor));
 
   // add processor to parent
   parent->addProcessor(processor);
@@ -616,12 +552,9 @@ void YamlConfiguration::parsePortYaml(YAML::Node *portNode,
   }
 }
 
-void YamlConfiguration::parsePropertiesNodeYaml(
-    YAML::Node *propertiesNode,
-    std::shared_ptr<core::ConfigurableComponent> processor) {
+void YamlConfiguration::parsePropertiesNodeYaml(YAML::Node *propertiesNode, std::shared_ptr<core::ConfigurableComponent> processor) {
   // Treat generically as a YAML node so we can perform inspection on entries to ensure they are populated
-  for (YAML::const_iterator propsIter = propertiesNode->begin();
-      propsIter != propertiesNode->end(); ++propsIter) {
+  for (YAML::const_iterator propsIter = propertiesNode->begin(); propsIter != propertiesNode->end(); ++propsIter) {
     std::string propertyName = propsIter->first.as<std::string>();
     YAML::Node propertyValueNode = propsIter->second;
     if (!propertyValueNode.IsNull() && propertyValueNode.IsDefined()) {
@@ -634,12 +567,9 @@ void YamlConfiguration::parsePropertiesNodeYaml(
             std::string rawValueString = propertiesNode.as<std::string>();
             logger_->log_info("Found %s=%s", propertyName, rawValueString);
             if (!processor->updateProperty(propertyName, rawValueString)) {
-              std::shared_ptr<core::Connectable> proc =
-                  std::dynamic_pointer_cast<core::Connectable>(processor);
+              std::shared_ptr<core::Connectable> proc = std::dynamic_pointer_cast<core::Connectable>(processor);
               if (proc != 0) {
-                logger_->log_warn(
-                    "Received property %s with value %s but is not one of the properties for %s",
-                    propertyName, rawValueString, proc->getName());
+                logger_->log_warn("Received property %s with value %s but is not one of the properties for %s", propertyName, rawValueString, proc->getName());
               }
             }
           }
@@ -647,12 +577,9 @@ void YamlConfiguration::parsePropertiesNodeYaml(
       } else {
         std::string rawValueString = propertyValueNode.as<std::string>();
         if (!processor->setProperty(propertyName, rawValueString)) {
-          std::shared_ptr<core::Connectable> proc = std::dynamic_pointer_cast<
-              core::Connectable>(processor);
+          std::shared_ptr<core::Connectable> proc = std::dynamic_pointer_cast<core::Connectable>(processor);
           if (proc != 0) {
-            logger_->log_warn(
-                "Received property %s with value %s but is not one of the properties for %s",
-                propertyName, rawValueString, proc->getName());
+            logger_->log_warn("Received property %s with value %s but is not one of the properties for %s", propertyName, rawValueString, proc->getName());
           }
         }
       }
@@ -660,8 +587,7 @@ void YamlConfiguration::parsePropertiesNodeYaml(
   }
 }
 
-std::string YamlConfiguration::getOrGenerateId(YAML::Node *yamlNode,
-                                               const std::string &idField) {
+std::string YamlConfiguration::getOrGenerateId(YAML::Node *yamlNode, const std::string &idField) {
   std::string id;
   YAML::Node node = yamlNode->as<YAML::Node>();
 
@@ -669,9 +595,8 @@ std::string YamlConfiguration::getOrGenerateId(YAML::Node *yamlNode,
     if (YAML::NodeType::Scalar == node[idField].Type()) {
       id = node[idField].as<std::string>();
     } else {
-      throw std::invalid_argument(
-          "getOrGenerateId: idField is expected to reference YAML::Node "
-          "of YAML::NodeType::Scalar.");
+      throw std::invalid_argument("getOrGenerateId: idField is expected to reference YAML::Node "
+                                  "of YAML::NodeType::Scalar.");
     }
   } else {
     uuid_t uuid;
@@ -684,10 +609,7 @@ std::string YamlConfiguration::getOrGenerateId(YAML::Node *yamlNode,
   return id;
 }
 
-void YamlConfiguration::checkRequiredField(YAML::Node *yamlNode,
-                                           const std::string &fieldName,
-                                           const std::string &yamlSection,
-                                           const std::string &errorMessage) {
+void YamlConfiguration::checkRequiredField(YAML::Node *yamlNode, const std::string &fieldName, const std::string &yamlSection, const std::string &errorMessage) {
   std::string errMsg = errorMessage;
   if (!yamlNode->as<YAML::Node>()[fieldName]) {
     if (errMsg.empty()) {
@@ -695,11 +617,8 @@ void YamlConfiguration::checkRequiredField(YAML::Node *yamlNode,
       // invalid YAML config file, using the component name if present
       errMsg =
           yamlNode->as<YAML::Node>()["name"] ?
-              "Unable to parse configuration file for component named '"
-                  + yamlNode->as<YAML::Node>()["name"].as<std::string>()
-                  + "' as required field '" + fieldName + "' is missing" :
-              "Unable to parse configuration file as required field '"
-                  + fieldName + "' is missing";
+              "Unable to parse configuration file for component named '" + yamlNode->as<YAML::Node>()["name"].as<std::string>() + "' as required field '" + fieldName + "' is missing" :
+              "Unable to parse configuration file as required field '" + fieldName + "' is missing";
       if (!yamlSection.empty()) {
         errMsg += " [in '" + yamlSection + "' section of configuration file]";
       }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/io/BaseStream.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/BaseStream.cpp b/libminifi/src/io/BaseStream.cpp
index 307f41d..137bf1a 100644
--- a/libminifi/src/io/BaseStream.cpp
+++ b/libminifi/src/io/BaseStream.cpp
@@ -33,9 +33,7 @@ namespace io {
  * @return resulting write size
  **/
 int BaseStream::write(uint32_t base_value, bool is_little_endian) {
-  return Serializable::write(base_value,
-                             reinterpret_cast<DataStream*>(composable_stream_),
-                             is_little_endian);
+  return Serializable::write(base_value, reinterpret_cast<DataStream*>(composable_stream_), is_little_endian);
 }
 
 int BaseStream::writeData(uint8_t *value, int size) {
@@ -54,9 +52,7 @@ int BaseStream::writeData(uint8_t *value, int size) {
  * @return resulting write size
  **/
 int BaseStream::write(uint16_t base_value, bool is_little_endian) {
-  return Serializable::write(base_value,
-                             reinterpret_cast<DataStream*>(composable_stream_),
-                             is_little_endian);
+  return Serializable::write(base_value, reinterpret_cast<DataStream*>(composable_stream_), is_little_endian);
 }
 
 /**
@@ -67,8 +63,7 @@ int BaseStream::write(uint16_t base_value, bool is_little_endian) {
  * @return resulting write size
  **/
 int BaseStream::write(uint8_t *value, int len) {
-  return Serializable::write(value, len,
-                             reinterpret_cast<DataStream*>(composable_stream_));
+  return Serializable::write(value, len, reinterpret_cast<DataStream*>(composable_stream_));
 }
 
 /**
@@ -79,9 +74,7 @@ int BaseStream::write(uint8_t *value, int len) {
  * @return resulting write size
  **/
 int BaseStream::write(uint64_t base_value, bool is_little_endian) {
-  return Serializable::write(base_value,
-                             reinterpret_cast<DataStream*>(composable_stream_),
-                             is_little_endian);
+  return Serializable::write(base_value, reinterpret_cast<DataStream*>(composable_stream_), is_little_endian);
 }
 
 /**
@@ -100,8 +93,7 @@ int BaseStream::write(bool value) {
  * @return resulting write size
  **/
 int BaseStream::writeUTF(std::string str, bool widen) {
-  return Serializable::writeUTF(
-      str, reinterpret_cast<DataStream*>(composable_stream_), widen);
+  return Serializable::writeUTF(str, reinterpret_cast<DataStream*>(composable_stream_), widen);
 }
 
 /**
@@ -111,8 +103,7 @@ int BaseStream::writeUTF(std::string str, bool widen) {
  * @return resulting read size
  **/
 int BaseStream::read(uint8_t &value) {
-  return Serializable::read(value,
-                            reinterpret_cast<DataStream*>(composable_stream_));
+  return Serializable::read(value, reinterpret_cast<DataStream*>(composable_stream_));
 }
 
 /**
@@ -122,8 +113,7 @@ int BaseStream::read(uint8_t &value) {
  * @return resulting read size
  **/
 int BaseStream::read(uint16_t &base_value, bool is_little_endian) {
-  return Serializable::read(base_value,
-                            reinterpret_cast<DataStream*>(composable_stream_));
+  return Serializable::read(base_value, reinterpret_cast<DataStream*>(composable_stream_));
 }
 
 /**
@@ -133,8 +123,7 @@ int BaseStream::read(uint16_t &base_value, bool is_little_endian) {
  * @return resulting read size
  **/
 int BaseStream::read(char &value) {
-  return Serializable::read(value,
-                            reinterpret_cast<DataStream*>(composable_stream_));
+  return Serializable::read(value, reinterpret_cast<DataStream*>(composable_stream_));
 }
 
 /**
@@ -145,8 +134,7 @@ int BaseStream::read(char &value) {
  * @return resulting read size
  **/
 int BaseStream::read(uint8_t *value, int len) {
-  return Serializable::read(value, len,
-                            reinterpret_cast<DataStream*>(composable_stream_));
+  return Serializable::read(value, len, reinterpret_cast<DataStream*>(composable_stream_));
 }
 
 /**
@@ -155,8 +143,7 @@ int BaseStream::read(uint8_t *value, int len) {
  * @param buflen
  */
 int BaseStream::readData(std::vector<uint8_t> &buf, int buflen) {
-  return Serializable::read(&buf[0], buflen,
-                            reinterpret_cast<DataStream*>(composable_stream_));
+  return Serializable::read(&buf[0], buflen, reinterpret_cast<DataStream*>(composable_stream_));
 }
 /**
  * Reads data and places it into buf
@@ -164,8 +151,7 @@ int BaseStream::readData(std::vector<uint8_t> &buf, int buflen) {
  * @param buflen
  */
 int BaseStream::readData(uint8_t *buf, int buflen) {
-  return Serializable::read(buf, buflen,
-                            reinterpret_cast<DataStream*>(composable_stream_));
+  return Serializable::read(buf, buflen, reinterpret_cast<DataStream*>(composable_stream_));
 }
 
 /**
@@ -175,9 +161,7 @@ int BaseStream::readData(uint8_t *buf, int buflen) {
  * @return resulting read size
  **/
 int BaseStream::read(uint32_t &value, bool is_little_endian) {
-  return Serializable::read(value,
-                            reinterpret_cast<DataStream*>(composable_stream_),
-                            is_little_endian);
+  return Serializable::read(value, reinterpret_cast<DataStream*>(composable_stream_), is_little_endian);
 }
 
 /**
@@ -187,9 +171,7 @@ int BaseStream::read(uint32_t &value, bool is_little_endian) {
  * @return resulting read size
  **/
 int BaseStream::read(uint64_t &value, bool is_little_endian) {
-  return Serializable::read(value,
-                            reinterpret_cast<DataStream*>(composable_stream_),
-                            is_little_endian);
+  return Serializable::read(value, reinterpret_cast<DataStream*>(composable_stream_), is_little_endian);
 }
 
 /**
@@ -199,8 +181,7 @@ int BaseStream::read(uint64_t &value, bool is_little_endian) {
  * @return resulting read size
  **/
 int BaseStream::readUTF(std::string &str, bool widen) {
-  return Serializable::readUTF(
-      str, reinterpret_cast<DataStream*>(composable_stream_), widen);
+  return Serializable::readUTF(str, reinterpret_cast<DataStream*>(composable_stream_), widen);
 }
 } /* namespace io */
 } /* namespace minifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/io/ClientSocket.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/ClientSocket.cpp b/libminifi/src/io/ClientSocket.cpp
index bd99bc7..57d6f03 100644
--- a/libminifi/src/io/ClientSocket.cpp
+++ b/libminifi/src/io/ClientSocket.cpp
@@ -39,8 +39,7 @@ namespace nifi {
 namespace minifi {
 namespace io {
 
-Socket::Socket(const std::shared_ptr<SocketContext> &context, const std::string &hostname, const uint16_t port,
-               const uint16_t listeners = -1)
+Socket::Socket(const std::shared_ptr<SocketContext> &context, const std::string &hostname, const uint16_t port, const uint16_t listeners = -1)
     : requested_hostname_(hostname),
       port_(port),
       addr_info_(0),
@@ -86,8 +85,7 @@ void Socket::closeStream() {
 }
 
 int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) {
-  if ((socket_file_descriptor_ = socket(p->ai_family, p->ai_socktype,
-                                        p->ai_protocol)) == -1) {
+  if ((socket_file_descriptor_ = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1) {
     logger_->log_error("error while connecting to server socket");
     return -1;
   }
@@ -111,8 +109,7 @@ int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) {
       sa_loc->sin_port = htons(port_);
       // use any address if you are connecting to the local machine for testing
       // otherwise we must use the requested hostname
-      if (IsNullOrEmpty(requested_hostname_)
-          || requested_hostname_ == "localhost") {
+      if (IsNullOrEmpty(requested_hostname_) || requested_hostname_ == "localhost") {
         sa_loc->sin_addr.s_addr = htonl(INADDR_ANY);
       } else {
         sa_loc->sin_addr.s_addr = addr;
@@ -149,12 +146,10 @@ int16_t Socket::initialize() {
     hints.ai_flags |= AI_PASSIVE;
   hints.ai_protocol = 0; /* any protocol */
 
-  int errcode = getaddrinfo(requested_hostname_.c_str(), 0, &hints,
-                            &addr_info_);
+  int errcode = getaddrinfo(requested_hostname_.c_str(), 0, &hints, &addr_info_);
 
   if (errcode != 0) {
-    logger_->log_error("Saw error during getaddrinfo, error: %s",
-                       strerror(errno));
+    logger_->log_error("Saw error during getaddrinfo, error: %s", strerror(errno));
     return -1;
   }
 
@@ -210,8 +205,7 @@ int16_t Socket::select_descriptor(const uint16_t msec) {
     retval = select(socket_max_ + 1, &read_fds_, NULL, NULL, NULL);
 
   if (retval < 0) {
-    logger_->log_error("Saw error during selection, error:%i %s", retval,
-                       strerror(errno));
+    logger_->log_error("Saw error during selection, error:%i %s", retval, strerror(errno));
     return retval;
   }
 
@@ -221,8 +215,7 @@ int16_t Socket::select_descriptor(const uint16_t msec) {
         if (listeners_ > 0) {
           struct sockaddr_storage remoteaddr;  // client address
           socklen_t addrlen = sizeof remoteaddr;
-          int newfd = accept(socket_file_descriptor_,
-                             (struct sockaddr *) &remoteaddr, &addrlen);
+          int newfd = accept(socket_file_descriptor_, (struct sockaddr *) &remoteaddr, &addrlen);
           FD_SET(newfd, &total_list_);  // add to master set
           if (newfd > socket_max_) {    // keep track of the max
             socket_max_ = newfd;
@@ -273,8 +266,7 @@ int16_t Socket::setSocketOptions(const int sock) {
 #else
   if (listeners_ > 0) {
     // lose the pesky "address already in use" error message
-    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
-                   reinterpret_cast<char *>(&opt), sizeof(opt)) < 0) {
+    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char *>(&opt), sizeof(opt)) < 0) {
       logger_->log_error("setsockopt() SO_REUSEADDR failed");
       close(sock);
       return -1;
@@ -304,16 +296,14 @@ int Socket::writeData(uint8_t *value, int size) {
     // check for errors
     if (ret <= 0) {
       close(socket_file_descriptor_);
-      logger_->log_error("Could not send to %d, error: %s",
-                         socket_file_descriptor_, strerror(errno));
+      logger_->log_error("Could not send to %d, error: %s", socket_file_descriptor_, strerror(errno));
       return ret;
     }
     bytes += ret;
   }
 
   if (ret)
-    logger_->log_trace("Send data size %d over socket %d", size,
-                       socket_file_descriptor_);
+    logger_->log_trace("Send data size %d over socket %d", size, socket_file_descriptor_);
   return bytes;
 }
 
@@ -341,15 +331,11 @@ int Socket::read(uint64_t &value, bool is_little_endian) {
   auto buf = readBuffer(value);
 
   if (is_little_endian) {
-    value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) << 48)
-        | ((uint64_t) (buf[2] & 255) << 40) | ((uint64_t) (buf[3] & 255) << 32)
-        | ((uint64_t) (buf[4] & 255) << 24) | ((uint64_t) (buf[5] & 255) << 16)
-        | ((uint64_t) (buf[6] & 255) << 8) | ((uint64_t) (buf[7] & 255) << 0);
+    value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) << 48) | ((uint64_t) (buf[2] & 255) << 40) | ((uint64_t) (buf[3] & 255) << 32) | ((uint64_t) (buf[4] & 255) << 24)
+        | ((uint64_t) (buf[5] & 255) << 16) | ((uint64_t) (buf[6] & 255) << 8) | ((uint64_t) (buf[7] & 255) << 0);
   } else {
-    value = ((uint64_t) buf[0] << 0) | ((uint64_t) (buf[1] & 255) << 8)
-        | ((uint64_t) (buf[2] & 255) << 16) | ((uint64_t) (buf[3] & 255) << 24)
-        | ((uint64_t) (buf[4] & 255) << 32) | ((uint64_t) (buf[5] & 255) << 40)
-        | ((uint64_t) (buf[6] & 255) << 48) | ((uint64_t) (buf[7] & 255) << 56);
+    value = ((uint64_t) buf[0] << 0) | ((uint64_t) (buf[1] & 255) << 8) | ((uint64_t) (buf[2] & 255) << 16) | ((uint64_t) (buf[3] & 255) << 24) | ((uint64_t) (buf[4] & 255) << 32)
+        | ((uint64_t) (buf[5] & 255) << 40) | ((uint64_t) (buf[6] & 255) << 48) | ((uint64_t) (buf[7] & 255) << 56);
   }
   return sizeof(value);
 }
@@ -397,8 +383,7 @@ int Socket::readData(uint8_t *buf, int buflen) {
       if (bytes_read == 0) {
         logger_->log_info("Other side hung up on %d", fd);
       } else {
-        logger_->log_error("Could not recv on %d, error: %s", fd,
-                           strerror(errno));
+        logger_->log_error("Could not recv on %d, error: %s", fd, strerror(errno));
       }
       return -1;
     }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/io/DataStream.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/DataStream.cpp b/libminifi/src/io/DataStream.cpp
index 9e0dfce..92c7cda 100644
--- a/libminifi/src/io/DataStream.cpp
+++ b/libminifi/src/io/DataStream.cpp
@@ -44,15 +44,11 @@ int DataStream::read(uint64_t &value, bool is_little_endian) {
   uint8_t *buf = &buffer[readBuffer];
 
   if (is_little_endian) {
-    value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) << 48)
-        | ((uint64_t) (buf[2] & 255) << 40) | ((uint64_t) (buf[3] & 255) << 32)
-        | ((uint64_t) (buf[4] & 255) << 24) | ((uint64_t) (buf[5] & 255) << 16)
-        | ((uint64_t) (buf[6] & 255) << 8) | ((uint64_t) (buf[7] & 255) << 0);
+    value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) << 48) | ((uint64_t) (buf[2] & 255) << 40) | ((uint64_t) (buf[3] & 255) << 32) | ((uint64_t) (buf[4] & 255) << 24)
+        | ((uint64_t) (buf[5] & 255) << 16) | ((uint64_t) (buf[6] & 255) << 8) | ((uint64_t) (buf[7] & 255) << 0);
   } else {
-    value = ((uint64_t) buf[0] << 0) | ((uint64_t) (buf[1] & 255) << 8)
-        | ((uint64_t) (buf[2] & 255) << 16) | ((uint64_t) (buf[3] & 255) << 24)
-        | ((uint64_t) (buf[4] & 255) << 32) | ((uint64_t) (buf[5] & 255) << 40)
-        | ((uint64_t) (buf[6] & 255) << 48) | ((uint64_t) (buf[7] & 255) << 56);
+    value = ((uint64_t) buf[0] << 0) | ((uint64_t) (buf[1] & 255) << 8) | ((uint64_t) (buf[2] & 255) << 16) | ((uint64_t) (buf[3] & 255) << 24) | ((uint64_t) (buf[4] & 255) << 32)
+        | ((uint64_t) (buf[5] & 255) << 40) | ((uint64_t) (buf[6] & 255) << 48) | ((uint64_t) (buf[7] & 255) << 56);
   }
   readBuffer += 8;
   return 8;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/io/Serializable.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/Serializable.cpp b/libminifi/src/io/Serializable.cpp
index c3f74c7..5e57c80 100644
--- a/libminifi/src/io/Serializable.cpp
+++ b/libminifi/src/io/Serializable.cpp
@@ -35,26 +35,20 @@ namespace io {
 template<typename T>
 int Serializable::writeData(const T &t, DataStream *stream) {
   uint8_t bytes[sizeof t];
-  std::copy(static_cast<const char*>(static_cast<const void*>(&t)),
-            static_cast<const char*>(static_cast<const void*>(&t)) + sizeof t,
-            bytes);
+  std::copy(static_cast<const char*>(static_cast<const void*>(&t)), static_cast<const char*>(static_cast<const void*>(&t)) + sizeof t, bytes);
   return stream->writeData(bytes, sizeof t);
 }
 
 template<typename T>
 int Serializable::writeData(const T &t, uint8_t *to_vec) {
-  std::copy(static_cast<const char*>(static_cast<const void*>(&t)),
-            static_cast<const char*>(static_cast<const void*>(&t)) + sizeof t,
-            to_vec);
+  std::copy(static_cast<const char*>(static_cast<const void*>(&t)), static_cast<const char*>(static_cast<const void*>(&t)) + sizeof t, to_vec);
   return sizeof t;
 }
 
 template<typename T>
 int Serializable::writeData(const T &t, std::vector<uint8_t> &to_vec) {
   uint8_t bytes[sizeof t];
-  std::copy(static_cast<const char*>(static_cast<const void*>(&t)),
-            static_cast<const char*>(static_cast<const void*>(&t)) + sizeof t,
-            bytes);
+  std::copy(static_cast<const char*>(static_cast<const void*>(&t)), static_cast<const char*>(static_cast<const void*>(&t)) + sizeof t, bytes);
   to_vec.insert(to_vec.end(), &bytes[0], &bytes[sizeof t]);
   return sizeof t;
 }
@@ -97,36 +91,29 @@ int Serializable::read(uint8_t *value, int len, DataStream *stream) {
   return stream->readData(value, len);
 }
 
-int Serializable::read(uint16_t &value, DataStream *stream,
-                       bool is_little_endian) {
+int Serializable::read(uint16_t &value, DataStream *stream, bool is_little_endian) {
   return stream->read(value, is_little_endian);
 }
 
-int Serializable::read(uint32_t &value, DataStream *stream,
-                       bool is_little_endian) {
+int Serializable::read(uint32_t &value, DataStream *stream, bool is_little_endian) {
   return stream->read(value, is_little_endian);
 }
-int Serializable::read(uint64_t &value, DataStream *stream,
-                       bool is_little_endian) {
+int Serializable::read(uint64_t &value, DataStream *stream, bool is_little_endian) {
   return stream->read(value, is_little_endian);
 }
 
-int Serializable::write(uint32_t base_value, DataStream *stream,
-                        bool is_little_endian) {
+int Serializable::write(uint32_t base_value, DataStream *stream, bool is_little_endian) {
   const uint32_t value = is_little_endian ? htonl(base_value) : base_value;
 
   return writeData(value, stream);
 }
 
-int Serializable::write(uint64_t base_value, DataStream *stream,
-                        bool is_little_endian) {
-  const uint64_t value =
-      is_little_endian == 1 ? htonll_r(base_value) : base_value;
+int Serializable::write(uint64_t base_value, DataStream *stream, bool is_little_endian) {
+  const uint64_t value = is_little_endian == 1 ? htonll_r(base_value) : base_value;
   return writeData(value, stream);
 }
 
-int Serializable::write(uint16_t base_value, DataStream *stream,
-                        bool is_little_endian) {
+int Serializable::write(uint16_t base_value, DataStream *stream, bool is_little_endian) {
   const uint16_t value = is_little_endian == 1 ? htons(base_value) : base_value;
 
   return writeData(value, stream);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/io/tls/TLSSocket.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/tls/TLSSocket.cpp b/libminifi/src/io/tls/TLSSocket.cpp
index b269cef..40cf1e9 100644
--- a/libminifi/src/io/tls/TLSSocket.cpp
+++ b/libminifi/src/io/tls/TLSSocket.cpp
@@ -57,9 +57,7 @@ int16_t TLSContext::initialize() {
 
   std::string clientAuthStr;
   bool needClientCert = true;
-  if (!(configure_->get(Configure::nifi_security_need_ClientAuth, clientAuthStr)
-      && org::apache::nifi::minifi::utils::StringUtils::StringToBool(
-          clientAuthStr, needClientCert))) {
+  if (!(configure_->get(Configure::nifi_security_need_ClientAuth, clientAuthStr) && org::apache::nifi::minifi::utils::StringUtils::StringToBool(clientAuthStr, needClientCert))) {
     needClientCert = true;
   }
 
@@ -67,8 +65,7 @@ int16_t TLSContext::initialize() {
   method = TLSv1_2_client_method();
   ctx = SSL_CTX_new(method);
   if (ctx == NULL) {
-    logger_->log_error("Could not create SSL context, error: %s.",
-                       std::strerror(errno));
+    logger_->log_error("Could not create SSL context, error: %s.", std::strerror(errno));
     error_value = TLS_ERROR_CONTEXT;
     return error_value;
   }
@@ -78,56 +75,40 @@ int16_t TLSContext::initialize() {
     std::string passphrase;
     std::string caCertificate;
 
-    if (!(configure_->get(Configure::nifi_security_client_certificate,
-                          certificate)
-        && configure_->get(Configure::nifi_security_client_private_key,
-                           privatekey))) {
-      logger_->log_error(
-          "Certificate and Private Key PEM file not configured, error: %s.",
-          std::strerror(errno));
+    if (!(configure_->get(Configure::nifi_security_client_certificate, certificate) && configure_->get(Configure::nifi_security_client_private_key, privatekey))) {
+      logger_->log_error("Certificate and Private Key PEM file not configured, error: %s.", std::strerror(errno));
       error_value = TLS_ERROR_PEM_MISSING;
       return error_value;
     }
     // load certificates and private key in PEM format
-    if (SSL_CTX_use_certificate_file(ctx, certificate.c_str(), SSL_FILETYPE_PEM)
-        <= 0) {
-      logger_->log_error("Could not create load certificate, error : %s",
-                         std::strerror(errno));
+    if (SSL_CTX_use_certificate_file(ctx, certificate.c_str(), SSL_FILETYPE_PEM) <= 0) {
+      logger_->log_error("Could not create load certificate, error : %s", std::strerror(errno));
       error_value = TLS_ERROR_CERT_MISSING;
       return error_value;
     }
-    if (configure_->get(Configure::nifi_security_client_pass_phrase,
-                        passphrase)) {
+    if (configure_->get(Configure::nifi_security_client_pass_phrase, passphrase)) {
       // if the private key has passphase
       SSL_CTX_set_default_passwd_cb(ctx, pemPassWordCb);
-      SSL_CTX_set_default_passwd_cb_userdata(
-          ctx, static_cast<void*>(configure_.get()));
+      SSL_CTX_set_default_passwd_cb_userdata(ctx, static_cast<void*>(configure_.get()));
     }
 
-    int retp = SSL_CTX_use_PrivateKey_file(ctx, privatekey.c_str(),
-                                           SSL_FILETYPE_PEM);
+    int retp = SSL_CTX_use_PrivateKey_file(ctx, privatekey.c_str(), SSL_FILETYPE_PEM);
     if (retp != 1) {
-      logger_->log_error(
-          "Could not create load private key,%i on %s error : %s", retp,
-          privatekey.c_str(), std::strerror(errno));
+      logger_->log_error("Could not create load private key,%i on %s error : %s", retp, privatekey.c_str(), std::strerror(errno));
       error_value = TLS_ERROR_KEY_ERROR;
       return error_value;
     }
     // verify private key
     if (!SSL_CTX_check_private_key(ctx)) {
-      logger_->log_error(
-          "Private key does not match the public certificate, error : %s",
-          std::strerror(errno));
+      logger_->log_error("Private key does not match the public certificate, error : %s", std::strerror(errno));
       error_value = TLS_ERROR_KEY_ERROR;
       return error_value;
     }
     // load CA certificates
-    if (configure_->get(Configure::nifi_security_client_ca_certificate,
-                        caCertificate)) {
+    if (configure_->get(Configure::nifi_security_client_ca_certificate, caCertificate)) {
       retp = SSL_CTX_load_verify_locations(ctx, caCertificate.c_str(), 0);
       if (retp == 0) {
-        logger_->log_error("Can not load CA certificate, Exiting, error : %s",
-                           std::strerror(errno));
+        logger_->log_error("Can not load CA certificate, Exiting, error : %s", std::strerror(errno));
         error_value = TLS_ERROR_CERT_ERROR;
         return error_value;
       }
@@ -149,24 +130,24 @@ TLSSocket::~TLSSocket() {
  * @param port connecting port
  * @param listeners number of listeners in the queue
  */
-TLSSocket::TLSSocket(const std::shared_ptr<TLSContext> &context,
-                     const std::string &hostname, const uint16_t port,
-                     const uint16_t listeners)
+TLSSocket::TLSSocket(const std::shared_ptr<TLSContext> &context, const std::string &hostname, const uint16_t port, const uint16_t listeners)
     : Socket(context, hostname, port, listeners),
-      ssl(0), logger_(logging::LoggerFactory<TLSSocket>::getLogger()) {
+      ssl(0),
+      logger_(logging::LoggerFactory<TLSSocket>::getLogger()) {
   context_ = context;
 }
 
-TLSSocket::TLSSocket(const std::shared_ptr<TLSContext> &context,
-                     const std::string &hostname, const uint16_t port)
+TLSSocket::TLSSocket(const std::shared_ptr<TLSContext> &context, const std::string &hostname, const uint16_t port)
     : Socket(context, hostname, port, 0),
-      ssl(0), logger_(logging::LoggerFactory<TLSSocket>::getLogger()) {
+      ssl(0),
+      logger_(logging::LoggerFactory<TLSSocket>::getLogger()) {
   context_ = context;
 }
 
 TLSSocket::TLSSocket(const TLSSocket &&d)
     : Socket(std::move(d)),
-      ssl(0), logger_(std::move(d.logger_)) {
+      ssl(0),
+      logger_(std::move(d.logger_)) {
   context_ = d.context_;
 }
 
@@ -178,15 +159,13 @@ int16_t TLSSocket::initialize() {
     ssl = SSL_new(context_->getContext());
     SSL_set_fd(ssl, socket_file_descriptor_);
     if (SSL_connect(ssl) == -1) {
-      logger_->log_error("SSL socket connect failed to %s %d",
-                         requested_hostname_.c_str(), port_);
+      logger_->log_error("SSL socket connect failed to %s %d", requested_hostname_.c_str(), port_);
       SSL_free(ssl);
       ssl = NULL;
       close(socket_file_descriptor_);
       return -1;
     } else {
-      logger_->log_info("SSL socket connect success to %s %d",
-                        requested_hostname_.c_str(), port_);
+      logger_->log_info("SSL socket connect success to %s %d", requested_hostname_.c_str(), port_);
       return 0;
     }
   }
@@ -213,8 +192,7 @@ int TLSSocket::writeData(uint8_t *value, int size) {
     sent = SSL_write(ssl, value + bytes, size - bytes);
     // check for errors
     if (sent < 0) {
-      logger_->log_error("Site2Site Peer socket %d send failed %s",
-                         socket_file_descriptor_, strerror(errno));
+      logger_->log_error("Site2Site Peer socket %d send failed %s", socket_file_descriptor_, strerror(errno));
       return sent;
     }
     bytes += sent;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/processors/AppendHostInfo.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/AppendHostInfo.cpp b/libminifi/src/processors/AppendHostInfo.cpp
index b3c76db..bbc86a5 100644
--- a/libminifi/src/processors/AppendHostInfo.cpp
+++ b/libminifi/src/processors/AppendHostInfo.cpp
@@ -46,19 +46,10 @@ namespace processors {
 #define HOST_NAME_MAX 255
 #endif
 
-core::Property AppendHostInfo::InterfaceName(
-    "Network Interface Name",
-    "Network interface from which to read an IP v4 address", "eth0");
-core::Property AppendHostInfo::HostAttribute(
-    "Hostname Attribute",
-    "Flowfile attribute to used to record the agent's hostname",
-    "source.hostname");
-core::Property AppendHostInfo::IPAttribute(
-    "IP Attribute",
-    "Flowfile attribute to used to record the agent's IP address",
-    "source.ipv4");
-core::Relationship AppendHostInfo::Success(
-    "success", "success operational on the flow record");
+core::Property AppendHostInfo::InterfaceName("Network Interface Name", "Network interface from which to read an IP v4 address", "eth0");
+core::Property AppendHostInfo::HostAttribute("Hostname Attribute", "Flowfile attribute to used to record the agent's hostname", "source.hostname");
+core::Property AppendHostInfo::IPAttribute("IP Attribute", "Flowfile attribute to used to record the agent's IP address", "source.ipv4");
+core::Relationship AppendHostInfo::Success("success", "success operational on the flow record");
 
 void AppendHostInfo::initialize() {
   // Set the supported properties
@@ -74,8 +65,7 @@ void AppendHostInfo::initialize() {
   setSupportedRelationships(relationships);
 }
 
-void AppendHostInfo::onTrigger(core::ProcessContext *context,
-                               core::ProcessSession *session) {
+void AppendHostInfo::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
   std::shared_ptr<core::FlowFile> flow = session->get();
   if (!flow)
     return;
@@ -84,8 +74,7 @@ void AppendHostInfo::onTrigger(core::ProcessContext *context,
 
   std::string hostAttribute = "";
   context->getProperty(HostAttribute.getName(), hostAttribute);
-  flow->addAttribute(hostAttribute.c_str(),
-                     org::apache::nifi::minifi::io::Socket::getMyHostName());
+  flow->addAttribute(hostAttribute.c_str(), org::apache::nifi::minifi::io::Socket::getMyHostName());
 
   // Get IP address for the specified interface
   std::string iface;
@@ -103,9 +92,7 @@ void AppendHostInfo::onTrigger(core::ProcessContext *context,
 
     std::string ipAttribute;
     context->getProperty(IPAttribute.getName(), ipAttribute);
-    flow->addAttribute(
-        ipAttribute.c_str(),
-        inet_ntoa(((struct sockaddr_in *) &ifr.ifr_addr)->sin_addr));
+    flow->addAttribute(ipAttribute.c_str(), inet_ntoa(((struct sockaddr_in *) &ifr.ifr_addr)->sin_addr));
   }
 
   // Transfer to the relationship

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/processors/ExecuteProcess.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/ExecuteProcess.cpp b/libminifi/src/processors/ExecuteProcess.cpp
index 701c645..323d69a 100644
--- a/libminifi/src/processors/ExecuteProcess.cpp
+++ b/libminifi/src/processors/ExecuteProcess.cpp
@@ -33,31 +33,18 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
-core::Property ExecuteProcess::Command(
-    "Command",
-    "Specifies the command to be executed; if just the name of an executable"
-    " is provided, it must be in the user's environment PATH.",
-    "");
-core::Property ExecuteProcess::CommandArguments(
-    "Command Arguments",
-    "The arguments to supply to the executable delimited by white space. White "
-    "space can be escaped by enclosing it in double-quotes.",
-    "");
-core::Property ExecuteProcess::WorkingDir(
-    "Working Directory",
-    "The directory to use as the current working directory when executing the command",
-    "");
-core::Property ExecuteProcess::BatchDuration(
-    "Batch Duration",
-    "If the process is expected to be long-running and produce textual output, a "
-    "batch duration can be specified.",
-    "0");
-core::Property ExecuteProcess::RedirectErrorStream(
-    "Redirect Error Stream",
-    "If true will redirect any error stream output of the process to the output stream.",
-    "false");
-core::Relationship ExecuteProcess::Success(
-    "success", "All created FlowFiles are routed to this relationship.");
+core::Property ExecuteProcess::Command("Command", "Specifies the command to be executed; if just the name of an executable"
+                                       " is provided, it must be in the user's environment PATH.",
+                                       "");
+core::Property ExecuteProcess::CommandArguments("Command Arguments", "The arguments to supply to the executable delimited by white space. White "
+                                                "space can be escaped by enclosing it in double-quotes.",
+                                                "");
+core::Property ExecuteProcess::WorkingDir("Working Directory", "The directory to use as the current working directory when executing the command", "");
+core::Property ExecuteProcess::BatchDuration("Batch Duration", "If the process is expected to be long-running and produce textual output, a "
+                                             "batch duration can be specified.",
+                                             "0");
+core::Property ExecuteProcess::RedirectErrorStream("Redirect Error Stream", "If true will redirect any error stream output of the process to the output stream.", "false");
+core::Relationship ExecuteProcess::Success("success", "All created FlowFiles are routed to this relationship.");
 
 void ExecuteProcess::initialize() {
   // Set the supported properties
@@ -74,8 +61,7 @@ void ExecuteProcess::initialize() {
   setSupportedRelationships(relationships);
 }
 
-void ExecuteProcess::onTrigger(core::ProcessContext *context,
-                               core::ProcessSession *session) {
+void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
   std::string value;
   if (context->getProperty(Command.getName(), value)) {
     this->_command = value;
@@ -88,15 +74,12 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context,
   }
   if (context->getProperty(BatchDuration.getName(), value)) {
     core::TimeUnit unit;
-    if (core::Property::StringToTime(value, _batchDuration, unit)
-        && core::Property::ConvertTimeUnitToMS(_batchDuration, unit,
-                                               _batchDuration)) {
+    if (core::Property::StringToTime(value, _batchDuration, unit) && core::Property::ConvertTimeUnitToMS(_batchDuration, unit, _batchDuration)) {
       logger_->log_info("Setting _batchDuration");
     }
   }
   if (context->getProperty(RedirectErrorStream.getName(), value)) {
-    org::apache::nifi::minifi::utils::StringUtils::StringToBool(
-        value, _redirectErrorStream);
+    org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, _redirectErrorStream);
   }
   this->_fullCommand = _command + " " + _commandArgument;
   if (_fullCommand.length() == 0) {
@@ -106,8 +89,7 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context,
   if (_workingDir.length() > 0 && _workingDir != ".") {
     // change to working directory
     if (chdir(_workingDir.c_str()) != 0) {
-      logger_->log_error("Execute Command can not chdir %s",
-                         _workingDir.c_str());
+      logger_->log_error("Execute Command can not chdir %s", _workingDir.c_str());
       yield();
       return;
     }
@@ -156,21 +138,18 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context,
         close(_pipefd[1]);
         if (_batchDuration > 0) {
           while (1) {
-            std::this_thread::sleep_for(
-                std::chrono::milliseconds(_batchDuration));
+            std::this_thread::sleep_for(std::chrono::milliseconds(_batchDuration));
             char buffer[4096];
             int numRead = read(_pipefd[0], buffer, sizeof(buffer));
             if (numRead <= 0)
               break;
             logger_->log_info("Execute Command Respond %d", numRead);
             ExecuteProcess::WriteCallback callback(buffer, numRead);
-            std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<
-                FlowFileRecord>(session->create());
+            std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
             if (!flowFile)
               continue;
             flowFile->addAttribute("command", _command.c_str());
-            flowFile->addAttribute("command.arguments",
-                                   _commandArgument.c_str());
+            flowFile->addAttribute("command.arguments", _commandArgument.c_str());
             session->write(flowFile, &callback);
             session->transfer(flowFile, Success);
             session->commit();
@@ -181,21 +160,18 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context,
           int totalRead = 0;
           std::shared_ptr<FlowFileRecord> flowFile = nullptr;
           while (1) {
-            int numRead = read(_pipefd[0], bufPtr,
-                               (sizeof(buffer) - totalRead));
+            int numRead = read(_pipefd[0], bufPtr, (sizeof(buffer) - totalRead));
             if (numRead <= 0) {
               if (totalRead > 0) {
                 logger_->log_info("Execute Command Respond %d", totalRead);
                 // child exits and close the pipe
                 ExecuteProcess::WriteCallback callback(buffer, totalRead);
                 if (!flowFile) {
-                  flowFile = std::static_pointer_cast<FlowFileRecord>(
-                      session->create());
+                  flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
                   if (!flowFile)
                     break;
                   flowFile->addAttribute("command", _command.c_str());
-                  flowFile->addAttribute("command.arguments",
-                                         _commandArgument.c_str());
+                  flowFile->addAttribute("command.arguments", _commandArgument.c_str());
                   session->write(flowFile, &callback);
                 } else {
                   session->append(flowFile, &callback);
@@ -206,17 +182,14 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context,
             } else {
               if (numRead == (sizeof(buffer) - totalRead)) {
                 // we reach the max buffer size
-                logger_->log_info("Execute Command Max Respond %d",
-                                  sizeof(buffer));
+                logger_->log_info("Execute Command Max Respond %d", sizeof(buffer));
                 ExecuteProcess::WriteCallback callback(buffer, sizeof(buffer));
                 if (!flowFile) {
-                  flowFile = std::static_pointer_cast<FlowFileRecord>(
-                      session->create());
+                  flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
                   if (!flowFile)
                     continue;
                   flowFile->addAttribute("command", _command.c_str());
-                  flowFile->addAttribute("command.arguments",
-                                         _commandArgument.c_str());
+                  flowFile->addAttribute("command.arguments", _commandArgument.c_str());
                   session->write(flowFile, &callback);
                 } else {
                   session->append(flowFile, &callback);
@@ -234,11 +207,9 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context,
 
         died = wait(&status);
         if (WIFEXITED(status)) {
-          logger_->log_info("Execute Command Complete %s status %d pid %d",
-                            _fullCommand.c_str(), WEXITSTATUS(status), _pid);
+          logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand.c_str(), WEXITSTATUS(status), _pid);
         } else {
-          logger_->log_info("Execute Command Complete %s status %d pid %d",
-                            _fullCommand.c_str(), WTERMSIG(status), _pid);
+          logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand.c_str(), WTERMSIG(status), _pid);
         }
 
         close(_pipefd[0]);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/processors/GenerateFlowFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/GenerateFlowFile.cpp b/libminifi/src/processors/GenerateFlowFile.cpp
index 34c0ae2..3741a8f 100644
--- a/libminifi/src/processors/GenerateFlowFile.cpp
+++ b/libminifi/src/processors/GenerateFlowFile.cpp
@@ -40,20 +40,11 @@ namespace minifi {
 namespace processors {
 const char *GenerateFlowFile::DATA_FORMAT_BINARY = "Binary";
 const char *GenerateFlowFile::DATA_FORMAT_TEXT = "Text";
-core::Property GenerateFlowFile::FileSize(
-    "File Size", "The size of the file that will be used", "1 kB");
-core::Property GenerateFlowFile::BatchSize(
-    "Batch Size",
-    "The number of FlowFiles to be transferred in each invocation", "1");
-core::Property GenerateFlowFile::DataFormat(
-    "Data Format", "Specifies whether the data should be Text or Binary",
-    GenerateFlowFile::DATA_FORMAT_BINARY);
-core::Property GenerateFlowFile::UniqueFlowFiles(
-    "Unique FlowFiles",
-    "If true, each FlowFile that is generated will be unique. If false, a random value will be generated and all FlowFiles",
-    "true");
-core::Relationship GenerateFlowFile::Success(
-    "success", "success operational on the flow record");
+core::Property GenerateFlowFile::FileSize("File Size", "The size of the file that will be used", "1 kB");
+core::Property GenerateFlowFile::BatchSize("Batch Size", "The number of FlowFiles to be transferred in each invocation", "1");
+core::Property GenerateFlowFile::DataFormat("Data Format", "Specifies whether the data should be Text or Binary", GenerateFlowFile::DATA_FORMAT_BINARY);
+core::Property GenerateFlowFile::UniqueFlowFiles("Unique FlowFiles", "If true, each FlowFile that is generated will be unique. If false, a random value will be generated and all FlowFiles", "true");
+core::Relationship GenerateFlowFile::Success("success", "success operational on the flow record");
 
 void GenerateFlowFile::initialize() {
   // Set the supported properties
@@ -69,8 +60,7 @@ void GenerateFlowFile::initialize() {
   setSupportedRelationships(relationships);
 }
 
-void GenerateFlowFile::onTrigger(core::ProcessContext *context,
-                                 core::ProcessSession *session) {
+void GenerateFlowFile::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
   int64_t batchSize = 1;
   bool uniqueFlowFile = true;
   int64_t fileSize = 1024;
@@ -83,8 +73,7 @@ void GenerateFlowFile::onTrigger(core::ProcessContext *context,
     core::Property::StringToInt(value, batchSize);
   }
   if (context->getProperty(UniqueFlowFiles.getName(), value)) {
-    org::apache::nifi::minifi::utils::StringUtils::StringToBool(value,
-                                                                uniqueFlowFile);
+    org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, uniqueFlowFile);
   }
 
   if (!uniqueFlowFile) {
@@ -102,8 +91,7 @@ void GenerateFlowFile::onTrigger(core::ProcessContext *context,
     }
     for (int i = 0; i < batchSize; i++) {
       // For each batch
-      std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<
-          FlowFileRecord>(session->create());
+      std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
       if (!flowFile)
         return;
       if (fileSize > 0)
@@ -126,8 +114,7 @@ void GenerateFlowFile::onTrigger(core::ProcessContext *context,
     GenerateFlowFile::WriteCallback callback(_data, _dataSize);
     for (int i = 0; i < batchSize; i++) {
       // For each batch
-      std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<
-          FlowFileRecord>(session->create());
+      std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
       if (!flowFile)
         return;
       if (fileSize > 0)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/processors/GetFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/GetFile.cpp b/libminifi/src/processors/GetFile.cpp
index e39afec..f1dbb21 100644
--- a/libminifi/src/processors/GetFile.cpp
+++ b/libminifi/src/processors/GetFile.cpp
@@ -44,50 +44,22 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
-
-
-
-core::Property GetFile::BatchSize(
-    "Batch Size", "The maximum number of files to pull in each iteration",
-    "10");
-core::Property GetFile::Directory(
-    "Input Directory", "The input directory from which to pull files", ".");
-core::Property GetFile::IgnoreHiddenFile(
-    "Ignore Hidden Files",
-    "Indicates whether or not hidden files should be ignored", "true");
-core::Property GetFile::KeepSourceFile(
-    "Keep Source File",
-    "If true, the file is not deleted after it has been copied to the Content Repository",
-    "false");
-core::Property GetFile::MaxAge(
-    "Maximum File Age",
-    "The minimum age that a file must be in order to be pulled;"
-    " any file younger than this amount of time (according to last modification date) will be ignored",
-    "0 sec");
-core::Property GetFile::MinAge(
-    "Minimum File Age",
-    "The maximum age that a file must be in order to be pulled; any file"
-    "older than this amount of time (according to last modification date) will be ignored",
-    "0 sec");
-core::Property GetFile::MaxSize(
-    "Maximum File Size",
-    "The maximum size that a file can be in order to be pulled", "0 B");
-core::Property GetFile::MinSize(
-    "Minimum File Size",
-    "The minimum size that a file must be in order to be pulled", "0 B");
-core::Property GetFile::PollInterval(
-    "Polling Interval",
-    "Indicates how long to wait before performing a directory listing",
-    "0 sec");
-core::Property GetFile::Recurse(
-    "Recurse Subdirectories",
-    "Indicates whether or not to pull files from subdirectories", "true");
-core::Property GetFile::FileFilter(
-    "File Filter",
-    "Only files whose names match the given regular expression will be picked up",
-    "[^\\.].*");
-core::Relationship GetFile::Success("success",
-                                    "All files are routed to success");
+core::Property GetFile::BatchSize("Batch Size", "The maximum number of files to pull in each iteration", "10");
+core::Property GetFile::Directory("Input Directory", "The input directory from which to pull files", ".");
+core::Property GetFile::IgnoreHiddenFile("Ignore Hidden Files", "Indicates whether or not hidden files should be ignored", "true");
+core::Property GetFile::KeepSourceFile("Keep Source File", "If true, the file is not deleted after it has been copied to the Content Repository", "false");
+core::Property GetFile::MaxAge("Maximum File Age", "The minimum age that a file must be in order to be pulled;"
+                               " any file younger than this amount of time (according to last modification date) will be ignored",
+                               "0 sec");
+core::Property GetFile::MinAge("Minimum File Age", "The maximum age that a file must be in order to be pulled; any file"
+                               "older than this amount of time (according to last modification date) will be ignored",
+                               "0 sec");
+core::Property GetFile::MaxSize("Maximum File Size", "The maximum size that a file can be in order to be pulled", "0 B");
+core::Property GetFile::MinSize("Minimum File Size", "The minimum size that a file must be in order to be pulled", "0 B");
+core::Property GetFile::PollInterval("Polling Interval", "Indicates how long to wait before performing a directory listing", "0 sec");
+core::Property GetFile::Recurse("Recurse Subdirectories", "Indicates whether or not to pull files from subdirectories", "true");
+core::Property GetFile::FileFilter("File Filter", "Only files whose names match the given regular expression will be picked up", "[^\\.].*");
+core::Relationship GetFile::Success("success", "All files are routed to success");
 
 void GetFile::initialize() {
   // Set the supported properties
@@ -110,8 +82,7 @@ void GetFile::initialize() {
   setSupportedRelationships(relationships);
 }
 
-void GetFile::onSchedule(core::ProcessContext *context,
-                         core::ProcessSessionFactory *sessionFactory) {
+void GetFile::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
   std::string value;
 
   if (context->getProperty(Directory.getName(), value)) {
@@ -121,27 +92,21 @@ void GetFile::onSchedule(core::ProcessContext *context,
     core::Property::StringToInt(value, request_.batchSize);
   }
   if (context->getProperty(IgnoreHiddenFile.getName(), value)) {
-    org::apache::nifi::minifi::utils::StringUtils::StringToBool(
-        value, request_.ignoreHiddenFile);
+    org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, request_.ignoreHiddenFile);
   }
   if (context->getProperty(KeepSourceFile.getName(), value)) {
-    org::apache::nifi::minifi::utils::StringUtils::StringToBool(
-        value, request_.keepSourceFile);
+    org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, request_.keepSourceFile);
   }
 
   if (context->getProperty(MaxAge.getName(), value)) {
     core::TimeUnit unit;
-    if (core::Property::StringToTime(value, request_.maxAge, unit)
-        && core::Property::ConvertTimeUnitToMS(request_.maxAge, unit,
-                                               request_.maxAge)) {
+    if (core::Property::StringToTime(value, request_.maxAge, unit) && core::Property::ConvertTimeUnitToMS(request_.maxAge, unit, request_.maxAge)) {
       logger_->log_debug("successfully applied _maxAge");
     }
   }
   if (context->getProperty(MinAge.getName(), value)) {
     core::TimeUnit unit;
-    if (core::Property::StringToTime(value, request_.minAge, unit)
-        && core::Property::ConvertTimeUnitToMS(request_.minAge, unit,
-                                               request_.minAge)) {
+    if (core::Property::StringToTime(value, request_.minAge, unit) && core::Property::ConvertTimeUnitToMS(request_.minAge, unit, request_.minAge)) {
       logger_->log_debug("successfully applied _minAge");
     }
   }
@@ -153,15 +118,12 @@ void GetFile::onSchedule(core::ProcessContext *context,
   }
   if (context->getProperty(PollInterval.getName(), value)) {
     core::TimeUnit unit;
-    if (core::Property::StringToTime(value, request_.pollInterval, unit)
-        && core::Property::ConvertTimeUnitToMS(request_.pollInterval, unit,
-                                               request_.pollInterval)) {
+    if (core::Property::StringToTime(value, request_.pollInterval, unit) && core::Property::ConvertTimeUnitToMS(request_.pollInterval, unit, request_.pollInterval)) {
       logger_->log_debug("successfully applied _pollInterval");
     }
   }
   if (context->getProperty(Recurse.getName(), value)) {
-    org::apache::nifi::minifi::utils::StringUtils::StringToBool(
-        value, request_.recursive);
+    org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, request_.recursive);
   }
 
   if (context->getProperty(FileFilter.getName(), value)) {
@@ -169,13 +131,11 @@ void GetFile::onSchedule(core::ProcessContext *context,
   }
 }
 
-void GetFile::onTrigger(core::ProcessContext *context,
-                        core::ProcessSession *session) {
+void GetFile::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
   // Perform directory list
   logger_->log_info("Is listing empty %i", isListingEmpty());
   if (isListingEmpty()) {
-    if (request_.pollInterval == 0
-        || (getTimeMillis() - last_listing_time_) > request_.pollInterval) {
+    if (request_.pollInterval == 0 || (getTimeMillis() - last_listing_time_) > request_.pollInterval) {
       performListing(request_.directory, request_);
       last_listing_time_.store(getTimeMillis());
     }
@@ -190,8 +150,7 @@ void GetFile::onTrigger(core::ProcessContext *context,
         std::string fileName = list.front();
         list.pop();
         logger_->log_info("GetFile process %s", fileName.c_str());
-        std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<
-            FlowFileRecord>(session->create());
+        std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
         if (flowFile == nullptr)
           return;
         std::size_t found = fileName.find_last_of("/\\");
@@ -224,12 +183,10 @@ void GetFile::putListing(std::string fileName) {
   _dirList.push(fileName);
 }
 
-void GetFile::pollListing(std::queue<std::string> &list,
-                          const GetFileRequest &request) {
+void GetFile::pollListing(std::queue<std::string> &list, const GetFileRequest &request) {
   std::lock_guard<std::mutex> lock(mutex_);
 
-  while (!_dirList.empty()
-      && (request.maxSize == 0 || list.size() < request.maxSize)) {
+  while (!_dirList.empty() && (request.maxSize == 0 || list.size() < request.maxSize)) {
     std::string fileName = _dirList.front();
     _dirList.pop();
     list.push(fileName);
@@ -238,8 +195,7 @@ void GetFile::pollListing(std::queue<std::string> &list,
   return;
 }
 
-bool GetFile::acceptFile(std::string fullName, std::string name,
-                         const GetFileRequest &request) {
+bool GetFile::acceptFile(std::string fullName, std::string name, const GetFileRequest &request) {
   struct stat statbuf;
 
   if (stat(fullName.c_str(), &statbuf) == 0) {
@@ -296,8 +252,7 @@ void GetFile::performListing(std::string dir, const GetFileRequest &request) {
     std::string d_name = entry->d_name;
     if ((entry->d_type & DT_DIR)) {
       // if this is a directory
-      if (request.recursive && strcmp(d_name.c_str(), "..") != 0
-          && strcmp(d_name.c_str(), ".") != 0) {
+      if (request.recursive && strcmp(d_name.c_str(), "..") != 0 && strcmp(d_name.c_str(), ".") != 0) {
         std::string path = dir + "/" + d_name;
         performListing(path, request);
       }