You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/04/05 16:11:46 UTC

[2/4] nifi-minifi-cpp git commit: MINIFI-254: Incremental update for linter changes

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/Record.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Record.cpp b/libminifi/src/core/Record.cpp
deleted file mode 100644
index 6f33300..0000000
--- a/libminifi/src/core/Record.cpp
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Copyright 2017 <copyright holder> <email>
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * 
- */
-
-#include "core/FlowFile.h"
-#include "core/logging/Logger.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
-
-FlowFile::FlowFile()
-    : size_(0),
-      id_(0),
-      stored(false),
-      offset_(0),
-      last_queue_date_(0),
-      penaltyExpiration_ms_(0),
-      claim_(nullptr),
-      marked_delete_(false),
-      connection_(nullptr),
-      original_connection_() {
-  entry_date_ = getTimeMillis();
-  lineage_start_date_ = entry_date_;
-
-  char uuidStr[37];
-
-  // Generate the global UUID for the flow record
-  uuid_generate(uuid_);
-
-  uuid_unparse_lower(uuid_, uuidStr);
-  uuid_str_ = uuidStr;
-  
-  logger_ = logging::Logger::getLogger();
-
-}
-
-FlowFile::~FlowFile() {
-
-}
-
-FlowFile& FlowFile::operator=(const FlowFile& other) {
-
-  uuid_copy(uuid_, other.uuid_);
-  stored = other.stored;
-  marked_delete_ = other.marked_delete_;
-  entry_date_ = other.entry_date_;
-  lineage_start_date_ = other.lineage_start_date_;
-  lineage_Identifiers_ = other.lineage_Identifiers_;
-  last_queue_date_ = other.last_queue_date_;
-  size_ = other.size_;
-  penaltyExpiration_ms_ = other.penaltyExpiration_ms_;
-  attributes_ = other.attributes_;
-  claim_ = other.claim_;
-  if (claim_ != nullptr)
-    this->claim_->increaseFlowFileRecordOwnedCount();
-  uuid_str_ = other.uuid_str_;
-  connection_ = other.connection_;
-  original_connection_ = other.original_connection_;
-
-  return *this;
-}
-
-/**
- * Returns whether or not this flow file record
- * is marked as deleted.
- * @return marked deleted
- */
-bool FlowFile::isDeleted() {
-  return marked_delete_;
-}
-
-/**
- * Sets whether to mark this flow file record
- * as deleted
- * @param deleted deleted flag
- */
-void FlowFile::setDeleted(const bool deleted) {
-  marked_delete_ = deleted;
-}
-
-std::shared_ptr<ResourceClaim> FlowFile::getResourceClaim() {
-  return claim_;
-}
-
-void FlowFile::clearResourceClaim() {
-  claim_ = nullptr;
-}
-void FlowFile::setResourceClaim(std::shared_ptr<ResourceClaim> &claim) {
-  claim_ = claim;
-}
-
-// ! Get Entry Date
-uint64_t FlowFile::getEntryDate() {
-  return entry_date_;
-}
-uint64_t FlowFile::getEventTime() {
-  return event_time_;
-}
-// ! Get Lineage Start Date
-uint64_t FlowFile::getlineageStartDate() {
-  return lineage_start_date_;
-}
-
-std::set<std::string> &FlowFile::getlineageIdentifiers() {
-  return lineage_Identifiers_;
-}
-
-bool FlowFile::getAttribute(std::string key, std::string &value) {
-  auto it = attributes_.find(key);
-  if (it != attributes_.end()) {
-    value = it->second;
-    return true;
-  } else {
-    return false;
-  }
-}
-
-// Get Size
-uint64_t FlowFile::getSize() {
-  return size_;
-}
-// ! Get Offset
-uint64_t FlowFile::getOffset() {
-  return offset_;
-}
-
-bool FlowFile::removeAttribute(const std::string key) {
-  auto it = attributes_.find(key);
-  if (it != attributes_.end()) {
-    attributes_.erase(key);
-    return true;
-  } else {
-    return false;
-  }
-}
-
-bool FlowFile::updateAttribute(const std::string key, const std::string value) {
-  auto it = attributes_.find(key);
-  if (it != attributes_.end()) {
-    attributes_[key] = value;
-    return true;
-  } else {
-    return false;
-  }
-}
-
-bool FlowFile::addAttribute(const std::string &key, const std::string &value) {
-
-
-  auto it = attributes_.find(key);
-  if (it != attributes_.end()) {
-    // attribute already there in the map
-    return false;
-  } else {
-    attributes_[key] = value;
-    return true;
-  }
-}
-
-void FlowFile::setLineageStartDate(const uint64_t date) {
-  lineage_start_date_ = date;
-}
-
-/**
- * Sets the original connection with a shared pointer.
- * @param connection shared connection.
- */
-void FlowFile::setOriginalConnection(
-    std::shared_ptr<core::Connectable> &connection) {
-  original_connection_ = connection;
-}
-
-/**
- * Sets the connection with a shared pointer.
- * @param connection shared connection.
- */
-void FlowFile::setConnection(std::shared_ptr<core::Connectable> &connection) {
-  connection_ = connection;
-}
-
-/**
- * Sets the connection with a shared pointer.
- * @param connection shared connection.
- */
-void FlowFile::setConnection(std::shared_ptr<core::Connectable> &&connection) {
-  connection_ = connection;
-}
-
-/**
- * Returns the connection referenced by this record.
- * @return shared connection pointer.
- */
-std::shared_ptr<core::Connectable> FlowFile::getConnection() {
-  return connection_;
-}
-
-/**
- * Returns the original connection referenced by this record.
- * @return shared original connection pointer.
- */
-std::shared_ptr<core::Connectable> FlowFile::getOriginalConnection() {
-  return original_connection_;
-}
-
-}
-}
-}
-}
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/Repository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Repository.cpp b/libminifi/src/core/Repository.cpp
index 9a27785..50e8cd2 100644
--- a/libminifi/src/core/Repository.cpp
+++ b/libminifi/src/core/Repository.cpp
@@ -15,15 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#include "core/Repository.h"
+#include <arpa/inet.h>
 #include <cstdint>
 #include <vector>
-#include <arpa/inet.h>
 #include "io/DataStream.h"
 #include "io/Serializable.h"
 #include "core/Relationship.h"
 #include "core/logging/Logger.h"
 #include "FlowController.h"
-#include "core/Repository.h"
 #include "provenance/Provenance.h"
 #include "core/repository/FlowFileRepository.h"
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/RepositoryFactory.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/RepositoryFactory.cpp b/libminifi/src/core/RepositoryFactory.cpp
index 9bdc7c3..c24a2af 100644
--- a/libminifi/src/core/RepositoryFactory.cpp
+++ b/libminifi/src/core/RepositoryFactory.cpp
@@ -1,6 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "core/RepositoryFactory.h"
+#include <memory>
+#include <string>
+#include <algorithm>
 #include "core/Repository.h"
-#include "core/Repository.h"
-
 #ifdef LEVELDB_SUPPORT
 #include "core/repository/FlowFileRepository.h"
 #include "provenance/ProvenanceRepository.h"
@@ -11,57 +29,51 @@ namespace apache {
 namespace nifi {
 namespace minifi {
 #ifndef LEVELDB_SUPPORT
-  namespace provenance{
-  class ProvenanceRepository;
-  }
+namespace provenance {
+class ProvenanceRepository;
+}
 #endif
 namespace core {
 
 #ifndef LEVELDB_SUPPORT
-  class FlowFileRepository;
+class FlowFileRepository;
 #endif
 
-
- std::shared_ptr<core::Repository> createRepository(
-      const std::string configuration_class_name, bool fail_safe = false) {
-
-    std::string class_name_lc = configuration_class_name;
-    std::transform(class_name_lc.begin(), class_name_lc.end(),
-                   class_name_lc.begin(), ::tolower);
-    try {
-      std::shared_ptr<core::Repository> return_obj = nullptr;
-      if (class_name_lc == "flowfilerepository") {
-
-        return_obj = instantiate<core::repository::FlowFileRepository>();
-      } else if (class_name_lc == "provenancerepository") {
-
-
-	return_obj = instantiate<provenance::ProvenanceRepository>();//std::shared_ptr<core::Repository>((core::Repository*)instantiate<provenance::ProvenanceRepository>());
-
-      }
-      
-      if (return_obj){
-        return return_obj;
-      }
-      if (fail_safe) {
-        return std::make_shared<core::Repository>("fail_safe", "fail_safe", 1,
-                                                  1, 1);
-      } else {
-        throw std::runtime_error(
-            "Support for the provided configuration class could not be found");
-      }
-    } catch (const std::runtime_error &r) {
-      if (fail_safe) {
-        return std::make_shared<core::Repository>("fail_safe", "fail_safe", 1,
-                                                  1, 1);
-      }
+std::shared_ptr<core::Repository> createRepository(
+    const std::string configuration_class_name, bool fail_safe) {
+  std::shared_ptr<core::Repository> return_obj = nullptr;
+  std::string class_name_lc = configuration_class_name;
+  std::transform(class_name_lc.begin(), class_name_lc.end(),
+                 class_name_lc.begin(), ::tolower);
+  try {
+    std::shared_ptr<core::Repository> return_obj = nullptr;
+    if (class_name_lc == "flowfilerepository") {
+      return_obj = instantiate<core::repository::FlowFileRepository>();
+    } else if (class_name_lc == "provenancerepository") {
+      return_obj = instantiate<provenance::ProvenanceRepository>();
     }
 
-    throw std::runtime_error(
-        "Support for the provided configuration class could not be found");
+    if (return_obj) {
+      return return_obj;
+    }
+    if (fail_safe) {
+      return std::make_shared<core::Repository>("fail_safe", "fail_safe", 1, 1,
+                                                1);
+    } else {
+      throw std::runtime_error(
+          "Support for the provided configuration class could not be found");
+    }
+  } catch (const std::runtime_error &r) {
+    if (fail_safe) {
+      return std::make_shared<core::Repository>("fail_safe", "fail_safe", 1, 1,
+                                                1);
+    }
   }
 
-  
+  throw std::runtime_error(
+      "Support for the provided configuration class could not be found");
+}
+
 } /* namespace core */
 } /* namespace minifi */
 } /* namespace nifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/logging/BaseLogger.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/logging/BaseLogger.cpp b/libminifi/src/core/logging/BaseLogger.cpp
index a6b43a8..d4774df 100644
--- a/libminifi/src/core/logging/BaseLogger.cpp
+++ b/libminifi/src/core/logging/BaseLogger.cpp
@@ -17,6 +17,10 @@
  */
 
 #include "core/logging/BaseLogger.h"
+#include <utility>
+#include <memory>
+#include <algorithm>
+#include <string>
 
 namespace org {
 namespace apache {
@@ -68,7 +72,6 @@ void BaseLogger::log_info(const char * const format, ...) {
  * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match
  */
 void BaseLogger::log_debug(const char * const format, ...) {
-
   if (logger_ == NULL || !logger_->should_log(spdlog::level::level_enum::debug))
     return;
   FILL_BUFFER
@@ -80,7 +83,6 @@ void BaseLogger::log_debug(const char * const format, ...) {
  * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match
  */
 void BaseLogger::log_trace(const char * const format, ...) {
-
   if (logger_ == NULL || !logger_->should_log(spdlog::level::level_enum::trace))
     return;
   FILL_BUFFER
@@ -122,7 +124,6 @@ void BaseLogger::log_str(LOG_LEVEL_E level, const std::string &buffer) {
       logger_->info(buffer);
       break;
   }
-
 }
 
 void BaseLogger::setLogLevel(const std::string &level,

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/logging/LogAppenders.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/logging/LogAppenders.cpp b/libminifi/src/core/logging/LogAppenders.cpp
index 5d92334..1918a0d 100644
--- a/libminifi/src/core/logging/LogAppenders.cpp
+++ b/libminifi/src/core/logging/LogAppenders.cpp
@@ -24,12 +24,15 @@ namespace minifi {
 namespace core {
 namespace logging {
 
-const char *OutputStreamAppender::nifi_log_output_stream_error_stderr="nifi.log.outputstream.appender.error.stderr";
-
-const char *RollingAppender::nifi_log_rolling_apender_file = "nifi.log.rolling.appender.file";
-const char *RollingAppender::nifi_log_rolling_appender_max_files = "nifi.log.rolling.appender.max.files";
-const char *RollingAppender::nifi_log_rolling_appender_max_file_size = "nifi.log.rolling.appender.max.file_size";
+const char *OutputStreamAppender::nifi_log_output_stream_error_stderr =
+    "nifi.log.outputstream.appender.error.stderr";
 
+const char *RollingAppender::nifi_log_rolling_apender_file =
+    "nifi.log.rolling.appender.file";
+const char *RollingAppender::nifi_log_rolling_appender_max_files =
+    "nifi.log.rolling.appender.max.files";
+const char *RollingAppender::nifi_log_rolling_appender_max_file_size =
+    "nifi.log.rolling.appender.max.file_size";
 
 } /* namespace logging */
 } /* namespace core */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/logging/Logger.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/logging/Logger.cpp b/libminifi/src/core/logging/Logger.cpp
index d8cadf3..5c08fa8 100644
--- a/libminifi/src/core/logging/Logger.cpp
+++ b/libminifi/src/core/logging/Logger.cpp
@@ -21,6 +21,7 @@
 
 #include <vector>
 #include <queue>
+#include <memory>
 #include <map>
 
 namespace org {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/repository/FlowFileRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/repository/FlowFileRepository.cpp b/libminifi/src/core/repository/FlowFileRepository.cpp
index 8f13f39..5f62f83 100644
--- a/libminifi/src/core/repository/FlowFileRepository.cpp
+++ b/libminifi/src/core/repository/FlowFileRepository.cpp
@@ -1,4 +1,24 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 #include "core/repository/FlowFileRepository.h"
+#include <memory>
+#include <string>
+#include <vector>
 #include "FlowFileRecord.h"
 
 namespace org {
@@ -20,10 +40,12 @@ void FlowFileRepository::run() {
       leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
 
       for (it->SeekToFirst(); it->Valid(); it->Next()) {
-        std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this());
+        std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<
+            FlowFileRecord>(shared_from_this());
         std::string key = it->key().ToString();
-        if (eventRead->DeSerialize((uint8_t *) it->value().data(),
-                                  (int) it->value().size())) {
+        if (eventRead->DeSerialize(
+            reinterpret_cast<const uint8_t *>(it->value().data()),
+            it->value().size())) {
           if ((curTime - eventRead->getEventTime()) > max_partition_millis_)
             purgeList.push_back(key);
         } else {
@@ -47,54 +69,44 @@ void FlowFileRepository::run() {
   return;
 }
 
-void FlowFileRepository::loadComponent()
- {
-
+void FlowFileRepository::loadComponent() {
   std::vector<std::string> purgeList;
-  leveldb::Iterator* it = db_->NewIterator(
-            leveldb::ReadOptions());
+  leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
 
-  for (it->SeekToFirst(); it->Valid(); it->Next())
-  {
-    std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this());
+  for (it->SeekToFirst(); it->Valid(); it->Next()) {
+    std::shared_ptr<FlowFileRecord> eventRead =
+        std::make_shared<FlowFileRecord>(shared_from_this());
     std::string key = it->key().ToString();
-    if (eventRead->DeSerialize((uint8_t *) it->value().data(),
-        (int) it->value().size()))
-    {
+    if (eventRead->DeSerialize(
+        reinterpret_cast<const uint8_t *>(it->value().data()),
+        it->value().size())) {
       auto search = connectionMap.find(eventRead->getConnectionUuid());
-      if (search != connectionMap.end())
-      {
+      if (search != connectionMap.end()) {
         // we find the connection for the persistent flowfile, create the flowfile and enqueue that
-	std::shared_ptr<core::FlowFile> flow_file_ref = std::static_pointer_cast<core::FlowFile>(eventRead);
-        std::shared_ptr<FlowFileRecord> record = std::make_shared<FlowFileRecord>(shared_from_this(),flow_file_ref);
+        std::shared_ptr<core::FlowFile> flow_file_ref =
+            std::static_pointer_cast<core::FlowFile>(eventRead);
+        std::shared_ptr<FlowFileRecord> record =
+            std::make_shared<FlowFileRecord>(shared_from_this(), flow_file_ref);
         // set store to repo to true so that we do need to persistent again in enqueue
         record->setStoredToRepository(true);
         search->second->put(record);
-      }
-      else
-      {
-        if (eventRead->getContentFullPath().length() > 0)
-        {
+      } else {
+        if (eventRead->getContentFullPath().length() > 0) {
           std::remove(eventRead->getContentFullPath().c_str());
         }
         purgeList.push_back(key);
       }
-    }
-    else
-    {
+    } else {
       purgeList.push_back(key);
     }
   }
 
   delete it;
   std::vector<std::string>::iterator itPurge;
-  for (itPurge = purgeList.begin(); itPurge != purgeList.end();
-            itPurge++)
-  {
+  for (itPurge = purgeList.begin(); itPurge != purgeList.end(); itPurge++) {
     std::string eventId = *itPurge;
-    logger_->log_info("Repository Repo %s Purge %s",
-                    name_.c_str(),
-                    eventId.c_str());
+    logger_->log_info("Repository Repo %s Purge %s", name_.c_str(),
+                      eventId.c_str());
     Delete(eventId);
   }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/yaml/YamlConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp
index 8e59363..4e736f8 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -17,7 +17,10 @@
  */
 
 #include "core/yaml/YamlConfiguration.h"
-
+#include <memory>
+#include <string>
+#include <vector>
+#include <set>
 namespace org {
 namespace apache {
 namespace nifi {
@@ -29,18 +32,17 @@ core::ProcessGroup *YamlConfiguration::parseRootProcessGroupYaml(
   uuid_t uuid;
 
   std::string flowName = rootFlowNode["name"].as<std::string>();
-  std::string id ;
-  
+  std::string id;
+
   try {
     rootFlowNode["id"].as<std::string>();
 
     uuid_parse(id.c_str(), uuid);
-  }catch(...)
-  {
+  } catch (...) {
     logger_->log_warn("Generating random ID for root node");
     uuid_generate(uuid);
     char uuid_str[37];
-    uuid_unparse(uuid,uuid_str);
+    uuid_unparse(uuid, uuid_str);
     id = uuid_str;
   }
 
@@ -69,7 +71,6 @@ void YamlConfiguration::parseProcessorNodeYaml(
   }
 
   if (processorsNode) {
-
     if (processorsNode.IsSequence()) {
       // Evaluate sequence of processors
       int numProcessors = processorsNode.size();
@@ -196,7 +197,6 @@ void YamlConfiguration::parseProcessorNodeYaml(
           processor->setSchedulingStrategy(core::CRON_DRIVEN);
           logger_->log_debug("setting scheduling strategy as %s",
                              procCfg.schedulingStrategy.c_str());
-
         }
 
         int64_t maxConcurrentTasks;
@@ -324,7 +324,6 @@ void YamlConfiguration::parseRemoteProcessGroupYaml(
             this->parsePortYaml(&currPort, group, RECEIVE);
           }  // for node
         }
-
       }
     }
   }
@@ -341,11 +340,9 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode,
   }
 
   if (connectionsNode) {
-
     if (connectionsNode->IsSequence()) {
       for (YAML::const_iterator iter = connectionsNode->begin();
           iter != connectionsNode->end(); ++iter) {
-
         YAML::Node connectionNode = iter->as<YAML::Node>();
 
         std::string name = connectionNode["name"].as<std::string>();
@@ -461,7 +458,6 @@ void YamlConfiguration::parsePortYaml(YAML::Node *portNode,
   logger_->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]",
                      maxConcurrentTasks);
   processor->setMaxConcurrentTasks(maxConcurrentTasks);
-
 }
 
 void YamlConfiguration::parsePropertiesNodeYaml(

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/io/BaseStream.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/BaseStream.cpp b/libminifi/src/io/BaseStream.cpp
index 1400a1d..8070c38 100644
--- a/libminifi/src/io/BaseStream.cpp
+++ b/libminifi/src/io/BaseStream.cpp
@@ -16,9 +16,9 @@
  * limitations under the License.
  */
 #include "io/BaseStream.h"
+#include <string>
 #include "io/Serializable.h"
 
-
 namespace org {
 namespace apache {
 namespace nifi {
@@ -32,7 +32,8 @@ namespace io {
  * @return resulting write size
  **/
 int BaseStream::write(uint32_t base_value, bool is_little_endian) {
-	return Serializable::write(base_value, (DataStream*) this, is_little_endian);
+  return Serializable::write(base_value, reinterpret_cast<DataStream*>(this),
+                             is_little_endian);
 }
 
 /**
@@ -43,7 +44,8 @@ int BaseStream::write(uint32_t base_value, bool is_little_endian) {
  * @return resulting write size
  **/
 int BaseStream::write(uint16_t base_value, bool is_little_endian) {
-	return Serializable::write(base_value, (DataStream*) this, is_little_endian);
+  return Serializable::write(base_value, reinterpret_cast<DataStream*>(this),
+                             is_little_endian);
 }
 
 /**
@@ -54,7 +56,7 @@ int BaseStream::write(uint16_t base_value, bool is_little_endian) {
  * @return resulting write size
  **/
 int BaseStream::write(uint8_t *value, int len) {
-	return Serializable::write(value, len, (DataStream*) this);
+  return Serializable::write(value, len, reinterpret_cast<DataStream*>(this));
 }
 
 /**
@@ -65,7 +67,8 @@ int BaseStream::write(uint8_t *value, int len) {
  * @return resulting write size
  **/
 int BaseStream::write(uint64_t base_value, bool is_little_endian) {
-	return Serializable::write(base_value, (DataStream*) this, is_little_endian);
+  return Serializable::write(base_value, reinterpret_cast<DataStream*>(this),
+                             is_little_endian);
 }
 
 /**
@@ -74,8 +77,8 @@ int BaseStream::write(uint64_t base_value, bool is_little_endian) {
  * @return resulting write size
  **/
 int BaseStream::write(bool value) {
-	uint8_t v = value;
-	return Serializable::write(v);
+  uint8_t v = value;
+  return Serializable::write(v);
 }
 
 /**
@@ -84,7 +87,7 @@ int BaseStream::write(bool value) {
  * @return resulting write size
  **/
 int BaseStream::writeUTF(std::string str, bool widen) {
-	return Serializable::writeUTF(str, (DataStream*) this, widen);
+  return Serializable::writeUTF(str, reinterpret_cast<DataStream*>(this), widen);
 }
 
 /**
@@ -94,7 +97,7 @@ int BaseStream::writeUTF(std::string str, bool widen) {
  * @return resulting read size
  **/
 int BaseStream::read(uint8_t &value) {
-	return Serializable::read(value, (DataStream*) this);
+  return Serializable::read(value, reinterpret_cast<DataStream*>(this));
 }
 
 /**
@@ -104,7 +107,7 @@ int BaseStream::read(uint8_t &value) {
  * @return resulting read size
  **/
 int BaseStream::read(uint16_t &base_value, bool is_little_endian) {
-	return Serializable::read(base_value, (DataStream*) this);
+  return Serializable::read(base_value, reinterpret_cast<DataStream*>(this));
 }
 
 /**
@@ -114,7 +117,7 @@ int BaseStream::read(uint16_t &base_value, bool is_little_endian) {
  * @return resulting read size
  **/
 int BaseStream::read(char &value) {
-	return Serializable::read(value, (DataStream*) this);
+  return Serializable::read(value, reinterpret_cast<DataStream*>(this));
 }
 
 /**
@@ -125,7 +128,7 @@ int BaseStream::read(char &value) {
  * @return resulting read size
  **/
 int BaseStream::read(uint8_t *value, int len) {
-	return Serializable::read(value, len, (DataStream*) this);
+  return Serializable::read(value, len, reinterpret_cast<DataStream*>(this));
 }
 
 /**
@@ -135,7 +138,8 @@ int BaseStream::read(uint8_t *value, int len) {
  * @return resulting read size
  **/
 int BaseStream::read(uint32_t &value, bool is_little_endian) {
-	return Serializable::read(value, (DataStream*) this, is_little_endian);
+  return Serializable::read(value, reinterpret_cast<DataStream*>(this),
+                            is_little_endian);
 }
 
 /**
@@ -145,7 +149,8 @@ int BaseStream::read(uint32_t &value, bool is_little_endian) {
  * @return resulting read size
  **/
 int BaseStream::read(uint64_t &value, bool is_little_endian) {
-	return Serializable::read(value, (DataStream*) this, is_little_endian);
+  return Serializable::read(value, reinterpret_cast<DataStream*>(this),
+                            is_little_endian);
 }
 
 /**
@@ -155,10 +160,9 @@ int BaseStream::read(uint64_t &value, bool is_little_endian) {
  * @return resulting read size
  **/
 int BaseStream::readUTF(std::string &str, bool widen) {
-	return Serializable::readUTF(str, (DataStream*) this, widen);
+  return Serializable::readUTF(str, reinterpret_cast<DataStream*>(this), widen);
 }
 
-
 } /* namespace io */
 } /* namespace minifi */
 } /* namespace nifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/io/CRCStream.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/CRCStream.cpp b/libminifi/src/io/CRCStream.cpp
index 47b45b5..e06a8f5 100644
--- a/libminifi/src/io/CRCStream.cpp
+++ b/libminifi/src/io/CRCStream.cpp
@@ -20,5 +20,3 @@
 #include <memory>
 #include "io/CRCStream.h"
 
-
-

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/io/ClientSocket.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/ClientSocket.cpp b/libminifi/src/io/ClientSocket.cpp
index ad6b04d..e62d4f1 100644
--- a/libminifi/src/io/ClientSocket.cpp
+++ b/libminifi/src/io/ClientSocket.cpp
@@ -15,20 +15,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#include "io/ClientSocket.h"
 #include <netinet/tcp.h>
 #include <sys/types.h>
+#include <netdb.h>
 #include <sys/socket.h>
 #include <netinet/in.h>
 #include <arpa/inet.h>
 #include <unistd.h>
 #include <cstdio>
+#include <utility>
+#include <vector>
 #include <cerrno>
-#include <netdb.h>
 #include <iostream>
 #include <string>
-
 #include "io/validation.h"
-#include "io/ClientSocket.h"
 
 namespace org {
 namespace apache {
@@ -36,7 +37,7 @@ namespace nifi {
 namespace minifi {
 namespace io {
 
-std::string Socket::HOSTNAME = Socket::getMyHostName(0);
+char *Socket::HOSTNAME = const_cast<char*>(Socket::getMyHostName(0).c_str());
 
 Socket::Socket(const std::string &hostname, const uint16_t port,
                const uint16_t listeners = -1)
@@ -50,12 +51,10 @@ Socket::Socket(const std::string &hostname, const uint16_t port,
   logger_ = logging::Logger::getLogger();
   FD_ZERO(&total_list_);
   FD_ZERO(&read_fds_);
-
 }
 
 Socket::Socket(const std::string &hostname, const uint16_t port)
     : Socket(hostname, port, 0) {
-
 }
 
 Socket::Socket(const Socket &&other)
@@ -69,7 +68,6 @@ Socket::Socket(const Socket &&other)
       read_fds_(other.read_fds_),
       canonical_hostname_(std::move(other.canonical_hostname_)) {
   logger_ = logging::Logger::getLogger();
-
 }
 
 Socket::~Socket() {
@@ -81,7 +79,6 @@ void Socket::closeStream() {
     freeaddrinfo(addr_info_);
     addr_info_ = 0;
   }
-
   if (socket_file_descriptor_ >= 0) {
     close(socket_file_descriptor_);
     socket_file_descriptor_ = -1;
@@ -98,12 +95,10 @@ int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) {
   setSocketOptions(socket_file_descriptor_);
 
   if (listeners_ > 0) {
-
     struct sockaddr_in *sa_loc = (struct sockaddr_in*) p->ai_addr;
     sa_loc->sin_family = AF_INET;
     sa_loc->sin_port = htons(port_);
     sa_loc->sin_addr.s_addr = htonl(INADDR_ANY);
-
     if (bind(socket_file_descriptor_, p->ai_addr, p->ai_addrlen) == -1) {
       logger_->log_error("Could not bind to socket", strerror(errno));
       return -1;
@@ -113,7 +108,6 @@ int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) {
     if (listeners_ <= 0) {
       struct sockaddr_in *sa_loc = (struct sockaddr_in*) p->ai_addr;
       sa_loc->sin_family = AF_INET;
-      //sa_loc->sin_port = htons(port);
       sa_loc->sin_port = htons(port_);
       // use any address if you are connecting to the local machine for testing
       // otherwise we must use the requested hostname
@@ -129,7 +123,6 @@ int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) {
         logger_->log_warn("Could not connect to socket, error:%s",
                           strerror(errno));
         return -1;
-
       }
     }
   }
@@ -140,7 +133,6 @@ int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) {
       logger_->log_warn("attempted connection, saw %s", strerror(errno));
       return -1;
     }
-
   }
   // add the listener to the total set
   FD_SET(socket_file_descriptor_, &total_list_);
@@ -148,8 +140,7 @@ int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) {
   return 0;
 }
 
-short Socket::initialize() {
-
+int16_t Socket::initialize() {
   struct sockaddr_in servAddr;
 
   addrinfo hints = { sizeof(addrinfo) };
@@ -159,7 +150,6 @@ short Socket::initialize() {
   hints.ai_flags = AI_CANONNAME;
   if (listeners_ > 0)
     hints.ai_flags |= AI_PASSIVE;
-
   hints.ai_protocol = 0; /* any protocol */
 
   int errcode = getaddrinfo(requested_hostname_.c_str(), 0, &hints,
@@ -188,8 +178,7 @@ short Socket::initialize() {
   int hh_errno;
   gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno);
 #endif
-
-  memcpy((char *) &addr, h->h_addr_list[0], h->h_length);
+  memcpy(reinterpret_cast<char*>(&addr), h->h_addr_list[0], h->h_length);
 
   auto p = addr_info_;
   for (; p != NULL; p = p->ai_next) {
@@ -197,8 +186,7 @@ short Socket::initialize() {
       if (!IsNullOrEmpty(p) && !IsNullOrEmpty(p->ai_canonname))
         canonical_hostname_ = p->ai_canonname;
     }
-
-    //we've successfully connected
+    // we've successfully connected
     if (port_ > 0 && createConnection(p, addr) >= 0) {
       return 0;
       break;
@@ -206,10 +194,9 @@ short Socket::initialize() {
   }
 
   return -1;
-
 }
 
-short Socket::select_descriptor(const uint16_t msec) {
+int16_t Socket::select_descriptor(const uint16_t msec) {
   struct timeval tv;
   int retval;
 
@@ -233,7 +220,6 @@ short Socket::select_descriptor(const uint16_t msec) {
 
   for (int i = 0; i <= socket_max_; i++) {
     if (FD_ISSET(i, &read_fds_)) {
-
       if (i == socket_file_descriptor_) {
         if (listeners_ > 0) {
           struct sockaddr_storage remoteaddr;  // client address
@@ -255,24 +241,23 @@ short Socket::select_descriptor(const uint16_t msec) {
         return i;
       }
     }
-
   }
 
   return -1;
 }
 
-short Socket::setSocketOptions(const int sock) {
+int16_t Socket::setSocketOptions(const int sock) {
   int opt = 1;
   bool nagle_off = true;
 #ifndef __MACH__
   if (nagle_off) {
-    if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (void *) &opt, sizeof(opt))
+    if (setsockopt(sock, SOL_TCP, TCP_NODELAY, static_cast<void*>(&opt), sizeof(opt))
         < 0) {
       logger_->log_error("setsockopt() TCP_NODELAY failed");
       close(sock);
       return -1;
     }
-    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &opt,
+    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char*>(&opt),
             sizeof(opt)) < 0) {
       logger_->log_error("setsockopt() SO_REUSEADDR failed");
       close(sock);
@@ -281,8 +266,8 @@ short Socket::setSocketOptions(const int sock) {
   }
 
   int sndsize = 256 * 1024;
-  if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *) &sndsize,
-          (int) sizeof(sndsize)) < 0) {
+  if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, reinterpret_cast<char *>( &sndsize),
+          sizeof(sndsize)) < 0) {
     logger_->log_error("setsockopt() SO_SNDBUF failed");
     close(sock);
     return -1;
@@ -291,8 +276,8 @@ short Socket::setSocketOptions(const int sock) {
 #else
   if (listeners_ > 0) {
     // lose the pesky "address already in use" error message
-    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &opt, sizeof(opt))
-        < 0) {
+    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
+                   reinterpret_cast<char *>(&opt), sizeof(opt)) < 0) {
       logger_->log_error("setsockopt() SO_REUSEADDR failed");
       close(sock);
       return -1;
@@ -307,22 +292,19 @@ std::string Socket::getHostname() const {
 }
 
 int Socket::writeData(std::vector<uint8_t> &buf, int buflen) {
-
   if (buf.capacity() < buflen)
     return -1;
-  return writeData((uint8_t*) &buf[0], buflen);
+  return writeData(reinterpret_cast<uint8_t *>(&buf[0]), buflen);
 }
 
 // data stream overrides
 
 int Socket::writeData(uint8_t *value, int size) {
-
   int ret = 0, bytes = 0;
 
   while (bytes < size) {
-
     ret = send(socket_file_descriptor_, value + bytes, size - bytes, 0);
-    //check for errors
+    // check for errors
     if (ret <= 0) {
       close(socket_file_descriptor_);
       logger_->log_error("Could not send to %d, error: %s",
@@ -330,27 +312,23 @@ int Socket::writeData(uint8_t *value, int size) {
       return ret;
     }
     bytes += ret;
-
   }
 
   if (ret)
     logger_->log_trace("Send data size %d over socket %d", size,
                        socket_file_descriptor_);
-
   return bytes;
-
 }
 
 template<typename T>
 inline std::vector<uint8_t> Socket::readBuffer(const T& t) {
   std::vector<uint8_t> buf;
   buf.resize(sizeof t);
-  readData((uint8_t*) &buf[0], sizeof(t));
+  readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t));
   return buf;
 }
 
 int Socket::write(uint64_t base_value, bool is_little_endian) {
-
   return Serializable::write(base_value, this, is_little_endian);
 }
 
@@ -363,7 +341,6 @@ int Socket::write(uint16_t base_value, bool is_little_endian) {
 }
 
 int Socket::read(uint64_t &value, bool is_little_endian) {
-
   auto buf = readBuffer(value);
 
   if (is_little_endian) {
@@ -381,68 +358,57 @@ int Socket::read(uint64_t &value, bool is_little_endian) {
 }
 
 int Socket::read(uint32_t &value, bool is_little_endian) {
-
   auto buf = readBuffer(value);
 
   if (is_little_endian) {
     value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3];
   } else {
     value = buf[0] | buf[1] << 8 | buf[2] << 16 | buf[3] << 24;
-
   }
-
   return sizeof(value);
 }
 
 int Socket::read(uint16_t &value, bool is_little_endian) {
-
   auto buf = readBuffer(value);
 
   if (is_little_endian) {
     value = (buf[0] << 8) | buf[1];
   } else {
     value = buf[0] | buf[1] << 8;
-
   }
   return sizeof(value);
 }
 
 int Socket::readData(std::vector<uint8_t> &buf, int buflen) {
-
   if (buf.capacity() < buflen) {
     buf.resize(buflen);
   }
-  return readData((uint8_t*) &buf[0], buflen);
+  return readData(reinterpret_cast<uint8_t*>(&buf[0]), buflen);
 }
 
 int Socket::readData(uint8_t *buf, int buflen) {
-
-  int total_read = 0;
+  int32_t total_read = 0;
   while (buflen) {
-    short fd = select_descriptor(1000);
+    int16_t fd = select_descriptor(1000);
     if (fd < 0) {
-
       logger_->log_info("fd close %i", buflen);
       close(socket_file_descriptor_);
       return -1;
     }
-
     int bytes_read = recv(fd, buf, buflen, 0);
     if (bytes_read <= 0) {
-      if (bytes_read == 0)
+      if (bytes_read == 0) {
         logger_->log_info("Other side hung up on %d", fd);
-      else {
+      } else {
         logger_->log_error("Could not recv on %d, error: %s", fd,
                            strerror(errno));
       }
       return -1;
     }
-
     buflen -= bytes_read;
     buf += bytes_read;
     total_read += bytes_read;
   }
-
   return total_read;
 }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/io/DataStream.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/DataStream.cpp b/libminifi/src/io/DataStream.cpp
index 7a10bd9..9e0dfce 100644
--- a/libminifi/src/io/DataStream.cpp
+++ b/libminifi/src/io/DataStream.cpp
@@ -15,17 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-
+#include "io/DataStream.h"
+#include <arpa/inet.h>
 #include <vector>
 #include <iostream>
 #include <cstdint>
 #include <cstdio>
 #include <cstring>
 #include <string>
-#include <arpa/inet.h>
 #include <algorithm>
-#include "io/DataStream.h"
 
 namespace org {
 namespace apache {
@@ -33,106 +31,92 @@ namespace nifi {
 namespace minifi {
 namespace io {
 
-
 int DataStream::writeData(uint8_t *value, int size) {
-
-    std::copy(value,value+size,std::back_inserter(buffer));
-
-    return size;
+  std::copy(value, value + size, std::back_inserter(buffer));
+  return size;
 }
 
 int DataStream::read(uint64_t &value, bool is_little_endian) {
-    if ((8 + readBuffer) > buffer.size()) {
-        // if read exceed
-        return -1;
-    }
-    uint8_t *buf = &buffer[readBuffer];
-
-    if (is_little_endian) {
-        value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) << 48)
-                | ((uint64_t) (buf[2] & 255) << 40)
-                | ((uint64_t) (buf[3] & 255) << 32)
-                | ((uint64_t) (buf[4] & 255) << 24)
-                | ((uint64_t) (buf[5] & 255) << 16)
-                | ((uint64_t) (buf[6] & 255) << 8)
-                | ((uint64_t) (buf[7] & 255) << 0);
-    } else {
-        value = ((uint64_t) buf[0] << 0) | ((uint64_t) (buf[1] & 255) << 8)
-                | ((uint64_t) (buf[2] & 255) << 16)
-                | ((uint64_t) (buf[3] & 255) << 24)
-                | ((uint64_t) (buf[4] & 255) << 32)
-                | ((uint64_t) (buf[5] & 255) << 40)
-                | ((uint64_t) (buf[6] & 255) << 48)
-                | ((uint64_t) (buf[7] & 255) << 56);
-    }
-    readBuffer += 8;
-    return 8;
+  if ((8 + readBuffer) > buffer.size()) {
+    // if read exceed
+    return -1;
+  }
+  uint8_t *buf = &buffer[readBuffer];
+
+  if (is_little_endian) {
+    value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) << 48)
+        | ((uint64_t) (buf[2] & 255) << 40) | ((uint64_t) (buf[3] & 255) << 32)
+        | ((uint64_t) (buf[4] & 255) << 24) | ((uint64_t) (buf[5] & 255) << 16)
+        | ((uint64_t) (buf[6] & 255) << 8) | ((uint64_t) (buf[7] & 255) << 0);
+  } else {
+    value = ((uint64_t) buf[0] << 0) | ((uint64_t) (buf[1] & 255) << 8)
+        | ((uint64_t) (buf[2] & 255) << 16) | ((uint64_t) (buf[3] & 255) << 24)
+        | ((uint64_t) (buf[4] & 255) << 32) | ((uint64_t) (buf[5] & 255) << 40)
+        | ((uint64_t) (buf[6] & 255) << 48) | ((uint64_t) (buf[7] & 255) << 56);
+  }
+  readBuffer += 8;
+  return 8;
 }
 
 int DataStream::read(uint32_t &value, bool is_little_endian) {
-    if ((4 + readBuffer) > buffer.size()) {
-        // if read exceed
-        return -1;
-    }
-    uint8_t *buf = &buffer[readBuffer];
-
-    if (is_little_endian) {
-        value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3];
-    } else {
-        value = buf[0] | buf[1] << 8 | buf[2] << 16 | buf[3] << 24;
-
-    }
-    readBuffer += 4;
-    return 4;
+  if ((4 + readBuffer) > buffer.size()) {
+    // if read exceed
+    return -1;
+  }
+  uint8_t *buf = &buffer[readBuffer];
+
+  if (is_little_endian) {
+    value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3];
+  } else {
+    value = buf[0] | buf[1] << 8 | buf[2] << 16 | buf[3] << 24;
+  }
+  readBuffer += 4;
+  return 4;
 }
 
 int DataStream::read(uint16_t &value, bool is_little_endian) {
-
-    if ((2 + readBuffer) > buffer.size()) {
-        // if read exceed
-        return -1;
-    }
-    uint8_t *buf = &buffer[readBuffer];
-
-    if (is_little_endian) {
-        value = (buf[0] << 8) | buf[1];
-    } else {
-        value = buf[0] | buf[1] << 8;
-
-    }
-    readBuffer += 2;
-    return 2;
+  if ((2 + readBuffer) > buffer.size()) {
+    // if read exceed
+    return -1;
+  }
+  uint8_t *buf = &buffer[readBuffer];
+
+  if (is_little_endian) {
+    value = (buf[0] << 8) | buf[1];
+  } else {
+    value = buf[0] | buf[1] << 8;
+  }
+  readBuffer += 2;
+  return 2;
 }
 
-int DataStream::readData(std::vector<uint8_t> &buf,int buflen) {
-    if ((buflen + readBuffer) > buffer.size()) {
-        // if read exceed
-        return -1;
-    }
+int DataStream::readData(std::vector<uint8_t> &buf, int buflen) {
+  if ((buflen + readBuffer) > buffer.size()) {
+    // if read exceed
+    return -1;
+  }
 
-    if (buf.capacity() < buflen)
-    	buf.resize(buflen);
+  if (buf.capacity() < buflen)
+    buf.resize(buflen);
 
-    buf.insert(buf.begin(),&buffer[readBuffer],&buffer[readBuffer+buflen]);
+  buf.insert(buf.begin(), &buffer[readBuffer], &buffer[readBuffer + buflen]);
 
-    readBuffer += buflen;
-    return buflen;
+  readBuffer += buflen;
+  return buflen;
 }
 
+int DataStream::readData(uint8_t *buf, int buflen) {
+  if ((buflen + readBuffer) > buffer.size()) {
+    // if read exceed
+    return -1;
+  }
 
-int DataStream::readData(uint8_t *buf,int buflen) {
-    if ((buflen + readBuffer) > buffer.size()) {
-        // if read exceed
-        return -1;
-    }
-
-    std::copy(&buffer[readBuffer],&buffer[readBuffer+buflen],buf);
+  std::copy(&buffer[readBuffer], &buffer[readBuffer + buflen], buf);
 
-    readBuffer += buflen;
-    return buflen;
+  readBuffer += buflen;
+  return buflen;
 }
 
-
 } /* namespace io */
 } /* namespace minifi */
 } /* namespace nifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/io/EndianCheck.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/EndianCheck.cpp b/libminifi/src/io/EndianCheck.cpp
index 1b5020d..fd754c4 100644
--- a/libminifi/src/io/EndianCheck.cpp
+++ b/libminifi/src/io/EndianCheck.cpp
@@ -25,7 +25,6 @@ namespace minifi {
 namespace io {
 bool EndiannessCheck::IS_LITTLE = EndiannessCheck::is_little_endian();
 
-
 } /* namespace io */
 } /* namespace minifi */
 } /* namespace nifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/io/Serializable.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/Serializable.cpp b/libminifi/src/io/Serializable.cpp
index 7b7f2bd..c3f74c7 100644
--- a/libminifi/src/io/Serializable.cpp
+++ b/libminifi/src/io/Serializable.cpp
@@ -15,13 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#include "io/Serializable.h"
+#include <arpa/inet.h>
+#include <cstdio>
 #include <iostream>
 #include <vector>
 #include <string>
-#include <cstdio>
-#include <arpa/inet.h>
+#include <algorithm>
 #include "io/DataStream.h"
-#include "io/Serializable.h"
 namespace org {
 namespace apache {
 namespace nifi {
@@ -29,8 +30,7 @@ namespace minifi {
 namespace io {
 
 #define htonll_r(x) ((((uint64_t)htonl(x)) << 32) + htonl((x) >> 32))
-
-#define IS_ASCII(c) __builtin_expect(!!((c >= 1) && (c <= 127)),1)
+#define IS_ASCII(c) __builtin_expect(!!((c >= 1) && (c <= 127)), 1)
 
 template<typename T>
 int Serializable::writeData(const T &t, DataStream *stream) {
@@ -63,7 +63,7 @@ int Serializable::write(uint8_t value, DataStream *stream) {
   return stream->writeData(&value, 1);
 }
 int Serializable::write(char value, DataStream *stream) {
-  return stream->writeData((uint8_t *) &value, 1);
+  return stream->writeData(reinterpret_cast<uint8_t *>(&value), 1);
 }
 
 int Serializable::write(uint8_t *value, int len, DataStream *stream) {
@@ -89,7 +89,7 @@ int Serializable::read(char &value, DataStream *stream) {
 
   int ret = stream->readData(&buf, 1);
   if (ret == 1)
-    value = (char) buf;
+    value = buf;
   return ret;
 }
 
@@ -105,17 +105,14 @@ int Serializable::read(uint16_t &value, DataStream *stream,
 int Serializable::read(uint32_t &value, DataStream *stream,
                        bool is_little_endian) {
   return stream->read(value, is_little_endian);
-
 }
 int Serializable::read(uint64_t &value, DataStream *stream,
                        bool is_little_endian) {
   return stream->read(value, is_little_endian);
-
 }
 
 int Serializable::write(uint32_t base_value, DataStream *stream,
                         bool is_little_endian) {
-
   const uint32_t value = is_little_endian ? htonl(base_value) : base_value;
 
   return writeData(value, stream);
@@ -123,7 +120,6 @@ int Serializable::write(uint32_t base_value, DataStream *stream,
 
 int Serializable::write(uint64_t base_value, DataStream *stream,
                         bool is_little_endian) {
-
   const uint64_t value =
       is_little_endian == 1 ? htonll_r(base_value) : base_value;
   return writeData(value, stream);
@@ -131,7 +127,6 @@ int Serializable::write(uint64_t base_value, DataStream *stream,
 
 int Serializable::write(uint16_t base_value, DataStream *stream,
                         bool is_little_endian) {
-
   const uint16_t value = is_little_endian == 1 ? htons(base_value) : base_value;
 
   return writeData(value, stream);
@@ -150,7 +145,6 @@ int Serializable::readUTF(std::string &str, DataStream *stream, bool widen) {
   } else {
     uint32_t len;
     ret = read(len, stream);
-
     if (ret <= 0)
       return ret;
     utflen = len;
@@ -166,7 +160,6 @@ int Serializable::readUTF(std::string &str, DataStream *stream, bool widen) {
 
   // The number of chars produced may be less than utflen
   str = std::string((const char*) &buf[0], utflen);
-
   return utflen;
 }
 
@@ -181,7 +174,6 @@ int Serializable::writeUTF(std::string str, DataStream *stream, bool widen) {
     return -1;
 
   if (utflen == 0) {
-
     if (!widen) {
       uint16_t shortLen = utflen;
       write(shortLen, stream);
@@ -207,12 +199,10 @@ int Serializable::writeUTF(std::string str, DataStream *stream, bool widen) {
   int ret;
 
   if (!widen) {
-
     uint16_t short_length = utflen;
     write(short_length, stream);
     ret = stream->writeData(utf_to_write.data(), utflen);
   } else {
-    //utflen += 4;
     write(utflen, stream);
     ret = stream->writeData(utf_to_write.data(), utflen);
   }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/io/StreamFactory.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/StreamFactory.cpp b/libminifi/src/io/StreamFactory.cpp
index e3aa290..1cf419e 100644
--- a/libminifi/src/io/StreamFactory.cpp
+++ b/libminifi/src/io/StreamFactory.cpp
@@ -15,11 +15,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include "../../include/io/StreamFactory.h"
-
+#include "io/StreamFactory.h"
 #include <atomic>
-#include <mutex> 
-  
+#include <mutex>
+
 namespace org {
 namespace apache {
 namespace nifi {
@@ -29,7 +28,6 @@ namespace io {
 std::atomic<StreamFactory*> StreamFactory::context_instance_;
 std::mutex StreamFactory::context_mutex_;
 
-
 } /* namespace io */
 } /* namespace minifi */
 } /* namespace nifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/io/tls/TLSSocket.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/tls/TLSSocket.cpp b/libminifi/src/io/tls/TLSSocket.cpp
index b2df394..1499840 100644
--- a/libminifi/src/io/tls/TLSSocket.cpp
+++ b/libminifi/src/io/tls/TLSSocket.cpp
@@ -15,10 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include "properties/Configure.h"
 #include "io/tls/TLSSocket.h"
+#include <openssl/ssl.h>
+#include <openssl/err.h>
+#include <utility>
+#include <string>
+#include <vector>
+#include "properties/Configure.h"
 #include "utils/StringUtils.h"
-
 #include "core/Property.h"
 
 namespace org {
@@ -27,116 +31,111 @@ namespace nifi {
 namespace minifi {
 namespace io {
 
-#include <openssl/ssl.h>
-#include <openssl/err.h>
-
 std::atomic<TLSContext*> TLSContext::context_instance;
 std::mutex TLSContext::context_mutex;
 
-TLSContext::TLSContext() :
-		error_value(0), ctx(0), logger_(logging::Logger::getLogger()), configuration(
-				Configure::getConfigure()) {
-
+TLSContext::TLSContext()
+    : error_value(0),
+      ctx(0),
+      logger_(logging::Logger::getLogger()),
+      configuration(Configure::getConfigure()) {
 }
-
 /**
  * The memory barrier is defined by the singleton
  */
-short TLSContext::initialize() {
-	if (ctx != 0) {
-		return error_value;
-	}
-	std::string clientAuthStr;
-	bool needClientCert = true;
-	if (!(configuration->get(Configure::nifi_security_need_ClientAuth,
-			clientAuthStr)
-			&& org::apache::nifi::minifi::utils::StringUtils::StringToBool(clientAuthStr, needClientCert))) {
-		needClientCert = true;
-	}
-
-	SSL_library_init();
-	const SSL_METHOD *method;
-
-	OpenSSL_add_all_algorithms();
-	SSL_load_error_strings();
-	method = TLSv1_2_client_method();
-	ctx = SSL_CTX_new(method);
-	if (ctx == NULL) {
-		logger_->log_error("Could not create SSL context, error: %s.",
-				std::strerror(errno));
-		error_value = TLS_ERROR_CONTEXT;
-		return error_value;
-	}
-	if (needClientCert) {
-		std::string certificate;
-		std::string privatekey;
-		std::string passphrase;
-		std::string caCertificate;
-
-		if (!(configuration->get(Configure::nifi_security_client_certificate,
-				certificate)
-				&& configuration->get(
-						Configure::nifi_security_client_private_key, privatekey))) {
-			logger_->log_error(
-					"Certificate and Private Key PEM file not configured, error: %s.",
-					std::strerror(errno));
-			error_value = TLS_ERROR_PEM_MISSING;
-			return error_value;
-		}
-		// load certificates and private key in PEM format
-		if (SSL_CTX_use_certificate_file(ctx, certificate.c_str(),
-				SSL_FILETYPE_PEM) <= 0) {
-			logger_->log_error("Could not create load certificate, error : %s",
-					std::strerror(errno));
-			error_value = TLS_ERROR_CERT_MISSING;
-			return error_value;
-
-		}
-		if (configuration->get(Configure::nifi_security_client_pass_phrase,
-				passphrase)) {
-			// if the private key has passphase
-			SSL_CTX_set_default_passwd_cb(ctx, pemPassWordCb);
-		}
-		
-
-		int retp = SSL_CTX_use_PrivateKey_file(ctx, privatekey.c_str(),
-				SSL_FILETYPE_PEM);
-		if (retp != 1) {
-			logger_->log_error("Could not create load private key,%i on %s error : %s",
-					retp,privatekey.c_str(),std::strerror(errno));
-			error_value = TLS_ERROR_KEY_ERROR;
-			return error_value;
-		}
-		// verify private key
-		if (!SSL_CTX_check_private_key(ctx)) {
-			logger_->log_error(
-					"Private key does not match the public certificate, error : %s",
-					std::strerror(errno));
-			error_value = TLS_ERROR_KEY_ERROR;
-			return error_value;
-		}
-		// load CA certificates
-		if (configuration->get(Configure::nifi_security_client_ca_certificate,
-				caCertificate)) {
-			retp = SSL_CTX_load_verify_locations(ctx, caCertificate.c_str(), 0);
-			if (retp==0) {
-				logger_->log_error(
-						"Can not load CA certificate, Exiting, error : %s",
-						std::strerror(errno));
-				error_value = TLS_ERROR_CERT_ERROR;
-				return error_value;
-			}
-		}
-
-		logger_->log_info("Load/Verify Client Certificate OK.");
-	}
-	return 0;
+int16_t TLSContext::initialize() {
+  if (ctx != 0) {
+    return error_value;
+  }
+  std::string clientAuthStr;
+  bool needClientCert = true;
+  if (!(configuration->get(Configure::nifi_security_need_ClientAuth,
+                           clientAuthStr)
+      && org::apache::nifi::minifi::utils::StringUtils::StringToBool(
+          clientAuthStr, needClientCert))) {
+    needClientCert = true;
+  }
+
+  SSL_library_init();
+  const SSL_METHOD *method;
+
+  OpenSSL_add_all_algorithms();
+  SSL_load_error_strings();
+  method = TLSv1_2_client_method();
+  ctx = SSL_CTX_new(method);
+  if (ctx == NULL) {
+    logger_->log_error("Could not create SSL context, error: %s.",
+                       std::strerror(errno));
+    error_value = TLS_ERROR_CONTEXT;
+    return error_value;
+  }
+  if (needClientCert) {
+    std::string certificate;
+    std::string privatekey;
+    std::string passphrase;
+    std::string caCertificate;
+
+    if (!(configuration->get(Configure::nifi_security_client_certificate,
+                             certificate)
+        && configuration->get(Configure::nifi_security_client_private_key,
+                              privatekey))) {
+      logger_->log_error(
+          "Certificate and Private Key PEM file not configured, error: %s.",
+          std::strerror(errno));
+      error_value = TLS_ERROR_PEM_MISSING;
+      return error_value;
+    }
+    // load certificates and private key in PEM format
+    if (SSL_CTX_use_certificate_file(ctx, certificate.c_str(), SSL_FILETYPE_PEM)
+        <= 0) {
+      logger_->log_error("Could not create load certificate, error : %s",
+                         std::strerror(errno));
+      error_value = TLS_ERROR_CERT_MISSING;
+      return error_value;
+    }
+    if (configuration->get(Configure::nifi_security_client_pass_phrase,
+                           passphrase)) {
+      // if the private key has passphase
+      SSL_CTX_set_default_passwd_cb(ctx, pemPassWordCb);
+    }
+
+    int retp = SSL_CTX_use_PrivateKey_file(ctx, privatekey.c_str(),
+                                           SSL_FILETYPE_PEM);
+    if (retp != 1) {
+      logger_->log_error(
+          "Could not create load private key,%i on %s error : %s", retp,
+          privatekey.c_str(), std::strerror(errno));
+      error_value = TLS_ERROR_KEY_ERROR;
+      return error_value;
+    }
+    // verify private key
+    if (!SSL_CTX_check_private_key(ctx)) {
+      logger_->log_error(
+          "Private key does not match the public certificate, error : %s",
+          std::strerror(errno));
+      error_value = TLS_ERROR_KEY_ERROR;
+      return error_value;
+    }
+    // load CA certificates
+    if (configuration->get(Configure::nifi_security_client_ca_certificate,
+                           caCertificate)) {
+      retp = SSL_CTX_load_verify_locations(ctx, caCertificate.c_str(), 0);
+      if (retp == 0) {
+        logger_->log_error("Can not load CA certificate, Exiting, error : %s",
+                           std::strerror(errno));
+        error_value = TLS_ERROR_CERT_ERROR;
+        return error_value;
+      }
+    }
+
+    logger_->log_info("Load/Verify Client Certificate OK.");
+  }
+  return 0;
 }
 
-TLSSocket::~TLSSocket()
-{
-	if (ssl != 0)
-		SSL_free(ssl);
+TLSSocket::~TLSSocket() {
+  if (ssl != 0)
+    SSL_free(ssl);
 }
 /**
  * Constructor that accepts host name, port and listeners. With this
@@ -146,102 +145,100 @@ TLSSocket::~TLSSocket()
  * @param listeners number of listeners in the queue
  */
 TLSSocket::TLSSocket(const std::string &hostname, const uint16_t port,
-		const uint16_t listeners) :
-		Socket(hostname, port, listeners), ssl(0) {
+                     const uint16_t listeners)
+    : Socket(hostname, port, listeners),
+      ssl(0) {
 }
 
-TLSSocket::TLSSocket(const std::string &hostname, const uint16_t port) :
-		Socket(hostname, port, 0), ssl(0) {
+TLSSocket::TLSSocket(const std::string &hostname, const uint16_t port)
+    : Socket(hostname, port, 0),
+      ssl(0) {
 }
 
-TLSSocket::TLSSocket(const TLSSocket &&d) :
-		Socket(std::move(d)), ssl(0) {
+TLSSocket::TLSSocket(const TLSSocket &&d)
+    : Socket(std::move(d)),
+      ssl(0) {
 }
 
-short TLSSocket::initialize() {
-	TLSContext *context = TLSContext::getInstance();
-	short ret = context->initialize();
-	Socket::initialize();
-	if (!ret) {
-		// we have s2s secure config
-		ssl = SSL_new(context->getContext());
-		SSL_set_fd(ssl, socket_file_descriptor_);
-		if (SSL_connect(ssl) == -1) {
-			logger_->log_error("SSL socket connect failed to %s %d",
-					requested_hostname_.c_str(), port_);
-			SSL_free(ssl);
-			ssl = NULL;
-			close(socket_file_descriptor_);
-			return -1;
-		} else {
-			logger_->log_info("SSL socket connect success to %s %d",
-					requested_hostname_.c_str(), port_);
-			return 0;
-		}
-	}
-	return ret;
+int16_t TLSSocket::initialize() {
+  TLSContext *context = TLSContext::getInstance();
+  int16_t ret = context->initialize();
+  Socket::initialize();
+  if (!ret) {
+    // we have s2s secure config
+    ssl = SSL_new(context->getContext());
+    SSL_set_fd(ssl, socket_file_descriptor_);
+    if (SSL_connect(ssl) == -1) {
+      logger_->log_error("SSL socket connect failed to %s %d",
+                         requested_hostname_.c_str(), port_);
+      SSL_free(ssl);
+      ssl = NULL;
+      close(socket_file_descriptor_);
+      return -1;
+    } else {
+      logger_->log_info("SSL socket connect success to %s %d",
+                        requested_hostname_.c_str(), port_);
+      return 0;
+    }
+  }
+  return ret;
 }
 
-short TLSSocket::select_descriptor(const uint16_t msec) {
-	if (ssl && SSL_pending(ssl))
-		return 1;
-	return Socket::select_descriptor(msec);
+int16_t TLSSocket::select_descriptor(const uint16_t msec) {
+  if (ssl && SSL_pending(ssl))
+    return 1;
+  return Socket::select_descriptor(msec);
 }
 
-int TLSSocket::writeData(std::vector< uint8_t>& buf, int buflen)
-{
- return Socket::writeData(buf,buflen);
+int TLSSocket::writeData(std::vector<uint8_t>& buf, int buflen) {
+  return Socket::writeData(buf, buflen);
 }
 
 int TLSSocket::writeData(uint8_t *value, int size) {
-	if (IsNullOrEmpty(ssl))
-	  return -1;
-	// for SSL, wait for the TLS IO is completed
-	int bytes = 0;
-	int sent = 0;
-	while (bytes < size) {
-
-		sent = SSL_write(ssl, value + bytes, size - bytes);
-		//check for errors
-		if (sent < 0) {
-			logger_->log_error("Site2Site Peer socket %d send failed %s",
-					socket_file_descriptor_, strerror(errno));
-			return sent;
-		}
-		bytes += sent;
-	}
-	return size;
+  if (IsNullOrEmpty(ssl))
+    return -1;
+  // for SSL, wait for the TLS IO is completed
+  int bytes = 0;
+  int sent = 0;
+  while (bytes < size) {
+    sent = SSL_write(ssl, value + bytes, size - bytes);
+    // check for errors
+    if (sent < 0) {
+      logger_->log_error("Site2Site Peer socket %d send failed %s",
+                         socket_file_descriptor_, strerror(errno));
+      return sent;
+    }
+    bytes += sent;
+  }
+  return size;
 }
 
 int TLSSocket::readData(uint8_t *buf, int buflen) {
-
-	if (IsNullOrEmpty(ssl))
-	  return -1;
-	int total_read = 0;
-	int status = 0;
-	while (buflen) {
-		short fd = select_descriptor(1000);
-		if (fd <= 0) {
-
-			close(socket_file_descriptor_);
-			return -1;
-		}
-
-		int sslStatus;
-		do {
-			status = SSL_read(ssl, buf, buflen);
-			sslStatus = SSL_get_error(ssl, status);
-		} while (status < 0 && sslStatus == SSL_ERROR_WANT_READ);
-
-		buflen -= status;
-		buf += status;
-		total_read += status;
-	}
-
-	return total_read;
+  if (IsNullOrEmpty(ssl))
+    return -1;
+  int total_read = 0;
+  int status = 0;
+  while (buflen) {
+    int16_t fd = select_descriptor(1000);
+    if (fd <= 0) {
+      close(socket_file_descriptor_);
+      return -1;
+    }
+
+    int sslStatus;
+    do {
+      status = SSL_read(ssl, buf, buflen);
+      sslStatus = SSL_get_error(ssl, status);
+    } while (status < 0 && sslStatus == SSL_ERROR_WANT_READ);
+
+    buflen -= status;
+    buf += status;
+    total_read += status;
+  }
+
+  return total_read;
 }
 
-
 } /* namespace io */
 } /* namespace minifi */
 } /* namespace nifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/processors/AppendHostInfo.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/AppendHostInfo.cpp b/libminifi/src/processors/AppendHostInfo.cpp
index 24ccc9a..b3c76db 100644
--- a/libminifi/src/processors/AppendHostInfo.cpp
+++ b/libminifi/src/processors/AppendHostInfo.cpp
@@ -17,27 +17,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include <set>
+#include "processors/AppendHostInfo.h"
+#define __USE_POSIX
+#include <limits.h>
 #include <sys/time.h>
 #include <string.h>
-#include "processors/AppendHostInfo.h"
-#include "core/ProcessContext.h"
-#include "core/Property.h"
-#include "core/ProcessSession.h"
-
 #include <netdb.h>
 #include <netinet/in.h>
 #include <sys/socket.h>
 #include <sys/ioctl.h>
 #include <net/if.h>
 #include <arpa/inet.h>
-
-#include "../../include/core/FlowFile.h"
+#include <memory>
+#include <string>
+#include <set>
+#include "core/ProcessContext.h"
+#include "core/Property.h"
+#include "core/ProcessSession.h"
+#include "core/FlowFile.h"
 #include "io/ClientSocket.h"
-
-#define __USE_POSIX
-#include <limits.h>
-
 namespace org {
 namespace apache {
 namespace nifi {
@@ -48,7 +46,6 @@ namespace processors {
 #define HOST_NAME_MAX 255
 #endif
 
-const std::string AppendHostInfo::ProcessorName("AppendHostInfo");
 core::Property AppendHostInfo::InterfaceName(
     "Network Interface Name",
     "Network interface from which to read an IP v4 address", "eth0");
@@ -77,31 +74,29 @@ void AppendHostInfo::initialize() {
   setSupportedRelationships(relationships);
 }
 
-void AppendHostInfo::onTrigger(
-    core::ProcessContext *context,
-    core::ProcessSession *session) {
-  std::shared_ptr<core::FlowFile> flow =
-      session->get();
+void AppendHostInfo::onTrigger(core::ProcessContext *context,
+                               core::ProcessSession *session) {
+  std::shared_ptr<core::FlowFile> flow = session->get();
   if (!flow)
     return;
 
-  //Get Hostname
+  // Get Hostname
 
   std::string hostAttribute = "";
   context->getProperty(HostAttribute.getName(), hostAttribute);
   flow->addAttribute(hostAttribute.c_str(),
                      org::apache::nifi::minifi::io::Socket::getMyHostName());
 
-  //Get IP address for the specified interface
+  // Get IP address for the specified interface
   std::string iface;
   context->getProperty(InterfaceName.getName(), iface);
-  //Confirm the specified interface name exists on this device
+  // Confirm the specified interface name exists on this device
   if (if_nametoindex(iface.c_str()) != 0) {
     struct ifreq ifr;
     int fd = socket(AF_INET, SOCK_DGRAM, 0);
-    //Type of address to retrieve - IPv4 IP address
+    // Type of address to retrieve - IPv4 IP address
     ifr.ifr_addr.sa_family = AF_INET;
-    //Copy the interface name in the ifreq structure
+    // Copy the interface name in the ifreq structure
     strncpy(ifr.ifr_name, iface.c_str(), IFNAMSIZ - 1);
     ioctl(fd, SIOCGIFADDR, &ifr);
     close(fd);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/processors/ExecuteProcess.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/ExecuteProcess.cpp b/libminifi/src/processors/ExecuteProcess.cpp
index 3cbbc1b..701c645 100644
--- a/libminifi/src/processors/ExecuteProcess.cpp
+++ b/libminifi/src/processors/ExecuteProcess.cpp
@@ -18,9 +18,12 @@
  * limitations under the License.
  */
 #include "processors/ExecuteProcess.h"
+#include <cstring>
+#include <memory>
+#include <string>
+#include <set>
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
-#include <cstring>
 #include "utils/StringUtils.h"
 #include "utils/TimeUtil.h"
 
@@ -30,14 +33,15 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
-const std::string ExecuteProcess::ProcessorName("ExecuteProcess");
 core::Property ExecuteProcess::Command(
     "Command",
-    "Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.",
+    "Specifies the command to be executed; if just the name of an executable"
+    " is provided, it must be in the user's environment PATH.",
     "");
 core::Property ExecuteProcess::CommandArguments(
     "Command Arguments",
-    "The arguments to supply to the executable delimited by white space. White space can be escaped by enclosing it in double-quotes.",
+    "The arguments to supply to the executable delimited by white space. White "
+    "space can be escaped by enclosing it in double-quotes.",
     "");
 core::Property ExecuteProcess::WorkingDir(
     "Working Directory",
@@ -45,7 +49,8 @@ core::Property ExecuteProcess::WorkingDir(
     "");
 core::Property ExecuteProcess::BatchDuration(
     "Batch Duration",
-    "If the process is expected to be long-running and produce textual output, a batch duration can be specified.",
+    "If the process is expected to be long-running and produce textual output, a "
+    "batch duration can be specified.",
     "0");
 core::Property ExecuteProcess::RedirectErrorStream(
     "Redirect Error Stream",
@@ -69,9 +74,8 @@ void ExecuteProcess::initialize() {
   setSupportedRelationships(relationships);
 }
 
-void ExecuteProcess::onTrigger(
-    core::ProcessContext *context,
-    core::ProcessSession *session) {
+void ExecuteProcess::onTrigger(core::ProcessContext *context,
+                               core::ProcessSession *session) {
   std::string value;
   if (context->getProperty(Command.getName(), value)) {
     this->_command = value;
@@ -84,12 +88,10 @@ void ExecuteProcess::onTrigger(
   }
   if (context->getProperty(BatchDuration.getName(), value)) {
     core::TimeUnit unit;
-    if (core::Property::StringToTime(value,
-                                                                _batchDuration,
-                                                                unit)
-        && core::Property::ConvertTimeUnitToMS(
-            _batchDuration, unit, _batchDuration)) {
-
+    if (core::Property::StringToTime(value, _batchDuration, unit)
+        && core::Property::ConvertTimeUnitToMS(_batchDuration, unit,
+                                               _batchDuration)) {
+      logger_->log_info("Setting _batchDuration");
     }
   }
   if (context->getProperty(RedirectErrorStream.getName(), value)) {
@@ -112,9 +114,7 @@ void ExecuteProcess::onTrigger(
   }
   logger_->log_info("Execute Command %s", _fullCommand.c_str());
   // split the command into array
-  char cstr[_fullCommand.length() + 1];
-  std::strcpy(cstr, _fullCommand.c_str());
-  char *p = std::strtok(cstr, " ");
+  char *p = std::strtok(const_cast<char*>(_fullCommand.c_str()), " ");
   int argc = 0;
   char *argv[64];
   while (p != 0 && argc < 64) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/processors/GenerateFlowFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/GenerateFlowFile.cpp b/libminifi/src/processors/GenerateFlowFile.cpp
index ebdaaa3..34c0ae2 100644
--- a/libminifi/src/processors/GenerateFlowFile.cpp
+++ b/libminifi/src/processors/GenerateFlowFile.cpp
@@ -17,18 +17,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#include "processors/GenerateFlowFile.h"
+#include <sys/time.h>
+#include <time.h>
 #include <vector>
 #include <queue>
 #include <map>
+#include <memory>
+#include <string>
 #include <set>
-#include <sys/time.h>
-#include <time.h>
 #include <chrono>
 #include <thread>
 #include <random>
 #include "utils/StringUtils.h"
-
-#include "processors/GenerateFlowFile.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 
@@ -39,104 +40,101 @@ namespace minifi {
 namespace processors {
 const char *GenerateFlowFile::DATA_FORMAT_BINARY = "Binary";
 const char *GenerateFlowFile::DATA_FORMAT_TEXT = "Text";
-const std::string GenerateFlowFile::ProcessorName("GenerateFlowFile");
-core::Property GenerateFlowFile::FileSize("File Size", "The size of the file that will be used", "1 kB");
-core::Property GenerateFlowFile::BatchSize("Batch Size", "The number of FlowFiles to be transferred in each invocation", "1");
-core::Property GenerateFlowFile::DataFormat("Data Format", "Specifies whether the data should be Text or Binary", GenerateFlowFile::DATA_FORMAT_BINARY);
-core::Property GenerateFlowFile::UniqueFlowFiles("Unique FlowFiles",
-		"If true, each FlowFile that is generated will be unique. If false, a random value will be generated and all FlowFiles", "true");
-core::Relationship GenerateFlowFile::Success("success", "success operational on the flow record");
+core::Property GenerateFlowFile::FileSize(
+    "File Size", "The size of the file that will be used", "1 kB");
+core::Property GenerateFlowFile::BatchSize(
+    "Batch Size",
+    "The number of FlowFiles to be transferred in each invocation", "1");
+core::Property GenerateFlowFile::DataFormat(
+    "Data Format", "Specifies whether the data should be Text or Binary",
+    GenerateFlowFile::DATA_FORMAT_BINARY);
+core::Property GenerateFlowFile::UniqueFlowFiles(
+    "Unique FlowFiles",
+    "If true, each FlowFile that is generated will be unique. If false, a random value will be generated and all FlowFiles",
+    "true");
+core::Relationship GenerateFlowFile::Success(
+    "success", "success operational on the flow record");
 
-void GenerateFlowFile::initialize()
-{
-	// Set the supported properties
-	std::set<core::Property> properties;
-	properties.insert(FileSize);
-	properties.insert(BatchSize);
-	properties.insert(DataFormat);
-	properties.insert(UniqueFlowFiles);
-	setSupportedProperties(properties);
-	// Set the supported relationships
-	std::set<core::Relationship> relationships;
-	relationships.insert(Success);
-	setSupportedRelationships(relationships);
+void GenerateFlowFile::initialize() {
+  // Set the supported properties
+  std::set<core::Property> properties;
+  properties.insert(FileSize);
+  properties.insert(BatchSize);
+  properties.insert(DataFormat);
+  properties.insert(UniqueFlowFiles);
+  setSupportedProperties(properties);
+  // Set the supported relationships
+  std::set<core::Relationship> relationships;
+  relationships.insert(Success);
+  setSupportedRelationships(relationships);
 }
 
-void GenerateFlowFile::onTrigger(core::ProcessContext *context, core::ProcessSession *session)
-{
-	int64_t batchSize = 1;
-	bool uniqueFlowFile = true;
-	int64_t fileSize = 1024;
+void GenerateFlowFile::onTrigger(core::ProcessContext *context,
+                                 core::ProcessSession *session) {
+  int64_t batchSize = 1;
+  bool uniqueFlowFile = true;
+  int64_t fileSize = 1024;
 
-	std::string value;
-	if (context->getProperty(FileSize.getName(), value))
-	{
-	  core::Property::StringToInt(value, fileSize);
-	}
-	if (context->getProperty(BatchSize.getName(), value))
-	{
-	  core::Property::StringToInt(value, batchSize);
-	}
-	if (context->getProperty(UniqueFlowFiles.getName(), value))
-	{
-		org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, uniqueFlowFile);
-	}
+  std::string value;
+  if (context->getProperty(FileSize.getName(), value)) {
+    core::Property::StringToInt(value, fileSize);
+  }
+  if (context->getProperty(BatchSize.getName(), value)) {
+    core::Property::StringToInt(value, batchSize);
+  }
+  if (context->getProperty(UniqueFlowFiles.getName(), value)) {
+    org::apache::nifi::minifi::utils::StringUtils::StringToBool(value,
+                                                                uniqueFlowFile);
+  }
 
-	if (!uniqueFlowFile)
-	{
-		char *data;
-		data = new char[fileSize];
-		if (!data)
-			return;
-		uint64_t dataSize = fileSize;
-		GenerateFlowFile::WriteCallback callback(data, dataSize);
-		char *current = data;
-		for (int i = 0; i < fileSize; i+= sizeof(int))
-		{
-			int randValue = random();
-			*((int *) current) = randValue;
-			current += sizeof(int);
-		}
-		for (int i = 0; i < batchSize; i++)
-		{
-			// For each batch
-			std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
-			if (!flowFile)
-				return;
-			if (fileSize > 0)
-				session->write(flowFile, &callback);
-			session->transfer(flowFile, Success);
-		}
-		delete[] data;
-	}
-	else
-	{
-		if (!_data)
-		{
-			// We have not create the unique data yet
-			_data = new char[fileSize];
-			_dataSize = fileSize;
-			char *current = _data;
-			for (int i = 0; i < fileSize; i+= sizeof(int))
-			{
-				int randValue = random();
-				*((int *) current) = randValue;
-				// *((int *) current) = (0xFFFFFFFF & i);
-				current += sizeof(int);
-			}
-		}
-		GenerateFlowFile::WriteCallback callback(_data, _dataSize);
-		for (int i = 0; i < batchSize; i++)
-		{
-			// For each batch
-			std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
-			if (!flowFile)
-				return;
-			if (fileSize > 0)
-				session->write(flowFile, &callback);
-			session->transfer(flowFile, Success);
-		}
-	}
+  if (!uniqueFlowFile) {
+    char *data;
+    data = new char[fileSize];
+    if (!data)
+      return;
+    uint64_t dataSize = fileSize;
+    GenerateFlowFile::WriteCallback callback(data, dataSize);
+    char *current = data;
+    for (int i = 0; i < fileSize; i += sizeof(int)) {
+      int randValue = random();
+      *(reinterpret_cast<int*>(current)) = randValue;
+      current += sizeof(int);
+    }
+    for (int i = 0; i < batchSize; i++) {
+      // For each batch
+      std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<
+          FlowFileRecord>(session->create());
+      if (!flowFile)
+        return;
+      if (fileSize > 0)
+        session->write(flowFile, &callback);
+      session->transfer(flowFile, Success);
+    }
+    delete[] data;
+  } else {
+    if (!_data) {
+      // We have not create the unique data yet
+      _data = new char[fileSize];
+      _dataSize = fileSize;
+      char *current = _data;
+      for (int i = 0; i < fileSize; i += sizeof(int)) {
+        int randValue = random();
+        *(reinterpret_cast<int*>(current)) = randValue;
+        current += sizeof(int);
+      }
+    }
+    GenerateFlowFile::WriteCallback callback(_data, _dataSize);
+    for (int i = 0; i < batchSize; i++) {
+      // For each batch
+      std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<
+          FlowFileRecord>(session->create());
+      if (!flowFile)
+        return;
+      if (fileSize > 0)
+        session->write(flowFile, &callback);
+      session->transfer(flowFile, Success);
+    }
+  }
 }
 } /* namespace processors */
 } /* namespace minifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/processors/GetFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/GetFile.cpp b/libminifi/src/processors/GetFile.cpp
index 652caf7..2740793 100644
--- a/libminifi/src/processors/GetFile.cpp
+++ b/libminifi/src/processors/GetFile.cpp
@@ -15,30 +15,32 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include <vector>
-#include <queue>
-#include <map>
-#include <set>
+#include "processors/GetFile.h"
 #include <sys/time.h>
 #include <sys/types.h>
 #include <sys/stat.h>
 #include <time.h>
-#include <sstream>
 #include <stdio.h>
-#include <string>
-#include <iostream>
 #include <dirent.h>
 #include <limits.h>
 #include <unistd.h>
-#if  (__GNUC__ >= 4) 
+#if  (__GNUC__ >= 4)
 #if (__GNUC_MINOR__ < 9)
 #include <regex.h>
+#else
+#include <regex>
 #endif
 #endif
+#include <vector>
+#include <queue>
+#include <map>
+#include <memory>
+#include <set>
+#include <sstream>
+#include <string>
+#include <iostream>
 #include "utils/StringUtils.h"
-#include <regex>
 #include "utils/TimeUtil.h"
-#include "processors/GetFile.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 
@@ -47,7 +49,6 @@ namespace apache {
 namespace nifi {
 namespace minifi {
 namespace processors {
-const std::string GetFile::ProcessorName("GetFile");
 core::Property GetFile::BatchSize(
     "Batch Size", "The maximum number of files to pull in each iteration",
     "10");
@@ -62,11 +63,13 @@ core::Property GetFile::KeepSourceFile(
     "false");
 core::Property GetFile::MaxAge(
     "Maximum File Age",
-    "The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored",
+    "The minimum age that a file must be in order to be pulled;"
+    " any file younger than this amount of time (according to last modification date) will be ignored",
     "0 sec");
 core::Property GetFile::MinAge(
     "Minimum File Age",
-    "The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored",
+    "The maximum age that a file must be in order to be pulled; any file"
+    "older than this amount of time (according to last modification date) will be ignored",
     "0 sec");
 core::Property GetFile::MaxSize(
     "Maximum File Size",
@@ -113,7 +116,6 @@ void GetFile::onSchedule(core::ProcessContext *context,
                          core::ProcessSessionFactory *sessionFactory) {
   std::string value;
 
-  logger_->log_info("onTrigger GetFile");
   if (context->getProperty(Directory.getName(), value)) {
     request_.directory = value;
   }
@@ -129,13 +131,12 @@ void GetFile::onSchedule(core::ProcessContext *context,
         value, request_.keepSourceFile);
   }
 
-  logger_->log_info("onTrigger GetFile");
   if (context->getProperty(MaxAge.getName(), value)) {
     core::TimeUnit unit;
     if (core::Property::StringToTime(value, request_.maxAge, unit)
         && core::Property::ConvertTimeUnitToMS(request_.maxAge, unit,
                                                request_.maxAge)) {
-
+      logger_->log_debug("successfully applied _maxAge");
     }
   }
   if (context->getProperty(MinAge.getName(), value)) {
@@ -143,7 +144,7 @@ void GetFile::onSchedule(core::ProcessContext *context,
     if (core::Property::StringToTime(value, request_.minAge, unit)
         && core::Property::ConvertTimeUnitToMS(request_.minAge, unit,
                                                request_.minAge)) {
-
+      logger_->log_debug("successfully applied _minAge");
     }
   }
   if (context->getProperty(MaxSize.getName(), value)) {
@@ -157,7 +158,7 @@ void GetFile::onSchedule(core::ProcessContext *context,
     if (core::Property::StringToTime(value, request_.pollInterval, unit)
         && core::Property::ConvertTimeUnitToMS(request_.pollInterval, unit,
                                                request_.pollInterval)) {
-
+      logger_->log_debug("successfully applied _pollInterval");
     }
   }
   if (context->getProperty(Recurse.getName(), value)) {
@@ -172,11 +173,9 @@ void GetFile::onSchedule(core::ProcessContext *context,
 
 void GetFile::onTrigger(core::ProcessContext *context,
                         core::ProcessSession *session) {
-
   // Perform directory list
   logger_->log_info("Is listing empty %i", isListingEmpty());
   if (isListingEmpty()) {
-
     if (request_.pollInterval == 0
         || (getTimeMillis() - last_listing_time_) > request_.pollInterval) {
       performListing(request_.directory, request_);
@@ -190,7 +189,6 @@ void GetFile::onTrigger(core::ProcessContext *context,
       std::queue<std::string> list;
       pollListing(list, request_);
       while (!list.empty()) {
-
         std::string fileName = list.front();
         list.pop();
         logger_->log_info("GetFile process %s", fileName.c_str());
@@ -214,7 +212,6 @@ void GetFile::onTrigger(core::ProcessContext *context,
       throw;
     }
   }
-
 }
 
 bool GetFile::isListingEmpty() {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/processors/ListenHTTP.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/ListenHTTP.cpp b/libminifi/src/processors/ListenHTTP.cpp
index 36c743e..5c4a6bb 100644
--- a/libminifi/src/processors/ListenHTTP.cpp
+++ b/libminifi/src/processors/ListenHTTP.cpp
@@ -1,5 +1,6 @@
 /**
  * @file ListenHTTP.cpp
+
  * ListenHTTP class implementation
  *
  * Licensed to the Apache Software Foundation (ASF) under one or more
@@ -17,17 +18,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include <sstream>
+#include "processors/ListenHTTP.h"
+#include <uuid/uuid.h>
+#include <CivetServer.h>
 #include <stdio.h>
+#include <sstream>
+#include <utility>
+#include <memory>
 #include <string>
 #include <iostream>
 #include <fstream>
-#include <uuid/uuid.h>
-
-#include <CivetServer.h>
-
-#include "processors/ListenHTTP.h"
-
+#include <set>
+#include <vector>
 #include "utils/TimeUtil.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
@@ -39,15 +41,15 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
-const std::string ListenHTTP::ProcessorName("ListenHTTP");
-
-core::Property ListenHTTP::BasePath(
-    "Base Path", "Base path for incoming connections", "contentListener");
+core::Property ListenHTTP::BasePath("Base Path",
+                                    "Base path for incoming connections",
+                                    "contentListener");
 core::Property ListenHTTP::Port(
     "Listening Port", "The Port to listen on for incoming connections", "");
 core::Property ListenHTTP::AuthorizedDNPattern(
     "Authorized DN Pattern",
-    "A Regular Expression to apply against the Distinguished Name of incoming connections. If the Pattern does not match the DN, the connection will be refused.",
+    "A Regular Expression to apply against the Distinguished Name of incoming"
+    " connections. If the Pattern does not match the DN, the connection will be refused.",
     ".*");
 core::Property ListenHTTP::SSLCertificate(
     "SSL Certificate",
@@ -65,17 +67,18 @@ core::Property ListenHTTP::SSLMinimumVersion(
     "SSL2");
 core::Property ListenHTTP::HeadersAsAttributesRegex(
     "HTTP Headers to receive as Attributes (Regex)",
-    "Specifies the Regular Expression that determines the names of HTTP Headers that should be passed along as FlowFile attributes",
+    "Specifies the Regular Expression that determines the names of HTTP Headers that"
+    " should be passed along as FlowFile attributes",
     "");
 
-core::Relationship ListenHTTP::Success(
-    "success", "All files are routed to success");
+core::Relationship ListenHTTP::Success("success",
+                                       "All files are routed to success");
 
 void ListenHTTP::initialize() {
   _logger->log_info("Initializing ListenHTTP");
 
   // Set the supported properties
-  std::set < core::Property > properties;
+  std::set<core::Property> properties;
   properties.insert(BasePath);
   properties.insert(Port);
   properties.insert(AuthorizedDNPattern);
@@ -84,17 +87,15 @@ void ListenHTTP::initialize() {
   properties.insert(SSLVerifyPeer);
   properties.insert(SSLMinimumVersion);
   properties.insert(HeadersAsAttributesRegex);
-  setSupportedProperties (properties);
+  setSupportedProperties(properties);
   // Set the supported relationships
-  std::set < core::Relationship > relationships;
+  std::set<core::Relationship> relationships;
   relationships.insert(Success);
-  setSupportedRelationships (relationships);
+  setSupportedRelationships(relationships);
 }
 
-void ListenHTTP::onSchedule(
-    core::ProcessContext *context,
-    core::ProcessSessionFactory *sessionFactory) {
-
+void ListenHTTP::onSchedule(core::ProcessContext *context,
+                            core::ProcessSessionFactory *sessionFactory) {
   std::string basePath;
 
   if (!context->getProperty(BasePath.getName(), basePath)) {
@@ -178,7 +179,7 @@ void ListenHTTP::onSchedule(
       listeningPort.c_str(), basePath.c_str(), numThreads);
 
   // Initialize web server
-  std::vector < std::string > options;
+  std::vector<std::string> options;
   options.push_back("enable_keep_alive");
   options.push_back("yes");
   options.push_back("keep_alive_timeout_ms");
@@ -238,12 +239,10 @@ void ListenHTTP::onSchedule(
 ListenHTTP::~ListenHTTP() {
 }
 
-void ListenHTTP::onTrigger(
-    core::ProcessContext *context,
-    core::ProcessSession *session) {
-
-  std::shared_ptr < FlowFileRecord > flowFile = std::static_pointer_cast
-      < FlowFileRecord > (session->get());
+void ListenHTTP::onTrigger(core::ProcessContext *context,
+                           core::ProcessSession *session) {
+  std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<
+      FlowFileRecord>(session->get());
 
   // Do nothing if there are no incoming files
   if (!flowFile) {
@@ -251,10 +250,10 @@ void ListenHTTP::onTrigger(
   }
 }
 
-ListenHTTP::Handler::Handler(
-    core::ProcessContext *context,
-    core::ProcessSessionFactory *sessionFactory,
-    std::string &&authDNPattern, std::string &&headersAsAttributesPattern)
+ListenHTTP::Handler::Handler(core::ProcessContext *context,
+                             core::ProcessSessionFactory *sessionFactory,
+                             std::string &&authDNPattern,
+                             std::string &&headersAsAttributesPattern)
     : _authDNRegex(std::move(authDNPattern)),
       _headersAsAttributesRegex(std::move(headersAsAttributesPattern)) {
   _processContext = context;
@@ -292,8 +291,7 @@ bool ListenHTTP::Handler::handlePost(CivetServer *server,
 
   auto session = _processSessionFactory->createSession();
   ListenHTTP::WriteCallback callback(conn, req_info);
-  auto flowFile = std::static_pointer_cast < FlowFileRecord
-      > (session->create());
+  auto flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
 
   if (!flowFile) {
     sendErrorResponse(conn);
@@ -347,9 +345,9 @@ ListenHTTP::WriteCallback::WriteCallback(
 }
 
 void ListenHTTP::WriteCallback::process(std::ofstream *stream) {
-  long long rlen;
-  long long nlen = 0;
-  long long tlen = _reqInfo->content_length;
+  int64_t rlen;
+  int64_t nlen = 0;
+  int64_t tlen = _reqInfo->content_length;
   char buf[16384];
 
   while (nlen < tlen) {