You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/03/28 17:19:09 UTC

[04/16] nifi-minifi-cpp git commit: MINIFI-217: Updates namespaces and removes use of raw pointers for user facing API.

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/Repository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Repository.cpp b/libminifi/src/core/Repository.cpp
new file mode 100644
index 0000000..9a27785
--- /dev/null
+++ b/libminifi/src/core/Repository.cpp
@@ -0,0 +1,65 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <cstdint>
+#include <vector>
+#include <arpa/inet.h>
+#include "io/DataStream.h"
+#include "io/Serializable.h"
+#include "core/Relationship.h"
+#include "core/logging/Logger.h"
+#include "FlowController.h"
+#include "core/Repository.h"
+#include "provenance/Provenance.h"
+#include "core/repository/FlowFileRepository.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+/**
+ * Starts the repository monitor thread.
+ *
+ * Nothing happens when purge_period_ is not positive or when the monitor is
+ * already flagged as running.
+ */
+void Repository::start() {
+  if (this->purge_period_ <= 0)
+    return;
+  if (running_)
+    return;
+  // Raise the flag before the thread exists. Loops such as
+  // FlowFileRepository::run() poll running_, so setting it after the spawn
+  // lets the new thread observe 'false' and exit immediately.
+  running_ = true;
+  thread_ = std::thread(&Repository::threadExecutor, this);
+  // NOTE(review): a detached thread is never joinable, so the join in stop()
+  // cannot wait for the monitor to finish. Dropping detach() looks like the
+  // intent, but requires every owner to call stop() before destruction --
+  // confirm before changing.
+  thread_.detach();
+  logger_->log_info("%s Repository Monitor Thread Start", name_.c_str());
+}
+
+/**
+ * Stops the repository monitor.
+ *
+ * Clears the running flag, joins the monitor thread if it is joinable and
+ * logs the stop. Calling it while the monitor is not running is a no-op.
+ */
+void Repository::stop() {
+  if (running_) {
+    running_ = false;
+    if (thread_.joinable()) {
+      thread_.join();
+    }
+    logger_->log_info("%s Repository Monitor Thread Stop", name_.c_str());
+  }
+}
+
+/**
+ * @return the value currently stored in repo_size_
+ */
+uint64_t Repository::repoSize() {
+  const uint64_t current_size = repo_size_;
+  return current_size;
+}
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/RepositoryFactory.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/RepositoryFactory.cpp b/libminifi/src/core/RepositoryFactory.cpp
new file mode 100644
index 0000000..ef0b9ef
--- /dev/null
+++ b/libminifi/src/core/RepositoryFactory.cpp
@@ -0,0 +1,69 @@
+#include "core/Repository.h"
+#include "core/Repository.h"
+
+#ifdef LEVELDB_SUPPORT
+#include "core/repository/FlowFileRepository.h"
+#include "provenance/ProvenanceRepository.h"
+#endif
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+#ifndef LEVELDB_SUPPORT
+  namespace provenance{
+  class ProvenanceRepository;
+  }
+#endif
+namespace core {
+
+#ifndef LEVELDB_SUPPORT
+  class FlowFileRepository;
+#endif
+
+
+ /**
+  * Creates the repository implementation selected by a configuration class
+  * name.
+  *
+  * The name is matched case-insensitively against "flowfilerepository" and
+  * "provenancerepository".
+  *
+  * @param configuration_class_name class name taken from the configuration
+  * @param fail_safe when true, a plain core::Repository named "fail_safe" is
+  *        returned instead of throwing when no implementation is produced
+  * @return the created repository
+  * @throws std::runtime_error when no implementation could be produced and
+  *         fail_safe is false
+  */
+ std::shared_ptr<core::Repository> createRepository(
+      const std::string configuration_class_name, bool fail_safe = false) {
+
+    std::string class_name_lc = configuration_class_name;
+    // tolower() is only defined for values representable as unsigned char
+    // (or EOF), so every character is routed through unsigned char first.
+    std::transform(class_name_lc.begin(), class_name_lc.end(),
+                   class_name_lc.begin(), [](unsigned char c) {
+                     return static_cast<char>(::tolower(c));
+                   });
+
+    std::shared_ptr<core::Repository> return_obj = nullptr;
+    try {
+      // NOTE(review): the C-style casts are kept on purpose. Without
+      // LEVELDB_SUPPORT the repository classes are only forward declared
+      // here, and a static_cast from an incomplete type would not compile.
+      if (class_name_lc == "flowfilerepository") {
+        return_obj = std::shared_ptr<core::Repository>((core::Repository*)instantiate<core::repository::FlowFileRepository>());
+      } else if (class_name_lc == "provenancerepository") {
+        return_obj = std::shared_ptr<core::Repository>((core::Repository*)instantiate<provenance::ProvenanceRepository>());
+      }
+    } catch (const std::runtime_error &) {
+      // A failed instantiation is handled like an unknown class name below.
+    }
+
+    if (return_obj) {
+      return return_obj;
+    }
+    if (fail_safe) {
+      return std::make_shared<core::Repository>("fail_safe", "fail_safe", 1,
+                                                1, 1);
+    }
+    throw std::runtime_error(
+        "Support for the provided configuration class could not be found");
+  }
+
+  
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/logging/BaseLogger.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/logging/BaseLogger.cpp b/libminifi/src/core/logging/BaseLogger.cpp
new file mode 100644
index 0000000..a6b43a8
--- /dev/null
+++ b/libminifi/src/core/logging/BaseLogger.cpp
@@ -0,0 +1,161 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/logging/BaseLogger.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace logging {
+
+// Logger related configuration items: out-of-class definitions of the
+// BaseLogger static strings, each holding the literal property name below.
+// NOTE(review): presumably used as lookup keys in the MiNiFi properties --
+// confirm against the callers.
+const char *BaseLogger::nifi_log_level = "nifi.log.level";
+const char *BaseLogger::nifi_log_appender = "nifi.log.appender";
+
+/**
+ * @brief Emits a message at error level.
+ *
+ * Returns without logging when no logger is set or when the logger reports
+ * that error-level messages should not be logged.
+ * @param format printf-style format string ('man printf' for syntax),
+ * followed by its arguments
+ * @warning @p format is not checked for null. The caller must make sure the
+ * arguments match the format string.
+ */
+void BaseLogger::log_error(const char * const format, ...) {
+  const bool enabled = logger_ != nullptr
+      && logger_->should_log(spdlog::level::level_enum::err);
+  if (!enabled)
+    return;
+  // FILL_BUFFER is a macro defined outside this file; it is expected to
+  // render format and the variadic arguments into 'buffer'.
+  FILL_BUFFER
+  log_str(err, buffer);
+}
+/**
+ * @brief Emits a message at warn level.
+ *
+ * Returns without logging when no logger is set or when the logger reports
+ * that warn-level messages should not be logged.
+ * @param format printf-style format string ('man printf' for syntax),
+ * followed by its arguments
+ * @warning @p format is not checked for null. The caller must make sure the
+ * arguments match the format string.
+ */
+void BaseLogger::log_warn(const char * const format, ...) {
+  const bool enabled = logger_ != nullptr
+      && logger_->should_log(spdlog::level::level_enum::warn);
+  if (!enabled)
+    return;
+  // FILL_BUFFER is a macro defined outside this file; it is expected to
+  // render format and the variadic arguments into 'buffer'.
+  FILL_BUFFER
+  log_str(warn, buffer);
+}
+/**
+ * @brief Emits a message at info level.
+ *
+ * Returns without logging when no logger is set or when the logger reports
+ * that info-level messages should not be logged.
+ * @param format printf-style format string ('man printf' for syntax),
+ * followed by its arguments
+ * @warning @p format is not checked for null. The caller must make sure the
+ * arguments match the format string.
+ */
+void BaseLogger::log_info(const char * const format, ...) {
+  const bool enabled = logger_ != nullptr
+      && logger_->should_log(spdlog::level::level_enum::info);
+  if (!enabled)
+    return;
+  // FILL_BUFFER is a macro defined outside this file; it is expected to
+  // render format and the variadic arguments into 'buffer'.
+  FILL_BUFFER
+  log_str(info, buffer);
+}
+/**
+ * @brief Emits a message at debug level.
+ *
+ * Returns without logging when no logger is set or when the logger reports
+ * that debug-level messages should not be logged.
+ * @param format printf-style format string ('man printf' for syntax),
+ * followed by its arguments
+ * @warning @p format is not checked for null. The caller must make sure the
+ * arguments match the format string.
+ */
+void BaseLogger::log_debug(const char * const format, ...) {
+  const bool enabled = logger_ != nullptr
+      && logger_->should_log(spdlog::level::level_enum::debug);
+  if (!enabled)
+    return;
+  // FILL_BUFFER is a macro defined outside this file; it is expected to
+  // render format and the variadic arguments into 'buffer'.
+  FILL_BUFFER
+  log_str(debug, buffer);
+}
+/**
+ * @brief Emits a message at trace level.
+ *
+ * Returns without logging when no logger is set or when the logger reports
+ * that trace-level messages should not be logged.
+ * @param format printf-style format string ('man printf' for syntax),
+ * followed by its arguments
+ * @warning @p format is not checked for null. The caller must make sure the
+ * arguments match the format string.
+ */
+void BaseLogger::log_trace(const char * const format, ...) {
+
+  if (logger_ == NULL || !logger_->should_log(spdlog::level::level_enum::trace))
+    return;
+  // FILL_BUFFER is a macro defined outside this file; it is expected to
+  // render format and the variadic arguments into 'buffer'.
+  FILL_BUFFER
+  // Forward with the trace level; this previously passed 'debug', which
+  // recorded trace messages as debug entries.
+  log_str(trace, buffer);
+}
+
+// overridables
+
+/**
+ * @brief Writes an already formatted message at the requested level.
+ *
+ * err and critical messages go to the error logger installed through
+ * set_error_logger() when one is present, otherwise to the regular logger;
+ * both are written with error(). Level 'off' writes nothing and any
+ * unrecognised level is written as info.
+ * @param level level the message is written at
+ * @param buffer message text
+ * @warning logger_ is not checked for null here.
+ */
+void BaseLogger::log_str(LOG_LEVEL_E level, const std::string &buffer) {
+  switch (level) {
+    case off:
+      return;
+    case trace:
+      logger_->trace(buffer);
+      return;
+    case debug:
+      logger_->debug(buffer);
+      return;
+    case warn:
+      logger_->warn(buffer);
+      return;
+    case err:
+    case critical:
+      if (stderr_ == nullptr) {
+        logger_->error(buffer);
+      } else {
+        stderr_->error(buffer);
+      }
+      return;
+    case info:
+    default:
+      logger_->info(buffer);
+      return;
+  }
+}
+
+/**
+ * @brief Sets the log level from its textual name.
+ *
+ * The name is lower-cased and compared against "trace", "debug", "info",
+ * "warn", "error", "critical" and "off".
+ * @param level textual level name
+ * @param defaultLevel level applied when @p level matches none of the names
+ */
+void BaseLogger::setLogLevel(const std::string &level,
+                             LOG_LEVEL_E defaultLevel) {
+  struct NamedLevel {
+    const char *name;
+    LOG_LEVEL_E value;
+  };
+  static const NamedLevel known_levels[] = {
+      { "trace", trace },
+      { "debug", debug },
+      { "info", info },
+      { "warn", warn },
+      { "error", err },
+      { "critical", critical },
+      { "off", off } };
+
+  std::string lowered(level);
+  std::transform(lowered.begin(), lowered.end(), lowered.begin(), ::tolower);
+
+  for (const NamedLevel &candidate : known_levels) {
+    if (lowered == candidate.name) {
+      setLogLevel(candidate.value);
+      return;
+    }
+  }
+  setLogLevel(defaultLevel);
+}
+
+/**
+ * @brief Installs the logger stored in stderr_.
+ *
+ * log_str() writes err and critical messages to this logger whenever it is
+ * non-null.
+ * @param other logger to install; taken by value and moved into stderr_
+ */
+void BaseLogger::set_error_logger(std::shared_ptr<spdlog::logger> other) {
+  stderr_ = std::move(other);
+}
+
+} /* namespace logging */
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/logging/LogAppenders.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/logging/LogAppenders.cpp b/libminifi/src/core/logging/LogAppenders.cpp
new file mode 100644
index 0000000..5d92334
--- /dev/null
+++ b/libminifi/src/core/logging/LogAppenders.cpp
@@ -0,0 +1,39 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/logging/LogAppenders.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace logging {
+
+// Out-of-class definition of the OutputStreamAppender static string; it holds
+// the literal property name below.
+const char *OutputStreamAppender::nifi_log_output_stream_error_stderr="nifi.log.outputstream.appender.error.stderr";
+
+// Out-of-class definitions of the RollingAppender static strings (file, max
+// files, max file size property names).
+// NOTE(review): 'apender' in the first identifier is spelled that way in the
+// member name itself, so it has to match the declaration in the header.
+const char *RollingAppender::nifi_log_rolling_apender_file = "nifi.log.rolling.appender.file";
+const char *RollingAppender::nifi_log_rolling_appender_max_files = "nifi.log.rolling.appender.max.files";
+const char *RollingAppender::nifi_log_rolling_appender_max_file_size = "nifi.log.rolling.appender.max.file_size";
+
+
+} /* namespace logging */
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/logging/Logger.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/logging/Logger.cpp b/libminifi/src/core/logging/Logger.cpp
new file mode 100644
index 0000000..d8cadf3
--- /dev/null
+++ b/libminifi/src/core/logging/Logger.cpp
@@ -0,0 +1,40 @@
+/**
+ * @file Logger.cpp
+ * Logger class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "core/logging/Logger.h"
+
+#include <vector>
+#include <queue>
+#include <map>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace logging {
+
+// Definition of the Logger static singleton_logger_ member; it starts out as
+// a null shared_ptr.
+std::shared_ptr<Logger> Logger::singleton_logger_(nullptr);
+
+} /* namespace logging */
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/repository/FlowFileRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/repository/FlowFileRepository.cpp b/libminifi/src/core/repository/FlowFileRepository.cpp
new file mode 100644
index 0000000..c495a67
--- /dev/null
+++ b/libminifi/src/core/repository/FlowFileRepository.cpp
@@ -0,0 +1,109 @@
+#include "core/repository/FlowFileRepository.h"
+#include "FlowFileRecord.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace repository {
+
+/**
+ * Purge loop of the flow file repository; runs while running_ is set.
+ *
+ * Each round sleeps for purge_period_ milliseconds and samples the repository
+ * size. Once the size reaches three quarters of max_partition_bytes_, every
+ * stored record is scanned: records older than max_partition_millis_ and
+ * records that cannot be deserialized are deleted. repo_full_ is then set
+ * according to whether the sampled size exceeds max_partition_bytes_.
+ */
+void FlowFileRepository::run() {
+  // threshold for purge
+  const uint64_t purgeThreshold = max_partition_bytes_ * 3 / 4;
+  while (running_) {
+    std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_));
+    const uint64_t curTime = getTimeMillis();
+    const uint64_t size = repoSize();
+    if (size >= purgeThreshold) {
+      std::vector<std::string> purgeList;
+      {
+        // Own the iterator through unique_ptr so it is released even when
+        // record construction or deserialization throws during the scan.
+        std::unique_ptr<leveldb::Iterator> it(
+            db_->NewIterator(leveldb::ReadOptions()));
+
+        for (it->SeekToFirst(); it->Valid(); it->Next()) {
+          std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this());
+          std::string key = it->key().ToString();
+          const leveldb::Slice value = it->value();
+          if (eventRead->DeSerialize(
+              reinterpret_cast<uint8_t *>(const_cast<char *>(value.data())),
+              static_cast<int>(value.size()))) {
+            if ((curTime - eventRead->getEventTime()) > max_partition_millis_)
+              purgeList.push_back(std::move(key));
+          } else {
+            logger_->log_debug("NiFi %s retrieve event %s fail", name_.c_str(),
+                               key.c_str());
+            purgeList.push_back(std::move(key));
+          }
+        }
+      }  // iterator released before any record is deleted
+      for (const auto &eventId : purgeList) {
+        logger_->log_info("Repository Repo %s Purge %s", name_.c_str(),
+                          eventId.c_str());
+        Delete(eventId);
+      }
+    }
+    // Uses the size sampled before the purge, as before.
+    repo_full_ = (size > max_partition_bytes_);
+  }
+}
+
+/**
+ * Re-queues the flow files stored in the repository onto their connections.
+ *
+ * Every stored record is deserialized and looked up in @p connectionMap by
+ * its connection UUID. Records with a known connection are wrapped in a new
+ * FlowFileRecord, marked as already stored and put on that connection.
+ * Records without a known connection have their content file removed (when a
+ * content path is set) and are deleted from the repository, as are records
+ * that cannot be deserialized.
+ *
+ * @param connectionMap connections keyed by the UUID string that records
+ *        report through getConnectionUuid()
+ */
+void FlowFileRepository::loadFlowFileToConnections(std::map<std::string, std::shared_ptr<minifi::Connection>> &connectionMap)
+ {
+  std::vector<std::string> purgeList;
+  {
+    // Own the iterator through unique_ptr so it is released even when record
+    // construction, deserialization or enqueueing throws during the scan.
+    std::unique_ptr<leveldb::Iterator> it(
+        db_->NewIterator(leveldb::ReadOptions()));
+
+    for (it->SeekToFirst(); it->Valid(); it->Next())
+    {
+      std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this());
+      std::string key = it->key().ToString();
+      const leveldb::Slice value = it->value();
+      if (!eventRead->DeSerialize(
+          reinterpret_cast<uint8_t *>(const_cast<char *>(value.data())),
+          static_cast<int>(value.size())))
+      {
+        purgeList.push_back(std::move(key));
+        continue;
+      }
+
+      auto search = connectionMap.find(eventRead->getConnectionUuid());
+      if (search != connectionMap.end())
+      {
+        // The connection of the persisted flow file exists: create the flow
+        // file and enqueue it.
+        std::shared_ptr<core::FlowFile> flow_file_ref = std::static_pointer_cast<core::FlowFile>(eventRead);
+        std::shared_ptr<FlowFileRecord> record = std::make_shared<FlowFileRecord>(shared_from_this(),flow_file_ref);
+        // Mark the record as stored so that enqueueing does not persist it
+        // a second time.
+        record->setStoredToRepository(true);
+        search->second->put(record);
+      }
+      else
+      {
+        const std::string contentPath = eventRead->getContentFullPath();
+        if (contentPath.length() > 0)
+        {
+          std::remove(contentPath.c_str());
+        }
+        purgeList.push_back(std::move(key));
+      }
+    }
+  }  // iterator released before any record is deleted
+
+  for (const std::string &eventId : purgeList)
+  {
+    logger_->log_info("Repository Repo %s Purge %s",
+                    name_.c_str(),
+                    eventId.c_str());
+    Delete(eventId);
+  }
+}
+
+} /* namespace repository */
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/yaml/YamlConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp
new file mode 100644
index 0000000..8e59363
--- /dev/null
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -0,0 +1,490 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/yaml/YamlConfiguration.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+/**
+ * Creates the root process group from the root node of the flow.
+ *
+ * The group is named after the "name" entry. Its UUID comes from the "id"
+ * entry; when that entry cannot be read or is not a parsable UUID, a random
+ * UUID is generated instead and a warning is logged. The flow name is also
+ * stored in name_.
+ *
+ * @param rootFlowNode YAML node holding the "name" and "id" entries
+ * @return the new root group, released from its unique_ptr -- the caller
+ *         owns the returned pointer
+ */
+core::ProcessGroup *YamlConfiguration::parseRootProcessGroupYaml(
+    YAML::Node rootFlowNode) {
+  uuid_t uuid;
+
+  std::string flowName = rootFlowNode["name"].as<std::string>();
+  std::string id;
+  bool idParsed = false;
+
+  try {
+    // Keep the value that was read; it used to be discarded, which left id
+    // empty and uuid uninitialized.
+    id = rootFlowNode["id"].as<std::string>();
+    // uuid_parse() reports malformed input through its return value
+    // (0 on success), not through an exception.
+    idParsed = (uuid_parse(id.c_str(), uuid) == 0);
+  } catch (...) {
+    // The "id" entry could not be read as a string; fall back below.
+  }
+
+  if (!idParsed) {
+    logger_->log_warn("Generating random ID for root node");
+    uuid_generate(uuid);
+    char uuid_str[37];
+    uuid_unparse(uuid, uuid_str);
+    id = uuid_str;
+  }
+
+  logger_->log_debug("parseRootProcessGroup: id => [%s]", id.c_str());
+  logger_->log_debug("parseRootProcessGroup: name => [%s]", flowName.c_str());
+  std::unique_ptr<core::ProcessGroup> group =
+      FlowConfiguration::createRootProcessGroup(flowName, uuid);
+
+  this->name_ = flowName;
+
+  return group.release();
+}
+
+/**
+ * Creates a processor for every entry of the processors YAML sequence and
+ * adds it to the parent group.
+ *
+ * For each entry the processor is created from the last dot-separated part of
+ * its "class" value, then configured with its properties, scheduling,
+ * penalization and yield periods, run duration, concurrency and
+ * auto-terminated relationships.
+ *
+ * @param processorsNode YAML node holding the sequence of processor
+ *        definitions; a defined node that is not a sequence is ignored
+ * @param parentGroup group receiving the processors; when null an error is
+ *        logged and nothing is parsed
+ * @throws std::invalid_argument when @p processorsNode is undefined or when a
+ *         processor cannot be created from its "class" value
+ */
+void YamlConfiguration::parseProcessorNodeYaml(
+    YAML::Node processorsNode, core::ProcessGroup * parentGroup) {
+  uuid_t uuid;
+
+  if (!parentGroup) {
+    logger_->log_error("parseProcessNodeYaml: no parent group exists");
+    return;
+  }
+
+  if (!processorsNode) {
+    // Thrown by value (not 'throw new'), so handlers that catch
+    // std::exception or std::invalid_argument by reference receive it.
+    throw std::invalid_argument(
+        "Cannot instantiate a MiNiFi instance without a defined Processors configuration node.");
+  }
+
+  if (!processorsNode.IsSequence()) {
+    return;
+  }
+
+  // Evaluate sequence of processors
+  for (YAML::const_iterator iter = processorsNode.begin();
+      iter != processorsNode.end(); ++iter) {
+    // All per-processor state lives inside the loop so that nothing created
+    // or parsed for one entry can leak into the next one.
+    int64_t schedulingPeriod = -1;
+    int64_t penalizationPeriod = -1;
+    int64_t yieldPeriod = -1;
+    int64_t runDurationNanos = -1;
+    std::shared_ptr<core::Processor> processor = nullptr;
+
+    core::ProcessorConfig procCfg;
+    YAML::Node procNode = iter->as<YAML::Node>();
+
+    procCfg.name = procNode["name"].as<std::string>();
+    procCfg.id = procNode["id"].as<std::string>();
+    logger_->log_debug("parseProcessorNode: name => [%s] id => [%s]",
+                       procCfg.name.c_str(), procCfg.id.c_str());
+    procCfg.javaClass = procNode["class"].as<std::string>();
+    logger_->log_debug("parseProcessorNode: class => [%s]",
+                       procCfg.javaClass.c_str());
+
+    // NOTE(review): the result of uuid_parse() is not checked; a malformed
+    // id leaves uuid with whatever it held before -- confirm the wanted
+    // handling.
+    uuid_parse(procCfg.id.c_str(), uuid);
+
+    // Determine the processor name only from the Java class: the part that
+    // follows the last '.'.
+    const std::string::size_type lastDot = procCfg.javaClass.find_last_of('.');
+    if (lastDot != std::string::npos) {
+      std::string processorName = procCfg.javaClass.substr(lastDot + 1);
+      processor = this->createProcessor(processorName, uuid);
+    }
+
+    if (!processor) {
+      logger_->log_error("Could not create a processor %s with id %s",
+                         procCfg.name.c_str(), procCfg.id.c_str());
+      throw std::invalid_argument(
+          "Could not create processor " + procCfg.name);
+    }
+    processor->setName(procCfg.name);
+
+    procCfg.maxConcurrentTasks = procNode["max concurrent tasks"]
+        .as<std::string>();
+    logger_->log_debug("parseProcessorNode: max concurrent tasks => [%s]",
+                       procCfg.maxConcurrentTasks.c_str());
+    procCfg.schedulingStrategy = procNode["scheduling strategy"]
+        .as<std::string>();
+    logger_->log_debug("parseProcessorNode: scheduling strategy => [%s]",
+                       procCfg.schedulingStrategy.c_str());
+    procCfg.schedulingPeriod =
+        procNode["scheduling period"].as<std::string>();
+    logger_->log_debug("parseProcessorNode: scheduling period => [%s]",
+                       procCfg.schedulingPeriod.c_str());
+    procCfg.penalizationPeriod = procNode["penalization period"]
+        .as<std::string>();
+    logger_->log_debug("parseProcessorNode: penalization period => [%s]",
+                       procCfg.penalizationPeriod.c_str());
+    procCfg.yieldPeriod = procNode["yield period"].as<std::string>();
+    logger_->log_debug("parseProcessorNode: yield period => [%s]",
+                       procCfg.yieldPeriod.c_str());
+    // Stored in runDurationNanos; this used to overwrite yieldPeriod, which
+    // lost the yield period and left the run duration empty.
+    procCfg.runDurationNanos =
+        procNode["run duration nanos"].as<std::string>();
+    logger_->log_debug("parseProcessorNode: run duration nanos => [%s]",
+                       procCfg.runDurationNanos.c_str());
+
+    // handle auto-terminated relationships
+    YAML::Node autoTerminatedSequence =
+        procNode["auto-terminated relationships list"];
+    std::vector<std::string> rawAutoTerminatedRelationshipValues;
+    if (autoTerminatedSequence.IsSequence()
+        && !autoTerminatedSequence.IsNull()
+        && autoTerminatedSequence.size() > 0) {
+      for (YAML::const_iterator relIter = autoTerminatedSequence.begin();
+          relIter != autoTerminatedSequence.end(); ++relIter) {
+        rawAutoTerminatedRelationshipValues.push_back(
+            relIter->as<std::string>());
+      }
+    }
+    procCfg.autoTerminatedRelationships =
+        rawAutoTerminatedRelationshipValues;
+
+    // handle processor properties
+    YAML::Node propertiesNode = procNode["Properties"];
+    parsePropertiesNodeYaml(&propertiesNode, processor);
+
+    // Take care of scheduling. The int64_t values are cast to long long and
+    // printed with %lld because %d expects an int in a variadic call.
+    core::TimeUnit unit;
+    if (core::Property::StringToTime(procCfg.schedulingPeriod,
+                                     schedulingPeriod, unit)
+        && core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit,
+                                               schedulingPeriod)) {
+      logger_->log_debug(
+          "convert: parseProcessorNode: schedulingPeriod => [%lld] ns",
+          static_cast<long long>(schedulingPeriod));
+      processor->setSchedulingPeriodNano(schedulingPeriod);
+    }
+
+    if (core::Property::StringToTime(procCfg.penalizationPeriod,
+                                     penalizationPeriod, unit)
+        && core::Property::ConvertTimeUnitToMS(penalizationPeriod, unit,
+                                               penalizationPeriod)) {
+      logger_->log_debug(
+          "convert: parseProcessorNode: penalizationPeriod => [%lld] ms",
+          static_cast<long long>(penalizationPeriod));
+      processor->setPenalizationPeriodMsec(penalizationPeriod);
+    }
+
+    if (core::Property::StringToTime(procCfg.yieldPeriod, yieldPeriod, unit)
+        && core::Property::ConvertTimeUnitToMS(yieldPeriod, unit,
+                                               yieldPeriod)) {
+      logger_->log_debug(
+          "convert: parseProcessorNode: yieldPeriod => [%lld] ms",
+          static_cast<long long>(yieldPeriod));
+      processor->setYieldPeriodMsec(yieldPeriod);
+    }
+
+    // Default to running
+    processor->setScheduledState(core::RUNNING);
+
+    // Any strategy other than TIMER_DRIVEN / EVENT_DRIVEN is treated as
+    // CRON_DRIVEN.
+    if (procCfg.schedulingStrategy == "TIMER_DRIVEN") {
+      processor->setSchedulingStrategy(core::TIMER_DRIVEN);
+    } else if (procCfg.schedulingStrategy == "EVENT_DRIVEN") {
+      processor->setSchedulingStrategy(core::EVENT_DRIVEN);
+    } else {
+      processor->setSchedulingStrategy(core::CRON_DRIVEN);
+    }
+    logger_->log_debug("setting scheduling strategy as %s",
+                       procCfg.schedulingStrategy.c_str());
+
+    int64_t maxConcurrentTasks = 0;
+    if (core::Property::StringToInt(procCfg.maxConcurrentTasks,
+                                    maxConcurrentTasks)) {
+      logger_->log_debug("parseProcessorNode: maxConcurrentTasks => [%lld]",
+                         static_cast<long long>(maxConcurrentTasks));
+      processor->setMaxConcurrentTasks(maxConcurrentTasks);
+    }
+
+    if (core::Property::StringToInt(procCfg.runDurationNanos,
+                                    runDurationNanos)) {
+      logger_->log_debug("parseProcessorNode: runDurationNanos => [%lld]",
+                         static_cast<long long>(runDurationNanos));
+      processor->setRunDurationNano(runDurationNanos);
+    }
+
+    std::set<core::Relationship> autoTerminatedRelationships;
+    for (auto &&relString : procCfg.autoTerminatedRelationships) {
+      core::Relationship relationship(relString, "");
+      logger_->log_debug(
+          "parseProcessorNode: autoTerminatedRelationship  => [%s]",
+          relString.c_str());
+      autoTerminatedRelationships.insert(relationship);
+    }
+
+    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
+
+    parentGroup->addProcessor(processor);
+  }
+}
+
+/**
+ * Creates a remote process group for every entry of the given YAML sequence
+ * and adds it to the parent group.
+ *
+ * Each group gets its URL, timeout and yield period from the entry, is set
+ * to transmitting, and has its "Input Ports" parsed as SEND ports and its
+ * "Output Ports" parsed as RECEIVE ports.
+ *
+ * @param rpgNode pointer to the YAML node holding the sequence; a null
+ *        pointer or a node that is not a sequence is ignored
+ * @param parentGroup group receiving the remote process groups; when null an
+ *        error is logged and nothing is parsed
+ */
+void YamlConfiguration::parseRemoteProcessGroupYaml(
+    YAML::Node *rpgNode, core::ProcessGroup * parentGroup) {
+  uuid_t uuid;
+
+  if (!parentGroup) {
+    logger_->log_error("parseRemoteProcessGroupYaml: no parent group exists");
+    return;
+  }
+
+  if (!rpgNode || !rpgNode->IsSequence()) {
+    return;
+  }
+
+  for (YAML::const_iterator iter = rpgNode->begin(); iter != rpgNode->end();
+      ++iter) {
+    // Named differently from the rpgNode parameter, which the loop condition
+    // still refers to.
+    YAML::Node currRpgNode = iter->as<YAML::Node>();
+
+    auto name = currRpgNode["name"].as<std::string>();
+    auto id = currRpgNode["id"].as<std::string>();
+
+    logger_->log_debug(
+        "parseRemoteProcessGroupYaml: name => [%s], id => [%s]",
+        name.c_str(), id.c_str());
+
+    std::string url = currRpgNode["url"].as<std::string>();
+    logger_->log_debug("parseRemoteProcessGroupYaml: url => [%s]",
+                       url.c_str());
+
+    std::string timeout = currRpgNode["timeout"].as<std::string>();
+    logger_->log_debug("parseRemoteProcessGroupYaml: timeout => [%s]",
+                       timeout.c_str());
+
+    std::string yieldPeriod = currRpgNode["yield period"].as<std::string>();
+    logger_->log_debug("parseRemoteProcessGroupYaml: yield period => [%s]",
+                       yieldPeriod.c_str());
+
+    YAML::Node inputPorts = currRpgNode["Input Ports"].as<YAML::Node>();
+    YAML::Node outputPorts = currRpgNode["Output Ports"].as<YAML::Node>();
+
+    // NOTE(review): the result of uuid_parse() is not checked; a malformed
+    // id leaves uuid with whatever it held before -- confirm the wanted
+    // handling.
+    uuid_parse(id.c_str(), uuid);
+
+    int64_t timeoutValue = -1;
+    int64_t yieldPeriodValue = -1;
+
+    core::ProcessGroup *group =
+        this->createRemoteProcessGroup(name.c_str(), uuid).release();
+    // Check once, before the first dereference; the later '&& group' tests
+    // came after the pointer had already been used.
+    if (!group) {
+      logger_->log_error(
+          "parseRemoteProcessGroupYaml: could not create remote process group %s",
+          name.c_str());
+      continue;
+    }
+    group->setParent(parentGroup);
+    parentGroup->addProcessGroup(group);
+
+    core::TimeUnit unit;
+
+    // The int64_t values are cast to long long and printed with %lld because
+    // %d expects an int in a variadic call.
+    if (core::Property::StringToTime(yieldPeriod, yieldPeriodValue, unit)
+        && core::Property::ConvertTimeUnitToMS(yieldPeriodValue, unit,
+                                               yieldPeriodValue)) {
+      logger_->log_debug(
+          "parseRemoteProcessGroupYaml: yieldPeriod => [%lld] ms",
+          static_cast<long long>(yieldPeriodValue));
+      group->setYieldPeriodMsec(yieldPeriodValue);
+    }
+
+    if (core::Property::StringToTime(timeout, timeoutValue, unit)
+        && core::Property::ConvertTimeUnitToMS(timeoutValue, unit,
+                                               timeoutValue)) {
+      logger_->log_debug(
+          "parseRemoteProcessGroupYaml: timeoutValue => [%lld] ms",
+          static_cast<long long>(timeoutValue));
+      group->setTimeOut(timeoutValue);
+    }
+
+    group->setTransmitting(true);
+    group->setURL(url);
+
+    if (inputPorts && inputPorts.IsSequence()) {
+      for (YAML::const_iterator portIter = inputPorts.begin();
+          portIter != inputPorts.end(); ++portIter) {
+        logger_->log_debug("Got a current port, iterating...");
+
+        YAML::Node currPort = portIter->as<YAML::Node>();
+
+        this->parsePortYaml(&currPort, group, SEND);
+      }  // for node
+    }
+    if (outputPorts && outputPorts.IsSequence()) {
+      for (YAML::const_iterator portIter = outputPorts.begin();
+          portIter != outputPorts.end(); ++portIter) {
+        logger_->log_debug("Got a current port, iterating...");
+
+        YAML::Node currPort = portIter->as<YAML::Node>();
+
+        this->parsePortYaml(&currPort, group, RECEIVE);
+      }  // for node
+    }
+  }
+}
+
+/**
+ * Creates the connections listed in a YAML sequence and adds them to the
+ * given process group.
+ *
+ * Each entry is read for its "name", "id", "destination id",
+ * "source relationship name" and "source id" keys. The source and
+ * destination processors are looked up in the parent group by UUID.
+ *
+ * @param connectionsNode YAML node holding the connection entries; nothing
+ *        is parsed when it is null or not a sequence
+ * @param parent process group that receives the connections; an error is
+ *        logged and nothing is parsed when it is null
+ * @throws std::invalid_argument if the source or destination processor of
+ *         a connection cannot be found in the parent group
+ */
+void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode,
+                                            core::ProcessGroup *parent) {
+  if (!parent) {
+    logger_->log_error("parseProcessNode: no parent group was provided");
+    return;
+  }
+
+  // Only a sequence of connection entries carries anything to parse.
+  if (!connectionsNode || !connectionsNode->IsSequence()) {
+    return;
+  }
+
+  for (YAML::const_iterator iter = connectionsNode->begin();
+      iter != connectionsNode->end(); ++iter) {
+    YAML::Node connectionNode = iter->as<YAML::Node>();
+
+    std::string name = connectionNode["name"].as<std::string>();
+    std::string id = connectionNode["id"].as<std::string>();
+    std::string destId = connectionNode["destination id"].as<std::string>();
+
+    uuid_t uuid;
+    uuid_parse(id.c_str(), uuid);
+
+    logger_->log_debug("Created connection with UUID %s and name %s",
+                       id.c_str(), name.c_str());
+    std::shared_ptr<minifi::Connection> connection = this->createConnection(
+        name, uuid);
+
+    auto rawRelationship = connectionNode["source relationship name"]
+        .as<std::string>();
+    core::Relationship relationship(rawRelationship, "");
+    logger_->log_debug("parseConnection: relationship => [%s]",
+                       rawRelationship.c_str());
+    if (connection)
+      connection->setRelationship(relationship);
+
+    // Resolve the source processor by the UUID given in the entry.
+    std::string connectionSrcProcId = connectionNode["source id"]
+        .as<std::string>();
+    uuid_t srcUUID;
+    uuid_parse(connectionSrcProcId.c_str(), srcUUID);
+    auto srcProcessor = parent->findProcessor(srcUUID);
+    if (!srcProcessor) {
+      logger_->log_error(
+          "Could not locate a source with id %s to create a connection",
+          connectionSrcProcId.c_str());
+      throw std::invalid_argument(
+          "Could not locate a source with id " + connectionSrcProcId
+              + " to create a connection");
+    }
+
+    // Resolve the destination processor by the UUID given in the entry.
+    uuid_t destUUID;
+    uuid_parse(destId.c_str(), destUUID);
+    auto destProcessor = parent->findProcessor(destUUID);
+    if (!destProcessor) {
+      // Fail the same way as for a missing source rather than dereferencing
+      // a null processor further down.
+      logger_->log_error(
+          "Could not locate a destination with id %s to create a connection",
+          destId.c_str());
+      throw std::invalid_argument(
+          "Could not locate a destination with id " + destId
+              + " to create a connection");
+    }
+
+    if (!connection) {
+      // Without a connection object there is nothing to wire up or register.
+      logger_->log_error(
+          "Could not create connection with UUID %s and name %s",
+          id.c_str(), name.c_str());
+      continue;
+    }
+
+    // Use the UUIDs reported by the resolved processors themselves.
+    uuid_t srcUuid;
+    uuid_t destUuid;
+    srcProcessor->getUUID(srcUuid);
+    connection->setSourceUUID(srcUuid);
+    destProcessor->getUUID(destUuid);
+    connection->setDestinationUUID(destUuid);
+
+    // Each connection is registered exactly once, here.
+    parent->addConnection(connection);
+  }
+}
+
+/**
+ * Creates a remote process group port from a YAML port entry and adds it
+ * to the parent group as a running processor.
+ *
+ * The entry is read for its "id", "name", "Properties" and
+ * "max concurrent tasks" keys. Time-out and yield period are copied from
+ * the parent group.
+ *
+ * @param portNode YAML node describing the port; an error is logged and
+ *        nothing is created when it is null
+ * @param parent process group that receives the port; an error is logged
+ *        and nothing is created when it is null
+ * @param direction transfer direction assigned to the port
+ */
+void YamlConfiguration::parsePortYaml(YAML::Node *portNode,
+                                      core::ProcessGroup *parent,
+                                      TransferDirection direction) {
+  if (!parent) {
+    logger_->log_error("parseProcessNode: no parent group existed");
+    return;
+  }
+
+  if (!portNode) {
+    logger_->log_error("parsePortYaml: no port node was provided");
+    return;
+  }
+
+  YAML::Node inputPortsObj = portNode->as<YAML::Node>();
+
+  // Start from a random UUID; uuid_parse below replaces it with the
+  // configured id (presumably the random value is kept as a fallback should
+  // the configured id fail to parse).
+  uuid_t uuid;
+  uuid_generate(uuid);
+
+  auto portId = inputPortsObj["id"].as<std::string>();
+  auto nameStr = inputPortsObj["name"].as<std::string>();
+  uuid_parse(portId.c_str(), uuid);
+
+  // Owned by a shared_ptr from the moment it is created; the processor
+  // handle below shares that ownership through the base-class interface.
+  auto port = std::make_shared<minifi::RemoteProcessorGroupPort>(
+      nameStr.c_str(), uuid);
+  std::shared_ptr<core::Processor> processor = port;
+
+  port->setDirection(direction);
+  port->setTimeOut(parent->getTimeOut());
+  port->setTransmitting(true);
+  processor->setYieldPeriodMsec(parent->getYieldPeriodMsec());
+  processor->initialize();
+
+  // handle port properties
+  YAML::Node propertiesNode = inputPortsObj["Properties"];
+  parsePropertiesNodeYaml(&propertiesNode, processor);
+
+  // add processor to parent
+  parent->addProcessor(processor);
+  processor->setScheduledState(core::RUNNING);
+
+  auto rawMaxConcurrentTasks = inputPortsObj["max concurrent tasks"]
+      .as<std::string>();
+  int64_t maxConcurrentTasks = 0;
+  if (core::Property::StringToInt(rawMaxConcurrentTasks, maxConcurrentTasks)) {
+    logger_->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]",
+                       maxConcurrentTasks);
+    processor->setMaxConcurrentTasks(maxConcurrentTasks);
+  } else {
+    // Leave the processor's existing setting untouched rather than applying
+    // a value that was never parsed.
+    logger_->log_warn(
+        "parsePortYaml: could not parse max concurrent tasks value [%s]",
+        rawMaxConcurrentTasks.c_str());
+  }
+}
+
+/**
+ * Applies every populated entry of a YAML properties map to a processor.
+ *
+ * Entries whose value is null or undefined are skipped. A warning is
+ * logged for each entry the processor refuses via setProperty.
+ *
+ * @param propertiesNode YAML node whose entries are property name/value
+ *        pairs
+ * @param processor processor that receives the properties
+ */
+void YamlConfiguration::parsePropertiesNodeYaml(
+    YAML::Node *propertiesNode, std::shared_ptr<core::Processor> processor) {
+  // Walk the entries as generic YAML nodes so unpopulated values can be
+  // detected and skipped before any conversion is attempted.
+  YAML::const_iterator entry = propertiesNode->begin();
+  while (entry != propertiesNode->end()) {
+    const std::string key = entry->first.as<std::string>();
+    const YAML::Node valueNode = entry->second;
+
+    const bool populated = !valueNode.IsNull() && valueNode.IsDefined();
+    if (populated) {
+      const std::string value = valueNode.as<std::string>();
+      const bool accepted = processor->setProperty(key, value);
+      if (!accepted) {
+        logger_->log_warn(
+            "Received property %s with value %s but is not one of the properties for %s",
+            key.c_str(), value.c_str(), processor->getName().c_str());
+      }
+    }
+
+    ++entry;
+  }
+}
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/io/BaseStream.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/BaseStream.cpp b/libminifi/src/io/BaseStream.cpp
index cf3fe45..1400a1d 100644
--- a/libminifi/src/io/BaseStream.cpp
+++ b/libminifi/src/io/BaseStream.cpp
@@ -18,6 +18,12 @@
 #include "io/BaseStream.h"
 #include "io/Serializable.h"
 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
 /**
  * write 4 bytes to stream
  * @param base_value non encoded value
@@ -26,7 +32,7 @@
  * @return resulting write size
  **/
 int BaseStream::write(uint32_t base_value, bool is_little_endian) {
-	return ::Serializable::write(base_value, (DataStream*) this, is_little_endian);
+	return Serializable::write(base_value, (DataStream*) this, is_little_endian);
 }
 
 /**
@@ -37,7 +43,7 @@ int BaseStream::write(uint32_t base_value, bool is_little_endian) {
  * @return resulting write size
  **/
 int BaseStream::write(uint16_t base_value, bool is_little_endian) {
-	return ::Serializable::write(base_value, (DataStream*) this, is_little_endian);
+	return Serializable::write(base_value, (DataStream*) this, is_little_endian);
 }
 
 /**
@@ -48,7 +54,7 @@ int BaseStream::write(uint16_t base_value, bool is_little_endian) {
  * @return resulting write size
  **/
 int BaseStream::write(uint8_t *value, int len) {
-	return ::Serializable::write(value, len, (DataStream*) this);
+	return Serializable::write(value, len, (DataStream*) this);
 }
 
 /**
@@ -59,7 +65,7 @@ int BaseStream::write(uint8_t *value, int len) {
  * @return resulting write size
  **/
 int BaseStream::write(uint64_t base_value, bool is_little_endian) {
-	return ::Serializable::write(base_value, (DataStream*) this, is_little_endian);
+	return Serializable::write(base_value, (DataStream*) this, is_little_endian);
 }
 
 /**
@@ -69,7 +75,7 @@ int BaseStream::write(uint64_t base_value, bool is_little_endian) {
  **/
 int BaseStream::write(bool value) {
 	uint8_t v = value;
-	return ::Serializable::write(v);
+	return Serializable::write(v);
 }
 
 /**
@@ -78,7 +84,7 @@ int BaseStream::write(bool value) {
  * @return resulting write size
  **/
 int BaseStream::writeUTF(std::string str, bool widen) {
-	return ::Serializable::writeUTF(str, (DataStream*) this, widen);
+	return Serializable::writeUTF(str, (DataStream*) this, widen);
 }
 
 /**
@@ -88,7 +94,7 @@ int BaseStream::writeUTF(std::string str, bool widen) {
  * @return resulting read size
  **/
 int BaseStream::read(uint8_t &value) {
-	return ::Serializable::read(value, (DataStream*) this);
+	return Serializable::read(value, (DataStream*) this);
 }
 
 /**
@@ -98,7 +104,7 @@ int BaseStream::read(uint8_t &value) {
  * @return resulting read size
  **/
 int BaseStream::read(uint16_t &base_value, bool is_little_endian) {
-	return ::Serializable::read(base_value, (DataStream*) this);
+	return Serializable::read(base_value, (DataStream*) this);
 }
 
 /**
@@ -108,7 +114,7 @@ int BaseStream::read(uint16_t &base_value, bool is_little_endian) {
  * @return resulting read size
  **/
 int BaseStream::read(char &value) {
-	return ::Serializable::read(value, (DataStream*) this);
+	return Serializable::read(value, (DataStream*) this);
 }
 
 /**
@@ -119,7 +125,7 @@ int BaseStream::read(char &value) {
  * @return resulting read size
  **/
 int BaseStream::read(uint8_t *value, int len) {
-	return ::Serializable::read(value, len, (DataStream*) this);
+	return Serializable::read(value, len, (DataStream*) this);
 }
 
 /**
@@ -129,7 +135,7 @@ int BaseStream::read(uint8_t *value, int len) {
  * @return resulting read size
  **/
 int BaseStream::read(uint32_t &value, bool is_little_endian) {
-	return ::Serializable::read(value, (DataStream*) this, is_little_endian);
+	return Serializable::read(value, (DataStream*) this, is_little_endian);
 }
 
 /**
@@ -139,7 +145,7 @@ int BaseStream::read(uint32_t &value, bool is_little_endian) {
  * @return resulting read size
  **/
 int BaseStream::read(uint64_t &value, bool is_little_endian) {
-	return ::Serializable::read(value, (DataStream*) this, is_little_endian);
+	return Serializable::read(value, (DataStream*) this, is_little_endian);
 }
 
 /**
@@ -149,5 +155,12 @@ int BaseStream::read(uint64_t &value, bool is_little_endian) {
  * @return resulting read size
  **/
 int BaseStream::readUTF(std::string &str, bool widen) {
-	return ::Serializable::readUTF(str, (DataStream*) this, widen);
+	return Serializable::readUTF(str, (DataStream*) this, widen);
 }
+
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/io/ClientSocket.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/ClientSocket.cpp b/libminifi/src/io/ClientSocket.cpp
index 39b71b4..ad6b04d 100644
--- a/libminifi/src/io/ClientSocket.cpp
+++ b/libminifi/src/io/ClientSocket.cpp
@@ -30,425 +30,424 @@
 #include "io/validation.h"
 #include "io/ClientSocket.h"
 
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
 
 std::string Socket::HOSTNAME = Socket::getMyHostName(0);
 
-
 Socket::Socket(const std::string &hostname, const uint16_t port,
-		const uint16_t listeners = -1) :
-		requested_hostname_(hostname), port_(port), addr_info_(0), socket_file_descriptor_(
-				-1), socket_max_(0), listeners_(listeners), canonical_hostname_("") {
-	logger_ = Logger::getLogger();
-	FD_ZERO(&total_list_);
-	FD_ZERO(&read_fds_);
+               const uint16_t listeners = -1)
+    : requested_hostname_(hostname),
+      port_(port),
+      addr_info_(0),
+      socket_file_descriptor_(-1),
+      socket_max_(0),
+      listeners_(listeners),
+      canonical_hostname_("") {
+  logger_ = logging::Logger::getLogger();
+  FD_ZERO(&total_list_);
+  FD_ZERO(&read_fds_);
 
 }
 
-Socket::Socket(const std::string &hostname, const uint16_t port) :
-		::Socket(hostname, port, 0) {
+Socket::Socket(const std::string &hostname, const uint16_t port)
+    : Socket(hostname, port, 0) {
 
 }
 
-Socket::Socket(const Socket &&other) :
-		requested_hostname_(std::move(other.requested_hostname_)), port_(
-				std::move(other.port_)), addr_info_(std::move(other.addr_info_)), socket_file_descriptor_(
-				other.socket_file_descriptor_), socket_max_(
-				other.socket_max_.load()), listeners_(other.listeners_), total_list_(
-				other.total_list_), read_fds_(other.read_fds_), canonical_hostname_(
-				std::move(other.canonical_hostname_)) {
-	logger_ = Logger::getLogger();
+Socket::Socket(const Socket &&other)
+    : requested_hostname_(std::move(other.requested_hostname_)),
+      port_(std::move(other.port_)),
+      addr_info_(std::move(other.addr_info_)),
+      socket_file_descriptor_(other.socket_file_descriptor_),
+      socket_max_(other.socket_max_.load()),
+      listeners_(other.listeners_),
+      total_list_(other.total_list_),
+      read_fds_(other.read_fds_),
+      canonical_hostname_(std::move(other.canonical_hostname_)) {
+  logger_ = logging::Logger::getLogger();
 
 }
 
 Socket::~Socket() {
-	closeStream();
+  closeStream();
 }
 
-void Socket::closeStream()
-{
-    if (0 != addr_info_) {
-	    freeaddrinfo(addr_info_);
-	    addr_info_=0;
-    }
+void Socket::closeStream() {
+  if (0 != addr_info_) {
+    freeaddrinfo(addr_info_);
+    addr_info_ = 0;
+  }
 
-    if (socket_file_descriptor_ >= 0)
-    {
-	    close(socket_file_descriptor_);
-	    socket_file_descriptor_ = -1;
-    }
+  if (socket_file_descriptor_ >= 0) {
+    close(socket_file_descriptor_);
+    socket_file_descriptor_ = -1;
+  }
 }
 
-int8_t Socket::createConnection(const addrinfo *p,in_addr_t &addr) {
-	if ((socket_file_descriptor_ = socket(p->ai_family, p->ai_socktype,
-			p->ai_protocol)) == -1) {
-		logger_->log_error("error while connecting to server socket");
-		return -1;
-	}
-
-	setSocketOptions(socket_file_descriptor_);
-
-	if (listeners_ > 0) {
-
-		struct sockaddr_in *sa_loc = (struct sockaddr_in*) p->ai_addr;
-		sa_loc->sin_family = AF_INET;
-		sa_loc->sin_port = htons(port_);
-		sa_loc->sin_addr.s_addr = htonl(INADDR_ANY);
-
-		if (bind(socket_file_descriptor_, p->ai_addr, p->ai_addrlen) == -1) {
-			logger_->log_error("Could not bind to socket", strerror(errno));
-			return -1;
-		}
-	}
-	{
-	  if (listeners_ <=0 )
-	  {
-		struct sockaddr_in *sa_loc = (struct sockaddr_in*) p->ai_addr;
-		sa_loc->sin_family = AF_INET;
-		//sa_loc->sin_port = htons(port);
-		sa_loc->sin_port = htons(port_);
-		// use any address if you are connecting to the local machine for testing
-		// otherwise we must use the requested hostname
-		if (IsNullOrEmpty(requested_hostname_) || requested_hostname_=="localhost")
-		{
-		  sa_loc->sin_addr.s_addr = htonl(INADDR_ANY);
-		}
-		else
-		{
-		  sa_loc->sin_addr.s_addr = addr;
-		}
-		if (connect(socket_file_descriptor_, p->ai_addr, p->ai_addrlen) == -1) {
-			  close(socket_file_descriptor_);
-				socket_file_descriptor_ = -1;
-				logger_->log_warn("Could not connect to socket, error:%s", strerror(errno));
-				return -1;
-
-		}
-	  }
-	}
-
-	// listen
-	if (listeners_ > 0) {
-		if (listen(socket_file_descriptor_, listeners_) == -1) {
-			logger_->log_warn("attempted connection, saw %s", strerror(errno));
-			return -1;
-		}
-
-	}
-	// add the listener to the total set
-	FD_SET(socket_file_descriptor_, &total_list_);
-	socket_max_ = socket_file_descriptor_;
-	return 0;
-}
+int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) {
+  if ((socket_file_descriptor_ = socket(p->ai_family, p->ai_socktype,
+                                        p->ai_protocol)) == -1) {
+    logger_->log_error("error while connecting to server socket");
+    return -1;
+  }
 
-short Socket::initialize() {
+  setSocketOptions(socket_file_descriptor_);
 
-	struct sockaddr_in servAddr;
+  if (listeners_ > 0) {
 
-	addrinfo hints = { sizeof(addrinfo) };
-	memset(&hints, 0, sizeof hints); // make sure the struct is empty
-	hints.ai_family = AF_UNSPEC;
-	hints.ai_socktype = SOCK_STREAM;
-	hints.ai_flags = AI_CANONNAME;
-	if (listeners_ > 0)
-	    hints.ai_flags |= AI_PASSIVE;
+    struct sockaddr_in *sa_loc = (struct sockaddr_in*) p->ai_addr;
+    sa_loc->sin_family = AF_INET;
+    sa_loc->sin_port = htons(port_);
+    sa_loc->sin_addr.s_addr = htonl(INADDR_ANY);
 
-	hints.ai_protocol = 0; /* any protocol */
+    if (bind(socket_file_descriptor_, p->ai_addr, p->ai_addrlen) == -1) {
+      logger_->log_error("Could not bind to socket", strerror(errno));
+      return -1;
+    }
+  }
+  {
+    if (listeners_ <= 0) {
+      struct sockaddr_in *sa_loc = (struct sockaddr_in*) p->ai_addr;
+      sa_loc->sin_family = AF_INET;
+      //sa_loc->sin_port = htons(port);
+      sa_loc->sin_port = htons(port_);
+      // use any address if you are connecting to the local machine for testing
+      // otherwise we must use the requested hostname
+      if (IsNullOrEmpty(requested_hostname_)
+          || requested_hostname_ == "localhost") {
+        sa_loc->sin_addr.s_addr = htonl(INADDR_ANY);
+      } else {
+        sa_loc->sin_addr.s_addr = addr;
+      }
+      if (connect(socket_file_descriptor_, p->ai_addr, p->ai_addrlen) == -1) {
+        close(socket_file_descriptor_);
+        socket_file_descriptor_ = -1;
+        logger_->log_warn("Could not connect to socket, error:%s",
+                          strerror(errno));
+        return -1;
+
+      }
+    }
+  }
 
-	int errcode = getaddrinfo(requested_hostname_.c_str(), 0, &hints, &addr_info_);
+  // listen
+  if (listeners_ > 0) {
+    if (listen(socket_file_descriptor_, listeners_) == -1) {
+      logger_->log_warn("attempted connection, saw %s", strerror(errno));
+      return -1;
+    }
 
-	if (errcode != 0) {
-		logger_->log_error("Saw error during getaddrinfo, error: %s",strerror(errno));
-		return -1;
-	}
+  }
+  // add the listener to the total set
+  FD_SET(socket_file_descriptor_, &total_list_);
+  socket_max_ = socket_file_descriptor_;
+  return 0;
+}
 
-	socket_file_descriptor_ = -1;
+short Socket::initialize() {
 
-	in_addr_t addr;
-	  struct hostent *h;
-	#ifdef __MACH__
-	  h = gethostbyname(requested_hostname_.c_str());
-	#else
-	  const char *host;
-    uint16_t port;
+  struct sockaddr_in servAddr;
 
-    host = requested_hostname_.c_str();
-    port = port_;
-	  char buf[1024];
-	  struct hostent he;
-	  int hh_errno;
-	  gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno);
-	#endif
+  addrinfo hints = { sizeof(addrinfo) };
+  memset(&hints, 0, sizeof hints);  // make sure the struct is empty
+  hints.ai_family = AF_UNSPEC;
+  hints.ai_socktype = SOCK_STREAM;
+  hints.ai_flags = AI_CANONNAME;
+  if (listeners_ > 0)
+    hints.ai_flags |= AI_PASSIVE;
 
-	  memcpy((char *) &addr, h->h_addr_list[0], h->h_length);
+  hints.ai_protocol = 0; /* any protocol */
 
+  int errcode = getaddrinfo(requested_hostname_.c_str(), 0, &hints,
+                            &addr_info_);
 
-	auto p = addr_info_;
-	for (; p != NULL; p = p->ai_next) {
-		if (IsNullOrEmpty(canonical_hostname_)) {
-			if (!IsNullOrEmpty(p) && !IsNullOrEmpty(p->ai_canonname))
-				canonical_hostname_ = p->ai_canonname;
-		}
+  if (errcode != 0) {
+    logger_->log_error("Saw error during getaddrinfo, error: %s",
+                       strerror(errno));
+    return -1;
+  }
 
+  socket_file_descriptor_ = -1;
 
-		//we've successfully connected
-		if (port_ > 0 && createConnection(p,addr) >= 0)
-		{
-			return 0;
-			break;
-		}
-	}
+  in_addr_t addr;
+  struct hostent *h;
+#ifdef __MACH__
+  h = gethostbyname(requested_hostname_.c_str());
+#else
+  const char *host;
+  uint16_t port;
+
+  host = requested_hostname_.c_str();
+  port = port_;
+  char buf[1024];
+  struct hostent he;
+  int hh_errno;
+  gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno);
+#endif
 
-	return -1;
+  memcpy((char *) &addr, h->h_addr_list[0], h->h_length);
+
+  auto p = addr_info_;
+  for (; p != NULL; p = p->ai_next) {
+    if (IsNullOrEmpty(canonical_hostname_)) {
+      if (!IsNullOrEmpty(p) && !IsNullOrEmpty(p->ai_canonname))
+        canonical_hostname_ = p->ai_canonname;
+    }
+
+    //we've successfully connected
+    if (port_ > 0 && createConnection(p, addr) >= 0) {
+      return 0;
+      break;
+    }
+  }
+
+  return -1;
 
 }
 
 short Socket::select_descriptor(const uint16_t msec) {
-	struct timeval tv;
-	int retval;
-
-	read_fds_ = total_list_;
-
-	tv.tv_sec = msec / 1000;
-	tv.tv_usec = (msec % 1000) * 1000;
-
-	std::lock_guard<std::recursive_mutex> guard(selection_mutex_);
-
-	if (msec > 0)
-		retval = select(socket_max_ + 1, &read_fds_, NULL, NULL, &tv);
-	else
-		retval = select(socket_max_ + 1, &read_fds_, NULL, NULL, NULL);
-
-	if (retval < 0) {
-		logger_->log_error("Saw error during selection, error:%i %s",retval,strerror(errno));
-		return retval;
-	}
-
-	for (int i = 0; i <= socket_max_; i++) {
-		if (FD_ISSET(i, &read_fds_)) {
-
-			if (i == socket_file_descriptor_) {
-			  if (listeners_ > 0)
-			  {
-				struct sockaddr_storage remoteaddr; // client address
-				socklen_t addrlen = sizeof remoteaddr;
-				int newfd = accept(socket_file_descriptor_,
-						(struct sockaddr *) &remoteaddr, &addrlen);
-				FD_SET(newfd, &total_list_); // add to master set
-				if (newfd > socket_max_) {    // keep track of the max
-					socket_max_ = newfd;
-				}
-				return newfd;
-
-
-
-			  }
-			  else{
-			    return socket_file_descriptor_;
-			  }
-				// we have a new connection
-			} else {
-				// data to be received on i
-				return i;
-			}
-		}
-
-	}
-
-	return -1;
+  struct timeval tv;
+  int retval;
+
+  read_fds_ = total_list_;
+
+  tv.tv_sec = msec / 1000;
+  tv.tv_usec = (msec % 1000) * 1000;
+
+  std::lock_guard<std::recursive_mutex> guard(selection_mutex_);
+
+  if (msec > 0)
+    retval = select(socket_max_ + 1, &read_fds_, NULL, NULL, &tv);
+  else
+    retval = select(socket_max_ + 1, &read_fds_, NULL, NULL, NULL);
+
+  if (retval < 0) {
+    logger_->log_error("Saw error during selection, error:%i %s", retval,
+                       strerror(errno));
+    return retval;
+  }
+
+  for (int i = 0; i <= socket_max_; i++) {
+    if (FD_ISSET(i, &read_fds_)) {
+
+      if (i == socket_file_descriptor_) {
+        if (listeners_ > 0) {
+          struct sockaddr_storage remoteaddr;  // client address
+          socklen_t addrlen = sizeof remoteaddr;
+          int newfd = accept(socket_file_descriptor_,
+                             (struct sockaddr *) &remoteaddr, &addrlen);
+          FD_SET(newfd, &total_list_);  // add to master set
+          if (newfd > socket_max_) {    // keep track of the max
+            socket_max_ = newfd;
+          }
+          return newfd;
+
+        } else {
+          return socket_file_descriptor_;
+        }
+        // we have a new connection
+      } else {
+        // data to be received on i
+        return i;
+      }
+    }
+
+  }
+
+  return -1;
 }
 
 short Socket::setSocketOptions(const int sock) {
-	int opt = 1;
-	bool nagle_off = true;
+  int opt = 1;
+  bool nagle_off = true;
 #ifndef __MACH__
-	if (nagle_off) {
-		if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (void *) &opt, sizeof(opt))
-				< 0) {
-			logger_->log_error("setsockopt() TCP_NODELAY failed");
-			close(sock);
-			return -1;
-		}
-		if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &opt,
-						sizeof(opt)) < 0) {
-			logger_->log_error("setsockopt() SO_REUSEADDR failed");
-			close(sock);
-			return -1;
-		}
-	}
-
-	int sndsize = 256 * 1024;
-	if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *) &sndsize,
-					(int) sizeof(sndsize)) < 0) {
-		logger_->log_error("setsockopt() SO_SNDBUF failed");
-		close(sock);
-		return -1;
-	}
+  if (nagle_off) {
+    if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (void *) &opt, sizeof(opt))
+        < 0) {
+      logger_->log_error("setsockopt() TCP_NODELAY failed");
+      close(sock);
+      return -1;
+    }
+    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &opt,
+            sizeof(opt)) < 0) {
+      logger_->log_error("setsockopt() SO_REUSEADDR failed");
+      close(sock);
+      return -1;
+    }
+  }
+
+  int sndsize = 256 * 1024;
+  if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *) &sndsize,
+          (int) sizeof(sndsize)) < 0) {
+    logger_->log_error("setsockopt() SO_SNDBUF failed");
+    close(sock);
+    return -1;
+  }
 
 #else
-	if (listeners_ > 0) {
-		// lose the pesky "address already in use" error message
-		if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &opt,
-				sizeof(opt)) < 0) {
-			logger_->log_error("setsockopt() SO_REUSEADDR failed");
-			close(sock);
-			return -1;
-		}
-	}
+  if (listeners_ > 0) {
+    // lose the pesky "address already in use" error message
+    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &opt, sizeof(opt))
+        < 0) {
+      logger_->log_error("setsockopt() SO_REUSEADDR failed");
+      close(sock);
+      return -1;
+    }
+  }
 #endif
-	return 0;
+  return 0;
 }
 
 std::string Socket::getHostname() const {
-	return canonical_hostname_;
+  return canonical_hostname_;
 }
 
 int Socket::writeData(std::vector<uint8_t> &buf, int buflen) {
 
-	if (buf.capacity() < buflen)
-		return -1;
-	return writeData((uint8_t*) &buf[0], buflen);
+  if (buf.capacity() < buflen)
+    return -1;
+  return writeData((uint8_t*) &buf[0], buflen);
 }
 
 // data stream overrides
 
 int Socket::writeData(uint8_t *value, int size) {
 
-	int ret = 0, bytes = 0;
+  int ret = 0, bytes = 0;
 
-	while (bytes < size) {
+  while (bytes < size) {
 
+    ret = send(socket_file_descriptor_, value + bytes, size - bytes, 0);
+    //check for errors
+    if (ret <= 0) {
+      close(socket_file_descriptor_);
+      logger_->log_error("Could not send to %d, error: %s",
+                         socket_file_descriptor_, strerror(errno));
+      return ret;
+    }
+    bytes += ret;
 
+  }
 
-		ret = send(socket_file_descriptor_, value + bytes, size - bytes, 0);
-		//check for errors
-		if (ret <= 0) {
-			close(socket_file_descriptor_);
-			logger_->log_error("Could not send to %d, error: %s",socket_file_descriptor_, strerror(errno));
-			return ret;
-		}
-		bytes += ret;
-
-	}
-
-	if (ret)
-		logger_->log_trace("Send data size %d over socket %d", size,
-				socket_file_descriptor_);
+  if (ret)
+    logger_->log_trace("Send data size %d over socket %d", size,
+                       socket_file_descriptor_);
 
-	return bytes;
+  return bytes;
 
 }
 
 template<typename T>
 inline std::vector<uint8_t> Socket::readBuffer(const T& t) {
-	std::vector<uint8_t> buf;
-	buf.resize(sizeof t);
-	readData((uint8_t*) &buf[0], sizeof(t));
-	return buf;
+  std::vector<uint8_t> buf;
+  buf.resize(sizeof t);
+  readData((uint8_t*) &buf[0], sizeof(t));
+  return buf;
 }
 
+int Socket::write(uint64_t base_value, bool is_little_endian) {
 
-int Socket::write(uint64_t base_value, bool is_little_endian){
-
-  return Serializable::write(base_value,this,is_little_endian);
+  return Serializable::write(base_value, this, is_little_endian);
 }
 
-
-int Socket::write(uint32_t base_value, bool is_little_endian){
-  return Serializable::write(base_value,this,is_little_endian);
+int Socket::write(uint32_t base_value, bool is_little_endian) {
+  return Serializable::write(base_value, this, is_little_endian);
 }
 
-int Socket::write(uint16_t base_value, bool is_little_endian){
-  return Serializable::write(base_value,this,is_little_endian);
+int Socket::write(uint16_t base_value, bool is_little_endian) {
+  return Serializable::write(base_value, this, is_little_endian);
 }
 
-
 int Socket::read(uint64_t &value, bool is_little_endian) {
 
-	auto buf = readBuffer(value);
-
-	if (is_little_endian) {
-		value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) << 48)
-				| ((uint64_t) (buf[2] & 255) << 40)
-				| ((uint64_t) (buf[3] & 255) << 32)
-				| ((uint64_t) (buf[4] & 255) << 24)
-				| ((uint64_t) (buf[5] & 255) << 16)
-				| ((uint64_t) (buf[6] & 255) << 8)
-				| ((uint64_t) (buf[7] & 255) << 0);
-	} else {
-		value = ((uint64_t) buf[0] << 0) | ((uint64_t) (buf[1] & 255) << 8)
-				| ((uint64_t) (buf[2] & 255) << 16)
-				| ((uint64_t) (buf[3] & 255) << 24)
-				| ((uint64_t) (buf[4] & 255) << 32)
-				| ((uint64_t) (buf[5] & 255) << 40)
-				| ((uint64_t) (buf[6] & 255) << 48)
-				| ((uint64_t) (buf[7] & 255) << 56);
-	}
-	return sizeof(value);
+  auto buf = readBuffer(value);
+
+  if (is_little_endian) {
+    value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) << 48)
+        | ((uint64_t) (buf[2] & 255) << 40) | ((uint64_t) (buf[3] & 255) << 32)
+        | ((uint64_t) (buf[4] & 255) << 24) | ((uint64_t) (buf[5] & 255) << 16)
+        | ((uint64_t) (buf[6] & 255) << 8) | ((uint64_t) (buf[7] & 255) << 0);
+  } else {
+    value = ((uint64_t) buf[0] << 0) | ((uint64_t) (buf[1] & 255) << 8)
+        | ((uint64_t) (buf[2] & 255) << 16) | ((uint64_t) (buf[3] & 255) << 24)
+        | ((uint64_t) (buf[4] & 255) << 32) | ((uint64_t) (buf[5] & 255) << 40)
+        | ((uint64_t) (buf[6] & 255) << 48) | ((uint64_t) (buf[7] & 255) << 56);
+  }
+  return sizeof(value);
 }
 
 int Socket::read(uint32_t &value, bool is_little_endian) {
 
-	auto buf = readBuffer(value);
+  auto buf = readBuffer(value);
 
-	if (is_little_endian) {
-		value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3];
-	} else {
-		value = buf[0] | buf[1] << 8 | buf[2] << 16 | buf[3] << 24;
+  if (is_little_endian) {
+    value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3];
+  } else {
+    value = buf[0] | buf[1] << 8 | buf[2] << 16 | buf[3] << 24;
 
-	}
+  }
 
-	return sizeof(value);
+  return sizeof(value);
 }
 
 int Socket::read(uint16_t &value, bool is_little_endian) {
 
-	auto buf = readBuffer(value);
+  auto buf = readBuffer(value);
 
-	if (is_little_endian) {
-		value = (buf[0] << 8) | buf[1];
-	} else {
-		value = buf[0] | buf[1] << 8;
+  if (is_little_endian) {
+    value = (buf[0] << 8) | buf[1];
+  } else {
+    value = buf[0] | buf[1] << 8;
 
-	}
-	return sizeof(value);
+  }
+  return sizeof(value);
 }
 
 int Socket::readData(std::vector<uint8_t> &buf, int buflen) {
 
-	if (buf.capacity() < buflen)
-	{
-	      buf.resize(buflen);
-	}
-	return readData((uint8_t*) &buf[0], buflen);
+  if (buf.capacity() < buflen) {
+    buf.resize(buflen);
+  }
+  return readData((uint8_t*) &buf[0], buflen);
 }
 
 int Socket::readData(uint8_t *buf, int buflen) {
 
-	int total_read = 0;
-	while (buflen) {
-		short fd = select_descriptor(1000);
-		if (fd < 0) {
-
-			logger_->log_info("fd close %i",buflen);
-			close(socket_file_descriptor_);
-			return -1;
-		}
-
-		int bytes_read = recv(fd, buf, buflen, 0);
-		if (bytes_read <= 0) {
-			if (bytes_read == 0)
-				logger_->log_info("Other side hung up on %d", fd);
-			else {
-				logger_->log_error("Could not recv on %d, error: %s",fd, strerror(errno));
-			}
-			return -1;
-		}
-
-
-		buflen -= bytes_read;
-		buf += bytes_read;
-		total_read += bytes_read;
-	}
-
-	return total_read;
+  int total_read = 0;
+  while (buflen) {
+    short fd = select_descriptor(1000);
+    if (fd < 0) {
+
+      logger_->log_info("fd close %i", buflen);
+      close(socket_file_descriptor_);
+      return -1;
+    }
+
+    int bytes_read = recv(fd, buf, buflen, 0);
+    if (bytes_read <= 0) {
+      if (bytes_read == 0)
+        logger_->log_info("Other side hung up on %d", fd);
+      else {
+        logger_->log_error("Could not recv on %d, error: %s", fd,
+                           strerror(errno));
+      }
+      return -1;
+    }
+
+    buflen -= bytes_read;
+    buf += bytes_read;
+    total_read += bytes_read;
+  }
+
+  return total_read;
 }
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/io/DataStream.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/DataStream.cpp b/libminifi/src/io/DataStream.cpp
index 3ff0a57..7a10bd9 100644
--- a/libminifi/src/io/DataStream.cpp
+++ b/libminifi/src/io/DataStream.cpp
@@ -27,6 +27,11 @@
 #include <algorithm>
 #include "io/DataStream.h"
 
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
 
 
 int DataStream::writeData(uint8_t *value, int size) {
@@ -126,3 +131,10 @@ int DataStream::readData(uint8_t *buf,int buflen) {
     readBuffer += buflen;
     return buflen;
 }
+
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/io/EndianCheck.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/EndianCheck.cpp b/libminifi/src/io/EndianCheck.cpp
index f1bc98b..1b5020d 100644
--- a/libminifi/src/io/EndianCheck.cpp
+++ b/libminifi/src/io/EndianCheck.cpp
@@ -18,5 +18,16 @@
 
 #include "io/EndianCheck.h"
 
-
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
 bool EndiannessCheck::IS_LITTLE = EndiannessCheck::is_little_endian();
+
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/io/Serializable.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/Serializable.cpp b/libminifi/src/io/Serializable.cpp
index 78dc63c..f8c623a 100644
--- a/libminifi/src/io/Serializable.cpp
+++ b/libminifi/src/io/Serializable.cpp
@@ -22,7 +22,11 @@
 #include <arpa/inet.h>
 #include "io/DataStream.h"
 #include "io/Serializable.h"
-
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
 
 #define htonll_r(x) ((((uint64_t)htonl(x)) << 32) + htonl((x) >> 32))
 
@@ -216,3 +220,10 @@ int Serializable::writeUTF(std::string str,DataStream *stream, bool widen) {
     }
     return ret;
 }
+
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/io/SocketFactory.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/SocketFactory.cpp b/libminifi/src/io/SocketFactory.cpp
deleted file mode 100644
index cbfdf96..0000000
--- a/libminifi/src/io/SocketFactory.cpp
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "io/SocketFactory.h"
-
-#include <atomic>
-#include <mutex> 
-  
-std::atomic<SocketFactory*> SocketFactory::context_instance_;
-std::mutex SocketFactory::context_mutex_;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/io/StreamFactory.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/StreamFactory.cpp b/libminifi/src/io/StreamFactory.cpp
new file mode 100644
index 0000000..e3aa290
--- /dev/null
+++ b/libminifi/src/io/StreamFactory.cpp
@@ -0,0 +1,37 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "../../include/io/StreamFactory.h"
+
+#include <atomic>
+#include <mutex> 
+  
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+// Out-of-class definitions of StreamFactory's static data members: the
+// atomic instance pointer and the mutex declared alongside it.
+std::atomic<StreamFactory*> StreamFactory::context_instance_;
+std::mutex StreamFactory::context_mutex_;
+
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/io/TLSSocket.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/TLSSocket.cpp b/libminifi/src/io/TLSSocket.cpp
deleted file mode 100644
index 1c81f6c..0000000
--- a/libminifi/src/io/TLSSocket.cpp
+++ /dev/null
@@ -1,237 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifdef OPENSSL_SUPPORT
-#include "Property.h"
-#include "Configure.h"
-#include "io/TLSSocket.h"
-#include "utils/StringUtils.h"
-#include <openssl/ssl.h>
-#include <openssl/err.h>
-
-
-std::atomic<TLSContext*> TLSContext::context_instance;
-std::mutex TLSContext::context_mutex;
-
-TLSContext::TLSContext() :
-		error_value(0), ctx(0), logger_(Logger::getLogger()), configuration(
-				Configure::getConfigure()) {
-
-}
-
-/**
- * The memory barrier is defined by the singleton
- */
-short TLSContext::initialize() {
-	if (ctx != 0) {
-		return error_value;
-	}
-	std::string clientAuthStr;
-	bool needClientCert = true;
-	if (!(configuration->get(Configure::nifi_security_need_ClientAuth,
-			clientAuthStr)
-			&& StringUtils::StringToBool(clientAuthStr, needClientCert))) {
-		needClientCert = true;
-	}
-
-	SSL_library_init();
-	const SSL_METHOD *method;
-
-	OpenSSL_add_all_algorithms();
-	SSL_load_error_strings();
-	method = TLSv1_2_client_method();
-	ctx = SSL_CTX_new(method);
-	if (ctx == NULL) {
-		logger_->log_error("Could not create SSL context, error: %s.",
-				std::strerror(errno));
-		error_value = TLS_ERROR_CONTEXT;
-		return error_value;
-	}
-	if (needClientCert) {
-		std::string certificate;
-		std::string privatekey;
-		std::string passphrase;
-		std::string caCertificate;
-
-		if (!(configuration->get(Configure::nifi_security_client_certificate,
-				certificate)
-				&& configuration->get(
-						Configure::nifi_security_client_private_key, privatekey))) {
-			logger_->log_error(
-					"Certificate and Private Key PEM file not configured, error: %s.",
-					std::strerror(errno));
-			error_value = TLS_ERROR_PEM_MISSING;
-			return error_value;
-		}
-		// load certificates and private key in PEM format
-		if (SSL_CTX_use_certificate_file(ctx, certificate.c_str(),
-				SSL_FILETYPE_PEM) <= 0) {
-			logger_->log_error("Could not create load certificate, error : %s",
-					std::strerror(errno));
-			error_value = TLS_ERROR_CERT_MISSING;
-			return error_value;
-
-		}
-		if (configuration->get(Configure::nifi_security_client_pass_phrase,
-				passphrase)) {
-			// if the private key has passphase
-			SSL_CTX_set_default_passwd_cb(ctx, pemPassWordCb);
-		}
-		
-
-		int retp = SSL_CTX_use_PrivateKey_file(ctx, privatekey.c_str(),
-				SSL_FILETYPE_PEM);
-		if (retp != 1) {
-			logger_->log_error("Could not create load private key,%i on %s error : %s",
-					retp,privatekey.c_str(),std::strerror(errno));
-			error_value = TLS_ERROR_KEY_ERROR;
-			return error_value;
-		}
-		// verify private key
-		if (!SSL_CTX_check_private_key(ctx)) {
-			logger_->log_error(
-					"Private key does not match the public certificate, error : %s",
-					std::strerror(errno));
-			error_value = TLS_ERROR_KEY_ERROR;
-			return error_value;
-		}
-		// load CA certificates
-		if (configuration->get(Configure::nifi_security_client_ca_certificate,
-				caCertificate)) {
-			retp = SSL_CTX_load_verify_locations(ctx, caCertificate.c_str(), 0);
-			if (retp==0) {
-				logger_->log_error(
-						"Can not load CA certificate, Exiting, error : %s",
-						std::strerror(errno));
-				error_value = TLS_ERROR_CERT_ERROR;
-				return error_value;
-			}
-		}
-
-		logger_->log_info("Load/Verify Client Certificate OK.");
-	}
-	return 0;
-}
-
-TLSSocket::~TLSSocket()
-{
-	if (ssl != 0)
-		SSL_free(ssl);
-}
-/**
- * Constructor that accepts host name, port and listeners. With this
- * contructor we will be creating a server socket
- * @param hostname our host name
- * @param port connecting port
- * @param listeners number of listeners in the queue
- */
-TLSSocket::TLSSocket(const std::string &hostname, const uint16_t port,
-		const uint16_t listeners) :
-		::Socket(hostname, port, listeners), ssl(0) {
-}
-
-TLSSocket::TLSSocket(const std::string &hostname, const uint16_t port) :
-		::Socket(hostname, port, 0), ssl(0) {
-}
-
-TLSSocket::TLSSocket(const TLSSocket &&d) :
-		::Socket(std::move(d)), ssl(0) {
-}
-
-short TLSSocket::initialize() {
-	TLSContext *context = TLSContext::getInstance();
-	short ret = context->initialize();
-	Socket::initialize();
-	if (!ret) {
-		// we have s2s secure config
-		ssl = SSL_new(context->getContext());
-		SSL_set_fd(ssl, socket_file_descriptor_);
-		if (SSL_connect(ssl) == -1) {
-			logger_->log_error("SSL socket connect failed to %s %d",
-					requested_hostname_.c_str(), port_);
-			SSL_free(ssl);
-			ssl = NULL;
-			close(socket_file_descriptor_);
-			return -1;
-		} else {
-			logger_->log_info("SSL socket connect success to %s %d",
-					requested_hostname_.c_str(), port_);
-			return 0;
-		}
-	}
-	return ret;
-}
-
-short TLSSocket::select_descriptor(const uint16_t msec) {
-	if (ssl && SSL_pending(ssl))
-		return 1;
-	return Socket::select_descriptor(msec);
-}
-
-int TLSSocket::writeData(std::vector< uint8_t>& buf, int buflen)
-{
- return Socket::writeData(buf,buflen);
-}
-
-int TLSSocket::writeData(uint8_t *value, int size) {
-	if (IsNullOrEmpty(ssl))
-	  return -1;
-	// for SSL, wait for the TLS IO is completed
-	int bytes = 0;
-	int sent = 0;
-	while (bytes < size) {
-
-		sent = SSL_write(ssl, value + bytes, size - bytes);
-		//check for errors
-		if (sent < 0) {
-			logger_->log_error("Site2Site Peer socket %d send failed %s",
-					socket_file_descriptor_, strerror(errno));
-			return sent;
-		}
-		bytes += sent;
-	}
-	return size;
-}
-
-int TLSSocket::readData(uint8_t *buf, int buflen) {
-
-	if (IsNullOrEmpty(ssl))
-	  return -1;
-	int total_read = 0;
-	int status = 0;
-	while (buflen) {
-		short fd = select_descriptor(1000);
-		if (fd <= 0) {
-
-			close(socket_file_descriptor_);
-			return -1;
-		}
-
-		int sslStatus;
-		do {
-			status = SSL_read(ssl, buf, buflen);
-			sslStatus = SSL_get_error(ssl, status);
-		} while (status < 0 && sslStatus == SSL_ERROR_WANT_READ);
-
-		buflen -= status;
-		buf += status;
-		total_read += status;
-	}
-
-	return total_read;
-}
-#endif // #ifdef OPENSSL_SUPPORT

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/io/tls/TLSSocket.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/tls/TLSSocket.cpp b/libminifi/src/io/tls/TLSSocket.cpp
new file mode 100644
index 0000000..b2df394
--- /dev/null
+++ b/libminifi/src/io/tls/TLSSocket.cpp
@@ -0,0 +1,249 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "properties/Configure.h"
+#include "io/tls/TLSSocket.h"
+#include "utils/StringUtils.h"
+
+#include "core/Property.h"
+
+#include <openssl/ssl.h>
+#include <openssl/err.h>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+// Out-of-class definitions of TLSContext's static data members: the atomic
+// instance pointer and the mutex declared alongside it.
+std::atomic<TLSContext*> TLSContext::context_instance;
+std::mutex TLSContext::context_mutex;
+
+/**
+ * Starts with no SSL context and no recorded error. The logger and the
+ * configuration are obtained from Logger::getLogger() and
+ * Configure::getConfigure().
+ */
+TLSContext::TLSContext()
+		: error_value(0),
+		  ctx(0),
+		  logger_(logging::Logger::getLogger()),
+		  configuration(Configure::getConfigure()) {
+}
+
+/**
+ * Creates the TLSv1.2 client SSL context and, when client authentication is
+ * required, loads the configured certificate, private key and optional CA
+ * certificate into it.
+ *
+ * The memory barrier is defined by the singleton
+ *
+ * Once a context object exists, later calls skip the setup and return the
+ * stored error_value.
+ *
+ * @return 0 on success, otherwise the TLS_ERROR_* code of the step that
+ * failed
+ */
+short TLSContext::initialize() {
+	if (ctx != 0) {
+		return error_value;
+	}
+	// Client authentication stays enabled unless the configuration holds a
+	// value that parses as a boolean.
+	std::string clientAuthStr;
+	bool needClientCert = true;
+	if (!(configuration->get(Configure::nifi_security_need_ClientAuth,
+			clientAuthStr)
+			&& org::apache::nifi::minifi::utils::StringUtils::StringToBool(clientAuthStr, needClientCert))) {
+		needClientCert = true;
+	}
+
+	SSL_library_init();
+	const SSL_METHOD *method;
+
+	OpenSSL_add_all_algorithms();
+	SSL_load_error_strings();
+	method = TLSv1_2_client_method();
+	ctx = SSL_CTX_new(method);
+	if (ctx == NULL) {
+		// OpenSSL reports failures through its own error queue, not errno.
+		logger_->log_error("Could not create SSL context, error: %s.",
+				ERR_error_string(ERR_get_error(), NULL));
+		error_value = TLS_ERROR_CONTEXT;
+		return error_value;
+	}
+	if (needClientCert) {
+		std::string certificate;
+		std::string privatekey;
+		std::string passphrase;
+		std::string caCertificate;
+
+		if (!(configuration->get(Configure::nifi_security_client_certificate,
+				certificate)
+				&& configuration->get(
+						Configure::nifi_security_client_private_key, privatekey))) {
+			// A missing configuration entry has no system error attached to it.
+			logger_->log_error(
+					"Certificate and Private Key PEM file not configured.");
+			error_value = TLS_ERROR_PEM_MISSING;
+			return error_value;
+		}
+		// load certificates and private key in PEM format
+		if (SSL_CTX_use_certificate_file(ctx, certificate.c_str(),
+				SSL_FILETYPE_PEM) <= 0) {
+			logger_->log_error("Could not create load certificate, error : %s",
+					ERR_error_string(ERR_get_error(), NULL));
+			error_value = TLS_ERROR_CERT_MISSING;
+			return error_value;
+
+		}
+		if (configuration->get(Configure::nifi_security_client_pass_phrase,
+				passphrase)) {
+			// if the private key has passphase
+			SSL_CTX_set_default_passwd_cb(ctx, pemPassWordCb);
+		}
+
+		int retp = SSL_CTX_use_PrivateKey_file(ctx, privatekey.c_str(),
+				SSL_FILETYPE_PEM);
+		if (retp != 1) {
+			logger_->log_error("Could not create load private key,%i on %s error : %s",
+					retp, privatekey.c_str(),
+					ERR_error_string(ERR_get_error(), NULL));
+			error_value = TLS_ERROR_KEY_ERROR;
+			return error_value;
+		}
+		// verify private key
+		if (!SSL_CTX_check_private_key(ctx)) {
+			logger_->log_error(
+					"Private key does not match the public certificate, error : %s",
+					ERR_error_string(ERR_get_error(), NULL));
+			error_value = TLS_ERROR_KEY_ERROR;
+			return error_value;
+		}
+		// load CA certificates
+		if (configuration->get(Configure::nifi_security_client_ca_certificate,
+				caCertificate)) {
+			retp = SSL_CTX_load_verify_locations(ctx, caCertificate.c_str(), 0);
+			if (retp == 0) {
+				logger_->log_error(
+						"Can not load CA certificate, Exiting, error : %s",
+						ERR_error_string(ERR_get_error(), NULL));
+				error_value = TLS_ERROR_CERT_ERROR;
+				return error_value;
+			}
+		}
+
+		logger_->log_info("Load/Verify Client Certificate OK.");
+	}
+	return 0;
+}
+
+/**
+ * Releases the SSL handle if one was created for this socket.
+ */
+TLSSocket::~TLSSocket() {
+	if (ssl == 0) {
+		return;
+	}
+	SSL_free(ssl);
+}
+/**
+ * Constructor that accepts host name, port and listeners. With this
+ * constructor we will be creating a server socket. The SSL handle starts
+ * out unset.
+ * @param hostname our host name
+ * @param port connecting port
+ * @param listeners number of listeners in the queue
+ */
+TLSSocket::TLSSocket(const std::string &hostname, const uint16_t port,
+		const uint16_t listeners)
+		: Socket(hostname, port, listeners),
+		  ssl(0) {
+}
+
+/**
+ * Constructor that accepts host name and port; the base Socket is built
+ * with a listener count of zero and the SSL handle starts out unset.
+ * @param hostname host name handed to the base Socket
+ * @param port port handed to the base Socket
+ */
+TLSSocket::TLSSocket(const std::string &hostname, const uint16_t port)
+		: Socket(hostname, port, 0),
+		  ssl(0) {
+}
+
+/**
+ * Builds a TLSSocket from another one by forwarding it to the base Socket.
+ * The SSL handle is not carried over; the new object starts with it unset.
+ * @param d socket to take the base state from
+ */
+TLSSocket::TLSSocket(const TLSSocket &&d)
+		: Socket(std::move(d)),
+		  ssl(0) {
+}
+
+/**
+ * Initializes the underlying socket and then performs the TLS client
+ * handshake over its descriptor using the context held by TLSContext.
+ *
+ * @return 0 when the handshake succeeds, the TLSContext error code when the
+ * context could not be prepared, or -1 when the socket, the SSL handle or
+ * the handshake fails
+ */
+short TLSSocket::initialize() {
+	TLSContext *context = TLSContext::getInstance();
+	short ret = context->initialize();
+	const short connected = Socket::initialize();
+	if (ret) {
+		// no usable s2s secure config; report the context error as-is
+		return ret;
+	}
+	// A negative result from the base class is treated as a failed connect,
+	// in which case there is no descriptor to run a handshake on.
+	if (connected < 0) {
+		logger_->log_error("SSL socket connect failed to %s %d",
+				requested_hostname_.c_str(), port_);
+		return -1;
+	}
+
+	// we have s2s secure config
+	ssl = SSL_new(context->getContext());
+	if (ssl == NULL) {
+		logger_->log_error("SSL socket connect failed to %s %d",
+				requested_hostname_.c_str(), port_);
+		close(socket_file_descriptor_);
+		return -1;
+	}
+	// SSL_set_fd returns 1 on success; SSL_connect returns 1 on success and
+	// zero or a negative value when the handshake did not complete.
+	if (SSL_set_fd(ssl, socket_file_descriptor_) != 1
+			|| SSL_connect(ssl) <= 0) {
+		logger_->log_error("SSL socket connect failed to %s %d",
+				requested_hostname_.c_str(), port_);
+		SSL_free(ssl);
+		ssl = NULL;
+		close(socket_file_descriptor_);
+		return -1;
+	}
+	logger_->log_info("SSL socket connect success to %s %d",
+			requested_hostname_.c_str(), port_);
+	return 0;
+}
+
+/**
+ * Returns 1 straight away when an SSL handle exists and SSL_pending reports
+ * a non-zero amount; otherwise defers to Socket::select_descriptor.
+ * @param msec value forwarded to Socket::select_descriptor
+ */
+short TLSSocket::select_descriptor(const uint16_t msec) {
+	const bool pendingInSsl = ssl && SSL_pending(ssl);
+	if (!pendingInSsl) {
+		return Socket::select_descriptor(msec);
+	}
+	return 1;
+}
+
+/**
+ * Vector overload; hands the buffer and length to Socket::writeData
+ * unchanged and returns its result.
+ */
+int TLSSocket::writeData(std::vector<uint8_t> &buf, int buflen) {
+	const int written = Socket::writeData(buf, buflen);
+	return written;
+}
+
+/**
+ * Writes size bytes from value through the SSL handle, calling SSL_write
+ * repeatedly until everything has been accepted.
+ *
+ * @param value start of the data to send
+ * @param size number of bytes to send
+ * @return size on success, or a negative value when there is no SSL handle
+ * or SSL_write fails
+ */
+int TLSSocket::writeData(uint8_t *value, int size) {
+	if (IsNullOrEmpty(ssl))
+	  return -1;
+	// for SSL, wait for the TLS IO is completed
+	int bytes = 0;
+	while (bytes < size) {
+		const int sent = SSL_write(ssl, value + bytes, size - bytes);
+		// SSL_write signals failure with zero as well as negative values;
+		// treating zero as progress would keep this loop spinning forever.
+		if (sent <= 0) {
+			logger_->log_error("Site2Site Peer socket %d send failed %s",
+					socket_file_descriptor_, strerror(errno));
+			return sent < 0 ? sent : -1;
+		}
+		bytes += sent;
+	}
+	return size;
+}
+
+/**
+ * Reads buflen bytes of decrypted data into buf, looping over SSL_read
+ * until the requested amount has been collected.
+ *
+ * @param buf destination buffer; advanced internally as data arrives
+ * @param buflen number of bytes still expected
+ * @return the total number of bytes read, or -1 when there is no SSL
+ * handle, select_descriptor does not report a usable descriptor (the socket
+ * descriptor is closed in that case) or SSL_read fails
+ */
+int TLSSocket::readData(uint8_t *buf, int buflen) {
+
+	if (IsNullOrEmpty(ssl))
+	  return -1;
+	int total_read = 0;
+	int status = 0;
+	while (buflen) {
+		short fd = select_descriptor(1000);
+		if (fd <= 0) {
+
+			close(socket_file_descriptor_);
+			return -1;
+		}
+
+		// Retry only while OpenSSL asks for more input to finish the read.
+		int sslStatus;
+		do {
+			status = SSL_read(ssl, buf, buflen);
+			sslStatus = SSL_get_error(ssl, status);
+		} while (status < 0 && sslStatus == SSL_ERROR_WANT_READ);
+
+		// Anything non-positive here is a failure or a closed connection.
+		// Applying it to the cursor below would move the buffer backwards
+		// or leave buflen unchanged, so stop instead.
+		if (status <= 0) {
+			logger_->log_error("SSL socket %d read failed, SSL error %d",
+					socket_file_descriptor_, sslStatus);
+			return -1;
+		}
+
+		buflen -= status;
+		buf += status;
+		total_read += status;
+	}
+
+	return total_read;
+}
+
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/processors/AppendHostInfo.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/AppendHostInfo.cpp b/libminifi/src/processors/AppendHostInfo.cpp
new file mode 100644
index 0000000..24ccc9a
--- /dev/null
+++ b/libminifi/src/processors/AppendHostInfo.cpp
@@ -0,0 +1,124 @@
+/**
+ * @file AppendHostInfo.cpp
+ * AppendHostInfo class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <set>
+#include <sys/time.h>
+#include <string.h>
+#include "processors/AppendHostInfo.h"
+#include "core/ProcessContext.h"
+#include "core/Property.h"
+#include "core/ProcessSession.h"
+
+#include <netdb.h>
+#include <netinet/in.h>
+#include <sys/socket.h>
+#include <sys/ioctl.h>
+#include <net/if.h>
+#include <arpa/inet.h>
+
+#include "../../include/core/FlowFile.h"
+#include "io/ClientSocket.h"
+
+#define __USE_POSIX
+#include <limits.h>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+#ifndef HOST_NAME_MAX
+#define HOST_NAME_MAX 255
+#endif
+
+// Name of this processor.
+const std::string AppendHostInfo::ProcessorName("AppendHostInfo");
+// Property naming the network interface whose IPv4 address is read;
+// the third argument ("eth0") is passed as the property's value.
+core::Property AppendHostInfo::InterfaceName(
+    "Network Interface Name",
+    "Network interface from which to read an IP v4 address", "eth0");
+// Property naming the flow file attribute that receives the hostname.
+core::Property AppendHostInfo::HostAttribute(
+    "Hostname Attribute",
+    "Flowfile attribute to used to record the agent's hostname",
+    "source.hostname");
+// Property naming the flow file attribute that receives the IP address.
+core::Property AppendHostInfo::IPAttribute(
+    "IP Attribute",
+    "Flowfile attribute to used to record the agent's IP address",
+    "source.ipv4");
+// The only relationship onTrigger transfers flow files to.
+core::Relationship AppendHostInfo::Success(
+    "success", "success operational on the flow record");
+
+/**
+ * Registers the properties and the relationship this processor supports.
+ */
+void AppendHostInfo::initialize() {
+  // Supported properties: interface name plus the two target attributes
+  std::set<core::Property> supportedProperties = { InterfaceName,
+      HostAttribute, IPAttribute };
+  setSupportedProperties(supportedProperties);
+
+  // Supported relationships: success only
+  std::set<core::Relationship> supportedRelationships = { Success };
+  setSupportedRelationships(supportedRelationships);
+}
+
+/**
+ * Takes one flow file from the session, records the local hostname and,
+ * when it can be determined, the IPv4 address of the configured network
+ * interface as attributes, then transfers the flow file to Success.
+ *
+ * The IP attribute is only written when the interface exists and its
+ * address could actually be queried.
+ *
+ * @param context supplies the configured property values
+ * @param session supplies the flow file and receives the transfer
+ */
+void AppendHostInfo::onTrigger(
+    core::ProcessContext *context,
+    core::ProcessSession *session) {
+  std::shared_ptr<core::FlowFile> flow =
+      session->get();
+  if (!flow)
+    return;
+
+  //Get Hostname
+
+  std::string hostAttribute = "";
+  context->getProperty(HostAttribute.getName(), hostAttribute);
+  flow->addAttribute(hostAttribute.c_str(),
+                     org::apache::nifi::minifi::io::Socket::getMyHostName());
+
+  //Get IP address for the specified interface
+  std::string iface;
+  context->getProperty(InterfaceName.getName(), iface);
+  //Confirm the specified interface name exists on this device
+  if (if_nametoindex(iface.c_str()) != 0) {
+    int fd = socket(AF_INET, SOCK_DGRAM, 0);
+    if (fd >= 0) {
+      struct ifreq ifr;
+      //Zero the request so the copied name is always NUL-terminated and no
+      //stale stack contents can be read back as an address
+      memset(&ifr, 0, sizeof(ifr));
+      //Type of address to retrieve - IPv4 IP address
+      ifr.ifr_addr.sa_family = AF_INET;
+      //Copy the interface name in the ifreq structure
+      strncpy(ifr.ifr_name, iface.c_str(), IFNAMSIZ - 1);
+      const bool haveAddress = ioctl(fd, SIOCGIFADDR, &ifr) == 0;
+      close(fd);
+
+      //Only record the address when the query succeeded
+      if (haveAddress) {
+        std::string ipAttribute;
+        context->getProperty(IPAttribute.getName(), ipAttribute);
+        flow->addAttribute(
+            ipAttribute.c_str(),
+            inet_ntoa(((struct sockaddr_in *) &ifr.ifr_addr)->sin_addr));
+      }
+    }
+  }
+
+  // Transfer to the relationship
+  session->transfer(flow, Success);
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */