You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/06/06 16:33:06 UTC

[3/9] nifi-minifi-cpp git commit: MINIFI-331: Apply formatter with increased line length to source

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/processors/InvokeHTTP.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/InvokeHTTP.cpp b/libminifi/src/processors/InvokeHTTP.cpp
index 295560f..fd39a64 100644
--- a/libminifi/src/processors/InvokeHTTP.cpp
+++ b/libminifi/src/processors/InvokeHTTP.cpp
@@ -51,73 +51,37 @@ namespace processors {
 
 const char *InvokeHTTP::ProcessorName = "InvokeHTTP";
 
-core::Property InvokeHTTP::Method(
-    "HTTP Method",
-    "HTTP request method (GET, POST, PUT, PATCH, DELETE, HEAD, OPTIONS). "
-    "Arbitrary methods are also supported. Methods other than POST, PUT and PATCH will be sent without a message body.",
-    "GET");
-core::Property InvokeHTTP::URL(
-    "Remote URL",
-    "Remote URL which will be connected to, including scheme, host, port, path.",
-    "");
-core::Property InvokeHTTP::ConnectTimeout(
-    "Connection Timeout", "Max wait time for connection to remote service.",
-    "5 secs");
-core::Property InvokeHTTP::ReadTimeout(
-    "Read Timeout", "Max wait time for response from remote service.",
-    "15 secs");
-core::Property InvokeHTTP::DateHeader(
-    "Include Date Header", "Include an RFC-2616 Date header in the request.",
-    "True");
-core::Property InvokeHTTP::FollowRedirects(
-    "Follow Redirects", "Follow HTTP redirects issued by remote server.",
-    "True");
-core::Property InvokeHTTP::AttributesToSend(
-    "Attributes to Send",
-    "Regular expression that defines which attributes to send as HTTP"
-    " headers in the request. If not defined, no attributes are sent as headers.",
-    "");
-core::Property InvokeHTTP::SSLContext(
-    "SSL Context Service",
-    "The SSL Context Service used to provide client certificate information for TLS/SSL (https) connections.",
-    "");
-core::Property InvokeHTTP::ProxyHost(
-    "Proxy Host",
-    "The fully qualified hostname or IP address of the proxy server", "");
-core::Property InvokeHTTP::ProxyPort("Proxy Port",
-                                     "The port of the proxy server", "");
-core::Property InvokeHTTP::ProxyUser(
-    "invokehttp-proxy-user",
-    "Username to set when authenticating against proxy", "");
-core::Property InvokeHTTP::ProxyPassword(
-    "invokehttp-proxy-password",
-    "Password to set when authenticating against proxy", "");
-core::Property InvokeHTTP::ContentType(
-    "Content-type",
-    "The Content-Type to specify for when content is being transmitted through a PUT, "
-    "POST or PATCH. In the case of an empty value after evaluating an expression language expression, "
-    "Content-Type defaults to",
-    "application/octet-stream");
-core::Property InvokeHTTP::SendBody(
-    "send-message-body",
-    "If true, sends the HTTP message body on POST/PUT/PATCH requests (default).  "
-    "If false, suppresses the message body and content-type header for these requests.",
-    "true");
-
-core::Property InvokeHTTP::PropPutOutputAttributes(
-    "Put Response Body in Attribute",
-    "If set, the response body received back will be put into an attribute of the original "
-    "FlowFile instead of a separate FlowFile. The attribute key to put to is determined by evaluating value of this property. ",
-    "");
-core::Property InvokeHTTP::AlwaysOutputResponse(
-    "Always Output Response",
-    "Will force a response FlowFile to be generated and routed to the 'Response' relationship "
-    "regardless of what the server status code received is ",
-    "false");
-core::Property InvokeHTTP::PenalizeOnNoRetry(
-    "Penalize on \"No Retry\"",
-    "Enabling this property will penalize FlowFiles that are routed to the \"No Retry\" relationship.",
-    "false");
+core::Property InvokeHTTP::Method("HTTP Method", "HTTP request method (GET, POST, PUT, PATCH, DELETE, HEAD, OPTIONS). "
+                                  "Arbitrary methods are also supported. Methods other than POST, PUT and PATCH will be sent without a message body.",
+                                  "GET");
+core::Property InvokeHTTP::URL("Remote URL", "Remote URL which will be connected to, including scheme, host, port, path.", "");
+core::Property InvokeHTTP::ConnectTimeout("Connection Timeout", "Max wait time for connection to remote service.", "5 secs");
+core::Property InvokeHTTP::ReadTimeout("Read Timeout", "Max wait time for response from remote service.", "15 secs");
+core::Property InvokeHTTP::DateHeader("Include Date Header", "Include an RFC-2616 Date header in the request.", "True");
+core::Property InvokeHTTP::FollowRedirects("Follow Redirects", "Follow HTTP redirects issued by remote server.", "True");
+core::Property InvokeHTTP::AttributesToSend("Attributes to Send", "Regular expression that defines which attributes to send as HTTP"
+                                            " headers in the request. If not defined, no attributes are sent as headers.",
+                                            "");
+core::Property InvokeHTTP::SSLContext("SSL Context Service", "The SSL Context Service used to provide client certificate information for TLS/SSL (https) connections.", "");
+core::Property InvokeHTTP::ProxyHost("Proxy Host", "The fully qualified hostname or IP address of the proxy server", "");
+core::Property InvokeHTTP::ProxyPort("Proxy Port", "The port of the proxy server", "");
+core::Property InvokeHTTP::ProxyUser("invokehttp-proxy-user", "Username to set when authenticating against proxy", "");
+core::Property InvokeHTTP::ProxyPassword("invokehttp-proxy-password", "Password to set when authenticating against proxy", "");
+core::Property InvokeHTTP::ContentType("Content-type", "The Content-Type to specify for when content is being transmitted through a PUT, "
+                                       "POST or PATCH. In the case of an empty value after evaluating an expression language expression, "
+                                       "Content-Type defaults to",
+                                       "application/octet-stream");
+core::Property InvokeHTTP::SendBody("send-message-body", "If true, sends the HTTP message body on POST/PUT/PATCH requests (default).  "
+                                    "If false, suppresses the message body and content-type header for these requests.",
+                                    "true");
+
+core::Property InvokeHTTP::PropPutOutputAttributes("Put Response Body in Attribute", "If set, the response body received back will be put into an attribute of the original "
+                                                   "FlowFile instead of a separate FlowFile. The attribute key to put to is determined by evaluating value of this property. ",
+                                                   "");
+core::Property InvokeHTTP::AlwaysOutputResponse("Always Output Response", "Will force a response FlowFile to be generated and routed to the 'Response' relationship "
+                                                "regardless of what the server status code received is ",
+                                                "false");
+core::Property InvokeHTTP::PenalizeOnNoRetry("Penalize on \"No Retry\"", "Enabling this property will penalize FlowFiles that are routed to the \"No Retry\" relationship.", "false");
 
 const char* InvokeHTTP::STATUS_CODE = "invokehttp.status.code";
 const char* InvokeHTTP::STATUS_MESSAGE = "invokehttp.status.message";
@@ -128,31 +92,22 @@ const char* InvokeHTTP::REMOTE_DN = "invokehttp.remote.dn";
 const char* InvokeHTTP::EXCEPTION_CLASS = "invokehttp.java.exception.class";
 const char* InvokeHTTP::EXCEPTION_MESSAGE = "invokehttp.java.exception.message";
 
-core::Relationship InvokeHTTP::Success("success",
-                                       "All files are routed to success");
+core::Relationship InvokeHTTP::Success("success", "All files are routed to success");
 
-core::Relationship InvokeHTTP::RelResponse("response",
-                                           "Represents a response flowfile");
+core::Relationship InvokeHTTP::RelResponse("response", "Represents a response flowfile");
 
-core::Relationship InvokeHTTP::RelRetry(
-    "retry",
-    "The original FlowFile will be routed on any status code that can be retried "
-    "(5xx status codes). It will have new attributes detailing the request.");
+core::Relationship InvokeHTTP::RelRetry("retry", "The original FlowFile will be routed on any status code that can be retried "
+                                        "(5xx status codes). It will have new attributes detailing the request.");
 
-core::Relationship InvokeHTTP::RelNoRetry(
-    "no retry",
-    "The original FlowFile will be routed on any status code that should NOT "
-    "be retried (1xx, 3xx, 4xx status codes). It will have new attributes detailing the request.");
+core::Relationship InvokeHTTP::RelNoRetry("no retry", "The original FlowFile will be routed on any status code that should NOT "
+                                          "be retried (1xx, 3xx, 4xx status codes). It will have new attributes detailing the request.");
 
-core::Relationship InvokeHTTP::RelFailure(
-    "failure",
-    "The original FlowFile will be routed on any type of connection failure, "
-    "timeout or general exception. It will have new attributes detailing the request.");
+core::Relationship InvokeHTTP::RelFailure("failure", "The original FlowFile will be routed on any type of connection failure, "
+                                          "timeout or general exception. It will have new attributes detailing the request.");
 
 void InvokeHTTP::set_request_method(CURL *curl, const std::string &method) {
   std::string my_method = method;
-  std::transform(my_method.begin(), my_method.end(), my_method.begin(),
-                 ::toupper);
+  std::transform(my_method.begin(), my_method.end(), my_method.begin(), ::toupper);
   if (my_method == "POST") {
     curl_easy_setopt(curl, CURLOPT_POST, 1);
   } else if (my_method == "PUT") {
@@ -190,19 +145,14 @@ void InvokeHTTP::initialize() {
   setSupportedRelationships(relationships);
 }
 
-void InvokeHTTP::onSchedule(core::ProcessContext *context,
-                            core::ProcessSessionFactory *sessionFactory) {
+void InvokeHTTP::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
   if (!context->getProperty(Method.getName(), method_)) {
-    logger_->log_info(
-        "%s attribute is missing, so default value of %s will be used",
-        Method.getName().c_str(), Method.getValue().c_str());
+    logger_->log_info("%s attribute is missing, so default value of %s will be used", Method.getName().c_str(), Method.getValue().c_str());
     return;
   }
 
   if (!context->getProperty(URL.getName(), url_)) {
-    logger_->log_info(
-        "%s attribute is missing, so default value of %s will be used",
-        URL.getName().c_str(), URL.getValue().c_str());
+    logger_->log_info("%s attribute is missing, so default value of %s will be used", URL.getName().c_str(), URL.getValue().c_str());
     return;
   }
 
@@ -213,9 +163,7 @@ void InvokeHTTP::onSchedule(core::ProcessContext *context,
     // set the timeout in curl options.
 
   } else {
-    logger_->log_info(
-        "%s attribute is missing, so default value of %s will be used",
-        ConnectTimeout.getName().c_str(), ConnectTimeout.getValue().c_str());
+    logger_->log_info("%s attribute is missing, so default value of %s will be used", ConnectTimeout.getName().c_str(), ConnectTimeout.getValue().c_str());
 
     return;
   }
@@ -224,67 +172,43 @@ void InvokeHTTP::onSchedule(core::ProcessContext *context,
     core::Property::StringToInt(timeoutStr, read_timeout_);
 
   } else {
-    logger_->log_info(
-        "%s attribute is missing, so default value of %s will be used",
-        ReadTimeout.getName().c_str(), ReadTimeout.getValue().c_str());
+    logger_->log_info("%s attribute is missing, so default value of %s will be used", ReadTimeout.getName().c_str(), ReadTimeout.getValue().c_str());
   }
 
   std::string dateHeaderStr;
   if (!context->getProperty(DateHeader.getName(), dateHeaderStr)) {
-    logger_->log_info(
-        "%s attribute is missing, so default value of %s will be used",
-        DateHeader.getName().c_str(), DateHeader.getValue().c_str());
+    logger_->log_info("%s attribute is missing, so default value of %s will be used", DateHeader.getName().c_str(), DateHeader.getValue().c_str());
   }
 
-  date_header_include_ = utils::StringUtils::StringToBool(dateHeaderStr,
-                                                          date_header_include_);
+  date_header_include_ = utils::StringUtils::StringToBool(dateHeaderStr, date_header_include_);
 
-  if (!context->getProperty(PropPutOutputAttributes.getName(),
-                            put_attribute_name_)) {
-    logger_->log_info(
-        "%s attribute is missing, so default value of %s will be used",
-        PropPutOutputAttributes.getName().c_str(),
-        PropPutOutputAttributes.getValue().c_str());
+  if (!context->getProperty(PropPutOutputAttributes.getName(), put_attribute_name_)) {
+    logger_->log_info("%s attribute is missing, so default value of %s will be used", PropPutOutputAttributes.getName().c_str(), PropPutOutputAttributes.getValue().c_str());
   }
 
-  if (!context->getProperty(AttributesToSend.getName(),
-                            attribute_to_send_regex_)) {
-    logger_->log_info(
-        "%s attribute is missing, so default value of %s will be used",
-        AttributesToSend.getName().c_str(),
-        AttributesToSend.getValue().c_str());
+  if (!context->getProperty(AttributesToSend.getName(), attribute_to_send_regex_)) {
+    logger_->log_info("%s attribute is missing, so default value of %s will be used", AttributesToSend.getName().c_str(), AttributesToSend.getValue().c_str());
   }
 
   std::string always_output_response = "false";
-  if (!context->getProperty(AlwaysOutputResponse.getName(),
-                            always_output_response)) {
-    logger_->log_info(
-        "%s attribute is missing, so default value of %s will be used",
-        AttributesToSend.getName().c_str(),
-        AttributesToSend.getValue().c_str());
+  if (!context->getProperty(AlwaysOutputResponse.getName(), always_output_response)) {
+    logger_->log_info("%s attribute is missing, so default value of %s will be used", AttributesToSend.getName().c_str(), AttributesToSend.getValue().c_str());
   }
 
-  utils::StringUtils::StringToBool(always_output_response,
-                                   always_output_response_);
+  utils::StringUtils::StringToBool(always_output_response, always_output_response_);
 
   std::string penalize_no_retry = "false";
   if (!context->getProperty(PenalizeOnNoRetry.getName(), penalize_no_retry)) {
-    logger_->log_info(
-        "%s attribute is missing, so default value of %s will be used",
-        AttributesToSend.getName().c_str(),
-        AttributesToSend.getValue().c_str());
+    logger_->log_info("%s attribute is missing, so default value of %s will be used", AttributesToSend.getName().c_str(), AttributesToSend.getValue().c_str());
   }
 
   utils::StringUtils::StringToBool(penalize_no_retry, penalize_no_retry_);
 
   std::string context_name;
-  if (context->getProperty(SSLContext.getName(), context_name)
-      && !IsNullOrEmpty(context_name)) {
-    std::shared_ptr<core::controller::ControllerService> service = context
-        ->getControllerService(context_name);
+  if (context->getProperty(SSLContext.getName(), context_name) && !IsNullOrEmpty(context_name)) {
+    std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(context_name);
     if (nullptr != service) {
-      ssl_context_service_ = std::static_pointer_cast<
-          minifi::controllers::SSLContextService>(service);
+      ssl_context_service_ = std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
     }
   }
 }
@@ -293,8 +217,7 @@ InvokeHTTP::~InvokeHTTP() {
   curl_global_cleanup();
 }
 
-inline bool InvokeHTTP::matches(const std::string &value,
-                                const std::string &sregex) {
+inline bool InvokeHTTP::matches(const std::string &value, const std::string &sregex) {
   if (sregex == ".*")
     return true;
 
@@ -322,9 +245,7 @@ bool InvokeHTTP::emitFlowFile(const std::string &method) {
   return ("POST" == method || "PUT" == method || "PATCH" == method);
 }
 
-struct curl_slist *InvokeHTTP::build_header_list(
-    CURL *curl, std::string regex,
-    const std::map<std::string, std::string> &attributes) {
+struct curl_slist *InvokeHTTP::build_header_list(CURL *curl, std::string regex, const std::map<std::string, std::string> &attributes) {
   struct curl_slist *list = NULL;
   if (curl) {
     for (auto attribute : attributes) {
@@ -345,8 +266,7 @@ bool InvokeHTTP::isSecure(const std::string &url) {
 }
 
 CURLcode InvokeHTTP::configure_ssl_context(CURL *curl, void *ctx, void *param) {
-  minifi::controllers::SSLContextService *ssl_context_service =
-      static_cast<minifi::controllers::SSLContextService*>(param);
+  minifi::controllers::SSLContextService *ssl_context_service = static_cast<minifi::controllers::SSLContextService*>(param);
   if (!ssl_context_service->configure_ssl_context(static_cast<SSL_CTX*>(ctx))) {
     return CURLE_FAILED_INIT;
   }
@@ -354,26 +274,20 @@ CURLcode InvokeHTTP::configure_ssl_context(CURL *curl, void *ctx, void *param) {
 }
 
 void InvokeHTTP::configure_secure_connection(CURL *http_session) {
-  logger_->log_debug("InvokeHTTP -- Using certificate file %s",
-                     ssl_context_service_->getCertificateFile());
+  logger_->log_debug("InvokeHTTP -- Using certificate file %s", ssl_context_service_->getCertificateFile());
   curl_easy_setopt(http_session, CURLOPT_VERBOSE, 1L);
-  curl_easy_setopt(http_session, CURLOPT_SSL_CTX_FUNCTION,
-                   &InvokeHTTP::configure_ssl_context);
-  curl_easy_setopt(http_session, CURLOPT_SSL_CTX_DATA,
-                   static_cast<void*>(ssl_context_service_.get()));
+  curl_easy_setopt(http_session, CURLOPT_SSL_CTX_FUNCTION, &InvokeHTTP::configure_ssl_context);
+  curl_easy_setopt(http_session, CURLOPT_SSL_CTX_DATA, static_cast<void*>(ssl_context_service_.get()));
 }
 
-void InvokeHTTP::onTrigger(core::ProcessContext *context,
-                           core::ProcessSession *session) {
-  std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<
-      FlowFileRecord>(session->get());
+void InvokeHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+  std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->get());
 
   logger_->log_info("onTrigger InvokeHTTP with  %s", method_.c_str());
 
   if (flowFile == nullptr) {
     if (!emitFlowFile(method_)) {
-      logger_->log_info("InvokeHTTP -- create flow file with  %s",
-                        method_.c_str());
+      logger_->log_info("InvokeHTTP -- create flow file with  %s", method_.c_str());
       flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
     } else {
       logger_->log_info("exiting because method is %s", method_.c_str());
@@ -402,11 +316,9 @@ void InvokeHTTP::onTrigger(core::ProcessContext *context,
     curl_easy_setopt(http_session, CURLOPT_TIMEOUT, read_timeout_);
   }
   HTTPRequestResponse content;
-  curl_easy_setopt(http_session, CURLOPT_WRITEFUNCTION,
-                   &HTTPRequestResponse::recieve_write);
+  curl_easy_setopt(http_session, CURLOPT_WRITEFUNCTION, &HTTPRequestResponse::recieve_write);
 
-  curl_easy_setopt(http_session, CURLOPT_WRITEDATA,
-                   static_cast<void*>(&content));
+  curl_easy_setopt(http_session, CURLOPT_WRITEDATA, static_cast<void*>(&content));
 
   if (emitFlowFile(method_)) {
     logger_->log_info("InvokeHTTP -- reading flowfile");
@@ -419,12 +331,9 @@ void InvokeHTTP::onTrigger(core::ProcessContext *context,
       callbackObj->pos = 0;
       logger_->log_info("InvokeHTTP -- Setting callback");
       curl_easy_setopt(http_session, CURLOPT_UPLOAD, 1L);
-      curl_easy_setopt(http_session, CURLOPT_INFILESIZE_LARGE,
-                       (curl_off_t)callback->getBufferSize());
-      curl_easy_setopt(http_session, CURLOPT_READFUNCTION,
-                       &HTTPRequestResponse::send_write);
-      curl_easy_setopt(http_session, CURLOPT_READDATA,
-                       static_cast<void*>(callbackObj));
+      curl_easy_setopt(http_session, CURLOPT_INFILESIZE_LARGE, (curl_off_t)callback->getBufferSize());
+      curl_easy_setopt(http_session, CURLOPT_READFUNCTION, &HTTPRequestResponse::send_write);
+      curl_easy_setopt(http_session, CURLOPT_READDATA, static_cast<void*>(callbackObj));
     } else {
       logger_->log_error("InvokeHTTP -- no resource claim");
     }
@@ -434,9 +343,7 @@ void InvokeHTTP::onTrigger(core::ProcessContext *context,
   }
 
   // append all headers
-  struct curl_slist *headers = build_header_list(http_session,
-                                                 attribute_to_send_regex_,
-                                                 flowFile->getAttributes());
+  struct curl_slist *headers = build_header_list(http_session, attribute_to_send_regex_, flowFile->getAttributes());
   curl_easy_setopt(http_session, CURLOPT_HTTPHEADER, headers);
 
   logger_->log_info("InvokeHTTP -- curl performed");
@@ -459,10 +366,8 @@ void InvokeHTTP::onTrigger(core::ProcessContext *context,
     flowFile->addAttribute(REQUEST_URL, url_);
     flowFile->addAttribute(TRANSACTION_ID, tx_id);
 
-    bool isSuccess = ((int32_t) (http_code / 100)) == 2
-        && res != CURLE_ABORTED_BY_CALLBACK;
-    bool output_body_to_requestAttr = (!isSuccess || putToAttribute)
-        && flowFile != nullptr;
+    bool isSuccess = ((int32_t) (http_code / 100)) == 2 && res != CURLE_ABORTED_BY_CALLBACK;
+    bool output_body_to_requestAttr = (!isSuccess || putToAttribute) && flowFile != nullptr;
     bool output_body_to_content = isSuccess && !putToAttribute;
     bool body_empty = IsNullOrEmpty(content.data);
 
@@ -471,11 +376,9 @@ void InvokeHTTP::onTrigger(core::ProcessContext *context,
 
     if (output_body_to_content) {
       if (flowFile != nullptr) {
-        response_flow = std::static_pointer_cast<FlowFileRecord>(
-            session->create(flowFile));
+        response_flow = std::static_pointer_cast<FlowFileRecord>(session->create(flowFile));
       } else {
-        response_flow = std::static_pointer_cast<FlowFileRecord>(
-            session->create());
+        response_flow = std::static_pointer_cast<FlowFileRecord>(session->create());
       }
 
       std::string ct = content_type;
@@ -484,28 +387,22 @@ void InvokeHTTP::onTrigger(core::ProcessContext *context,
       response_flow->addAttribute(STATUS_MESSAGE, response_body);
       response_flow->addAttribute(REQUEST_URL, url_);
       response_flow->addAttribute(TRANSACTION_ID, tx_id);
-      io::DataStream stream((const uint8_t*) content.data.data(),
-                            content.data.size());
+      io::DataStream stream((const uint8_t*) content.data.data(), content.data.size());
       // need an import from the data stream.
       session->importFrom(stream, response_flow);
     } else {
       logger_->log_info("Cannot output body to content");
-      response_flow = std::static_pointer_cast<FlowFileRecord>(
-          session->create());
+      response_flow = std::static_pointer_cast<FlowFileRecord>(session->create());
     }
     route(flowFile, response_flow, session, context, isSuccess, http_code);
   } else {
-    logger_->log_error("InvokeHTTP -- curl_easy_perform() failed %s\n",
-                       curl_easy_strerror(res));
+    logger_->log_error("InvokeHTTP -- curl_easy_perform() failed %s\n", curl_easy_strerror(res));
   }
   curl_slist_free_all(headers);
   curl_easy_cleanup(http_session);
 }
 
-void InvokeHTTP::route(std::shared_ptr<FlowFileRecord> &request,
-                       std::shared_ptr<FlowFileRecord> &response,
-                       core::ProcessSession *session,
-                       core::ProcessContext *context, bool isSuccess,
+void InvokeHTTP::route(std::shared_ptr<FlowFileRecord> &request, std::shared_ptr<FlowFileRecord> &response, core::ProcessSession *session, core::ProcessContext *context, bool isSuccess,
                        int statusCode) {
   // check if we should yield the processor
   if (!isSuccess && request == nullptr) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/processors/ListenHTTP.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/ListenHTTP.cpp b/libminifi/src/processors/ListenHTTP.cpp
index 9fce69a..c26b41d 100644
--- a/libminifi/src/processors/ListenHTTP.cpp
+++ b/libminifi/src/processors/ListenHTTP.cpp
@@ -42,38 +42,20 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
-core::Property ListenHTTP::BasePath("Base Path",
-                                    "Base path for incoming connections",
-                                    "contentListener");
-core::Property ListenHTTP::Port(
-    "Listening Port", "The Port to listen on for incoming connections", "");
-core::Property ListenHTTP::AuthorizedDNPattern(
-    "Authorized DN Pattern",
-    "A Regular Expression to apply against the Distinguished Name of incoming"
-    " connections. If the Pattern does not match the DN, the connection will be refused.",
-    ".*");
-core::Property ListenHTTP::SSLCertificate(
-    "SSL Certificate",
-    "File containing PEM-formatted file including TLS/SSL certificate and key",
-    "");
-core::Property ListenHTTP::SSLCertificateAuthority(
-    "SSL Certificate Authority",
-    "File containing trusted PEM-formatted certificates", "");
-core::Property ListenHTTP::SSLVerifyPeer(
-    "SSL Verify Peer",
-    "Whether or not to verify the client's certificate (yes/no)", "no");
-core::Property ListenHTTP::SSLMinimumVersion(
-    "SSL Minimum Version",
-    "Minimum TLS/SSL version allowed (SSL2, SSL3, TLS1.0, TLS1.1, TLS1.2)",
-    "SSL2");
-core::Property ListenHTTP::HeadersAsAttributesRegex(
-    "HTTP Headers to receive as Attributes (Regex)",
-    "Specifies the Regular Expression that determines the names of HTTP Headers that"
-    " should be passed along as FlowFile attributes",
-    "");
-
-core::Relationship ListenHTTP::Success("success",
-                                       "All files are routed to success");
+core::Property ListenHTTP::BasePath("Base Path", "Base path for incoming connections", "contentListener");
+core::Property ListenHTTP::Port("Listening Port", "The Port to listen on for incoming connections", "");
+core::Property ListenHTTP::AuthorizedDNPattern("Authorized DN Pattern", "A Regular Expression to apply against the Distinguished Name of incoming"
+                                               " connections. If the Pattern does not match the DN, the connection will be refused.",
+                                               ".*");
+core::Property ListenHTTP::SSLCertificate("SSL Certificate", "File containing PEM-formatted file including TLS/SSL certificate and key", "");
+core::Property ListenHTTP::SSLCertificateAuthority("SSL Certificate Authority", "File containing trusted PEM-formatted certificates", "");
+core::Property ListenHTTP::SSLVerifyPeer("SSL Verify Peer", "Whether or not to verify the client's certificate (yes/no)", "no");
+core::Property ListenHTTP::SSLMinimumVersion("SSL Minimum Version", "Minimum TLS/SSL version allowed (SSL2, SSL3, TLS1.0, TLS1.1, TLS1.2)", "SSL2");
+core::Property ListenHTTP::HeadersAsAttributesRegex("HTTP Headers to receive as Attributes (Regex)", "Specifies the Regular Expression that determines the names of HTTP Headers that"
+                                                    " should be passed along as FlowFile attributes",
+                                                    "");
+
+core::Relationship ListenHTTP::Success("success", "All files are routed to success");
 
 void ListenHTTP::initialize() {
   logger_->log_info("Initializing ListenHTTP");
@@ -95,14 +77,11 @@ void ListenHTTP::initialize() {
   setSupportedRelationships(relationships);
 }
 
-void ListenHTTP::onSchedule(core::ProcessContext *context,
-                            core::ProcessSessionFactory *sessionFactory) {
+void ListenHTTP::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
   std::string basePath;
 
   if (!context->getProperty(BasePath.getName(), basePath)) {
-    logger_->log_info(
-        "%s attribute is missing, so default value of %s will be used",
-        BasePath.getName().c_str(), BasePath.getValue().c_str());
+    logger_->log_info("%s attribute is missing, so default value of %s will be used", BasePath.getName().c_str(), BasePath.getValue().c_str());
     basePath = BasePath.getValue();
   }
 
@@ -111,26 +90,20 @@ void ListenHTTP::onSchedule(core::ProcessContext *context,
   std::string listeningPort;
 
   if (!context->getProperty(Port.getName(), listeningPort)) {
-    logger_->log_error("%s attribute is missing or invalid",
-                       Port.getName().c_str());
+    logger_->log_error("%s attribute is missing or invalid", Port.getName().c_str());
     return;
   }
 
   std::string authDNPattern;
 
-  if (context->getProperty(AuthorizedDNPattern.getName(), authDNPattern)
-      && !authDNPattern.empty()) {
-    logger_->log_info("ListenHTTP using %s: %s",
-                      AuthorizedDNPattern.getName().c_str(),
-                      authDNPattern.c_str());
+  if (context->getProperty(AuthorizedDNPattern.getName(), authDNPattern) && !authDNPattern.empty()) {
+    logger_->log_info("ListenHTTP using %s: %s", AuthorizedDNPattern.getName().c_str(), authDNPattern.c_str());
   }
 
   std::string sslCertFile;
 
-  if (context->getProperty(SSLCertificate.getName(), sslCertFile)
-      && !sslCertFile.empty()) {
-    logger_->log_info("ListenHTTP using %s: %s",
-                      SSLCertificate.getName().c_str(), sslCertFile.c_str());
+  if (context->getProperty(SSLCertificate.getName(), sslCertFile) && !sslCertFile.empty()) {
+    logger_->log_info("ListenHTTP using %s: %s", SSLCertificate.getName().c_str(), sslCertFile.c_str());
   }
 
   // Read further TLS/SSL options only if TLS/SSL usage is implied by virtue of certificate value being set
@@ -139,12 +112,8 @@ void ListenHTTP::onSchedule(core::ProcessContext *context,
   std::string sslMinVer;
 
   if (!sslCertFile.empty()) {
-    if (context->getProperty(SSLCertificateAuthority.getName(),
-                             sslCertAuthorityFile)
-        && !sslCertAuthorityFile.empty()) {
-      logger_->log_info("ListenHTTP using %s: %s",
-                        SSLCertificateAuthority.getName().c_str(),
-                        sslCertAuthorityFile.c_str());
+    if (context->getProperty(SSLCertificateAuthority.getName(), sslCertAuthorityFile) && !sslCertAuthorityFile.empty()) {
+      logger_->log_info("ListenHTTP using %s: %s", SSLCertificateAuthority.getName().c_str(), sslCertAuthorityFile.c_str());
     }
 
     if (context->getProperty(SSLVerifyPeer.getName(), sslVerifyPeer)) {
@@ -158,26 +127,19 @@ void ListenHTTP::onSchedule(core::ProcessContext *context,
     }
 
     if (context->getProperty(SSLMinimumVersion.getName(), sslMinVer)) {
-      logger_->log_info("ListenHTTP using %s: %s",
-                        SSLMinimumVersion.getName().c_str(), sslMinVer.c_str());
+      logger_->log_info("ListenHTTP using %s: %s", SSLMinimumVersion.getName().c_str(), sslMinVer.c_str());
     }
   }
 
   std::string headersAsAttributesPattern;
 
-  if (context->getProperty(HeadersAsAttributesRegex.getName(),
-                           headersAsAttributesPattern)
-      && !headersAsAttributesPattern.empty()) {
-    logger_->log_info("ListenHTTP using %s: %s",
-                      HeadersAsAttributesRegex.getName().c_str(),
-                      headersAsAttributesPattern.c_str());
+  if (context->getProperty(HeadersAsAttributesRegex.getName(), headersAsAttributesPattern) && !headersAsAttributesPattern.empty()) {
+    logger_->log_info("ListenHTTP using %s: %s", HeadersAsAttributesRegex.getName().c_str(), headersAsAttributesPattern.c_str());
   }
 
   auto numThreads = getMaxConcurrentTasks();
 
-  logger_->log_info(
-      "ListenHTTP starting HTTP server on port %s and path %s with %d threads",
-      listeningPort.c_str(), basePath.c_str(), numThreads);
+  logger_->log_info("ListenHTTP starting HTTP server on port %s and path %s with %d threads", listeningPort.c_str(), basePath.c_str(), numThreads);
 
   // Initialize web server
   std::vector<std::string> options;
@@ -231,19 +193,15 @@ void ListenHTTP::onSchedule(core::ProcessContext *context,
   }
 
   _server.reset(new CivetServer(options));
-  _handler.reset(
-      new Handler(context, sessionFactory, std::move(authDNPattern),
-                  std::move(headersAsAttributesPattern)));
+  _handler.reset(new Handler(context, sessionFactory, std::move(authDNPattern), std::move(headersAsAttributesPattern)));
   _server->addHandler(basePath, _handler.get());
 }
 
 ListenHTTP::~ListenHTTP() {
 }
 
-void ListenHTTP::onTrigger(core::ProcessContext *context,
-                           core::ProcessSession *session) {
-  std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<
-      FlowFileRecord>(session->get());
+void ListenHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+  std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->get());
 
   // Do nothing if there are no incoming files
   if (!flowFile) {
@@ -251,10 +209,7 @@ void ListenHTTP::onTrigger(core::ProcessContext *context,
   }
 }
 
-ListenHTTP::Handler::Handler(core::ProcessContext *context,
-                             core::ProcessSessionFactory *sessionFactory,
-                             std::string &&authDNPattern,
-                             std::string &&headersAsAttributesPattern)
+ListenHTTP::Handler::Handler(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory, std::string &&authDNPattern, std::string &&headersAsAttributesPattern)
     : _authDNRegex(std::move(authDNPattern)),
       _headersAsAttributesRegex(std::move(headersAsAttributesPattern)),
       logger_(logging::LoggerFactory<ListenHTTP::Handler>::getLogger()) {
@@ -268,11 +223,9 @@ void ListenHTTP::Handler::sendErrorResponse(struct mg_connection *conn) {
             "Content-Length: 0\r\n\r\n");
 }
 
-bool ListenHTTP::Handler::handlePost(CivetServer *server,
-                                     struct mg_connection *conn) {
+bool ListenHTTP::Handler::handlePost(CivetServer *server, struct mg_connection *conn) {
   auto req_info = mg_get_request_info(conn);
-  logger_->log_info("ListenHTTP handling POST request of length %d",
-                    req_info->content_length);
+  logger_->log_info("ListenHTTP handling POST request of length %d", req_info->content_length);
 
   // If this is a two-way TLS connection, authorize the peer against the configured pattern
   if (req_info->is_ssl && req_info->client_cert != nullptr) {
@@ -280,8 +233,7 @@ bool ListenHTTP::Handler::handlePost(CivetServer *server,
       mg_printf(conn, "HTTP/1.1 403 Forbidden\r\n"
                 "Content-Type: text/html\r\n"
                 "Content-Length: 0\r\n\r\n");
-      logger_->log_warn("ListenHTTP client DN not authorized: %s",
-                        req_info->client_cert->subject);
+      logger_->log_warn("ListenHTTP client DN not authorized: %s", req_info->client_cert->subject);
       return true;
     }
   }
@@ -337,8 +289,8 @@ bool ListenHTTP::Handler::handlePost(CivetServer *server,
   return true;
 }
 
-ListenHTTP::WriteCallback::WriteCallback(struct mg_connection *conn, const struct mg_request_info *reqInfo) :
-    logger_(logging::LoggerFactory<ListenHTTP::WriteCallback>::getLogger()) {
+ListenHTTP::WriteCallback::WriteCallback(struct mg_connection *conn, const struct mg_request_info *reqInfo)
+    : logger_(logging::LoggerFactory<ListenHTTP::WriteCallback>::getLogger()) {
   _conn = conn;
   _reqInfo = reqInfo;
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/processors/ListenSyslog.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/ListenSyslog.cpp b/libminifi/src/processors/ListenSyslog.cpp
index e7d2e7b..054d585 100644
--- a/libminifi/src/processors/ListenSyslog.cpp
+++ b/libminifi/src/processors/ListenSyslog.cpp
@@ -35,37 +35,18 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
-core::Property ListenSyslog::RecvBufSize(
-    "Receive Buffer Size",
-    "The size of each buffer used to receive Syslog messages.", "65507 B");
-core::Property ListenSyslog::MaxSocketBufSize(
-    "Max Size of Socket Buffer",
-    "The maximum size of the socket buffer that should be used.", "1 MB");
-core::Property ListenSyslog::MaxConnections(
-    "Max Number of TCP Connections",
-    "The maximum number of concurrent connections to accept Syslog messages in TCP mode.",
-    "2");
-core::Property ListenSyslog::MaxBatchSize(
-    "Max Batch Size",
-    "The maximum number of Syslog events to add to a single FlowFile.", "1");
-core::Property ListenSyslog::MessageDelimiter(
-    "Message Delimiter",
-    "Specifies the delimiter to place between Syslog messages when multiple "
-    "messages are bundled together (see <Max Batch Size> core::Property).",
-    "\n");
-core::Property ListenSyslog::ParseMessages(
-    "Parse Messages",
-    "Indicates if the processor should parse the Syslog messages. If set to false, each outgoing FlowFile will only.",
-    "false");
-core::Property ListenSyslog::Protocol("Protocol",
-                                      "The protocol for Syslog communication.",
-                                      "UDP");
-core::Property ListenSyslog::Port("Port", "The port for Syslog communication.",
-                                  "514");
-core::Relationship ListenSyslog::Success("success",
-                                         "All files are routed to success");
-core::Relationship ListenSyslog::Invalid("invalid",
-                                         "SysLog message format invalid");
+core::Property ListenSyslog::RecvBufSize("Receive Buffer Size", "The size of each buffer used to receive Syslog messages.", "65507 B");
+core::Property ListenSyslog::MaxSocketBufSize("Max Size of Socket Buffer", "The maximum size of the socket buffer that should be used.", "1 MB");
+core::Property ListenSyslog::MaxConnections("Max Number of TCP Connections", "The maximum number of concurrent connections to accept Syslog messages in TCP mode.", "2");
+core::Property ListenSyslog::MaxBatchSize("Max Batch Size", "The maximum number of Syslog events to add to a single FlowFile.", "1");
+core::Property ListenSyslog::MessageDelimiter("Message Delimiter", "Specifies the delimiter to place between Syslog messages when multiple "
+                                              "messages are bundled together (see <Max Batch Size> core::Property).",
+                                              "\n");
+core::Property ListenSyslog::ParseMessages("Parse Messages", "Indicates if the processor should parse the Syslog messages. If set to false, each outgoing FlowFile will only.", "false");
+core::Property ListenSyslog::Protocol("Protocol", "The protocol for Syslog communication.", "UDP");
+core::Property ListenSyslog::Port("Port", "The port for Syslog communication.", "514");
+core::Relationship ListenSyslog::Success("success", "All files are routed to success");
+core::Relationship ListenSyslog::Invalid("invalid", "SysLog message format invalid");
 
 void ListenSyslog::initialize() {
   // Set the supported properties
@@ -140,8 +121,7 @@ void ListenSyslog::runThread() {
       if (_protocol == "TCP")
         listen(sockfd, 5);
       _serverSocket = sockfd;
-      logger_->log_error("ListenSysLog Server socket %d bind OK to port %d",
-                         _serverSocket, portno);
+      logger_->log_error("ListenSysLog Server socket %d bind OK to port %d", _serverSocket, portno);
     }
     FD_ZERO(&_readfds);
     FD_SET(_serverSocket, &_readfds);
@@ -171,14 +151,11 @@ void ListenSyslog::runThread() {
         socklen_t clilen;
         struct sockaddr_in cli_addr;
         clilen = sizeof(cli_addr);
-        int newsockfd = accept(_serverSocket,
-                               reinterpret_cast<struct sockaddr *>(&cli_addr),
-                               &clilen);
+        int newsockfd = accept(_serverSocket, reinterpret_cast<struct sockaddr *>(&cli_addr), &clilen);
         if (newsockfd > 0) {
           if (_clientSockets.size() < _maxConnections) {
             _clientSockets.push_back(newsockfd);
-            logger_->log_info("ListenSysLog new client socket %d connection",
-                              newsockfd);
+            logger_->log_info("ListenSysLog new client socket %d connection", newsockfd);
             continue;
           } else {
             close(newsockfd);
@@ -188,10 +165,8 @@ void ListenSyslog::runThread() {
         socklen_t clilen;
         struct sockaddr_in cli_addr;
         clilen = sizeof(cli_addr);
-        int recvlen = recvfrom(_serverSocket, _buffer, sizeof(_buffer), 0,
-                               (struct sockaddr *) &cli_addr, &clilen);
-        if (recvlen > 0
-            && (recvlen + getEventQueueByteSize()) <= _recvBufSize) {
+        int recvlen = recvfrom(_serverSocket, _buffer, sizeof(_buffer), 0, (struct sockaddr *) &cli_addr, &clilen);
+        if (recvlen > 0 && (recvlen + getEventQueueByteSize()) <= _recvBufSize) {
           uint8_t *payload = new uint8_t[recvlen];
           memcpy(payload, _buffer, recvlen);
           putEvent(payload, recvlen);
@@ -205,8 +180,7 @@ void ListenSyslog::runThread() {
         int recvlen = readline(clientSocket, _buffer, sizeof(_buffer));
         if (recvlen <= 0) {
           close(clientSocket);
-          logger_->log_info("ListenSysLog client socket %d close",
-                            clientSocket);
+          logger_->log_info("ListenSysLog client socket %d close", clientSocket);
           it = _clientSockets.erase(it);
         } else {
           if ((recvlen + getEventQueueByteSize()) <= _recvBufSize) {
@@ -253,8 +227,7 @@ int ListenSyslog::readline(int fd, char *bufptr, size_t len) {
   return -1;
 }
 
-void ListenSyslog::onTrigger(core::ProcessContext *context,
-                             core::ProcessSession *session) {
+void ListenSyslog::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
   std::string value;
   bool needResetServerSocket = false;
   if (context->getProperty(Protocol.getName(), value)) {
@@ -275,8 +248,7 @@ void ListenSyslog::onTrigger(core::ProcessContext *context,
     _messageDelimiter = value;
   }
   if (context->getProperty(ParseMessages.getName(), value)) {
-    org::apache::nifi::minifi::utils::StringUtils::StringToBool(value,
-                                                                _parseMessages);
+    org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, _parseMessages);
   }
   if (context->getProperty(Port.getName(), value)) {
     int64_t oldPort = _port;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/processors/LogAttribute.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/LogAttribute.cpp b/libminifi/src/processors/LogAttribute.cpp
index d2dcd10..e308901 100644
--- a/libminifi/src/processors/LogAttribute.cpp
+++ b/libminifi/src/processors/LogAttribute.cpp
@@ -39,27 +39,14 @@ namespace apache {
 namespace nifi {
 namespace minifi {
 namespace processors {
-core::Property LogAttribute::LogLevel(
-    "Log Level", "The Log Level to use when logging the Attributes", "info");
-core::Property LogAttribute::AttributesToLog(
-    "Attributes to Log",
-    "A comma-separated list of Attributes to Log. If not specified, all attributes will be logged.",
-    "");
-core::Property LogAttribute::AttributesToIgnore(
-    "Attributes to Ignore",
-    "A comma-separated list of Attributes to ignore. If not specified, no attributes will be ignored.",
-    "");
-core::Property LogAttribute::LogPayload(
-    "Log Payload",
-    "If true, the FlowFile's payload will be logged, in addition to its attributes;"
-    "otherwise, just the Attributes will be logged.",
-    "false");
-core::Property LogAttribute::LogPrefix(
-    "Log prefix",
-    "Log prefix appended to the log lines. It helps to distinguish the output of multiple LogAttribute processors.",
-    "");
-core::Relationship LogAttribute::Success(
-    "success", "success operational on the flow record");
+core::Property LogAttribute::LogLevel("Log Level", "The Log Level to use when logging the Attributes", "info");
+core::Property LogAttribute::AttributesToLog("Attributes to Log", "A comma-separated list of Attributes to Log. If not specified, all attributes will be logged.", "");
+core::Property LogAttribute::AttributesToIgnore("Attributes to Ignore", "A comma-separated list of Attributes to ignore. If not specified, no attributes will be ignored.", "");
+core::Property LogAttribute::LogPayload("Log Payload", "If true, the FlowFile's payload will be logged, in addition to its attributes;"
+                                        "otherwise, just the Attributes will be logged.",
+                                        "false");
+core::Property LogAttribute::LogPrefix("Log prefix", "Log prefix appended to the log lines. It helps to distinguish the output of multiple LogAttribute processors.", "");
+core::Relationship LogAttribute::Success("success", "success operational on the flow record");
 
 void LogAttribute::initialize() {
   // Set the supported properties
@@ -76,8 +63,7 @@ void LogAttribute::initialize() {
   setSupportedRelationships(relationships);
 }
 
-void LogAttribute::onTrigger(core::ProcessContext *context,
-                             core::ProcessSession *session) {
+void LogAttribute::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
   std::string dashLine = "--------------------------------------------------";
   LogAttrLevel level = LogAttrLevelInfo;
   bool logPayload = false;
@@ -96,8 +82,7 @@ void LogAttribute::onTrigger(core::ProcessContext *context,
     dashLine = "-----" + value + "-----";
   }
   if (context->getProperty(LogPayload.getName(), value)) {
-    org::apache::nifi::minifi::utils::StringUtils::StringToBool(value,
-                                                                logPayload);
+    org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, logPayload);
   }
 
   message << "Logging for flow file " << "\n";
@@ -105,10 +90,8 @@ void LogAttribute::onTrigger(core::ProcessContext *context,
   message << "\nStandard FlowFile Attributes";
   message << "\n" << "UUID:" << flow->getUUIDStr();
   message << "\n" << "EntryDate:" << getTimeStr(flow->getEntryDate());
-  message << "\n" << "lineageStartDate:"
-          << getTimeStr(flow->getlineageStartDate());
-  message << "\n" << "Size:" << flow->getSize() << " Offset:"
-          << flow->getOffset();
+  message << "\n" << "lineageStartDate:" << getTimeStr(flow->getlineageStartDate());
+  message << "\n" << "Size:" << flow->getSize() << " Offset:" << flow->getOffset();
   message << "\nFlowFile Attributes Map Content";
   std::map<std::string, std::string> attrs = flow->getAttributes();
   std::map<std::string, std::string>::iterator it;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/processors/PutFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/PutFile.cpp b/libminifi/src/processors/PutFile.cpp
index d8832c7..0aba3d7 100644
--- a/libminifi/src/processors/PutFile.cpp
+++ b/libminifi/src/processors/PutFile.cpp
@@ -38,19 +38,12 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
-core::Property PutFile::Directory("Output Directory",
-                                  "The output directory to which to put files",
-                                  ".");
-core::Property PutFile::ConflictResolution(
-    "Conflict Resolution Strategy",
-    "Indicates what should happen when a file with the same name already exists in the output directory",
-    CONFLICT_RESOLUTION_STRATEGY_FAIL);
-
-core::Relationship PutFile::Success("success",
-                                    "All files are routed to success");
-core::Relationship PutFile::Failure(
-    "failure",
-    "Failed files (conflict, write failure, etc.) are transferred to failure");
+core::Property PutFile::Directory("Output Directory", "The output directory to which to put files", ".");
+core::Property PutFile::ConflictResolution("Conflict Resolution Strategy", "Indicates what should happen when a file with the same name already exists in the output directory",
+                                           CONFLICT_RESOLUTION_STRATEGY_FAIL);
+
+core::Relationship PutFile::Success("success", "All files are routed to success");
+core::Relationship PutFile::Failure("failure", "Failed files (conflict, write failure, etc.) are transferred to failure");
 
 void PutFile::initialize() {
   // Set the supported properties
@@ -65,28 +58,23 @@ void PutFile::initialize() {
   setSupportedRelationships(relationships);
 }
 
-void PutFile::onSchedule(core::ProcessContext *context,
-                         core::ProcessSessionFactory *sessionFactory) {
+void PutFile::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
   if (!context->getProperty(Directory.getName(), directory_)) {
     logger_->log_error("Directory attribute is missing or invalid");
   }
 
-  if (!context->getProperty(ConflictResolution.getName(),
-                            conflict_resolution_)) {
-    logger_->log_error(
-        "Conflict Resolution Strategy attribute is missing or invalid");
+  if (!context->getProperty(ConflictResolution.getName(), conflict_resolution_)) {
+    logger_->log_error("Conflict Resolution Strategy attribute is missing or invalid");
   }
 }
 
-void PutFile::onTrigger(core::ProcessContext *context,
-                        core::ProcessSession *session) {
+void PutFile::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
   if (IsNullOrEmpty(directory_) || IsNullOrEmpty(conflict_resolution_)) {
     context->yield();
     return;
   }
 
-  std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<
-      FlowFileRecord>(session->get());
+  std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->get());
 
   // Do nothing if there are no incoming files
   if (!flowFile) {
@@ -111,16 +99,13 @@ void PutFile::onTrigger(core::ProcessContext *context,
   destFileSs << directory_ << "/" << filename;
   std::string destFile = destFileSs.str();
 
-  logger_->log_info("PutFile writing file %s into directory %s",
-                    filename.c_str(), directory_.c_str());
+  logger_->log_info("PutFile writing file %s into directory %s", filename.c_str(), directory_.c_str());
 
   // If file exists, apply conflict resolution strategy
   struct stat statResult;
 
   if (stat(destFile.c_str(), &statResult) == 0) {
-    logger_->log_info(
-        "Destination file %s exists; applying Conflict Resolution Strategy: %s",
-        destFile.c_str(), conflict_resolution_.c_str());
+    logger_->log_info("Destination file %s exists; applying Conflict Resolution Strategy: %s", destFile.c_str(), conflict_resolution_.c_str());
 
     if (conflict_resolution_ == CONFLICT_RESOLUTION_STRATEGY_REPLACE) {
       putFile(session, flowFile, tmpFile, destFile);
@@ -134,9 +119,7 @@ void PutFile::onTrigger(core::ProcessContext *context,
   }
 }
 
-bool PutFile::putFile(core::ProcessSession *session,
-                      std::shared_ptr<FlowFileRecord> flowFile,
-                      const std::string &tmpFile, const std::string &destFile) {
+bool PutFile::putFile(core::ProcessSession *session, std::shared_ptr<FlowFileRecord> flowFile, const std::string &tmpFile, const std::string &destFile) {
   ReadCallback cb(tmpFile, destFile);
   session->read(flowFile, &cb);
 
@@ -149,8 +132,7 @@ bool PutFile::putFile(core::ProcessSession *session,
   return false;
 }
 
-PutFile::ReadCallback::ReadCallback(const std::string &tmpFile,
-                                    const std::string &destFile)
+PutFile::ReadCallback::ReadCallback(const std::string &tmpFile, const std::string &destFile)
     : _tmpFile(tmpFile),
       _tmpFileOs(tmpFile),
       _destFile(destFile),
@@ -170,25 +152,19 @@ void PutFile::ReadCallback::process(std::ifstream *stream) {
 bool PutFile::ReadCallback::commit() {
   bool success = false;
 
-  logger_->log_info("PutFile committing put file operation to %s",
-                    _destFile.c_str());
+  logger_->log_info("PutFile committing put file operation to %s", _destFile.c_str());
 
   if (_writeSucceeded) {
     _tmpFileOs.close();
 
     if (rename(_tmpFile.c_str(), _destFile.c_str())) {
-      logger_->log_info(
-          "PutFile commit put file operation to %s failed because rename() call failed",
-          _destFile.c_str());
+      logger_->log_info("PutFile commit put file operation to %s failed because rename() call failed", _destFile.c_str());
     } else {
       success = true;
-      logger_->log_info("PutFile commit put file operation to %s succeeded",
-                        _destFile.c_str());
+      logger_->log_info("PutFile commit put file operation to %s succeeded", _destFile.c_str());
     }
   } else {
-    logger_->log_error(
-        "PutFile commit put file operation to %s failed because write failed",
-        _destFile.c_str());
+    logger_->log_error("PutFile commit put file operation to %s failed because write failed", _destFile.c_str());
   }
 
   return success;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/processors/TailFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/TailFile.cpp b/libminifi/src/processors/TailFile.cpp
index abb02ca..d4f1b80 100644
--- a/libminifi/src/processors/TailFile.cpp
+++ b/libminifi/src/processors/TailFile.cpp
@@ -46,16 +46,11 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
-core::Property TailFile::FileName(
-    "File to Tail",
-    "Fully-qualified filename of the file that should be tailed", "");
-core::Property TailFile::StateFile(
-    "State File",
-    "Specifies the file that should be used for storing state about"
-    " what data has been ingested so that upon restart NiFi can resume from where it left off",
-    "TailFileState");
-core::Relationship TailFile::Success("success",
-                                     "All files are routed to success");
+core::Property TailFile::FileName("File to Tail", "Fully-qualified filename of the file that should be tailed", "");
+core::Property TailFile::StateFile("State File", "Specifies the file that should be used for storing state about"
+                                   " what data has been ingested so that upon restart NiFi can resume from where it left off",
+                                   "TailFileState");
+core::Relationship TailFile::Success("success", "All files are routed to success");
 
 void TailFile::initialize() {
   // Set the supported properties
@@ -84,8 +79,7 @@ void TailFile::parseStateFileLine(char *buf) {
     ++line;
 
   char first = line[0];
-  if ((first == '\0') || (first == '#') || (first == '\r') || (first == '\n')
-      || (first == '=')) {
+  if ((first == '\0') || (first == '#') || (first == '\r') || (first == '\n') || (first == '=')) {
     return;
   }
 
@@ -125,8 +119,7 @@ void TailFile::recoverState() {
     return;
   }
   char buf[BUFFER_SIZE];
-  for (file.getline(buf, BUFFER_SIZE); file.good();
-      file.getline(buf, BUFFER_SIZE)) {
+  for (file.getline(buf, BUFFER_SIZE); file.good(); file.getline(buf, BUFFER_SIZE)) {
     parseStateFileLine(buf);
   }
 }
@@ -142,12 +135,10 @@ void TailFile::storeState() {
   file.close();
 }
 
-static bool sortTailMatchedFileItem(TailMatchedFileItem i,
-                                    TailMatchedFileItem j) {
+static bool sortTailMatchedFileItem(TailMatchedFileItem i, TailMatchedFileItem j) {
   return (i.modifiedTime < j.modifiedTime);
 }
-void TailFile::checkRollOver(const std::string &fileLocation,
-                             const std::string &fileName) {
+void TailFile::checkRollOver(const std::string &fileLocation, const std::string &fileName) {
   struct stat statbuf;
   std::vector<TailMatchedFileItem> matchedFiles;
   std::string fullPath = fileLocation + "/" + _currentTailFileName;
@@ -157,8 +148,7 @@ void TailFile::checkRollOver(const std::string &fileLocation,
       // there are new input for the current tail file
       return;
 
-    uint64_t modifiedTimeCurrentTailFile =
-        ((uint64_t) (statbuf.st_mtime) * 1000);
+    uint64_t modifiedTimeCurrentTailFile = ((uint64_t) (statbuf.st_mtime) * 1000);
     std::string pattern = fileName;
     std::size_t found = fileName.find_last_of(".");
     if (found != std::string::npos)
@@ -176,10 +166,8 @@ void TailFile::checkRollOver(const std::string &fileLocation,
       if (!(entry->d_type & DT_DIR)) {
         std::string fileName = d_name;
         std::string fileFullName = fileLocation + "/" + d_name;
-        if (fileFullName.find(pattern) != std::string::npos
-            && stat(fileFullName.c_str(), &statbuf) == 0) {
-          if (((uint64_t) (statbuf.st_mtime) * 1000)
-              >= modifiedTimeCurrentTailFile) {
+        if (fileFullName.find(pattern) != std::string::npos && stat(fileFullName.c_str(), &statbuf) == 0) {
+          if (((uint64_t) (statbuf.st_mtime) * 1000) >= modifiedTimeCurrentTailFile) {
             TailMatchedFileItem item;
             item.fileName = fileName;
             item.modifiedTime = ((uint64_t) (statbuf.st_mtime) * 1000);
@@ -191,18 +179,14 @@ void TailFile::checkRollOver(const std::string &fileLocation,
     closedir(d);
 
     // Sort the list based on modified time
-    std::sort(matchedFiles.begin(), matchedFiles.end(),
-              sortTailMatchedFileItem);
-    for (std::vector<TailMatchedFileItem>::iterator it = matchedFiles.begin();
-        it != matchedFiles.end(); ++it) {
+    std::sort(matchedFiles.begin(), matchedFiles.end(), sortTailMatchedFileItem);
+    for (std::vector<TailMatchedFileItem>::iterator it = matchedFiles.begin(); it != matchedFiles.end(); ++it) {
       TailMatchedFileItem item = *it;
       if (item.fileName == _currentTailFileName) {
         ++it;
         if (it != matchedFiles.end()) {
           TailMatchedFileItem nextItem = *it;
-          logger_->log_info("TailFile File Roll Over from %s to %s",
-                            _currentTailFileName.c_str(),
-                            nextItem.fileName.c_str());
+          logger_->log_info("TailFile File Roll Over from %s to %s", _currentTailFileName.c_str(), nextItem.fileName.c_str());
           _currentTailFileName = nextItem.fileName;
           _currentTailFilePosition = 0;
           storeState();
@@ -215,8 +199,7 @@ void TailFile::checkRollOver(const std::string &fileLocation,
   }
 }
 
-void TailFile::onTrigger(core::ProcessContext *context,
-                         core::ProcessSession *session) {
+void TailFile::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
   std::lock_guard<std::mutex> tail_lock(tail_file_mutex_);
   std::string value;
   std::string fileLocation = "";
@@ -245,8 +228,7 @@ void TailFile::onTrigger(core::ProcessContext *context,
       context->yield();
       return;
     }
-    std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<
-        FlowFileRecord>(session->create());
+    std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
     if (!flowFile)
       return;
     std::size_t found = _currentTailFileName.find_last_of(".");
@@ -256,12 +238,8 @@ void TailFile::onTrigger(core::ProcessContext *context,
     flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
     session->import(fullPath, flowFile, true, this->_currentTailFilePosition);
     session->transfer(flowFile, Success);
-    logger_->log_info("TailFile %s for %d bytes", _currentTailFileName.c_str(),
-                      flowFile->getSize());
-    std::string logName = baseName + "."
-        + std::to_string(_currentTailFilePosition) + "-"
-        + std::to_string(_currentTailFilePosition + flowFile->getSize()) + "."
-        + extension;
+    logger_->log_info("TailFile %s for %d bytes", _currentTailFileName.c_str(), flowFile->getSize());
+    std::string logName = baseName + "." + std::to_string(_currentTailFilePosition) + "-" + std::to_string(_currentTailFilePosition + flowFile->getSize()) + "." + extension;
     flowFile->updateKeyedAttribute(FILENAME, logName);
     this->_currentTailFilePosition += flowFile->getSize();
     storeState();

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/provenance/Provenance.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/provenance/Provenance.cpp b/libminifi/src/provenance/Provenance.cpp
index 083d0b2..ff6a149 100644
--- a/libminifi/src/provenance/Provenance.cpp
+++ b/libminifi/src/provenance/Provenance.cpp
@@ -34,49 +34,37 @@ namespace nifi {
 namespace minifi {
 namespace provenance {
 
-const char *ProvenanceEventRecord::ProvenanceEventTypeStr[REPLAY+1] =
-{ "CREATE", "RECEIVE", "FETCH", "SEND", "DOWNLOAD", "DROP", "EXPIRE", "FORK",
-                "JOIN", "CLONE", "CONTENT_MODIFIED", "ATTRIBUTES_MODIFIED", "ROUTE",
-                "ADDINFO", "REPLAY"};
+const char *ProvenanceEventRecord::ProvenanceEventTypeStr[REPLAY + 1] = { "CREATE", "RECEIVE", "FETCH", "SEND", "DOWNLOAD", "DROP", "EXPIRE", "FORK", "JOIN", "CLONE", "CONTENT_MODIFIED",
+    "ATTRIBUTES_MODIFIED", "ROUTE", "ADDINFO", "REPLAY" };
 
 // DeSerialize
-bool ProvenanceEventRecord::DeSerialize(
-    const std::shared_ptr<core::Repository> &repo, std::string key) {
+bool ProvenanceEventRecord::DeSerialize(const std::shared_ptr<core::Repository> &repo, std::string key) {
   std::string value;
   bool ret;
 
   ret = repo->Get(key, value);
 
   if (!ret) {
-    logger_->log_error("NiFi Provenance Store event %s can not found",
-                       key.c_str());
+    logger_->log_error("NiFi Provenance Store event %s can not found", key.c_str());
     return false;
   } else {
-    logger_->log_debug("NiFi Provenance Read event %s length %d", key.c_str(),
-                       value.length());
+    logger_->log_debug("NiFi Provenance Read event %s length %d", key.c_str(), value.length());
   }
 
-  org::apache::nifi::minifi::io::DataStream stream(
-      (const uint8_t*) value.data(), value.length());
+  org::apache::nifi::minifi::io::DataStream stream((const uint8_t*) value.data(), value.length());
 
   ret = DeSerialize(stream);
 
   if (ret) {
-    logger_->log_debug(
-        "NiFi Provenance retrieve event %s size %d eventType %d success",
-        _eventIdStr.c_str(), stream.getSize(), _eventType);
+    logger_->log_debug("NiFi Provenance retrieve event %s size %d eventType %d success", _eventIdStr.c_str(), stream.getSize(), _eventType);
   } else {
-    logger_->log_debug(
-        "NiFi Provenance retrieve event %s size %d eventType %d fail",
-        _eventIdStr.c_str(), stream.getSize(), _eventType);
+    logger_->log_debug("NiFi Provenance retrieve event %s size %d eventType %d fail", _eventIdStr.c_str(), stream.getSize(), _eventType);
   }
 
   return ret;
 }
 
-bool ProvenanceEventRecord::Serialize(
-    const std::shared_ptr<core::Repository> &repo) {
-
+bool ProvenanceEventRecord::Serialize(const std::shared_ptr<core::Repository> &repo) {
   org::apache::nifi::minifi::io::DataStream outStream;
 
   int ret;
@@ -170,9 +158,7 @@ bool ProvenanceEventRecord::Serialize(
     return false;
   }
 
-  if (this->_eventType == ProvenanceEventRecord::FORK
-      || this->_eventType == ProvenanceEventRecord::CLONE
-      || this->_eventType == ProvenanceEventRecord::JOIN) {
+  if (this->_eventType == ProvenanceEventRecord::FORK || this->_eventType == ProvenanceEventRecord::CLONE || this->_eventType == ProvenanceEventRecord::JOIN) {
     // write UUIDs
     uint32_t number = this->_parentUuids.size();
     ret = write(number, &outStream);
@@ -196,8 +182,7 @@ bool ProvenanceEventRecord::Serialize(
         return false;
       }
     }
-  } else if (this->_eventType == ProvenanceEventRecord::SEND
-      || this->_eventType == ProvenanceEventRecord::FETCH) {
+  } else if (this->_eventType == ProvenanceEventRecord::SEND || this->_eventType == ProvenanceEventRecord::FETCH) {
     ret = writeUTF(this->_transitUri, &outStream);
     if (ret <= 0) {
       return false;
@@ -213,19 +198,15 @@ bool ProvenanceEventRecord::Serialize(
     }
   }
   // Persistent to the DB
-  if (repo->Put(_eventIdStr, const_cast<uint8_t*>(outStream.getBuffer()),
-                outStream.getSize())) {
-    logger_->log_debug("NiFi Provenance Store event %s size %d success",
-                       _eventIdStr.c_str(), outStream.getSize());
+  if (repo->Put(_eventIdStr, const_cast<uint8_t*>(outStream.getBuffer()), outStream.getSize())) {
+    logger_->log_debug("NiFi Provenance Store event %s size %d success", _eventIdStr.c_str(), outStream.getSize());
   } else {
-    logger_->log_error("NiFi Provenance Store event %s size %d fail",
-                       _eventIdStr.c_str(), outStream.getSize());
+    logger_->log_error("NiFi Provenance Store event %s size %d fail", _eventIdStr.c_str(), outStream.getSize());
   }
   return true;
 }
 
-bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer,
-                                        const int bufferSize) {
+bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const int bufferSize) {
   int ret;
 
   org::apache::nifi::minifi::io::DataStream outStream(buffer, bufferSize);
@@ -325,9 +306,7 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer,
     return false;
   }
 
-  if (this->_eventType == ProvenanceEventRecord::FORK
-      || this->_eventType == ProvenanceEventRecord::CLONE
-      || this->_eventType == ProvenanceEventRecord::JOIN) {
+  if (this->_eventType == ProvenanceEventRecord::FORK || this->_eventType == ProvenanceEventRecord::CLONE || this->_eventType == ProvenanceEventRecord::JOIN) {
     // read UUIDs
     uint32_t number = 0;
     ret = read(number, &outStream);
@@ -356,8 +335,7 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer,
       }
       this->addChildUuid(childUUID);
     }
-  } else if (this->_eventType == ProvenanceEventRecord::SEND
-      || this->_eventType == ProvenanceEventRecord::FETCH) {
+  } else if (this->_eventType == ProvenanceEventRecord::SEND || this->_eventType == ProvenanceEventRecord::FETCH) {
     ret = readUTF(this->_transitUri, &outStream);
     if (ret <= 0) {
       return false;
@@ -386,8 +364,7 @@ void ProvenanceReporter::commit() {
   }
 }
 
-void ProvenanceReporter::create(std::shared_ptr<core::FlowFile> flow,
-                                std::string detail) {
+void ProvenanceReporter::create(std::shared_ptr<core::FlowFile> flow, std::string detail) {
   ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::CREATE, flow);
 
   if (event) {
@@ -396,9 +373,7 @@ void ProvenanceReporter::create(std::shared_ptr<core::FlowFile> flow,
   }
 }
 
-void ProvenanceReporter::route(std::shared_ptr<core::FlowFile> flow,
-                               core::Relationship relation, std::string detail,
-                               uint64_t processingDuration) {
+void ProvenanceReporter::route(std::shared_ptr<core::FlowFile> flow, core::Relationship relation, std::string detail, uint64_t processingDuration) {
   ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::ROUTE, flow);
 
   if (event) {
@@ -409,10 +384,8 @@ void ProvenanceReporter::route(std::shared_ptr<core::FlowFile> flow,
   }
 }
 
-void ProvenanceReporter::modifyAttributes(std::shared_ptr<core::FlowFile> flow,
-                                          std::string detail) {
-  ProvenanceEventRecord *event = allocate(
-      ProvenanceEventRecord::ATTRIBUTES_MODIFIED, flow);
+void ProvenanceReporter::modifyAttributes(std::shared_ptr<core::FlowFile> flow, std::string detail) {
+  ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::ATTRIBUTES_MODIFIED, flow);
 
   if (event) {
     event->setDetails(detail);
@@ -420,11 +393,8 @@ void ProvenanceReporter::modifyAttributes(std::shared_ptr<core::FlowFile> flow,
   }
 }
 
-void ProvenanceReporter::modifyContent(std::shared_ptr<core::FlowFile> flow,
-                                       std::string detail,
-                                       uint64_t processingDuration) {
-  ProvenanceEventRecord *event = allocate(
-      ProvenanceEventRecord::CONTENT_MODIFIED, flow);
+void ProvenanceReporter::modifyContent(std::shared_ptr<core::FlowFile> flow, std::string detail, uint64_t processingDuration) {
+  ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::CONTENT_MODIFIED, flow);
 
   if (event) {
     event->setDetails(detail);
@@ -433,8 +403,7 @@ void ProvenanceReporter::modifyContent(std::shared_ptr<core::FlowFile> flow,
   }
 }
 
-void ProvenanceReporter::clone(std::shared_ptr<core::FlowFile> parent,
-                               std::shared_ptr<core::FlowFile> child) {
+void ProvenanceReporter::clone(std::shared_ptr<core::FlowFile> parent, std::shared_ptr<core::FlowFile> child) {
   ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::CLONE, parent);
 
   if (event) {
@@ -444,10 +413,7 @@ void ProvenanceReporter::clone(std::shared_ptr<core::FlowFile> parent,
   }
 }
 
-void ProvenanceReporter::join(
-    std::vector<std::shared_ptr<core::FlowFile> > parents,
-    std::shared_ptr<core::FlowFile> child, std::string detail,
-    uint64_t processingDuration) {
+void ProvenanceReporter::join(std::vector<std::shared_ptr<core::FlowFile> > parents, std::shared_ptr<core::FlowFile> child, std::string detail, uint64_t processingDuration) {
   ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::JOIN, child);
 
   if (event) {
@@ -463,10 +429,7 @@ void ProvenanceReporter::join(
   }
 }
 
-void ProvenanceReporter::fork(
-    std::vector<std::shared_ptr<core::FlowFile> > child,
-    std::shared_ptr<core::FlowFile> parent, std::string detail,
-    uint64_t processingDuration) {
+void ProvenanceReporter::fork(std::vector<std::shared_ptr<core::FlowFile> > child, std::shared_ptr<core::FlowFile> parent, std::string detail, uint64_t processingDuration) {
   ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::FORK, parent);
 
   if (event) {
@@ -482,8 +445,7 @@ void ProvenanceReporter::fork(
   }
 }
 
-void ProvenanceReporter::expire(std::shared_ptr<core::FlowFile> flow,
-                                std::string detail) {
+void ProvenanceReporter::expire(std::shared_ptr<core::FlowFile> flow, std::string detail) {
   ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::EXPIRE, flow);
 
   if (event) {
@@ -492,8 +454,7 @@ void ProvenanceReporter::expire(std::shared_ptr<core::FlowFile> flow,
   }
 }
 
-void ProvenanceReporter::drop(std::shared_ptr<core::FlowFile> flow,
-                              std::string reason) {
+void ProvenanceReporter::drop(std::shared_ptr<core::FlowFile> flow, std::string reason) {
   ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::DROP, flow);
 
   if (event) {
@@ -503,9 +464,7 @@ void ProvenanceReporter::drop(std::shared_ptr<core::FlowFile> flow,
   }
 }
 
-void ProvenanceReporter::send(std::shared_ptr<core::FlowFile> flow,
-                              std::string transitUri, std::string detail,
-                              uint64_t processingDuration, bool force) {
+void ProvenanceReporter::send(std::shared_ptr<core::FlowFile> flow, std::string transitUri, std::string detail, uint64_t processingDuration, bool force) {
   ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::SEND, flow);
 
   if (event) {
@@ -522,11 +481,7 @@ void ProvenanceReporter::send(std::shared_ptr<core::FlowFile> flow,
   }
 }
 
-void ProvenanceReporter::receive(std::shared_ptr<core::FlowFile> flow,
-                                 std::string transitUri,
-                                 std::string sourceSystemFlowFileIdentifier,
-                                 std::string detail,
-                                 uint64_t processingDuration) {
+void ProvenanceReporter::receive(std::shared_ptr<core::FlowFile> flow, std::string transitUri, std::string sourceSystemFlowFileIdentifier, std::string detail, uint64_t processingDuration) {
   ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::RECEIVE, flow);
 
   if (event) {
@@ -538,9 +493,7 @@ void ProvenanceReporter::receive(std::shared_ptr<core::FlowFile> flow,
   }
 }
 
-void ProvenanceReporter::fetch(std::shared_ptr<core::FlowFile> flow,
-                               std::string transitUri, std::string detail,
-                               uint64_t processingDuration) {
+void ProvenanceReporter::fetch(std::shared_ptr<core::FlowFile> flow, std::string transitUri, std::string detail, uint64_t processingDuration) {
   ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::FETCH, flow);
 
   if (event) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/provenance/ProvenanceRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/provenance/ProvenanceRepository.cpp b/libminifi/src/provenance/ProvenanceRepository.cpp
index 77de5ba..e4a8ffa 100644
--- a/libminifi/src/provenance/ProvenanceRepository.cpp
+++ b/libminifi/src/provenance/ProvenanceRepository.cpp
@@ -40,14 +40,11 @@ void ProvenanceRepository::run() {
       for (it->SeekToFirst(); it->Valid(); it->Next()) {
         ProvenanceEventRecord eventRead;
         std::string key = it->key().ToString();
-        if (eventRead.DeSerialize(
-            reinterpret_cast<uint8_t*>(const_cast<char*>(it->value().data())),
-            it->value().size())) {
+        if (eventRead.DeSerialize(reinterpret_cast<uint8_t*>(const_cast<char*>(it->value().data())), it->value().size())) {
           if ((curTime - eventRead.getEventTime()) > max_partition_millis_)
             purgeList.push_back(key);
         } else {
-          logger_->log_debug("NiFi Provenance retrieve event %s fail",
-                             key.c_str());
+          logger_->log_debug("NiFi Provenance retrieve event %s fail", key.c_str());
           purgeList.push_back(key);
         }
       }
@@ -56,8 +53,7 @@ void ProvenanceRepository::run() {
 
       for (itPurge = purgeList.begin(); itPurge != purgeList.end(); itPurge++) {
         std::string eventId = *itPurge;
-        logger_->log_info("ProvenanceRepository Repo Purge %s",
-                          eventId.c_str());
+        logger_->log_info("ProvenanceRepository Repo Purge %s", eventId.c_str());
         Delete(eventId);
       }
     }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/test/Server.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/Server.cpp b/libminifi/test/Server.cpp
index bb3e682..39efb12 100644
--- a/libminifi/test/Server.cpp
+++ b/libminifi/test/Server.cpp
@@ -50,8 +50,7 @@ typedef enum {
 } FlowControlMsgType;
 
 // FlowControl Protocol Msg Type String
-static const char *FlowControlMsgTypeStr[MAX_FLOW_CONTROL_MSG_TYPE] = {
-    "REGISTER_REQ", "REGISTER_RESP", "REPORT_REQ", "REPORT_RESP" };
+static const char *FlowControlMsgTypeStr[MAX_FLOW_CONTROL_MSG_TYPE] = { "REGISTER_REQ", "REGISTER_RESP", "REPORT_REQ", "REPORT_RESP" };
 
 // Flow Control Msg Type to String
 inline const char *FlowControlMsgTypeToStr(FlowControlMsgType type) {
@@ -83,10 +82,8 @@ typedef enum {
 } FlowControlMsgID;
 
 // FlowControl Protocol Msg ID String
-static const char *FlowControlMsgIDStr[MAX_FLOW_MSG_ID] = {
-    "FLOW_SERIAL_NUMBER", "FLOW_YAML_NAME", "FLOW_YAML_CONTENT",
-    "REPORT_INTERVAL", "PROCESSOR_NAME"
-        "PROPERTY_NAME", "PROPERTY_VALUE", "REPORT_BLOB" };
+static const char *FlowControlMsgIDStr[MAX_FLOW_MSG_ID] = { "FLOW_SERIAL_NUMBER", "FLOW_YAML_NAME", "FLOW_YAML_CONTENT", "REPORT_INTERVAL", "PROCESSOR_NAME"
+    "PROPERTY_NAME", "PROPERTY_VALUE", "REPORT_BLOB" };
 
 #define TYPE_HDR_LEN 4 // Fix Hdr Type
 #define TLV_HDR_LEN 8 // Type 4 bytes and Len 4 bytes
@@ -122,9 +119,7 @@ typedef enum {
 } FlowControlRespCode;
 
 // FlowControl Resp Code str
-static const char *FlowControlRespCodeStr[MAX_RESP_CODE] = { "RESP_SUCCESS",
-    "RESP_TRIGGER_REGISTER", "RESP_START_FLOW_CONTROLLER",
-    "RESP_STOP_FLOW_CONTROLLER", "RESP_FAILURE" };
+static const char *FlowControlRespCodeStr[MAX_RESP_CODE] = { "RESP_SUCCESS", "RESP_TRIGGER_REGISTER", "RESP_START_FLOW_CONTROLLER", "RESP_STOP_FLOW_CONTROLLER", "RESP_FAILURE" };
 
 // Flow Control Resp Code to String
 inline const char *FlowControlRespCodeToStr(FlowControlRespCode code) {
@@ -332,8 +327,7 @@ int main(int argc, char *argv[]) {
     exit(1);
   }
 
-  if (signal(SIGINT, sigHandler) == SIG_ERR
-      || signal(SIGTERM, sigHandler) == SIG_ERR) {
+  if (signal(SIGINT, sigHandler) == SIG_ERR || signal(SIGTERM, sigHandler) == SIG_ERR) {
 
     return -1;
   }
@@ -360,13 +354,10 @@ int main(int argc, char *argv[]) {
       FlowControlProtocolHeader hdr;
       int status = readHdr(newsockfd, &hdr);
       if (status > 0) {
-        printf("Flow Control Protocol receive MsgType %s\n",
-               FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType));
+        printf("Flow Control Protocol receive MsgType %s\n", FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType));
         printf("Flow Control Protocol receive Seq Num %d\n", hdr.seqNumber);
-        printf("Flow Control Protocol receive Resp Code %s\n",
-               FlowControlRespCodeToStr((FlowControlRespCode) hdr.status));
-        printf("Flow Control Protocol receive Payload len %d\n",
-               hdr.payloadLen);
+        printf("Flow Control Protocol receive Resp Code %s\n", FlowControlRespCodeToStr((FlowControlRespCode) hdr.status));
+        printf("Flow Control Protocol receive Payload len %d\n", hdr.payloadLen);
         if (((FlowControlMsgType) hdr.msgType) == REGISTER_REQ) {
           printf("Flow Control Protocol Register Req receive\n");
           uint8_t *payload = new uint8_t[hdr.payloadLen];
@@ -384,12 +375,10 @@ int main(int argc, char *argv[]) {
             } else if (((FlowControlMsgID) msgID) == FLOW_YAML_NAME) {
               uint32_t len;
               payloadPtr = decode(payloadPtr, len);
-              printf("Flow Control Protocol receive YAML name length %d\n",
-                     len);
+              printf("Flow Control Protocol receive YAML name length %d\n", len);
               std::string flowName = (const char *) payloadPtr;
               payloadPtr += len;
-              printf("Flow Control Protocol receive YAML name %s\n",
-                     flowName.c_str());
+              printf("Flow Control Protocol receive YAML name %s\n", flowName.c_str());
             } else {
               break;
             }
@@ -399,11 +388,9 @@ int main(int argc, char *argv[]) {
           // Calculate the total payload msg size
           char *ymlContent;
           uint32_t yamlLen = readYAML(&ymlContent);
-          uint32_t payloadSize = FlowControlMsgIDEncodingLen(REPORT_INTERVAL,
-                                                             0);
+          uint32_t payloadSize = FlowControlMsgIDEncodingLen(REPORT_INTERVAL, 0);
           if (yamlLen > 0)
-            payloadSize += FlowControlMsgIDEncodingLen(FLOW_YAML_CONTENT,
-                                                       yamlLen);
+            payloadSize += FlowControlMsgIDEncodingLen(FLOW_YAML_CONTENT, yamlLen);
 
           uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize;
           uint8_t *data = new uint8_t[size];
@@ -444,12 +431,10 @@ int main(int argc, char *argv[]) {
             if (((FlowControlMsgID) msgID) == FLOW_YAML_NAME) {
               uint32_t len;
               payloadPtr = decode(payloadPtr, len);
-              printf("Flow Control Protocol receive YAML name length %d\n",
-                     len);
+              printf("Flow Control Protocol receive YAML name length %d\n", len);
               std::string flowName = (const char *) payloadPtr;
               payloadPtr += len;
-              printf("Flow Control Protocol receive YAML name %s\n",
-                     flowName.c_str());
+              printf("Flow Control Protocol receive YAML name %s\n", flowName.c_str());
             } else {
               break;
             }
@@ -475,16 +460,11 @@ int main(int argc, char *argv[]) {
             propertyValue2 = "41";
             flag = 0;
           }
-          uint32_t payloadSize = FlowControlMsgIDEncodingLen(
-              PROCESSOR_NAME, processor.size() + 1);
-          payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_NAME,
-                                                     propertyName1.size() + 1);
-          payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_VALUE,
-                                                     propertyValue1.size() + 1);
-          payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_NAME,
-                                                     propertyName2.size() + 1);
-          payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_VALUE,
-                                                     propertyValue2.size() + 1);
+          uint32_t payloadSize = FlowControlMsgIDEncodingLen(PROCESSOR_NAME, processor.size() + 1);
+          payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_NAME, propertyName1.size() + 1);
+          payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_VALUE, propertyValue1.size() + 1);
+          payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_NAME, propertyName2.size() + 1);
+          payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_VALUE, propertyValue2.size() + 1);
 
           uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize;
           uint8_t *data = new uint8_t[size];

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/test/TestBase.h
----------------------------------------------------------------------
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index 7b1ac6b..e675043 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -36,40 +36,40 @@
 class LogTestController {
  public:
   static LogTestController& getInstance() {
-   static LogTestController instance;
-   return instance;
+    static LogTestController instance;
+    return instance;
   }
-  
+
   template<typename T>
   void setTrace() {
     setLevel<T>(spdlog::level::trace);
   }
-  
+
   template<typename T>
   void setDebug() {
     setLevel<T>(spdlog::level::debug);
   }
-  
+
   template<typename T>
   void setInfo() {
     setLevel<T>(spdlog::level::info);
   }
-  
+
   template<typename T>
   void setWarn() {
     setLevel<T>(spdlog::level::warn);
   }
-  
+
   template<typename T>
   void setError() {
     setLevel<T>(spdlog::level::err);
   }
-  
+
   template<typename T>
   void setOff() {
     setLevel<T>(spdlog::level::off);
   }
-  
+
   template<typename T>
   void setLevel(spdlog::level::level_enum level) {
     logging::LoggerFactory<T>::getLogger();
@@ -77,17 +77,17 @@ class LogTestController {
     modified_loggers.push_back(name);
     setLevel(name, level);
   }
-  
+
   bool contains(const std::string &ending) {
-   return contains(log_output, ending);
+    return contains(log_output, ending);
   }
-  
+
   bool contains(const std::ostringstream &stream, const std::string &ending) {
     std::string str = stream.str();
     logger_->log_info("Looking for %s in %s.", ending, str);
     return (ending.length() > 0 && str.find(ending) != std::string::npos);
   }
-  
+
   void reset() {
     for (auto const & name : modified_loggers) {
       setLevel(name, spdlog::level::err);
@@ -95,35 +95,40 @@ class LogTestController {
     modified_loggers = std::vector<std::string>();
     resetStream(log_output);
   }
-  
+
   inline void resetStream(std::ostringstream &stream) {
     stream.str("");
     stream.clear();
   }
-  
+
   std::ostringstream log_output;
-  
+
   std::shared_ptr<logging::Logger> logger_;
  private:
-   class TestBootstrapLogger: public logging::Logger {
-    public:
-      TestBootstrapLogger(std::shared_ptr<spdlog::logger> logger):Logger(logger){};
-   };
+  class TestBootstrapLogger : public logging::Logger {
+   public:
+    TestBootstrapLogger(std::shared_ptr<spdlog::logger> logger)
+        : Logger(logger) {
+    }
+    ;
+  };
   LogTestController() {
-   std::shared_ptr<logging::LoggerProperties> logger_properties = std::make_shared<logging::LoggerProperties>();
-   logger_properties->set("logger.root", "ERROR,ostream");
-   logger_properties->set("logger." + core::getClassName<LogTestController>(), "INFO");
-   logger_properties->set("logger." + core::getClassName<logging::LoggerConfiguration>(), "DEBUG");
-   std::shared_ptr<spdlog::sinks::dist_sink_mt> dist_sink = std::make_shared<spdlog::sinks::dist_sink_mt>();
-   dist_sink->add_sink(std::make_shared<spdlog::sinks::ostream_sink_mt>(log_output, true));
-   dist_sink->add_sink(spdlog::sinks::stderr_sink_mt::instance());
-   logger_properties->add_sink("ostream", dist_sink);
-   logging::LoggerConfiguration::getConfiguration().initialize(logger_properties);
-   logger_ = logging::LoggerFactory<LogTestController>::getLogger();
+    std::shared_ptr<logging::LoggerProperties> logger_properties = std::make_shared<logging::LoggerProperties>();
+    logger_properties->set("logger.root", "ERROR,ostream");
+    logger_properties->set("logger." + core::getClassName<LogTestController>(), "INFO");
+    logger_properties->set("logger." + core::getClassName<logging::LoggerConfiguration>(), "DEBUG");
+    std::shared_ptr<spdlog::sinks::dist_sink_mt> dist_sink = std::make_shared<spdlog::sinks::dist_sink_mt>();
+    dist_sink->add_sink(std::make_shared<spdlog::sinks::ostream_sink_mt>(log_output, true));
+    dist_sink->add_sink(spdlog::sinks::stderr_sink_mt::instance());
+    logger_properties->add_sink("ostream", dist_sink);
+    logging::LoggerConfiguration::getConfiguration().initialize(logger_properties);
+    logger_ = logging::LoggerFactory<LogTestController>::getLogger();
   }
   LogTestController(LogTestController const&);
   LogTestController& operator=(LogTestController const&);
-  ~LogTestController() {};
+  ~LogTestController() {
+  }
+  ;
 
   void setLevel(const std::string name, spdlog::level::level_enum level) {
     logger_->log_info("Setting log level for %s to %s", name, spdlog::level::to_str(level));