You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2018/09/20 15:15:01 UTC

[6/9] nifi-minifi-cpp git commit: MINIFICPP-595: Provide basic support for Windows. MINIFICPP-32: Add Windows event log reader. MINIFICPP-596: Build core and libminifi artifacts. Must abstract features that are operating-system dependent, such as UUIDs.

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/processors/ListenSyslog.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/ListenSyslog.h b/libminifi/include/processors/ListenSyslog.h
index f9dc678..69223c4 100644
--- a/libminifi/include/processors/ListenSyslog.h
+++ b/libminifi/include/processors/ListenSyslog.h
@@ -21,14 +21,17 @@
 #define __LISTEN_SYSLOG_H__
 
 #include <stdio.h>
-#include <unistd.h>
 #include <sys/types.h>
+#ifndef WIN32
 #include <sys/socket.h>
 #include <netinet/in.h>
 #include <arpa/inet.h>
-#include <errno.h>
 #include <sys/select.h>
 #include <sys/time.h>
+#else
+#include <WinSock2.h>
+#endif
+#include <errno.h>
 #include <sys/types.h>
 #include <chrono>
 #include <thread>
@@ -39,12 +42,15 @@
 #include "core/Resource.h"
 #include "core/logging/LoggerConfiguration.h"
 
+#ifndef WIN32
+
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace processors {
 
+
 // SyslogEvent
 typedef struct {
   char *payload;
@@ -58,7 +64,7 @@ class ListenSyslog : public core::Processor {
   /*!
    * Create a new processor
    */
-  ListenSyslog(std::string name, uuid_t uuid = NULL)
+  ListenSyslog(std::string name,  utils::Identifier uuid = utils::Identifier())
       : Processor(name, uuid),
         logger_(logging::LoggerFactory<ListenSyslog>::getLogger()) {
     _eventQueueByteSize = 0;
@@ -216,3 +222,5 @@ REGISTER_RESOURCE(ListenSyslog);
 } /* namespace org */
 
 #endif
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/processors/LogAttribute.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/LogAttribute.h b/libminifi/include/processors/LogAttribute.h
index b9e333f..412e068 100644
--- a/libminifi/include/processors/LogAttribute.h
+++ b/libminifi/include/processors/LogAttribute.h
@@ -40,7 +40,7 @@ class LogAttribute : public core::Processor {
   /*!
    * Create a new processor
    */
-  LogAttribute(std::string name, uuid_t uuid = NULL)
+  LogAttribute(std::string name,  utils::Identifier uuid = utils::Identifier())
       : Processor(name, uuid),
         logger_(logging::LoggerFactory<LogAttribute>::getLogger()) {
   }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/processors/PutFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/PutFile.h b/libminifi/include/processors/PutFile.h
index ddcdac1..ba410d5 100644
--- a/libminifi/include/processors/PutFile.h
+++ b/libminifi/include/processors/PutFile.h
@@ -46,7 +46,7 @@ class PutFile : public core::Processor {
   /*!
    * Create a new processor
    */
-  PutFile(std::string name, uuid_t uuid = NULL)
+  PutFile(std::string name,  utils::Identifier uuid = utils::Identifier())
       : core::Processor(name, uuid),
         logger_(logging::LoggerFactory<PutFile>::getLogger()) {
   }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/processors/RouteOnAttribute.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/RouteOnAttribute.h b/libminifi/include/processors/RouteOnAttribute.h
index 1c1eb45..0737f8f 100644
--- a/libminifi/include/processors/RouteOnAttribute.h
+++ b/libminifi/include/processors/RouteOnAttribute.h
@@ -36,7 +36,7 @@ namespace processors {
 class RouteOnAttribute : public core::Processor {
  public:
 
-  RouteOnAttribute(std::string name, uuid_t uuid = NULL)
+  RouteOnAttribute(std::string name,  utils::Identifier uuid = utils::Identifier())
       : core::Processor(name, uuid),
         logger_(logging::LoggerFactory<RouteOnAttribute>::getLogger()) {
   }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/processors/TailFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/TailFile.h b/libminifi/include/processors/TailFile.h
index fd59cb5..a98e990 100644
--- a/libminifi/include/processors/TailFile.h
+++ b/libminifi/include/processors/TailFile.h
@@ -40,7 +40,7 @@ class TailFile : public core::Processor {
   /*!
    * Create a new processor
    */
-  explicit TailFile(std::string name, uuid_t uuid = NULL)
+  explicit TailFile(std::string name,  utils::Identifier uuid = utils::Identifier())
       : core::Processor(name, uuid),
         logger_(logging::LoggerFactory<TailFile>::getLogger()) {
     _stateRecovered = false;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/processors/UpdateAttribute.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/UpdateAttribute.h b/libminifi/include/processors/UpdateAttribute.h
index 117c78a..a768a87 100644
--- a/libminifi/include/processors/UpdateAttribute.h
+++ b/libminifi/include/processors/UpdateAttribute.h
@@ -36,7 +36,7 @@ namespace processors {
 class UpdateAttribute : public core::Processor {
  public:
 
-  UpdateAttribute(std::string name, uuid_t uuid = NULL)
+  UpdateAttribute(std::string name,  utils::Identifier uuid = utils::Identifier())
       : core::Processor(name, uuid),
         logger_(logging::LoggerFactory<UpdateAttribute>::getLogger()) {
   }
@@ -64,7 +64,7 @@ class UpdateAttribute : public core::Processor {
 
  private:
   std::shared_ptr<logging::Logger> logger_;
-  std::vector<std::string> attributes_;
+  std::vector<core::Property> attributes_;
 };
 
 REGISTER_RESOURCE(UpdateAttribute);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/provenance/Provenance.h
----------------------------------------------------------------------
diff --git a/libminifi/include/provenance/Provenance.h b/libminifi/include/provenance/Provenance.h
index 72fd379..5ec33fa 100644
--- a/libminifi/include/provenance/Provenance.h
+++ b/libminifi/include/provenance/Provenance.h
@@ -18,7 +18,6 @@
 #ifndef __PROVENANCE_H__
 #define __PROVENANCE_H__
 
-#include <ftw.h>
 #include <uuid/uuid.h>
 #include <atomic>
 #include <cstdint>

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/sitetosite/Peer.h
----------------------------------------------------------------------
diff --git a/libminifi/include/sitetosite/Peer.h b/libminifi/include/sitetosite/Peer.h
index 3775c7c..2c9e2a0 100644
--- a/libminifi/include/sitetosite/Peer.h
+++ b/libminifi/include/sitetosite/Peer.h
@@ -19,9 +19,6 @@
 #define LIBMINIFI_INCLUDE_SITETOSITE_PEER_H_
 
 #include <stdio.h>
-#include <fcntl.h>
-#include <resolv.h>
-#include <netdb.h>
 #include <string>
 #include <errno.h>
 #include <uuid/uuid.h>
@@ -36,6 +33,7 @@
 #include "properties/Configure.h"
 #include "io/ClientSocket.h"
 #include "io/BaseStream.h"
+#include "io/DataStream.h"
 #include "utils/TimeUtil.h"
 #include "utils/HTTPClient.h"
 
@@ -47,11 +45,11 @@ namespace sitetosite {
 
 class Peer {
  public:
-  explicit Peer(uuid_t port_id, const std::string &host, uint16_t port, bool secure = false)
+  explicit Peer(utils::Identifier &port_id, const std::string &host, uint16_t port, bool secure = false)
       : host_(host),
         port_(port),
         secure_(secure) {
-    uuid_copy(port_id_, port_id);
+    port_id_ = port_id;
   }
 
   explicit Peer(const std::string &host, uint16_t port, bool secure = false)
@@ -64,14 +62,14 @@ class Peer {
       : host_(other.host_),
         port_(other.port_),
         secure_(other.secure_) {
-    uuid_copy(port_id_, other.port_id_);
+    port_id_ = other.port_id_;
   }
 
-  explicit Peer(const Peer &&other)
+  explicit Peer(Peer &&other)
       : host_(std::move(other.host_)),
         port_(std::move(other.port_)),
         secure_(std::move(other.secure_)) {
-    uuid_copy(port_id_, other.port_id_);
+    port_id_ = other.port_id_;
   }
 
   uint16_t getPort() const {
@@ -86,8 +84,8 @@ class Peer {
     return secure_;
   }
 
-  void getPortId(uuid_t other) const {
-    uuid_copy(other, port_id_);
+  void getPortId(utils::Identifier &other) const {
+    other = port_id_;
   }
 
  protected:
@@ -95,7 +93,7 @@ class Peer {
 
   uint16_t port_;
 
-  uuid_t port_id_;
+  utils::Identifier port_id_;
 
   // secore comms
 
@@ -149,12 +147,12 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream {
   /*
    * Create a new site2site peer
    */
-  explicit SiteToSitePeer(std::unique_ptr<org::apache::nifi::minifi::io::DataStream> injected_socket, const std::string host, uint16_t port, const std::string &interface)
-      : SiteToSitePeer(host, port, interface) {
+  explicit SiteToSitePeer(std::unique_ptr<org::apache::nifi::minifi::io::DataStream> injected_socket, const std::string host, uint16_t port, const std::string &ifc)
+      : SiteToSitePeer(host, port, ifc) {
     stream_ = std::move(injected_socket);
   }
 
-  explicit SiteToSitePeer(const std::string &host, uint16_t port, const std::string &interface)
+  explicit SiteToSitePeer(const std::string &host, uint16_t port, const std::string &ifc)
       : stream_(nullptr),
         host_(host),
         port_(port),
@@ -164,7 +162,7 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream {
     url_ = "nifi://" + host_ + ":" + std::to_string(port_);
     yield_expiration_ = 0;
     timeout_ = 30000;  // 30 seconds
-    local_network_interface_= std::move(io::NetworkInterface(interface, nullptr));
+    local_network_interface_= std::move(io::NetworkInterface(ifc, nullptr));
   }
 
   explicit SiteToSitePeer(SiteToSitePeer &&ss)
@@ -191,8 +189,8 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream {
     return url_;
   }
   // setInterface
-  void setInterface(std::string &interface) {
-    local_network_interface_ = std::move(io::NetworkInterface(interface,nullptr));
+  void setInterface(std::string &ifc) {
+    local_network_interface_ = std::move(io::NetworkInterface(ifc,nullptr));
   }
   std::string getInterface() {
     return local_network_interface_.getInterface();

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/sitetosite/RawSocketProtocol.h
----------------------------------------------------------------------
diff --git a/libminifi/include/sitetosite/RawSocketProtocol.h b/libminifi/include/sitetosite/RawSocketProtocol.h
index 7a075bf..0c7feb5 100644
--- a/libminifi/include/sitetosite/RawSocketProtocol.h
+++ b/libminifi/include/sitetosite/RawSocketProtocol.h
@@ -21,13 +21,7 @@
 #define __SITE2SITE_CLIENT_PROTOCOL_H__
 
 #include <stdio.h>
-#include <unistd.h>
 #include <sys/types.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <arpa/inet.h>
-#include <fcntl.h>
-#include <netdb.h>
 #include <string>
 #include <errno.h>
 #include <chrono>

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/sitetosite/SiteToSite.h
----------------------------------------------------------------------
diff --git a/libminifi/include/sitetosite/SiteToSite.h b/libminifi/include/sitetosite/SiteToSite.h
index 484a277..2cc0823 100644
--- a/libminifi/include/sitetosite/SiteToSite.h
+++ b/libminifi/include/sitetosite/SiteToSite.h
@@ -245,8 +245,7 @@ class Transaction {
 
     // Generate the global UUID for the transaction
     id_generator_->generate(uuid_);
-    uuid_unparse_lower(uuid_, uuidStr);
-    uuid_str_ = uuidStr;
+    uuid_str_ = uuid_.to_string();
   }
   // Destructor
   virtual ~Transaction() {
@@ -262,7 +261,7 @@ class Transaction {
 
   void setUUIDStr(const std::string &str) {
     uuid_str_ = str;
-    uuid_parse(str.c_str(), uuid_);
+    uuid_ = str;
   }
 
   // getState
@@ -323,7 +322,7 @@ class Transaction {
   TransferDirection _direction;
 
   // A global unique identifier
-  uuid_t uuid_;
+  utils::Identifier uuid_;
   // UUID string
   std::string uuid_str_;
 
@@ -332,10 +331,10 @@ class Transaction {
 
 class SiteToSiteClientConfiguration {
  public:
-  SiteToSiteClientConfiguration(std::shared_ptr<io::StreamFactory> stream_factory, const std::shared_ptr<Peer> &peer, const std::string &interface, CLIENT_TYPE type = RAW)
+  SiteToSiteClientConfiguration(std::shared_ptr<io::StreamFactory> stream_factory, const std::shared_ptr<Peer> &peer, const std::string &ifc, CLIENT_TYPE type = RAW)
       : stream_factory_(stream_factory),
         peer_(peer),
-        local_network_interface_(interface),
+        local_network_interface_(ifc),
         ssl_service_(nullptr) {
     client_type_ = type;
   }
@@ -363,8 +362,8 @@ class SiteToSiteClientConfiguration {
   }
 
   // setInterface
-  void setInterface(std::string &interface) {
-    local_network_interface_ = interface;
+  void setInterface(std::string &ifc) {
+    local_network_interface_ = ifc;
   }
   std::string getInterface() const {
     return local_network_interface_;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/sitetosite/SiteToSiteClient.h
----------------------------------------------------------------------
diff --git a/libminifi/include/sitetosite/SiteToSiteClient.h b/libminifi/include/sitetosite/SiteToSiteClient.h
index 259b95e..6469e82 100644
--- a/libminifi/include/sitetosite/SiteToSiteClient.h
+++ b/libminifi/include/sitetosite/SiteToSiteClient.h
@@ -56,7 +56,7 @@ class SiteToSiteClient : public core::Connectable {
  public:
 
   SiteToSiteClient()
-      : core::Connectable("SitetoSiteClient", 0),
+      : core::Connectable("SitetoSiteClient"),
         peer_state_(IDLE),
         _batchSendNanos(5000000000),
         ssl_context_service_(nullptr),
@@ -96,11 +96,20 @@ class SiteToSiteClient : public core::Connectable {
    * @returns true if the process succeeded, failure OR exception thrown otherwise
    */
   virtual bool transfer(TransferDirection direction, const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
-    if (__builtin_expect(direction == SEND, 1)) {
+#ifndef WIN32
+	  if (__builtin_expect(direction == SEND, 1)) {
       return transferFlowFiles(context, session);
     } else {
       return receiveFlowFiles(context, session);
     }
+#else
+	  if (direction == SEND) {
+		  return transferFlowFiles(context, session);
+	  }
+	  else {
+		  return receiveFlowFiles(context, session);
+	  }
+#endif
   }
 
   /**
@@ -136,11 +145,9 @@ class SiteToSiteClient : public core::Connectable {
   virtual bool transmitPayload(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session, const std::string &payload,
                                std::map<std::string, std::string> attributes) = 0;
 
-  void setPortId(uuid_t id) {
-    uuid_copy(port_id_, id);
-    char idStr[37];
-    uuid_unparse_lower(id, idStr);
-    port_id_str_ = idStr;
+  void setPortId(utils::Identifier &id) {
+    port_id_ = id;
+    port_id_str_ = port_id_.to_string();
   }
 
   /**
@@ -241,7 +248,7 @@ class SiteToSiteClient : public core::Connectable {
   std::string port_id_str_;
 
   // portId
-  uuid_t port_id_;
+  utils::Identifier port_id_;
 
   // Peer Connection
   std::unique_ptr<SiteToSitePeer> peer_;
@@ -285,7 +292,7 @@ class WriteCallback : public OutputStreamCallback {
     int len = _packet->_size;
     int total = 0;
     while (len > 0) {
-      int size = std::min(len, 16384);
+	  int size = len < 16384 ? len : 16384; 
       int ret = _packet->transaction_->getStream().readData(buffer, size);
       if (ret != size) {
         logging::LOG_ERROR(_packet->logger_reference_) << "Site2Site Receive Flow Size " << size << " Failed " << ret << ", should have received " << len;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/sitetosite/SiteToSiteFactory.h
----------------------------------------------------------------------
diff --git a/libminifi/include/sitetosite/SiteToSiteFactory.h b/libminifi/include/sitetosite/SiteToSiteFactory.h
index 4f0f36c..2d1dd74 100644
--- a/libminifi/include/sitetosite/SiteToSiteFactory.h
+++ b/libminifi/include/sitetosite/SiteToSiteFactory.h
@@ -58,7 +58,7 @@ static std::unique_ptr<SiteToSitePeer> createStreamingPeer(const SiteToSiteClien
  * RawSiteToSiteClient will be instantiated and returned through a unique ptr.
  */
 static std::unique_ptr<SiteToSiteClient> createRawSocket(const SiteToSiteClientConfiguration &client_configuration) {
-  uuid_t uuid;
+  utils::Identifier uuid;
   client_configuration.getPeer()->getPortId(uuid);
   auto rsptr = createStreamingPeer(client_configuration);
   if (nullptr == rsptr){
@@ -77,7 +77,7 @@ static std::unique_ptr<SiteToSiteClient> createRawSocket(const SiteToSiteClientC
  * @returns site to site client or nullptr.
  */
 static std::unique_ptr<SiteToSiteClient> createClient(const SiteToSiteClientConfiguration &client_configuration) {
-  uuid_t uuid;
+  utils::Identifier uuid;
   client_configuration.getPeer()->getPortId(uuid);
   switch (client_configuration.getClientType()) {
     case RAW:
@@ -90,8 +90,7 @@ static std::unique_ptr<SiteToSiteClient> createClient(const SiteToSiteClientConf
         auto peer = std::unique_ptr<SiteToSitePeer>(new SiteToSitePeer(client_configuration.getPeer()->getHost(), client_configuration.getPeer()->getPort(),
             client_configuration.getInterface()));
         peer->setHTTPProxy(client_configuration.getHTTPProxy());
-        char idStr[37];
-        uuid_unparse_lower(uuid, idStr);
+
         ptr->setPortId(uuid);
         ptr->setPeer(std::move(peer));
         return ptr;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/utils/HTTPClient.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/HTTPClient.h b/libminifi/include/utils/HTTPClient.h
index 553dcea..87bf659 100644
--- a/libminifi/include/utils/HTTPClient.h
+++ b/libminifi/include/utils/HTTPClient.h
@@ -198,7 +198,7 @@ class HTTPRequestResponse {
 };
 
 class BaseHTTPClient {
- public:
+public:
   explicit BaseHTTPClient(const std::string &url, const std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service = nullptr) {
     response_code = -1;
   }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/utils/Id.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/Id.h b/libminifi/include/utils/Id.h
index d9f0811..46537be 100644
--- a/libminifi/include/utils/Id.h
+++ b/libminifi/include/utils/Id.h
@@ -17,10 +17,11 @@
 #ifndef LIBMINIFI_INCLUDE_UTILS_ID_H_
 #define LIBMINIFI_INCLUDE_UTILS_ID_H_
 
+#include <cstddef>
 #include <atomic>
 #include <memory>
 #include <string>
-#include <uuid/uuid.h>
+#include "uuid/uuid.h"
 
 #include "core/logging/Logger.h"
 #include "properties/Properties.h"
@@ -37,9 +38,91 @@ namespace nifi {
 namespace minifi {
 namespace utils {
 
+template<typename T, typename C>
+class IdentifierBase {
+ public:
+
+  IdentifierBase(T myid) {
+    copyInto(myid);
+  }
+
+  IdentifierBase(const IdentifierBase &other) {
+    copyInto(other.id_);
+  }
+
+  IdentifierBase() {
+  }
+
+  IdentifierBase &operator=(const IdentifierBase &other) {
+    copyInto(other.id_);
+    return *this;
+  }
+
+  IdentifierBase &operator=(T o) {
+    copyInto(o);
+    return *this;
+  }
+
+  void getIdentifier(T other) const {
+    copyOutOf(other);
+  }
+
+  C convert() const {
+    return converted_;
+  }
+
+ protected:
+
+  void copyInto(const IdentifierBase &other){
+    memcpy(id_, other.id_, sizeof(T));
+  }
+
+  void copyInto(const void *other) {
+    memcpy(id_, other, sizeof(T));
+  }
+
+  void copyOutOf(void *other) {
+    memcpy(other, id_, sizeof(T));
+  }
+
+  C converted_;
+
+  T id_;
+};
+
+class Identifier : public IdentifierBase<UUID_FIELD, std::string> {
+ public:
+  Identifier(UUID_FIELD u);
+  Identifier();
+  Identifier(const Identifier &other);
+  Identifier(const IdentifierBase &other);
+
+  Identifier &operator=(const IdentifierBase &other);
+  Identifier &operator=(const Identifier &other);
+  Identifier &operator=(UUID_FIELD o);
+
+  Identifier &operator=(std::string id);
+  bool operator==(std::nullptr_t nullp);
+
+  bool operator!=(std::nullptr_t nullp);
+
+  bool operator!=(const Identifier &other);
+  bool operator==(const Identifier &other);
+
+  std::string to_string();
+
+  unsigned char *toArray();
+
+ protected:
+
+  void build_string();
+
+};
+
 class IdGenerator {
  public:
-  void generate(uuid_t output);
+  void generate(Identifier &output);
+  Identifier generate();
   void initialize(const std::shared_ptr<Properties> & properties);
 
   static std::shared_ptr<IdGenerator> getIdGenerator() {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/utils/StringUtils.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/StringUtils.h b/libminifi/include/utils/StringUtils.h
index 7f33260..24adae1 100644
--- a/libminifi/include/utils/StringUtils.h
+++ b/libminifi/include/utils/StringUtils.h
@@ -17,6 +17,11 @@
 #ifndef LIBMINIFI_INCLUDE_IO_STRINGUTILS_H_
 #define LIBMINIFI_INCLUDE_IO_STRINGUTILS_H_
 #include <iostream>
+#include <functional>
+#ifdef WIN32
+	#include <cwctype>
+	#include <cctype>
+#endif
 #include <algorithm>
 #include <sstream>
 #include <vector>
@@ -68,7 +73,7 @@ class StringUtils {
    * @returns modified string
    */
   static inline std::string trimLeft(std::string s) {
-    s.erase(s.begin(), std::find_if(s.begin(), s.end(), std::not1(std::pointer_to_unary_function<int, int>(std::isspace))));
+    s.erase(s.begin(), std::find_if(s.begin(), s.end(), std::not1(std::pointer_to_unary_function<int, int>(isspace))));
     return s;
   }
 
@@ -79,7 +84,7 @@ class StringUtils {
    */
 
   static inline std::string trimRight(std::string s) {
-    s.erase(std::find_if(s.rbegin(), s.rend(), std::not1(std::pointer_to_unary_function<int, int>(std::isspace))).base(), s.end());
+    s.erase(std::find_if(s.rbegin(), s.rend(), std::not1(std::pointer_to_unary_function<int, int>(isspace))).base(), s.end());
     return s;
   }
 
@@ -88,7 +93,7 @@ class StringUtils {
    */
   static inline bool equalsIgnoreCase(const std::string &left, const std::string right) {
     if (left.length() == right.length()) {
-      return std::equal(right.begin(), right.end(), left.begin(), [](unsigned char lc, unsigned char rc) {return std::tolower(lc) == std::tolower(rc);});
+      return std::equal(right.begin(), right.end(), left.begin(), [](unsigned char lc, unsigned char rc) {return tolower(lc) == tolower(rc);});
     } else {
       return false;
     }
@@ -200,7 +205,7 @@ class StringUtils {
   inline static bool endsWithIgnoreCase(const std::string &value, const std::string & endString) {
     if (endString.size() > value.size())
       return false;
-    return std::equal(endString.rbegin(), endString.rend(), value.rbegin(), [](unsigned char lc, unsigned char rc) {return std::tolower(lc) == std::tolower(rc);});
+    return std::equal(endString.rbegin(), endString.rend(), value.rbegin(), [](unsigned char lc, unsigned char rc) {return tolower(lc) == tolower(rc);});
   }
 
   inline static bool endsWith(const std::string &value, const std::string & endString) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/utils/TimeUtil.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/TimeUtil.h b/libminifi/include/utils/TimeUtil.h
index 19c2566..0dc3b5c 100644
--- a/libminifi/include/utils/TimeUtil.h
+++ b/libminifi/include/utils/TimeUtil.h
@@ -18,9 +18,6 @@
 #define __TIME_UTIL_H__
 
 #include <time.h>
-#include <sys/time.h>
-#include <string.h>
-#include <unistd.h>
 #include <string.h>
 #include <iomanip>
 #include <sstream>

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/utils/file/DiffUtils.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/file/DiffUtils.h b/libminifi/include/utils/file/DiffUtils.h
index d44360d..313e0f6 100644
--- a/libminifi/include/utils/file/DiffUtils.h
+++ b/libminifi/include/utils/file/DiffUtils.h
@@ -24,10 +24,8 @@
 #else
 #include <cstdlib>
 #include <sys/stat.h>
-#include <dirent.h>
 #endif
 #include <cstdio>
-#include <unistd.h>
 #include <fcntl.h>
 #ifdef WIN32
 #define stat _stat

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/utils/file/FileManager.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/file/FileManager.h b/libminifi/include/utils/file/FileManager.h
index f6a4312..247f249 100644
--- a/libminifi/include/utils/file/FileManager.h
+++ b/libminifi/include/utils/file/FileManager.h
@@ -23,7 +23,6 @@
 #include <cstdlib>
 #endif
 #include <cstdio>
-#include <unistd.h>
 #include <fcntl.h>
 #include "io/validation.h"
 #include "utils/Id.h"

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/utils/file/FileUtils.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/file/FileUtils.h b/libminifi/include/utils/file/FileUtils.h
index 33ca437..dd307ee 100644
--- a/libminifi/include/utils/file/FileUtils.h
+++ b/libminifi/include/utils/file/FileUtils.h
@@ -17,6 +17,9 @@
 #ifndef LIBMINIFI_INCLUDE_UTILS_FILEUTILS_H_
 #define LIBMINIFI_INCLUDE_UTILS_FILEUTILS_H_
 
+
+
+
 #include <sstream>
 #include <fstream>
 #ifdef BOOST_VERSION
@@ -24,14 +27,30 @@
 #else
 #include <cstring>
 #include <cstdlib>
+#ifdef WIN32
+#define WIN32_LEAN_AND_MEAN
+#include <WinSock2.h>
+#include <WS2tcpip.h>
+#include <Windows.h>
+#pragma comment(lib, "Ws2_32.lib")
+#else
 #include <sys/stat.h>
 #include <dirent.h>
 #endif
+#endif
 #include <cstdio>
+#ifndef WIN32
 #include <unistd.h>
+#endif
 #include <fcntl.h>
 #ifdef WIN32
 #define stat _stat
+#include <direct.h>
+#include <windows.h> // winapi
+#include <sys/stat.h> // stat
+#include <tchar.h> // _tcscpy,_tcscat,_tcscmp
+#include <string> // string
+#include <algorithm> // replace
 #endif
 
 namespace org {
@@ -72,6 +91,50 @@ class FileUtils {
       //display error message
     }
     return 0;
+#elif defined(WIN32)
+	  WIN32_FIND_DATA FindFileData;
+	  HANDLE hFind;
+	  DWORD Attributes;
+	  std::string str;
+
+	  
+		std::stringstream pathstr;
+		pathstr << path << "\\.*";
+		str = pathstr.str();
+
+	  
+	  //List files
+	  hFind = FindFirstFile(str.c_str(), &FindFileData);
+	  if (hFind != INVALID_HANDLE_VALUE)
+	  {
+		  do {
+			  if (strcmp(FindFileData.cFileName, ".") != 0 && strcmp(FindFileData.cFileName, "..") != 0)
+			  {
+				  //Str append Example
+
+				  std::stringstream strs;
+				  strs << path << "\\" << FindFileData.cFileName;
+				  str = strs.str();
+
+				  //_tprintf (TEXT("File Found: %s\n"),str);
+				  Attributes = GetFileAttributes(str.c_str());
+				  if (Attributes & FILE_ATTRIBUTE_DIRECTORY)
+				  {
+					  //is directory
+					  delete_dir(str, delete_files_recursively);
+				  }
+				  else
+				  {
+					  remove(str.c_str());
+					  //not directory
+				  }
+			  }
+		  } while (FindNextFile(hFind, &FindFileData));
+		  FindClose(hFind);
+
+		  RemoveDirectory(path.c_str());
+	  }
+	  return 0;
 #else
     DIR *current_directory = opendir(path.c_str());
     int r = -1;
@@ -136,11 +199,15 @@ class FileUtils {
 #else
     struct stat dir_stat;
     if (stat(path.c_str(), &dir_stat)) {
-      mkdir(path.c_str(), 0700);
+#ifdef WIN32
+      _mkdir(path.c_str());
+#else
+	mkdir(path.c_str(), 0700);
+#endif
+	return 0;
     }
-    return 0;
+	return -1;
 #endif
-    return -1;
   }
 
   static int copy_file(const std::string &path_from, const std::string dest_path) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/opsys/posix/io/ClientSocket.h
----------------------------------------------------------------------
diff --git a/libminifi/opsys/posix/io/ClientSocket.h b/libminifi/opsys/posix/io/ClientSocket.h
new file mode 100644
index 0000000..4ed8b99
--- /dev/null
+++ b/libminifi/opsys/posix/io/ClientSocket.h
@@ -0,0 +1,296 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_IO_POSIX_CLIENTSOCKET_H_
+#define LIBMINIFI_INCLUDE_IO_POSIX_CLIENTSOCKET_H_
+
+
+#include <cstdint>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <unistd.h>
+#include <mutex>
+#include <atomic>
+#include "io/BaseStream.h"
+#include "core/Core.h"
+#include "core/logging/Logger.h"
+#include "io/validation.h"
+#include "properties/Configure.h"
+#include "io/NetworkPrioritizer.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+/**
+ * Context class for socket. This is currently only used as a parent class for TLSContext.  It is necessary so the Socket and TLSSocket constructors
+ * can be the same.  It also gives us a common place to set timeouts, etc from the Configure object in the future.
+ */
+class SocketContext {
+ public:
+  SocketContext(const std::shared_ptr<Configure> &configure) {  // configure is accepted but not used by this constructor
+  }
+};
+/**
+ * Socket class.
+ * Purpose: Provides a general purpose socket interface that abstracts
+ * connecting information from users
+ * Design: Extends DataStream and allows us to perform most streaming
+ * operations against a BSD socket
+ *
+ *
+ */
+class Socket : public BaseStream {
+ public:
+  /**
+   * Constructor that creates a client socket.
+   * @param context the SocketContext
+   * @param hostname hostname we are connecting to.
+   * @param port port we are connecting to.
+   */
+  explicit Socket(const std::shared_ptr<SocketContext> &context, const std::string &hostname, const uint16_t port);
+
+  /**
+   * Move constructor. NOTE(review): it takes a const rvalue reference, so it cannot modify its source.
+   */
+  explicit Socket(const Socket &&);
+
+  /**
+   * Static function to return the current machine's host name (resolved once, on first use)
+   */
+  static std::string getMyHostName() {
+    static std::string HOSTNAME = init_hostname();
+    return HOSTNAME;
+  }
+
+  /**
+   * Destructor
+   */
+
+  virtual ~Socket();
+
+  virtual void closeStream();
+  /**
+   * Initializes the socket
+   * @return result of the creation operation.
+   */
+  virtual int16_t initialize();
+
+  virtual void setInterface(io::NetworkInterface &&interface) {
+    local_network_interface_ = std::move(interface);
+  }
+
+  /**
+   * Sets the non blocking flag on the file descriptor.
+   */
+  void setNonBlocking();
+
+  std::string getHostname() const;
+
+  /**
+   * Return the port for this socket
+   * @returns port
+   */
+  uint16_t getPort();
+
+  // data stream extensions
+  /**
+   * Reads data and places it into buf
+   * @param buf buffer in which we extract data
+   * @param buflen
+   * Forwards to the three-argument overload with retrieve_all_bytes set to true.
+   */
+  virtual int readData(std::vector<uint8_t> &buf, int buflen) {
+    return readData(buf, buflen, true);
+  }
+  /**
+   * Reads data and places it into buf
+   * @param buf buffer in which we extract data
+   * @param buflen
+   * Forwards to the three-argument overload with retrieve_all_bytes set to true.
+   */
+  virtual int readData(uint8_t *buf, int buflen) {
+    return readData(buf, buflen, true);
+  }
+
+  /**
+   * Reads data and places it into buf
+   * @param buf buffer in which we extract data
+   * @param buflen
+   * @param retrieve_all_bytes determines if we should read all bytes before returning
+   */
+  virtual int readData(std::vector<uint8_t> &buf, int buflen, bool retrieve_all_bytes);
+  /**
+   * Reads data and places it into buf
+   * @param buf buffer in which we extract data
+   * @param buflen
+   * @param retrieve_all_bytes determines if we should read all bytes before returning
+   */
+  virtual int readData(uint8_t *buf, int buflen, bool retrieve_all_bytes);
+
+  /**
+   * Write value to the stream using std::vector
+   * @param buf incoming buffer
+   * @param buflen buffer to write
+   *
+   */
+  virtual int writeData(std::vector<uint8_t> &buf, int buflen);
+
+  /**
+   * writes value to stream
+   * @param value value to write
+   * @param size size of value
+   */
+  virtual int writeData(uint8_t *value, int size);
+
+  /**
+   * Writes a system word
+   * @param value value to write
+   */
+  virtual int write(uint64_t value, bool is_little_endian = EndiannessCheck::IS_LITTLE);
+
+  /**
+   * Writes a uint32_t
+   * @param value value to write
+   */
+  virtual int write(uint32_t value, bool is_little_endian = EndiannessCheck::IS_LITTLE);
+
+  /**
+   * Writes a system short
+   * @param value value to write
+   */
+  virtual int write(uint16_t value, bool is_little_endian = EndiannessCheck::IS_LITTLE);
+
+  /**
+   * Reads a system word
+   * @param value receives the value that was read
+   */
+  virtual int read(uint64_t &value, bool is_little_endian = EndiannessCheck::IS_LITTLE);
+
+  /**
+   * Reads a uint32_t
+   * @param value receives the value that was read
+   */
+  virtual int read(uint32_t &value, bool is_little_endian = EndiannessCheck::IS_LITTLE);
+
+  /**
+   * Reads a system short
+   * @param value receives the value that was read
+   */
+  virtual int read(uint16_t &value, bool is_little_endian = EndiannessCheck::IS_LITTLE);
+
+  /**
+   * Returns the underlying buffer
+   * @return vector's array
+   **/
+  const uint8_t *getBuffer() const {
+    return DataStream::getBuffer();
+  }
+
+  /**
+   * Retrieve size of data stream
+   * @return size of data stream
+   **/
+  const uint64_t getSize() const {
+    return DataStream::getSize();
+  }
+
+ protected:
+
+  /**
+   * Constructor that accepts host name, port and listeners. With this
+   * constructor we will be creating a server socket
+   * @param context the SocketContext
+   * @param hostname our host name
+   * @param port connecting port
+   * @param listeners number of listeners in the queue
+   */
+  explicit Socket(const std::shared_ptr<SocketContext> &context, const std::string &hostname, const uint16_t port, const uint16_t listeners);
+
+  /**
+   * Creates a vector and returns the vector using the provided
+   * type name.
+   * @param t incoming object
+   * @returns vector.
+   */
+  template<typename T>
+  std::vector<uint8_t> readBuffer(const T&);
+
+  /**
+   * Creates a connection using the address info object.
+   * @param p addrinfo structure.
+   * @returns fd.
+   */
+  virtual int8_t createConnection(const addrinfo *p, in_addr_t &addr);
+
+  /**
+   * Sets socket options depending on the instance.
+   * @param sock socket file descriptor.
+   */
+  virtual int16_t setSocketOptions(const int sock);
+
+  /**
+   * Attempt to select the socket file descriptor
+   * @param msec timeout interval to wait
+   * @returns file descriptor
+   */
+  virtual int16_t select_descriptor(const uint16_t msec);
+
+  addrinfo *addr_info_;
+
+  std::recursive_mutex selection_mutex_;
+
+  std::string requested_hostname_;
+  std::string canonical_hostname_;
+  uint16_t port_;
+
+  bool is_loopback_only_;
+  io::NetworkInterface local_network_interface_;
+
+  // connection information
+  int32_t socket_file_descriptor_;
+
+  fd_set total_list_;
+  fd_set read_fds_;
+  std::atomic<uint16_t> socket_max_;
+  std::atomic<uint64_t> total_written_;
+  std::atomic<uint64_t> total_read_;
+  uint16_t listeners_;
+
+  bool nonBlocking_;
+ private:
+  std::shared_ptr<logging::Logger> logger_;
+  // Looks up the local host name; returns the resolved name from a temporary Socket, or the raw gethostname() value if that is empty.
+  static std::string init_hostname() {
+    char hostname[1024] = {0};  // zero-filled so the buffer is always a valid C string
+    if (gethostname(hostname, sizeof(hostname) - 1) != 0) hostname[0] = '\0';  // last byte stays 0 on truncation; contents are discarded on failure
+    Socket mySock(nullptr, hostname, 0);
+    mySock.initialize();
+    auto resolved_hostname = mySock.getHostname();
+    return !IsNullOrEmpty(resolved_hostname) ? resolved_hostname : hostname;
+  }
+};
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif /* LIBMINIFI_INCLUDE_IO_POSIX_CLIENTSOCKET_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/opsys/win/io/ClientSocket.h
----------------------------------------------------------------------
diff --git a/libminifi/opsys/win/io/ClientSocket.h b/libminifi/opsys/win/io/ClientSocket.h
new file mode 100644
index 0000000..3621215
--- /dev/null
+++ b/libminifi/opsys/win/io/ClientSocket.h
@@ -0,0 +1,343 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_IO_WIN__CLIENTSOCKET_H_
+#define LIBMINIFI_INCLUDE_IO_WIN__CLIENTSOCKET_H_
+
+#include <cstdint>
+#include <sys/types.h>
+#ifdef WIN32
+#ifndef WIN32_LEAN_AND_MEAN
+#define WIN32_LEAN_AND_MEAN
+#endif
+//#include <WinSock.h>
+//#include <WS2tcpip.h>
+#include <Windows.h>
+#include <WS2tcpip.h>
+#pragma comment(lib, "Ws2_32.lib")
+#else
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <unistd.h>
+#endif
+#include <mutex>
+#include <atomic>
+#include "io/BaseStream.h"
+#include "core/Core.h"
+#include "core/logging/Logger.h"
+#include "io/validation.h"
+#include "properties/Configure.h"
+#include "io/NetworkPrioritizer.h"
+
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+#ifdef WIN32
+	typedef SOCKET SocketDescriptor;
+#else
+	typedef int SocketDescriptor;
+	#define INVALID_SOCKET -1
+#endif
+
+
+/**
+ * Context class for socket. This is currently only used as a parent class for TLSContext.  It is necessary so the Socket and TLSSocket constructors
+ * can be the same.  It also gives us a common place to set timeouts, etc from the Configure object in the future.
+ */
+class SocketContext {
+ public:
+  SocketContext(const std::shared_ptr<Configure> &configure) {  // configure is accepted but not used by this constructor
+  }
+};
+/**
+ * Socket class.
+ * Purpose: Provides a general purpose socket interface that abstracts
+ * connecting information from users
+ * Design: Extends DataStream and allows us to perform most streaming
+ * operations against a BSD socket
+ *
+ *
+ */
+class Socket : public BaseStream {
+public:
+	/**
+	 * Constructor that creates a client socket.
+	 * @param context the SocketContext
+	 * @param hostname hostname we are connecting to.
+	 * @param port port we are connecting to.
+	 */
+	explicit Socket(const std::shared_ptr<SocketContext> &context, const std::string &hostname, const uint16_t port);
+
+	/**
+	 * Move constructor. NOTE(review): it takes a const rvalue reference, so it cannot modify its source.
+	 */
+	explicit Socket(const Socket &&);
+
+	/**
+	 * Static function to return the current machine's host name (resolved once, on first use)
+	 */
+	static std::string getMyHostName() {
+		static std::string HOSTNAME = init_hostname();
+		return HOSTNAME;
+	}
+
+	/**
+	 * Destructor
+	 */
+
+	virtual ~Socket();
+
+	virtual void closeStream();
+	/**
+	 * Initializes the socket
+	 * @return result of the creation operation.
+	 */
+	virtual int16_t initialize();
+
+	virtual void setInterface(io::NetworkInterface &&ifc) {
+		local_network_interface_ = std::move(ifc);
+	}
+
+	/**
+	 * Sets the non blocking flag on the file descriptor.
+	 */
+	void setNonBlocking();
+
+	std::string getHostname() const;
+
+	/**
+	 * Return the port for this socket
+	 * @returns port
+	 */
+	uint16_t getPort();
+
+	// data stream extensions
+	/**
+	 * Reads data and places it into buf
+	 * @param buf buffer in which we extract data
+	 * @param buflen
+	 * Forwards to the three-argument overload with retrieve_all_bytes set to true.
+	 */
+	virtual int readData(std::vector<uint8_t> &buf, int buflen) {
+		return readData(buf, buflen, true);
+	}
+	/**
+	 * Reads data and places it into buf
+	 * @param buf buffer in which we extract data
+	 * @param buflen
+	 * Forwards to the three-argument overload with retrieve_all_bytes set to true.
+	 */
+	virtual int readData(uint8_t *buf, int buflen) {
+		return readData(buf, buflen, true);
+	}
+
+	/**
+	 * Reads data and places it into buf
+	 * @param buf buffer in which we extract data
+	 * @param buflen
+	 * @param retrieve_all_bytes determines if we should read all bytes before returning
+	 */
+	virtual int readData(std::vector<uint8_t> &buf, int buflen, bool retrieve_all_bytes);
+	/**
+	 * Reads data and places it into buf
+	 * @param buf buffer in which we extract data
+	 * @param buflen
+	 * @param retrieve_all_bytes determines if we should read all bytes before returning
+	 */
+	virtual int readData(uint8_t *buf, int buflen, bool retrieve_all_bytes);
+
+	/**
+	 * Write value to the stream using std::vector
+	 * @param buf incoming buffer
+	 * @param buflen buffer to write
+	 *
+	 */
+	virtual int writeData(std::vector<uint8_t> &buf, int buflen);
+
+	/**
+	 * writes value to stream
+	 * @param value value to write
+	 * @param size size of value
+	 */
+	virtual int writeData(uint8_t *value, int size);
+
+	/**
+	 * Writes a system word
+	 * @param value value to write
+	 */
+	virtual int write(uint64_t value, bool is_little_endian = EndiannessCheck::IS_LITTLE);
+
+	/**
+	 * Writes a uint32_t
+	 * @param value value to write
+	 */
+	virtual int write(uint32_t value, bool is_little_endian = EndiannessCheck::IS_LITTLE);
+
+	/**
+	 * Writes a system short
+	 * @param value value to write
+	 */
+	virtual int write(uint16_t value, bool is_little_endian = EndiannessCheck::IS_LITTLE);
+
+	/**
+	 * Reads a system word
+	 * @param value receives the value that was read
+	 */
+	virtual int read(uint64_t &value, bool is_little_endian = EndiannessCheck::IS_LITTLE);
+
+	/**
+	 * Reads a uint32_t
+	 * @param value receives the value that was read
+	 */
+	virtual int read(uint32_t &value, bool is_little_endian = EndiannessCheck::IS_LITTLE);
+
+	/**
+	 * Reads a system short
+	 * @param value receives the value that was read
+	 */
+	virtual int read(uint16_t &value, bool is_little_endian = EndiannessCheck::IS_LITTLE);
+
+	/**
+	 * Returns the underlying buffer
+	 * @return vector's array
+	 **/
+	const uint8_t *getBuffer() const {
+		return DataStream::getBuffer();
+	}
+
+	/**
+	 * Retrieve size of data stream
+	 * @return size of data stream
+	 **/
+	const uint64_t getSize() const {
+		return DataStream::getSize();
+	}
+
+protected:
+
+	/**
+	 * Constructor that accepts host name, port and listeners. With this
+	 * constructor we will be creating a server socket
+	 * @param context the SocketContext
+	 * @param hostname our host name
+	 * @param port connecting port
+	 * @param listeners number of listeners in the queue
+	 */
+	explicit Socket(const std::shared_ptr<SocketContext> &context, const std::string &hostname, const uint16_t port, const uint16_t listeners);
+
+	/**
+	 * Creates a vector and returns the vector using the provided
+	 * type name.
+	 * @param t incoming object
+	 * @returns vector.
+	 */
+	template<typename T>
+	std::vector<uint8_t> readBuffer(const T&);
+
+	/**
+	 * Creates a connection using the address info object.
+	 * @param p addrinfo structure.
+	 * @returns fd.
+	 */
+#ifdef WIN32
+	virtual int8_t createConnection(const addrinfo *p, struct in_addr &addr);
+#else
+	virtual int8_t createConnection(const addrinfo *p, in_addr_t &addr);
+#endif
+
+	/**
+	 * Sets socket options depending on the instance.
+	 * @param sock socket file descriptor.
+	 */
+	virtual int16_t setSocketOptions(const SocketDescriptor sock);
+
+	/**
+	 * Attempt to select the socket file descriptor
+	 * @param msec timeout interval to wait
+	 * @returns file descriptor
+	 */
+	virtual int16_t select_descriptor(const uint16_t msec);
+
+
+	addrinfo *addr_info_;
+
+	std::recursive_mutex selection_mutex_;
+
+	std::string requested_hostname_;
+	std::string canonical_hostname_;
+	uint16_t port_;
+
+	bool is_loopback_only_;
+	io::NetworkInterface local_network_interface_;
+
+	// connection information
+	SocketDescriptor socket_file_descriptor_;
+
+	fd_set total_list_;
+	fd_set read_fds_;
+	std::atomic<uint16_t> socket_max_;
+	std::atomic<uint64_t> total_written_;
+	std::atomic<uint64_t> total_read_;
+	uint16_t listeners_;
+
+	bool nonBlocking_;
+private:
+
+	class SocketInitializer
+	{
+	public:
+		SocketInitializer() {
+			#ifdef WIN32
+			static WSADATA s_wsaData;
+			int iWinSockInitResult = WSAStartup(MAKEWORD(2, 2), &s_wsaData);  // result is currently ignored
+			#endif
+		}
+		~SocketInitializer() {
+		#ifdef WIN32
+			WSACleanup();
+		#endif
+		}
+	};
+	 static void initialize_socket() {
+		 static SocketInitializer initialized;  // constructed once; runs WSAStartup on WIN32 builds
+		 }
+
+  std::shared_ptr<logging::Logger> logger_;
+
+  // Looks up the local host name; returns the resolved name from a temporary Socket, or the raw gethostname() value if that is empty.
+  static std::string init_hostname() {
+    initialize_socket();  // Winsock has to be started before gethostname() can succeed on WIN32
+    char hostname[1024] = {0};  // zero-filled so the buffer is always a valid C string
+    if (gethostname(hostname, static_cast<int>(sizeof(hostname) - 1)) != 0) hostname[0] = '\0';  // discard contents on failure
+    Socket mySock(nullptr, hostname, 0);
+    mySock.initialize();
+    auto resolved_hostname = mySock.getHostname();
+    return !IsNullOrEmpty(resolved_hostname) ? resolved_hostname : hostname;
+  }
+};
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif /* LIBMINIFI_INCLUDE_IO_WIN__CLIENTSOCKET_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/Connection.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Connection.cpp b/libminifi/src/Connection.cpp
index 9513f68..f9a8fcb 100644
--- a/libminifi/src/Connection.cpp
+++ b/libminifi/src/Connection.cpp
@@ -18,7 +18,6 @@
  * limitations under the License.
  */
 #include "Connection.h"
-#include <sys/time.h>
 #include <time.h>
 #include <vector>
 #include <queue>
@@ -39,17 +38,64 @@ namespace apache {
 namespace nifi {
 namespace minifi {
 
-Connection::Connection(const std::shared_ptr<core::Repository> &flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, std::string name, uuid_t uuid, uuid_t srcUUID,
-                       uuid_t destUUID)
+Connection::Connection(const std::shared_ptr<core::Repository> &flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, std::string name)
+    : core::Connectable(name),  // name-only overload: no identifier is handed to the base class
+      flow_repository_(flow_repository),
+      content_repo_(content_repo),
+      logger_(logging::LoggerFactory<Connection>::getLogger()) {
+  source_connectable_ = nullptr;  // neither endpoint is attached at construction
+  dest_connectable_ = nullptr;
+  max_queue_size_ = 0;  // the size/duration settings and the queued-size counter all start at zero
+  max_data_queue_size_ = 0;
+  expired_duration_ = 0;
+  queued_data_size_ = 0;
+
+  logger_->log_debug("Connection %s created", name_);
+}
+
+Connection::Connection(const std::shared_ptr<core::Repository> &flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, std::string name, utils::Identifier & uuid)
+    : core::Connectable(name, uuid),  // name + identifier overload: uuid is forwarded to the base class
+      flow_repository_(flow_repository),
+      content_repo_(content_repo),
+      logger_(logging::LoggerFactory<Connection>::getLogger()) {
+  source_connectable_ = nullptr;  // neither endpoint is attached at construction
+  dest_connectable_ = nullptr;
+  max_queue_size_ = 0;  // the size/duration settings and the queued-size counter all start at zero
+  max_data_queue_size_ = 0;
+  expired_duration_ = 0;
+  queued_data_size_ = 0;
+
+  logger_->log_debug("Connection %s created", name_);
+}
+
+Connection::Connection(const std::shared_ptr<core::Repository> &flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, std::string name, utils::Identifier & uuid,
+                       utils::Identifier & srcUUID)
+    : core::Connectable(name, uuid),  // uuid is forwarded to the base class
+      flow_repository_(flow_repository),
+      content_repo_(content_repo),
+      logger_(logging::LoggerFactory<Connection>::getLogger()) {
+
+  src_uuid_ = srcUUID;  // only the source identifier is recorded by this overload; dest_uuid_ is not assigned here
+
+  source_connectable_ = nullptr;  // neither endpoint is attached at construction
+  dest_connectable_ = nullptr;
+  max_queue_size_ = 0;  // the size/duration settings and the queued-size counter all start at zero
+  max_data_queue_size_ = 0;
+  expired_duration_ = 0;
+  queued_data_size_ = 0;
+
+  logger_->log_debug("Connection %s created", name_);
+}
+
+Connection::Connection(const std::shared_ptr<core::Repository> &flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, std::string name, utils::Identifier & uuid,
+                       utils::Identifier & srcUUID, utils::Identifier & destUUID)
     : core::Connectable(name, uuid),
       flow_repository_(flow_repository),
       content_repo_(content_repo),
       logger_(logging::LoggerFactory<Connection>::getLogger()) {
 
-  if (srcUUID)
-    uuid_copy(src_uuid_, srcUUID);
-  if (destUUID)
-    uuid_copy(dest_uuid_, destUUID);
+  src_uuid_ = srcUUID;
+  dest_uuid_ = destUUID;
 
   source_connectable_ = nullptr;
   dest_connectable_ = nullptr;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/FlowControlProtocol.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowControlProtocol.cpp b/libminifi/src/FlowControlProtocol.cpp
index 60122b0..60f701a 100644
--- a/libminifi/src/FlowControlProtocol.cpp
+++ b/libminifi/src/FlowControlProtocol.cpp
@@ -18,10 +18,8 @@
  * limitations under the License.
  */
 #include "FlowControlProtocol.h"
-#include <sys/time.h>
 #include <stdio.h>
 #include <time.h>
-#include <netinet/tcp.h>
 #include <chrono>
 #include <thread>
 #include <string>
@@ -35,96 +33,12 @@ namespace nifi {
 namespace minifi {
 
 int FlowControlProtocol::connectServer(const char *host, uint16_t port) {
-  in_addr_t addr;
-  int sock = 0;
-  struct hostent *h;
-#ifdef __MACH__
-  h = gethostbyname(host);
-#else
-  char buf[1024];
-  struct hostent he;
-  int hh_errno;
-  gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno);
-#endif
-  memcpy(reinterpret_cast<char*>(&addr), h->h_addr_list[0], h->h_length);
-  sock = socket(AF_INET, SOCK_STREAM, 0);
-  if (sock < 0) {
-    logger_->log_error("Could not create socket to hostName %s", host);
-    return 0;
-  }
-
-#ifndef __MACH__
-  int opt = 1;
-  bool nagle_off = true;
-
-  if (nagle_off) {
-    if (setsockopt(sock, SOL_TCP, TCP_NODELAY, reinterpret_cast<void*>(&opt), sizeof(opt)) < 0) {
-      logger_->log_error("setsockopt() TCP_NODELAY failed");
-      close(sock);
-      return 0;
-    }
-    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char*>(&opt), sizeof(opt)) < 0) {
-      logger_->log_error("setsockopt() SO_REUSEADDR failed");
-      close(sock);
-      return 0;
-    }
-  }
-
-  int sndsize = 256 * 1024;
-  if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, reinterpret_cast<char*>(&sndsize), sizeof(sndsize)) < 0) {
-    logger_->log_error("setsockopt() SO_SNDBUF failed");
-    close(sock);
-    return 0;
-  }
-#endif
-
-  struct sockaddr_in sa;
-  socklen_t socklen;
-  int status;
-
-  memset(&sa, 0, sizeof(sa));
-  sa.sin_family = AF_INET;
-  sa.sin_addr.s_addr = htonl(INADDR_ANY);
-  sa.sin_port = htons(0);
-  socklen = sizeof(sa);
-  if (bind(sock, (struct sockaddr *) &sa, socklen) < 0) {
-    logger_->log_error("socket bind failed");
-    close(sock);
-    return 0;
-  }
-
-  memset(&sa, 0, sizeof(sa));
-  sa.sin_family = AF_INET;
-  sa.sin_addr.s_addr = addr;
-  sa.sin_port = htons(port);
-  socklen = sizeof(sa);
-
-  status = connect(sock, (struct sockaddr *) &sa, socklen);
-
-  if (status < 0) {
-    logger_->log_error("socket connect failed to %s %ll", host, port);
-    close(sock);
-    return 0;
-  }
-
-  logger_->log_debug("Flow Control Protocol socket %ll connect to server %s port %ll success", sock, host, port);
-
-  return sock;
+  in_addr addr;  // declared but never used by the replacement body
+  return 0;  // stub: no connection is attempted; 0 is the value the removed code returned on every failure path
 }
 
 int FlowControlProtocol::sendData(uint8_t *buf, int buflen) {
-  int ret = 0, bytes = 0;
-
-  while (bytes < buflen) {
-    ret = send(_socket, buf + bytes, buflen - bytes, 0);
-    // check for errors
-    if (ret == -1) {
-      return ret;
-    }
-    bytes += ret;
-  }
-
-  return bytes;
+  return 0;  // stub: nothing is sent and buf/buflen are ignored; the removed loop returned the byte count or -1
 }
 
 int FlowControlProtocol::selectClient(int msec) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 12ce99f..c840f14 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -18,11 +18,9 @@
  * limitations under the License.
  */
 #include "FlowController.h"
-#include <sys/time.h>
 #include <time.h>
 #include <sys/types.h>
 #include <sys/stat.h>
-#include <unistd.h>
 #include <vector>
 #include <queue>
 #include <map>
@@ -56,6 +54,13 @@
 #include "core/Connectable.h"
 #include "utils/HTTPClient.h"
 
+
+#ifdef _MSC_VER
+#ifndef PATH_MAX
+#define PATH_MAX 260
+#endif
+#endif
+
 namespace org {
 namespace apache {
 namespace nifi {
@@ -63,7 +68,7 @@ namespace minifi {
 
 std::shared_ptr<utils::IdGenerator> FlowController::id_generator_ = utils::IdGenerator::getIdGenerator();
 
-#define DEFAULT_CONFIG_NAME "conf/flow.yml"
+#define DEFAULT_CONFIG_NAME "conf/config.yml"
 
 FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<Configure> configure,
                                std::unique_ptr<core::FlowConfiguration> flow_configuration, std::shared_ptr<core::ContentRepository> content_repo, const std::string name, bool headless_mode)
@@ -133,8 +138,12 @@ FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo
 
 void FlowController::initializePaths(const std::string &adjustedFilename) {
   char *path = NULL;
+#ifndef WIN32
   char full_path[PATH_MAX];
   path = realpath(adjustedFilename.c_str(), full_path);
+#else
+  path = const_cast<char*>(adjustedFilename.c_str());
+#endif
 
   if (path == NULL) {
     throw std::runtime_error("Path is not specified. Either manually set MINIFI_HOME or ensure ../conf exists");
@@ -144,7 +153,7 @@ void FlowController::initializePaths(const std::string &adjustedFilename) {
   logger_->log_info("FlowController NiFi Configuration file %s", pathString);
 
   if (!path) {
-    logger_->log_error("Could not locate path from provided configuration file name (%s).  Exiting.", full_path);
+    logger_->log_error("Could not locate path from provided configuration file name (%s).  Exiting.", path);
     exit(1);
   }
 }
@@ -540,7 +549,7 @@ void FlowController::loadC2ResponseConfiguration(const std::string &prefix) {
         std::string name;
 
         if (configuration_->get(nameOption.str(), name)) {
-          std::shared_ptr<state::response::ResponseNode> new_node = std::make_shared<state::response::ObjectNode>(name, nullptr);
+          std::shared_ptr<state::response::ResponseNode> new_node = std::make_shared<state::response::ObjectNode>(name);
 
           if (configuration_->get(classOption.str(), class_definitions)) {
             std::vector<std::string> classes = utils::StringUtils::split(class_definitions, ",");
@@ -601,7 +610,7 @@ std::shared_ptr<state::response::ResponseNode> FlowController::loadC2ResponseCon
         std::string name;
 
         if (configuration_->get(nameOption.str(), name)) {
-          std::shared_ptr<state::response::ResponseNode> new_node = std::make_shared<state::response::ObjectNode>(name, nullptr);
+          std::shared_ptr<state::response::ResponseNode> new_node = std::make_shared<state::response::ObjectNode>(name);
           if (name.find(",") != std::string::npos) {
             std::vector<std::string> sub_classes = utils::StringUtils::split(name, ",");
             for (std::string subClassStr : classes) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/FlowFileRecord.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp
index 7815e70..8868c9a 100644
--- a/libminifi/src/FlowFileRecord.cpp
+++ b/libminifi/src/FlowFileRecord.cpp
@@ -18,7 +18,6 @@
  * limitations under the License.
  */
 #include "FlowFileRecord.h"
-#include <sys/time.h>
 #include <time.h>
 #include <cstdio>
 #include <vector>

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/Properties.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Properties.cpp b/libminifi/src/Properties.cpp
index cb3b752..f5537fc 100644
--- a/libminifi/src/Properties.cpp
+++ b/libminifi/src/Properties.cpp
@@ -104,8 +104,12 @@ void Properties::loadConfigureFile(const char *fileName) {
     }
   }
   char *path = NULL;
+#ifndef WIN32
   char full_path[PATH_MAX];
   path = realpath(adjustedFilename.c_str(), full_path);
+#else
+  path = const_cast<char*>(adjustedFilename.c_str());
+#endif
   logger_->log_info("Using configuration file located at %s", path);
 
   std::ifstream file(path, std::ifstream::in);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/RemoteProcessorGroupPort.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp
index e52a2c2..8bed630 100644
--- a/libminifi/src/RemoteProcessorGroupPort.cpp
+++ b/libminifi/src/RemoteProcessorGroupPort.cpp
@@ -20,8 +20,6 @@
 
 #include "RemoteProcessorGroupPort.h"
 
-#include <curl/curl.h>
-#include <curl/easy.h>
 #include <uuid/uuid.h>
 #include <algorithm>
 #include <cstdint>
@@ -133,8 +131,8 @@ void RemoteProcessorGroupPort::initialize() {
 
 void RemoteProcessorGroupPort::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
   std::string value;
-  if (context->getProperty(portUUID.getName(), value)) {
-    uuid_parse(value.c_str(), protocol_uuid_);
+  if (context->getProperty(portUUID.getName(), value) && !value.empty()) {
+    protocol_uuid_ = value;
   }
 
   std::string context_name;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/c2/C2Agent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index 2514906..1ef8eea 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -17,7 +17,6 @@
  */
 
 #include "c2/C2Agent.h"
-#include <unistd.h>
 #include <csignal>
 #include <utility>
 #include <vector>

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/c2/protocols/RESTProtocol.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/c2/protocols/RESTProtocol.cpp b/libminifi/src/c2/protocols/RESTProtocol.cpp
index 391ed81..d1e8b9a 100644
--- a/libminifi/src/c2/protocols/RESTProtocol.cpp
+++ b/libminifi/src/c2/protocols/RESTProtocol.cpp
@@ -33,6 +33,7 @@ namespace minifi {
 namespace c2 {
 
 const C2Payload RESTProtocol::parseJsonResponse(const C2Payload &payload, const std::vector<char> &response) {
+#ifndef WIN32
   rapidjson::Document root;
 
   try {
@@ -126,6 +127,7 @@ const C2Payload RESTProtocol::parseJsonResponse(const C2Payload &payload, const
     }
   } catch (...) {
   }
+#endif
   return std::move(C2Payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true));
 }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/capi/C2CallbackAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/capi/C2CallbackAgent.cpp b/libminifi/src/capi/C2CallbackAgent.cpp
index d8da5d6..dd04466 100644
--- a/libminifi/src/capi/C2CallbackAgent.cpp
+++ b/libminifi/src/capi/C2CallbackAgent.cpp
@@ -17,7 +17,6 @@
  */
 
 #include "capi/C2CallbackAgent.h"
-#include <unistd.h>
 #include <csignal>
 #include <utility>
 #include <vector>

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/capi/Plan.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/capi/Plan.cpp b/libminifi/src/capi/Plan.cpp
index 6181382..595a01a 100644
--- a/libminifi/src/capi/Plan.cpp
+++ b/libminifi/src/capi/Plan.cpp
@@ -22,6 +22,8 @@
 #include <set>
 #include <string>
 
+std::shared_ptr<utils::IdGenerator> ExecutionPlan::id_generator_ = utils::IdGenerator::getIdGenerator();
+
 ExecutionPlan::ExecutionPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo)
     : content_repo_(content_repo),
       flow_repo_(flow_repo),
@@ -38,8 +40,8 @@ std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::shared_p
     return nullptr;
   }
 
-  uuid_t uuid;
-  uuid_generate(uuid);
+  utils::Identifier uuid;
+  id_generator_->generate(uuid);
 
   processor->setStreamFactory(stream_factory);
   // initialize the processor
@@ -66,7 +68,7 @@ std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::shared_p
     connection->setSource(last);
     connection->setDestination(processor);
 
-    uuid_t uuid_copy, uuid_copy_next;
+    utils::Identifier uuid_copy, uuid_copy_next;
     last->getUUID(uuid_copy);
     connection->setSourceUUID(uuid_copy);
     processor->getUUID(uuid_copy_next);
@@ -95,8 +97,8 @@ std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::string &
     return nullptr;
   }
 
-  uuid_t uuid;
-  uuid_generate(uuid);
+  utils::Identifier uuid;
+  id_generator_->generate(uuid);
 
   auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(processor_name, uuid);
   if (nullptr == ptr) {
@@ -186,7 +188,7 @@ std::shared_ptr<minifi::Connection> ExecutionPlan::buildFinalConnection(std::sha
   if (setDest)
     connection->setDestination(processor);
 
-  uuid_t uuid_copy;
+  utils::Identifier uuid_copy;
   last->getUUID(uuid_copy);
   connection->setSourceUUID(uuid_copy);
   if (setDest)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/controllers/LinuxPowerManagementService.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/controllers/LinuxPowerManagementService.cpp b/libminifi/src/controllers/LinuxPowerManagementService.cpp
index 836c9d3..a54cb7b 100644
--- a/libminifi/src/controllers/LinuxPowerManagementService.cpp
+++ b/libminifi/src/controllers/LinuxPowerManagementService.cpp
@@ -40,7 +40,7 @@ bool LinuxPowerManagerService::isAboveMax(int new_tasks) {
 }
 
 uint16_t LinuxPowerManagerService::getMaxThreads() {
-  return std::numeric_limits<uint16_t>::max();
+  return (std::numeric_limits<uint16_t>::max)();
 }
 
 bool LinuxPowerManagerService::canIncrease() {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/controllers/NetworkPrioritizerService.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/controllers/NetworkPrioritizerService.cpp b/libminifi/src/controllers/NetworkPrioritizerService.cpp
index 63210ec..d92758a 100644
--- a/libminifi/src/controllers/NetworkPrioritizerService.cpp
+++ b/libminifi/src/controllers/NetworkPrioritizerService.cpp
@@ -21,15 +21,18 @@
 #include <limits>
 #include <string>
 #include <vector>
-#include <sys/ioctl.h>
+#ifndef WIN32
 #include <ifaddrs.h>
 #include <net/if.h>
+#include <sys/ioctl.h>
 #include <netinet/in.h>
-#include <string.h>
 #include <sys/socket.h>
 #include <netdb.h>
-#include <stdlib.h>
 #include <unistd.h>
+#endif
+#include <string.h>
+#include <stdlib.h>
+
 #include <set>
 #include "utils/StringUtils.h"
 #if ( defined(__APPLE__) || defined(__MACH__) || defined(BSD))
@@ -112,6 +115,7 @@ std::string NetworkPrioritizerService::get_nearest_interface(const std::vector<s
 }
 
 bool NetworkPrioritizerService::interface_online(const std::string &ifc) {
+#ifndef WIN32
   struct ifreq ifr;
   auto sockid = socket(PF_INET6, SOCK_DGRAM, IPPROTO_IP);
   memset(&ifr, 0, sizeof(ifr));
@@ -121,6 +125,9 @@ bool NetworkPrioritizerService::interface_online(const std::string &ifc) {
   }
   close(sockid);
   return (ifr.ifr_flags & IFF_UP) && (ifr.ifr_flags & IFF_RUNNING);
+#else
+  return false;
+#endif
 }
 
 std::vector<std::string> NetworkPrioritizerService::getInterfaces(uint32_t size = 0) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/controllers/SSLContextService.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/controllers/SSLContextService.cpp b/libminifi/src/controllers/SSLContextService.cpp
index 352ba37..8d0a997 100644
--- a/libminifi/src/controllers/SSLContextService.cpp
+++ b/libminifi/src/controllers/SSLContextService.cpp
@@ -17,8 +17,11 @@
  */
 
 #include "controllers/SSLContextService.h"
+
+#ifdef OPENSSL_SUPPORT
 #include <openssl/err.h>
 #include <openssl/ssl.h>
+#endif
 #include <string>
 #include <memory>
 #include <set>
@@ -44,7 +47,13 @@ void SSLContextService::initialize() {
   initialized_ = true;
 }
 
+/**
+ * If OpenSSL is not installed we may still continue operations. Nullptr will
+ * be returned and it will be up to the caller to determine if this failure is
+ * recoverable.
+ */
 std::unique_ptr<SSLContext> SSLContextService::createSSLContext() {
+#ifdef OPENSSL_SUPPORT
   SSL_library_init();
   const SSL_METHOD *method;
 
@@ -82,6 +91,9 @@ std::unique_ptr<SSLContext> SSLContextService::createSSLContext() {
     logger_->log_error("Can not load CA certificate %s, Exiting, error : %s", ca_certificate_, std::strerror(errno));
   }
   return std::unique_ptr<SSLContext>(new SSLContext(ctx));
+#else
+  return nullptr;
+#endif
 }
 
 const std::string &SSLContextService::getCertificateFile() {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/controllers/UpdatePolicyControllerService.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/controllers/UpdatePolicyControllerService.cpp b/libminifi/src/controllers/UpdatePolicyControllerService.cpp
index 08faad2..ae56fe1 100644
--- a/libminifi/src/controllers/UpdatePolicyControllerService.cpp
+++ b/libminifi/src/controllers/UpdatePolicyControllerService.cpp
@@ -21,15 +21,8 @@
 #include <limits>
 #include <string>
 #include <vector>
-#include <sys/ioctl.h>
-#include <ifaddrs.h>
-#include <net/if.h>
-#include <netinet/in.h>
 #include <string.h>
-#include <sys/socket.h>
-#include <netdb.h>
 #include <stdlib.h>
-#include <unistd.h>
 #include <set>
 #include "utils/StringUtils.h"
 #if ( defined(__APPLE__) || defined(__MACH__) || defined(BSD))

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/core/ClassLoader.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ClassLoader.cpp b/libminifi/src/core/ClassLoader.cpp
index 5c2fdf8..9c1aa67 100644
--- a/libminifi/src/core/ClassLoader.cpp
+++ b/libminifi/src/core/ClassLoader.cpp
@@ -15,10 +15,8 @@
  * limitations under the License.
  */
 
-#include <sys/mman.h>
 #include <memory>
 #include <string>
-
 #include "core/ClassLoader.h"
 
 namespace org {
@@ -35,6 +33,7 @@ ClassLoader &ClassLoader::getDefaultClassLoader() {
   // populate ret
   return ret;
 }
+
 uint16_t ClassLoader::registerResource(const std::string &resource, const std::string &resourceFunction) {
   void *resource_ptr = nullptr;
   if (resource.empty()) {
@@ -57,7 +56,7 @@ uint16_t ClassLoader::registerResource(const std::string &resource, const std::s
   // load the symbols
   createFactory* create_factory_func = reinterpret_cast<createFactory*>(dlsym(resource_ptr, resourceFunction.c_str()));
   const char* dlsym_error = dlerror();
-  if (dlsym_error) {
+  if ((dlsym_error != nullptr && strlen(dlsym_error) > 0) || create_factory_func == nullptr) {
     return RESOURCE_FAILURE;
   }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/core/ConfigurableComponent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ConfigurableComponent.cpp b/libminifi/src/core/ConfigurableComponent.cpp
index ff6d1e5..c413ab9 100644
--- a/libminifi/src/core/ConfigurableComponent.cpp
+++ b/libminifi/src/core/ConfigurableComponent.cpp
@@ -191,6 +191,7 @@ bool ConfigurableComponent::createDynamicProperty(const std::string &name, const
   }
 
   Property new_property(name, DEFAULT_DYNAMIC_PROPERTY_DESC, value, false, "", {}, {});
+  new_property.setSupportsExpressionLanguage(true);
   logger_->log_info("Processor %s dynamic property '%s' value '%s'", name.c_str(), new_property.getName().c_str(), value.c_str());
   dynamic_properties_[new_property.getName()] = new_property;
   onDynamicPropertyModified({}, new_property);
@@ -205,6 +206,7 @@ bool ConfigurableComponent::setDynamicProperty(const std::string name, std::stri
     Property &orig_property = it->second;
     Property new_property = orig_property;
     new_property.setValue(value);
+    new_property.setSupportsExpressionLanguage(true);
     dynamic_properties_[new_property.getName()] = new_property;
     onDynamicPropertyModified(orig_property, new_property);
     logger_->log_debug("Component %s dynamic property name %s value %s", name, new_property.getName(), value);
@@ -222,6 +224,7 @@ bool ConfigurableComponent::updateDynamicProperty(const std::string &name, const
     Property &orig_property = it->second;
     Property new_property = orig_property;
     new_property.addValue(value);
+    new_property.setSupportsExpressionLanguage(true);
     dynamic_properties_[new_property.getName()] = new_property;
     onDynamicPropertyModified(orig_property, new_property);
     logger_->log_debug("Component %s dynamic property name %s value %s", name, new_property.getName(), value);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/core/Connectable.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Connectable.cpp b/libminifi/src/core/Connectable.cpp
index 2a98818..a5fb1b2 100644
--- a/libminifi/src/core/Connectable.cpp
+++ b/libminifi/src/core/Connectable.cpp
@@ -30,13 +30,21 @@ namespace nifi {
 namespace minifi {
 namespace core {
 
-Connectable::Connectable(std::string name, uuid_t uuid)
+Connectable::Connectable(std::string name, utils::Identifier &uuid)
     : CoreComponent(name, uuid),
       max_concurrent_tasks_(1),
       connectable_version_(nullptr),
       logger_(logging::LoggerFactory<Connectable>::getLogger()) {
 }
 
+Connectable::Connectable(std::string name)
+    : CoreComponent(name),
+      max_concurrent_tasks_(1),
+      connectable_version_(nullptr),
+      logger_(logging::LoggerFactory<Connectable>::getLogger()) {
+}
+
+
 Connectable::Connectable(const Connectable &&other)
     : CoreComponent(std::move(other)),
       max_concurrent_tasks_(std::move(other.max_concurrent_tasks_)),

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/core/Core.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Core.cpp b/libminifi/src/core/Core.cpp
index da9462b..e5c8d6c 100644
--- a/libminifi/src/core/Core.cpp
+++ b/libminifi/src/core/Core.cpp
@@ -29,31 +29,29 @@ namespace core {
 std::shared_ptr<utils::IdGenerator> CoreComponent::id_generator_ = utils::IdGenerator::getIdGenerator();
 
 // Set UUID
-void CoreComponent::setUUID(uuid_t uuid) {
-  uuid_copy(uuid_, uuid);
-  char uuidStr[37];
-  uuid_unparse_lower(uuid_, uuidStr);
-  uuidStr_ = uuidStr;
+void CoreComponent::setUUID(utils::Identifier &uuid) {
+  uuid_ = uuid;
+  uuidStr_ = uuid_.to_string();
 }
 
 void CoreComponent::setUUIDStr(const std::string uuidStr) {
-  uuid_parse(uuidStr.c_str(), uuid_);
+  uuid_ = uuidStr;
   uuidStr_ = uuidStr;
 }
 // Get UUID
-bool CoreComponent::getUUID(uuid_t uuid) {
-  if (uuid) {
-    uuid_copy(uuid, uuid_);
-    return true;
-  } else {
+bool CoreComponent::getUUID(utils::Identifier &uuid) {
+  if (uuid_ == nullptr) {
     return false;
   }
+  uuid = uuid_;
+  return true;
 }
 
 // Get UUID
-unsigned const char *CoreComponent::getUUID() {
-  return uuid_;
-}
+/*
+ unsigned const char *CoreComponent::getUUID() {
+ return uuid_.getIdentifier();
+ }*/
 
 // Set Processor Name
 void CoreComponent::setName(const std::string name) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/core/FlowConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp
index 320797b..aea6d62 100644
--- a/libminifi/src/core/FlowConfiguration.cpp
+++ b/libminifi/src/core/FlowConfiguration.cpp
@@ -28,13 +28,15 @@ namespace nifi {
 namespace minifi {
 namespace core {
 
-std::vector<std::string> FlowConfiguration::statics_sl_funcs_;
-std::mutex FlowConfiguration::atomic_initialization_;
+static_initializers &get_static_functions() {
+  static static_initializers static_sl_funcs;
+  return static_sl_funcs;
+}
 
 FlowConfiguration::~FlowConfiguration() {
 }
 
-std::shared_ptr<core::Processor> FlowConfiguration::createProcessor(std::string name, uuid_t uuid) {
+std::shared_ptr<core::Processor> FlowConfiguration::createProcessor(std::string name, utils::Identifier &  uuid) {
   auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(name, uuid);
   if (nullptr == ptr) {
     logger_->log_error("No Processor defined for %s", name);
@@ -60,47 +62,47 @@ std::shared_ptr<core::Processor> FlowConfiguration::createProvenanceReportTask()
 }
 
 std::unique_ptr<core::ProcessGroup> FlowConfiguration::updateFromPayload(const std::string &source, const std::string &yamlConfigPayload) {
-    if (!source.empty()) {
-      std::string host, protocol, path, query, url = source;
-      int port;
-      utils::parse_url(&url, &host, &port, &protocol, &path, &query);
-
-      std::string flow_id, bucket_id;
-      auto path_split = utils::StringUtils::split(path, "/");
-      for (size_t i = 0; i < path_split.size(); i++) {
-        const std::string &str = path_split.at(i);
-        if (str == "flows") {
-          if (i + 1 < path_split.size()) {
-            flow_id = path_split.at(i + 1);
-            i++;
-          }
+  if (!source.empty()) {
+    std::string host, protocol, path, query, url = source;
+    int port;
+    utils::parse_url(&url, &host, &port, &protocol, &path, &query);
+
+    std::string flow_id, bucket_id;
+    auto path_split = utils::StringUtils::split(path, "/");
+    for (size_t i = 0; i < path_split.size(); i++) {
+      const std::string &str = path_split.at(i);
+      if (str == "flows") {
+        if (i + 1 < path_split.size()) {
+          flow_id = path_split.at(i + 1);
+          i++;
         }
+      }
 
-        if (str == "bucket") {
-          if (i + 1 < path_split.size()) {
-            bucket_id = path_split.at(i + 1);
-            i++;
-          }
+      if (str == "bucket") {
+        if (i + 1 < path_split.size()) {
+          bucket_id = path_split.at(i + 1);
+          i++;
         }
       }
-      flow_version_->setFlowVersion(url, bucket_id, flow_id);
     }
-    return getRootFromPayload(yamlConfigPayload);
+    flow_version_->setFlowVersion(url, bucket_id, flow_id);
   }
+  return getRootFromPayload(yamlConfigPayload);
+}
 
-std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRootProcessGroup(std::string name, uuid_t uuid, int version) {
+std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRootProcessGroup(std::string name, utils::Identifier &  uuid, int version) {
   return std::unique_ptr<core::ProcessGroup>(new core::ProcessGroup(core::ROOT_PROCESS_GROUP, name, uuid, version));
 }
 
-std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRemoteProcessGroup(std::string name, uuid_t uuid) {
+std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRemoteProcessGroup(std::string name, utils::Identifier &  uuid) {
   return std::unique_ptr<core::ProcessGroup>(new core::ProcessGroup(core::REMOTE_PROCESS_GROUP, name, uuid));
 }
 
-std::shared_ptr<minifi::Connection> FlowConfiguration::createConnection(std::string name, uuid_t uuid) {
+std::shared_ptr<minifi::Connection> FlowConfiguration::createConnection(std::string name, utils::Identifier &  uuid) {
   return std::make_shared<minifi::Connection>(flow_file_repo_, content_repo_, name, uuid);
 }
 
-std::shared_ptr<core::controller::ControllerServiceNode> FlowConfiguration::createControllerService(const std::string &class_name, const std::string &name, uuid_t uuid) {
+std::shared_ptr<core::controller::ControllerServiceNode> FlowConfiguration::createControllerService(const std::string &class_name, const std::string &name, utils::Identifier &  uuid) {
   std::shared_ptr<core::controller::ControllerServiceNode> controllerServicesNode = service_provider_->createControllerService(class_name, name, true);
   if (nullptr != controllerServicesNode)
     controllerServicesNode->setUUID(uuid);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/core/FlowFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/FlowFile.cpp b/libminifi/src/core/FlowFile.cpp
index 3427559..d41d9b0 100644
--- a/libminifi/src/core/FlowFile.cpp
+++ b/libminifi/src/core/FlowFile.cpp
@@ -33,7 +33,7 @@ std::shared_ptr<utils::IdGenerator> FlowFile::id_generator_ = utils::IdGenerator
 std::shared_ptr<logging::Logger> FlowFile::logger_ = logging::LoggerFactory<FlowFile>::getLogger();
 
 FlowFile::FlowFile()
-    : Connectable("FlowFile", 0),
+    : Connectable("FlowFile"),
       size_(0),
       id_(0),
       stored(false),
@@ -54,7 +54,7 @@ FlowFile::~FlowFile() {
 }
 
 FlowFile& FlowFile::operator=(const FlowFile& other) {
-  uuid_copy(uuid_, other.uuid_);
+  uuid_ = other.uuid_;
   stored = other.stored;
   marked_delete_ = other.marked_delete_;
   entry_date_ = other.entry_date_;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/core/ProcessGroup.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp
index 205f9f2..7df1b09 100644
--- a/libminifi/src/core/ProcessGroup.cpp
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -18,7 +18,6 @@
  * limitations under the License.
  */
 #include "core/ProcessGroup.h"
-#include <sys/time.h>
 #include <time.h>
 #include <vector>
 #include <memory>
@@ -39,17 +38,40 @@ namespace core {
 
 std::shared_ptr<utils::IdGenerator> ProcessGroup::id_generator_ = utils::IdGenerator::getIdGenerator();
 
-ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid, int version, ProcessGroup *parent)
+ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, utils::Identifier &uuid)
+    : ProcessGroup(type, name, uuid, 0, 0) {
+}
+
+ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, utils::Identifier &uuid, int version)
+    : ProcessGroup(type, name, uuid, version, 0) {
+}
+
+ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, utils::Identifier &uuid, int version, ProcessGroup *parent)
     : logger_(logging::LoggerFactory<ProcessGroup>::getLogger()),
       name_(name),
       type_(type),
       config_version_(version),
       parent_process_group_(parent) {
-  if (!uuid)
-    // Generate the global UUID for the flow record
+  if (uuid == nullptr) {
     id_generator_->generate(uuid_);
-  else
-    uuid_copy(uuid_, uuid);
+  } else {
+    uuid_ = uuid;
+  }
+
+  yield_period_msec_ = 0;
+  transmitting_ = false;
+  transport_protocol_ = "RAW";
+
+  logger_->log_debug("ProcessGroup %s created", name_);
+}
+
+ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name)
+    : logger_(logging::LoggerFactory<ProcessGroup>::getLogger()),
+      name_(name),
+      type_(type),
+      config_version_(0),
+      parent_process_group_(0) {
+  id_generator_->generate(uuid_);
 
   yield_period_msec_ = 0;
   transmitting_ = false;
@@ -168,20 +190,15 @@ void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler, Eve
   }
 }
 
-std::shared_ptr<Processor> ProcessGroup::findProcessor(uuid_t uuid) {
+std::shared_ptr<Processor> ProcessGroup::findProcessor(utils::Identifier &uuid) {
   std::lock_guard<std::recursive_mutex> lock(mutex_);
   std::shared_ptr<Processor> ret = NULL;
   for (auto processor : processors_) {
     logger_->log_debug("find processor %s", processor->getName());
-    uuid_t processorUUID;
+    utils::Identifier processorUUID;
 
     if (processor->getUUID(processorUUID)) {
-      char uuid_str[37];  // ex. "1b4e28ba-2fa1-11d2-883f-0016d3cca427" + "\0"
-      uuid_unparse_lower(processorUUID, uuid_str);
-      std::string processorUUIDstr = uuid_str;
-      uuid_unparse_lower(uuid, uuid_str);
-      std::string uuidStr = uuid_str;
-      if (processorUUIDstr == uuidStr) {
+      if (uuid == processorUUID) {
         return processor;
       }
     }
@@ -277,14 +294,14 @@ void ProcessGroup::addConnection(std::shared_ptr<Connection> connection) {
     // We do not have the same connection in this process group yet
     connections_.insert(connection);
     logger_->log_debug("Add connection %s into process group %s", connection->getName(), name_);
-    uuid_t sourceUUID;
+    utils::Identifier sourceUUID;
     std::shared_ptr<Processor> source = NULL;
     connection->getSourceUUID(sourceUUID);
     source = this->findProcessor(sourceUUID);
     if (source)
       source->addConnection(connection);
     std::shared_ptr<Processor> destination = NULL;
-    uuid_t destinationUUID;
+    utils::Identifier destinationUUID;
     connection->getDestinationUUID(destinationUUID);
     destination = this->findProcessor(destinationUUID);
     if (destination && destination != source)
@@ -299,14 +316,14 @@ void ProcessGroup::removeConnection(std::shared_ptr<Connection> connection) {
     // We do not have the same connection in this process group yet
     connections_.erase(connection);
     logger_->log_debug("Remove connection %s into process group %s", connection->getName(), name_);
-    uuid_t sourceUUID;
+    utils::Identifier sourceUUID;
     std::shared_ptr<Processor> source = NULL;
     connection->getSourceUUID(sourceUUID);
     source = this->findProcessor(sourceUUID);
     if (source)
       source->removeConnection(connection);
     std::shared_ptr<Processor> destination = NULL;
-    uuid_t destinationUUID;
+    utils::Identifier destinationUUID;
     connection->getDestinationUUID(destinationUUID);
     destination = this->findProcessor(destinationUUID);
     if (destination && destination != source)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/core/ProcessSession.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index 1bfa9f8..dc45446 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -19,7 +19,6 @@
  */
 #include "core/ProcessSession.h"
 #include "core/ProcessSessionReadCallback.h"
-#include <sys/time.h>
 #include <time.h>
 #include <vector>
 #include <queue>
@@ -31,6 +30,25 @@
 #include <thread>
 #include <iostream>
 #include <uuid/uuid.h>
+/* This implementation is only for native Windows systems.  */
+#if (defined _WIN32 || defined __WIN32__) && !defined __CYGWIN__
+#define _WINSOCKAPI_
+#ifndef WIN32_LEAN_AND_MEAN
+#define WIN32_LEAN_AND_MEAN
+#endif
+#include <WinSock2.h>
+#include <WS2tcpip.h>
+#include <Windows.h>
+#pragma comment(lib, "Ws2_32.lib")
+#include <direct.h>
+
+int getpagesize(void) {
+  return 4096;
+  // SYSTEM_INFO system_info;
+  // GetSystemInfo(&system_info);
+  // return system_info.dwPageSize;
+}
+#endif
 
 namespace org {
 namespace apache {
@@ -619,12 +637,10 @@ bool ProcessSession::exportContent(const std::string &destination, const std::st
 }
 
 bool ProcessSession::exportContent(const std::string &destination, const std::shared_ptr<core::FlowFile> &flow, bool keepContent) {
-  char tmpFileUuidStr[37];
-  uuid_t tmpFileUuid;
+  utils::Identifier tmpFileUuid;
   id_generator_->generate(tmpFileUuid);
-  uuid_unparse_lower(tmpFileUuid, tmpFileUuidStr);
   std::stringstream tmpFileSs;
-  tmpFileSs << destination << "." << tmpFileUuidStr;
+  tmpFileSs << destination << "." << tmpFileUuid.to_string();
   std::string tmpFileName = tmpFileSs.str();
 
   return exportContent(destination, tmpFileName, flow, keepContent);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/core/Processor.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index 7c0fe25..480ae58 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -18,7 +18,6 @@
  * limitations under the License.
  */
 #include "core/Processor.h"
-#include <sys/time.h>
 #include <time.h>
 #include <vector>
 #include <queue>
@@ -45,8 +44,8 @@ namespace nifi {
 namespace minifi {
 namespace core {
 
-Processor::Processor(std::string name, uuid_t uuid)
-    : Connectable(name, uuid),
+Processor::Processor(std::string name)
+    : Connectable(name),
       ConfigurableComponent(),
       logger_(logging::LoggerFactory<Processor>::getLogger()) {
   has_work_.store(false);
@@ -66,6 +65,27 @@ Processor::Processor(std::string name, uuid_t uuid)
   logger_->log_debug("Processor %s created UUID %s", name_, uuidStr_);
 }
 
+Processor::Processor(std::string name, utils::Identifier &uuid)
+    : Connectable(name, uuid),
+      ConfigurableComponent(),
+      logger_(logging::LoggerFactory<Processor>::getLogger()) {
+  has_work_.store(false);
+  // Setup the default values
+  state_ = DISABLED;
+  strategy_ = TIMER_DRIVEN;
+  loss_tolerant_ = false;
+  _triggerWhenEmpty = false;
+  scheduling_period_nano_ = MINIMUM_SCHEDULING_NANOS;
+  run_duration_nano_ = DEFAULT_RUN_DURATION;
+  yield_period_msec_ = DEFAULT_YIELD_PERIOD_SECONDS * 1000;
+  _penalizationPeriodMsec = DEFAULT_PENALIZATION_PERIOD_SECONDS * 1000;
+  max_concurrent_tasks_ = DEFAULT_MAX_CONCURRENT_TASKS;
+  active_tasks_ = 0;
+  yield_expiration_ = 0;
+  incoming_connections_Iter = this->_incomingConnections.begin();
+  logger_->log_debug("Processor %s created UUID %s with uuid %s", name_, uuidStr_, uuid.to_string());
+}
+
 bool Processor::isRunning() {
   return (state_ == RUNNING && active_tasks_ > 0);
 }
@@ -87,17 +107,13 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) {
   std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
   std::lock_guard<std::mutex> lock(mutex_);
 
-  uuid_t srcUUID;
-  uuid_t destUUID;
+  utils::Identifier srcUUID;
+  utils::Identifier destUUID;
 
   connection->getSourceUUID(srcUUID);
   connection->getDestinationUUID(destUUID);
-  char uuid_str[37];
-
-  uuid_unparse_lower(uuid_, uuid_str);
-  std::string my_uuid = uuid_str;
-  uuid_unparse_lower(destUUID, uuid_str);
-  std::string destination_uuid = uuid_str;
+  std::string my_uuid = uuid_.to_string();
+  std::string destination_uuid = destUUID.to_string();
   if (my_uuid == destination_uuid) {
     // Connection is destination to the current processor
     if (_incomingConnections.find(connection) == _incomingConnections.end()) {
@@ -108,8 +124,7 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) {
       ret = true;
     }
   }
-  uuid_unparse_lower(srcUUID, uuid_str);
-  std::string source_uuid = uuid_str;
+  std::string source_uuid = srcUUID.to_string();
   if (my_uuid == source_uuid) {
     std::string relationship = connection->getRelationship().getName();
     // Connection is source from the current processor
@@ -147,15 +162,15 @@ void Processor::removeConnection(std::shared_ptr<Connectable> conn) {
 
   std::lock_guard<std::mutex> lock(mutex_);
 
-  uuid_t srcUUID;
-  uuid_t destUUID;
+  utils::Identifier srcUUID;
+  utils::Identifier destUUID;
 
   std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
 
   connection->getSourceUUID(srcUUID);
   connection->getDestinationUUID(destUUID);
 
-  if (uuid_compare(uuid_, destUUID) == 0) {
+  if (uuid_ == destUUID) {
     // Connection is destination to the current processor
     if (_incomingConnections.find(connection) != _incomingConnections.end()) {
       _incomingConnections.erase(connection);
@@ -165,7 +180,7 @@ void Processor::removeConnection(std::shared_ptr<Connectable> conn) {
     }
   }
 
-  if (uuid_compare(uuid_, srcUUID) == 0) {
+  if (uuid_ == srcUUID) {
     std::string relationship = connection->getRelationship().getName();
     // Connection is source from the current processor
     auto &&it = out_going_connections_.find(relationship);