You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2016/10/13 15:07:27 UTC
[09/18] nifi-minifi-cpp git commit: MINIFI-34 Establishing CMake
build system to provide build functionality equivalent to pre-existing
Makefile.
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/include/spdlog/sinks/null_sink.h
----------------------------------------------------------------------
diff --git a/include/spdlog/sinks/null_sink.h b/include/spdlog/sinks/null_sink.h
new file mode 100644
index 0000000..992b3b7
--- /dev/null
+++ b/include/spdlog/sinks/null_sink.h
@@ -0,0 +1,52 @@
+/*************************************************************************/
+/* spdlog - an extremely fast and easy to use c++11 logging library. */
+/* Copyright (c) 2014 Gabi Melman. */
+/* */
+/* Permission is hereby granted, free of charge, to any person obtaining */
+/* a copy of this software and associated documentation files (the */
+/* "Software"), to deal in the Software without restriction, including */
+/* without limitation the rights to use, copy, modify, merge, publish, */
+/* distribute, sublicense, and/or sell copies of the Software, and to */
+/* permit persons to whom the Software is furnished to do so, subject to */
+/* the following conditions: */
+/* */
+/* The above copyright notice and this permission notice shall be */
+/* included in all copies or substantial portions of the Software. */
+/* */
+/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, */
+/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */
+/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/
+/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */
+/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, */
+/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE */
+/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */
+/*************************************************************************/
+
+#pragma once
+#include <mutex>
+#include "./base_sink.h"
+#include "../details/null_mutex.h"
+
+
+namespace spdlog
+{
+namespace sinks
+{
+
+template <class Mutex> // Mutex: lock type handed on to base_sink
+class null_sink : public base_sink < Mutex > // sink whose overrides are empty, so messages are discarded
+{
+protected:
+ void _sink_it(const details::log_msg&) override // the message is intentionally ignored
+ {}
+
+ void flush() override // no-op: this sink never writes anything
+ {}
+
+};
+typedef null_sink<details::null_mutex> null_sink_st;
+typedef null_sink<std::mutex> null_sink_mt;
+
+}
+}
+
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/include/spdlog/sinks/ostream_sink.h
----------------------------------------------------------------------
diff --git a/include/spdlog/sinks/ostream_sink.h b/include/spdlog/sinks/ostream_sink.h
new file mode 100644
index 0000000..f2fe3b2
--- /dev/null
+++ b/include/spdlog/sinks/ostream_sink.h
@@ -0,0 +1,67 @@
+/*************************************************************************/
+/* spdlog - an extremely fast and easy to use c++11 logging library. */
+/* Copyright (c) 2014 Gabi Melman. */
+/* */
+/* Permission is hereby granted, free of charge, to any person obtaining */
+/* a copy of this software and associated documentation files (the */
+/* "Software"), to deal in the Software without restriction, including */
+/* without limitation the rights to use, copy, modify, merge, publish, */
+/* distribute, sublicense, and/or sell copies of the Software, and to */
+/* permit persons to whom the Software is furnished to do so, subject to */
+/* the following conditions: */
+/* */
+/* The above copyright notice and this permission notice shall be */
+/* included in all copies or substantial portions of the Software. */
+/* */
+/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, */
+/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */
+/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/
+/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */
+/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, */
+/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE */
+/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */
+/*************************************************************************/
+
+#pragma once
+
+#include <ostream>
+#include <mutex>
+#include <memory>
+
+#include "../details/null_mutex.h"
+#include "./base_sink.h"
+
+namespace spdlog
+{
+namespace sinks
+{
+template<class Mutex>
+class ostream_sink: public base_sink<Mutex> // writes formatted messages to a caller-supplied std::ostream
+{
+public:
+ explicit ostream_sink(std::ostream& os, bool force_flush=false) :_ostream(os), _force_flush(force_flush) {} // keeps a reference to os, not a copy
+ ostream_sink(const ostream_sink&) = delete; // non-copyable
+ ostream_sink& operator=(const ostream_sink&) = delete;
+ virtual ~ostream_sink() = default;
+
+protected:
+ void _sink_it(const details::log_msg& msg) override // write one formatted message to the stream
+ {
+ _ostream.write(msg.formatted.data(), msg.formatted.size());
+ if (_force_flush) // flush after every message only when requested at construction
+ _ostream.flush();
+ }
+
+ void flush() override // explicit flush of the referenced stream
+ {
+ _ostream.flush();
+ }
+
+ std::ostream& _ostream; // not owned; the referenced stream must outlive this sink
+ bool _force_flush; // when true, _sink_it flushes after each write
+};
+
+typedef ostream_sink<std::mutex> ostream_sink_mt;
+typedef ostream_sink<details::null_mutex> ostream_sink_st;
+}
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/include/spdlog/sinks/sink.h
----------------------------------------------------------------------
diff --git a/include/spdlog/sinks/sink.h b/include/spdlog/sinks/sink.h
new file mode 100644
index 0000000..88c423a
--- /dev/null
+++ b/include/spdlog/sinks/sink.h
@@ -0,0 +1,42 @@
+/*************************************************************************/
+/* spdlog - an extremely fast and easy to use c++11 logging library. */
+/* Copyright (c) 2014 Gabi Melman. */
+/* */
+/* Permission is hereby granted, free of charge, to any person obtaining */
+/* a copy of this software and associated documentation files (the */
+/* "Software"), to deal in the Software without restriction, including */
+/* without limitation the rights to use, copy, modify, merge, publish, */
+/* distribute, sublicense, and/or sell copies of the Software, and to */
+/* permit persons to whom the Software is furnished to do so, subject to */
+/* the following conditions: */
+/* */
+/* The above copyright notice and this permission notice shall be */
+/* included in all copies or substantial portions of the Software. */
+/* */
+/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, */
+/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */
+/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/
+/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */
+/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, */
+/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE */
+/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */
+/*************************************************************************/
+
+#pragma once
+
+#include "../details/log_msg.h"
+
+namespace spdlog
+{
+namespace sinks
+{
+class sink // abstract interface: both operations are pure virtual
+{
+public:
+ virtual ~sink() {} // virtual so a derived sink can be destroyed through a sink pointer
+ virtual void log(const details::log_msg& msg) = 0; // handle one log message
+ virtual void flush() = 0; // flush whatever the implementation has pending
+};
+}
+}
+
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/include/spdlog/sinks/stdout_sinks.h
----------------------------------------------------------------------
diff --git a/include/spdlog/sinks/stdout_sinks.h b/include/spdlog/sinks/stdout_sinks.h
new file mode 100644
index 0000000..4ca16ac
--- /dev/null
+++ b/include/spdlog/sinks/stdout_sinks.h
@@ -0,0 +1,71 @@
+/*************************************************************************/
+/* spdlog - an extremely fast and easy to use c++11 logging library. */
+/* Copyright (c) 2014 Gabi Melman. */
+/* */
+/* Permission is hereby granted, free of charge, to any person obtaining */
+/* a copy of this software and associated documentation files (the */
+/* "Software"), to deal in the Software without restriction, including */
+/* without limitation the rights to use, copy, modify, merge, publish, */
+/* distribute, sublicense, and/or sell copies of the Software, and to */
+/* permit persons to whom the Software is furnished to do so, subject to */
+/* the following conditions: */
+/* */
+/* The above copyright notice and this permission notice shall be */
+/* included in all copies or substantial portions of the Software. */
+/* */
+/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, */
+/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */
+/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/
+/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */
+/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, */
+/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE */
+/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */
+/*************************************************************************/
+
+#pragma once
+
+#include <iostream>
+#include <mutex>
+#include "./ostream_sink.h"
+#include "spdlog/details/null_mutex.h"
+
+namespace spdlog
+{
+namespace sinks
+{
+
+template <class Mutex>
+class stdout_sink : public ostream_sink<Mutex> // ostream_sink bound to std::cout
+{
+ using MyType = stdout_sink<Mutex>;
+public:
+ stdout_sink() : ostream_sink<Mutex>(std::cout, true) {} // second argument is force_flush: flush after every message
+ static std::shared_ptr<MyType> instance() // shared instance, one per Mutex instantiation
+ {
+ static std::shared_ptr<MyType> instance = std::make_shared<MyType>(); // function-local static: created on first call
+ return instance;
+ }
+};
+
+typedef stdout_sink<details::null_mutex> stdout_sink_st;
+typedef stdout_sink<std::mutex> stdout_sink_mt;
+
+
+template <class Mutex>
+class stderr_sink : public ostream_sink<Mutex> // ostream_sink bound to std::cerr
+{
+ using MyType = stderr_sink<Mutex>;
+public:
+ stderr_sink() : ostream_sink<Mutex>(std::cerr, true) {} // second argument is force_flush: flush after every message
+ static std::shared_ptr<MyType> instance() // shared instance, one per Mutex instantiation
+ {
+ static std::shared_ptr<MyType> instance = std::make_shared<MyType>(); // function-local static: created on first call
+ return instance;
+ }
+
+};
+
+typedef stderr_sink<std::mutex> stderr_sink_mt;
+typedef stderr_sink<details::null_mutex> stderr_sink_st;
+}
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/include/spdlog/sinks/syslog_sink.h
----------------------------------------------------------------------
diff --git a/include/spdlog/sinks/syslog_sink.h b/include/spdlog/sinks/syslog_sink.h
new file mode 100644
index 0000000..37b6513
--- /dev/null
+++ b/include/spdlog/sinks/syslog_sink.h
@@ -0,0 +1,102 @@
+/*************************************************************************/
+/* spdlog - an extremely fast and easy to use c++11 logging library. */
+/* Copyright (c) 2014 Gabi Melman. */
+/* */
+/* Permission is hereby granted, free of charge, to any person obtaining */
+/* a copy of this software and associated documentation files (the */
+/* "Software"), to deal in the Software without restriction, including */
+/* without limitation the rights to use, copy, modify, merge, publish, */
+/* distribute, sublicense, and/or sell copies of the Software, and to */
+/* permit persons to whom the Software is furnished to do so, subject to */
+/* the following conditions: */
+/* */
+/* The above copyright notice and this permission notice shall be */
+/* included in all copies or substantial portions of the Software. */
+/* */
+/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, */
+/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */
+/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/
+/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */
+/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, */
+/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE */
+/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */
+/*************************************************************************/
+
+#pragma once
+
+#ifdef __linux__
+
+#include <array>
+#include <string>
+#include <syslog.h>
+
+#include "./sink.h"
+#include "../common.h"
+#include "../details/log_msg.h"
+
+
+namespace spdlog
+{
+namespace sinks
+{
+/**
+ * Sink that writes to syslog using the `syslog()` library call.
+ *
+ * Locking is not needed, as `syslog()` itself is thread-safe.
+ */
+class syslog_sink : public sink
+{
+public:
+ // Fills the level -> syslog priority table, then opens the syslog connection via ::openlog().
+ syslog_sink(const std::string& ident = "", int syslog_option=0, int syslog_facility=LOG_USER):
+ _ident(ident)
+ {
+ _priorities[static_cast<int>(level::trace)] = LOG_DEBUG; // trace shares LOG_DEBUG with debug
+ _priorities[static_cast<int>(level::debug)] = LOG_DEBUG;
+ _priorities[static_cast<int>(level::info)] = LOG_INFO;
+ _priorities[static_cast<int>(level::notice)] = LOG_NOTICE;
+ _priorities[static_cast<int>(level::warn)] = LOG_WARNING;
+ _priorities[static_cast<int>(level::err)] = LOG_ERR;
+ _priorities[static_cast<int>(level::critical)] = LOG_CRIT;
+ _priorities[static_cast<int>(level::alert)] = LOG_ALERT;
+ _priorities[static_cast<int>(level::emerg)] = LOG_EMERG;
+ _priorities[static_cast<int>(level::off)] = LOG_INFO; // off is mapped to LOG_INFO
+
+ // an empty ident is passed as nullptr; per openlog(3) the program name is then used
+ ::openlog(_ident.empty()? nullptr:_ident.c_str(), syslog_option, syslog_facility);
+ }
+ ~syslog_sink() // closes the connection opened in the constructor
+ {
+ ::closelog();
+ }
+
+ syslog_sink(const syslog_sink&) = delete; // non-copyable
+ syslog_sink& operator=(const syslog_sink&) = delete;
+
+ void log(const details::log_msg &msg) override // forwards one formatted message to ::syslog()
+ {
+ ::syslog(syslog_prio_from_level(msg), "%s", msg.formatted.str().c_str()); // "%s" keeps the text from being read as a format string
+ }
+
+ void flush() override // nothing to do: this sink holds no buffer of its own
+ {
+ }
+
+
+private:
+ std::array<int, 10> _priorities; // indexed by static_cast<int>(level); the constructor fills all 10 entries
+ //must store the ident because the man says openlog might use the pointer as is and not a string copy
+ const std::string _ident;
+
+ //
+ // Simply maps spdlog's log level to syslog priority level.
+ //
+ int syslog_prio_from_level(const details::log_msg &msg) const
+ {
+ return _priorities[static_cast<int>(msg.level)]; // plain table lookup, no bounds check
+ }
+};
+}
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/include/spdlog/spdlog.h
----------------------------------------------------------------------
diff --git a/include/spdlog/spdlog.h b/include/spdlog/spdlog.h
new file mode 100644
index 0000000..5cec562
--- /dev/null
+++ b/include/spdlog/spdlog.h
@@ -0,0 +1,155 @@
+/*************************************************************************/
+/* spdlog - an extremely fast and easy to use c++11 logging library. */
+/* Copyright (c) 2014 Gabi Melman. */
+/* */
+/* Permission is hereby granted, free of charge, to any person obtaining */
+/* a copy of this software and associated documentation files (the */
+/* "Software"), to deal in the Software without restriction, including */
+/* without limitation the rights to use, copy, modify, merge, publish, */
+/* distribute, sublicense, and/or sell copies of the Software, and to */
+/* permit persons to whom the Software is furnished to do so, subject to */
+/* the following conditions: */
+/* */
+/* The above copyright notice and this permission notice shall be */
+/* included in all copies or substantial portions of the Software. */
+/* */
+/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, */
+/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */
+/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/
+/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */
+/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, */
+/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE */
+/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */
+/*************************************************************************/
+
+
+// spdlog main header file.
+//see example.cpp for usage example
+
+#pragma once
+
+#include "tweakme.h"
+#include "common.h"
+#include "logger.h"
+
+namespace spdlog
+{
+// Return an existing logger or nullptr if a logger with such name doesn't exist.
+// Examples:
+//
+// spdlog::get("mylog")->info("Hello");
+// auto logger = spdlog::get("mylog");
+// logger->info("This is another message" , x, y, z);
+// logger->info() << "This is another message" << x << y << z;
+std::shared_ptr<logger> get(const std::string& name);
+
+//
+// Set global formatting
+// example: spdlog::set_pattern("%Y-%m-%d %H:%M:%S.%e %l : %v");
+//
+void set_pattern(const std::string& format_string);
+void set_formatter(formatter_ptr f);
+
+//
+// Set global logging level
+//
+void set_level(level::level_enum log_level);
+
+//
+// Turn on async mode (off by default) and set the queue size for each async_logger.
+// effective only for loggers created after this call.
+// queue_size: size of queue (must be power of 2):
+// Each logger will pre-allocate a dedicated queue with queue_size entries upon construction.
+//
+// async_overflow_policy (optional, block_retry by default):
+// async_overflow_policy::block_retry - if queue is full, block until queue has room for the new log entry.
+// async_overflow_policy::discard_log_msg - never block and discard any new messages when queue overflows.
+//
+// worker_warmup_cb (optional):
+// callback function that will be called in worker thread upon start (can be used to init stuff like thread affinity)
+//
+void set_async_mode(size_t queue_size, const async_overflow_policy overflow_policy = async_overflow_policy::block_retry, const std::function<void()>& worker_warmup_cb = nullptr, const std::chrono::milliseconds& flush_interval_ms = std::chrono::milliseconds::zero());
+
+// Turn off async mode
+void set_sync_mode();
+
+//
+// Create and register multi/single threaded rotating file logger
+//
+std::shared_ptr<logger> rotating_logger_mt(const std::string& logger_name, const std::string& filename, size_t max_file_size, size_t max_files, bool force_flush = false);
+std::shared_ptr<logger> rotating_logger_st(const std::string& logger_name, const std::string& filename, size_t max_file_size, size_t max_files, bool force_flush = false);
+
+//
+// Create file logger which creates new file at the given time (default: midnight):
+//
+std::shared_ptr<logger> daily_logger_mt(const std::string& logger_name, const std::string& filename, int hour=0, int minute=0, bool force_flush = false);
+std::shared_ptr<logger> daily_logger_st(const std::string& logger_name, const std::string& filename, int hour=0, int minute=0, bool force_flush = false);
+
+
+//
+// Create and register stdout/stderr loggers
+//
+std::shared_ptr<logger> stdout_logger_mt(const std::string& logger_name);
+std::shared_ptr<logger> stdout_logger_st(const std::string& logger_name);
+std::shared_ptr<logger> stderr_logger_mt(const std::string& logger_name);
+std::shared_ptr<logger> stderr_logger_st(const std::string& logger_name);
+
+
+//
+// Create and register a syslog logger
+//
+#ifdef __linux__
+std::shared_ptr<logger> syslog_logger(const std::string& logger_name, const std::string& ident = "", int syslog_option = 0);
+#endif
+
+
+// Create and register a logger with multiple sinks
+std::shared_ptr<logger> create(const std::string& logger_name, sinks_init_list sinks);
+template<class It>
+std::shared_ptr<logger> create(const std::string& logger_name, const It& sinks_begin, const It& sinks_end);
+
+
+// Create and register a logger with templated sink type
+// Example: spdlog::create<daily_file_sink_st>("mylog", "dailylog_filename", "txt");
+template <typename Sink, typename... Args>
+std::shared_ptr<spdlog::logger> create(const std::string& logger_name, const Args&...);
+
+
+// Register the given logger with the given name
+void register_logger(std::shared_ptr<logger> logger);
+
+// Drop the reference to the given logger
+void drop(const std::string &name);
+
+// Drop all references
+void drop_all();
+
+
+///////////////////////////////////////////////////////////////////////////////
+//
+// Macros to display source file & line
+// Trace & Debug can be switched on/off at compile time for zero cost debug statements.
+// Uncomment SPDLOG_DEBUG_ON/SPDLOG_TRACE_ON in tweakme.h to enable.
+// NOTE: when enabled, each macro expands to a statement that already ends with ';'.
+// Example:
+// spdlog::set_level(spdlog::level::debug);
+// SPDLOG_DEBUG(my_logger, "Some debug message {} {}", 1, 3.2);
+///////////////////////////////////////////////////////////////////////////////
+
+#ifdef SPDLOG_TRACE_ON
+#define SPDLOG_TRACE(logger, ...) logger->trace(__VA_ARGS__) << " (" << __FILE__ << " #" << __LINE__ <<")";
+#else // SPDLOG_TRACE_ON not defined: the macro expands to nothing
+#define SPDLOG_TRACE(logger, ...)
+#endif // SPDLOG_TRACE_ON
+
+#ifdef SPDLOG_DEBUG_ON
+#define SPDLOG_DEBUG(logger, ...) logger->debug(__VA_ARGS__) << " (" << __FILE__ << " #" << __LINE__ <<")";
+#else // SPDLOG_DEBUG_ON not defined: the macro expands to nothing
+#define SPDLOG_DEBUG(logger, ...)
+#endif // SPDLOG_DEBUG_ON
+
+
+}
+
+
+#include "details/spdlog_impl.h"
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/include/spdlog/tweakme.h
----------------------------------------------------------------------
diff --git a/include/spdlog/tweakme.h b/include/spdlog/tweakme.h
new file mode 100644
index 0000000..b651658
--- /dev/null
+++ b/include/spdlog/tweakme.h
@@ -0,0 +1,74 @@
+/*************************************************************************/
+/* spdlog - an extremely fast and easy to use c++11 logging library. */
+/* Copyright (c) 2014 Gabi Melman. */
+/* */
+/* Permission is hereby granted, free of charge, to any person obtaining */
+/* a copy of this software and associated documentation files (the */
+/* "Software"), to deal in the Software without restriction, including */
+/* without limitation the rights to use, copy, modify, merge, publish, */
+/* distribute, sublicense, and/or sell copies of the Software, and to */
+/* permit persons to whom the Software is furnished to do so, subject to */
+/* the following conditions: */
+/* */
+/* The above copyright notice and this permission notice shall be */
+/* included in all copies or substantial portions of the Software. */
+/* */
+/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, */
+/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */
+/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/
+/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */
+/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, */
+/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE */
+/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */
+/*************************************************************************/
+
+
+#pragma once
+
+///////////////////////////////////////////////////////////////////////////////
+// Edit this file to squeeze every last drop of performance out of spdlog.
+///////////////////////////////////////////////////////////////////////////////
+
+
+///////////////////////////////////////////////////////////////////////////////
+// Under Linux, the much faster CLOCK_REALTIME_COARSE clock can be used.
+// This clock is less accurate - can be off by dozens of millis - depending on the kernel HZ.
+// Uncomment to use it instead of the regular (but slower) clock.
+// #define SPDLOG_CLOCK_COARSE
+///////////////////////////////////////////////////////////////////////////////
+
+
+///////////////////////////////////////////////////////////////////////////////
+// Uncomment if date/time logging is not needed.
+// This will prevent spdlog from querying the clock on each log call.
+// #define SPDLOG_NO_DATETIME
+///////////////////////////////////////////////////////////////////////////////
+
+
+///////////////////////////////////////////////////////////////////////////////
+// Uncomment if thread id logging is not needed (i.e. no %t in the log pattern).
+// This will prevent spdlog from querying the thread id on each log call.
+// #define SPDLOG_NO_THREAD_ID
+///////////////////////////////////////////////////////////////////////////////
+
+
+///////////////////////////////////////////////////////////////////////////////
+// Uncomment if logger name logging is not needed.
+// This will prevent spdlog from copying the logger name on each log call.
+// #define SPDLOG_NO_NAME
+///////////////////////////////////////////////////////////////////////////////
+
+
+///////////////////////////////////////////////////////////////////////////////
+// Uncomment to enable the SPDLOG_DEBUG/SPDLOG_TRACE macros.
+// #define SPDLOG_DEBUG_ON
+// #define SPDLOG_TRACE_ON
+///////////////////////////////////////////////////////////////////////////////
+
+
+///////////////////////////////////////////////////////////////////////////////
+// Uncomment to avoid locking in the registry operations (spdlog::get(), spdlog::drop(), spdlog::register_logger()).
+// Use only if your code never modifies the registry concurrently.
+// Note that upon creating a logger the registry is modified by spdlog.
+// #define SPDLOG_NO_REGISTRY_MUTEX
+///////////////////////////////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
new file mode 100644
index 0000000..571b73d
--- /dev/null
+++ b/libminifi/CMakeLists.txt
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+#
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+cmake_minimum_required (VERSION 3.1) # needed by CMAKE_CXX_STANDARD (3.1), INTERFACE libraries and CMP0048 (3.0)
+
+set(PROJECT "apache-nifi-minifi-cpp")
+set(VERSION "0.1.0")
+
+#### Establish Project Configuration ####
+# Enable usage of the VERSION specifier
+# https://cmake.org/cmake/help/v3.0/policy/CMP0048.html#policy:CMP0048
+cmake_policy(SET CMP0048 NEW)
+
+project(${PROJECT}
+ VERSION ${VERSION})
+
+set(CMAKE_CXX_STANDARD 11)
+set(CMAKE_CXX_STANDARD_REQUIRED ON)
+
+include_directories(../include)
+include_directories(include)
+
+file(GLOB SOURCES "src/*.cpp")
+
+add_library(spdlog INTERFACE)
+add_library(minifi STATIC ${SOURCES})
+
+# Use the system libxml2 when it is found
+find_package (LibXml2)
+if (LIBXML2_FOUND)
+ include_directories(${LIBXML2_INCLUDE_DIR})
+ target_link_libraries (minifi ${LIBXML2_LIBRARIES})
+else ()
+ # TODO: build from our local version (this branch currently does nothing)
+endif (LIBXML2_FOUND)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/include/Configure.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Configure.h b/libminifi/include/Configure.h
new file mode 100644
index 0000000..d325fa0
--- /dev/null
+++ b/libminifi/include/Configure.h
@@ -0,0 +1,115 @@
+/**
+ * @file Configure.h
+ * Configure class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __CONFIGURE_H__
+#define __CONFIGURE_H__
+
+#include <stdio.h>
+#include <string>
+#include <map>
+#include <stdlib.h>
+#include <errno.h>
+#include <iostream>
+#include <fstream>
+#include "Logger.h"
+
+class Configure {
+public:
+ //! Get the singleton Configure instance (created on first call; the creation itself takes no lock)
+ static Configure * getConfigure()
+ {
+ if (!_configure)
+ {
+ _configure = new Configure();
+ }
+ return _configure;
+ }
+ //! Key-name constants, presumably e.g. "nifi.flow.configuration.file" (values are defined outside this header)
+ static const char *nifi_flow_configuration_file;
+ static const char *nifi_administrative_yield_duration;
+ static const char *nifi_bored_yield_duration;
+ static const char *nifi_server_name;
+ static const char *nifi_server_port;
+ static const char *nifi_server_report_interval;
+
+ //! Clear the loaded config (removes every stored property, under _mtx)
+ void clear()
+ {
+ std::lock_guard<std::mutex> lock(_mtx);
+ _properties.clear();
+ }
+ //! Set the config value (inserts the key or overwrites its existing value, under _mtx)
+ void set(std::string key, std::string value)
+ {
+ std::lock_guard<std::mutex> lock(_mtx);
+ _properties[key] = value;
+ }
+ //! Check whether a value is stored for the key (under _mtx)
+ bool has(std::string key)
+ {
+ std::lock_guard<std::mutex> lock(_mtx);
+ return (_properties.find(key) != _properties.end());
+ }
+ //! Get the config value into 'value'; presumably returns whether the key was found (defined outside this header)
+ bool get(std::string key, std::string &value);
+ // Trim String utils
+ std::string trim(const std::string& s);
+ std::string trimLeft(const std::string& s);
+ std::string trimRight(const std::string& s);
+ //! Parse one line in configure file like key=value
+ void parseConfigureFileLine(char *buf);
+ //! Load Configure File
+ void loadConfigureFile(const char *fileName);
+ //! Set the determined MINIFI_HOME (does not take _mtx)
+ void setHome(std::string minifiHome)
+ {
+ _minifiHome = minifiHome;
+ }
+
+ //! Get the determined MINIFI_HOME (returns a copy; does not take _mtx)
+ std::string getHome()
+ {
+ return _minifiHome;
+ }
+ //! Parse Command Line
+ void parseCommandLine(int argc, char **argv);
+
+private:
+ //! Mutex guarding _properties. NOTE(review): this header does not include <mutex> itself; presumably it arrives via Logger.h
+ std::mutex _mtx;
+ //! Logger
+ Logger *_logger;
+ //! Home location for this executable
+ std::string _minifiHome;
+
+ Configure() // private: instances are only created by getConfigure()
+ {
+ _logger = Logger::getLogger();
+ }
+ virtual ~Configure() // private as well; nothing in this header deletes _configure
+ {
+
+ }
+ static Configure *_configure; // the singleton pointer tested and assigned in getConfigure()
+
+protected:
+ std::map<std::string,std::string> _properties; // key -> value store used by clear()/set()/has()
+};
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/include/Connection.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Connection.h b/libminifi/include/Connection.h
new file mode 100644
index 0000000..dc6b94b
--- /dev/null
+++ b/libminifi/include/Connection.h
@@ -0,0 +1,201 @@
+/**
+ * @file Connection.h
+ * Connection class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __CONNECTION_H__
+#define __CONNECTION_H__
+
+#include <uuid/uuid.h>
+#include <vector>
+#include <queue>
+#include <map>
+#include <mutex>
+#include <atomic>
+#include <algorithm>
+
+#include "FlowFileRecord.h"
+#include "Relationship.h"
+#include "Logger.h"
+
+//! Forwarder declaration
+class Processor;
+
+ //! Connection Class
+ /*!
+  * A named, UUID-identified queue of FlowFileRecord pointers that links a
+  * source Processor to a destination Processor for one Relationship.
+  * The back-pressure limits and the expiration duration are atomics;
+  * getQueueSize() takes _mtx before reading the queue.
+  */
+ class Connection
+ {
+ public:
+     //! Constructor
+     /*!
+      * Create a new connection
+      */
+     Connection(std::string name, uuid_t uuid = NULL, uuid_t srcUUID = NULL, uuid_t destUUID = NULL);
+     //! Destructor
+     virtual ~Connection() {}
+     //! Set Connection Name
+     void setName(std::string name) {
+         _name = name;
+     }
+     //! Get Connection Name
+     std::string getName(void) {
+         return (_name);
+     }
+     //! Set UUID (copies the 16 bytes from uuid)
+     void setUUID(uuid_t uuid) {
+         uuid_copy(_uuid, uuid);
+     }
+     //! Set Source Processor UUID
+     void setSourceProcessorUUID(uuid_t uuid) {
+         uuid_copy(_srcUUID, uuid);
+     }
+     //! Set Destination Processor UUID
+     void setDestinationProcessorUUID(uuid_t uuid) {
+         uuid_copy(_destUUID, uuid);
+     }
+     //! Get Source Processor UUID (copied into the caller's buffer)
+     void getSourceProcessorUUID(uuid_t uuid) {
+         uuid_copy(uuid, _srcUUID);
+     }
+     //! Get Destination Processor UUID (copied into the caller's buffer)
+     void getDestinationProcessorUUID(uuid_t uuid) {
+         uuid_copy(uuid, _destUUID);
+     }
+     //! Get UUID
+     /*!
+      * Copies the connection UUID into uuid and returns true;
+      * returns false without copying when uuid is NULL.
+      */
+     bool getUUID(uuid_t uuid) {
+         if (uuid)
+         {
+             uuid_copy(uuid, _uuid);
+             return true;
+         }
+         else
+             return false;
+     }
+     //! Set Connection Source Processor
+     void setSourceProcessor(Processor *source) {
+         _srcProcessor = source;
+     }
+     //! Get Connection Source Processor
+     Processor *getSourceProcessor() {
+         return _srcProcessor;
+     }
+     //! Set Connection Destination Processor
+     void setDestinationProcessor(Processor *dest) {
+         _destProcessor = dest;
+     }
+     //! Get Connection Destination Processor
+     Processor *getDestinationProcessor() {
+         return _destProcessor;
+     }
+     //! Set Connection relationship
+     void setRelationship(Relationship relationship) {
+         _relationship = relationship;
+     }
+     //! Get Connection relationship
+     Relationship getRelationship() {
+         return _relationship;
+     }
+     //! Set Max Queue Size
+     void setMaxQueueSize(uint64_t size)
+     {
+         _maxQueueSize = size;
+     }
+     //! Get Max Queue Size
+     uint64_t getMaxQueueSize()
+     {
+         return _maxQueueSize;
+     }
+     //! Set Max Queue Data Size
+     void setMaxQueueDataSize(uint64_t size)
+     {
+         _maxQueueDataSize = size;
+     }
+     //! Get Max Queue Data Size
+     uint64_t getMaxQueueDataSize()
+     {
+         return _maxQueueDataSize;
+     }
+     //! Set Flow expiration duration in milliseconds
+     void setFlowExpirationDuration(uint64_t duration)
+     {
+         _expiredDuration = duration;
+     }
+     //! Get Flow expiration duration in milliseconds
+     uint64_t getFlowExpirationDuration()
+     {
+         return _expiredDuration;
+     }
+     //! Check whether the queue is empty
+     bool isEmpty();
+     //! Check whether the queue is full to apply back pressure
+     bool isFull();
+     //! Get queue size (number of queued flow file records, read under the mutex)
+     uint64_t getQueueSize() {
+         std::lock_guard<std::mutex> lock(_mtx);
+         return _queue.size();
+     }
+     //! Get queue data size
+     /*!
+      * Returns the queued-data counter, not the configured limit
+      * (the limit is available through getMaxQueueDataSize()).
+      */
+     uint64_t getQueueDataSize()
+     {
+         return _queuedDataSize;
+     }
+     //! Put the flow file into queue
+     void put(FlowFileRecord *flow);
+     //! Poll the flow file from queue, the expired flow file record also being returned
+     FlowFileRecord *poll(std::set<FlowFileRecord *> &expiredFlowRecords);
+     //! Drain the flow records
+     void drain();
+
+ protected:
+     //! A global unique identifier
+     uuid_t _uuid;
+     //! Source Processor UUID
+     uuid_t _srcUUID;
+     //! Destination Processor UUID
+     uuid_t _destUUID;
+     //! Connection Name
+     std::string _name;
+     //! Relationship for this connection
+     Relationship _relationship;
+     //! Source Processor (ProcessNode/Port)
+     Processor *_srcProcessor;
+     //! Destination Processor (ProcessNode/Port)
+     Processor *_destProcessor;
+     //! Max queue size to apply back pressure
+     std::atomic<uint64_t> _maxQueueSize;
+     //! Max queue data size to apply back pressure
+     std::atomic<uint64_t> _maxQueueDataSize;
+     //! Flow File Expiration Duration in milliseconds
+     std::atomic<uint64_t> _expiredDuration;
+
+
+ private:
+     //! Mutex for protection
+     std::mutex _mtx;
+     //! Queued data size
+     std::atomic<uint64_t> _queuedDataSize;
+     //! Queue for the Flow File
+     std::queue<FlowFileRecord *> _queue;
+     //! Logger
+     Logger *_logger;
+     // Prevent default copy constructor and assignment operation
+     // Only support pass by reference or pointer
+     Connection(const Connection &parent);
+     Connection &operator=(const Connection &parent);
+
+ };
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/include/Exception.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Exception.h b/libminifi/include/Exception.h
new file mode 100644
index 0000000..d321454
--- /dev/null
+++ b/libminifi/include/Exception.h
@@ -0,0 +1,95 @@
+/**
+ * @file Exception.h
+ * Exception class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __EXCEPTION_H__
+#define __EXCEPTION_H__
+
+#include <sstream>
+#include <exception>
+#include <stdexcept>
+#include <errno.h>
+#include <string.h>
+
+ //! ExceptionType
+ //! Category of a failure; also the index into ExceptionStr.
+ enum ExceptionType
+ {
+     FILE_OPERATION_EXCEPTION = 0,
+     FLOW_EXCEPTION,
+     PROCESSOR_EXCEPTION,
+     PROCESS_SESSION_EXCEPTION,
+     PROCESS_SCHEDULE_EXCEPTION,
+     SITE2SITE_EXCEPTION,
+     GENERAL_EXCEPTION,
+     MAX_EXCEPTION
+ };
+
+ //! Exception String
+ //! One human-readable label per ExceptionType value, in enum order.
+ static const char *ExceptionStr[MAX_EXCEPTION] =
+ {
+     "File Operation",
+     "Flow File Operation",
+     "Processor Operation",
+     "Process Session Operation",
+     "Process Schedule Operation",
+     "Site2Site Protocol",
+     "General Operation"
+ };
+
+ //! Exception Type to String
+ //! Returns the label for type, or NULL when type is not below MAX_EXCEPTION.
+ inline const char *ExceptionTypeToString(ExceptionType type)
+ {
+     if (type < MAX_EXCEPTION)
+         return ExceptionStr[type];
+     else
+         return NULL;
+ }
+
+ //! Exception Class
+ /*!
+  * std::exception carrying an ExceptionType and a detail message.
+  * what() yields "<type label>:<detail message>".
+  */
+ class Exception : public std::exception
+ {
+ public:
+     //! Constructor
+     /*!
+      * Create a new exception.
+      * @param type     category of the failure
+      * @param errorMsg detail text; NULL is treated as an empty string
+      *
+      * The what() text is assembled here, once, so that what() itself never
+      * allocates and never modifies the object. A type without a label is
+      * rendered as "Unknown" instead of passing NULL to std::string.
+      */
+     Exception(ExceptionType type, const char *errorMsg) : _type(type), _errorMsg(errorMsg ? errorMsg : "") {
+         const char *label = ExceptionTypeToString(_type);
+         _whatStr = label ? label : "Unknown";
+         _whatStr += ":";
+         _whatStr += _errorMsg;
+     }
+     //! Destructor
+     virtual ~Exception() throw () {}
+     //! Returns the text prepared by the constructor; valid while this object lives.
+     virtual const char * what() const throw () {
+         return _whatStr.c_str();
+     }
+
+ protected:
+
+ private:
+     //! Exception type
+     ExceptionType _type;
+     //! Exception detailed information
+     std::string _errorMsg;
+     //! Hold the what result
+     std::string _whatStr;
+
+ };
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/include/FlowControlProtocol.h
----------------------------------------------------------------------
diff --git a/libminifi/include/FlowControlProtocol.h b/libminifi/include/FlowControlProtocol.h
new file mode 100644
index 0000000..23f2d49
--- /dev/null
+++ b/libminifi/include/FlowControlProtocol.h
@@ -0,0 +1,339 @@
+/**
+ * @file FlowControlProtocol.h
+ * FlowControlProtocol class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __FLOW_CONTROL_PROTOCOL_H__
+#define __FLOW_CONTROL_PROTOCOL_H__
+
+#include <stdio.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <fcntl.h>
+#include <netdb.h>
+#include <string>
+#include <errno.h>
+#include <chrono>
+#include <thread>
+#include "Logger.h"
+#include "Configure.h"
+#include "Property.h"
+
+//! Forwarder declaration
+class FlowController;
+
+#define DEFAULT_NIFI_SERVER_PORT 9000
+#define DEFAULT_REPORT_INTERVAL 1000 // 1 sec
+#define MAX_READ_TIMEOUT 30000 // 30 seconds
+
+ //! FlowControl Protocol Msg Type
+ typedef enum {
+     REGISTER_REQ, // Device Register Request from device to server which contain device serial number, current running flow xml version
+     REGISTER_RESP, // Device Register Respond from server to device, may contain new flow.xml from server ask device to apply and also device report interval
+     REPORT_REQ, // Period Device Report from device to server which contain device serial number, current running flow xml name/version and other period report info
+     REPORT_RESP, // Report Respond from server to device, may ask device to update flow xml or processor property
+     MAX_FLOW_CONTROL_MSG_TYPE
+ } FlowControlMsgType;
+
+ //! Printable names of the message types, indexed by FlowControlMsgType
+ static const char *FlowControlMsgTypeStr[MAX_FLOW_CONTROL_MSG_TYPE] =
+ {
+     "REGISTER_REQ",
+     "REGISTER_RESP",
+     "REPORT_REQ",
+     "REPORT_RESP"
+ };
+
+ //! Name of a message type, or NULL when type is not below MAX_FLOW_CONTROL_MSG_TYPE
+ inline const char *FlowControlMsgTypeToStr(FlowControlMsgType type)
+ {
+     // Reject the out-of-table case first, then index the name table.
+     if (!(type < MAX_FLOW_CONTROL_MSG_TYPE))
+         return NULL;
+     return FlowControlMsgTypeStr[type];
+ }
+
+ //! FlowControl Protocol Msg ID (some messages are fixed length, some are variable length (TLV))
+ typedef enum {
+     // Fixed length, 8 bytes: client to server in register request, required field
+     FLOW_SERIAL_NUMBER,
+     // Flow XML name TLV: client to server in register request and report request, required field
+     FLOW_XML_NAME,
+     // Flow XML content, TLV: server to client in register respond, optional field in case server wants to ask client to load xml from server
+     FLOW_XML_CONTENT,
+     // Fixed length, 4 bytes Report interval in msec: server to client in register respond, optional field
+     REPORT_INTERVAL,
+     // Processor Name TLV: server to client in report respond, optional field in case server wants to ask client to update processor property
+     PROCESSOR_NAME,
+     // Processor Property Name TLV: server to client in report respond, optional field in case server wants to ask client to update processor property
+     PROPERTY_NAME,
+     // Processor Property Value TLV: server to client in report respond, optional field in case server wants to ask client to update processor property
+     PROPERTY_VALUE,
+     // Report Blob TLV: client to server in report request, optional field in case client wants to piggyback the report blob in report request to server
+     REPORT_BLOB,
+     MAX_FLOW_MSG_ID
+ } FlowControlMsgID;
+
+ //! FlowControl Protocol Msg ID String
+ //! One name per FlowControlMsgID value, in enum order. Every entry needs its
+ //! own trailing comma: two adjacent literals would be concatenated into a
+ //! single entry and shift all following names by one slot.
+ static const char *FlowControlMsgIDStr[MAX_FLOW_MSG_ID] =
+ {
+     "FLOW_SERIAL_NUMBER",
+     "FLOW_XML_NAME",
+     "FLOW_XML_CONTENT",
+     "REPORT_INTERVAL",
+     "PROCESSOR_NAME",
+     "PROPERTY_NAME",
+     "PROPERTY_VALUE",
+     "REPORT_BLOB"
+ };
+
+#define TYPE_HDR_LEN 4 // Fix Hdr Type
+#define TLV_HDR_LEN 8 // Type 4 bytes and Len 4 bytes
+
+ //! Encoded length in bytes of one message field
+ /*!
+  * FLOW_SERIAL_NUMBER and REPORT_INTERVAL are fixed-size fields (type header
+  * plus 8 resp. 4 bytes); every other known id is a TLV whose length is the
+  * TLV header plus payLoadLen. Returns -1 for an id that is not below
+  * MAX_FLOW_MSG_ID.
+  */
+ inline int FlowControlMsgIDEncodingLen(FlowControlMsgID id, int payLoadLen)
+ {
+     switch (id)
+     {
+     case FLOW_SERIAL_NUMBER:
+         return (TYPE_HDR_LEN + 8);
+     case REPORT_INTERVAL:
+         return (TYPE_HDR_LEN + 4);
+     default:
+         break;
+     }
+     if (id < MAX_FLOW_MSG_ID)
+         return (TLV_HDR_LEN + payLoadLen);
+     return -1;
+ }
+
+ //! Name of a message id, or NULL when id is not below MAX_FLOW_MSG_ID
+ inline const char *FlowControlMsgIdToStr(FlowControlMsgID id)
+ {
+     // Reject the out-of-table case first, then index the name table.
+     if (!(id < MAX_FLOW_MSG_ID))
+         return NULL;
+     return FlowControlMsgIDStr[id];
+ }
+
+ //! Status codes carried in a flow control response
+ typedef enum {
+     RESP_SUCCESS,
+     RESP_TRIGGER_REGISTER, // Server respond to client report to re trigger register
+     RESP_START_FLOW_CONTROLLER, // Server respond to client to start flow controller
+     RESP_STOP_FLOW_CONTROLLER, // Server respond to client to stop flow controller
+     RESP_FAILURE,
+     MAX_RESP_CODE
+ } FlowControlRespCode;
+
+ //! Printable names of the response codes, indexed by FlowControlRespCode
+ static const char *FlowControlRespCodeStr[MAX_RESP_CODE] =
+ {
+     "RESP_SUCCESS",
+     "RESP_TRIGGER_REGISTER",
+     "RESP_START_FLOW_CONTROLLER",
+     "RESP_STOP_FLOW_CONTROLLER",
+     "RESP_FAILURE"
+ };
+
+ //! Name of a response code, or NULL when code is not below MAX_RESP_CODE
+ inline const char *FlowControlRespCodeToStr(FlowControlRespCode code)
+ {
+     // Reject the out-of-table case first, then index the name table.
+     if (!(code < MAX_RESP_CODE))
+         return NULL;
+     return FlowControlRespCodeStr[code];
+ }
+
+ //! Common FlowControlProtocol Header
+ //! Four 32-bit fields, 16 bytes of payload-independent header in total.
+ //! NOTE(review): msgType presumably holds a FlowControlMsgType value and the
+ //! fields presumably travel in the big-endian form produced by
+ //! FlowControlProtocol::encode() - confirm against the .cpp.
+ typedef struct {
+ uint32_t msgType; //! Msg Type
+ uint32_t seqNumber; //! Seq Number to match Req with Resp
+ uint32_t status; //! Resp Code, see FlowControlRespCode
+ uint32_t payloadLen; //! Msg Payload length
+ } FlowControlProtocolHeader;
+
+ //! FlowControlProtocol Class
+ /*!
+  * Client-side state for the flow control protocol spoken with a NiFi
+  * server: server address, socket, registration flag, sequence number,
+  * report interval and an optional report blob. The register/report
+  * exchanges and the worker thread are implemented out of line.
+  */
+ class FlowControlProtocol
+ {
+ public:
+     //! Constructor
+     /*!
+      * Create a new control protocol.
+      * Starts from the built-in defaults (server "localhost", port
+      * DEFAULT_NIFI_SERVER_PORT, report interval DEFAULT_REPORT_INTERVAL) and
+      * then applies the nifi_server_* values from Configure when they are
+      * present and parse successfully.
+      */
+     FlowControlProtocol(FlowController *controller) {
+         _controller = controller;
+         _logger = Logger::getLogger();
+         _configure = Configure::getConfigure();
+         _socket = 0;
+         _serverName = "localhost";
+         _serverPort = DEFAULT_NIFI_SERVER_PORT;
+         _registered = false;
+         _seqNumber = 0;
+         _reportBlob = NULL;
+         _reportBlobLen = 0;
+         _reportInterval = DEFAULT_REPORT_INTERVAL;
+         _running = false;
+         // The destructor tests and deletes _thread, so it must never be
+         // left indeterminate when start() was not called.
+         _thread = NULL;
+         // Give the serial number a defined value until setSerialNumber() runs.
+         memset(_serialNumber, 0, sizeof(_serialNumber));
+
+         std::string value;
+
+         if (_configure->get(Configure::nifi_server_name, value))
+         {
+             _serverName = value;
+             _logger->log_info("NiFi Server Name %s", _serverName.c_str());
+         }
+         if (_configure->get(Configure::nifi_server_port, value) && Property::StringToInt(value, _serverPort))
+         {
+             // _serverPort is int64_t: pass it with a matching conversion specifier.
+             _logger->log_info("NiFi Server Port: [%lld]", static_cast<long long>(_serverPort));
+         }
+         if (_configure->get(Configure::nifi_server_report_interval, value))
+         {
+             TimeUnit unit;
+             if (Property::StringToTime(value, _reportInterval, unit) &&
+                 Property::ConvertTimeUnitToMS(_reportInterval, unit, _reportInterval))
+             {
+                 // _reportInterval is int64_t as well.
+                 _logger->log_info("NiFi server report interval: [%lld] ms", static_cast<long long>(_reportInterval));
+             }
+         }
+     }
+     //! Destructor
+     /*!
+      * Stops the protocol, closes the socket when one is open (0 means
+      * "no socket") and releases the report blob and the thread object.
+      */
+     virtual ~FlowControlProtocol()
+     {
+         stop();
+         if (_socket)
+             close(_socket);
+         if (_reportBlob)
+             delete [] _reportBlob;
+         if (this->_thread)
+             delete this->_thread;
+     }
+
+ public:
+
+     //! SendRegisterRequest and Process Register Respond, return 0 for success
+     int sendRegisterReq();
+     //! SendReportReq and Process Report Respond, return 0 for success
+     int sendReportReq();
+     //! Start the flow control protocol
+     void start();
+     //! Stop the flow control protocol
+     void stop();
+     //! Set Report BLOB for periodically report
+     /*!
+      * Copies len bytes from blob into the internal report buffer (the
+      * caller keeps ownership of blob). The existing buffer is reused when
+      * it is large enough, otherwise a new one is allocated. A NULL blob or
+      * a non-positive len leaves an empty report (length 0).
+      */
+     void setReportBlob(char *blob, int len)
+     {
+         std::lock_guard<std::mutex> lock(_mtx);
+         if (!blob || len <= 0)
+         {
+             // Nothing to copy; a negative length must never reach memcpy/new[].
+             _reportBlobLen = 0;
+             return;
+         }
+         if (_reportBlob && _reportBlobLen >= len)
+         {
+             memcpy(_reportBlob, blob, len);
+             _reportBlobLen = len;
+         }
+         else
+         {
+             // Allocate and fill the replacement first, so a failed
+             // allocation leaves the previous blob intact.
+             char *fresh = new char[len];
+             memcpy(fresh, blob, len);
+             if (_reportBlob)
+                 delete[] _reportBlob;
+             _reportBlob = fresh;
+             _reportBlobLen = len;
+         }
+     }
+     //! Run function for the thread
+     static void run(FlowControlProtocol *protocol);
+     //! set 8 bytes SerialNumber (number must point to at least 8 readable bytes)
+     void setSerialNumber(uint8_t *number)
+     {
+         memcpy(_serialNumber, number, 8);
+     }
+
+ protected:
+
+ private:
+     //! Connect to the socket, return sock descriptor if success, 0 for failure
+     int connectServer(const char *host, uint16_t port);
+     //! Send Data via the socket, return -1 for failure
+     int sendData(uint8_t *buf, int buflen);
+     //! Read length into buf, return -1 for failure and 0 for EOF
+     int readData(uint8_t *buf, int buflen);
+     //! Select on the socket
+     int selectClient(int msec);
+     //! Read the header
+     int readHdr(FlowControlProtocolHeader *hdr);
+     //! encode uint32_t as 4 bytes, most significant byte first; returns the position after them
+     uint8_t *encode(uint8_t *buf, uint32_t value)
+     {
+         *buf++ = (value & 0xFF000000) >> 24;
+         *buf++ = (value & 0x00FF0000) >> 16;
+         *buf++ = (value & 0x0000FF00) >> 8;
+         *buf++ = (value & 0x000000FF);
+         return buf;
+     }
+     //! decode uint32_t from 4 bytes, most significant byte first; returns the position after them
+     uint8_t *decode(uint8_t *buf, uint32_t &value)
+     {
+         // Widen each byte to uint32_t before shifting: a promoted int
+         // shifted left by 24 can overflow into the sign bit.
+         value = (static_cast<uint32_t>(buf[0]) << 24) |
+                 (static_cast<uint32_t>(buf[1]) << 16) |
+                 (static_cast<uint32_t>(buf[2]) << 8) |
+                 static_cast<uint32_t>(buf[3]);
+         return (buf + 4);
+     }
+     //! encode byte array (raw copy of size bytes); returns the position after them
+     uint8_t *encode(uint8_t *buf, uint8_t *bufArray, int size)
+     {
+         memcpy(buf, bufArray, size);
+         buf += size;
+         return buf;
+     }
+     //! encode std::string as a 4-byte length followed by the characters and the terminating \0
+     uint8_t *encode(uint8_t *buf, std::string value)
+     {
+         // add the \0 for size
+         const uint32_t encodedLen = static_cast<uint32_t>(value.size() + 1);
+         buf = encode(buf, encodedLen);
+         buf = encode(buf, (uint8_t *) value.c_str(), static_cast<int>(encodedLen));
+         return buf;
+     }
+     //! Mutex for protection
+     std::mutex _mtx;
+     //! Logger
+     Logger *_logger;
+     //! Configure
+     Configure *_configure;
+     //! NiFi server Name
+     std::string _serverName;
+     //! NiFi server port
+     int64_t _serverPort;
+     //! Serial Number
+     uint8_t _serialNumber[8];
+     //! socket to server
+     int _socket;
+     //! report interval in msec
+     int64_t _reportInterval;
+     //! whether it was registered to the NiFi server
+     bool _registered;
+     //! seq number
+     uint32_t _seqNumber;
+     //! FlowController
+     FlowController *_controller;
+     //! report Blob
+     char *_reportBlob;
+     //! report Blob len;
+     int _reportBlobLen;
+     //! thread
+     std::thread *_thread;
+     //! whether it is running
+     bool _running;
+     // Prevent default copy constructor and assignment operation
+     // Only support pass by reference or pointer
+     FlowControlProtocol(const FlowControlProtocol &parent);
+     FlowControlProtocol &operator=(const FlowControlProtocol &parent);
+
+ };
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/include/FlowController.h
----------------------------------------------------------------------
diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h
new file mode 100644
index 0000000..0d758df
--- /dev/null
+++ b/libminifi/include/FlowController.h
@@ -0,0 +1,248 @@
+/**
+ * @file FlowController.h
+ * FlowController class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __FLOW_CONTROLLER_H__
+#define __FLOW_CONTROLLER_H__
+
+#include <uuid/uuid.h>
+#include <vector>
+#include <queue>
+#include <map>
+#include <mutex>
+#include <atomic>
+#include <algorithm>
+#include <set>
+#include <libxml/parser.h>
+#include <libxml/tree.h>
+#include <yaml-cpp/yaml.h>
+
+#include "Configure.h"
+#include "Property.h"
+#include "Relationship.h"
+#include "FlowFileRecord.h"
+#include "Connection.h"
+#include "Processor.h"
+#include "ProcessContext.h"
+#include "ProcessSession.h"
+#include "ProcessGroup.h"
+#include "GenerateFlowFile.h"
+#include "LogAttribute.h"
+#include "RealTimeDataCollector.h"
+#include "TimerDrivenSchedulingAgent.h"
+#include "FlowControlProtocol.h"
+#include "RemoteProcessorGroupPort.h"
+#include "GetFile.h"
+#include "TailFile.h"
+#include "ListenSyslog.h"
+#include "ExecuteProcess.h"
+
+//! Default NiFi Root Group Name
+#define DEFAULT_ROOT_GROUP_NAME ""
+#define DEFAULT_FLOW_XML_FILE_NAME "conf/flow.xml"
+#define DEFAULT_FLOW_YAML_FILE_NAME "conf/flow.yml"
+#define CONFIG_YAML_PROCESSORS_KEY "Processors"
+
+ //! Flow configuration formats that can be passed to FlowController::load()
+ enum class ConfigFormat { XML, YAML };
+
+ //! Settings of one processor, all kept as unparsed strings, plus its
+ //! auto-terminated relationship names and its properties.
+ //! NOTE(review): presumably filled from an entry of the YAML "Processors"
+ //! section (see CONFIG_YAML_PROCESSORS_KEY) - confirm against the parser.
+ struct ProcessorConfig {
+ std::string name;
+ std::string javaClass;
+ std::string maxConcurrentTasks;
+ std::string schedulingStrategy;
+ std::string schedulingPeriod;
+ std::string penalizationPeriod;
+ std::string yieldPeriod;
+ std::string runDurationNanos;
+ std::vector<std::string> autoTerminatedRelationships;
+ std::vector<Property> properties;
+ };
+
+ //! FlowController Class
+ /*!
+  * Holds the root ProcessGroup pointer, the timer driven scheduling agent and
+  * the FlowControlProtocol pointer, together with the name, UUID and thread
+  * limits of the controller. Loading, starting, stopping and the XML/YAML
+  * parsing helpers are implemented out of line.
+  */
+ class FlowController
+ {
+ public:
+     static const int DEFAULT_MAX_TIMER_DRIVEN_THREAD = 10;
+     static const int DEFAULT_MAX_EVENT_DRIVEN_THREAD = 5;
+     //! Constructor
+     /*!
+      * Create a new Flow Controller
+      */
+     FlowController(std::string name = DEFAULT_ROOT_GROUP_NAME);
+     //! Destructor
+     virtual ~FlowController();
+     //! Replace the controller name
+     void setName(std::string name) {
+         _name = name;
+     }
+     //! Return a copy of the controller name
+     std::string getName(void) {
+         return _name;
+     }
+     //! Overwrite the controller UUID with the given one
+     void setUUID(uuid_t uuid) {
+         uuid_copy(_uuid, uuid);
+     }
+     //! Copy the controller UUID into uuid; false (and no copy) when uuid is NULL
+     bool getUUID(uuid_t uuid) {
+         if (!uuid)
+             return false;
+         uuid_copy(uuid, _uuid);
+         return true;
+     }
+     //! Set the maximum number of timer driven threads
+     void setMaxTimerDrivenThreads(int number)
+     {
+         _maxTimerDrivenThreads = number;
+     }
+     //! Get the maximum number of timer driven threads
+     int getMaxTimerDrivenThreads()
+     {
+         return _maxTimerDrivenThreads;
+     }
+     //! Set the maximum number of event driven threads
+     void setMaxEventDrivenThreads(int number)
+     {
+         _maxEventDrivenThreads = number;
+     }
+     //! Get the maximum number of event driven threads
+     int getMaxEventDrivenThreads()
+     {
+         return _maxEventDrivenThreads;
+     }
+     //! Create FlowFile Repository
+     bool createFlowFileRepository();
+     //! Create Content Repository
+     bool createContentRepository();
+
+     //! Life Cycle related function
+     //! Load the flow configuration from disk in the given format, after that, create the root process group and its children, initialize the flows
+     void load(ConfigFormat format);
+     //! Whether the Flow Controller is start running
+     bool isRunning();
+     //! Whether the Flow Controller has already been initialized (loaded flow XML)
+     bool isInitialized();
+     //! Start to run the Flow Controller which internally start the root process group and all its children
+     bool start();
+     //! Stop to run the Flow Controller which internally stop the root process group and all its children
+     void stop(bool force);
+     //! Unload the current flow xml, clean the root process group and all its children
+     void unload();
+     //! Load new xml
+     void reload(std::string xmlFile);
+     //! Forward a property update to the root process group; does nothing while there is no root group
+     void updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue)
+     {
+         if (!_root)
+             return;
+         _root->updatePropertyValue(processorName, propertyName, propertyValue);
+     }
+
+     //! Create Processor (Node/Input/Output Port) based on the name
+     Processor *createProcessor(std::string name, uuid_t uuid);
+     //! Create Root Processor Group
+     ProcessGroup *createRootProcessGroup(std::string name, uuid_t uuid);
+     //! Create Remote Processor Group
+     ProcessGroup *createRemoteProcessGroup(std::string name, uuid_t uuid);
+     //! Create Connection
+     Connection *createConnection(std::string name, uuid_t uuid);
+     //! Hand the 8 byte serial number on to the flow control protocol
+     void setSerialNumber(uint8_t *number)
+     {
+         _protocol->setSerialNumber(number);
+     }
+
+ protected:
+
+     //! A global unique identifier
+     uuid_t _uuid;
+     //! FlowController Name
+     std::string _name;
+     //! Configuration File Name
+     std::string _configurationFileName;
+     //! NiFi property File Name
+     std::string _propertiesFileName;
+     //! Root Process Group
+     ProcessGroup *_root;
+     //! MAX Timer Driven Threads
+     int _maxTimerDrivenThreads;
+     //! MAX Event Driven Threads
+     int _maxEventDrivenThreads;
+     //! Config
+     //! FlowFile Repo
+     //! Provenance Repo
+     //! Flow Engines
+     //! Flow Scheduler
+     TimerDrivenSchedulingAgent _timerScheduler;
+     //! Controller Service
+     //! Config
+     //! Site to Site Server Listener
+     //! Heart Beat
+     //! FlowControl Protocol
+     FlowControlProtocol *_protocol;
+
+ private:
+
+     //! Mutex for protection
+     std::mutex _mtx;
+     //! Logger
+     Logger *_logger;
+     //! Configure
+     Configure *_configure;
+     //! Whether it is running
+     std::atomic<bool> _running;
+     //! Whether it has already been initialized (load the flow XML already)
+     std::atomic<bool> _initialized;
+     //! Process Processor Node XML
+     void parseProcessorNode(xmlDoc *doc, xmlNode *processorNode, ProcessGroup *parent);
+     //! Process Port XML
+     void parsePort(xmlDoc *doc, xmlNode *processorNode, ProcessGroup *parent, TransferDirection direction);
+     //! Process Root Processor Group XML
+     void parseRootProcessGroup(xmlDoc *doc, xmlNode *node);
+     //! Process Property XML
+     void parseProcessorProperty(xmlDoc *doc, xmlNode *node, Processor *processor);
+     //! Process connection XML
+     void parseConnection(xmlDoc *doc, xmlNode *node, ProcessGroup *parent);
+     //! Process Remote Process Group
+     void parseRemoteProcessGroup(xmlDoc *doc, xmlNode *node, ProcessGroup *parent);
+
+     //! Process Processor Node YAML
+     void parseProcessorNodeYaml(YAML::Node processorNode, ProcessGroup *parent);
+     //! Process Port YAML
+     void parsePortYaml(YAML::Node *portNode, ProcessGroup *parent, TransferDirection direction);
+     //! Process Root Processor Group YAML
+     void parseRootProcessGroupYaml(YAML::Node rootNode);
+     //! Process Property YAML
+     void parseProcessorPropertyYaml(YAML::Node *doc, YAML::Node *node, Processor *processor);
+     //! Process connection YAML
+     void parseConnectionYaml(YAML::Node *node, ProcessGroup *parent);
+     //! Process Remote Process Group YAML
+     void parseRemoteProcessGroupYaml(YAML::Node *node, ProcessGroup *parent);
+     //! Parse Properties Node YAML for a processor
+     void parsePropertiesNodeYaml(YAML::Node *propertiesNode, Processor *processor);
+
+     // Prevent default copy constructor and assignment operation
+     // Only support pass by reference or pointer
+     FlowController(const FlowController &parent);
+     FlowController &operator=(const FlowController &parent);
+
+ };
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/include/FlowFileRecord.h
----------------------------------------------------------------------
diff --git a/libminifi/include/FlowFileRecord.h b/libminifi/include/FlowFileRecord.h
new file mode 100644
index 0000000..8b7362f
--- /dev/null
+++ b/libminifi/include/FlowFileRecord.h
@@ -0,0 +1,220 @@
+/**
+ * @file FlowFileRecord.h
+ * Flow file record class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __FLOW_FILE_RECORD_H__
+#define __FLOW_FILE_RECORD_H__
+
+#include <uuid/uuid.h>
+#include <vector>
+#include <queue>
+#include <map>
+#include <mutex>
+#include <atomic>
+#include <iostream>
+#include <sstream>
+#include <fstream>
+#include <set>
+
+#include "TimeUtil.h"
+#include "Logger.h"
+#include "ResourceClaim.h"
+
+class ProcessSession;
+class Connection;
+
+#define DEFAULT_FLOWFILE_PATH "."
+
+ //! Well-known FlowFile attributes; each value indexes FlowAttributeKeyArray
+ enum FlowAttribute
+ {
+     //! The flowfile's path indicates the relative directory to which a FlowFile belongs and does not contain the filename
+     PATH = 0,
+     //! The flowfile's absolute path indicates the absolute directory to which a FlowFile belongs and does not contain the filename
+     ABSOLUTE_PATH,
+     //! The filename of the FlowFile. The filename should not contain any directory structure.
+     FILENAME,
+     //! A unique UUID assigned to this FlowFile.
+     UUID,
+     //! A numeric value indicating the FlowFile priority
+     priority,
+     //! The MIME Type of this FlowFile
+     MIME_TYPE,
+     //! Specifies the reason that a FlowFile is being discarded
+     DISCARD_REASON,
+     //! Indicates an identifier other than the FlowFile's UUID that is known to refer to this FlowFile.
+     ALTERNATE_IDENTIFIER,
+     MAX_FLOW_ATTRIBUTES
+ };
+
+ //! Attribute key strings, in FlowAttribute order
+ static const char *FlowAttributeKeyArray[MAX_FLOW_ATTRIBUTES] =
+ {
+     "path",
+     "absolute.path",
+     "filename",
+     "uuid",
+     "priority",
+     "mime.type",
+     "discard.reason",
+     "alternate.identifier"
+ };
+
+ //! Key string for an attribute, or NULL when attribute is not below MAX_FLOW_ATTRIBUTES
+ inline const char *FlowAttributeKey(FlowAttribute attribute)
+ {
+     // Reject the out-of-table case first, then index the key table.
+     if (!(attribute < MAX_FLOW_ATTRIBUTES))
+         return NULL;
+     return FlowAttributeKeyArray[attribute];
+ }
+
+ //! FlowFile IO Callback functions for input and output
+ //! throw exception for error
+ //! Interface implemented by code that wants to read flow file content:
+ //! process() receives the input stream to read from.
+ class InputStreamCallback
+ {
+ public:
+     //! Virtual so that an implementation can be destroyed through a base pointer
+     virtual ~InputStreamCallback() {}
+     virtual void process(std::ifstream *stream) = 0;
+ };
+ //! Interface implemented by code that wants to write flow file content:
+ //! process() receives the output stream to write to.
+ class OutputStreamCallback
+ {
+ public:
+     //! Virtual so that an implementation can be destroyed through a base pointer
+     virtual ~OutputStreamCallback() {}
+     virtual void process(std::ofstream *stream) = 0;
+ };
+
+
+ //! FlowFile Record Class
+ /*!
+  * Metadata of one flow file: dates, size and offset of the content, UUID,
+  * attribute map, lineage identifiers and the ResourceClaim pointer for the
+  * content. ProcessSession is a friend and may touch the private state.
+  */
+ class FlowFileRecord
+ {
+     friend class ProcessSession;
+ public:
+     //! Constructor
+     /*!
+      * Create a new flow record
+      */
+     FlowFileRecord(std::map<std::string, std::string> attributes, ResourceClaim *claim = NULL);
+     //! Destructor
+     virtual ~FlowFileRecord();
+     //! addAttribute key is enum
+     bool addAttribute(FlowAttribute key, std::string value);
+     //! addAttribute key is string
+     bool addAttribute(std::string key, std::string value);
+     //! removeAttribute key is enum
+     bool removeAttribute(FlowAttribute key);
+     //! removeAttribute key is string
+     bool removeAttribute(std::string key);
+     //! updateAttribute key is enum
+     bool updateAttribute(FlowAttribute key, std::string value);
+     //! updateAttribute key is string
+     bool updateAttribute(std::string key, std::string value);
+     //! getAttribute key is enum
+     bool getAttribute(FlowAttribute key, std::string &value);
+     //! getAttribute key is string
+     bool getAttribute(std::string key, std::string &value);
+     //! Insert the attribute, or overwrite its value when the key already exists
+     void setAttribute(std::string key, std::string value) {
+         _attributes[key] = value;
+     }
+     //! UUID in its string form
+     std::string getUUIDStr() {
+         return _uuidStr;
+     }
+     //! Copy of the complete attribute map
+     std::map<std::string, std::string> getAttributes() {
+         return _attributes;
+     }
+     //! True while a penalty is set (non-zero) and its expiration time is still ahead of getTimeMillis()
+     bool isPenalized() {
+         if (_penaltyExpirationMs == 0)
+             return false;
+         return _penaltyExpirationMs > getTimeMillis();
+     }
+     //! Size in bytes of the content
+     uint64_t getSize() {
+         return _size;
+     }
+     //! Offset to the content
+     uint64_t getOffset() {
+         return _offset;
+     }
+     //! Date at which the flow file entered the flow
+     uint64_t getEntryDate() {
+         return _entryDate;
+     }
+     //! Date at which the origin of this flow file entered the flow
+     uint64_t getlineageStartDate() {
+         return _lineageStartDate;
+     }
+     //! Remember the connection this flow file was dequeued from
+     void setOriginalConnection (Connection *connection) {
+         _orginalConnection = connection;
+     }
+     //! Resource claim of the content (may be NULL)
+     ResourceClaim *getResourceClaim() {
+         return _claim;
+     }
+
+ protected:
+
+     //! Date at which the flow file entered the flow
+     uint64_t _entryDate;
+     //! Date at which the origin of this flow file entered the flow
+     uint64_t _lineageStartDate;
+     //! Date at which the flow file was queued
+     uint64_t _lastQueueDate;
+     //! Size in bytes of the data corresponding to this flow file
+     uint64_t _size;
+     //! A global unique identifier
+     uuid_t _uuid;
+     //! A local unique identifier
+     uint64_t _id;
+     //! Offset to the content
+     uint64_t _offset;
+     //! Penalty expiration
+     uint64_t _penaltyExpirationMs;
+     //! Attributes key/values pairs for the flow record
+     std::map<std::string, std::string> _attributes;
+     //! Pointer to the associated content resource claim
+     ResourceClaim *_claim;
+     //! UUID string
+     std::string _uuidStr;
+     //! UUID string for all parents
+     std::set<std::string> _lineageIdentifiers;
+     //! duplicate the original flow file
+     void duplicate(FlowFileRecord *original);
+
+ private:
+
+     //! Local flow sequence ID
+     static std::atomic<uint64_t> _localFlowSeqNumber;
+     //! Mark for deletion
+     bool _markedDelete;
+     //! Connection queue that this flow file will be transfer or current in
+     Connection *_connection;
+     //! Original connection queue that this flow file was dequeued from
+     Connection *_orginalConnection;
+     //! Logger
+     Logger *_logger;
+     //! Snapshot flow record for session rollback
+     bool _snapshot;
+     // Prevent default copy constructor and assignment operation
+     // Only support pass by reference or pointer
+     FlowFileRecord(const FlowFileRecord &parent);
+     FlowFileRecord &operator=(const FlowFileRecord &parent);
+
+ };
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/include/GenerateFlowFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/GenerateFlowFile.h b/libminifi/include/GenerateFlowFile.h
new file mode 100644
index 0000000..27aa43b
--- /dev/null
+++ b/libminifi/include/GenerateFlowFile.h
@@ -0,0 +1,87 @@
+/**
+ * @file GenerateFlowFile.h
+ * GenerateFlowFile class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __GENERATE_FLOW_FILE_H__
+#define __GENERATE_FLOW_FILE_H__
+
+#include "FlowFileRecord.h"
+#include "Processor.h"
+#include "ProcessSession.h"
+
+//! GenerateFlowFile Class
+/*!
+ * Processor declaration. The instance keeps a heap buffer (_data) together
+ * with its length (_dataSize); the buffer is released with delete[] when the
+ * processor is destroyed.
+ */
+class GenerateFlowFile : public Processor
+{
+public:
+    //! Constructor
+    /*!
+     * Create a new processor that starts out with no generated data.
+     */
+    GenerateFlowFile(std::string name, uuid_t uuid = NULL)
+        : Processor(name, uuid), _data(NULL), _dataSize(0)
+    {
+    }
+    //! Destructor, releases the generated data buffer
+    virtual ~GenerateFlowFile()
+    {
+        // delete[] on a null pointer is a no-op, so no explicit check is needed
+        delete[] _data;
+    }
+    //! Processor Name
+    static const std::string ProcessorName;
+    //! Supported Properties
+    static Property FileSize;
+    static Property BatchSize;
+    static Property DataFormat;
+    static Property UniqueFlowFiles;
+    static const char *DATA_FORMAT_BINARY;
+    static const char *DATA_FORMAT_TEXT;
+    //! Supported Relationships
+    static Relationship Success;
+    //! Nest Callback Class for write stream
+    /*!
+     * Holds a non-owning pointer to a byte buffer and its size and copies
+     * that buffer into the output stream handed to process().
+     */
+    class WriteCallback : public OutputStreamCallback
+    {
+    public:
+        WriteCallback(char *data, uint64_t size)
+        {
+            _data = data;
+            _dataSize = size;
+        }
+        char *_data;
+        uint64_t _dataSize;
+        //! Write the buffer to the stream; a null or empty buffer writes nothing
+        void process(std::ofstream *stream) {
+            if (!_data || _dataSize == 0)
+                return;
+            stream->write(_data, _dataSize);
+        }
+    };
+
+public:
+    //! OnTrigger method, implemented by NiFi GenerateFlowFile
+    virtual void onTrigger(ProcessContext *context, ProcessSession *session);
+    //! Initialize, over write by NiFi GenerateFlowFile
+    virtual void initialize(void);
+
+protected:
+
+private:
+    //! Generated data
+    char * _data;
+    //! Size of the generate data
+    uint64_t _dataSize;
+};
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/include/GetFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/GetFile.h b/libminifi/include/GetFile.h
new file mode 100644
index 0000000..eb975fd
--- /dev/null
+++ b/libminifi/include/GetFile.h
@@ -0,0 +1,117 @@
+/**
+ * @file GetFile.h
+ * GetFile class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __GET_FILE_H__
+#define __GET_FILE_H__
+
+#include "FlowFileRecord.h"
+#include "Processor.h"
+#include "ProcessSession.h"
+
+//! GetFile Class
+/*!
+ * Processor declaration. Keeps a mutex-protected queue of file names
+ * (_dirList) plus the settings that are initialised in the constructor.
+ */
+class GetFile : public Processor
+{
+public:
+    //! Constructor
+    /*!
+     * Create a new processor with its default settings.
+     */
+    GetFile(std::string name, uuid_t uuid = NULL)
+        : Processor(name, uuid),
+          _logger(Logger::getLogger()),
+          _directory("."),
+          _recursive(true),
+          _keepSourceFile(false),
+          _minAge(0),
+          _maxAge(0),
+          _minSize(0),
+          _maxSize(0),
+          _ignoreHiddenFile(true),
+          _pollInterval(0),
+          _batchSize(10),
+          _lastDirectoryListingTime(getTimeMillis()),
+          _fileFilter("[^\\.].*")
+    {
+    }
+    //! Destructor
+    virtual ~GetFile()
+    {
+    }
+    //! Processor Name
+    static const std::string ProcessorName;
+    //! Supported Properties
+    static Property Directory;
+    static Property Recurse;
+    static Property KeepSourceFile;
+    static Property MinAge;
+    static Property MaxAge;
+    static Property MinSize;
+    static Property MaxSize;
+    static Property IgnoreHiddenFile;
+    static Property PollInterval;
+    static Property BatchSize;
+    static Property FileFilter;
+    //! Supported Relationships
+    static Relationship Success;
+
+public:
+    //! OnTrigger method, implemented by NiFi GetFile
+    virtual void onTrigger(ProcessContext *context, ProcessSession *session);
+    //! Initialize, over write by NiFi GetFile
+    virtual void initialize(void);
+    //! perform directory listing
+    void performListing(std::string dir);
+
+protected:
+
+private:
+    //! Logger
+    Logger *_logger;
+    //! Queue for store directory list
+    std::queue<std::string> _dirList;
+    //! Number of entries currently held in the directory listing (takes _mtx)
+    uint64_t getListingSize() {
+        std::lock_guard<std::mutex> guard(_mtx);
+        const uint64_t pending = _dirList.size();
+        return pending;
+    }
+    //! Whether the directory listing is empty
+    bool isListingEmpty();
+    //! Put full path file name into directory listing
+    void putListing(std::string fileName);
+    //! Poll directory listing for files
+    void pollListing(std::queue<std::string> &list, int maxSize);
+    //! Check whether file can be added to the directory listing
+    bool acceptFile(std::string fileName);
+    //! Mutex for protection of the directory listing
+    std::mutex _mtx;
+    std::string _directory;
+    bool _recursive;
+    bool _keepSourceFile;
+    int64_t _minAge;
+    int64_t _maxAge;
+    int64_t _minSize;
+    int64_t _maxSize;
+    bool _ignoreHiddenFile;
+    int64_t _pollInterval;
+    int64_t _batchSize;
+    uint64_t _lastDirectoryListingTime;
+    std::string _fileFilter;
+};
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/include/ListenSyslog.h
----------------------------------------------------------------------
diff --git a/libminifi/include/ListenSyslog.h b/libminifi/include/ListenSyslog.h
new file mode 100644
index 0000000..81bc92c
--- /dev/null
+++ b/libminifi/include/ListenSyslog.h
@@ -0,0 +1,209 @@
+/**
+ * @file ListenSyslog.h
+ * ListenSyslog class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __LISTEN_SYSLOG_H__
+#define __LISTEN_SYSLOG_H__
+
+#include <stdio.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <errno.h>
+#include <sys/select.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <chrono>
+#include <thread>
+#include "FlowFileRecord.h"
+#include "Processor.h"
+#include "ProcessSession.h"
+
+//! SyslogEvent
+/*!
+ * A payload pointer together with the payload length in bytes.
+ */
+struct SysLogEvent {
+    uint8_t *payload;
+    uint64_t len;
+};
+
+//! ListenSyslog Class
+/*!
+ * Processor declaration. Received events are kept in a mutex-protected
+ * queue (_eventQueue) whose total payload size is tracked in
+ * _eventQueueByteSize; socket descriptors are closed in the destructor.
+ */
+class ListenSyslog : public Processor
+{
+public:
+    //! Constructor
+    /*!
+     * Create a new processor with its default settings. No socket is opened
+     * and no thread is created here.
+     */
+    ListenSyslog(std::string name, uuid_t uuid = NULL)
+        : Processor(name, uuid),
+          _logger(Logger::getLogger()),
+          _eventQueueByteSize(0),
+          _recvBufSize(65507),
+          _maxSocketBufSize(1024*1024),
+          _maxConnections(2),
+          _maxBatchSize(1),
+          _messageDelimiter("\n"),
+          _protocol("UDP"),
+          _port(514),
+          _parseMessages(false),
+          _serverSocket(0),
+          _maxFds(0),
+          _thread(NULL),
+          _resetServerSocket(false),
+          _serverTheadRunning(false)
+    {
+        FD_ZERO(&_readfds);
+    }
+    //! Destructor
+    /*!
+     * Clears the running flag, deletes the thread object and closes every
+     * client socket followed by the server socket.
+     */
+    virtual ~ListenSyslog()
+    {
+        _serverTheadRunning = false;
+        // deleting a null pointer is a no-op
+        delete _thread;
+        // need to reset the socket
+        for (std::vector<int>::size_type idx = 0; idx < _clientSockets.size(); ++idx)
+            close(_clientSockets[idx]);
+        _clientSockets.clear();
+        if (_serverSocket > 0)
+        {
+            _logger->log_info("ListenSysLog Server socket %d close", _serverSocket);
+            close(_serverSocket);
+            _serverSocket = 0;
+        }
+    }
+    //! Processor Name
+    static const std::string ProcessorName;
+    //! Supported Properties
+    static Property RecvBufSize;
+    static Property MaxSocketBufSize;
+    static Property MaxConnections;
+    static Property MaxBatchSize;
+    static Property MessageDelimiter;
+    static Property ParseMessages;
+    static Property Protocol;
+    static Property Port;
+    //! Supported Relationships
+    static Relationship Success;
+    static Relationship Invalid;
+    //! Nest Callback Class for write stream
+    /*!
+     * Holds a non-owning pointer to a byte buffer and its size and copies
+     * that buffer into the output stream handed to process().
+     */
+    class WriteCallback : public OutputStreamCallback
+    {
+    public:
+        WriteCallback(char *data, uint64_t size)
+        {
+            _data = data;
+            _dataSize = size;
+        }
+        char *_data;
+        uint64_t _dataSize;
+        //! Write the buffer to the stream; a null or empty buffer writes nothing
+        void process(std::ofstream *stream) {
+            if (!_data || _dataSize == 0)
+                return;
+            stream->write(_data, _dataSize);
+        }
+    };
+
+public:
+    //! OnTrigger method, implemented by NiFi ListenSyslog
+    virtual void onTrigger(ProcessContext *context, ProcessSession *session);
+    //! Initialize, over write by NiFi ListenSyslog
+    virtual void initialize(void);
+
+protected:
+
+private:
+    //! Logger
+    Logger *_logger;
+    //! Run function for the thread
+    static void run(ListenSyslog *process);
+    //! Run Thread
+    void runThread();
+    //! Queue for store syslog event
+    std::queue<SysLogEvent> _eventQueue;
+    //! Size of Event queue in bytes
+    uint64_t _eventQueueByteSize;
+    //! Number of events currently queued (takes _mtx)
+    uint64_t getEventQueueSize() {
+        std::lock_guard<std::mutex> guard(_mtx);
+        const uint64_t queued = _eventQueue.size();
+        return queued;
+    }
+    //! Sum of the payload lengths of all queued events (takes _mtx)
+    uint64_t getEventQueueByteSize() {
+        std::lock_guard<std::mutex> guard(_mtx);
+        const uint64_t bytes = _eventQueueByteSize;
+        return bytes;
+    }
+    //! Whether the event queue is empty (takes _mtx)
+    bool isEventQueueEmpty()
+    {
+        std::lock_guard<std::mutex> guard(_mtx);
+        const bool nothingQueued = _eventQueue.empty();
+        return nothingQueued;
+    }
+    //! Append an event to the event queue and account for its size (takes _mtx)
+    void putEvent(uint8_t *payload, uint64_t len)
+    {
+        std::lock_guard<std::mutex> guard(_mtx);
+        SysLogEvent queued;
+        queued.len = len;
+        queued.payload = payload;
+        _eventQueue.push(queued);
+        _eventQueueByteSize += len;
+    }
+    //! Read \n terminated line from TCP socket
+    int readline( int fd, char *bufptr, size_t len );
+    //! start server socket and handling client socket
+    void startSocketThread();
+    //! Poll event
+    /*!
+     * Move events from the front of the event queue into list until the
+     * queue is empty or list holds maxSize entries; maxSize == 0 means
+     * no limit. Takes _mtx.
+     */
+    void pollEvent(std::queue<SysLogEvent> &list, int maxSize)
+    {
+        std::lock_guard<std::mutex> guard(_mtx);
+
+        while (!_eventQueue.empty())
+        {
+            if (maxSize != 0 && list.size() >= maxSize)
+                break;
+            SysLogEvent next = _eventQueue.front();
+            _eventQueue.pop();
+            _eventQueueByteSize -= next.len;
+            list.push(next);
+        }
+    }
+    //! Mutex for protection of the event queue
+    std::mutex _mtx;
+    int64_t _recvBufSize;
+    int64_t _maxSocketBufSize;
+    int64_t _maxConnections;
+    int64_t _maxBatchSize;
+    std::string _messageDelimiter;
+    std::string _protocol;
+    int64_t _port;
+    bool _parseMessages;
+    int _serverSocket;
+    std::vector<int> _clientSockets;
+    int _maxFds;
+    fd_set _readfds;
+    //! thread
+    std::thread *_thread;
+    //! whether to reset the server socket
+    bool _resetServerSocket;
+    //! cleared in the destructor; presumably polled by the socket thread -- confirm in the implementation file
+    bool _serverTheadRunning;
+    //! buffer for read socket
+    uint8_t _buffer[2048];
+};
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/include/LogAttribute.h
----------------------------------------------------------------------
diff --git a/libminifi/include/LogAttribute.h b/libminifi/include/LogAttribute.h
new file mode 100644
index 0000000..125ebf3
--- /dev/null
+++ b/libminifi/include/LogAttribute.h
@@ -0,0 +1,128 @@
+/**
+ * @file LogAttribute.h
+ * LogAttribute class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __LOG_ATTRIBUTE_H__
+#define __LOG_ATTRIBUTE_H__
+
+#include "FlowFileRecord.h"
+#include "Processor.h"
+#include "ProcessSession.h"
+
+//! LogAttribute Class
+/*!
+ * Processor declaration. Provides the string-to-level conversion used for
+ * its log level setting and a read callback that copies flow content into
+ * a heap buffer.
+ */
+class LogAttribute : public Processor
+{
+public:
+    //! Constructor
+    /*!
+     * Create a new processor
+     */
+    LogAttribute(std::string name, uuid_t uuid = NULL)
+        : Processor(name, uuid)
+    {
+        _logger = Logger::getLogger();
+    }
+    //! Destructor
+    virtual ~LogAttribute()
+    {
+    }
+    //! Processor Name
+    static const std::string ProcessorName;
+    //! Supported Properties
+    static Property LogLevel;
+    static Property AttributesToLog;
+    static Property AttributesToIgnore;
+    static Property LogPayload;
+    static Property LogPrefix;
+    //! Supported Relationships
+    static Relationship Success;
+    enum LogAttrLevel {
+        LogAttrLevelTrace, LogAttrLevelDebug, LogAttrLevelInfo, LogAttrLevelWarn, LogAttrLevelError
+    };
+    //! Convert log level from string to enum
+    /*!
+     * Accepts exactly "trace", "debug", "info", "warn" and "error"
+     * (case-sensitive comparison).
+     * @param logStr level name to convert
+     * @param level receives the matching enum value; left untouched when
+     *        logStr is not recognised
+     * @return true when logStr named a known level, false otherwise
+     */
+    bool logLevelStringToEnum(std::string logStr, LogAttrLevel &level)
+    {
+        if (logStr == "trace")
+        {
+            level = LogAttrLevelTrace;
+            return true;
+        }
+        else if (logStr == "debug")
+        {
+            level = LogAttrLevelDebug;
+            return true;
+        }
+        else if (logStr == "info")
+        {
+            level = LogAttrLevelInfo;
+            return true;
+        }
+        else if (logStr == "warn")
+        {
+            level = LogAttrLevelWarn;
+            return true;
+        }
+        else if (logStr == "error")
+        {
+            level = LogAttrLevelError;
+            return true;
+        }
+        else
+            return false;
+    }
+    //! Nest Callback Class for read stream
+    /*!
+     * Owns a buffer of _bufferSize bytes that process() fills from the
+     * input stream. The destructor frees the buffer and no copy operations
+     * are defined, so instances must not be copied.
+     */
+    class ReadCallback : public InputStreamCallback
+    {
+    public:
+        //! Allocate a buffer of the requested size; nothing has been read yet
+        ReadCallback(uint64_t size)
+        {
+            _bufferSize = size;
+            _buffer = new char[_bufferSize];
+            _readSize = 0;
+        }
+        ~ReadCallback()
+        {
+            // delete[] on a null pointer is a no-op
+            delete[] _buffer;
+        }
+        //! Read up to _bufferSize bytes and record how many were obtained
+        /*!
+         * @param stream input stream to read from; a null pointer leaves
+         *        _readSize at 0
+         */
+        void process(std::ifstream *stream) {
+            _readSize = 0;
+            if (!stream)
+                return;
+            stream->read(_buffer, _bufferSize);
+            // gcount() is the number of bytes the read() above actually
+            // extracted. It equals _bufferSize on a full read and is smaller
+            // on a short read. The previous code tested the pointer
+            // (!stream) instead of the stream state, so short reads were
+            // reported as full ones.
+            _readSize = stream->gcount();
+        }
+        char *_buffer;
+        uint64_t _bufferSize;
+        //! Number of valid bytes in _buffer after process()
+        uint64_t _readSize;
+    };
+
+public:
+    //! OnTrigger method, implemented by NiFi LogAttribute
+    virtual void onTrigger(ProcessContext *context, ProcessSession *session);
+    //! Initialize, over write by NiFi LogAttribute
+    virtual void initialize(void);
+
+protected:
+
+private:
+    //! Logger
+    Logger *_logger;
+};
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/include/Logger.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Logger.h b/libminifi/include/Logger.h
new file mode 100644
index 0000000..3edad9d
--- /dev/null
+++ b/libminifi/include/Logger.h
@@ -0,0 +1,154 @@
+/**
+ * @file Logger.h
+ * Logger class declaration
+ * This is a C++ wrapper for spdlog, a lightweight C++ logging library
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __LOGGER_H__
+#define __LOGGER_H__
+
+#include "spdlog/spdlog.h"
+
+using spdlog::stdout_logger_mt;
+using spdlog::rotating_logger_mt;
+using spdlog::logger;
+
+//! Size in bytes of the local buffer that FILL_BUFFER formats a message into
+#define LOG_BUFFER_SIZE 1024
+//! Declares a local char array named `buffer` and fills it with vsnprintf
+//! using the enclosing function's `format` parameter and variadic arguments.
+//! Must be expanded inside a variadic function whose last named parameter is
+//! called `format`; vsnprintf is limited to LOG_BUFFER_SIZE bytes.
+#define FILL_BUFFER char buffer[LOG_BUFFER_SIZE]; \
+ va_list args; \
+ va_start(args, format); \
+ vsnprintf(buffer, LOG_BUFFER_SIZE,format, args); \
+ va_end(args);
+
+//! 5M default log file size
+#define DEFAULT_LOG_FILE_SIZE (5*1024*1024)
+//! 3 log files rotation
+#define DEFAULT_LOG_FILE_NUMBER 3
+//! Default logger name used by the Logger constructor
+#define LOG_NAME "minifi log"
+//! Default log file name used by the Logger constructor
+#define LOG_FILE_NAME "minifi-app.log"
+
+//! Log levels accepted by Logger::setLogLevel.
+//! Logger casts these values directly to spdlog::level::level_enum, so the
+//! numeric value of each enumerator is significant.
+enum LOG_LEVEL_E
+{
+    trace = 0,
+    debug = 1,
+    info = 2,
+    notice = 3,
+    warn = 4,
+    err = 5,
+    critical = 6,
+    alert = 7,
+    emerg = 8,
+    off = 9
+};
+
+//! Logger Class
+/*!
+ * Singleton wrapper around an spdlog rotating file logger. Each log_*
+ * method formats its printf-style arguments into a fixed-size buffer via
+ * FILL_BUFFER and hands the result to spdlog; when no spdlog logger is set
+ * the call does nothing.
+ */
+class Logger {
+
+public:
+
+    //! Get the singleton logger instance, creating it on first use
+    static Logger * getLogger() {
+        if (_logger == NULL)
+            _logger = new Logger();
+        return _logger;
+    }
+    //! Set the level on the underlying spdlog logger
+    void setLogLevel(LOG_LEVEL_E level) {
+        if (_spdlog)
+            _spdlog->set_level(static_cast<spdlog::level::level_enum>(level));
+    }
+    //! Destructor
+    ~Logger() {}
+    /**
+     * @brief Log error message
+     * @param format format string ('man printf' for syntax)
+     * @warning does not check @p format for null. Caller must ensure parameters and format string lengths match
+     */
+    void log_error(const char *const format, ...) {
+        if (_spdlog) {
+            FILL_BUFFER
+            _spdlog->error(buffer);
+        }
+    }
+    /**
+     * @brief Log warn message
+     * @param format format string ('man printf' for syntax)
+     * @warning does not check @p format for null. Caller must ensure parameters and format string lengths match
+     */
+    void log_warn(const char *const format, ...) {
+        if (_spdlog) {
+            FILL_BUFFER
+            _spdlog->warn(buffer);
+        }
+    }
+    /**
+     * @brief Log info message
+     * @param format format string ('man printf' for syntax)
+     * @warning does not check @p format for null. Caller must ensure parameters and format string lengths match
+     */
+    void log_info(const char *const format, ...) {
+        if (_spdlog) {
+            FILL_BUFFER
+            _spdlog->info(buffer);
+        }
+    }
+    /**
+     * @brief Log debug message
+     * @param format format string ('man printf' for syntax)
+     * @warning does not check @p format for null. Caller must ensure parameters and format string lengths match
+     */
+    void log_debug(const char *const format, ...) {
+        if (_spdlog) {
+            FILL_BUFFER
+            _spdlog->debug(buffer);
+        }
+    }
+    /**
+     * @brief Log trace message
+     * @param format format string ('man printf' for syntax)
+     * @warning does not check @p format for null. Caller must ensure parameters and format string lengths match
+     */
+    void log_trace(const char *const format, ...) {
+        if (_spdlog) {
+            FILL_BUFFER
+            _spdlog->trace(buffer);
+        }
+    }
+
+protected:
+
+private:
+    // Prevent default copy constructor and assignment operation
+    // Only support pass by reference or pointer
+    Logger(const Logger &parent);
+    Logger &operator=(const Logger &parent);
+    //! Constructor
+    /*!
+     * Create a rotating file logger through spdlog and set its level to debug
+     * */
+    Logger(const std::string logger_name = LOG_NAME,
+           const std::string filename = LOG_FILE_NAME,
+           size_t max_file_size = DEFAULT_LOG_FILE_SIZE,
+           size_t max_files = DEFAULT_LOG_FILE_NUMBER,
+           bool force_flush = true) {
+        _spdlog = rotating_logger_mt(logger_name, filename, max_file_size, max_files, force_flush);
+        _spdlog->set_level(static_cast<spdlog::level::level_enum>(debug));
+    }
+    //! spdlog
+    std::shared_ptr<logger> _spdlog;
+
+    //! Singleton logger instance
+    static Logger *_logger;
+};
+
+#endif