You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/03/28 17:19:16 UTC
[11/16] nifi-minifi-cpp git commit: MINIFI-217: Updates namespaces
and removes use of raw pointers for user facing API.
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/io/DataStream.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/DataStream.h b/libminifi/include/io/DataStream.h
index 2f793ff..e37dbf7 100644
--- a/libminifi/include/io/DataStream.h
+++ b/libminifi/include/io/DataStream.h
@@ -22,99 +22,111 @@
#include <cstdint>
#include <vector>
#include "EndianCheck.h"
-
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
/**
* DataStream defines the mechanism through which
* binary data will be written to a sink
*/
class DataStream {
-public:
-
- DataStream() :
- readBuffer(0) {
-
- }
-
- /**
- * Constructor
- **/
- explicit DataStream(const uint8_t *buf, const uint32_t buflen) :
- DataStream() {
- writeData((uint8_t*) buf, buflen);
-
- }
-
- virtual short initialize() {
- buffer.clear();
- readBuffer = 0;
- return 0;
- }
-
- virtual void closeStream()
- {
-
- }
- /**
- * Reads data and places it into buf
- * @param buf buffer in which we extract data
- * @param buflen
- */
- virtual int readData(std::vector<uint8_t> &buf, int buflen);
- /**
- * Reads data and places it into buf
- * @param buf buffer in which we extract data
- * @param buflen
- */
- virtual int readData(uint8_t *buf, int buflen);
-
- /**
- * writes valiue to buffer
- * @param value value to write
- * @param size size of value
- */
- virtual int writeData(uint8_t *value, int size);
-
- /**
- * Reads a system word
- * @param value value to write
- */
- virtual int read(uint64_t &value, bool is_little_endian =
- EndiannessCheck::IS_LITTLE);
-
- /**
- * Reads a uint32_t
- * @param value value to write
- */
- virtual int read(uint32_t &value, bool is_little_endian =
- EndiannessCheck::IS_LITTLE);
-
- /**
- * Reads a system short
- * @param value value to write
- */
- virtual int read(uint16_t &value, bool is_little_endian =
- EndiannessCheck::IS_LITTLE);
-
- /**
- * Returns the underlying buffer
- * @return vector's array
- **/
- const uint8_t *getBuffer() const {
- return &buffer[0];
- }
-
- /**
- * Retrieve size of data stream
- * @return size of data stream
- **/
- const uint32_t getSize() const {
- return buffer.size();
- }
-
-protected:
- // All serialization related method and internal buf
- std::vector<uint8_t> buffer;
- uint32_t readBuffer;
+ public:
+
+ DataStream()
+ : readBuffer(0) {
+
+ }
+
+ ~DataStream() {
+
+ }
+
+ /**
+ * Constructor
+ **/
+ explicit DataStream(const uint8_t *buf, const uint32_t buflen)
+ : DataStream() {
+ writeData((uint8_t*) buf, buflen);
+
+ }
+
+ virtual short initialize() {
+ buffer.clear();
+ readBuffer = 0;
+ return 0;
+ }
+
+ virtual void closeStream() {
+
+ }
+ /**
+ * Reads data and places it into buf
+ * @param buf buffer in which we extract data
+ * @param buflen
+ */
+ virtual int readData(std::vector<uint8_t> &buf, int buflen);
+ /**
+ * Reads data and places it into buf
+ * @param buf buffer in which we extract data
+ * @param buflen
+ */
+ virtual int readData(uint8_t *buf, int buflen);
+
+ /**
+ * writes value to buffer
+ * @param value value to write
+ * @param size size of value
+ */
+ virtual int writeData(uint8_t *value, int size);
+
+ /**
+ * Reads a system word
+ * @param value value to write
+ */
+ virtual int read(uint64_t &value, bool is_little_endian =
+ EndiannessCheck::IS_LITTLE);
+
+ /**
+ * Reads a uint32_t
+ * @param value value to write
+ */
+ virtual int read(uint32_t &value, bool is_little_endian =
+ EndiannessCheck::IS_LITTLE);
+
+ /**
+ * Reads a system short
+ * @param value value to write
+ */
+ virtual int read(uint16_t &value, bool is_little_endian =
+ EndiannessCheck::IS_LITTLE);
+
+ /**
+ * Returns the underlying buffer
+ * @return vector's array
+ **/
+ const uint8_t *getBuffer() const {
+ return &buffer[0];
+ }
+
+ /**
+ * Retrieve size of data stream
+ * @return size of data stream
+ **/
+ const uint32_t getSize() const {
+ return buffer.size();
+ }
+
+ protected:
+ // All serialization related method and internal buf
+ std::vector<uint8_t> buffer;
+ uint32_t readBuffer;
};
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
#endif /* LIBMINIFI_INCLUDE_IO_DATASTREAM_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/io/EndianCheck.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/EndianCheck.h b/libminifi/include/io/EndianCheck.h
index ef900e0..3ceb19c 100644
--- a/libminifi/include/io/EndianCheck.h
+++ b/libminifi/include/io/EndianCheck.h
@@ -1,5 +1,5 @@
/**
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -16,32 +16,34 @@
* limitations under the License.
*/
-
#ifndef LIBMINIFI_INCLUDE_IO_ENDIANCHECK_H_
#define LIBMINIFI_INCLUDE_IO_ENDIANCHECK_H_
-
-
-
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
/**
* Mechanism to determine endianness of host.
* Accounts for only BIG/LITTLE/BIENDIAN
**/
-class EndiannessCheck
-{
-public:
- static bool IS_LITTLE;
-private:
-
- static bool is_little_endian() {
- /* do whatever is needed at static init time */
- unsigned int x = 1;
- char *c = (char*) &x;
- IS_LITTLE=*c==1;
- return IS_LITTLE;
- }
+class EndiannessCheck {
+ public:
+ static bool IS_LITTLE;
+ private:
+
+ static bool is_little_endian() {
+ /* do whatever is needed at static init time */
+ unsigned int x = 1;
+ char *c = (char*) &x;
+ IS_LITTLE = *c == 1;
+ return IS_LITTLE;
+ }
};
-
-
-
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
#endif /* LIBMINIFI_INCLUDE_IO_ENDIANCHECK_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/io/Serializable.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/Serializable.h b/libminifi/include/io/Serializable.h
index 59d3a73..5ee886b 100644
--- a/libminifi/include/io/Serializable.h
+++ b/libminifi/include/io/Serializable.h
@@ -1,5 +1,5 @@
/**
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -22,9 +22,11 @@
#include <string>
#include "EndianCheck.h"
#include "DataStream.h"
-
-
-
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
/**
* Serializable instances provide base functionality to
* write certain objects/primitives to a data stream.
@@ -32,155 +34,158 @@
*/
class Serializable {
-public:
-
- /**
- * Inline function to write T to stream
- **/
- template<typename T>
- inline int writeData(const T &t,DataStream *stream);
-
- /**
- * Inline function to write T to to_vec
- **/
- template<typename T>
- inline int writeData(const T &t, uint8_t *to_vec);
-
- /**
- * Inline function to write T to to_vec
- **/
- template<typename T>
- inline int writeData(const T &t, std::vector<uint8_t> &to_vec);
-
-
- /**
- * write byte to stream
- * @return resulting write size
- **/
- int write(uint8_t value,DataStream *stream);
-
- /**
- * write byte to stream
- * @return resulting write size
- **/
- int write(char value,DataStream *stream);
-
- /**
- * write 4 bytes to stream
- * @param base_value non encoded value
- * @param stream output stream
- * @param is_little_endian endianness determination
- * @return resulting write size
- **/
- int write(uint32_t base_value,DataStream *stream, bool is_little_endian =
- EndiannessCheck::IS_LITTLE);
-
- /**
- * write 2 bytes to stream
- * @param base_value non encoded value
- * @param stream output stream
- * @param is_little_endian endianness determination
- * @return resulting write size
- **/
- int write(uint16_t base_value,DataStream *stream, bool is_little_endian =
- EndiannessCheck::IS_LITTLE);
-
- /**
- * write valueto stream
- * @param value non encoded value
- * @param len length of value
- * @param strema output stream
- * @return resulting write size
- **/
- int write(uint8_t *value, int len,DataStream *stream);
-
- /**
- * write 8 bytes to stream
- * @param base_value non encoded value
- * @param stream output stream
- * @param is_little_endian endianness determination
- * @return resulting write size
- **/
- int write(uint64_t base_value,DataStream *stream, bool is_little_endian =
- EndiannessCheck::IS_LITTLE);
-
- /**
- * write bool to stream
- * @param value non encoded value
- * @return resulting write size
- **/
- int write(bool value);
-
- /**
- * write UTF string to stream
- * @param str string to write
- * @return resulting write size
- **/
- int writeUTF(std::string str,DataStream *stream, bool widen = false);
-
- /**
- * reads a byte from the stream
- * @param value reference in which will set the result
- * @param stream stream from which we will read
- * @return resulting read size
- **/
- int read(uint8_t &value,DataStream *stream);
-
- /**
- * reads two bytes from the stream
- * @param value reference in which will set the result
- * @param stream stream from which we will read
- * @return resulting read size
- **/
- int read(uint16_t &base_value,DataStream *stream, bool is_little_endian =
- EndiannessCheck::IS_LITTLE);
-
- /**
- * reads a byte from the stream
- * @param value reference in which will set the result
- * @param stream stream from which we will read
- * @return resulting read size
- **/
- int read(char &value,DataStream *stream);
-
- /**
- * reads a byte array from the stream
- * @param value reference in which will set the result
- * @param len length to read
- * @param stream stream from which we will read
- * @return resulting read size
- **/
- int read(uint8_t *value, int len,DataStream *stream);
-
- /**
- * reads four bytes from the stream
- * @param value reference in which will set the result
- * @param stream stream from which we will read
- * @return resulting read size
- **/
- int read(uint32_t &value,DataStream *stream,
- bool is_little_endian = EndiannessCheck::IS_LITTLE);
-
- /**
- * reads eight byte from the stream
- * @param value reference in which will set the result
- * @param stream stream from which we will read
- * @return resulting read size
- **/
- int read(uint64_t &value,DataStream *stream,
- bool is_little_endian = EndiannessCheck::IS_LITTLE);
-
- /**
- * read UTF from stream
- * @param str reference string
- * @param stream stream from which we will read
- * @return resulting read size
- **/
- int readUTF(std::string &str,DataStream *stream, bool widen = false);
-
-protected:
+ public:
+
+ /**
+ * Inline function to write T to stream
+ **/
+ template<typename T>
+ inline int writeData(const T &t, DataStream *stream);
+
+ /**
+ * Inline function to write T to to_vec
+ **/
+ template<typename T>
+ inline int writeData(const T &t, uint8_t *to_vec);
+
+ /**
+ * Inline function to write T to to_vec
+ **/
+ template<typename T>
+ inline int writeData(const T &t, std::vector<uint8_t> &to_vec);
+
+ /**
+ * write byte to stream
+ * @return resulting write size
+ **/
+ int write(uint8_t value, DataStream *stream);
+
+ /**
+ * write byte to stream
+ * @return resulting write size
+ **/
+ int write(char value, DataStream *stream);
+
+ /**
+ * write 4 bytes to stream
+ * @param base_value non encoded value
+ * @param stream output stream
+ * @param is_little_endian endianness determination
+ * @return resulting write size
+ **/
+ int write(uint32_t base_value, DataStream *stream, bool is_little_endian =
+ EndiannessCheck::IS_LITTLE);
+
+ /**
+ * write 2 bytes to stream
+ * @param base_value non encoded value
+ * @param stream output stream
+ * @param is_little_endian endianness determination
+ * @return resulting write size
+ **/
+ int write(uint16_t base_value, DataStream *stream, bool is_little_endian =
+ EndiannessCheck::IS_LITTLE);
+
+ /**
+ * write value to stream
+ * @param value non encoded value
+ * @param len length of value
+ * @param stream output stream
+ * @return resulting write size
+ **/
+ int write(uint8_t *value, int len, DataStream *stream);
+
+ /**
+ * write 8 bytes to stream
+ * @param base_value non encoded value
+ * @param stream output stream
+ * @param is_little_endian endianness determination
+ * @return resulting write size
+ **/
+ int write(uint64_t base_value, DataStream *stream, bool is_little_endian =
+ EndiannessCheck::IS_LITTLE);
+
+ /**
+ * write bool to stream
+ * @param value non encoded value
+ * @return resulting write size
+ **/
+ int write(bool value);
+
+ /**
+ * write UTF string to stream
+ * @param str string to write
+ * @return resulting write size
+ **/
+ int writeUTF(std::string str, DataStream *stream, bool widen = false);
+
+ /**
+ * reads a byte from the stream
+ * @param value reference in which the result will be set
+ * @param stream stream from which we will read
+ * @return resulting read size
+ **/
+ int read(uint8_t &value, DataStream *stream);
+
+ /**
+ * reads two bytes from the stream
+ * @param base_value reference in which the result will be set
+ * @param stream stream from which we will read
+ * @return resulting read size
+ **/
+ int read(uint16_t &base_value, DataStream *stream, bool is_little_endian =
+ EndiannessCheck::IS_LITTLE);
+
+ /**
+ * reads a byte from the stream
+ * @param value reference in which the result will be set
+ * @param stream stream from which we will read
+ * @return resulting read size
+ **/
+ int read(char &value, DataStream *stream);
+
+ /**
+ * reads a byte array from the stream
+ * @param value buffer into which the bytes will be read
+ * @param len length to read
+ * @param stream stream from which we will read
+ * @return resulting read size
+ **/
+ int read(uint8_t *value, int len, DataStream *stream);
+
+ /**
+ * reads four bytes from the stream
+ * @param value reference in which the result will be set
+ * @param stream stream from which we will read
+ * @return resulting read size
+ **/
+ int read(uint32_t &value, DataStream *stream, bool is_little_endian =
+ EndiannessCheck::IS_LITTLE);
+
+ /**
+ * reads eight bytes from the stream
+ * @param value reference in which the result will be set
+ * @param stream stream from which we will read
+ * @return resulting read size
+ **/
+ int read(uint64_t &value, DataStream *stream, bool is_little_endian =
+ EndiannessCheck::IS_LITTLE);
+
+ /**
+ * read UTF from stream
+ * @param str reference string
+ * @param stream stream from which we will read
+ * @return resulting read size
+ **/
+ int readUTF(std::string &str, DataStream *stream, bool widen = false);
+
+ protected:
};
-
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
#endif /* LIBMINIFI_INCLUDE_IO_SERIALIZABLE_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/io/SocketFactory.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/SocketFactory.h b/libminifi/include/io/SocketFactory.h
deleted file mode 100644
index c8cbcb1..0000000
--- a/libminifi/include/io/SocketFactory.h
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef SOCKET_FACTORY_H
-#define SOCKET_FACTORY_H
-
-#include "ClientSocket.h"
-#include "TLSSocket.h"
-#include "ClientSocket.h"
-#include "Configure.h"
-#include "utils/StringUtils.h"
-
-/**
- Purpose: Due to the current design this is the only mechanism by which we can
- inject different socket types
-
-**/
-class SocketFactory{
-public:
-
- /**
- * Build an instance, creating a memory fence, which
- * allows us to avoid locking. This is tantamount to double checked locking.
- * @returns new SocketFactory;
- */
- static SocketFactory *getInstance() {
- SocketFactory* atomic_context = context_instance_.load(
- std::memory_order_relaxed);
- std::atomic_thread_fence(std::memory_order_acquire);
- if (atomic_context == nullptr) {
- std::lock_guard<std::mutex> lock(context_mutex_);
- atomic_context = context_instance_.load(std::memory_order_relaxed);
- if (atomic_context == nullptr) {
- atomic_context = new SocketFactory();
- std::atomic_thread_fence(std::memory_order_release);
- context_instance_.store(atomic_context,
- std::memory_order_relaxed);
- }
- }
- return atomic_context;
- }
-
- /**
- * Creates a socket and returns a unique ptr
- *
- */
- std::unique_ptr<Socket> createSocket(const std::string &host, const uint16_t port) {
- Socket *socket = 0;
- if (is_secure_) {
-#ifdef OPENSSL_SUPPORT
- socket = new TLSSocket(host, port);
-#else
- socket = 0;
-#endif
- } else {
- socket = new Socket(host, port);
- }
- return std::unique_ptr<Socket>(socket);
- }
-protected:
- SocketFactory() :
- configure_(Configure::getConfigure()) {
- std::string secureStr;
- is_secure_ = false;
- if (configure_->get(Configure::nifi_remote_input_secure, secureStr)) {
- StringUtils::StringToBool(secureStr, is_secure_);
- }
- }
-
- bool is_secure_;
- static std::atomic<SocketFactory*> context_instance_;
- static std::mutex context_mutex_;
-
- Configure *configure_;
-};
-
-#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/io/Sockets.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/Sockets.h b/libminifi/include/io/Sockets.h
new file mode 100644
index 0000000..2c0b163
--- /dev/null
+++ b/libminifi/include/io/Sockets.h
@@ -0,0 +1,27 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_IO_SOCKET_H_
+#define LIBMINIFI_INCLUDE_IO_SOCKET_H_
+
+#include "ClientSocket.h"
+
+#ifdef OPENSSL_SUPPORT
+#include "tls/TLSSocket.h"
+#endif
+
+#endif /* LIBMINIFI_INCLUDE_IO_SOCKET_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/io/StreamFactory.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/StreamFactory.h b/libminifi/include/io/StreamFactory.h
new file mode 100644
index 0000000..faa10b5
--- /dev/null
+++ b/libminifi/include/io/StreamFactory.h
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef SOCKET_FACTORY_H
+#define SOCKET_FACTORY_H
+
+#include "properties/Configure.h"
+#include "Sockets.h"
+#include "utils/StringUtils.h"
+#include "validation.h"
+
+#ifdef OPENSSL_SUPPORT
+#include "tls/TLSSocket.h"
+#endif
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+/**
+ * Purpose: Socket Creator is a class that will determine if the provided socket type
+ * exists per the compilation parameters
+ */
+template<typename T>
+class SocketCreator {
+
+ template<bool cond, typename U>
+ using TypeCheck = typename std::enable_if< cond, U >::type;
+
+public:
+ template<typename U = T>
+ TypeCheck<true, U> *create(const std::string &host, const uint16_t port) {
+ return new T(host, port);
+ }
+ template<typename U = T>
+ TypeCheck<false, U> *create(const std::string &host, const uint16_t port) {
+ return new Socket(host, port);
+ }
+
+};
+
+/**
+ Purpose: Due to the current design this is the only mechanism by which we can
+ inject different socket types
+
+ **/
+class StreamFactory {
+public:
+
+ /**
+ * Build an instance, creating a memory fence, which
+ * allows us to avoid locking. This is tantamount to double checked locking.
+ * @returns new StreamFactory;
+ */
+ static StreamFactory *getInstance() {
+ StreamFactory* atomic_context = context_instance_.load(
+ std::memory_order_relaxed);
+ std::atomic_thread_fence(std::memory_order_acquire);
+ if (atomic_context == nullptr) {
+ std::lock_guard < std::mutex > lock(context_mutex_);
+ atomic_context = context_instance_.load(std::memory_order_relaxed);
+ if (atomic_context == nullptr) {
+ atomic_context = new StreamFactory();
+ std::atomic_thread_fence(std::memory_order_release);
+ context_instance_.store(atomic_context,
+ std::memory_order_relaxed);
+ }
+ }
+ return atomic_context;
+ }
+
+ /**
+ * Creates a socket and returns a unique ptr
+ *
+ */
+ std::unique_ptr<Socket> createSocket(const std::string &host,
+ const uint16_t port) {
+ Socket *socket = 0;
+
+ if (is_secure_) {
+ socket = createSocket<TLSSocket>(host, port);
+ } else {
+ socket = createSocket<Socket>(host, port);
+ }
+ return std::unique_ptr < Socket > (socket);
+ }
+
+protected:
+
+ /**
+ * Creates a socket through SocketCreator<T> and returns it as a raw pointer
+ *
+ */
+ template<typename T>
+ Socket *createSocket(const std::string &host, const uint16_t port) {
+ SocketCreator<T> creator;
+ return creator.create(host, port);
+ }
+
+ StreamFactory() :
+ configure_(Configure::getConfigure()) {
+ std::string secureStr;
+ is_secure_ = false;
+ if (configure_->get(Configure::nifi_remote_input_secure, secureStr)) {
+ org::apache::nifi::minifi::utils::StringUtils::StringToBool(
+ secureStr, is_secure_);
+ }
+ }
+
+ bool is_secure_;
+ static std::atomic<StreamFactory*> context_instance_;
+ static std::mutex context_mutex_;
+
+ Configure *configure_;
+};
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/io/TLSSocket.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/TLSSocket.h b/libminifi/include/io/TLSSocket.h
deleted file mode 100644
index 32645ca..0000000
--- a/libminifi/include/io/TLSSocket.h
+++ /dev/null
@@ -1,187 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef LIBMINIFI_INCLUDE_IO_TLSSOCKET_H_
-#define LIBMINIFI_INCLUDE_IO_TLSSOCKET_H_
-
-#ifdef OPENSSL_SUPPORT
-#include <cstdint>
-#include "ClientSocket.h"
-#include <atomic>
-#include <mutex>
-
-#include "Configure.h"
-#include <openssl/ssl.h>
-#include <openssl/err.h>
-
-#define TLS_ERROR_CONTEXT 1
-#define TLS_ERROR_PEM_MISSING 2
-#define TLS_ERROR_CERT_MISSING 3
-#define TLS_ERROR_KEY_ERROR 4
-#define TLS_ERROR_CERT_ERROR 5
-
-class TLSContext {
-
-public:
-
- /**
- * Build an instance, creating a memory fence, which
- * allows us to avoid locking. This is tantamount to double checked locking.
- * @returns new TLSContext;
- */
- static TLSContext *getInstance() {
- TLSContext* atomic_context = context_instance.load(
- std::memory_order_relaxed);
- std::atomic_thread_fence(std::memory_order_acquire);
- if (atomic_context == nullptr) {
- std::lock_guard<std::mutex> lock(context_mutex);
- atomic_context = context_instance.load(std::memory_order_relaxed);
- if (atomic_context == nullptr) {
- atomic_context = new TLSContext();
- atomic_context->initialize();
- std::atomic_thread_fence(std::memory_order_release);
- context_instance.store(atomic_context,
- std::memory_order_relaxed);
- }
- }
- return atomic_context;
- }
-
- virtual ~TLSContext() {
- if (0 != ctx)
- SSL_CTX_free(ctx);
- }
-
- SSL_CTX *getContext() {
- return ctx;
- }
-
- short getError() {
- return error_value;
- }
-
- short initialize();
-
-private:
-
- static int pemPassWordCb(char *buf, int size, int rwflag, void *userdata) {
- std::string passphrase;
-
- if (Configure::getConfigure()->get(
- Configure::nifi_security_client_pass_phrase, passphrase)) {
-
- std::ifstream file(passphrase.c_str(), std::ifstream::in);
- if (!file.good()) {
- memset(buf, 0x00, size);
- return 0;
- }
-
- std::string password;
- password.assign((std::istreambuf_iterator<char>(file)),
- std::istreambuf_iterator<char>());
- file.close();
- memset(buf,0x00,size);
- memcpy(buf, password.c_str(), password.length()-1);
-
- return password.length()-1;
- }
- return 0;
- }
-
- TLSContext();
-
- std::shared_ptr<Logger> logger_;
- Configure *configuration;
- SSL_CTX *ctx;
-
- short error_value;
- static std::atomic<TLSContext*> context_instance;
-
- static std::mutex context_mutex;
-};
-
-class TLSSocket: public Socket {
-public:
-
- /**
- * Constructor that accepts host name, port and listeners. With this
- * contructor we will be creating a server socket
- * @param hostname our host name
- * @param port connecting port
- * @param listeners number of listeners in the queue
- */
- explicit TLSSocket(const std::string &hostname, const uint16_t port,
- const uint16_t listeners);
-
- /**
- * Constructor that creates a client socket.
- * @param hostname hostname we are connecting to.
- * @param port port we are connecting to.
- */
- explicit TLSSocket(const std::string &hostname, const uint16_t port);
-
- /**
- * Move constructor.
- */
- explicit TLSSocket(const TLSSocket &&);
-
- virtual ~TLSSocket();
-
- /**
- * Initializes the socket
- * @return result of the creation operation.
- */
- short initialize();
-
- /**
- * Attempt to select the socket file descriptor
- * @param msec timeout interval to wait
- * @returns file descriptor
- */
- virtual short select_descriptor(const uint16_t msec);
-
- /**
- * Reads data and places it into buf
- * @param buf buffer in which we extract data
- * @param buflen
- */
- virtual int readData(uint8_t *buf, int buflen);
-
- /**
- * Write value to the stream using std::vector
- * @param buf incoming buffer
- * @param buflen buffer to write
- *
- */
- int writeData(std::vector<uint8_t> &buf, int buflen);
-
- /**
- * Write value to the stream using uint8_t ptr
- * @param buf incoming buffer
- * @param buflen buffer to write
- *
- */
- int writeData(uint8_t *value, int size);
-
-protected:
-
- SSL* ssl;
-
-};
-#endif
-
-#endif /* LIBMINIFI_INCLUDE_IO_TLSSOCKET_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/io/tls/TLSSocket.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/tls/TLSSocket.h b/libminifi/include/io/tls/TLSSocket.h
new file mode 100644
index 0000000..f86f8bc
--- /dev/null
+++ b/libminifi/include/io/tls/TLSSocket.h
@@ -0,0 +1,198 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_IO_TLSSOCKET_H_
+#define LIBMINIFI_INCLUDE_IO_TLSSOCKET_H_
+
+#include <cstdint>
+#include <cstring>
+#include <atomic>
+#include <fstream>
+#include <iterator>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <vector>
+
+#include <openssl/ssl.h>
+#include <openssl/err.h>
+
+#include "../ClientSocket.h"
+#include "properties/Configure.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+#define TLS_ERROR_CONTEXT 1
+#define TLS_ERROR_PEM_MISSING 2
+#define TLS_ERROR_CERT_MISSING 3
+#define TLS_ERROR_KEY_ERROR 4
+#define TLS_ERROR_CERT_ERROR 5
+
+/**
+ * Lazily created, process-wide wrapper around an OpenSSL SSL_CTX.
+ *
+ * The single instance is published through an atomic pointer. Creation is
+ * serialized by context_mutex and ordered with explicit acquire/release
+ * fences around relaxed loads and stores.
+ */
+class TLSContext {
+
+ public:
+
+  /**
+   * Returns the shared instance, building and initializing it on first use.
+   * The fast path is a relaxed load followed by an acquire fence; the mutex
+   * is only taken, and the pointer re-checked, when no instance has been
+   * published yet.
+   * NOTE(review): the result of initialize() is not checked here; presumably
+   * callers are expected to inspect getError() - confirm against the .cpp.
+   * @returns the shared TLSContext; this function never deletes it.
+   */
+  static TLSContext *getInstance() {
+    TLSContext *atomic_context = context_instance.load(
+        std::memory_order_relaxed);
+    std::atomic_thread_fence(std::memory_order_acquire);
+    if (atomic_context == nullptr) {
+      std::lock_guard<std::mutex> lock(context_mutex);
+      atomic_context = context_instance.load(std::memory_order_relaxed);
+      if (atomic_context == nullptr) {
+        atomic_context = new TLSContext();
+        atomic_context->initialize();
+        // Publish only after initialize() has run, so that a reader that
+        // observes the pointer also observes the initialized object.
+        std::atomic_thread_fence(std::memory_order_release);
+        context_instance.store(atomic_context, std::memory_order_relaxed);
+      }
+    }
+    return atomic_context;
+  }
+
+  /**
+   * Releases the SSL_CTX if one is held.
+   */
+  virtual ~TLSContext() {
+    if (ctx != nullptr)
+      SSL_CTX_free(ctx);
+  }
+
+  /**
+   * @returns the wrapped SSL_CTX pointer (still owned by this object).
+   */
+  SSL_CTX *getContext() {
+    return ctx;
+  }
+
+  /**
+   * @returns the stored error value; the TLS_ERROR_* macros defined in this
+   * header are presumably the possible values - confirm against the .cpp.
+   */
+  short getError() {
+    return error_value;
+  }
+
+  /**
+   * Initializes the context; defined out of line.
+   */
+  short initialize();
+
+ private:
+
+  /**
+   * PEM pass phrase callback with the signature OpenSSL expects.
+   * Looks up Configure::nifi_security_client_pass_phrase, treats its value
+   * as the path of a file and copies that file's content into buf.
+   *
+   * Trailing CR/LF characters are stripped (and only those: a file without
+   * a final newline keeps its last character). An empty pass phrase, or one
+   * that does not fit into buf, yields 0 instead of reading or writing out
+   * of bounds.
+   *
+   * @param buf destination buffer supplied by the caller
+   * @param size capacity of buf in bytes
+   * @param rwflag unused
+   * @param userdata unused
+   * @returns number of bytes written to buf, or 0 when no pass phrase could
+   * be provided
+   */
+  static int pemPassWordCb(char *buf, int size, int rwflag, void *userdata) {
+    if (buf == nullptr || size <= 0) {
+      return 0;
+    }
+
+    std::string passphrase;
+    if (!Configure::getConfigure()->get(
+        Configure::nifi_security_client_pass_phrase, passphrase)) {
+      return 0;
+    }
+
+    // From here on the buffer is always left zeroed on failure.
+    memset(buf, 0x00, size);
+
+    std::ifstream file(passphrase.c_str(), std::ifstream::in);
+    if (!file.good()) {
+      return 0;
+    }
+
+    std::string password;
+    password.assign((std::istreambuf_iterator<char>(file)),
+                    std::istreambuf_iterator<char>());
+    file.close();
+
+    // Drop the line terminator(s) an editor usually leaves at the end of
+    // the file, without touching a pass phrase that has none.
+    while (!password.empty()
+        && (password.back() == '\n' || password.back() == '\r')) {
+      password.pop_back();
+    }
+
+    // Refuse rather than truncate: a shortened pass phrase would only fail
+    // later with a less obvious error.
+    if (password.empty()
+        || password.length() > static_cast<std::string::size_type>(size)) {
+      return 0;
+    }
+
+    memcpy(buf, password.data(), password.length());
+    return static_cast<int>(password.length());
+  }
+
+  // Private: instances are only created through getInstance().
+  TLSContext();
+
+  std::shared_ptr<logging::Logger> logger_;
+  Configure *configuration;
+  SSL_CTX *ctx;
+
+  short error_value;
+
+  // Published singleton pointer and the mutex serializing its creation.
+  static std::atomic<TLSContext*> context_instance;
+  static std::mutex context_mutex;
+
+};
+
+class TLSSocket : public Socket {  // Socket subclass that additionally carries an OpenSSL SSL pointer
+ public:
+
+ /**
+ * Constructor that accepts host name, port and listeners. With this
+ * constructor we will be creating a server socket
+ * @param hostname our host name
+ * @param port connecting port
+ * @param listeners number of listeners in the queue
+ */
+ explicit TLSSocket(const std::string &hostname, const uint16_t port,
+ const uint16_t listeners);
+
+ /**
+ * Constructor that creates a client socket.
+ * @param hostname hostname we are connecting to.
+ * @param port port we are connecting to.
+ */
+ explicit TLSSocket(const std::string &hostname, const uint16_t port);
+
+ /**
+ * Move constructor.
+ * NOTE(review): the parameter is a const rvalue reference, so the source
+ * object cannot be modified by the implementation; presumably a plain
+ * TLSSocket&& was intended - confirm against the .cpp before relying on
+ * move semantics here.
+ */
+ explicit TLSSocket(const TLSSocket &&);
+
+ virtual ~TLSSocket();
+
+ /**
+ * Initializes the socket
+ * @return result of the creation operation.
+ */
+ short initialize();
+
+ /**
+ * Attempt to select the socket file descriptor
+ * @param msec timeout interval to wait
+ * @returns file descriptor
+ */
+ virtual short select_descriptor(const uint16_t msec);
+
+ /**
+ * Reads data and places it into buf
+ * @param buf buffer in which we extract data
+ * @param buflen size of buf
+ */
+ virtual int readData(uint8_t *buf, int buflen);
+
+ /**
+ * Write value to the stream using std::vector
+ * @param buf incoming buffer
+ * @param buflen amount of buf to write
+ *
+ */
+ int writeData(std::vector<uint8_t> &buf, int buflen);
+
+ /**
+ * Write value to the stream using uint8_t ptr
+ * @param value incoming buffer
+ * @param size size of value
+ *
+ */
+ int writeData(uint8_t *value, int size);
+
+ protected:
+
+ SSL* ssl;  // OpenSSL SSL object pointer; its creation and release are not visible in this header
+
+};
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_IO_TLSSOCKET_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/io/validation.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/validation.h b/libminifi/include/io/validation.h
index e1b4bb6..c66c412 100644
--- a/libminifi/include/io/validation.h
+++ b/libminifi/include/io/validation.h
@@ -18,35 +18,38 @@
#ifndef VALIDATION_H
#define VALIDATION_H
+#include <type_traits>
#include <string>
#include <cstring>
+
/**
* A checker that will, at compile time, tell us
* if the declared type has a size method.
*/
template<typename T>
class size_function_functor_checker {
- typedef char hasit;
- typedef long doesnothaveit;
+ typedef char hasit;
+ typedef long doesnothaveit;
- // look for the declared type
- template<typename O> static hasit test(decltype(&O::size));
- template<typename O> static doesnothaveit test(...);
+ // look for the declared type
+ template<typename O> static hasit test(decltype(&O::size));
+ template<typename O> static doesnothaveit test(...);
-public:
- enum {
- has_size_function = sizeof(test<T>(0)) == sizeof(char)
- };
+ public:
+ enum {
+ has_size_function = sizeof(test<T>(0)) == sizeof(char)
+ };
};
+
/**
* Determines if the variable is null or ::size() == 0
*/
template<typename T>
static auto IsNullOrEmpty(
T &object) -> typename std::enable_if<size_function_functor_checker<T>::has_size_function==1, bool>::type {
- return object.size() == 0;
+ return object.size() == 0;
}
/**
@@ -55,7 +58,7 @@ static auto IsNullOrEmpty(
template<typename T>
static auto IsNullOrEmpty(
T *object) -> typename std::enable_if<size_function_functor_checker<T>::has_size_function==1, bool>::type {
- return (0 == object || object->size() == 0);
+ return (0 == object || object->size() == 0);
}
/**
@@ -64,13 +67,13 @@ static auto IsNullOrEmpty(
template<typename T>
static auto IsNullOrEmpty(
T *object) -> typename std::enable_if<not size_function_functor_checker<T>::has_size_function , bool>::type {
- return (0 == object);
+ return (0 == object);
}
/**
* Determines if the variable is null or strlen(str) == 0
*/
static auto IsNullOrEmpty(char *str)-> decltype(NULL !=str, bool()) {
- return (NULL == str || strlen(str) == 0);
+ return (NULL == str || strlen(str) == 0);
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/processors/AppendHostInfo.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/AppendHostInfo.h b/libminifi/include/processors/AppendHostInfo.h
new file mode 100644
index 0000000..6515918
--- /dev/null
+++ b/libminifi/include/processors/AppendHostInfo.h
@@ -0,0 +1,80 @@
+/**
+ * @file AppendHostInfo.h
+ * AppendHostInfo class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __APPEND_HOSTINFO_H__
+#define __APPEND_HOSTINFO_H__
+
+#include "core/Property.h"
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/core.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// AppendHostInfo Class
+// AppendHostInfo processor declaration: three configurable properties, one
+// relationship and the two Processor hooks, whose bodies are out of line.
+class AppendHostInfo : public core::Processor {
+ public:
+  /*!
+   * Create a new processor and fetch the logger it will use.
+   * @param name processor name, forwarded to core::Processor
+   * @param uuid processor identifier, forwarded to core::Processor
+   */
+  AppendHostInfo(std::string name, uuid_t uuid = NULL)
+      : core::Processor(name, uuid),
+        logger_(logging::Logger::getLogger()) {
+  }
+
+  // Nothing to release explicitly.
+  virtual ~AppendHostInfo() {
+  }
+
+  // Name under which this processor is known
+  static const std::string ProcessorName;
+
+  // Properties this processor understands
+  static core::Property InterfaceName;
+  static core::Property HostAttribute;
+  static core::Property IPAttribute;
+
+  // Relationships this processor can route to
+  static core::Relationship Success;
+
+  // Per-trigger work; defined out of line
+  virtual void onTrigger(
+      core::ProcessContext *context,
+      core::ProcessSession *session);
+
+  // One-time setup; defined out of line
+  virtual void initialize(void);
+
+ private:
+  // Logger obtained in the constructor
+  std::shared_ptr<logging::Logger> logger_;
+};
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/processors/ExecuteProcess.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/ExecuteProcess.h b/libminifi/include/processors/ExecuteProcess.h
new file mode 100644
index 0000000..123eed3
--- /dev/null
+++ b/libminifi/include/processors/ExecuteProcess.h
@@ -0,0 +1,125 @@
+/**
+ * @file ExecuteProcess.h
+ * ExecuteProcess class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __EXECUTE_PROCESS_H__
+#define __EXECUTE_PROCESS_H__
+
+#include <stdio.h>
+#include <unistd.h>
+#include <string>
+#include <errno.h>
+#include <chrono>
+#include <thread>
+#include <unistd.h>
+#include <sys/wait.h>
+#include <iostream>
+#include <sys/types.h>
+#include <signal.h>
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/core.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// ExecuteProcess Class
+// ExecuteProcess processor declaration: command-related properties, the
+// bookkeeping for a child process (pid, pipe descriptors, running flag) and
+// a small callback that writes a buffer to a stream.
+class ExecuteProcess : public core::Processor {
+ public:
+  /*!
+   * Create a new processor with no process running, working directory "."
+   * and a batch duration of 0.
+   * @param name processor name, forwarded to Processor
+   * @param uuid processor identifier, forwarded to Processor
+   */
+  ExecuteProcess(std::string name, uuid_t uuid = NULL)
+      : Processor(name, uuid),
+        logger_(logging::Logger::getLogger()),
+        _workingDir("."),
+        _batchDuration(0),
+        _redirectErrorStream(false),
+        _processRunning(false),
+        _pid(0) {
+  }
+
+  // Sends SIGTERM to the recorded pid, but only while a process is flagged
+  // as running and the pid is positive.
+  virtual ~ExecuteProcess() {
+    if (!_processRunning || _pid <= 0)
+      return;
+    kill(_pid, SIGTERM);
+  }
+
+  // Name under which this processor is known
+  static const std::string ProcessorName;
+
+  // Properties this processor understands
+  static core::Property Command;
+  static core::Property CommandArguments;
+  static core::Property WorkingDir;
+  static core::Property BatchDuration;
+  static core::Property RedirectErrorStream;
+
+  // Relationships this processor can route to
+  static core::Relationship Success;
+
+  // Callback that writes a caller-supplied buffer to the given stream.
+  // The buffer is referenced, not copied.
+  class WriteCallback : public OutputStreamCallback {
+   public:
+    WriteCallback(char *data, uint64_t size)
+        : _data(data),
+          _dataSize(size) {
+    }
+    char *_data;
+    uint64_t _dataSize;
+    // Writes _dataSize bytes from _data; does nothing for a null or empty
+    // buffer.
+    void process(std::ofstream *stream) {
+      if (!_data || _dataSize == 0)
+        return;
+      stream->write(_data, _dataSize);
+    }
+  };
+
+  // Per-trigger work; defined out of line
+  virtual void onTrigger(
+      core::ProcessContext *context,
+      core::ProcessSession *session);
+
+  // One-time setup; defined out of line
+  virtual void initialize(void);
+
+ private:
+  // Logger obtained in the constructor
+  std::shared_ptr<logging::Logger> logger_;
+  // Values read from the properties
+  std::string _command;
+  std::string _commandArgument;
+  std::string _workingDir;
+  int64_t _batchDuration;
+  bool _redirectErrorStream;
+  // Command line assembled from command and arguments
+  std::string _fullCommand;
+  // True while a process is considered running
+  bool _processRunning;
+  // Pipe descriptors; not initialized by the constructor
+  int _pipefd[2];
+  // Pid of the process, 0 when none
+  pid_t _pid;
+};
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/processors/GenerateFlowFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/GenerateFlowFile.h b/libminifi/include/processors/GenerateFlowFile.h
new file mode 100644
index 0000000..c4ab6fe
--- /dev/null
+++ b/libminifi/include/processors/GenerateFlowFile.h
@@ -0,0 +1,98 @@
+/**
+ * @file GenerateFlowFile.h
+ * GenerateFlowFile class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __GENERATE_FLOW_FILE_H__
+#define __GENERATE_FLOW_FILE_H__
+
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/core.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+// GenerateFlowFile Class
+// GenerateFlowFile processor declaration: size/format properties, a buffer
+// that is released with delete[] on destruction, and a callback that writes
+// a buffer to a stream.
+class GenerateFlowFile : public core::Processor {
+ public:
+  /*!
+   * Create a new processor that starts without any data buffer.
+   * @param name processor name, forwarded to Processor
+   * @param uuid processor identifier, forwarded to Processor
+   */
+  GenerateFlowFile(std::string name, uuid_t uuid = NULL)
+      : Processor(name, uuid),
+        _data(NULL),
+        _dataSize(0) {
+  }
+
+  // Frees the buffer; delete[] on a null pointer is a no-op, so no check is
+  // needed.
+  virtual ~GenerateFlowFile() {
+    delete[] _data;
+  }
+
+  // Name under which this processor is known
+  static const std::string ProcessorName;
+
+  // Properties this processor understands
+  static core::Property FileSize;
+  static core::Property BatchSize;
+  static core::Property DataFormat;
+  static core::Property UniqueFlowFiles;
+  static const char *DATA_FORMAT_BINARY;
+  static const char *DATA_FORMAT_TEXT;
+
+  // Relationships this processor can route to
+  static core::Relationship Success;
+
+  // Callback that writes a caller-supplied buffer to the given stream.
+  // The buffer is referenced, not copied.
+  class WriteCallback : public OutputStreamCallback {
+   public:
+    WriteCallback(char *data, uint64_t size)
+        : _data(data),
+          _dataSize(size) {
+    }
+    char *_data;
+    uint64_t _dataSize;
+    // Writes _dataSize bytes from _data; does nothing for a null or empty
+    // buffer.
+    void process(std::ofstream *stream) {
+      if (!_data || _dataSize == 0)
+        return;
+      stream->write(_data, _dataSize);
+    }
+  };
+
+  // Per-trigger work; defined out of line
+  virtual void onTrigger(
+      core::ProcessContext *context,
+      core::ProcessSession *session);
+
+  // One-time setup; defined out of line
+  virtual void initialize(void);
+
+ private:
+  // Buffer released with delete[] in the destructor
+  char * _data;
+  // Size value kept alongside the buffer
+  uint64_t _dataSize;
+};
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/processors/GetFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/GetFile.h b/libminifi/include/processors/GetFile.h
new file mode 100644
index 0000000..f1f0694
--- /dev/null
+++ b/libminifi/include/processors/GetFile.h
@@ -0,0 +1,129 @@
+/**
+ * @file GetFile.h
+ * GetFile class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __GET_FILE_H__
+#define __GET_FILE_H__
+
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/core.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// GetFile Class
+// GetFile processor declaration: scan/filter settings plus a mutex-guarded
+// queue of strings that serves as the directory listing.
+class GetFile : public core::Processor {
+ public:
+  /*!
+   * Create a new processor with the defaults spelled out in the initializer
+   * list: directory ".", recursive, source files not kept, hidden files
+   * ignored, batch size 10 and the listing time set to "now".
+   * @param name processor name, forwarded to Processor
+   * @param uuid processor identifier, forwarded to Processor
+   */
+  GetFile(std::string name, uuid_t uuid = NULL)
+      : Processor(name, uuid),
+        logger_(logging::Logger::getLogger()),
+        _directory("."),
+        _recursive(true),
+        _keepSourceFile(false),
+        _minAge(0),
+        _maxAge(0),
+        _minSize(0),
+        _maxSize(0),
+        _ignoreHiddenFile(true),
+        _pollInterval(0),
+        _batchSize(10),
+        _lastDirectoryListingTime(getTimeMillis()),
+        _fileFilter("[^\\.].*") {
+  }
+
+  // Nothing to release explicitly.
+  virtual ~GetFile() {
+  }
+
+  // Name under which this processor is known
+  static const std::string ProcessorName;
+
+  // Properties this processor understands
+  static core::Property Directory;
+  static core::Property Recurse;
+  static core::Property KeepSourceFile;
+  static core::Property MinAge;
+  static core::Property MaxAge;
+  static core::Property MinSize;
+  static core::Property MaxSize;
+  static core::Property IgnoreHiddenFile;
+  static core::Property PollInterval;
+  static core::Property BatchSize;
+  static core::Property FileFilter;
+
+  // Relationships this processor can route to
+  static core::Relationship Success;
+
+  // Per-trigger work; defined out of line
+  virtual void onTrigger(
+      core::ProcessContext *context,
+      core::ProcessSession *session);
+
+  // One-time setup; defined out of line
+  virtual void initialize(void);
+
+  // Perform the directory listing for dir; defined out of line
+  void performListing(std::string dir);
+
+ private:
+  // Number of entries currently in the listing, read under the mutex
+  uint64_t getListingSize() {
+    std::lock_guard<std::mutex> guard(mutex_);
+    return _dirList.size();
+  }
+  // Whether the directory listing is empty
+  bool isListingEmpty();
+  // Put a full-path file name into the directory listing
+  void putListing(std::string fileName);
+  // Poll the directory listing for files
+  void pollListing(std::queue<std::string> &list, int maxSize);
+  // Check whether a file can be added to the directory listing
+  bool acceptFile(std::string fullName, std::string name);
+
+  // Logger obtained in the constructor
+  std::shared_ptr<logging::Logger> logger_;
+  // Queue holding the directory listing
+  std::queue<std::string> _dirList;
+  // Mutex protecting the directory listing
+  std::mutex mutex_;
+  // Settings, initialized to the constructor defaults
+  std::string _directory;
+  bool _recursive;
+  bool _keepSourceFile;
+  int64_t _minAge;
+  int64_t _maxAge;
+  int64_t _minSize;
+  int64_t _maxSize;
+  bool _ignoreHiddenFile;
+  int64_t _pollInterval;
+  int64_t _batchSize;
+  uint64_t _lastDirectoryListingTime;
+  std::string _fileFilter;
+};
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/processors/ListenHTTP.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/ListenHTTP.h b/libminifi/include/processors/ListenHTTP.h
new file mode 100644
index 0000000..adaefb1
--- /dev/null
+++ b/libminifi/include/processors/ListenHTTP.h
@@ -0,0 +1,126 @@
+/**
+ * @file ListenHTTP.h
+ * ListenHTTP class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __LISTEN_HTTP_H__
+#define __LISTEN_HTTP_H__
+
+#include <memory>
+#include <regex>
+
+#include <CivetServer.h>
+
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/core.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// ListenHTTP Class
+// ListenHTTP processor declaration. Holds a CivetServer and a request
+// Handler through std::unique_ptr members; all non-trivial bodies are
+// defined out of line.
+class ListenHTTP : public core::Processor {
+ public:
+
+  /*!
+   * Create a new processor and fetch the logger it will use.
+   * @param name processor name, forwarded to Processor
+   * @param uuid processor identifier, forwarded to Processor
+   */
+  ListenHTTP(std::string name, uuid_t uuid = NULL)
+      : Processor(name, uuid),
+        _logger(logging::Logger::getLogger()) {
+  }
+
+  // Defined out of line
+  virtual ~ListenHTTP();
+
+  // Name under which this processor is known
+  static const std::string ProcessorName;
+
+  // Properties this processor understands
+  static core::Property BasePath;
+  static core::Property Port;
+  static core::Property AuthorizedDNPattern;
+  static core::Property SSLCertificate;
+  static core::Property SSLCertificateAuthority;
+  static core::Property SSLVerifyPeer;
+  static core::Property SSLMinimumVersion;
+  static core::Property HeadersAsAttributesRegex;
+
+  // Relationships this processor can route to
+  static core::Relationship Success;
+
+  // Processor hooks; defined out of line
+  void onTrigger(core::ProcessContext *context,
+                 core::ProcessSession *session);
+  void initialize();
+  void onSchedule(
+      core::ProcessContext *context,
+      core::ProcessSessionFactory *sessionFactory);
+
+  // CivetHandler implementation for incoming HTTP requests
+  class Handler : public CivetHandler {
+   public:
+    Handler(
+        core::ProcessContext *context,
+        core::ProcessSessionFactory *sessionFactory,
+        std::string &&authDNPattern, std::string &&headersAsAttributesPattern);
+    bool handlePost(CivetServer *server, struct mg_connection *conn);
+
+   private:
+    // Send HTTP 500 error response to client
+    void sendErrorResponse(struct mg_connection *conn);
+
+    // Logger used by the handler
+    std::shared_ptr<logging::Logger> _logger;
+
+    // Compiled patterns and the non-owning context/factory pointers
+    std::regex _authDNRegex;
+    std::regex _headersAsAttributesRegex;
+    core::ProcessContext *_processContext;
+    core::ProcessSessionFactory *_processSessionFactory;
+  };
+
+  // Write callback for transferring data from HTTP request to content repo
+  class WriteCallback : public OutputStreamCallback {
+   public:
+    WriteCallback(struct mg_connection *conn,
+                  const struct mg_request_info *reqInfo);
+    void process(std::ofstream *stream);
+
+   private:
+    // Logger used by the callback
+    std::shared_ptr<logging::Logger> _logger;
+
+    // Connection and request info the callback reads from (not owned)
+    struct mg_connection *_conn;
+    const struct mg_request_info *_reqInfo;
+  };
+
+ private:
+  // Logger obtained in the constructor
+  std::shared_ptr<logging::Logger> _logger;
+
+  // Server and handler held by this processor
+  std::unique_ptr<CivetServer> _server;
+  std::unique_ptr<Handler> _handler;
+};
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/processors/ListenSyslog.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/ListenSyslog.h b/libminifi/include/processors/ListenSyslog.h
new file mode 100644
index 0000000..1e1e11f
--- /dev/null
+++ b/libminifi/include/processors/ListenSyslog.h
@@ -0,0 +1,216 @@
+/**
+ * @file ListenSyslog.h
+ * ListenSyslog class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __LISTEN_SYSLOG_H__
+#define __LISTEN_SYSLOG_H__
+
+#include <stdio.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <errno.h>
+#include <sys/select.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <chrono>
+#include <thread>
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/core.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// SyslogEvent: plain record pairing a raw payload pointer with a length. The type itself expresses no ownership of payload.
+typedef struct {
+ uint8_t *payload;  // pointer to the event bytes (not freed by this type)
+ uint64_t len;  // length value stored with the payload
+} SysLogEvent;
+
+/* ListenSyslog Class
+ * Processor declaration that keeps a server socket descriptor, a list of
+ * client socket descriptors and a mutex-guarded queue of SysLogEvent records
+ * together with a running total of the queued lengths.
+ */
+class ListenSyslog : public core::Processor {
+ public:
+ // Constructor
+ /*!
+ * Create a new processor
+ * Sets the defaults visible in the body: protocol "UDP", port 514, receive
+ * buffer size 65507, maximum socket buffer size 1024 * 1024, 2 connections,
+ * batch size 1 and "\n" as message delimiter. No thread object is created
+ * here (_thread is NULL).
+ * @param name processor name, forwarded to Processor
+ * @param uuid processor identifier, forwarded to Processor
+ */
+ ListenSyslog(std::string name, uuid_t uuid = NULL)
+ : Processor(name, uuid) {
+ logger_ = logging::Logger::getLogger();
+ _eventQueueByteSize = 0;
+ _serverSocket = 0;
+ _recvBufSize = 65507;
+ _maxSocketBufSize = 1024 * 1024;
+ _maxConnections = 2;
+ _maxBatchSize = 1;
+ _messageDelimiter = "\n";
+ _protocol = "UDP";
+ _port = 514;
+ _parseMessages = false;
+ _serverSocket = 0;  // redundant: already set to 0 above
+ _maxFds = 0;
+ FD_ZERO(&_readfds);
+ _thread = NULL;
+ _resetServerSocket = false;
+ _serverTheadRunning = false;
+ }
+ /* Destructor
+ * Clears the running flag, deletes the thread object, closes every client
+ * socket and finally the server socket (when its descriptor is > 0).
+ * NOTE(review): deleting a std::thread that is still joinable calls
+ * std::terminate; presumably the thread is detached or joined elsewhere -
+ * confirm in the .cpp. _serverTheadRunning is a plain bool, so nothing here
+ * guarantees the thread observes the change - TODO confirm.
+ */
+ virtual ~ListenSyslog() {
+ _serverTheadRunning = false;
+ if (this->_thread)
+ delete this->_thread;
+ // close and forget every client socket
+ std::vector<int>::iterator it;
+ for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it) {
+ int clientSocket = *it;
+ close(clientSocket);
+ }
+ _clientSockets.clear();
+ if (_serverSocket > 0) {
+ logger_->log_info("ListenSysLog Server socket %d close", _serverSocket);
+ close(_serverSocket);
+ _serverSocket = 0;
+ }
+ }
+ // Processor Name
+ static const std::string ProcessorName;
+ // Supported Properties
+ static core::Property RecvBufSize;
+ static core::Property MaxSocketBufSize;
+ static core::Property MaxConnections;
+ static core::Property MaxBatchSize;
+ static core::Property MessageDelimiter;
+ static core::Property ParseMessages;
+ static core::Property Protocol;
+ static core::Property Port;
+ // Supported Relationships
+ static core::Relationship Success;
+ static core::Relationship Invalid;
+ // Nested callback class that writes a caller-supplied buffer to the output stream (buffer is referenced, not copied)
+ class WriteCallback : public OutputStreamCallback {
+ public:
+ WriteCallback(char *data, uint64_t size)
+ : _data(data),
+ _dataSize(size) {
+ }
+ char *_data;
+ uint64_t _dataSize;
+ void process(std::ofstream *stream) {
+ if (_data && _dataSize > 0)  // nothing is written for a null or empty buffer
+ stream->write(_data, _dataSize);
+ }
+ };
+
+ public:
+ // OnTrigger method, implemented by ListenSyslog (defined out of line)
+ virtual void onTrigger(
+ core::ProcessContext *context,
+ core::ProcessSession *session);
+ // Initialize, overridden by ListenSyslog (defined out of line)
+ virtual void initialize(void);
+
+ protected:
+
+ private:
+ // Logger
+ std::shared_ptr<logging::Logger> logger_;
+ // Run function for the thread
+ static void run(ListenSyslog *process);
+ // Run Thread
+ void runThread();
+ // Queue used to store syslog events
+ std::queue<SysLogEvent> _eventQueue;
+ // Size of Event queue in bytes (sum of the len fields of the queued events)
+ uint64_t _eventQueueByteSize;
+ // Get event queue size (number of queued events), read under the mutex
+ uint64_t getEventQueueSize() {
+ std::lock_guard<std::mutex> lock(mutex_);
+ return _eventQueue.size();
+ }
+ // Get event queue byte size, read under the mutex
+ uint64_t getEventQueueByteSize() {
+ std::lock_guard<std::mutex> lock(mutex_);
+ return _eventQueueByteSize;
+ }
+ // Whether the event queue is empty, read under the mutex
+ bool isEventQueueEmpty() {
+ std::lock_guard<std::mutex> lock(mutex_);
+ return _eventQueue.empty();
+ }
+ // Put event into the event queue and add its length to the byte counter; the payload pointer is stored as-is, not copied
+ void putEvent(uint8_t *payload, uint64_t len) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ SysLogEvent event;
+ event.payload = payload;
+ event.len = len;
+ _eventQueue.push(event);
+ _eventQueueByteSize += len;
+ }
+ // Read \n terminated line from TCP socket
+ int readline(int fd, char *bufptr, size_t len);
+ // start server socket and handling client socket
+ void startSocketThread();
+ /* Poll event
+ * Moves events from the event queue into list, under the mutex, until the
+ * queue is empty or list holds maxSize entries; maxSize == 0 means no limit.
+ * NOTE(review): list.size() < maxSize compares an unsigned value with an
+ * int, so a negative maxSize is converted to a large unsigned value and
+ * effectively also means "no limit".
+ */
+ void pollEvent(std::queue<SysLogEvent> &list, int maxSize) {
+ std::lock_guard<std::mutex> lock(mutex_);
+
+ while (!_eventQueue.empty() && (maxSize == 0 || list.size() < maxSize)) {
+ SysLogEvent event = _eventQueue.front();
+ _eventQueue.pop();
+ _eventQueueByteSize -= event.len;
+ list.push(event);
+ }
+ return;
+ }
+ // Mutex protecting the event queue and its byte counter
+ std::mutex mutex_;
+ int64_t _recvBufSize;
+ int64_t _maxSocketBufSize;
+ int64_t _maxConnections;
+ int64_t _maxBatchSize;
+ std::string _messageDelimiter;
+ std::string _protocol;
+ int64_t _port;
+ bool _parseMessages;
+ int _serverSocket;
+ std::vector<int> _clientSockets;
+ int _maxFds;
+ fd_set _readfds;
+ // thread object; NULL until one is created, deleted in the destructor
+ std::thread *_thread;
+ // whether to reset the server socket
+ bool _resetServerSocket;
+ bool _serverTheadRunning;
+ // buffer for read socket (left uninitialized by the constructor)
+ uint8_t _buffer[2048];
+};
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/processors/LogAttribute.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/LogAttribute.h b/libminifi/include/processors/LogAttribute.h
new file mode 100644
index 0000000..37c0ec3
--- /dev/null
+++ b/libminifi/include/processors/LogAttribute.h
@@ -0,0 +1,130 @@
+/**
+ * @file LogAttribute.h
+ * LogAttribute class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __LOG_ATTRIBUTE_H__
+#define __LOG_ATTRIBUTE_H__
+
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/core.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// LogAttribute Class
+class LogAttribute : public core::Processor {
+ public:
+  // Constructor
+  /*!
+   * Create a new processor
+   * @param name processor name, forwarded to core::Processor
+   * @param uuid optional identifier, forwarded to core::Processor
+   */
+  LogAttribute(std::string name, uuid_t uuid = NULL)
+      : Processor(name, uuid) {
+    logger_ = logging::Logger::getLogger();
+  }
+  // Destructor
+  virtual ~LogAttribute() {
+  }
+  // Processor Name
+  static const std::string ProcessorName;
+  // Supported Properties
+  static core::Property LogLevel;
+  static core::Property AttributesToLog;
+  static core::Property AttributesToIgnore;
+  static core::Property LogPayload;
+  static core::Property LogPrefix;
+  // Supported Relationships
+  static core::Relationship Success;
+  // Log levels that logLevelStringToEnum can produce
+  enum LogAttrLevel {
+    LogAttrLevelTrace,
+    LogAttrLevelDebug,
+    LogAttrLevelInfo,
+    LogAttrLevelWarn,
+    LogAttrLevelError
+  };
+  /**
+   * Convert log level from string to enum.
+   * Only the exact lower-case names "trace", "debug", "info", "warn" and
+   * "error" are recognised.
+   * @param logStr textual level
+   * @param level receives the matching enum value; untouched on failure
+   * @return true if logStr was recognised, false otherwise
+   */
+  bool logLevelStringToEnum(std::string logStr, LogAttrLevel &level) {
+    if (logStr == "trace") {
+      level = LogAttrLevelTrace;
+      return true;
+    } else if (logStr == "debug") {
+      level = LogAttrLevelDebug;
+      return true;
+    } else if (logStr == "info") {
+      level = LogAttrLevelInfo;
+      return true;
+    } else if (logStr == "warn") {
+      level = LogAttrLevelWarn;
+      return true;
+    } else if (logStr == "error") {
+      level = LogAttrLevelError;
+      return true;
+    } else {
+      return false;
+    }
+  }
+  // Nested callback class for read stream.
+  // NOTE(review): this class owns _buffer through a raw pointer and defines
+  // no copy operations, so copying an instance would free the buffer twice.
+  class ReadCallback : public InputStreamCallback {
+   public:
+    /**
+     * Allocate a buffer of `size` bytes to read into.
+     * @param size number of bytes to allocate and to request from the stream
+     */
+    ReadCallback(uint64_t size)
+        : _buffer(new char[size]),
+          _bufferSize(size),
+          _readSize(0) {
+    }
+    ~ReadCallback() {
+      // delete[] on a null pointer is a no-op, so no guard is needed.
+      delete[] _buffer;
+    }
+    /**
+     * Read up to _bufferSize bytes from the stream into _buffer and record
+     * in _readSize how many bytes were actually extracted.
+     * @param stream input stream to read from
+     */
+    void process(std::ifstream *stream) {
+      stream->read(_buffer, _bufferSize);
+      // gcount() is the number of characters the read() above extracted: it
+      // equals _bufferSize on a full read and is smaller on a short read.
+      // The previous code tested the pointer (`!stream`) rather than the
+      // stream state, so short reads were reported as full reads.
+      _readSize = static_cast<uint64_t>(stream->gcount());
+    }
+    // Buffer the stream content is read into (owned by this object)
+    char *_buffer;
+    // Capacity of _buffer in bytes
+    uint64_t _bufferSize;
+    // Number of bytes stored in _buffer by the last process() call
+    uint64_t _readSize;
+  };
+
+ public:
+  // OnTrigger method, implemented by NiFi LogAttribute
+  virtual void onTrigger(
+      core::ProcessContext *context,
+      core::ProcessSession *session);
+  // Initialize, over write by NiFi LogAttribute
+  virtual void initialize(void);
+
+ protected:
+
+ private:
+  // Logger
+  std::shared_ptr<logging::Logger> logger_;
+};
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/processors/PutFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/PutFile.h b/libminifi/include/processors/PutFile.h
new file mode 100644
index 0000000..7653fac
--- /dev/null
+++ b/libminifi/include/processors/PutFile.h
@@ -0,0 +1,101 @@
+/**
+ * @file PutFile.h
+ * PutFile class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PUT_FILE_H__
+#define __PUT_FILE_H__
+
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/core.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// PutFile Class
+class PutFile : public core::Processor {
+ public:
+  // Names of the conflict resolution strategies (values defined elsewhere)
+  static const std::string CONFLICT_RESOLUTION_STRATEGY_REPLACE;
+  static const std::string CONFLICT_RESOLUTION_STRATEGY_IGNORE;
+  static const std::string CONFLICT_RESOLUTION_STRATEGY_FAIL;
+
+  /*!
+   * Create a new processor.
+   * Both arguments are handed to core::Processor unchanged; the logger is
+   * obtained from logging::Logger::getLogger().
+   */
+  PutFile(std::string name, uuid_t uuid = NULL)
+      : core::Processor(name, uuid),
+        logger_(logging::Logger::getLogger()) {
+  }
+  // Destructor
+  virtual ~PutFile() {}
+
+  // Processor Name
+  static const std::string ProcessorName;
+
+  // Supported Properties
+  static core::Property Directory;
+  static core::Property ConflictResolution;
+
+  // Supported Relationships
+  static core::Relationship Success;
+  static core::Relationship Failure;
+
+  // OnTrigger method, implemented by NiFi PutFile
+  virtual void onTrigger(core::ProcessContext *context,
+                         core::ProcessSession *session);
+  // Initialize, over write by NiFi PutFile
+  virtual void initialize();
+
+  // Stream callback declared for PutFile; it is constructed from a temporary
+  // file name and a destination file name. All members are defined elsewhere.
+  class ReadCallback : public InputStreamCallback {
+   public:
+    ReadCallback(const std::string &tmpFile, const std::string &destFile);
+    ~ReadCallback();
+    virtual void process(std::ifstream *stream);
+    bool commit();
+
+   private:
+    std::shared_ptr<logging::Logger> logger_;
+    std::ofstream _tmpFileOs;
+    bool _writeSucceeded = false;
+    std::string _tmpFile;
+    std::string _destFile;
+  };
+
+ private:
+  // Logger
+  std::shared_ptr<logging::Logger> logger_;
+
+  bool putFile(core::ProcessSession *session,
+               std::shared_ptr<FlowFileRecord> flowFile,
+               const std::string &tmpFile,
+               const std::string &destFile);
+};
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/processors/RealTimeDataCollector.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/RealTimeDataCollector.h b/libminifi/include/processors/RealTimeDataCollector.h
new file mode 100644
index 0000000..41bd814
--- /dev/null
+++ b/libminifi/include/processors/RealTimeDataCollector.h
@@ -0,0 +1,145 @@
+/**
+ * @file RealTimeDataCollector.h
+ * RealTimeDataCollector class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __REAL_TIME_DATA_COLLECTOR_H__
+#define __REAL_TIME_DATA_COLLECTOR_H__
+
+#include <stdio.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <fcntl.h>
+#include <netdb.h>
+#include <string>
+#include <errno.h>
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/core.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// RealTimeDataCollector Class
+class RealTimeDataCollector : public core::Processor {
+ public:
+  // Constructor
+  /*!
+   * Create a new processor.
+   * Every scalar member gets a defined starting value here; previously the
+   * ports, intervals, maximum buffer size and the iteration flag were left
+   * indeterminate until some other code assigned them.
+   * @param name processor name, forwarded to core::Processor
+   * @param uuid optional identifier, forwarded to core::Processor
+   */
+  explicit RealTimeDataCollector(std::string name, uuid_t uuid = NULL)
+      : core::Processor(name, uuid),
+        _realTimeServerPort(0),
+        _batchServerPort(0),
+        _realTimeInterval(0),
+        _batchInterval(0),
+        _batchMaxBufferSize(0),
+        _iteration(false),
+        _realTimeSocket(0),
+        _batchSocket(0),
+        logger_(logging::Logger::getLogger()),
+        _queuedDataSize(0),
+        _firstInvoking(false),
+        _realTimeAccumulated(0),
+        _batchAcccumulated(0) {
+  }
+  // Destructor: closes whatever sockets and file stream are still open.
+  // A socket value of 0 is treated as "not connected", matching the
+  // constructor's initial value.
+  virtual ~RealTimeDataCollector() {
+    if (_realTimeSocket)
+      close(_realTimeSocket);
+    if (_batchSocket)
+      close(_batchSocket);
+    if (_fileStream.is_open())
+      _fileStream.close();
+  }
+  // Processor Name
+  static const std::string ProcessorName;
+  // Supported Properties
+  static core::Property REALTIMESERVERNAME;
+  static core::Property REALTIMESERVERPORT;
+  static core::Property BATCHSERVERNAME;
+  static core::Property BATCHSERVERPORT;
+  static core::Property FILENAME;
+  static core::Property ITERATION;
+  static core::Property REALTIMEMSGID;
+  static core::Property BATCHMSGID;
+  static core::Property REALTIMEINTERVAL;
+  static core::Property BATCHINTERVAL;
+  static core::Property BATCHMAXBUFFERSIZE;
+  // Supported Relationships
+  static core::Relationship Success;
+  // Connect to the socket
+  int connectServer(const char *host, uint16_t port);
+  // Send buflen bytes from buf on the given socket
+  int sendData(int socket, const char *buf, int buflen);
+  // Trigger handlers for the real-time and batch paths (defined elsewhere)
+  void onTriggerRealTime(
+      core::ProcessContext *context,
+      core::ProcessSession *session);
+  void onTriggerBatch(core::ProcessContext *context,
+                      core::ProcessSession *session);
+
+ public:
+  // OnTrigger method, implemented by NiFi RealTimeDataCollector
+  virtual void onTrigger(
+      core::ProcessContext *context,
+      core::ProcessSession *session);
+  // Initialize, over write by NiFi RealTimeDataCollector
+  virtual void initialize(void);
+
+ protected:
+
+ private:
+  // realtime server Name
+  std::string _realTimeServerName;
+  int64_t _realTimeServerPort;
+  std::string _batchServerName;
+  int64_t _batchServerPort;
+  int64_t _realTimeInterval;
+  int64_t _batchInterval;
+  int64_t _batchMaxBufferSize;
+  // Match pattern for Real time Message ID
+  std::vector<std::string> _realTimeMsgID;
+  // Match pattern for Batch Message ID
+  std::vector<std::string> _batchMsgID;
+  // file for which the realTime collector will tail
+  std::string _fileName;
+  // Whether we need to iterate from the beginning for demo
+  bool _iteration;
+  // Socket descriptors; 0 means "no socket" (see constructor and destructor)
+  int _realTimeSocket;
+  int _batchSocket;
+  // Logger
+  std::shared_ptr<logging::Logger> logger_;
+  // Mutex for protection
+  std::mutex mutex_;
+  // Queued data size
+  uint64_t _queuedDataSize;
+  // Queue for the batch process
+  std::queue<std::string> _queue;
+  std::thread::id _realTimeThreadId;
+  std::thread::id _batchThreadId;
+  std::atomic<bool> _firstInvoking;
+  int64_t _realTimeAccumulated;
+  int64_t _batchAcccumulated;
+  std::ifstream _fileStream;
+};
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/processors/TailFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/TailFile.h b/libminifi/include/processors/TailFile.h
new file mode 100644
index 0000000..5be76e4
--- /dev/null
+++ b/libminifi/include/processors/TailFile.h
@@ -0,0 +1,105 @@
+/**
+ * @file TailFile.h
+ * TailFile class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __TAIL_FILE_H__
+#define __TAIL_FILE_H__
+
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/core.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// TailFile Class
+class TailFile : public core::Processor {
+ public:
+  // Constructor
+  /*!
+   * Create a new processor.
+   * The tail position and created-time start at zero so that the
+   * storeState() call made by the destructor never reads indeterminate
+   * values when the processor is destroyed before they are assigned.
+   * @param name processor name, forwarded to core::Processor
+   * @param uuid optional identifier, forwarded to core::Processor
+   */
+  TailFile(std::string name, uuid_t uuid = NULL)
+      : core::Processor(name, uuid),
+        logger_(logging::Logger::getLogger()),
+        _currentTailFilePosition(0),
+        _stateRecovered(false),
+        _currentTailFileCreatedTime(0) {
+  }
+  // Destructor: always calls storeState() before the object goes away
+  virtual ~TailFile() {
+    storeState();
+  }
+  // Processor Name
+  static const std::string ProcessorName;
+  // Supported Properties
+  static core::Property FileName;
+  static core::Property StateFile;
+  // Supported Relationships
+  static core::Relationship Success;
+
+ public:
+  // OnTrigger method, implemented by NiFi TailFile
+  virtual void onTrigger(
+      core::ProcessContext *context,
+      core::ProcessSession *session);
+  // Initialize, over write by NiFi TailFile
+  virtual void initialize(void);
+  // recoverState
+  void recoverState();
+  // storeState
+  void storeState();
+
+ protected:
+
+ private:
+  // Logger
+  std::shared_ptr<logging::Logger> logger_;
+  std::string _fileLocation;
+  // Property Specified Tailed File Name
+  std::string _fileName;
+  // File to save state
+  std::string _stateFile;
+  // State related to the tailed file
+  std::string _currentTailFileName;
+  uint64_t _currentTailFilePosition;
+  bool _stateRecovered;
+  uint64_t _currentTailFileCreatedTime;
+  // Utils functions for parse state file
+  std::string trimLeft(const std::string& s);
+  std::string trimRight(const std::string& s);
+  void parseStateFileLine(char *buf);
+  void checkRollOver();
+
+};
+
+// Matched File Item for Roll over check
+struct TailMatchedFileItem {
+  // Name of the matched file
+  std::string fileName;
+  // Modification time of the matched file (unit not shown here)
+  uint64_t modifiedTime;
+};
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/properties/Configure.h
----------------------------------------------------------------------
diff --git a/libminifi/include/properties/Configure.h b/libminifi/include/properties/Configure.h
new file mode 100644
index 0000000..c0d9bd4
--- /dev/null
+++ b/libminifi/include/properties/Configure.h
@@ -0,0 +1,131 @@
+/**
+ * @file Configure.h
+ * Configure class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __CONFIGURE_H__
+#define __CONFIGURE_H__
+
+#include <stdio.h>
+#include <string>
+#include <map>
+#include <stdlib.h>
+#include <errno.h>
+#include <iostream>
+#include <fstream>
+#include "core/core.h"
+#include "core/logging/Logger.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+class Configure {
+ public:
+  // Get the singleton Configure instance, creating it on first use.
+  // NOTE(review): the lazy creation below takes no lock.
+  static Configure * getConfigure() {
+    if (nullptr == configure_) {
+      configure_ = new Configure();
+    }
+    return configure_;
+  }
+
+  // Names declared for configuration settings; the string values are defined
+  // elsewhere (presumably property keys such as nifi.flow.configuration.file).
+  static const char *nifi_flow_configuration_file;
+  static const char *nifi_administrative_yield_duration;
+  static const char *nifi_bored_yield_duration;
+  static const char *nifi_graceful_shutdown_seconds;
+  static const char *nifi_log_level;
+  static const char *nifi_server_name;
+  static const char *nifi_configuration_class_name;
+  static const char *nifi_flow_repository_class_name;
+  static const char *nifi_provenance_repository_class_name;
+  static const char *nifi_server_port;
+  static const char *nifi_server_report_interval;
+  static const char *nifi_provenance_repository_max_storage_time;
+  static const char *nifi_provenance_repository_max_storage_size;
+  static const char *nifi_provenance_repository_directory_default;
+  static const char *nifi_provenance_repository_enable;
+  static const char *nifi_flowfile_repository_max_storage_time;
+  static const char *nifi_flowfile_repository_max_storage_size;
+  static const char *nifi_flowfile_repository_directory_default;
+  static const char *nifi_flowfile_repository_enable;
+  static const char *nifi_remote_input_secure;
+  static const char *nifi_security_need_ClientAuth;
+  static const char *nifi_security_client_certificate;
+  static const char *nifi_security_client_private_key;
+  static const char *nifi_security_client_pass_phrase;
+  static const char *nifi_security_client_ca_certificate;
+
+  // Drop every loaded key/value pair (takes the mutex)
+  void clear() {
+    std::lock_guard<std::mutex> guard(mutex_);
+    properties_.clear();
+  }
+  // Store value under key, replacing any existing entry (takes the mutex)
+  void set(std::string key, std::string value) {
+    std::lock_guard<std::mutex> guard(mutex_);
+    properties_[key] = value;
+  }
+  // Report whether key is present (takes the mutex)
+  bool has(std::string key) {
+    std::lock_guard<std::mutex> guard(mutex_);
+    const bool present = (properties_.count(key) != 0);
+    return present;
+  }
+  // Get the config value
+  bool get(std::string key, std::string &value);
+  // Parse one line in configure file like key=value
+  void parseConfigureFileLine(char *buf);
+  // Load Configure File
+  void loadConfigureFile(const char *fileName);
+  // Set the determined MINIFI_HOME (no lock is taken here)
+  void setHome(std::string minifiHome) {
+    minifi_home_ = minifiHome;
+  }
+
+  // Get the determined MINIFI_HOME (no lock is taken here)
+  std::string getHome() {
+    return minifi_home_;
+  }
+  // Parse Command Line
+  void parseCommandLine(int argc, char **argv);
+
+ private:
+  // Mutex taken by clear(), set() and has()
+  std::mutex mutex_;
+  // Logger
+  std::shared_ptr<logging::Logger> logger_;
+  // Home location for this executable
+  std::string minifi_home_;
+
+  // Construction and destruction are private; instances come from
+  // getConfigure().
+  Configure()
+      : logger_(logging::Logger::getLogger()) {
+  }
+  virtual ~Configure() {}
+
+  // The instance handed out by getConfigure()
+  static Configure *configure_;
+
+ protected:
+  // Loaded key/value pairs
+  std::map<std::string, std::string> properties_;
+};
+
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif