You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/06/06 16:33:10 UTC
[7/9] nifi-minifi-cpp git commit: MINIFI-331: Apply formatter with
increased line length to source
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/io/tls/TLSSocket.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/tls/TLSSocket.h b/libminifi/include/io/tls/TLSSocket.h
index c56f6c8..b8915a6 100644
--- a/libminifi/include/io/tls/TLSSocket.h
+++ b/libminifi/include/io/tls/TLSSocket.h
@@ -38,27 +38,24 @@ namespace io {
#define TLS_ERROR_KEY_ERROR 4
#define TLS_ERROR_CERT_ERROR 5
-class OpenSSLInitializer
-{
+class OpenSSLInitializer {
public:
static OpenSSLInitializer *getInstance() {
- OpenSSLInitializer* atomic_context = context_instance.load(
- std::memory_order_relaxed);
- std::atomic_thread_fence(std::memory_order_acquire);
- if (atomic_context == nullptr) {
- std::lock_guard<std::mutex> lock(context_mutex);
- atomic_context = context_instance.load(std::memory_order_relaxed);
- if (atomic_context == nullptr) {
- atomic_context = new OpenSSLInitializer();
- std::atomic_thread_fence(std::memory_order_release);
- context_instance.store(atomic_context, std::memory_order_relaxed);
- }
- }
- return atomic_context;
- }
-
- OpenSSLInitializer()
- {
+ OpenSSLInitializer* atomic_context = context_instance.load(std::memory_order_relaxed);
+ std::atomic_thread_fence(std::memory_order_acquire);
+ if (atomic_context == nullptr) {
+ std::lock_guard<std::mutex> lock(context_mutex);
+ atomic_context = context_instance.load(std::memory_order_relaxed);
+ if (atomic_context == nullptr) {
+ atomic_context = new OpenSSLInitializer();
+ std::atomic_thread_fence(std::memory_order_release);
+ context_instance.store(atomic_context, std::memory_order_relaxed);
+ }
+ }
+ return atomic_context;
+ }
+
+ OpenSSLInitializer() {
SSL_library_init();
OpenSSL_add_all_algorithms();
SSL_load_error_strings();
@@ -68,11 +65,11 @@ class OpenSSLInitializer
static std::mutex context_mutex;
};
-class TLSContext: public SocketContext {
+class TLSContext : public SocketContext {
public:
TLSContext(const std::shared_ptr<Configure> &configure);
-
+
virtual ~TLSContext() {
if (0 != ctx)
SSL_CTX_free(ctx);
@@ -93,8 +90,7 @@ class TLSContext: public SocketContext {
static int pemPassWordCb(char *buf, int size, int rwflag, void *configure) {
std::string passphrase;
- if (static_cast<Configure*>(configure)->get(
- Configure::nifi_security_client_pass_phrase, passphrase)) {
+ if (static_cast<Configure*>(configure)->get(Configure::nifi_security_client_pass_phrase, passphrase)) {
std::ifstream file(passphrase.c_str(), std::ifstream::in);
if (!file.good()) {
@@ -103,8 +99,7 @@ class TLSContext: public SocketContext {
}
std::string password;
- password.assign((std::istreambuf_iterator<char>(file)),
- std::istreambuf_iterator<char>());
+ password.assign((std::istreambuf_iterator<char>(file)), std::istreambuf_iterator<char>());
file.close();
memset(buf, 0x00, size);
memcpy(buf, password.c_str(), password.length() - 1);
@@ -114,7 +109,6 @@ class TLSContext: public SocketContext {
return 0;
}
-
std::shared_ptr<logging::Logger> logger_;
std::shared_ptr<Configure> configure_;
SSL_CTX *ctx;
@@ -133,8 +127,7 @@ class TLSSocket : public Socket {
* @param port connecting port
* @param listeners number of listeners in the queue
*/
- explicit TLSSocket(const std::shared_ptr<TLSContext> &context, const std::string &hostname, const uint16_t port,
- const uint16_t listeners);
+ explicit TLSSocket(const std::shared_ptr<TLSContext> &context, const std::string &hostname, const uint16_t port, const uint16_t listeners);
/**
* Constructor that creates a client socket.
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/io/validation.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/validation.h b/libminifi/include/io/validation.h
index a5e1bc5..70de439 100644
--- a/libminifi/include/io/validation.h
+++ b/libminifi/include/io/validation.h
@@ -45,8 +45,7 @@ class size_function_functor_checker {
* Determines if the variable is null or ::size() == 0
*/
template<typename T>
-static auto IsNullOrEmpty(
- T &object) -> typename std::enable_if<size_function_functor_checker<T>::has_size_function==1, bool>::type {
+static auto IsNullOrEmpty(T &object) -> typename std::enable_if<size_function_functor_checker<T>::has_size_function==1, bool>::type {
return object.size() == 0;
}
@@ -54,8 +53,7 @@ static auto IsNullOrEmpty(
* Determines if the variable is null or ::size() == 0
*/
template<typename T>
-static auto IsNullOrEmpty(
- T *object) -> typename std::enable_if<size_function_functor_checker<T>::has_size_function==1, bool>::type {
+static auto IsNullOrEmpty(T *object) -> typename std::enable_if<size_function_functor_checker<T>::has_size_function==1, bool>::type {
return (nullptr == object || object->size() == 0);
}
@@ -63,8 +61,7 @@ static auto IsNullOrEmpty(
* Determines if the variable is null or ::size() == 0
*/
template<typename T>
-static auto IsNullOrEmpty(
- T *object) -> typename std::enable_if<not size_function_functor_checker<T>::has_size_function , bool>::type {
+static auto IsNullOrEmpty(T *object) -> typename std::enable_if<not size_function_functor_checker<T>::has_size_function , bool>::type {
return (nullptr == object);
}
@@ -72,8 +69,7 @@ static auto IsNullOrEmpty(
* Determines if the variable is null or ::size() == 0
*/
template<typename T>
-static auto IsNullOrEmpty(
- std::shared_ptr<T> object) -> typename std::enable_if<not size_function_functor_checker<T>::has_size_function , bool>::type {
+static auto IsNullOrEmpty(std::shared_ptr<T> object) -> typename std::enable_if<not size_function_functor_checker<T>::has_size_function , bool>::type {
return (nullptr == object || nullptr == object.get());
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/processors/AppendHostInfo.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/AppendHostInfo.h b/libminifi/include/processors/AppendHostInfo.h
index 610251f..e7c90ac 100644
--- a/libminifi/include/processors/AppendHostInfo.h
+++ b/libminifi/include/processors/AppendHostInfo.h
@@ -43,7 +43,7 @@ class AppendHostInfo : public core::Processor {
*/
AppendHostInfo(std::string name, uuid_t uuid = NULL)
: core::Processor(name, uuid),
- logger_(logging::LoggerFactory<AppendHostInfo>::getLogger()){
+ logger_(logging::LoggerFactory<AppendHostInfo>::getLogger()) {
}
// Destructor
virtual ~AppendHostInfo() {
@@ -60,8 +60,7 @@ class AppendHostInfo : public core::Processor {
public:
// OnTrigger method, implemented by NiFi AppendHostInfo
- virtual void onTrigger(core::ProcessContext *context,
- core::ProcessSession *session);
+ virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
// Initialize, over write by NiFi AppendHostInfo
virtual void initialize(void);
@@ -74,7 +73,6 @@ class AppendHostInfo : public core::Processor {
REGISTER_RESOURCE(AppendHostInfo);
-
} /* namespace processors */
} /* namespace minifi */
} /* namespace nifi */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/processors/ExecuteProcess.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/ExecuteProcess.h b/libminifi/include/processors/ExecuteProcess.h
index c719405..28dcf76 100644
--- a/libminifi/include/processors/ExecuteProcess.h
+++ b/libminifi/include/processors/ExecuteProcess.h
@@ -53,7 +53,7 @@ class ExecuteProcess : public core::Processor {
*/
ExecuteProcess(std::string name, uuid_t uuid = NULL)
: Processor(name, uuid),
- logger_(logging::LoggerFactory<ExecuteProcess>::getLogger()){
+ logger_(logging::LoggerFactory<ExecuteProcess>::getLogger()) {
_redirectErrorStream = false;
_batchDuration = 0;
_workingDir = ".";
@@ -93,8 +93,7 @@ class ExecuteProcess : public core::Processor {
public:
// OnTrigger method, implemented by NiFi ExecuteProcess
- virtual void onTrigger(core::ProcessContext *context,
- core::ProcessSession *session);
+ virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
// Initialize, over write by NiFi ExecuteProcess
virtual void initialize(void);
@@ -107,8 +106,7 @@ class ExecuteProcess : public core::Processor {
std::string _command;
std::string _commandArgument;
std::string _workingDir;
- int64_t _batchDuration;
- bool _redirectErrorStream;
+ int64_t _batchDuration;bool _redirectErrorStream;
// Full command
std::string _fullCommand;
// whether the process is running
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/processors/GenerateFlowFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/GenerateFlowFile.h b/libminifi/include/processors/GenerateFlowFile.h
index 2f24e64..abb5740 100644
--- a/libminifi/include/processors/GenerateFlowFile.h
+++ b/libminifi/include/processors/GenerateFlowFile.h
@@ -76,8 +76,7 @@ class GenerateFlowFile : public core::Processor {
public:
// OnTrigger method, implemented by NiFi GenerateFlowFile
- virtual void onTrigger(core::ProcessContext *context,
- core::ProcessSession *session);
+ virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
// Initialize, over write by NiFi GenerateFlowFile
virtual void initialize(void);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/processors/GetFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/GetFile.h b/libminifi/include/processors/GetFile.h
index 8864601..df8f775 100644
--- a/libminifi/include/processors/GetFile.h
+++ b/libminifi/include/processors/GetFile.h
@@ -33,14 +33,11 @@ namespace minifi {
namespace processors {
struct GetFileRequest {
- std::string directory = ".";
- bool recursive = true;
- bool keepSourceFile = false;
+ std::string directory = ".";bool recursive = true;bool keepSourceFile = false;
int64_t minAge = 0;
int64_t maxAge = 0;
int64_t minSize = 0;
- int64_t maxSize = 0;
- bool ignoreHiddenFile = true;
+ int64_t maxSize = 0;bool ignoreHiddenFile = true;
int64_t pollInterval = 0;
int64_t batchSize = 10;
std::string fileFilter = "[^\\.].*";
@@ -54,7 +51,8 @@ class GetFile : public core::Processor {
* Create a new processor
*/
explicit GetFile(std::string name, uuid_t uuid = NULL)
- : Processor(name, uuid), logger_(logging::LoggerFactory<GetFile>::getLogger()) {
+ : Processor(name, uuid),
+ logger_(logging::LoggerFactory<GetFile>::getLogger()) {
}
// Destructor
@@ -84,15 +82,13 @@ class GetFile : public core::Processor {
* @param sessionFactory process session factory that is used when creating
* ProcessSession objects.
*/
- void onSchedule(core::ProcessContext *context,
- core::ProcessSessionFactory *sessionFactory);
+ void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
/**
* Execution trigger for the GetFile Processor
* @param context processor context
* @param session processor session reference.
*/
- virtual void onTrigger(core::ProcessContext *context,
- core::ProcessSession *session);
+ virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
// Initialize, over write by NiFi GetFile
virtual void initialize(void);
@@ -119,11 +115,9 @@ class GetFile : public core::Processor {
// Put full path file name into directory listing
void putListing(std::string fileName);
// Poll directory listing for files
- void pollListing(std::queue<std::string> &list,
- const GetFileRequest &request);
+ void pollListing(std::queue<std::string> &list, const GetFileRequest &request);
// Check whether file can be added to the directory listing
- bool acceptFile(std::string fullName, std::string name,
- const GetFileRequest &request);
+ bool acceptFile(std::string fullName, std::string name, const GetFileRequest &request);
// Get file request object.
GetFileRequest request_;
// Mutex for protection of the directory listing
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/processors/InvokeHTTP.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/InvokeHTTP.h b/libminifi/include/processors/InvokeHTTP.h
index 59bc2bd..ab78fd5 100644
--- a/libminifi/include/processors/InvokeHTTP.h
+++ b/libminifi/include/processors/InvokeHTTP.h
@@ -53,10 +53,8 @@ struct HTTPRequestResponse {
/**
* Receive HTTP Response.
*/
- static size_t recieve_write(char * data, size_t size, size_t nmemb,
- void * p) {
- return static_cast<HTTPRequestResponse*>(p)->write_content(data, size,
- nmemb);
+ static size_t recieve_write(char * data, size_t size, size_t nmemb, void * p) {
+ return static_cast<HTTPRequestResponse*>(p)->write_content(data, size, nmemb);
}
/**
@@ -160,8 +158,7 @@ class InvokeHTTP : public core::Processor {
void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
void initialize();
- void onSchedule(core::ProcessContext *context,
- core::ProcessSessionFactory *sessionFactory);
+ void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
protected:
@@ -194,9 +191,7 @@ class InvokeHTTP : public core::Processor {
*/
void set_request_method(CURL *curl, const std::string &);
- struct curl_slist *build_header_list(
- CURL *curl, std::string regex,
- const std::map<std::string, std::string> &);
+ struct curl_slist *build_header_list(CURL *curl, std::string regex, const std::map<std::string, std::string> &);
bool matches(const std::string &value, const std::string &sregex);
@@ -209,10 +204,8 @@ class InvokeHTTP : public core::Processor {
* @param isSuccess success code or not
* @param statuscode http response code.
*/
- void route(std::shared_ptr<FlowFileRecord> &request,
- std::shared_ptr<FlowFileRecord> &response,
- core::ProcessSession *session, core::ProcessContext *context,
- bool isSuccess,
+ void route(std::shared_ptr<FlowFileRecord> &request, std::shared_ptr<FlowFileRecord> &response, core::ProcessSession *session, core::ProcessContext *context,
+ bool isSuccess,
int statusCode);
/**
* Determine if we should emit a new flowfile based on our activity
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/processors/ListenHTTP.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/ListenHTTP.h b/libminifi/include/processors/ListenHTTP.h
index 03fe03d..c9e42bc 100644
--- a/libminifi/include/processors/ListenHTTP.h
+++ b/libminifi/include/processors/ListenHTTP.h
@@ -68,17 +68,13 @@ class ListenHTTP : public core::Processor {
void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
void initialize();
- void onSchedule(core::ProcessContext *context,
- core::ProcessSessionFactory *sessionFactory);
+ void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
// HTTP request handler
class Handler : public CivetHandler {
public:
- Handler(core::ProcessContext *context,
- core::ProcessSessionFactory *sessionFactory,
- std::string &&authDNPattern,
- std::string &&headersAsAttributesPattern);
- bool handlePost(CivetServer *server, struct mg_connection *conn);
+ Handler(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory, std::string &&authDNPattern, std::string &&headersAsAttributesPattern);bool handlePost(
+ CivetServer *server, struct mg_connection *conn);
private:
// Send HTTP 500 error response to client
@@ -95,8 +91,7 @@ class ListenHTTP : public core::Processor {
// Write callback for transferring data from HTTP request to content repo
class WriteCallback : public OutputStreamCallback {
public:
- WriteCallback(struct mg_connection *conn,
- const struct mg_request_info *reqInfo);
+ WriteCallback(struct mg_connection *conn, const struct mg_request_info *reqInfo);
void process(std::ofstream *stream);
private:
@@ -115,7 +110,6 @@ class ListenHTTP : public core::Processor {
std::unique_ptr<Handler> _handler;
};
-
REGISTER_RESOURCE(ListenHTTP);
} /* namespace processors */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/processors/ListenSyslog.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/ListenSyslog.h b/libminifi/include/processors/ListenSyslog.h
index f0380db..ed54b44 100644
--- a/libminifi/include/processors/ListenSyslog.h
+++ b/libminifi/include/processors/ListenSyslog.h
@@ -127,8 +127,7 @@ class ListenSyslog : public core::Processor {
public:
// OnTrigger method, implemented by NiFi ListenSyslog
- virtual void onTrigger(core::ProcessContext *context,
- core::ProcessSession *session);
+ virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
// Initialize, over write by NiFi ListenSyslog
virtual void initialize(void);
@@ -193,8 +192,7 @@ class ListenSyslog : public core::Processor {
int64_t _maxBatchSize;
std::string _messageDelimiter;
std::string _protocol;
- int64_t _port;
- bool _parseMessages;
+ int64_t _port;bool _parseMessages;
int _serverSocket;
std::vector<int> _clientSockets;
int _maxFds;
@@ -202,8 +200,7 @@ class ListenSyslog : public core::Processor {
// thread
std::thread *_thread;
// whether to reset the server socket
- bool _resetServerSocket;
- bool _serverTheadRunning;
+ bool _resetServerSocket;bool _serverTheadRunning;
// buffer for read socket
char _buffer[2048];
};
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/processors/LoadProcessors.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/LoadProcessors.h b/libminifi/include/processors/LoadProcessors.h
index 7a16773..3e6cfcf 100644
--- a/libminifi/include/processors/LoadProcessors.h
+++ b/libminifi/include/processors/LoadProcessors.h
@@ -30,5 +30,4 @@
#include "PutFile.h"
#include "TailFile.h"
-
#endif /* LIBMINIFI_INCLUDE_PROCESSORS_LOADPROCESSORS_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/processors/LogAttribute.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/LogAttribute.h b/libminifi/include/processors/LogAttribute.h
index cbb8f1a..88230f7 100644
--- a/libminifi/include/processors/LogAttribute.h
+++ b/libminifi/include/processors/LogAttribute.h
@@ -110,8 +110,7 @@ class LogAttribute : public core::Processor {
public:
// OnTrigger method, implemented by NiFi LogAttribute
- virtual void onTrigger(core::ProcessContext *context,
- core::ProcessSession *session);
+ virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
// Initialize, over write by NiFi LogAttribute
virtual void initialize(void);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/processors/PutFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/PutFile.h b/libminifi/include/processors/PutFile.h
index 3918d0a..d08ebbc 100644
--- a/libminifi/include/processors/PutFile.h
+++ b/libminifi/include/processors/PutFile.h
@@ -48,7 +48,8 @@ class PutFile : public core::Processor {
* Create a new processor
*/
PutFile(std::string name, uuid_t uuid = NULL)
- : core::Processor(name, uuid), logger_(logging::LoggerFactory<PutFile>::getLogger()) {
+ : core::Processor(name, uuid),
+ logger_(logging::LoggerFactory<PutFile>::getLogger()) {
}
// Destructor
virtual ~PutFile() {
@@ -67,12 +68,10 @@ class PutFile : public core::Processor {
* @param sessionFactory process session factory that is used when creating
* ProcessSession objects.
*/
- virtual void onSchedule(core::ProcessContext *context,
- core::ProcessSessionFactory *sessionFactory);
+ virtual void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
// OnTrigger method, implemented by NiFi PutFile
- virtual void onTrigger(core::ProcessContext *context,
- core::ProcessSession *session);
+ virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
// Initialize, over write by NiFi PutFile
virtual void initialize(void);
@@ -80,13 +79,11 @@ class PutFile : public core::Processor {
public:
ReadCallback(const std::string &tmpFile, const std::string &destFile);
~ReadCallback();
- virtual void process(std::ifstream *stream);
- bool commit();
+ virtual void process(std::ifstream *stream);bool commit();
private:
std::shared_ptr<logging::Logger> logger_;
- std::ofstream _tmpFileOs;
- bool _writeSucceeded = false;
+ std::ofstream _tmpFileOs;bool _writeSucceeded = false;
std::string _tmpFile;
std::string _destFile;
};
@@ -100,9 +97,7 @@ class PutFile : public core::Processor {
// conflict resolution type.
std::string conflict_resolution_;
- bool putFile(core::ProcessSession *session,
- std::shared_ptr<FlowFileRecord> flowFile,
- const std::string &tmpFile, const std::string &destFile);
+ bool putFile(core::ProcessSession *session, std::shared_ptr<FlowFileRecord> flowFile, const std::string &tmpFile, const std::string &destFile);
std::shared_ptr<logging::Logger> logger_;
};
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/processors/TailFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/TailFile.h b/libminifi/include/processors/TailFile.h
index 47ec3fb..59cf224 100644
--- a/libminifi/include/processors/TailFile.h
+++ b/libminifi/include/processors/TailFile.h
@@ -41,7 +41,8 @@ class TailFile : public core::Processor {
* Create a new processor
*/
explicit TailFile(std::string name, uuid_t uuid = NULL)
- : core::Processor(name, uuid), logger_(logging::LoggerFactory<TailFile>::getLogger()) {
+ : core::Processor(name, uuid),
+ logger_(logging::LoggerFactory<TailFile>::getLogger()) {
_stateRecovered = false;
}
// Destructor
@@ -58,8 +59,7 @@ class TailFile : public core::Processor {
public:
// OnTrigger method, implemented by NiFi TailFile
- virtual void onTrigger(core::ProcessContext *context,
- core::ProcessSession *session);
+ virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
// Initialize, over write by NiFi TailFile
virtual void initialize(void);
// recoverState
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/properties/Properties.h
----------------------------------------------------------------------
diff --git a/libminifi/include/properties/Properties.h b/libminifi/include/properties/Properties.h
index 8d08e8d..8797c19 100644
--- a/libminifi/include/properties/Properties.h
+++ b/libminifi/include/properties/Properties.h
@@ -38,11 +38,11 @@ namespace minifi {
class Properties {
public:
Properties();
-
+
virtual ~Properties() {
}
-
+
// Clear the load config
void clear() {
std::lock_guard<std::mutex> lock(mutex_);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/provenance/Provenance.h
----------------------------------------------------------------------
diff --git a/libminifi/include/provenance/Provenance.h b/libminifi/include/provenance/Provenance.h
index 86ee713..1fa4a72 100644
--- a/libminifi/include/provenance/Provenance.h
+++ b/libminifi/include/provenance/Provenance.h
@@ -49,8 +49,7 @@ namespace provenance {
#define PROVENANCE_EVENT_RECORD_SEG_SIZE 2048
// Provenance Event Record
-class ProvenanceEventRecord :
- protected org::apache::nifi::minifi::io::Serializable {
+class ProvenanceEventRecord : protected org::apache::nifi::minifi::io::Serializable {
public:
enum ProvenanceEventType {
@@ -155,14 +154,14 @@ class ProvenanceEventRecord :
*/
REPLAY
};
- static const char *ProvenanceEventTypeStr[REPLAY+1];
- public:
+ static const char *ProvenanceEventTypeStr[REPLAY + 1];
+ public:
// Constructor
/*!
* Create a new provenance event record
*/
- ProvenanceEventRecord(ProvenanceEventType event, std::string componentId,
- std::string componentType): logger_(logging::LoggerFactory<ProvenanceEventRecord>::getLogger()) {
+ ProvenanceEventRecord(ProvenanceEventType event, std::string componentId, std::string componentType)
+ : logger_(logging::LoggerFactory<ProvenanceEventRecord>::getLogger()) {
_eventType = event;
_componentId = componentId;
_componentType = componentType;
@@ -174,7 +173,8 @@ class ProvenanceEventRecord :
_eventIdStr = eventIdStr;
}
- ProvenanceEventRecord(): logger_(logging::LoggerFactory<ProvenanceEventRecord>::getLogger()) {
+ ProvenanceEventRecord()
+ : logger_(logging::LoggerFactory<ProvenanceEventRecord>::getLogger()) {
_eventTime = getTimeMillis();
}
@@ -271,8 +271,7 @@ class ProvenanceEventRecord :
}
// Add Parent UUID
void addParentUuid(std::string uuid) {
- if (std::find(_parentUuids.begin(), _parentUuids.end(), uuid)
- != _parentUuids.end())
+ if (std::find(_parentUuids.begin(), _parentUuids.end(), uuid) != _parentUuids.end())
return;
else
_parentUuids.push_back(uuid);
@@ -284,9 +283,7 @@ class ProvenanceEventRecord :
}
// Remove Parent UUID
void removeParentUuid(std::string uuid) {
- _parentUuids.erase(
- std::remove(_parentUuids.begin(), _parentUuids.end(), uuid),
- _parentUuids.end());
+ _parentUuids.erase(std::remove(_parentUuids.begin(), _parentUuids.end(), uuid), _parentUuids.end());
}
// Remove Parent Flow File
void removeParentFlowFile(std::shared_ptr<core::FlowFile> flow) {
@@ -299,8 +296,7 @@ class ProvenanceEventRecord :
}
// Add Child UUID
void addChildUuid(std::string uuid) {
- if (std::find(_childrenUuids.begin(), _childrenUuids.end(), uuid)
- != _childrenUuids.end())
+ if (std::find(_childrenUuids.begin(), _childrenUuids.end(), uuid) != _childrenUuids.end())
return;
else
_childrenUuids.push_back(uuid);
@@ -312,9 +308,7 @@ class ProvenanceEventRecord :
}
// Remove Child UUID
void removeChildUuid(std::string uuid) {
- _childrenUuids.erase(
- std::remove(_childrenUuids.begin(), _childrenUuids.end(), uuid),
- _childrenUuids.end());
+ _childrenUuids.erase(std::remove(_childrenUuids.begin(), _childrenUuids.end(), uuid), _childrenUuids.end());
}
// Remove Child Flow File
void removeChildFlowFile(std::shared_ptr<core::FlowFile> flow) {
@@ -369,8 +363,7 @@ class ProvenanceEventRecord :
return DeSerialize(stream.getBuffer(), stream.getSize());
}
// DeSerialize
- bool DeSerialize(const std::shared_ptr<core::Repository> &repo,
- std::string key);
+ bool DeSerialize(const std::shared_ptr<core::Repository> &repo, std::string key);
protected:
@@ -440,8 +433,8 @@ class ProvenanceReporter {
/*!
* Create a new provenance reporter associated with the process session
*/
- ProvenanceReporter(std::shared_ptr<core::Repository> repo,
- std::string componentId, std::string componentType) : logger_(logging::LoggerFactory<ProvenanceReporter>::getLogger()) {
+ ProvenanceReporter(std::shared_ptr<core::Repository> repo, std::string componentId, std::string componentType)
+ : logger_(logging::LoggerFactory<ProvenanceReporter>::getLogger()) {
_componentId = componentId;
_componentType = componentType;
repo_ = repo;
@@ -474,12 +467,8 @@ class ProvenanceReporter {
_events.clear();
}
// allocate
- ProvenanceEventRecord *allocate(
- ProvenanceEventRecord::ProvenanceEventType eventType,
- std::shared_ptr<core::FlowFile> flow) {
- ProvenanceEventRecord *event = new ProvenanceEventRecord(eventType,
- _componentId,
- _componentType);
+ ProvenanceEventRecord *allocate(ProvenanceEventRecord::ProvenanceEventType eventType, std::shared_ptr<core::FlowFile> flow) {
+ ProvenanceEventRecord *event = new ProvenanceEventRecord(eventType, _componentId, _componentType);
if (event)
event->fromFlowFile(flow);
@@ -490,39 +479,27 @@ class ProvenanceReporter {
// create
void create(std::shared_ptr<core::FlowFile> flow, std::string detail);
// route
- void route(std::shared_ptr<core::FlowFile> flow, core::Relationship relation,
- std::string detail, uint64_t processingDuration);
+ void route(std::shared_ptr<core::FlowFile> flow, core::Relationship relation, std::string detail, uint64_t processingDuration);
// modifyAttributes
- void modifyAttributes(std::shared_ptr<core::FlowFile> flow,
- std::string detail);
+ void modifyAttributes(std::shared_ptr<core::FlowFile> flow, std::string detail);
// modifyContent
- void modifyContent(std::shared_ptr<core::FlowFile> flow, std::string detail,
- uint64_t processingDuration);
+ void modifyContent(std::shared_ptr<core::FlowFile> flow, std::string detail, uint64_t processingDuration);
// clone
- void clone(std::shared_ptr<core::FlowFile> parent,
- std::shared_ptr<core::FlowFile> child);
+ void clone(std::shared_ptr<core::FlowFile> parent, std::shared_ptr<core::FlowFile> child);
// join
- void join(std::vector<std::shared_ptr<core::FlowFile> > parents,
- std::shared_ptr<core::FlowFile> child, std::string detail,
- uint64_t processingDuration);
+ void join(std::vector<std::shared_ptr<core::FlowFile> > parents, std::shared_ptr<core::FlowFile> child, std::string detail, uint64_t processingDuration);
// fork
- void fork(std::vector<std::shared_ptr<core::FlowFile> > child,
- std::shared_ptr<core::FlowFile> parent, std::string detail,
- uint64_t processingDuration);
+ void fork(std::vector<std::shared_ptr<core::FlowFile> > child, std::shared_ptr<core::FlowFile> parent, std::string detail, uint64_t processingDuration);
// expire
void expire(std::shared_ptr<core::FlowFile> flow, std::string detail);
// drop
void drop(std::shared_ptr<core::FlowFile> flow, std::string reason);
// send
- void send(std::shared_ptr<core::FlowFile> flow, std::string transitUri,
- std::string detail, uint64_t processingDuration, bool force);
+ void send(std::shared_ptr<core::FlowFile> flow, std::string transitUri, std::string detail, uint64_t processingDuration, bool force);
// fetch
- void fetch(std::shared_ptr<core::FlowFile> flow, std::string transitUri,
- std::string detail, uint64_t processingDuration);
+ void fetch(std::shared_ptr<core::FlowFile> flow, std::string transitUri, std::string detail, uint64_t processingDuration);
// receive
- void receive(std::shared_ptr<core::FlowFile> flow, std::string transitUri,
- std::string sourceSystemFlowFileIdentifier, std::string detail,
- uint64_t processingDuration);
+ void receive(std::shared_ptr<core::FlowFile> flow, std::string transitUri, std::string sourceSystemFlowFileIdentifier, std::string detail, uint64_t processingDuration);
protected:
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/provenance/ProvenanceRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/provenance/ProvenanceRepository.h b/libminifi/include/provenance/ProvenanceRepository.h
index 8bcc316..dd2c5ec 100644
--- a/libminifi/include/provenance/ProvenanceRepository.h
+++ b/libminifi/include/provenance/ProvenanceRepository.h
@@ -36,20 +36,16 @@ namespace provenance {
#define MAX_PROVENANCE_ENTRY_LIFE_TIME (60000) // 1 minute
#define PROVENANCE_PURGE_PERIOD (2500) // 2500 msec
-class ProvenanceRepository : public core::Repository,
- public std::enable_shared_from_this<ProvenanceRepository> {
+class ProvenanceRepository : public core::Repository, public std::enable_shared_from_this<ProvenanceRepository> {
public:
// Constructor
/*!
* Create a new provenance repository
*/
- ProvenanceRepository(const std::string repo_name = "", std::string directory = PROVENANCE_DIRECTORY,
- int64_t maxPartitionMillis =
- MAX_PROVENANCE_ENTRY_LIFE_TIME,
- int64_t maxPartitionBytes = MAX_PROVENANCE_STORAGE_SIZE,
- uint64_t purgePeriod = PROVENANCE_PURGE_PERIOD)
- : Repository(repo_name.length() > 0 ? repo_name : core::getClassName<ProvenanceRepository>(), directory,
- maxPartitionMillis, maxPartitionBytes, purgePeriod),
+ ProvenanceRepository(const std::string repo_name = "", std::string directory = PROVENANCE_DIRECTORY, int64_t maxPartitionMillis =
+ MAX_PROVENANCE_ENTRY_LIFE_TIME,
+ int64_t maxPartitionBytes = MAX_PROVENANCE_STORAGE_SIZE, uint64_t purgePeriod = PROVENANCE_PURGE_PERIOD)
+ : Repository(repo_name.length() > 0 ? repo_name : core::getClassName<ProvenanceRepository>(), directory, maxPartitionMillis, maxPartitionBytes, purgePeriod),
logger_(logging::LoggerFactory<ProvenanceRepository>::getLogger()) {
db_ = NULL;
@@ -60,53 +56,42 @@ class ProvenanceRepository : public core::Repository,
if (db_)
delete db_;
}
-
+
void start() {
- if (this->purge_period_ <= 0)
- return;
- if (running_)
- return;
- thread_ = std::thread(&ProvenanceRepository::run, shared_from_this());
- thread_.detach();
- running_ = true;
- logger_->log_info("%s Repository Monitor Thread Start", name_.c_str());
-}
+ if (this->purge_period_ <= 0)
+ return;
+ if (running_)
+ return;
+ thread_ = std::thread(&ProvenanceRepository::run, shared_from_this());
+ thread_.detach();
+ running_ = true;
+ logger_->log_info("%s Repository Monitor Thread Start", name_.c_str());
+ }
// initialize
virtual bool initialize(const std::shared_ptr<org::apache::nifi::minifi::Configure> &config) {
std::string value;
- if (config->get(Configure::nifi_provenance_repository_directory_default,
- value)) {
+ if (config->get(Configure::nifi_provenance_repository_directory_default, value)) {
directory_ = value;
}
- logger_->log_info("NiFi Provenance Repository Directory %s",
- directory_.c_str());
- if (config->get(Configure::nifi_provenance_repository_max_storage_size,
- value)) {
+ logger_->log_info("NiFi Provenance Repository Directory %s", directory_.c_str());
+ if (config->get(Configure::nifi_provenance_repository_max_storage_size, value)) {
core::Property::StringToInt(value, max_partition_bytes_);
}
- logger_->log_info("NiFi Provenance Max Partition Bytes %d",
- max_partition_bytes_);
- if (config->get(Configure::nifi_provenance_repository_max_storage_time,
- value)) {
+ logger_->log_info("NiFi Provenance Max Partition Bytes %d", max_partition_bytes_);
+ if (config->get(Configure::nifi_provenance_repository_max_storage_time, value)) {
core::TimeUnit unit;
- if (core::Property::StringToTime(value, max_partition_millis_, unit)
- && core::Property::ConvertTimeUnitToMS(max_partition_millis_, unit,
- max_partition_millis_)) {
+ if (core::Property::StringToTime(value, max_partition_millis_, unit) && core::Property::ConvertTimeUnitToMS(max_partition_millis_, unit, max_partition_millis_)) {
}
}
- logger_->log_info("NiFi Provenance Max Storage Time: [%d] ms",
- max_partition_millis_);
+ logger_->log_info("NiFi Provenance Max Storage Time: [%d] ms", max_partition_millis_);
leveldb::Options options;
options.create_if_missing = true;
- leveldb::Status status = leveldb::DB::Open(options, directory_.c_str(),
- &db_);
+ leveldb::Status status = leveldb::DB::Open(options, directory_.c_str(), &db_);
if (status.ok()) {
- logger_->log_info("NiFi Provenance Repository database open %s success",
- directory_.c_str());
+ logger_->log_info("NiFi Provenance Repository database open %s success", directory_.c_str());
} else {
- logger_->log_error("NiFi Provenance Repository database open %s fail",
- directory_.c_str());
+ logger_->log_error("NiFi Provenance Repository database open %s fail", directory_.c_str());
return false;
}
@@ -115,8 +100,8 @@ class ProvenanceRepository : public core::Repository,
// Put
virtual bool Put(std::string key, uint8_t *buf, int bufLen) {
- if (repo_full_)
- return false;
+ if (repo_full_)
+ return false;
// persistent to the DB
leveldb::Slice value((const char *) buf, bufLen);
@@ -147,40 +132,33 @@ class ProvenanceRepository : public core::Repository,
}
// Persistent event
void registerEvent(std::shared_ptr<ProvenanceEventRecord> &event) {
- event->Serialize(
- std::static_pointer_cast<core::Repository>(shared_from_this()));
+ event->Serialize(std::static_pointer_cast<core::Repository>(shared_from_this()));
}
// Remove event
void removeEvent(ProvenanceEventRecord *event) {
Delete(event->getEventId());
}
//! get record
- void getProvenanceRecord(std::vector<std::shared_ptr<ProvenanceEventRecord>> &records, int maxSize)
- {
- std::lock_guard<std::mutex> lock(mutex_);
- leveldb::Iterator* it = db_->NewIterator(
- leveldb::ReadOptions());
- for (it->SeekToFirst(); it->Valid(); it->Next()) {
- std::shared_ptr<ProvenanceEventRecord> eventRead = std::make_shared<ProvenanceEventRecord>();
- std::string key = it->key().ToString();
- if (records.size() >= maxSize)
- break;
- if (eventRead->DeSerialize((uint8_t *) it->value().data(),
- (int) it->value().size()))
- {
- records.push_back(eventRead);
- }
- }
- delete it;
+ void getProvenanceRecord(std::vector<std::shared_ptr<ProvenanceEventRecord>> &records, int maxSize) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
+ for (it->SeekToFirst(); it->Valid(); it->Next()) {
+ std::shared_ptr<ProvenanceEventRecord> eventRead = std::make_shared<ProvenanceEventRecord>();
+ std::string key = it->key().ToString();
+ if (records.size() >= maxSize)
+ break;
+ if (eventRead->DeSerialize((uint8_t *) it->value().data(), (int) it->value().size())) {
+ records.push_back(eventRead);
+ }
+ }
+ delete it;
}
//! purge record
- void purgeProvenanceRecord(std::vector<std::shared_ptr<ProvenanceEventRecord>> &records)
- {
- std::lock_guard<std::mutex> lock(mutex_);
- for (auto record : records)
- {
- Delete(record->getEventId());
- }
+ void purgeProvenanceRecord(std::vector<std::shared_ptr<ProvenanceEventRecord>> &records) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ for (auto record : records) {
+ Delete(record->getEventId());
+ }
}
// destroy
void destroy() {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/utils/ByteInputCallBack.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/ByteInputCallBack.h b/libminifi/include/utils/ByteInputCallBack.h
index 72303ff..a2b7838 100644
--- a/libminifi/include/utils/ByteInputCallBack.h
+++ b/libminifi/include/utils/ByteInputCallBack.h
@@ -41,8 +41,7 @@ class ByteInputCallBack : public InputStreamCallback {
virtual void process(std::ifstream *stream) {
- std::vector<char> nv = std::vector<char>(std::istreambuf_iterator<char>(*stream),
- std::istreambuf_iterator<char>());
+ std::vector<char> nv = std::vector<char>(std::istreambuf_iterator<char>(*stream), std::istreambuf_iterator<char>());
vec = std::move(nv);
ptr = &vec[0];
@@ -53,9 +52,6 @@ class ByteInputCallBack : public InputStreamCallback {
return ptr;
}
-
-
-
const size_t getBufferSize() {
return vec.size();
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/utils/StringUtils.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/StringUtils.h b/libminifi/include/utils/StringUtils.h
index 0fc1996..b754467 100644
--- a/libminifi/include/utils/StringUtils.h
+++ b/libminifi/include/utils/StringUtils.h
@@ -66,11 +66,7 @@ class StringUtils {
* @returns modified string
*/
static inline std::string trimLeft(std::string s) {
- s.erase(
- s.begin(),
- std::find_if(
- s.begin(), s.end(),
- std::not1(std::pointer_to_unary_function<int, int>(std::isspace))));
+ s.erase(s.begin(), std::find_if(s.begin(), s.end(), std::not1(std::pointer_to_unary_function<int, int>(std::isspace))));
return s;
}
@@ -81,15 +77,10 @@ class StringUtils {
*/
static inline std::string trimRight(std::string s) {
- s.erase(
- std::find_if(
- s.rbegin(), s.rend(),
- std::not1(std::pointer_to_unary_function<int, int>(std::isspace)))
- .base(),
- s.end());
+ s.erase(std::find_if(s.rbegin(), s.rend(), std::not1(std::pointer_to_unary_function<int, int>(std::isspace))).base(), s.end());
return s;
}
-
+
static std::vector<std::string> split(const std::string &str, const std::string &delimiter) {
std::vector<std::string> result;
int last = 0;
@@ -108,14 +99,13 @@ class StringUtils {
* @param output output float
* @param cp failure policy
*/
- static bool StringToFloat(std::string input, float &output, FailurePolicy cp =
- RETURN) {
+ static bool StringToFloat(std::string input, float &output, FailurePolicy cp = RETURN) {
try {
output = std::stof(input);
} catch (const std::invalid_argument &ie) {
switch (cp) {
case RETURN:
- case NOTHING:
+ case NOTHING:
return false;
case EXIT:
exit(1);
@@ -125,7 +115,7 @@ class StringUtils {
} catch (const std::out_of_range &ofr) {
switch (cp) {
case RETURN:
- case NOTHING:
+ case NOTHING:
return false;
case EXIT:
exit(1);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/utils/ThreadPool.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h
index 4c399a7..77772cd 100644
--- a/libminifi/include/utils/ThreadPool.h
+++ b/libminifi/include/utils/ThreadPool.h
@@ -185,23 +185,23 @@ class ThreadPool {
}
// determines if threads are detached
bool daemon_threads_;
-// max worker threads
+ // max worker threads
int max_worker_threads_;
-// current worker tasks.
+ // current worker tasks.
std::atomic<int> current_workers_;
-// thread queue
+ // thread queue
std::vector<std::thread> thread_queue_;
-// manager thread
+ // manager thread
std::thread manager_thread_;
-// atomic running boolean
+ // atomic running boolean
std::atomic<bool> running_;
-// worker queue of worker objects
+ // worker queue of worker objects
moodycamel::ConcurrentQueue<Worker<T>> worker_queue_;
-// notification for available work
+ // notification for available work
std::condition_variable tasks_available_;
-// manager mutex
+ // manager mutex
std::recursive_mutex manager_mutex_;
-// work queue mutex
+ // work queue mutex
std::mutex worker_queue_mutex_;
/**
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/utils/TimeUtil.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/TimeUtil.h b/libminifi/include/utils/TimeUtil.h
index 6805419..19c2566 100644
--- a/libminifi/include/utils/TimeUtil.h
+++ b/libminifi/include/utils/TimeUtil.h
@@ -33,8 +33,7 @@
* @returns milliseconds since epoch
*/
inline uint64_t getTimeMillis() {
- return std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::system_clock::now().time_since_epoch()).count();
+ return std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
}
/**
@@ -43,8 +42,7 @@ inline uint64_t getTimeMillis() {
*/
inline uint64_t getTimeNano() {
- return std::chrono::duration_cast<std::chrono::nanoseconds>(
- std::chrono::system_clock::now().time_since_epoch()).count();
+ return std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
}
@@ -58,8 +56,7 @@ inline std::string getTimeStr(uint64_t msec, bool enforce_locale = false) {
char date[120];
time_t second = (time_t) (msec / 1000);
msec = msec % 1000;
- strftime(date, sizeof(date) / sizeof(*date), TIME_FORMAT,
- (enforce_locale == true ? gmtime(&second) : localtime(&second)));
+ strftime(date, sizeof(date) / sizeof(*date), TIME_FORMAT, (enforce_locale == true ? gmtime(&second) : localtime(&second)));
std::string ret = date;
date[0] = '\0';
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/Configure.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp
index 5c62a8d..d8e049c 100644
--- a/libminifi/src/Configure.cpp
+++ b/libminifi/src/Configure.cpp
@@ -23,50 +23,31 @@ namespace nifi {
namespace minifi {
const char *Configure::nifi_default_directory = "nifi.default.directory";
-const char *Configure::nifi_flow_configuration_file =
- "nifi.flow.configuration.file";
+const char *Configure::nifi_flow_configuration_file = "nifi.flow.configuration.file";
const char *Configure::nifi_flow_engine_threads = "nifi.flow.engine.threads";
-const char *Configure::nifi_administrative_yield_duration =
- "nifi.administrative.yield.duration";
+const char *Configure::nifi_administrative_yield_duration = "nifi.administrative.yield.duration";
const char *Configure::nifi_bored_yield_duration = "nifi.bored.yield.duration";
-const char *Configure::nifi_graceful_shutdown_seconds =
- "nifi.flowcontroller.graceful.shutdown.period";
+const char *Configure::nifi_graceful_shutdown_seconds = "nifi.flowcontroller.graceful.shutdown.period";
const char *Configure::nifi_log_level = "nifi.log.level";
const char *Configure::nifi_server_name = "nifi.server.name";
-const char *Configure::nifi_configuration_class_name =
- "nifi.flow.configuration.class.name";
-const char *Configure::nifi_flow_repository_class_name =
- "nifi.flow.repository.class.name";
-const char *Configure::nifi_volatile_repository_options =
- "nifi.volatile.repository.options.";
-const char *Configure::nifi_provenance_repository_class_name =
- "nifi.provenance.repository.class.name";
+const char *Configure::nifi_configuration_class_name = "nifi.flow.configuration.class.name";
+const char *Configure::nifi_flow_repository_class_name = "nifi.flow.repository.class.name";
+const char *Configure::nifi_volatile_repository_options = "nifi.volatile.repository.options.";
+const char *Configure::nifi_provenance_repository_class_name = "nifi.provenance.repository.class.name";
const char *Configure::nifi_server_port = "nifi.server.port";
-const char *Configure::nifi_server_report_interval =
- "nifi.server.report.interval";
-const char *Configure::nifi_provenance_repository_max_storage_size =
- "nifi.provenance.repository.max.storage.size";
-const char *Configure::nifi_provenance_repository_max_storage_time =
- "nifi.provenance.repository.max.storage.time";
-const char *Configure::nifi_provenance_repository_directory_default =
- "nifi.provenance.repository.directory.default";
-const char *Configure::nifi_flowfile_repository_max_storage_size =
- "nifi.flowfile.repository.max.storage.size";
-const char *Configure::nifi_flowfile_repository_max_storage_time =
- "nifi.flowfile.repository.max.storage.time";
-const char *Configure::nifi_flowfile_repository_directory_default =
- "nifi.flowfile.repository.directory.default";
+const char *Configure::nifi_server_report_interval = "nifi.server.report.interval";
+const char *Configure::nifi_provenance_repository_max_storage_size = "nifi.provenance.repository.max.storage.size";
+const char *Configure::nifi_provenance_repository_max_storage_time = "nifi.provenance.repository.max.storage.time";
+const char *Configure::nifi_provenance_repository_directory_default = "nifi.provenance.repository.directory.default";
+const char *Configure::nifi_flowfile_repository_max_storage_size = "nifi.flowfile.repository.max.storage.size";
+const char *Configure::nifi_flowfile_repository_max_storage_time = "nifi.flowfile.repository.max.storage.time";
+const char *Configure::nifi_flowfile_repository_directory_default = "nifi.flowfile.repository.directory.default";
const char *Configure::nifi_remote_input_secure = "nifi.remote.input.secure";
-const char *Configure::nifi_security_need_ClientAuth =
- "nifi.security.need.ClientAuth";
-const char *Configure::nifi_security_client_certificate =
- "nifi.security.client.certificate";
-const char *Configure::nifi_security_client_private_key =
- "nifi.security.client.private.key";
-const char *Configure::nifi_security_client_pass_phrase =
- "nifi.security.client.pass.phrase";
-const char *Configure::nifi_security_client_ca_certificate =
- "nifi.security.client.ca.certificate";
+const char *Configure::nifi_security_need_ClientAuth = "nifi.security.need.ClientAuth";
+const char *Configure::nifi_security_client_certificate = "nifi.security.client.certificate";
+const char *Configure::nifi_security_client_private_key = "nifi.security.client.private.key";
+const char *Configure::nifi_security_client_pass_phrase = "nifi.security.client.pass.phrase";
+const char *Configure::nifi_security_client_ca_certificate = "nifi.security.client.ca.certificate";
} /* namespace minifi */
} /* namespace nifi */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/Connection.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Connection.cpp b/libminifi/src/Connection.cpp
index bc76044..0901a30 100644
--- a/libminifi/src/Connection.cpp
+++ b/libminifi/src/Connection.cpp
@@ -39,9 +39,7 @@ namespace apache {
namespace nifi {
namespace minifi {
-Connection::Connection(std::shared_ptr<core::Repository> flow_repository,
- std::string name, uuid_t uuid, uuid_t srcUUID,
- uuid_t destUUID)
+Connection::Connection(std::shared_ptr<core::Repository> flow_repository, std::string name, uuid_t uuid, uuid_t srcUUID, uuid_t destUUID)
: core::Connectable(name, uuid),
flow_repository_(flow_repository),
logger_(logging::LoggerFactory<Connection>::getLogger()) {
@@ -91,8 +89,7 @@ void Connection::put(std::shared_ptr<core::FlowFile> flow) {
queued_data_size_ += flow->getSize();
- logger_->log_debug("Enqueue flow file UUID %s to connection %s",
- flow->getUUIDStr().c_str(), name_.c_str());
+ logger_->log_debug("Enqueue flow file UUID %s to connection %s", flow->getUUIDStr().c_str(), name_.c_str());
}
if (!flow->isStored()) {
@@ -109,8 +106,7 @@ void Connection::put(std::shared_ptr<core::FlowFile> flow) {
}
}
-std::shared_ptr<core::FlowFile> Connection::poll(
- std::set<std::shared_ptr<core::FlowFile>> &expiredFlowRecords) {
+std::shared_ptr<core::FlowFile> Connection::poll(std::set<std::shared_ptr<core::FlowFile>> &expiredFlowRecords) {
std::lock_guard<std::mutex> lock(mutex_);
while (!queue_.empty()) {
@@ -134,11 +130,9 @@ std::shared_ptr<core::FlowFile> Connection::poll(
queued_data_size_ += item->getSize();
break;
}
- std::shared_ptr<Connectable> connectable = std::static_pointer_cast<
- Connectable>(shared_from_this());
+ std::shared_ptr<Connectable> connectable = std::static_pointer_cast<Connectable>(shared_from_this());
item->setOriginalConnection(connectable);
- logger_->log_debug("Dequeue flow file UUID %s from connection %s",
- item->getUUIDStr().c_str(), name_.c_str());
+ logger_->log_debug("Dequeue flow file UUID %s from connection %s", item->getUUIDStr().c_str(), name_.c_str());
// delete from the flowfile repo
if (flow_repository_->Delete(item->getUUIDStr())) {
@@ -155,11 +149,9 @@ std::shared_ptr<core::FlowFile> Connection::poll(
queued_data_size_ += item->getSize();
break;
}
- std::shared_ptr<Connectable> connectable = std::static_pointer_cast<
- Connectable>(shared_from_this());
+ std::shared_ptr<Connectable> connectable = std::static_pointer_cast<Connectable>(shared_from_this());
item->setOriginalConnection(connectable);
- logger_->log_debug("Dequeue flow file UUID %s from connection %s",
- item->getUUIDStr().c_str(), name_.c_str());
+ logger_->log_debug("Dequeue flow file UUID %s from connection %s", item->getUUIDStr().c_str(), name_.c_str());
// delete from the flowfile repo
if (flow_repository_->Delete(item->getUUIDStr())) {
item->setStoredToRepository(false);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/EventDrivenSchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/EventDrivenSchedulingAgent.cpp b/libminifi/src/EventDrivenSchedulingAgent.cpp
index fa2171b..8a2a874 100644
--- a/libminifi/src/EventDrivenSchedulingAgent.cpp
+++ b/libminifi/src/EventDrivenSchedulingAgent.cpp
@@ -32,22 +32,16 @@ namespace apache {
namespace nifi {
namespace minifi {
-void EventDrivenSchedulingAgent::run(
- std::shared_ptr<core::Processor> processor,
- core::ProcessContext *processContext,
- core::ProcessSessionFactory *sessionFactory) {
+void EventDrivenSchedulingAgent::run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory) {
while (this->running_) {
- bool shouldYield = this->onTrigger(processor, processContext,
- sessionFactory);
+ bool shouldYield = this->onTrigger(processor, processContext, sessionFactory);
if (processor->isYield()) {
// Honor the yield
- std::this_thread::sleep_for(
- std::chrono::milliseconds(processor->getYieldTime()));
+ std::this_thread::sleep_for(std::chrono::milliseconds(processor->getYieldTime()));
} else if (shouldYield && this->bored_yield_duration_ > 0) {
// No work to do or need to apply back pressure
- std::this_thread::sleep_for(
- std::chrono::milliseconds(this->bored_yield_duration_));
+ std::this_thread::sleep_for(std::chrono::milliseconds(this->bored_yield_duration_));
}
// Block until work is available
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/FlowControlProtocol.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowControlProtocol.cpp b/libminifi/src/FlowControlProtocol.cpp
index 69f482f..dbe27e8 100644
--- a/libminifi/src/FlowControlProtocol.cpp
+++ b/libminifi/src/FlowControlProtocol.cpp
@@ -65,18 +65,18 @@ int FlowControlProtocol::connectServer(const char *host, uint16_t port) {
}
if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
reinterpret_cast<char*>(&opt), sizeof(opt)) < 0) {
- logger_->log_error("setsockopt() SO_REUSEADDR failed");
- close(sock);
- return 0;
- }
- }
+ logger_->log_error("setsockopt() SO_REUSEADDR failed");
+ close(sock);
+ return 0;
+ }
+ }
- int sndsize = 256*1024;
- if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, reinterpret_cast<char*>(&sndsize), sizeof(sndsize)) < 0) {
- logger_->log_error("setsockopt() SO_SNDBUF failed");
- close(sock);
- return 0;
- }
+ int sndsize = 256*1024;
+ if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, reinterpret_cast<char*>(&sndsize), sizeof(sndsize)) < 0) {
+ logger_->log_error("setsockopt() SO_SNDBUF failed");
+ close(sock);
+ return 0;
+ }
#endif
struct sockaddr_in sa;
@@ -108,9 +108,7 @@ int FlowControlProtocol::connectServer(const char *host, uint16_t port) {
return 0;
}
- logger_->log_info(
- "Flow Control Protocol socket %d connect to server %s port %d success",
- sock, host, port);
+ logger_->log_info("Flow Control Protocol socket %d connect to server %s port %d success", sock, host, port);
return sock;
}
@@ -224,8 +222,7 @@ void FlowControlProtocol::stop() {
void FlowControlProtocol::run(FlowControlProtocol *protocol) {
while (protocol->running_) {
- std::this_thread::sleep_for(
- std::chrono::milliseconds(protocol->_reportInterval));
+ std::this_thread::sleep_for(std::chrono::milliseconds(protocol->_reportInterval));
if (!protocol->_registered) {
// if it is not register yet
protocol->sendRegisterReq();
@@ -251,9 +248,7 @@ int FlowControlProtocol::sendRegisterReq() {
return -1;
// Calculate the total payload msg size
- uint32_t payloadSize = FlowControlMsgIDEncodingLen(FLOW_SERIAL_NUMBER, 0)
- + FlowControlMsgIDEncodingLen(FLOW_YML_NAME,
- this->_controller->getName().size() + 1);
+ uint32_t payloadSize = FlowControlMsgIDEncodingLen(FLOW_SERIAL_NUMBER, 0) + FlowControlMsgIDEncodingLen(FLOW_YML_NAME, this->_controller->getName().size() + 1);
uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize;
uint8_t *data = new uint8_t[size];
@@ -294,17 +289,13 @@ int FlowControlProtocol::sendRegisterReq() {
if (status <= 0) {
close(_socket);
_socket = 0;
- logger_->log_error(
- "Flow Control Protocol Read Register Resp header failed");
+ logger_->log_error("Flow Control Protocol Read Register Resp header failed");
return -1;
}
- logger_->log_info("Flow Control Protocol receive MsgType %s",
- FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType));
+ logger_->log_info("Flow Control Protocol receive MsgType %s", FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType));
logger_->log_info("Flow Control Protocol receive Seq Num %d", hdr.seqNumber);
- logger_->log_info("Flow Control Protocol receive Resp Code %s",
- FlowControlRespCodeToStr((FlowControlRespCode) hdr.status));
- logger_->log_info("Flow Control Protocol receive Payload len %d",
- hdr.payloadLen);
+ logger_->log_info("Flow Control Protocol receive Resp Code %s", FlowControlRespCodeToStr((FlowControlRespCode) hdr.status));
+ logger_->log_info("Flow Control Protocol receive Payload len %d", hdr.payloadLen);
if (hdr.status == RESP_SUCCESS && hdr.seqNumber == this->_seqNumber) {
this->_registered = true;
@@ -327,8 +318,7 @@ int FlowControlProtocol::sendRegisterReq() {
// Fixed 4 bytes
uint32_t reportInterval;
payloadPtr = this->decode(payloadPtr, reportInterval);
- logger_->log_info("Flow Control Protocol receive report interval %d ms",
- reportInterval);
+ logger_->log_info("Flow Control Protocol receive report interval %d ms", reportInterval);
this->_reportInterval = reportInterval;
} else {
break;
@@ -356,8 +346,7 @@ int FlowControlProtocol::sendReportReq() {
return -1;
// Calculate the total payload msg size
- uint32_t payloadSize = FlowControlMsgIDEncodingLen(
- FLOW_YML_NAME, this->_controller->getName().size() + 1);
+ uint32_t payloadSize = FlowControlMsgIDEncodingLen(FLOW_YML_NAME, this->_controller->getName().size() + 1);
uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize;
uint8_t *data = new uint8_t[size];
@@ -397,13 +386,10 @@ int FlowControlProtocol::sendReportReq() {
logger_->log_error("Flow Control Protocol Read Report Resp header failed");
return -1;
}
- logger_->log_info("Flow Control Protocol receive MsgType %s",
- FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType));
+ logger_->log_info("Flow Control Protocol receive MsgType %s", FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType));
logger_->log_info("Flow Control Protocol receive Seq Num %d", hdr.seqNumber);
- logger_->log_info("Flow Control Protocol receive Resp Code %s",
- FlowControlRespCodeToStr((FlowControlRespCode) hdr.status));
- logger_->log_info("Flow Control Protocol receive Payload len %d",
- hdr.payloadLen);
+ logger_->log_info("Flow Control Protocol receive Resp Code %s", FlowControlRespCodeToStr((FlowControlRespCode) hdr.status));
+ logger_->log_info("Flow Control Protocol receive Payload len %d", hdr.payloadLen);
if (hdr.status == RESP_SUCCESS && hdr.seqNumber == this->_seqNumber) {
this->_seqNumber++;
@@ -428,27 +414,20 @@ int FlowControlProtocol::sendReportReq() {
payloadPtr = this->decode(payloadPtr, len);
processor = (const char *) payloadPtr;
payloadPtr += len;
- logger_->log_info(
- "Flow Control Protocol receive report resp processor %s",
- processor.c_str());
+ logger_->log_info("Flow Control Protocol receive report resp processor %s", processor.c_str());
} else if (((FlowControlMsgID) msgID) == PROPERTY_NAME) {
uint32_t len;
payloadPtr = this->decode(payloadPtr, len);
propertyName = (const char *) payloadPtr;
payloadPtr += len;
- logger_->log_info(
- "Flow Control Protocol receive report resp property name %s",
- propertyName.c_str());
+ logger_->log_info("Flow Control Protocol receive report resp property name %s", propertyName.c_str());
} else if (((FlowControlMsgID) msgID) == PROPERTY_VALUE) {
uint32_t len;
payloadPtr = this->decode(payloadPtr, len);
propertyValue = (const char *) payloadPtr;
payloadPtr += len;
- logger_->log_info(
- "Flow Control Protocol receive report resp property value %s",
- propertyValue.c_str());
- this->_controller->updatePropertyValue(processor, propertyName,
- propertyValue);
+ logger_->log_info("Flow Control Protocol receive report resp property value %s", propertyValue.c_str());
+ this->_controller->updatePropertyValue(processor, propertyName, propertyValue);
} else {
break;
}
@@ -457,24 +436,21 @@ int FlowControlProtocol::sendReportReq() {
close(_socket);
_socket = 0;
return 0;
- } else if (hdr.status == RESP_TRIGGER_REGISTER
- && hdr.seqNumber == this->_seqNumber) {
+ } else if (hdr.status == RESP_TRIGGER_REGISTER && hdr.seqNumber == this->_seqNumber) {
logger_->log_info("Flow Control Protocol trigger reregister");
this->_registered = false;
this->_seqNumber++;
close(_socket);
_socket = 0;
return 0;
- } else if (hdr.status == RESP_STOP_FLOW_CONTROLLER
- && hdr.seqNumber == this->_seqNumber) {
+ } else if (hdr.status == RESP_STOP_FLOW_CONTROLLER && hdr.seqNumber == this->_seqNumber) {
logger_->log_info("Flow Control Protocol stop flow controller");
this->_controller->stop(true);
this->_seqNumber++;
close(_socket);
_socket = 0;
return 0;
- } else if (hdr.status == RESP_START_FLOW_CONTROLLER
- && hdr.seqNumber == this->_seqNumber) {
+ } else if (hdr.status == RESP_START_FLOW_CONTROLLER && hdr.seqNumber == this->_seqNumber) {
logger_->log_info("Flow Control Protocol start flow controller");
this->_controller->start();
this->_seqNumber++;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index c7df2e7..62cf21c 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -48,14 +48,9 @@ namespace minifi {
#define DEFAULT_CONFIG_NAME "conf/flow.yml"
-FlowController::FlowController(
- std::shared_ptr<core::Repository> provenance_repo,
- std::shared_ptr<core::Repository> flow_file_repo,
- std::shared_ptr<Configure> configure,
- std::unique_ptr<core::FlowConfiguration> flow_configuration,
- const std::string name, bool headless_mode)
- : core::controller::ControllerServiceProvider(
- core::getClassName<FlowController>()),
+FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<Configure> configure,
+ std::unique_ptr<core::FlowConfiguration> flow_configuration, const std::string name, bool headless_mode)
+ : core::controller::ControllerServiceProvider(core::getClassName<FlowController>()),
root_(nullptr),
max_timer_driven_threads_(0),
max_event_driven_threads_(0),
@@ -64,14 +59,13 @@ FlowController::FlowController(
provenance_repo_(provenance_repo),
flow_file_repo_(flow_file_repo),
protocol_(0),
- controller_service_map_(
- std::make_shared<core::controller::ControllerServiceMap>()),
+ controller_service_map_(std::make_shared<core::controller::ControllerServiceMap>()),
timer_scheduler_(nullptr),
event_scheduler_(nullptr),
controller_service_provider_(nullptr),
flow_configuration_(std::move(flow_configuration)),
configuration_(configure),
- logger_(logging::LoggerFactory<FlowController>::getLogger()) {
+ logger_(logging::LoggerFactory<FlowController>::getLogger()) {
if (provenance_repo == nullptr)
throw std::runtime_error("Provenance Repo should not be null");
if (flow_file_repo == nullptr)
@@ -96,8 +90,7 @@ FlowController::FlowController(
if (!headless_mode) {
std::string rawConfigFileString;
- configure->get(Configure::nifi_flow_configuration_file,
- rawConfigFileString);
+ configure->get(Configure::nifi_flow_configuration_file, rawConfigFileString);
if (!rawConfigFileString.empty()) {
configuration_filename_ = rawConfigFileString;
@@ -107,8 +100,7 @@ FlowController::FlowController(
if (!configuration_filename_.empty()) {
// perform a naive determination if this is a relative path
if (configuration_filename_.c_str()[0] != '/') {
- adjustedFilename = adjustedFilename + configure->getHome() + "/"
- + configuration_filename_;
+ adjustedFilename = adjustedFilename + configure->getHome() + "/" + configuration_filename_;
} else {
adjustedFilename = configuration_filename_;
}
@@ -124,19 +116,16 @@ void FlowController::initializePaths(const std::string &adjustedFilename) {
path = realpath(adjustedFilename.c_str(), full_path);
if (path == NULL) {
- throw std::runtime_error(
- "Path is not specified. Either manually set MINIFI_HOME or ensure ../conf exists");
+ throw std::runtime_error("Path is not specified. Either manually set MINIFI_HOME or ensure ../conf exists");
}
std::string pathString(path);
configuration_filename_ = pathString;
- logger_->log_info("FlowController NiFi Configuration file %s",
- pathString.c_str());
+ logger_->log_info("FlowController NiFi Configuration file %s", pathString.c_str());
// Create the content repo directory if needed
struct stat contentDirStat;
- if (stat(ResourceClaim::default_directory_path, &contentDirStat)
- != -1&& S_ISDIR(contentDirStat.st_mode)) {
+ if (stat(ResourceClaim::default_directory_path, &contentDirStat) != -1 && S_ISDIR(contentDirStat.st_mode)) {
path = realpath(ResourceClaim::default_directory_path, full_path);
logger_->log_info("FlowController content directory %s", full_path);
} else {
@@ -149,9 +138,7 @@ void FlowController::initializePaths(const std::string &adjustedFilename) {
std::string clientAuthStr;
if (!path) {
- logger_->log_error(
- "Could not locate path from provided configuration file name (%s). Exiting.",
- full_path);
+ logger_->log_error("Could not locate path from provided configuration file name (%s). Exiting.", full_path);
exit(1);
}
}
@@ -179,8 +166,7 @@ void FlowController::stop(bool force) {
// Wait for sometime for thread stop
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
if (this->root_)
- this->root_->stopProcessing(this->timer_scheduler_.get(),
- this->event_scheduler_.get());
+ this->root_->stopProcessing(this->timer_scheduler_.get(), this->event_scheduler_.get());
}
}
@@ -196,13 +182,10 @@ void FlowController::stop(bool force) {
void FlowController::waitUnload(const uint64_t timeToWaitMs) {
if (running_) {
// use the current time and increment with the provided argument.
- std::chrono::system_clock::time_point wait_time =
- std::chrono::system_clock::now()
- + std::chrono::milliseconds(timeToWaitMs);
+ std::chrono::system_clock::time_point wait_time = std::chrono::system_clock::now() + std::chrono::milliseconds(timeToWaitMs);
// create an asynchronous future.
- std::future<void> unload_task = std::async(std::launch::async,
- [this]() {unload();});
+ std::future<void> unload_task = std::async(std::launch::async, [this]() {unload();});
if (std::future_status::ready == unload_task.wait_until(wait_time)) {
running_ = false;
@@ -233,32 +216,21 @@ void FlowController::load() {
if (!initialized_) {
logger_->log_info("Initializing timers");
if (nullptr == timer_scheduler_) {
- timer_scheduler_ = std::make_shared<TimerDrivenSchedulingAgent>(
- std::static_pointer_cast<core::controller::ControllerServiceProvider>(
- shared_from_this()),
- provenance_repo_, configuration_);
+ timer_scheduler_ = std::make_shared<TimerDrivenSchedulingAgent>(std::static_pointer_cast<core::controller::ControllerServiceProvider>(shared_from_this()), provenance_repo_, configuration_);
}
if (nullptr == event_scheduler_) {
- event_scheduler_ = std::make_shared<EventDrivenSchedulingAgent>(
- std::static_pointer_cast<core::controller::ControllerServiceProvider>(
- shared_from_this()),
- provenance_repo_, configuration_);
+ event_scheduler_ = std::make_shared<EventDrivenSchedulingAgent>(std::static_pointer_cast<core::controller::ControllerServiceProvider>(shared_from_this()), provenance_repo_, configuration_);
}
- logger_->log_info("Load Flow Controller from file %s",
- configuration_filename_.c_str());
+ logger_->log_info("Load Flow Controller from file %s", configuration_filename_.c_str());
- this->root_ = std::shared_ptr<core::ProcessGroup>(
- flow_configuration_->getRoot(configuration_filename_));
+ this->root_ = std::shared_ptr<core::ProcessGroup>(flow_configuration_->getRoot(configuration_filename_));
logger_->log_info("Loaded root processor Group");
- controller_service_provider_ = flow_configuration_
- ->getControllerServiceProvider();
+ controller_service_provider_ = flow_configuration_->getControllerServiceProvider();
- std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(
- controller_service_provider_)->setRootGroup(root_);
- std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(
- controller_service_provider_)->setSchedulingAgent(
+ std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_)->setRootGroup(root_);
+ std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_)->setSchedulingAgent(
std::static_pointer_cast<minifi::SchedulingAgent>(event_scheduler_));
logger_->log_info("Loaded controller service provider");
@@ -271,8 +243,7 @@ void FlowController::load() {
void FlowController::reload(std::string yamlFile) {
std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
- logger_->log_info("Starting to reload Flow Controller with yaml %s",
- yamlFile.c_str());
+ logger_->log_info("Starting to reload Flow Controller with yaml %s", yamlFile.c_str());
stop(true);
unload();
std::string oldYamlFile = this->configuration_filename_;
@@ -281,8 +252,7 @@ void FlowController::reload(std::string yamlFile) {
start();
if (this->root_ != nullptr) {
this->configuration_filename_ = oldYamlFile;
- logger_->log_info("Rollback Flow Controller to YAML %s",
- oldYamlFile.c_str());
+ logger_->log_info("Rollback Flow Controller to YAML %s", oldYamlFile.c_str());
stop(true);
unload();
load();
@@ -297,10 +267,8 @@ void FlowController::loadFlowRepo() {
if (this->root_ != nullptr) {
this->root_->getConnections(connectionMap);
}
- logger_->log_debug("Number of connections from connectionMap %d",
- connectionMap.size());
- auto rep = std::dynamic_pointer_cast<core::repository::FlowFileRepository>(
- flow_file_repo_);
+ logger_->log_debug("Number of connections from connectionMap %d", connectionMap.size());
+ auto rep = std::dynamic_pointer_cast<core::repository::FlowFileRepository>(flow_file_repo_);
if (nullptr != rep) {
rep->setConnectionMap(connectionMap);
}
@@ -313,8 +281,7 @@ void FlowController::loadFlowRepo() {
bool FlowController::start() {
std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
if (!initialized_) {
- logger_->log_error(
- "Can not start Flow Controller because it has not been initialized");
+ logger_->log_error("Can not start Flow Controller because it has not been initialized");
return false;
} else {
if (!running_) {
@@ -323,8 +290,7 @@ bool FlowController::start() {
this->timer_scheduler_->start();
this->event_scheduler_->start();
if (this->root_ != nullptr) {
- this->root_->startProcessing(this->timer_scheduler_.get(),
- this->event_scheduler_.get());
+ this->root_->startProcessing(this->timer_scheduler_.get(), this->event_scheduler_.get());
}
running_ = true;
this->protocol_->start();
@@ -346,11 +312,9 @@ bool FlowController::start() {
* @param id service identifier
* @param firstTimeAdded first time this CS was added
*/
-std::shared_ptr<core::controller::ControllerServiceNode> FlowController::createControllerService(
- const std::string &type, const std::string &id,
- bool firstTimeAdded) {
- return controller_service_provider_->createControllerService(type, id,
- firstTimeAdded);
+std::shared_ptr<core::controller::ControllerServiceNode> FlowController::createControllerService(const std::string &type, const std::string &id,
+bool firstTimeAdded) {
+ return controller_service_provider_->createControllerService(type, id, firstTimeAdded);
}
/**
@@ -361,8 +325,7 @@ std::shared_ptr<core::controller::ControllerServiceNode> FlowController::createC
* @param serviceNode service node to be removed.
*/
-void FlowController::removeControllerService(
- const std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+void FlowController::removeControllerService(const std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
controller_map_->removeControllerService(serviceNode);
}
@@ -370,8 +333,7 @@ void FlowController::removeControllerService(
* Enables the controller service services
* @param serviceNode service node which will be disabled, along with linked services.
*/
-void FlowController::enableControllerService(
- std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+void FlowController::enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
return controller_service_provider_->enableControllerService(serviceNode);
}
@@ -379,16 +341,14 @@ void FlowController::enableControllerService(
* Enables controller services
* @param serviceNoden vector of service nodes which will be enabled, along with linked services.
*/
-void FlowController::enableControllerServices(
- std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> serviceNodes) {
+void FlowController::enableControllerServices(std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> serviceNodes) {
}
/**
* Disables controller services
* @param serviceNode service node which will be disabled, along with linked services.
*/
-void FlowController::disableControllerService(
- std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+void FlowController::disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
controller_service_provider_->disableControllerService(serviceNode);
}
@@ -404,40 +364,33 @@ std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> FlowContro
* @param id service identifier
* @return shared pointer to the controller service node or nullptr if it does not exist.
*/
-std::shared_ptr<core::controller::ControllerServiceNode> FlowController::getControllerServiceNode(
- const std::string &id) {
+std::shared_ptr<core::controller::ControllerServiceNode> FlowController::getControllerServiceNode(const std::string &id) {
return controller_service_provider_->getControllerServiceNode(id);
}
-void FlowController::verifyCanStopReferencingComponents(
- std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+void FlowController::verifyCanStopReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
}
/**
* Unschedules referencing components.
*/
-std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> FlowController::unscheduleReferencingComponents(
- std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
- return controller_service_provider_->unscheduleReferencingComponents(
- serviceNode);
+std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> FlowController::unscheduleReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+ return controller_service_provider_->unscheduleReferencingComponents(serviceNode);
}
/**
* Verify can disable referencing components
* @param serviceNode service node whose referenced components will be scheduled.
*/
-void FlowController::verifyCanDisableReferencingServices(
- std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
- controller_service_provider_->verifyCanDisableReferencingServices(
- serviceNode);
+void FlowController::verifyCanDisableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+ controller_service_provider_->verifyCanDisableReferencingServices(serviceNode);
}
/**
* Disables referencing components
* @param serviceNode service node whose referenced components will be scheduled.
*/
-std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> FlowController::disableReferencingServices(
- std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> FlowController::disableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
return controller_service_provider_->disableReferencingServices(serviceNode);
}
@@ -445,8 +398,7 @@ std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> FlowContro
* Verify can enable referencing components
* @param serviceNode service node whose referenced components will be scheduled.
*/
-void FlowController::verifyCanEnableReferencingServices(
- std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+void FlowController::verifyCanEnableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
controller_service_provider_->verifyCanEnableReferencingServices(serviceNode);
}
@@ -461,8 +413,7 @@ bool FlowController::isControllerServiceEnabled(const std::string &identifier) {
* Enables referencing components
* @param serviceNode service node whose referenced components will be scheduled.
*/
-std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> FlowController::enableReferencingServices(
- std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> FlowController::enableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
return controller_service_provider_->enableReferencingServices(serviceNode);
}
@@ -470,20 +421,16 @@ std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> FlowContro
* Schedules referencing components
* @param serviceNode service node whose referenced components will be scheduled.
*/
-std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> FlowController::scheduleReferencingComponents(
- std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
- return controller_service_provider_->scheduleReferencingComponents(
- serviceNode);
+std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> FlowController::scheduleReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+ return controller_service_provider_->scheduleReferencingComponents(serviceNode);
}
/**
* Returns controller service components referenced by serviceIdentifier from the embedded
* controller service provider;
*/
-std::shared_ptr<core::controller::ControllerService> FlowController::getControllerServiceForComponent(
- const std::string &serviceIdentifier, const std::string &componentId) {
- return controller_service_provider_->getControllerServiceForComponent(
- serviceIdentifier, componentId);
+std::shared_ptr<core::controller::ControllerService> FlowController::getControllerServiceForComponent(const std::string &serviceIdentifier, const std::string &componentId) {
+ return controller_service_provider_->getControllerServiceForComponent(serviceIdentifier, componentId);
}
/**