You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/03/28 17:19:16 UTC

[11/16] nifi-minifi-cpp git commit: MINIFI-217: Updates namespaces and removes use of raw pointers for user facing API.

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/io/DataStream.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/DataStream.h b/libminifi/include/io/DataStream.h
index 2f793ff..e37dbf7 100644
--- a/libminifi/include/io/DataStream.h
+++ b/libminifi/include/io/DataStream.h
@@ -22,99 +22,111 @@
 #include <cstdint>
 #include <vector>
 #include "EndianCheck.h"
-
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
 /**
  * DataStream defines the mechanism through which
  * binary data will be written to a sink
  */
 class DataStream {
-public:
-
-	DataStream() :
-			readBuffer(0) {
-
-	}
-
-	/**
-	 * Constructor
-	 **/
-	explicit DataStream(const uint8_t *buf, const uint32_t buflen) :
-			DataStream() {
-		writeData((uint8_t*) buf, buflen);
-
-	}
-
-	virtual short initialize() {
-		buffer.clear();
-		readBuffer = 0;
-		return 0;
-	}
-
-	virtual void closeStream()
-	{
-
-	}
-	/**
-	 * Reads data and places it into buf
-	 * @param buf buffer in which we extract data
-	 * @param buflen
-	 */
-	virtual int readData(std::vector<uint8_t> &buf, int buflen);
-	/**
-	 * Reads data and places it into buf
-	 * @param buf buffer in which we extract data
-	 * @param buflen
-	 */
-	virtual int readData(uint8_t *buf, int buflen);
-
-	/**
-	 * writes valiue to buffer
-	 * @param value value to write
-	 * @param size size of value
-	 */
-	virtual int writeData(uint8_t *value, int size);
-
-	/**
-	 * Reads a system word
-	 * @param value value to write
-	 */
-	virtual int read(uint64_t &value, bool is_little_endian =
-			EndiannessCheck::IS_LITTLE);
-
-	/**
-	 * Reads a uint32_t
-	 * @param value value to write
-	 */
-	virtual int read(uint32_t &value, bool is_little_endian =
-			EndiannessCheck::IS_LITTLE);
-
-	/**
-	 * Reads a system short
-	 * @param value value to write
-	 */
-	virtual int read(uint16_t &value, bool is_little_endian =
-			EndiannessCheck::IS_LITTLE);
-
-	/**
-	 * Returns the underlying buffer
-	 * @return vector's array
-	 **/
-	const uint8_t *getBuffer() const {
-		return &buffer[0];
-	}
-
-	/**
-	 * Retrieve size of data stream
-	 * @return size of data stream
-	 **/
-	const uint32_t getSize() const {
-		return buffer.size();
-	}
-
-protected:
-	// All serialization related method and internal buf
-	std::vector<uint8_t> buffer;
-	uint32_t readBuffer;
+ public:
+
+  DataStream()
+      : readBuffer(0) {
+
+  }
+
+  ~DataStream() {
+
+  }
+
+  /**
+   * Constructor
+   **/
+  explicit DataStream(const uint8_t *buf, const uint32_t buflen)
+      : DataStream() {
+    writeData((uint8_t*) buf, buflen);
+
+  }
+
+  virtual short initialize() {
+    buffer.clear();
+    readBuffer = 0;
+    return 0;
+  }
+
+  virtual void closeStream() {
+
+  }
+  /**
+   * Reads data and places it into buf
+   * @param buf buffer in which we extract data
+   * @param buflen
+   */
+  virtual int readData(std::vector<uint8_t> &buf, int buflen);
+  /**
+   * Reads data and places it into buf
+   * @param buf buffer in which we extract data
+   * @param buflen
+   */
+  virtual int readData(uint8_t *buf, int buflen);
+
+  /**
+   * writes value to buffer
+   * @param value value to write
+   * @param size size of value
+   */
+  virtual int writeData(uint8_t *value, int size);
+
+  /**
+   * Reads a system word
+   * @param value reference that receives the value read
+   */
+  virtual int read(uint64_t &value, bool is_little_endian =
+                       EndiannessCheck::IS_LITTLE);
+
+  /**
+   * Reads a uint32_t
+   * @param value reference that receives the value read
+   */
+  virtual int read(uint32_t &value, bool is_little_endian =
+                       EndiannessCheck::IS_LITTLE);
+
+  /**
+   * Reads a system short
+   * @param value reference that receives the value read
+   */
+  virtual int read(uint16_t &value, bool is_little_endian =
+                       EndiannessCheck::IS_LITTLE);
+
+  /**
+   * Returns the underlying buffer
+   * @return vector's array
+   **/
+  const uint8_t *getBuffer() const {
+    return &buffer[0];
+  }
+
+  /**
+   * Retrieve size of data stream
+   * @return size of data stream
+   **/
+  const uint32_t getSize() const {
+    return buffer.size();
+  }
+
+ protected:
+  // All serialization related method and internal buf
+  std::vector<uint8_t> buffer;
+  uint32_t readBuffer;
 };
 
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
 #endif /* LIBMINIFI_INCLUDE_IO_DATASTREAM_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/io/EndianCheck.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/EndianCheck.h b/libminifi/include/io/EndianCheck.h
index ef900e0..3ceb19c 100644
--- a/libminifi/include/io/EndianCheck.h
+++ b/libminifi/include/io/EndianCheck.h
@@ -1,5 +1,5 @@
 /**
-  *
+ *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -16,32 +16,34 @@
  * limitations under the License.
  */
 
-
 #ifndef LIBMINIFI_INCLUDE_IO_ENDIANCHECK_H_
 #define LIBMINIFI_INCLUDE_IO_ENDIANCHECK_H_
-
-
-
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
 /**
  * Mechanism to determine endianness of host.
  * Accounts for only BIG/LITTLE/BIENDIAN
  **/
-class EndiannessCheck
-{
-public:
-    static bool IS_LITTLE;
-private:
-
-    static bool is_little_endian() {
-        /* do whatever is needed at static init time */
-        unsigned int x = 1;
-        char *c = (char*) &x;
-        IS_LITTLE=*c==1;
-        return IS_LITTLE;
-    }
+class EndiannessCheck {
+ public:
+  static bool IS_LITTLE;
+ private:
+
+  static bool is_little_endian() {
+    /* do whatever is needed at static init time */
+    unsigned int x = 1;
+    char *c = (char*) &x;
+    IS_LITTLE = *c == 1;
+    return IS_LITTLE;
+  }
 };
 
-
-
-
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
 #endif /* LIBMINIFI_INCLUDE_IO_ENDIANCHECK_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/io/Serializable.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/Serializable.h b/libminifi/include/io/Serializable.h
index 59d3a73..5ee886b 100644
--- a/libminifi/include/io/Serializable.h
+++ b/libminifi/include/io/Serializable.h
@@ -1,5 +1,5 @@
 /**
-  *
+ *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -22,9 +22,11 @@
 #include <string>
 #include "EndianCheck.h"
 #include "DataStream.h"
-
-
-
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
 /**
  * Serializable instances provide base functionality to
  * write certain objects/primitives to a data stream.
@@ -32,155 +34,158 @@
  */
 class Serializable {
 
-public:
-
-    /**
-     * Inline function to write T to stream
-     **/
-    template<typename T>
-    inline int writeData(const T &t,DataStream *stream);
-
-    /**
-     * Inline function to write T to to_vec
-     **/
-    template<typename T>
-    inline int writeData(const T &t, uint8_t *to_vec);
-
-    /**
-     * Inline function to write T to to_vec
-     **/
-    template<typename T>
-    inline int writeData(const T &t, std::vector<uint8_t> &to_vec);
-
-
-    /**
-     * write byte to stream
-     * @return resulting write size
-     **/
-    int write(uint8_t value,DataStream *stream);
-
-    /**
-     * write byte to stream
-     * @return resulting write size
-     **/
-    int write(char value,DataStream *stream);
-
-    /**
-     * write 4 bytes to stream
-     * @param base_value non encoded value
-     * @param stream output stream
-     * @param is_little_endian endianness determination
-     * @return resulting write size
-     **/
-    int write(uint32_t base_value,DataStream *stream, bool is_little_endian =
-                  EndiannessCheck::IS_LITTLE);
-
-    /**
-     * write 2 bytes to stream
-     * @param base_value non encoded value
-     * @param stream output stream
-     * @param is_little_endian endianness determination
-     * @return resulting write size
-     **/
-    int write(uint16_t base_value,DataStream *stream, bool is_little_endian =
-                  EndiannessCheck::IS_LITTLE);
-
-    /**
-     * write valueto stream
-     * @param value non encoded value
-     * @param len length of value
-     * @param strema output stream
-     * @return resulting write size
-     **/
-    int write(uint8_t *value, int len,DataStream *stream);
-
-    /**
-     * write 8 bytes to stream
-     * @param base_value non encoded value
-     * @param stream output stream
-     * @param is_little_endian endianness determination
-     * @return resulting write size
-     **/
-    int write(uint64_t base_value,DataStream *stream, bool is_little_endian =
-                  EndiannessCheck::IS_LITTLE);
-
-    /**
-    * write bool to stream
-     * @param value non encoded value
-     * @return resulting write size
-     **/
-    int write(bool value);
-
-    /**
-     * write UTF string to stream
-     * @param str string to write
-     * @return resulting write size
-     **/
-    int writeUTF(std::string str,DataStream *stream, bool widen = false);
-
-    /**
-     * reads a byte from the stream
-     * @param value reference in which will set the result
-     * @param stream stream from which we will read
-     * @return resulting read size
-     **/
-    int read(uint8_t &value,DataStream *stream);
-
-    /**
-     * reads two bytes from the stream
-     * @param value reference in which will set the result
-     * @param stream stream from which we will read
-     * @return resulting read size
-     **/
-    int read(uint16_t &base_value,DataStream *stream, bool is_little_endian =
-                 EndiannessCheck::IS_LITTLE);
-
-    /**
-     * reads a byte from the stream
-     * @param value reference in which will set the result
-     * @param stream stream from which we will read
-     * @return resulting read size
-     **/
-    int read(char &value,DataStream *stream);
-
-    /**
-     * reads a byte array from the stream
-     * @param value reference in which will set the result
-     * @param len length to read
-     * @param stream stream from which we will read
-     * @return resulting read size
-     **/
-    int read(uint8_t *value, int len,DataStream *stream);
-
-    /**
-     * reads four bytes from the stream
-     * @param value reference in which will set the result
-     * @param stream stream from which we will read
-     * @return resulting read size
-     **/
-    int read(uint32_t &value,DataStream *stream,
-             bool is_little_endian = EndiannessCheck::IS_LITTLE);
-
-    /**
-     * reads eight byte from the stream
-     * @param value reference in which will set the result
-     * @param stream stream from which we will read
-     * @return resulting read size
-     **/
-    int read(uint64_t &value,DataStream *stream,
-             bool is_little_endian = EndiannessCheck::IS_LITTLE);
-
-    /**
-     * read UTF from stream
-     * @param str reference string
-     * @param stream stream from which we will read
-     * @return resulting read size
-     **/
-    int readUTF(std::string &str,DataStream *stream, bool widen = false);
-
-protected:
+ public:
+
+  /**
+   * Inline function to write T to stream
+   **/
+  template<typename T>
+  inline int writeData(const T &t, DataStream *stream);
+
+  /**
+   * Inline function to write T to to_vec
+   **/
+  template<typename T>
+  inline int writeData(const T &t, uint8_t *to_vec);
+
+  /**
+   * Inline function to write T to to_vec
+   **/
+  template<typename T>
+  inline int writeData(const T &t, std::vector<uint8_t> &to_vec);
+
+  /**
+   * write byte to stream
+   * @return resulting write size
+   **/
+  int write(uint8_t value, DataStream *stream);
+
+  /**
+   * write byte to stream
+   * @return resulting write size
+   **/
+  int write(char value, DataStream *stream);
+
+  /**
+   * write 4 bytes to stream
+   * @param base_value non encoded value
+   * @param stream output stream
+   * @param is_little_endian endianness determination
+   * @return resulting write size
+   **/
+  int write(uint32_t base_value, DataStream *stream, bool is_little_endian =
+                EndiannessCheck::IS_LITTLE);
+
+  /**
+   * write 2 bytes to stream
+   * @param base_value non encoded value
+   * @param stream output stream
+   * @param is_little_endian endianness determination
+   * @return resulting write size
+   **/
+  int write(uint16_t base_value, DataStream *stream, bool is_little_endian =
+                EndiannessCheck::IS_LITTLE);
+
+  /**
+   * write value to stream
+   * @param value non encoded value
+   * @param len length of value
+   * @param stream output stream
+   * @return resulting write size
+   **/
+  int write(uint8_t *value, int len, DataStream *stream);
+
+  /**
+   * write 8 bytes to stream
+   * @param base_value non encoded value
+   * @param stream output stream
+   * @param is_little_endian endianness determination
+   * @return resulting write size
+   **/
+  int write(uint64_t base_value, DataStream *stream, bool is_little_endian =
+                EndiannessCheck::IS_LITTLE);
+
+  /**
+   * write bool to stream
+   * @param value non encoded value
+   * @return resulting write size
+   **/
+  int write(bool value);
+
+  /**
+   * write UTF string to stream
+   * @param str string to write
+   * @return resulting write size
+   **/
+  int writeUTF(std::string str, DataStream *stream, bool widen = false);
+
+  /**
+   * reads a byte from the stream
+   * @param value reference in which will set the result
+   * @param stream stream from which we will read
+   * @return resulting read size
+   **/
+  int read(uint8_t &value, DataStream *stream);
+
+  /**
+   * reads two bytes from the stream
+   * @param base_value reference in which the result will be set
+   * @param stream stream from which we will read
+   * @return resulting read size
+   **/
+  int read(uint16_t &base_value, DataStream *stream, bool is_little_endian =
+               EndiannessCheck::IS_LITTLE);
+
+  /**
+   * reads a byte from the stream
+   * @param value reference in which will set the result
+   * @param stream stream from which we will read
+   * @return resulting read size
+   **/
+  int read(char &value, DataStream *stream);
+
+  /**
+   * reads a byte array from the stream
+   * @param value reference in which will set the result
+   * @param len length to read
+   * @param stream stream from which we will read
+   * @return resulting read size
+   **/
+  int read(uint8_t *value, int len, DataStream *stream);
+
+  /**
+   * reads four bytes from the stream
+   * @param value reference in which will set the result
+   * @param stream stream from which we will read
+   * @return resulting read size
+   **/
+  int read(uint32_t &value, DataStream *stream, bool is_little_endian =
+               EndiannessCheck::IS_LITTLE);
+
+  /**
+   * reads eight bytes from the stream
+   * @param value reference in which will set the result
+   * @param stream stream from which we will read
+   * @return resulting read size
+   **/
+  int read(uint64_t &value, DataStream *stream, bool is_little_endian =
+               EndiannessCheck::IS_LITTLE);
+
+  /**
+   * read UTF from stream
+   * @param str reference string
+   * @param stream stream from which we will read
+   * @return resulting read size
+   **/
+  int readUTF(std::string &str, DataStream *stream, bool widen = false);
+
+ protected:
 
 };
 
-
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
 #endif /* LIBMINIFI_INCLUDE_IO_SERIALIZABLE_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/io/SocketFactory.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/SocketFactory.h b/libminifi/include/io/SocketFactory.h
deleted file mode 100644
index c8cbcb1..0000000
--- a/libminifi/include/io/SocketFactory.h
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef SOCKET_FACTORY_H
-#define SOCKET_FACTORY_H
-
-#include "ClientSocket.h"
-#include "TLSSocket.h"
-#include "ClientSocket.h"
-#include "Configure.h"
-#include "utils/StringUtils.h"
-
-/**
-  Purpose: Due to the current design this is the only mechanism by which we can
-  inject different socket types
-  
-**/
-class SocketFactory{
-public:
-
-	/**
-	 * Build an instance, creating a memory fence, which
-	 * allows us to avoid locking. This is tantamount to double checked locking.
-	 * @returns new SocketFactory;
-	 */
-	static SocketFactory *getInstance() {
-		SocketFactory* atomic_context = context_instance_.load(
-				std::memory_order_relaxed);
-		std::atomic_thread_fence(std::memory_order_acquire);
-		if (atomic_context == nullptr) {
-			std::lock_guard<std::mutex> lock(context_mutex_);
-			atomic_context = context_instance_.load(std::memory_order_relaxed);
-			if (atomic_context == nullptr) {
-				atomic_context = new SocketFactory();
-				std::atomic_thread_fence(std::memory_order_release);
-				context_instance_.store(atomic_context,
-						std::memory_order_relaxed);
-			}
-		}
-		return atomic_context;
-	}
-
-	/**
-	 * Creates a socket and returns a unique ptr
-	 *
-	 */
-	std::unique_ptr<Socket> createSocket(const std::string &host, const uint16_t port) {
-		Socket *socket = 0;
-		if (is_secure_) {
-#ifdef OPENSSL_SUPPORT
-			socket = new TLSSocket(host, port);
-#else
-			socket = 0;
-#endif
-		} else {
-			socket = new Socket(host, port);
-		}
-		return std::unique_ptr<Socket>(socket);
-	}
-protected:
-	SocketFactory() :
-			configure_(Configure::getConfigure()) {
-		std::string secureStr;
-		is_secure_ = false;
-		if (configure_->get(Configure::nifi_remote_input_secure, secureStr)) {
-			StringUtils::StringToBool(secureStr, is_secure_);
-		}
-	}
-
-	bool is_secure_;
-	static std::atomic<SocketFactory*> context_instance_;
-	static std::mutex context_mutex_;
-
-	Configure *configure_;
-};
-
-#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/io/Sockets.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/Sockets.h b/libminifi/include/io/Sockets.h
new file mode 100644
index 0000000..2c0b163
--- /dev/null
+++ b/libminifi/include/io/Sockets.h
@@ -0,0 +1,27 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_IO_SOCKET_H_
+#define LIBMINIFI_INCLUDE_IO_SOCKET_H_
+
+#include "ClientSocket.h"
+
+#ifdef OPENSSL_SUPPORT
+#include "tls/TLSSocket.h"
+#endif
+
+#endif /* LIBMINIFI_INCLUDE_IO_SOCKET_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/io/StreamFactory.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/StreamFactory.h b/libminifi/include/io/StreamFactory.h
new file mode 100644
index 0000000..faa10b5
--- /dev/null
+++ b/libminifi/include/io/StreamFactory.h
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef SOCKET_FACTORY_H
+#define SOCKET_FACTORY_H
+
+#include "properties/Configure.h"
+#include "Sockets.h"
+#include "utils/StringUtils.h"
+#include "validation.h"
+
+#ifdef OPENSSL_SUPPORT
+#include "tls/TLSSocket.h"
+#endif
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+/**
+ * Purpose: Socket Creator is a class that will determine if the provided socket type
+ * exists per the compilation parameters
+ */
+template<typename T>
+class SocketCreator {
+
+	template<bool cond, typename U>
+	using TypeCheck = typename std::enable_if< cond, U >::type;
+
+public:
+	template<typename U = T>
+	TypeCheck<true, U> *create(const std::string &host, const uint16_t port) {
+		return new T(host, port);
+	}
+	template<typename U = T>
+	TypeCheck<false, U> *create(const std::string &host, const uint16_t port) {
+		return new Socket(host, port);
+	}
+
+};
+
+/**
+ Purpose: Due to the current design this is the only mechanism by which we can
+ inject different socket types
+ 
+ **/
+class StreamFactory {
+public:
+
+	/**
+	 * Build an instance, creating a memory fence, which
+	 * allows us to avoid locking. This is tantamount to double checked locking.
+	 * @returns new StreamFactory;
+	 */
+	static StreamFactory *getInstance() {
+		StreamFactory* atomic_context = context_instance_.load(
+				std::memory_order_relaxed);
+		std::atomic_thread_fence(std::memory_order_acquire);
+		if (atomic_context == nullptr) {
+			std::lock_guard < std::mutex > lock(context_mutex_);
+			atomic_context = context_instance_.load(std::memory_order_relaxed);
+			if (atomic_context == nullptr) {
+				atomic_context = new StreamFactory();
+				std::atomic_thread_fence(std::memory_order_release);
+				context_instance_.store(atomic_context,
+						std::memory_order_relaxed);
+			}
+		}
+		return atomic_context;
+	}
+
+	/**
+	 * Creates a socket and returns a unique ptr
+	 *
+	 */
+	std::unique_ptr<Socket> createSocket(const std::string &host,
+			const uint16_t port) {
+		Socket *socket = 0;
+
+		if (is_secure_) {
+			socket = createSocket<TLSSocket>(host, port);
+		} else {
+			socket = createSocket<Socket>(host, port);
+		}
+		return std::unique_ptr < Socket > (socket);
+	}
+
+protected:
+
+	/**
+	 * Creates a socket of type T and returns a raw pointer to it
+	 *
+	 */
+	template<typename T>
+	Socket *createSocket(const std::string &host, const uint16_t port) {
+		SocketCreator<T> creator;
+		return creator.create(host, port);
+	}
+
+	StreamFactory() :
+			configure_(Configure::getConfigure()) {
+		std::string secureStr;
+		is_secure_ = false;
+		if (configure_->get(Configure::nifi_remote_input_secure, secureStr)) {
+			org::apache::nifi::minifi::utils::StringUtils::StringToBool(
+					secureStr, is_secure_);
+		}
+	}
+
+	bool is_secure_;
+	static std::atomic<StreamFactory*> context_instance_;
+	static std::mutex context_mutex_;
+
+	Configure *configure_;
+};
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/io/TLSSocket.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/TLSSocket.h b/libminifi/include/io/TLSSocket.h
deleted file mode 100644
index 32645ca..0000000
--- a/libminifi/include/io/TLSSocket.h
+++ /dev/null
@@ -1,187 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef LIBMINIFI_INCLUDE_IO_TLSSOCKET_H_
-#define LIBMINIFI_INCLUDE_IO_TLSSOCKET_H_
-
-#ifdef OPENSSL_SUPPORT
-#include <cstdint>
-#include "ClientSocket.h"
-#include <atomic>
-#include <mutex>
-
-#include "Configure.h"
-#include <openssl/ssl.h>
-#include <openssl/err.h>
-
-#define TLS_ERROR_CONTEXT 1
-#define TLS_ERROR_PEM_MISSING 2
-#define TLS_ERROR_CERT_MISSING 3
-#define TLS_ERROR_KEY_ERROR 4
-#define TLS_ERROR_CERT_ERROR 5
-
-class TLSContext {
-
-public:
-
-	/**
-	 * Build an instance, creating a memory fence, which
-	 * allows us to avoid locking. This is tantamount to double checked locking.
-	 * @returns new TLSContext;
-	 */
-	static TLSContext *getInstance() {
-		TLSContext* atomic_context = context_instance.load(
-				std::memory_order_relaxed);
-		std::atomic_thread_fence(std::memory_order_acquire);
-		if (atomic_context == nullptr) {
-			std::lock_guard<std::mutex> lock(context_mutex);
-			atomic_context = context_instance.load(std::memory_order_relaxed);
-			if (atomic_context == nullptr) {
-				atomic_context = new TLSContext();
-				atomic_context->initialize();
-				std::atomic_thread_fence(std::memory_order_release);
-				context_instance.store(atomic_context,
-						std::memory_order_relaxed);
-			}
-		}
-		return atomic_context;
-	}
-
-	virtual ~TLSContext() {
-		if (0 != ctx)
-			SSL_CTX_free(ctx);
-	}
-
-	SSL_CTX *getContext() {
-		return ctx;
-	}
-
-	short getError() {
-		return error_value;
-	}
-
-	short initialize();
-
-private:
-
-	static int pemPassWordCb(char *buf, int size, int rwflag, void *userdata) {
-		std::string passphrase;
-
-		if (Configure::getConfigure()->get(
-				Configure::nifi_security_client_pass_phrase, passphrase)) {
-
-			std::ifstream file(passphrase.c_str(), std::ifstream::in);
-			if (!file.good()) {
-				memset(buf, 0x00, size);
-				return 0;
-			}
-
-			std::string password;
-			password.assign((std::istreambuf_iterator<char>(file)),
-					std::istreambuf_iterator<char>());
-			file.close();
-			memset(buf,0x00,size);
-			memcpy(buf, password.c_str(), password.length()-1);
-
-			return password.length()-1;
-		}
-		return 0;
-	}
-
-	TLSContext();
-
-	std::shared_ptr<Logger> logger_;
-	Configure *configuration;
-	SSL_CTX *ctx;
-
-	short error_value;
-	static std::atomic<TLSContext*> context_instance;
-
-	static std::mutex context_mutex;
-};
-
-class TLSSocket: public Socket {
-public:
-
-	/**
-	 * Constructor that accepts host name, port and listeners. With this
-	 * contructor we will be creating a server socket
-	 * @param hostname our host name
-	 * @param port connecting port
-	 * @param listeners number of listeners in the queue
-	 */
-	explicit TLSSocket(const std::string &hostname, const uint16_t port,
-			const uint16_t listeners);
-
-	/**
-	 * Constructor that creates a client socket.
-	 * @param hostname hostname we are connecting to.
-	 * @param port port we are connecting to.
-	 */
-	explicit TLSSocket(const std::string &hostname, const uint16_t port);
-
-	/**
-	 * Move constructor.
-	 */
-	explicit TLSSocket(const TLSSocket &&);
-
-	virtual ~TLSSocket();
-
-	/**
-	 * Initializes the socket
-	 * @return result of the creation operation.
-	 */
-	short initialize();
-
-	/**
-	 * Attempt to select the socket file descriptor
-	 * @param msec timeout interval to wait
-	 * @returns file descriptor
-	 */
-	virtual short select_descriptor(const uint16_t msec);
-
-	/**
-	 * Reads data and places it into buf
-	 * @param buf buffer in which we extract data
-	 * @param buflen
-	 */
-	virtual int readData(uint8_t *buf, int buflen);
-
-	/**
-	 * Write value to the stream using std::vector
-	 * @param buf incoming buffer
-	 * @param buflen buffer to write
-	 *
-	 */
-	int writeData(std::vector<uint8_t> &buf, int buflen);
-
-	/**
-	 * Write value to the stream using uint8_t ptr
-	 * @param buf incoming buffer
-	 * @param buflen buffer to write
-	 *
-	 */
-	int writeData(uint8_t *value, int size);
-
-protected:
-
-	SSL* ssl;
-
-};
-#endif
-
-#endif /* LIBMINIFI_INCLUDE_IO_TLSSOCKET_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/io/tls/TLSSocket.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/tls/TLSSocket.h b/libminifi/include/io/tls/TLSSocket.h
new file mode 100644
index 0000000..f86f8bc
--- /dev/null
+++ b/libminifi/include/io/tls/TLSSocket.h
@@ -0,0 +1,198 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_IO_TLSSOCKET_H_
+#define LIBMINIFI_INCLUDE_IO_TLSSOCKET_H_
+
+#include <cstdint>
+#include "../ClientSocket.h"
+#include <atomic>
+#include <mutex>
+
+#include "properties/Configure.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+#include <openssl/ssl.h>
+#include <openssl/err.h>
+
+#define TLS_ERROR_CONTEXT 1
+#define TLS_ERROR_PEM_MISSING 2
+#define TLS_ERROR_CERT_MISSING 3
+#define TLS_ERROR_KEY_ERROR 4
+#define TLS_ERROR_CERT_ERROR 5
+
+class TLSContext {
+  // Process-wide holder of the OpenSSL SSL_CTX, handed out by getInstance().
+ public:
+
+  /**
+   * Returns the shared instance, creating and initializing it on first use.
+   * Fences around the relaxed atomic accesses publish the new object; the
+   * creation path itself is serialized by context_mutex.
+   */
+  static TLSContext *getInstance() {
+    TLSContext* atomic_context = context_instance.load(
+        std::memory_order_relaxed);
+    std::atomic_thread_fence(std::memory_order_acquire);
+    if (atomic_context == nullptr) {
+      std::lock_guard<std::mutex> lock(context_mutex);
+      atomic_context = context_instance.load(std::memory_order_relaxed);
+      if (atomic_context == nullptr) {
+        atomic_context = new TLSContext();
+        atomic_context->initialize();
+        std::atomic_thread_fence(std::memory_order_release);
+        context_instance.store(atomic_context, std::memory_order_relaxed);
+      }
+    }
+    return atomic_context;
+  }
+
+  virtual ~TLSContext() {
+    if (0 != ctx)
+      SSL_CTX_free(ctx);
+  }
+
+  SSL_CTX *getContext() {
+    return ctx;
+  }
+
+  short getError() {
+    return error_value;
+  }
+
+  short initialize();
+
+ private:
+
+  /**
+   * Password callback with the signature of OpenSSL's pem_password_cb: copies
+   * the pass phrase file's contents, minus the final character, into buf and
+   * returns the byte count (0 on failure); never writes more than size bytes.
+   */
+  static int pemPassWordCb(char *buf, int size, int rwflag, void *userdata) {
+    std::string passphrase;
+    if (size <= 0 || !Configure::getConfigure()->get(
+        Configure::nifi_security_client_pass_phrase, passphrase))
+      return 0;
+    memset(buf, 0x00, size);
+    std::ifstream file(passphrase.c_str(), std::ifstream::in);
+    if (!file.good())
+      return 0;
+    std::string password;
+    password.assign((std::istreambuf_iterator<char>(file)),
+                    std::istreambuf_iterator<char>());
+    // Empty file: avoid length() - 1 underflow. Long file: never overrun buf.
+    size_t length = password.empty() ? 0 : password.length() - 1;
+    if (length > static_cast<size_t>(size))
+      length = static_cast<size_t>(size);
+    memcpy(buf, password.c_str(), length);
+    return static_cast<int>(length);
+  }
+
+  TLSContext();
+
+  std::shared_ptr<logging::Logger> logger_;
+  Configure *configuration;
+  SSL_CTX *ctx;
+
+  short error_value;
+
+  static std::atomic<TLSContext*> context_instance;
+  static std::mutex context_mutex;
+};
+
+class TLSSocket : public Socket {
+ public:
+
+  /**
+   * Constructor that accepts host name, port and listeners. With this
+   * constructor a server socket is created.
+   * @param hostname our host name
+   * @param port connecting port
+   * @param listeners number of listeners in the queue
+   */
+  explicit TLSSocket(const std::string &hostname, const uint16_t port,
+                     const uint16_t listeners);
+
+  /**
+   * Constructor that creates a client socket.
+   * @param hostname hostname we are connecting to.
+   * @param port port we are connecting to.
+   */
+  explicit TLSSocket(const std::string &hostname, const uint16_t port);
+
+  /**
+   * Move constructor; note the parameter is a const rvalue reference.
+   */
+  explicit TLSSocket(const TLSSocket &&);
+
+  virtual ~TLSSocket();
+
+  /**
+   * Initializes the socket
+   * @return result of the creation operation.
+   */
+  short initialize();
+
+  /**
+   * Attempt to select the socket file descriptor
+   * @param msec timeout interval to wait
+   * @returns file descriptor
+   */
+  virtual short select_descriptor(const uint16_t msec);
+
+  /**
+   * Reads data and places it into buf
+   * @param buf buffer in which we extract data
+   * @param buflen size of buf (presumably the maximum number of bytes to read)
+   */
+  virtual int readData(uint8_t *buf, int buflen);
+
+  /**
+   * Write value to the stream using std::vector
+   * @param buf incoming buffer
+   * @param buflen length of the data to write
+   *
+   */
+  int writeData(std::vector<uint8_t> &buf, int buflen);
+
+  /**
+   * Write value to the stream using uint8_t ptr
+   * @param value incoming buffer
+   * @param size length of the data to write
+   *
+   */
+  int writeData(uint8_t *value, int size);
+
+ protected:
+
+  SSL* ssl;
+
+};
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_IO_TLSSOCKET_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/io/validation.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/validation.h b/libminifi/include/io/validation.h
index e1b4bb6..c66c412 100644
--- a/libminifi/include/io/validation.h
+++ b/libminifi/include/io/validation.h
@@ -18,35 +18,38 @@
 
 #ifndef VALIDATION_H
 #define VALIDATION_H
+#include <type_traits>
 #include <string>
 #include <cstring>
 
+
 /**
  * A checker that will, at compile time, tell us
  * if the declared type has a size method.
  */
 template<typename T>
 class size_function_functor_checker {
-    typedef char hasit;
-    typedef long doesnothaveit;
+  typedef char hasit;
+  typedef long doesnothaveit;
 
-    // look for the declared type
-    template<typename O> static hasit test(decltype(&O::size));
-    template<typename O> static doesnothaveit test(...);
+  // look for the declared type
+  template<typename O> static hasit test(decltype(&O::size));
+  template<typename O> static doesnothaveit test(...);
 
-public:
-    enum {
-        has_size_function = sizeof(test<T>(0)) == sizeof(char)
-    };
+ public:
+  enum {
+    has_size_function = sizeof(test<T>(0)) == sizeof(char)
+  };
 };
 
+
 /**
  * Determines if the variable is null or ::size() == 0
  */
 template<typename T>
 static auto IsNullOrEmpty(
     T &object) -> typename std::enable_if<size_function_functor_checker<T>::has_size_function==1, bool>::type {
-    return object.size() == 0;
+  return object.size() == 0;
 }
 
 /**
@@ -55,7 +58,7 @@ static auto IsNullOrEmpty(
 template<typename T>
 static auto IsNullOrEmpty(
     T *object) -> typename std::enable_if<size_function_functor_checker<T>::has_size_function==1, bool>::type {
-    return (0 == object || object->size() == 0);
+  return (0 == object || object->size() == 0);
 }
 
 /**
@@ -64,13 +67,13 @@ static auto IsNullOrEmpty(
 template<typename T>
 static auto IsNullOrEmpty(
     T *object) -> typename std::enable_if<not size_function_functor_checker<T>::has_size_function , bool>::type {
-    return (0 == object);
+  return (0 == object);
 }
 /**
  * Determines if the variable is null or strlen(str) == 0
  */
 static auto IsNullOrEmpty(char *str)-> decltype(NULL !=str, bool()) {
-    return (NULL == str || strlen(str) == 0);
+  return (NULL == str || strlen(str) == 0);
 }
 
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/processors/AppendHostInfo.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/AppendHostInfo.h b/libminifi/include/processors/AppendHostInfo.h
new file mode 100644
index 0000000..6515918
--- /dev/null
+++ b/libminifi/include/processors/AppendHostInfo.h
@@ -0,0 +1,80 @@
+/**
+ * @file AppendHostInfo.h
+ * AppendHostInfo class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __APPEND_HOSTINFO_H__
+#define __APPEND_HOSTINFO_H__
+
+#include "core/Property.h"
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/core.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// AppendHostInfo processor: declares its properties and the Success relationship
+class AppendHostInfo : public core::Processor {
+ public:
+  // Constructor
+  /*!
+   * Create a new processor with the given name and optional uuid
+   */
+  AppendHostInfo(std::string name, uuid_t uuid = NULL)
+      : core::Processor(name, uuid) {
+    logger_ = logging::Logger::getLogger();
+  }
+  // Destructor
+  virtual ~AppendHostInfo() {
+  }
+  // Processor Name
+  static const std::string ProcessorName;
+  // Supported Properties
+  static core::Property InterfaceName;
+  static core::Property HostAttribute;
+  static core::Property IPAttribute;
+
+  // Supported Relationships
+  static core::Relationship Success;
+
+ public:
+  // onTrigger hook, implemented by AppendHostInfo
+  virtual void onTrigger(
+      core::ProcessContext *context,
+      core::ProcessSession *session);
+  // initialize hook, overridden by AppendHostInfo
+  virtual void initialize(void);
+
+ protected:
+
+ private:
+  // Logger, obtained from logging::Logger::getLogger() in the constructor
+  std::shared_ptr<logging::Logger> logger_;
+};
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/processors/ExecuteProcess.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/ExecuteProcess.h b/libminifi/include/processors/ExecuteProcess.h
new file mode 100644
index 0000000..123eed3
--- /dev/null
+++ b/libminifi/include/processors/ExecuteProcess.h
@@ -0,0 +1,125 @@
+/**
+ * @file ExecuteProcess.h
+ * ExecuteProcess class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __EXECUTE_PROCESS_H__
+#define __EXECUTE_PROCESS_H__
+
+#include <stdio.h>
+#include <unistd.h>
+#include <string>
+#include <errno.h>
+#include <chrono>
+#include <thread>
+#include <unistd.h>
+#include <sys/wait.h>
+#include <iostream>
+#include <sys/types.h>
+#include <signal.h>
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/core.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// ExecuteProcess processor: declares its properties, state and write callback
+class ExecuteProcess : public core::Processor {
+ public:
+  // Constructor
+  /*!
+   * Create a new processor; working dir defaults to "." and no process is running
+   */
+  ExecuteProcess(std::string name, uuid_t uuid = NULL)
+      : Processor(name, uuid) {
+    logger_ = logging::Logger::getLogger();
+    _redirectErrorStream = false;
+    _batchDuration = 0;
+    _workingDir = ".";
+    _processRunning = false;
+    _pid = 0;
+  }
+  // Destructor: sends SIGTERM to _pid if a process is still marked as running
+  virtual ~ExecuteProcess() {
+    if (_processRunning && _pid > 0)
+      kill(_pid, SIGTERM);
+  }
+  // Processor Name
+  static const std::string ProcessorName;
+  // Supported Properties
+  static core::Property Command;
+  static core::Property CommandArguments;
+  static core::Property WorkingDir;
+  static core::Property BatchDuration;
+  static core::Property RedirectErrorStream;
+  // Supported Relationships
+  static core::Relationship Success;
+
+  // Nested callback class that writes a caller-supplied buffer to the stream
+  class WriteCallback : public OutputStreamCallback {
+   public:
+    WriteCallback(char *data, uint64_t size)
+        : _data(data),
+          _dataSize(size) {
+    }
+    char *_data;
+    uint64_t _dataSize;
+    void process(std::ofstream *stream) {
+      if (_data && _dataSize > 0)
+        stream->write(_data, _dataSize);
+    }
+  };
+
+ public:
+  // onTrigger hook, implemented by ExecuteProcess
+  virtual void onTrigger(
+      core::ProcessContext *context,
+      core::ProcessSession *session);
+  // initialize hook, overridden by ExecuteProcess
+  virtual void initialize(void);
+
+ protected:
+
+ private:
+  // Logger
+  std::shared_ptr<logging::Logger> logger_;
+  // Property values (names mirror the supported properties above)
+  std::string _command;
+  std::string _commandArgument;
+  std::string _workingDir;
+  int64_t _batchDuration;
+  bool _redirectErrorStream;
+  // Full command
+  std::string _fullCommand;
+  // whether the process is running; checked by the destructor before kill()
+  bool _processRunning;
+  int _pipefd[2];
+  pid_t _pid;
+};
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/processors/GenerateFlowFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/GenerateFlowFile.h b/libminifi/include/processors/GenerateFlowFile.h
new file mode 100644
index 0000000..c4ab6fe
--- /dev/null
+++ b/libminifi/include/processors/GenerateFlowFile.h
@@ -0,0 +1,98 @@
+/**
+ * @file GenerateFlowFile.h
+ * GenerateFlowFile class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __GENERATE_FLOW_FILE_H__
+#define __GENERATE_FLOW_FILE_H__
+
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/core.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+// GenerateFlowFile processor: declares its properties, data buffer and write callback
+class GenerateFlowFile : public core::Processor {
+ public:
+  // Constructor
+  /*!
+   * Create a new processor with no generated data buffer yet
+   */
+  GenerateFlowFile(std::string name, uuid_t uuid = NULL)
+      : Processor(name, uuid) {
+    _data = NULL;
+    _dataSize = 0;
+  }
+  // Destructor: releases the generated data buffer with delete[]
+  virtual ~GenerateFlowFile() {
+    if (_data)
+      delete[] _data;
+  }
+  // Processor Name
+  static const std::string ProcessorName;
+  // Supported Properties
+  static core::Property FileSize;
+  static core::Property BatchSize;
+  static core::Property DataFormat;
+  static core::Property UniqueFlowFiles;
+  static const char *DATA_FORMAT_BINARY;
+  static const char *DATA_FORMAT_TEXT;
+  // Supported Relationships
+  static core::Relationship Success;
+  // Nested callback class that writes a caller-supplied buffer to the stream
+  class WriteCallback : public OutputStreamCallback {
+   public:
+    WriteCallback(char *data, uint64_t size)
+        : _data(data),
+          _dataSize(size) {
+    }
+    char *_data;
+    uint64_t _dataSize;
+    void process(std::ofstream *stream) {
+      if (_data && _dataSize > 0)
+        stream->write(_data, _dataSize);
+    }
+  };
+
+ public:
+  // onTrigger hook, implemented by GenerateFlowFile
+  virtual void onTrigger(
+      core::ProcessContext *context,
+      core::ProcessSession *session);
+  // initialize hook, overridden by GenerateFlowFile
+  virtual void initialize(void);
+
+ protected:
+
+ private:
+  // Generated data; freed with delete[] in the destructor
+  char * _data;
+  // Size of the generated data
+  uint64_t _dataSize;
+};
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/processors/GetFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/GetFile.h b/libminifi/include/processors/GetFile.h
new file mode 100644
index 0000000..f1f0694
--- /dev/null
+++ b/libminifi/include/processors/GetFile.h
@@ -0,0 +1,129 @@
+/**
+ * @file GetFile.h
+ * GetFile class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __GET_FILE_H__
+#define __GET_FILE_H__
+
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/core.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// GetFile processor: declares its properties and the directory-listing state
+class GetFile : public core::Processor {
+ public:
+  // Constructor
+  /*!
+   * Create a new processor and set the defaults assigned below
+   */
+  GetFile(std::string name, uuid_t uuid = NULL)
+      : Processor(name, uuid) {
+    logger_ = logging::Logger::getLogger();
+    _directory = ".";
+    _recursive = true;
+    _keepSourceFile = false;
+    _minAge = 0;
+    _maxAge = 0;
+    _minSize = 0;
+    _maxSize = 0;
+    _ignoreHiddenFile = true;
+    _pollInterval = 0;
+    _batchSize = 10;
+    _lastDirectoryListingTime = getTimeMillis();
+    _fileFilter = "[^\\.].*";
+  }
+  // Destructor
+  virtual ~GetFile() {
+  }
+  // Processor Name
+  static const std::string ProcessorName;
+  // Supported Properties
+  static core::Property Directory;
+  static core::Property Recurse;
+  static core::Property KeepSourceFile;
+  static core::Property MinAge;
+  static core::Property MaxAge;
+  static core::Property MinSize;
+  static core::Property MaxSize;
+  static core::Property IgnoreHiddenFile;
+  static core::Property PollInterval;
+  static core::Property BatchSize;
+  static core::Property FileFilter;
+  // Supported Relationships
+  static core::Relationship Success;
+
+ public:
+  // onTrigger hook, implemented by GetFile
+  virtual void onTrigger(
+      core::ProcessContext *context,
+      core::ProcessSession *session);
+  // initialize hook, overridden by GetFile
+  virtual void initialize(void);
+  // Perform directory listing
+  void performListing(std::string dir);
+
+ protected:
+
+ private:
+  // Logger
+  std::shared_ptr<logging::Logger> logger_;
+  // Queue storing the directory listing
+  std::queue<std::string> _dirList;
+  // Get listing size; takes mutex_ while reading the queue
+  uint64_t getListingSize() {
+    std::lock_guard<std::mutex> lock(mutex_);
+    return _dirList.size();
+  }
+  // Whether the directory listing is empty
+  bool isListingEmpty();
+  // Put full path file name into directory listing
+  void putListing(std::string fileName);
+  // Poll directory listing for files
+  void pollListing(std::queue<std::string> &list, int maxSize);
+  // Check whether file can be added to the directory listing
+  bool acceptFile(std::string fullName, std::string name);
+  // Mutex protecting the directory listing
+  std::mutex mutex_;
+  std::string _directory;
+  bool _recursive;
+  bool _keepSourceFile;
+  int64_t _minAge;
+  int64_t _maxAge;
+  int64_t _minSize;
+  int64_t _maxSize;
+  bool _ignoreHiddenFile;
+  int64_t _pollInterval;
+  int64_t _batchSize;
+  uint64_t _lastDirectoryListingTime;
+  std::string _fileFilter;
+};
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/processors/ListenHTTP.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/ListenHTTP.h b/libminifi/include/processors/ListenHTTP.h
new file mode 100644
index 0000000..adaefb1
--- /dev/null
+++ b/libminifi/include/processors/ListenHTTP.h
@@ -0,0 +1,126 @@
+/**
+ * @file ListenHTTP.h
+ * ListenHTTP class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __LISTEN_HTTP_H__
+#define __LISTEN_HTTP_H__
+
+#include <memory>
+#include <regex>
+
+#include <CivetServer.h>
+
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/core.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// ListenHTTP processor: declares its properties, HTTP handler and write callback
+class ListenHTTP : public core::Processor {
+ public:
+
+  // Constructor
+  /*!
+   * Create a new processor with the given name and optional uuid
+   */
+  ListenHTTP(std::string name, uuid_t uuid = NULL)
+      : Processor(name, uuid) {
+    _logger = logging::Logger::getLogger();
+  }
+  // Destructor (declared here, defined elsewhere)
+  virtual ~ListenHTTP();
+  // Processor Name
+  static const std::string ProcessorName;
+  // Supported Properties
+  static core::Property BasePath;
+  static core::Property Port;
+  static core::Property AuthorizedDNPattern;
+  static core::Property SSLCertificate;
+  static core::Property SSLCertificateAuthority;
+  static core::Property SSLVerifyPeer;
+  static core::Property SSLMinimumVersion;
+  static core::Property HeadersAsAttributesRegex;
+  // Supported Relationships
+  static core::Relationship Success;
+
+  void onTrigger(core::ProcessContext *context,
+                 core::ProcessSession *session);
+  void initialize();
+  void onSchedule(
+      core::ProcessContext *context,
+      core::ProcessSessionFactory *sessionFactory);
+
+  // HTTP request handler; handlePost is the only request method declared here
+  class Handler : public CivetHandler {
+   public:
+    Handler(
+        core::ProcessContext *context,
+        core::ProcessSessionFactory *sessionFactory,
+        std::string &&authDNPattern, std::string &&headersAsAttributesPattern);
+    bool handlePost(CivetServer *server, struct mg_connection *conn);
+
+   private:
+    // Send HTTP 500 error response to client
+    void sendErrorResponse(struct mg_connection *conn);
+    // Logger
+    std::shared_ptr<logging::Logger> _logger;
+
+    std::regex _authDNRegex;
+    std::regex _headersAsAttributesRegex;
+    core::ProcessContext *_processContext;
+    core::ProcessSessionFactory *_processSessionFactory;
+  };
+
+  // Write callback for transferring data from HTTP request to content repo
+  class WriteCallback : public OutputStreamCallback {
+   public:
+    WriteCallback(struct mg_connection *conn,
+                  const struct mg_request_info *reqInfo);
+    void process(std::ofstream *stream);
+
+   private:
+    // Logger
+    std::shared_ptr<logging::Logger> _logger;
+
+    struct mg_connection *_conn;
+    const struct mg_request_info *_reqInfo;
+  };
+
+ protected:
+
+ private:
+  // Logger
+  std::shared_ptr<logging::Logger> _logger;
+
+  std::unique_ptr<CivetServer> _server;
+  std::unique_ptr<Handler> _handler;
+};
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/processors/ListenSyslog.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/ListenSyslog.h b/libminifi/include/processors/ListenSyslog.h
new file mode 100644
index 0000000..1e1e11f
--- /dev/null
+++ b/libminifi/include/processors/ListenSyslog.h
@@ -0,0 +1,216 @@
+/**
+ * @file ListenSyslog.h
+ * ListenSyslog class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __LISTEN_SYSLOG_H__
+#define __LISTEN_SYSLOG_H__
+
+#include <stdio.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <errno.h>
+#include <sys/select.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <chrono>
+#include <thread>
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/core.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// SysLogEvent: a payload pointer plus its length in bytes, as queued by ListenSyslog
+typedef struct {
+  uint8_t *payload;
+  uint64_t len;
+} SysLogEvent;
+
+// ListenSyslog processor: declares its properties, sockets and the event queue
+class ListenSyslog : public core::Processor {
+ public:
+  // Constructor
+  /*!
+   * Create a new processor (NOTE(review): _serverSocket is zeroed twice below)
+   */
+  ListenSyslog(std::string name, uuid_t uuid = NULL)
+      : Processor(name, uuid) {
+    logger_ = logging::Logger::getLogger();
+    _eventQueueByteSize = 0;
+    _serverSocket = 0;
+    _recvBufSize = 65507;
+    _maxSocketBufSize = 1024 * 1024;
+    _maxConnections = 2;
+    _maxBatchSize = 1;
+    _messageDelimiter = "\n";
+    _protocol = "UDP";
+    _port = 514;
+    _parseMessages = false;
+    _serverSocket = 0;
+    _maxFds = 0;
+    FD_ZERO(&_readfds);
+    _thread = NULL;
+    _resetServerSocket = false;
+    _serverTheadRunning = false;
+  }
+  // Destructor. NOTE(review): _thread is deleted with no visible join/detach - confirm
+  virtual ~ListenSyslog() {
+    _serverTheadRunning = false;
+    if (this->_thread)
+      delete this->_thread;
+    // close every client socket, then the server socket
+    std::vector<int>::iterator it;
+    for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it) {
+      int clientSocket = *it;
+      close(clientSocket);
+    }
+    _clientSockets.clear();
+    if (_serverSocket > 0) {
+      logger_->log_info("ListenSysLog Server socket %d close", _serverSocket);
+      close(_serverSocket);
+      _serverSocket = 0;
+    }
+  }
+  // Processor Name
+  static const std::string ProcessorName;
+  // Supported Properties
+  static core::Property RecvBufSize;
+  static core::Property MaxSocketBufSize;
+  static core::Property MaxConnections;
+  static core::Property MaxBatchSize;
+  static core::Property MessageDelimiter;
+  static core::Property ParseMessages;
+  static core::Property Protocol;
+  static core::Property Port;
+  // Supported Relationships
+  static core::Relationship Success;
+  static core::Relationship Invalid;
+  // Nested callback class that writes a caller-supplied buffer to the stream
+  class WriteCallback : public OutputStreamCallback {
+   public:
+    WriteCallback(char *data, uint64_t size)
+        : _data(data),
+          _dataSize(size) {
+    }
+    char *_data;
+    uint64_t _dataSize;
+    void process(std::ofstream *stream) {
+      if (_data && _dataSize > 0)
+        stream->write(_data, _dataSize);
+    }
+  };
+
+ public:
+  // onTrigger hook, implemented by ListenSyslog
+  virtual void onTrigger(
+      core::ProcessContext *context,
+      core::ProcessSession *session);
+  // initialize hook, overridden by ListenSyslog
+  virtual void initialize(void);
+
+ protected:
+
+ private:
+  // Logger
+  std::shared_ptr<logging::Logger> logger_;
+  // Run function for the thread
+  static void run(ListenSyslog *process);
+  // Run Thread
+  void runThread();
+  // Queue storing syslog events
+  std::queue<SysLogEvent> _eventQueue;
+  // Sum of the len fields of all queued events
+  uint64_t _eventQueueByteSize;
+  // Get event queue size (number of events), under mutex_
+  uint64_t getEventQueueSize() {
+    std::lock_guard<std::mutex> lock(mutex_);
+    return _eventQueue.size();
+  }
+  // Get event queue byte size, under mutex_
+  uint64_t getEventQueueByteSize() {
+    std::lock_guard<std::mutex> lock(mutex_);
+    return _eventQueueByteSize;
+  }
+  // Whether the event queue is empty
+  bool isEventQueueEmpty() {
+    std::lock_guard<std::mutex> lock(mutex_);
+    return _eventQueue.empty();
+  }
+  // Put event into the event queue and add len to the queued byte count
+  void putEvent(uint8_t *payload, uint64_t len) {
+    std::lock_guard<std::mutex> lock(mutex_);
+    SysLogEvent event;
+    event.payload = payload;
+    event.len = len;
+    _eventQueue.push(event);
+    _eventQueueByteSize += len;
+  }
+  // Read \n terminated line from TCP socket
+  int readline(int fd, char *bufptr, size_t len);
+  // start server socket and handling client socket
+  void startSocketThread();
+  // Move queued events into list until list holds maxSize entries (0 = no limit)
+  void pollEvent(std::queue<SysLogEvent> &list, int maxSize) {
+    std::lock_guard<std::mutex> lock(mutex_);
+
+    while (!_eventQueue.empty() && (maxSize == 0 || list.size() < maxSize)) {
+      SysLogEvent event = _eventQueue.front();
+      _eventQueue.pop();
+      _eventQueueByteSize -= event.len;
+      list.push(event);
+    }
+    return;
+  }
+  // Mutex protecting the event queue and its byte count
+  std::mutex mutex_;
+  int64_t _recvBufSize;
+  int64_t _maxSocketBufSize;
+  int64_t _maxConnections;
+  int64_t _maxBatchSize;
+  std::string _messageDelimiter;
+  std::string _protocol;
+  int64_t _port;
+  bool _parseMessages;
+  int _serverSocket;
+  std::vector<int> _clientSockets;
+  int _maxFds;
+  fd_set _readfds;
+  // thread object; deleted in the destructor
+  std::thread *_thread;
+  // whether to reset the server socket
+  bool _resetServerSocket;
+  bool _serverTheadRunning;
+  // buffer for read socket
+  uint8_t _buffer[2048];
+};
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/processors/LogAttribute.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/LogAttribute.h b/libminifi/include/processors/LogAttribute.h
new file mode 100644
index 0000000..37c0ec3
--- /dev/null
+++ b/libminifi/include/processors/LogAttribute.h
@@ -0,0 +1,130 @@
+/**
+ * @file LogAttribute.h
+ * LogAttribute class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __LOG_ATTRIBUTE_H__
+#define __LOG_ATTRIBUTE_H__
+
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/core.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// LogAttribute Class
+class LogAttribute : public core::Processor {
+ public:
+  // Constructor
+  /*!
+   * Create a new processor and fetch the shared logger
+   */
+  LogAttribute(std::string name, uuid_t uuid = NULL)
+      : Processor(name, uuid) {
+    logger_ = logging::Logger::getLogger();
+  }
+  // Destructor
+  virtual ~LogAttribute() {
+  }
+  // Processor Name
+  static const std::string ProcessorName;
+  // Supported Properties
+  static core::Property LogLevel;
+  static core::Property AttributesToLog;
+  static core::Property AttributesToIgnore;
+  static core::Property LogPayload;
+  static core::Property LogPrefix;
+  // Supported Relationships
+  static core::Relationship Success;
+  enum LogAttrLevel {
+    LogAttrLevelTrace,
+    LogAttrLevelDebug,
+    LogAttrLevelInfo,
+    LogAttrLevelWarn,
+    LogAttrLevelError
+  };
+  // Map a lower-case level name to its enum; returns false and leaves level untouched if unknown
+  bool logLevelStringToEnum(std::string logStr, LogAttrLevel &level) {
+    if (logStr == "trace") {
+      level = LogAttrLevelTrace;
+      return true;
+    } else if (logStr == "debug") {
+      level = LogAttrLevelDebug;
+      return true;
+    } else if (logStr == "info") {
+      level = LogAttrLevelInfo;
+      return true;
+    } else if (logStr == "warn") {
+      level = LogAttrLevelWarn;
+      return true;
+    } else if (logStr == "error") {
+      level = LogAttrLevelError;
+      return true;
+    } else
+      return false;
+  }
+  // Nested callback class that reads the stream into a heap buffer it owns
+  class ReadCallback : public InputStreamCallback {
+   public:
+    // Allocates a size-byte buffer; _readSize stays 0 until process() has run
+    ReadCallback(uint64_t size)
+        : _buffer(new char[size]), _bufferSize(size), _readSize(0) {
+    }
+    ~ReadCallback() {
+      if (_buffer)
+        delete[] _buffer;
+    }
+    void process(std::ifstream *stream) {
+      // Check the stream state (*stream), not the pointer: a short read sets failbit/eofbit
+      stream->read(_buffer, _bufferSize);
+      if (!*stream)
+        _readSize = stream->gcount();  // only this many bytes of _buffer are valid
+      else
+        _readSize = _bufferSize;
+    }
+    char *_buffer;
+    uint64_t _bufferSize;
+    uint64_t _readSize;
+  };
+
+ public:
+  // OnTrigger method, implemented by NiFi LogAttribute
+  virtual void onTrigger(
+      core::ProcessContext *context,
+      core::ProcessSession *session);
+  // Initialize, overridden by NiFi LogAttribute
+  virtual void initialize(void);
+
+ protected:
+
+ private:
+  // Logger
+  std::shared_ptr<logging::Logger> logger_;
+};
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/processors/PutFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/PutFile.h b/libminifi/include/processors/PutFile.h
new file mode 100644
index 0000000..7653fac
--- /dev/null
+++ b/libminifi/include/processors/PutFile.h
@@ -0,0 +1,101 @@
+/**
+ * @file PutFile.h
+ * PutFile class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PUT_FILE_H__
+#define __PUT_FILE_H__
+
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/core.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// PutFile Class
+class PutFile : public core::Processor {
+ public:
+
+  static const std::string CONFLICT_RESOLUTION_STRATEGY_REPLACE;  // strategy names; values are defined out of line
+  static const std::string CONFLICT_RESOLUTION_STRATEGY_IGNORE;
+  static const std::string CONFLICT_RESOLUTION_STRATEGY_FAIL;
+
+  // Constructor
+  /*!
+   * Create a new processor and fetch the shared logger
+   */
+  PutFile(std::string name, uuid_t uuid = NULL)
+      : core::Processor(name, uuid) {
+    logger_ = logging::Logger::getLogger();
+  }
+  // Destructor
+  virtual ~PutFile() {
+  }
+  // Processor Name
+  static const std::string ProcessorName;
+  // Supported Properties
+  static core::Property Directory;
+  static core::Property ConflictResolution;
+  // Supported Relationships
+  static core::Relationship Success;
+  static core::Relationship Failure;
+
+  // OnTrigger method, implemented by NiFi PutFile
+  virtual void onTrigger(
+      core::ProcessContext *context,
+      core::ProcessSession *session);
+  // Initialize, overridden by NiFi PutFile
+  virtual void initialize(void);
+
+  class ReadCallback : public InputStreamCallback {  // NOTE(review): presumably streams content into tmpFile - confirm in PutFile.cpp
+   public:
+    ReadCallback(const std::string &tmpFile, const std::string &destFile);
+    ~ReadCallback();
+    virtual void process(std::ifstream *stream);
+    bool commit();  // NOTE(review): presumably publishes tmpFile as destFile - confirm in PutFile.cpp
+
+   private:
+    std::shared_ptr<logging::Logger> logger_;
+    std::ofstream _tmpFileOs;
+    bool _writeSucceeded = false;  // starts false; set elsewhere (not visible in this header)
+    std::string _tmpFile;
+    std::string _destFile;
+  };
+
+ protected:
+
+ private:
+  // Logger
+  std::shared_ptr<logging::Logger> logger_;
+
+  bool putFile(core::ProcessSession *session,
+               std::shared_ptr<FlowFileRecord> flowFile,
+               const std::string &tmpFile, const std::string &destFile);
+};
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/processors/RealTimeDataCollector.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/RealTimeDataCollector.h b/libminifi/include/processors/RealTimeDataCollector.h
new file mode 100644
index 0000000..41bd814
--- /dev/null
+++ b/libminifi/include/processors/RealTimeDataCollector.h
@@ -0,0 +1,145 @@
+/**
+ * @file RealTimeDataCollector.h
+ * RealTimeDataCollector class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __REAL_TIME_DATA_COLLECTOR_H__
+#define __REAL_TIME_DATA_COLLECTOR_H__
+
+#include <stdio.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <fcntl.h>
+#include <netdb.h>
+#include <string>
+#include <errno.h>
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/core.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// RealTimeDataCollector Class
+class RealTimeDataCollector : public core::Processor {
+ public:
+  // Constructor
+  /*!
+   * Create a new processor; sockets, counters and queued size start at 0
+   */
+  explicit RealTimeDataCollector(std::string name, uuid_t uuid = NULL)
+      : core::Processor(name, uuid) {
+    _realTimeSocket = 0;  // 0 is treated as "no socket" by the destructor
+    _batchSocket = 0;
+    logger_ = logging::Logger::getLogger();
+    _firstInvoking = false;
+    _realTimeAccumulated = 0;
+    _batchAcccumulated = 0;
+    _queuedDataSize = 0;
+  }
+  // Destructor: closes each non-zero socket and the file stream if it is open
+  virtual ~RealTimeDataCollector() {
+    if (_realTimeSocket)  // NOTE(review): 0 is itself a valid descriptor value; -1 would be a safer sentinel
+      close(_realTimeSocket);
+    if (_batchSocket)
+      close(_batchSocket);
+    if (_fileStream.is_open())
+      _fileStream.close();
+  }
+  // Processor Name
+  static const std::string ProcessorName;
+  // Supported Properties
+  static core::Property REALTIMESERVERNAME;
+  static core::Property REALTIMESERVERPORT;
+  static core::Property BATCHSERVERNAME;
+  static core::Property BATCHSERVERPORT;
+  static core::Property FILENAME;
+  static core::Property ITERATION;
+  static core::Property REALTIMEMSGID;
+  static core::Property BATCHMSGID;
+  static core::Property REALTIMEINTERVAL;
+  static core::Property BATCHINTERVAL;
+  static core::Property BATCHMAXBUFFERSIZE;
+  // Supported Relationships
+  static core::Relationship Success;
+  // Connect to the given host and port; NOTE(review): meaning of the int result is defined in the .cpp
+  int connectServer(const char *host, uint16_t port);
+  int sendData(int socket, const char *buf, int buflen);
+  void onTriggerRealTime(
+      core::ProcessContext *context,
+      core::ProcessSession *session);
+  void onTriggerBatch(core::ProcessContext *context,
+                      core::ProcessSession *session);
+
+ public:
+  // OnTrigger method, implemented by NiFi RealTimeDataCollector
+  virtual void onTrigger(
+      core::ProcessContext *context,
+      core::ProcessSession *session);
+  // Initialize, overridden by NiFi RealTimeDataCollector
+  virtual void initialize(void);
+
+ protected:
+
+ private:
+  // realtime server Name
+  std::string _realTimeServerName;
+  int64_t _realTimeServerPort;
+  std::string _batchServerName;
+  int64_t _batchServerPort;
+  int64_t _realTimeInterval;
+  int64_t _batchInterval;
+  int64_t _batchMaxBufferSize;
+  // Match pattern for Real time Message ID
+  std::vector<std::string> _realTimeMsgID;
+  // Match pattern for Batch Message ID
+  std::vector<std::string> _batchMsgID;
+  // file which the realTime collector will tail
+  std::string _fileName;
+  // Whether we need to iterate from the beginning for demo
+  bool _iteration;
+  int _realTimeSocket;
+  int _batchSocket;
+  // Logger
+  std::shared_ptr<logging::Logger> logger_;
+  // Mutex for protection
+  std::mutex mutex_;
+  // Queued data size
+  uint64_t _queuedDataSize;
+  // Queue for the batch process
+  std::queue<std::string> _queue;
+  std::thread::id _realTimeThreadId;
+  std::thread::id _batchThreadId;
+  std::atomic<bool> _firstInvoking;
+  int64_t _realTimeAccumulated;
+  int64_t _batchAcccumulated;
+  std::ifstream _fileStream;
+};
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/processors/TailFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/TailFile.h b/libminifi/include/processors/TailFile.h
new file mode 100644
index 0000000..5be76e4
--- /dev/null
+++ b/libminifi/include/processors/TailFile.h
@@ -0,0 +1,105 @@
+/**
+ * @file TailFile.h
+ * TailFile class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __TAIL_FILE_H__
+#define __TAIL_FILE_H__
+
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/core.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// TailFile Class
+class TailFile : public core::Processor {
+ public:
+  // Constructor
+  /*!
+   * Create a new processor; state starts as "not recovered"
+   */
+  TailFile(std::string name, uuid_t uuid = NULL)
+      : core::Processor(name, uuid) {
+    logger_ = logging::Logger::getLogger();
+    _stateRecovered = false;
+  }
+  // Destructor: stores the current state before the object goes away
+  virtual ~TailFile() {
+    storeState();
+  }
+  // Processor Name
+  static const std::string ProcessorName;
+  // Supported Properties
+  static core::Property FileName;
+  static core::Property StateFile;
+  // Supported Relationships
+  static core::Relationship Success;
+
+ public:
+  // OnTrigger method, implemented by NiFi TailFile
+  virtual void onTrigger(
+      core::ProcessContext *context,
+      core::ProcessSession *session);
+  // Initialize, overridden by NiFi TailFile
+  virtual void initialize(void);
+  // recoverState
+  void recoverState();
+  // storeState (also invoked from the destructor)
+  void storeState();
+
+ protected:
+
+ private:
+  // Logger
+  std::shared_ptr<logging::Logger> logger_;
+  std::string _fileLocation;
+  // Property Specified Tailed File Name
+  std::string _fileName;
+  // File to save state
+  std::string _stateFile;
+  // State related to the tailed file
+  std::string _currentTailFileName;
+  uint64_t _currentTailFilePosition = 0;  // zero-initialised so storeState() never reads garbage
+  bool _stateRecovered;
+  uint64_t _currentTailFileCreatedTime = 0;  // zero-initialised for the same reason
+  // Utility functions for parsing the state file
+  std::string trimLeft(const std::string& s);
+  std::string trimRight(const std::string& s);
+  void parseStateFileLine(char *buf);
+  void checkRollOver();
+
+};
+
+// Matched File Item for Roll over check
+struct TailMatchedFileItem {
+  std::string fileName;   // name of the matched file
+  uint64_t modifiedTime;  // modification time; units are not shown in this header
+};
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/properties/Configure.h
----------------------------------------------------------------------
diff --git a/libminifi/include/properties/Configure.h b/libminifi/include/properties/Configure.h
new file mode 100644
index 0000000..c0d9bd4
--- /dev/null
+++ b/libminifi/include/properties/Configure.h
@@ -0,0 +1,131 @@
+/**
+ * @file Configure.h
+ * Configure class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __CONFIGURE_H__
+#define __CONFIGURE_H__
+
+#include <stdio.h>
+#include <string>
+#include <map>
+#include <stdlib.h>
+#include <errno.h>
+#include <iostream>
+#include <fstream>
+#include "core/core.h"
+#include "core/logging/Logger.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+class Configure {
+ public:
+  // Get the singleton Configure instance, creating it on first use
+  static Configure * getConfigure() {
+    if (!configure_) {  // NOTE(review): no lock is taken here; concurrent first calls could both allocate
+      configure_ = new Configure();
+    }
+    return configure_;
+  }
+  // Configuration key constants, e.g. nifi.flow.configuration.file; values are defined out of line
+  static const char *nifi_flow_configuration_file;
+  static const char *nifi_administrative_yield_duration;
+  static const char *nifi_bored_yield_duration;
+  static const char *nifi_graceful_shutdown_seconds;
+  static const char *nifi_log_level;
+  static const char *nifi_server_name;
+  static const char *nifi_configuration_class_name;
+  static const char *nifi_flow_repository_class_name;
+  static const char *nifi_provenance_repository_class_name;
+  static const char *nifi_server_port;
+  static const char *nifi_server_report_interval;
+  static const char *nifi_provenance_repository_max_storage_time;
+  static const char *nifi_provenance_repository_max_storage_size;
+  static const char *nifi_provenance_repository_directory_default;
+  static const char *nifi_provenance_repository_enable;
+  static const char *nifi_flowfile_repository_max_storage_time;
+  static const char *nifi_flowfile_repository_max_storage_size;
+  static const char *nifi_flowfile_repository_directory_default;
+  static const char *nifi_flowfile_repository_enable;
+  static const char *nifi_remote_input_secure;
+  static const char *nifi_security_need_ClientAuth;
+  static const char *nifi_security_client_certificate;
+  static const char *nifi_security_client_private_key;
+  static const char *nifi_security_client_pass_phrase;
+  static const char *nifi_security_client_ca_certificate;
+
+  // Clear the loaded config (takes mutex_)
+  void clear() {
+    std::lock_guard<std::mutex> lock(mutex_);
+    properties_.clear();
+  }
+  // Set the config value, overwriting any existing entry for key (takes mutex_)
+  void set(std::string key, std::string value) {
+    std::lock_guard<std::mutex> lock(mutex_);
+    properties_[key] = value;
+  }
+  // Check whether a config value exists for key (takes mutex_)
+  bool has(std::string key) {
+    std::lock_guard<std::mutex> lock(mutex_);
+    return (properties_.find(key) != properties_.end());
+  }
+  // Get the config value
+  bool get(std::string key, std::string &value);
+  // Parse one line in configure file like key=value
+  void parseConfigureFileLine(char *buf);
+  // Load Configure File
+  void loadConfigureFile(const char *fileName);
+  // Set the determined MINIFI_HOME; NOTE(review): unlike set()/has()/clear(), mutex_ is not taken
+  void setHome(std::string minifiHome) {
+    minifi_home_ = minifiHome;
+  }
+
+  // Get the determined MINIFI_HOME (returned by value; mutex_ is not taken)
+  std::string getHome() {
+    return minifi_home_;
+  }
+  // Parse Command Line
+  void parseCommandLine(int argc, char **argv);
+
+ private:
+  // Mutex for protection
+  std::mutex mutex_;
+  // Logger
+  std::shared_ptr<logging::Logger> logger_;
+  // Home location for this executable
+  std::string minifi_home_;
+
+  Configure() {  // private: instances are only created through getConfigure()
+    logger_ = logging::Logger::getLogger();
+  }
+  virtual ~Configure() {
+
+  }
+  static Configure *configure_;  // the singleton pointer; defined out of line
+
+ protected:
+  std::map<std::string, std::string> properties_;
+};
+
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif