You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/06/06 16:33:12 UTC
[9/9] nifi-minifi-cpp git commit: MINIFI-331: Apply formatter with increased line length to source
MINIFI-331: Apply formatter with increased line length to source
This closes #111.
Signed-off-by: Aldrin Piri <al...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/77a20dbe
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/77a20dbe
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/77a20dbe
Branch: refs/heads/master
Commit: 77a20dbe3078d5bccee7bba0d0b4f048f5440420
Parents: feec4ea
Author: Marc Parisi <ph...@apache.org>
Authored: Tue Jun 6 09:22:05 2017 -0400
Committer: Aldrin Piri <al...@apache.org>
Committed: Tue Jun 6 12:32:22 2017 -0400
----------------------------------------------------------------------
libminifi/include/Connection.h | 11 +-
libminifi/include/EventDrivenSchedulingAgent.h | 15 +-
libminifi/include/Exception.h | 5 +-
libminifi/include/FlowControlProtocol.h | 26 +-
libminifi/include/FlowController.h | 54 +--
libminifi/include/FlowFileRecord.h | 17 +-
libminifi/include/RemoteProcessorGroupPort.h | 14 +-
libminifi/include/SchedulingAgent.h | 17 +-
libminifi/include/Site2SiteClientProtocol.h | 161 ++++----
libminifi/include/Site2SitePeer.h | 17 +-
libminifi/include/ThreadedSchedulingAgent.h | 12 +-
libminifi/include/TimerDrivenSchedulingAgent.h | 12 +-
.../include/controllers/SSLContextService.h | 74 ++--
libminifi/include/core/ClassLoader.h | 23 +-
libminifi/include/core/ConfigurationFactory.h | 26 +-
libminifi/include/core/Connectable.h | 7 +-
libminifi/include/core/Core.h | 6 +-
libminifi/include/core/FlowConfiguration.h | 31 +-
libminifi/include/core/FlowFile.h | 4 +-
libminifi/include/core/ProcessContext.h | 14 +-
libminifi/include/core/ProcessGroup.h | 22 +-
libminifi/include/core/ProcessSession.h | 107 +++--
libminifi/include/core/Processor.h | 14 +-
libminifi/include/core/ProcessorNode.h | 15 +-
libminifi/include/core/Property.h | 24 +-
libminifi/include/core/Repository.h | 9 +-
libminifi/include/core/RepositoryFactory.h | 3 +-
libminifi/include/core/Resource.h | 3 +-
.../core/controller/ControllerServiceLookup.h | 6 +-
.../core/controller/ControllerServiceMap.h | 12 +-
.../core/controller/ControllerServiceNode.h | 6 +-
.../core/controller/ControllerServiceProvider.h | 110 ++----
.../controller/StandardControllerServiceNode.h | 15 +-
.../StandardControllerServiceProvider.h | 90 ++---
libminifi/include/core/logging/Logger.h | 40 +-
.../include/core/logging/LoggerConfiguration.h | 63 +--
.../SiteToSiteProvenanceReportingTask.h | 20 +-
.../core/repository/FlowFileRepository.h | 46 +--
.../core/repository/VolatileRepository.h | 41 +-
libminifi/include/core/yaml/YamlConfiguration.h | 59 +--
libminifi/include/io/BaseStream.h | 20 +-
libminifi/include/io/CRCStream.h | 50 +--
libminifi/include/io/ClientSocket.h | 26 +-
libminifi/include/io/DataStream.h | 9 +-
libminifi/include/io/EndianCheck.h | 2 +-
libminifi/include/io/Serializable.h | 18 +-
libminifi/include/io/StreamFactory.h | 6 +-
libminifi/include/io/TLSSocket.h | 12 +-
libminifi/include/io/tls/TLSSocket.h | 49 +--
libminifi/include/io/validation.h | 12 +-
libminifi/include/processors/AppendHostInfo.h | 6 +-
libminifi/include/processors/ExecuteProcess.h | 8 +-
libminifi/include/processors/GenerateFlowFile.h | 3 +-
libminifi/include/processors/GetFile.h | 22 +-
libminifi/include/processors/InvokeHTTP.h | 19 +-
libminifi/include/processors/ListenHTTP.h | 14 +-
libminifi/include/processors/ListenSyslog.h | 9 +-
libminifi/include/processors/LoadProcessors.h | 1 -
libminifi/include/processors/LogAttribute.h | 3 +-
libminifi/include/processors/PutFile.h | 19 +-
libminifi/include/processors/TailFile.h | 6 +-
libminifi/include/properties/Properties.h | 4 +-
libminifi/include/provenance/Provenance.h | 73 ++--
.../include/provenance/ProvenanceRepository.h | 114 +++---
libminifi/include/utils/ByteInputCallBack.h | 6 +-
libminifi/include/utils/StringUtils.h | 22 +-
libminifi/include/utils/ThreadPool.h | 18 +-
libminifi/include/utils/TimeUtil.h | 9 +-
libminifi/src/Configure.cpp | 57 +--
libminifi/src/Connection.cpp | 22 +-
libminifi/src/EventDrivenSchedulingAgent.cpp | 14 +-
libminifi/src/FlowControlProtocol.cpp | 84 ++--
libminifi/src/FlowController.cpp | 147 +++----
libminifi/src/FlowFileRecord.cpp | 43 +-
libminifi/src/Properties.cpp | 13 +-
libminifi/src/RemoteProcessorGroupPort.cpp | 45 +--
libminifi/src/SchedulingAgent.cpp | 21 +-
libminifi/src/Site2SiteClientProtocol.cpp | 396 +++++++------------
libminifi/src/Site2SitePeer.cpp | 4 +-
libminifi/src/ThreadedSchedulingAgent.cpp | 70 ++--
libminifi/src/TimerDrivenSchedulingAgent.cpp | 17 +-
libminifi/src/controllers/SSLContextService.cpp | 41 +-
libminifi/src/core/ClassLoader.cpp | 10 +-
libminifi/src/core/ConfigurableComponent.cpp | 33 +-
libminifi/src/core/ConfigurationFactory.cpp | 36 +-
libminifi/src/core/Connectable.cpp | 36 +-
libminifi/src/core/FlowConfiguration.cpp | 32 +-
libminifi/src/core/FlowFile.cpp | 3 +-
libminifi/src/core/ProcessGroup.cpp | 68 ++--
libminifi/src/core/ProcessSession.cpp | 296 +++++---------
libminifi/src/core/Processor.cpp | 61 +--
libminifi/src/core/ProcessorNode.cpp | 3 +-
libminifi/src/core/RepositoryFactory.cpp | 18 +-
.../core/controller/ControllerServiceNode.cpp | 2 -
.../controller/ControllerServiceProvider.cpp | 3 +-
.../StandardControllerServiceNode.cpp | 9 +-
.../src/core/logging/LoggerConfiguration.cpp | 31 +-
.../SiteToSiteProvenanceReportingTask.cpp | 35 +-
.../src/core/repository/FlowFileRepository.cpp | 29 +-
.../src/core/repository/VolatileRepository.cpp | 7 +-
libminifi/src/core/yaml/YamlConfiguration.cpp | 255 ++++--------
libminifi/src/io/BaseStream.cpp | 47 +--
libminifi/src/io/ClientSocket.cpp | 45 +--
libminifi/src/io/DataStream.cpp | 12 +-
libminifi/src/io/Serializable.cpp | 33 +-
libminifi/src/io/tls/TLSSocket.cpp | 70 ++--
libminifi/src/processors/AppendHostInfo.cpp | 27 +-
libminifi/src/processors/ExecuteProcess.cpp | 83 ++--
libminifi/src/processors/GenerateFlowFile.cpp | 31 +-
libminifi/src/processors/GetFile.cpp | 105 ++---
libminifi/src/processors/InvokeHTTP.cpp | 265 ++++---------
libminifi/src/processors/ListenHTTP.cpp | 120 ++----
libminifi/src/processors/ListenSyslog.cpp | 68 +---
libminifi/src/processors/LogAttribute.cpp | 41 +-
libminifi/src/processors/PutFile.cpp | 62 +--
libminifi/src/processors/TailFile.cpp | 60 +--
libminifi/src/provenance/Provenance.cpp | 109 ++---
.../src/provenance/ProvenanceRepository.cpp | 10 +-
libminifi/test/Server.cpp | 58 +--
libminifi/test/TestBase.h | 67 ++--
.../ControllerServiceIntegrationTests.cpp | 91 ++---
.../test/integration/HttpGetIntegrationTest.cpp | 46 +--
.../integration/HttpPostIntegrationTest.cpp | 41 +-
.../integration/ProvenanceReportingTest.cpp | 48 +--
.../test/integration/TestExecuteProcess.cpp | 56 +--
libminifi/test/nodefs/NoLevelDB.cpp | 6 +-
libminifi/test/unit/CRCTests.cpp | 15 +-
libminifi/test/unit/ClassLoaderTests.cpp | 20 +-
libminifi/test/unit/ControllerServiceTests.cpp | 26 +-
libminifi/test/unit/InvokeHTTPTests.cpp | 120 ++----
.../test/unit/LoggerConfigurationTests.cpp | 4 +-
libminifi/test/unit/LoggerTests.cpp | 30 +-
libminifi/test/unit/MockClasses.h | 11 +-
libminifi/test/unit/ProcessorTests.cpp | 93 ++---
libminifi/test/unit/PropertyTests.cpp | 24 +-
libminifi/test/unit/ProvenanceTestHelper.h | 43 +-
libminifi/test/unit/ProvenanceTests.cpp | 52 +--
libminifi/test/unit/RepoTests.cpp | 15 +-
libminifi/test/unit/Site2SiteTests.cpp | 36 +-
libminifi/test/unit/SiteToSiteHelper.h | 9 +-
libminifi/test/unit/StringUtilsTests.cpp | 11 +-
libminifi/test/unit/YamlConfigurationTests.cpp | 15 +-
142 files changed, 1996 insertions(+), 3881 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/Connection.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Connection.h b/libminifi/include/Connection.h
index 8fe42d0..be51fce 100644
--- a/libminifi/include/Connection.h
+++ b/libminifi/include/Connection.h
@@ -41,17 +41,13 @@ namespace nifi {
namespace minifi {
// Connection Class
-class Connection : public core::Connectable,
- public std::enable_shared_from_this<Connection> {
+class Connection : public core::Connectable, public std::enable_shared_from_this<Connection> {
public:
// Constructor
/*
* Create a new processor
*/
- explicit Connection(std::shared_ptr<core::Repository> flow_repository,
- std::string name, uuid_t uuid = NULL, uuid_t srcUUID =
- NULL,
- uuid_t destUUID = NULL);
+ explicit Connection(std::shared_ptr<core::Repository> flow_repository, std::string name, uuid_t uuid = NULL, uuid_t srcUUID = NULL, uuid_t destUUID = NULL);
// Destructor
virtual ~Connection() {
}
@@ -137,8 +133,7 @@ class Connection : public core::Connectable,
// Put the flow file into queue
void put(std::shared_ptr<core::FlowFile> flow);
// Poll the flow file from queue, the expired flow file record also being returned
- std::shared_ptr<core::FlowFile> poll(
- std::set<std::shared_ptr<core::FlowFile>> &expiredFlowRecords);
+ std::shared_ptr<core::FlowFile> poll(std::set<std::shared_ptr<core::FlowFile>> &expiredFlowRecords);
// Drain the flow records
void drain();
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/EventDrivenSchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/EventDrivenSchedulingAgent.h b/libminifi/include/EventDrivenSchedulingAgent.h
index 2e49ddf..6a63dc5 100644
--- a/libminifi/include/EventDrivenSchedulingAgent.h
+++ b/libminifi/include/EventDrivenSchedulingAgent.h
@@ -38,27 +38,20 @@ class EventDrivenSchedulingAgent : public ThreadedSchedulingAgent {
/*!
* Create a new event driven scheduling agent.
*/
- EventDrivenSchedulingAgent(
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider,
- std::shared_ptr<core::Repository> repo,
- std::shared_ptr<Configure> configuration)
- : ThreadedSchedulingAgent(controller_service_provider, repo,
- configuration) {
+ EventDrivenSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo, std::shared_ptr<Configure> configuration)
+ : ThreadedSchedulingAgent(controller_service_provider, repo, configuration) {
}
// Destructor
virtual ~EventDrivenSchedulingAgent() {
}
// Run function for the thread
- void run(std::shared_ptr<core::Processor> processor,
- core::ProcessContext *processContext,
- core::ProcessSessionFactory *sessionFactory);
+ void run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory);
private:
// Prevent default copy constructor and assignment operation
// Only support pass by reference or pointer
EventDrivenSchedulingAgent(const EventDrivenSchedulingAgent &parent);
- EventDrivenSchedulingAgent &operator=(
- const EventDrivenSchedulingAgent &parent);
+ EventDrivenSchedulingAgent &operator=(const EventDrivenSchedulingAgent &parent);
};
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/Exception.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Exception.h b/libminifi/include/Exception.h
index 88a3ed2..080d1bf 100644
--- a/libminifi/include/Exception.h
+++ b/libminifi/include/Exception.h
@@ -44,9 +44,8 @@ enum ExceptionType {
};
// Exception String
-static const char *ExceptionStr[MAX_EXCEPTION] = { "File Operation",
- "Flow File Operation", "Processor Operation", "Process Session Operation",
- "Process Schedule Operation", "Site2Site Protocol", "General Operation" };
+static const char *ExceptionStr[MAX_EXCEPTION] = { "File Operation", "Flow File Operation", "Processor Operation", "Process Session Operation", "Process Schedule Operation", "Site2Site Protocol",
+ "General Operation" };
// Exception Type to String
inline const char *ExceptionTypeToString(ExceptionType type) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/FlowControlProtocol.h
----------------------------------------------------------------------
diff --git a/libminifi/include/FlowControlProtocol.h b/libminifi/include/FlowControlProtocol.h
index 73399ae..8992049 100644
--- a/libminifi/include/FlowControlProtocol.h
+++ b/libminifi/include/FlowControlProtocol.h
@@ -58,8 +58,7 @@ typedef enum {
} FlowControlMsgType;
// FlowControl Protocol Msg Type String
-static const char *FlowControlMsgTypeStr[MAX_FLOW_CONTROL_MSG_TYPE] = {
- "REGISTER_REQ", "REGISTER_RESP", "REPORT_REQ", "REPORT_RESP" };
+static const char *FlowControlMsgTypeStr[MAX_FLOW_CONTROL_MSG_TYPE] = { "REGISTER_REQ", "REGISTER_RESP", "REPORT_REQ", "REPORT_RESP" };
// Flow Control Msg Type to String
inline const char *FlowControlMsgTypeToStr(FlowControlMsgType type) {
@@ -91,10 +90,8 @@ typedef enum {
} FlowControlMsgID;
// FlowControl Protocol Msg ID String
-static const char *FlowControlMsgIDStr[MAX_FLOW_MSG_ID] = {
- "FLOW_SERIAL_NUMBER", "FLOW_YAML_NAME", "FLOW_YAML_CONTENT",
- "REPORT_INTERVAL", "PROCESSOR_NAME"
- "PROPERTY_NAME", "PROPERTY_VALUE", "REPORT_BLOB" };
+static const char *FlowControlMsgIDStr[MAX_FLOW_MSG_ID] = { "FLOW_SERIAL_NUMBER", "FLOW_YAML_NAME", "FLOW_YAML_CONTENT", "REPORT_INTERVAL", "PROCESSOR_NAME"
+ "PROPERTY_NAME", "PROPERTY_VALUE", "REPORT_BLOB" };
#define TYPE_HDR_LEN 4 // Fix Hdr Type
#define TLV_HDR_LEN 8 // Type 4 bytes and Len 4 bytes
@@ -130,9 +127,7 @@ typedef enum {
} FlowControlRespCode;
// FlowControl Resp Code str
-static const char *FlowControlRespCodeStr[MAX_RESP_CODE] = { "RESP_SUCCESS",
- "RESP_TRIGGER_REGISTER", "RESP_START_FLOW_CONTROLLER",
- "RESP_STOP_FLOW_CONTROLLER", "RESP_FAILURE" };
+static const char *FlowControlRespCodeStr[MAX_RESP_CODE] = { "RESP_SUCCESS", "RESP_TRIGGER_REGISTER", "RESP_START_FLOW_CONTROLLER", "RESP_STOP_FLOW_CONTROLLER", "RESP_FAILURE" };
// Flow Control Resp Code to String
inline const char *FlowControlRespCodeToStr(FlowControlRespCode code) {
@@ -157,7 +152,8 @@ class FlowControlProtocol {
/*!
* Create a new control protocol
*/
- FlowControlProtocol(FlowController *controller, const std::shared_ptr<Configure> &configure) : logger_(logging::LoggerFactory<FlowControlProtocol>::getLogger()) {
+ FlowControlProtocol(FlowController *controller, const std::shared_ptr<Configure> &configure)
+ : logger_(logging::LoggerFactory<FlowControlProtocol>::getLogger()) {
_controller = controller;
_socket = 0;
_serverName = "localhost";
@@ -175,17 +171,13 @@ class FlowControlProtocol {
_serverName = value;
logger_->log_info("NiFi Server Name %s", _serverName.c_str());
}
- if (configure->get(Configure::nifi_server_port, value)
- && core::Property::StringToInt(value, _serverPort)) {
+ if (configure->get(Configure::nifi_server_port, value) && core::Property::StringToInt(value, _serverPort)) {
logger_->log_info("NiFi Server Port: [%d]", _serverPort);
}
if (configure->get(Configure::nifi_server_report_interval, value)) {
core::TimeUnit unit;
- if (core::Property::StringToTime(value, _reportInterval, unit)
- && core::Property::ConvertTimeUnitToMS(_reportInterval, unit,
- _reportInterval)) {
- logger_->log_info("NiFi server report interval: [%d] ms",
- _reportInterval);
+ if (core::Property::StringToTime(value, _reportInterval, unit) && core::Property::ConvertTimeUnitToMS(_reportInterval, unit, _reportInterval)) {
+ logger_->log_info("NiFi server report interval: [%d] ms", _reportInterval);
}
} else
_reportInterval = 0;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/FlowController.h
----------------------------------------------------------------------
diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h
index 59865d4..dc0d610 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -58,8 +58,7 @@ namespace minifi {
* Flow Controller class. Generally used by FlowController factory
* as a singleton.
*/
-class FlowController : public core::controller::ControllerServiceProvider,
- public std::enable_shared_from_this<FlowController> {
+class FlowController : public core::controller::ControllerServiceProvider, public std::enable_shared_from_this<FlowController> {
public:
static const int DEFAULT_MAX_TIMER_DRIVEN_THREAD = 10;
static const int DEFAULT_MAX_EVENT_DRIVEN_THREAD = 5;
@@ -67,9 +66,7 @@ class FlowController : public core::controller::ControllerServiceProvider,
/**
* Flow controller constructor
*/
- FlowController(std::shared_ptr<core::Repository> provenance_repo,
- std::shared_ptr<core::Repository> flow_file_repo,
- std::shared_ptr<Configure> configure,
+ FlowController(std::shared_ptr<core::Repository> provenance_repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<Configure> configure,
std::unique_ptr<core::FlowConfiguration> flow_configuration,
const std::string name = DEFAULT_ROOT_GROUP_NAME,
bool headless_mode = false);
@@ -125,8 +122,7 @@ class FlowController : public core::controller::ControllerServiceProvider,
// Load new xml
virtual void reload(std::string yamlFile);
// update property value
- void updatePropertyValue(std::string processorName, std::string propertyName,
- std::string propertyValue) {
+ void updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue) {
if (root_ != nullptr)
root_->updatePropertyValue(processorName, propertyName, propertyValue);
}
@@ -142,9 +138,8 @@ class FlowController : public core::controller::ControllerServiceProvider,
* @param id service identifier
* @param firstTimeAdded first time this CS was added
*/
- virtual std::shared_ptr<core::controller::ControllerServiceNode> createControllerService(
- const std::string &type, const std::string &id,
- bool firstTimeAdded);
+ virtual std::shared_ptr<core::controller::ControllerServiceNode> createControllerService(const std::string &type, const std::string &id,
+ bool firstTimeAdded);
/**
* controller service provider
@@ -154,29 +149,25 @@ class FlowController : public core::controller::ControllerServiceProvider,
* @param serviceNode service node to be removed.
*/
- virtual void removeControllerService(
- const std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
+ virtual void removeControllerService(const std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
/**
* Enables the controller service services
* @param serviceNode service node which will be disabled, along with linked services.
*/
- virtual void enableControllerService(
- std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
+ virtual void enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
/**
* Enables controller services
* @param serviceNoden vector of service nodes which will be enabled, along with linked services.
*/
- virtual void enableControllerServices(
- std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> serviceNodes);
+ virtual void enableControllerServices(std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> serviceNodes);
/**
* Disables controller services
* @param serviceNode service node which will be disabled, along with linked services.
*/
- virtual void disableControllerService(
- std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
+ virtual void disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
/**
* Gets all controller services.
@@ -188,38 +179,32 @@ class FlowController : public core::controller::ControllerServiceProvider,
* @param id service identifier
* @return shared pointer to the controller service node or nullptr if it does not exist.
*/
- virtual std::shared_ptr<core::controller::ControllerServiceNode> getControllerServiceNode(
- const std::string &id);
+ virtual std::shared_ptr<core::controller::ControllerServiceNode> getControllerServiceNode(const std::string &id);
- virtual void verifyCanStopReferencingComponents(
- std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
+ virtual void verifyCanStopReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
/**
* Unschedules referencing components.
*/
- virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> unscheduleReferencingComponents(
- std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
+ virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> unscheduleReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
/**
* Verify can disable referencing components
* @param serviceNode service node whose referenced components will be scheduled.
*/
- virtual void verifyCanDisableReferencingServices(
- std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
+ virtual void verifyCanDisableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
/**
* Disables referencing components
* @param serviceNode service node whose referenced components will be scheduled.
*/
- virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> disableReferencingServices(
- std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
+ virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> disableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
/**
* Verify can enable referencing components
* @param serviceNode service node whose referenced components will be scheduled.
*/
- virtual void verifyCanEnableReferencingServices(
- std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
+ virtual void verifyCanEnableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
/**
* Determines if the controller service specified by identifier is enabled.
@@ -230,22 +215,19 @@ class FlowController : public core::controller::ControllerServiceProvider,
* Enables referencing components
* @param serviceNode service node whose referenced components will be scheduled.
*/
- virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> enableReferencingServices(
- std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
+ virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> enableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
/**
* Schedules referencing components
* @param serviceNode service node whose referenced components will be scheduled.
*/
- virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> scheduleReferencingComponents(
- std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
+ virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> scheduleReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
/**
* Returns controller service components referenced by serviceIdentifier from the embedded
* controller service provider;
*/
- std::shared_ptr<core::controller::ControllerService> getControllerServiceForComponent(
- const std::string &serviceIdentifier, const std::string &componentId);
+ std::shared_ptr<core::controller::ControllerService> getControllerServiceForComponent(const std::string &serviceIdentifier, const std::string &componentId);
/**
* Enables all controller services for the provider.
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/FlowFileRecord.h
----------------------------------------------------------------------
diff --git a/libminifi/include/FlowFileRecord.h b/libminifi/include/FlowFileRecord.h
index 1d41b60..400cedc 100644
--- a/libminifi/include/FlowFileRecord.h
+++ b/libminifi/include/FlowFileRecord.h
@@ -67,9 +67,7 @@ enum FlowAttribute {
};
// FlowFile Attribute Key
-static const char *FlowAttributeKeyArray[MAX_FLOW_ATTRIBUTES] = { "path",
- "absolute.path", "filename", "uuid", "priority", "mime.type",
- "discard.reason", "alternate.identifier" };
+static const char *FlowAttributeKeyArray[MAX_FLOW_ATTRIBUTES] = { "path", "absolute.path", "filename", "uuid", "priority", "mime.type", "discard.reason", "alternate.identifier" };
// FlowFile Attribute Enum to Key
inline const char *FlowAttributeKey(FlowAttribute attribute) {
@@ -96,22 +94,17 @@ class FlowFileRecord : public core::FlowFile, public io::Serializable {
/*
* Create a new flow record
*/
- explicit FlowFileRecord(std::shared_ptr<core::Repository> flow_repository,
- std::map<std::string, std::string> attributes,
- std::shared_ptr<ResourceClaim> claim = nullptr);
+ explicit FlowFileRecord(std::shared_ptr<core::Repository> flow_repository, std::map<std::string, std::string> attributes, std::shared_ptr<ResourceClaim> claim = nullptr);
- explicit FlowFileRecord(std::shared_ptr<core::Repository> flow_repository,
- std::shared_ptr<core::FlowFile> &event);
+ explicit FlowFileRecord(std::shared_ptr<core::Repository> flow_repository, std::shared_ptr<core::FlowFile> &event);
- explicit FlowFileRecord(std::shared_ptr<core::Repository> flow_repository,
- std::shared_ptr<core::FlowFile> &event,
- const std::string &uuidConnection);
+ explicit FlowFileRecord(std::shared_ptr<core::Repository> flow_repository, std::shared_ptr<core::FlowFile> &event, const std::string &uuidConnection);
explicit FlowFileRecord(std::shared_ptr<core::Repository> flow_repository)
: FlowFile(),
flow_repository_(flow_repository),
snapshot_(""),
- logger_(logging::LoggerFactory<FlowFileRecord>::getLogger()) {
+ logger_(logging::LoggerFactory<FlowFileRecord>::getLogger()) {
}
// Destructor
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/RemoteProcessorGroupPort.h
----------------------------------------------------------------------
diff --git a/libminifi/include/RemoteProcessorGroupPort.h b/libminifi/include/RemoteProcessorGroupPort.h
index 1bdbb38..9f89b07 100644
--- a/libminifi/include/RemoteProcessorGroupPort.h
+++ b/libminifi/include/RemoteProcessorGroupPort.h
@@ -42,9 +42,7 @@ class RemoteProcessorGroupPort : public core::Processor {
/*!
* Create a new processor
*/
- RemoteProcessorGroupPort(
- const std::shared_ptr<io::StreamFactory> &stream_factory,
- std::string name, uuid_t uuid = nullptr)
+ RemoteProcessorGroupPort(const std::shared_ptr<io::StreamFactory> &stream_factory, std::string name, uuid_t uuid = nullptr)
: core::Processor(name, uuid),
direction_(SEND),
transmitting_(false),
@@ -59,19 +57,17 @@ class RemoteProcessorGroupPort : public core::Processor {
}
// Processor Name
- static const std::string ProcessorName;
+ static const char *ProcessorName;
// Supported Properties
static core::Property hostName;
static core::Property port;
static core::Property portUUID;
// Supported Relationships
static core::Relationship relation;
- public:
- void onSchedule(core::ProcessContext *context,
- core::ProcessSessionFactory *sessionFactory);
+ public:
+ void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
// OnTrigger method, implemented by NiFi RemoteProcessorGroupPort
- virtual void onTrigger(core::ProcessContext *context,
- core::ProcessSession *session);
+ virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
// Initialize, over write by NiFi RemoteProcessorGroupPort
virtual void initialize(void);
// Set Direction
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/SchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h
index 37e26c6..22f79db 100644
--- a/libminifi/include/SchedulingAgent.h
+++ b/libminifi/include/SchedulingAgent.h
@@ -52,9 +52,7 @@ class SchedulingAgent {
/*!
* Create a new scheduling agent.
*/
- SchedulingAgent(
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider,
- std::shared_ptr<core::Repository> repo, std::shared_ptr<Configure> configuration)
+ SchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo, std::shared_ptr<Configure> configuration)
: configure_(configuration),
admin_yield_duration_(0),
bored_yield_duration_(0),
@@ -62,8 +60,7 @@ class SchedulingAgent {
logger_(logging::LoggerFactory<SchedulingAgent>::getLogger()) {
running_ = false;
repo_ = repo;
- utils::ThreadPool<bool> pool = utils::ThreadPool<bool>(
- configure_->getInt(Configure::nifi_flow_engine_threads, 8), true);
+ utils::ThreadPool<bool> pool = utils::ThreadPool<bool>(configure_->getInt(Configure::nifi_flow_engine_threads, 8), true);
component_lifecycle_thread_pool_ = std::move(pool);
component_lifecycle_thread_pool_.start();
}
@@ -72,9 +69,7 @@ class SchedulingAgent {
}
// onTrigger, return whether the yield is need
- bool onTrigger(std::shared_ptr<core::Processor> processor,
- core::ProcessContext *processContext,
- core::ProcessSessionFactory *sessionFactory);
+ bool onTrigger(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory);
// Whether agent has work to do
bool hasWorkToDo(std::shared_ptr<core::Processor> processor);
// Whether the outgoing need to be backpressure
@@ -91,10 +86,8 @@ class SchedulingAgent {
}
public:
- virtual void enableControllerService(
- std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
- virtual void disableControllerService(
- std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
+ virtual void enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
+ virtual void disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
// schedule, overwritten by different DrivenSchedulingAgent
virtual void schedule(std::shared_ptr<core::Processor> processor) = 0;
// unschedule, overwritten by different DrivenSchedulingAgent
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/Site2SiteClientProtocol.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Site2SiteClientProtocol.h b/libminifi/include/Site2SiteClientProtocol.h
index 6f5a462..209f6b4 100644
--- a/libminifi/include/Site2SiteClientProtocol.h
+++ b/libminifi/include/Site2SiteClientProtocol.h
@@ -131,9 +131,7 @@ typedef enum {
} RequestType;
// Request Type Str
-static const char *RequestTypeStr[MAX_REQUEST_TYPE] = {
- "NEGOTIATE_FLOWFILE_CODEC", "REQUEST_PEER_LIST", "SEND_FLOWFILES",
- "RECEIVE_FLOWFILES", "SHUTDOWN" };
+static const char *RequestTypeStr[MAX_REQUEST_TYPE] = { "NEGOTIATE_FLOWFILE_CODEC", "REQUEST_PEER_LIST", "SEND_FLOWFILES", "RECEIVE_FLOWFILES", "SHUTDOWN" };
// Respond Code
typedef enum {
@@ -171,32 +169,33 @@ typedef enum {
// Respond Code Class
typedef struct {
RespondCode code;
- const char *description;
- bool hasDescription;
+ const char *description;bool hasDescription;
} RespondCodeContext;
// Respond Code Context
-static RespondCodeContext respondCodeContext[] = { { RESERVED,
- "Reserved for Future Use", false },
- { PROPERTIES_OK, "Properties OK", false }, { UNKNOWN_PROPERTY_NAME,
- "Unknown Property Name", true }, { ILLEGAL_PROPERTY_VALUE,
- "Illegal Property Value", true }, { MISSING_PROPERTY,
- "Missing Property", true }, { CONTINUE_TRANSACTION,
- "Continue Transaction", false }, { FINISH_TRANSACTION,
- "Finish Transaction", false }, { CONFIRM_TRANSACTION,
- "Confirm Transaction", true }, { TRANSACTION_FINISHED,
- "Transaction Finished", false }, {
- TRANSACTION_FINISHED_BUT_DESTINATION_FULL,
- "Transaction Finished But Destination is Full", false }, {
- CANCEL_TRANSACTION, "Cancel Transaction", true }, { BAD_CHECKSUM,
- "Bad Checksum", false }, { MORE_DATA, "More Data Exists", false }, {
- NO_MORE_DATA, "No More Data Exists", false }, { UNKNOWN_PORT,
- "Unknown Port", false }, { PORT_NOT_IN_VALID_STATE,
- "Port Not in a Valid State", true }, { PORTS_DESTINATION_FULL,
- "Port's Destination is Full", false }, { UNAUTHORIZED,
- "User Not Authorized", true }, { ABORT, "Abort", true }, {
- UNRECOGNIZED_RESPONSE_CODE, "Unrecognized Response Code", false }, {
- END_OF_STREAM, "End of Stream", false } };
+static RespondCodeContext respondCodeContext[] = {
+ { RESERVED, "Reserved for Future Use", false },
+ { PROPERTIES_OK, "Properties OK", false },
+ { UNKNOWN_PROPERTY_NAME, "Unknown Property Name", true },
+ { ILLEGAL_PROPERTY_VALUE, "Illegal Property Value", true },
+ { MISSING_PROPERTY, "Missing Property", true },
+ { CONTINUE_TRANSACTION, "Continue Transaction", false },
+ { FINISH_TRANSACTION, "Finish Transaction", false },
+ { CONFIRM_TRANSACTION, "Confirm Transaction", true },
+ { TRANSACTION_FINISHED, "Transaction Finished", false },
+ { TRANSACTION_FINISHED_BUT_DESTINATION_FULL, "Transaction Finished But Destination is Full", false },
+ { CANCEL_TRANSACTION, "Cancel Transaction", true },
+ { BAD_CHECKSUM, "Bad Checksum", false },
+ { MORE_DATA, "More Data Exists", false },
+ { NO_MORE_DATA, "No More Data Exists", false },
+ { UNKNOWN_PORT, "Unknown Port", false },
+ { PORT_NOT_IN_VALID_STATE, "Port Not in a Valid State", true },
+ { PORTS_DESTINATION_FULL, "Port's Destination is Full", false },
+ { UNAUTHORIZED, "User Not Authorized", true },
+ { ABORT, "Abort", true },
+ { UNRECOGNIZED_RESPONSE_CODE, "Unrecognized Response Code", false },
+ { END_OF_STREAM, "End of Stream", false }
+};
// Respond Code Sequence Pattern
static const uint8_t CODE_SEQUENCE_VALUE_1 = (uint8_t) 'R';
@@ -246,54 +245,52 @@ typedef enum {
// HandShakeProperty Str
static const char *HandShakePropertyStr[MAX_HANDSHAKE_PROPERTY] = {
-/**
- * Boolean value indicating whether or not the contents of a FlowFile should
- * be GZipped when transferred.
- */
-"GZIP",
-/**
- * The unique identifier of the port to communicate with
- */
-"PORT_IDENTIFIER",
-/**
- * Indicates the number of milliseconds after the request was made that the
- * client will wait for a response. If no response has been received by the
- * time this value expires, the server can move on without attempting to
- * service the request because the client will have already disconnected.
- */
-"REQUEST_EXPIRATION_MILLIS",
-/**
- * The preferred number of FlowFiles that the server should send to the
- * client when pulling data. This property was introduced in version 5 of
- * the protocol.
- */
-"BATCH_COUNT",
-/**
- * The preferred number of bytes that the server should send to the client
- * when pulling data. This property was introduced in version 5 of the
- * protocol.
- */
-"BATCH_SIZE",
-/**
- * The preferred amount of time that the server should send data to the
- * client when pulling data. This property was introduced in version 5 of
- * the protocol. Value is in milliseconds.
- */
-"BATCH_DURATION" };
+ /**
+ * Boolean value indicating whether or not the contents of a FlowFile should
+ * be GZipped when transferred.
+ */
+ "GZIP",
+ /**
+ * The unique identifier of the port to communicate with
+ */
+ "PORT_IDENTIFIER",
+ /**
+ * Indicates the number of milliseconds after the request was made that the
+ * client will wait for a response. If no response has been received by the
+ * time this value expires, the server can move on without attempting to
+ * service the request because the client will have already disconnected.
+ */
+ "REQUEST_EXPIRATION_MILLIS",
+ /**
+ * The preferred number of FlowFiles that the server should send to the
+ * client when pulling data. This property was introduced in version 5 of
+ * the protocol.
+ */
+ "BATCH_COUNT",
+ /**
+ * The preferred number of bytes that the server should send to the client
+ * when pulling data. This property was introduced in version 5 of the
+ * protocol.
+ */
+ "BATCH_SIZE",
+ /**
+ * The preferred amount of time that the server should send data to the
+ * client when pulling data. This property was introduced in version 5 of
+ * the protocol. Value is in milliseconds.
+ */
+ "BATCH_DURATION" };
class Site2SiteClientProtocol;
// Transaction Class
class Transaction {
friend class Site2SiteClientProtocol;
- public:
+ public:
// Constructor
/*!
* Create a new transaction
*/
- explicit Transaction(
- TransferDirection direction,
- org::apache::nifi::minifi::io::CRCStream<Site2SitePeer> &stream)
+ explicit Transaction(TransferDirection direction, org::apache::nifi::minifi::io::CRCStream<Site2SitePeer> &stream)
: crcStream(std::move(stream)) {
_state = TRANSACTION_STARTED;
_direction = direction;
@@ -375,9 +372,8 @@ class Transaction {
*/
class DataPacket {
public:
- DataPacket(Site2SiteClientProtocol *protocol, Transaction *transaction,
- std::map<std::string, std::string> attributes, std::string &payload) :
- payload_ (payload) {
+ DataPacket(Site2SiteClientProtocol *protocol, Transaction *transaction, std::map<std::string, std::string> attributes, std::string &payload)
+ : payload_(payload) {
_protocol = protocol;
_size = 0;
_transaction = transaction;
@@ -398,7 +394,8 @@ class Site2SiteClientProtocol {
/*!
* Create a new control protocol
*/
- Site2SiteClientProtocol(std::unique_ptr<Site2SitePeer> peer) : logger_(logging::LoggerFactory<Site2SiteClientProtocol>::getLogger()) {
+ Site2SiteClientProtocol(std::unique_ptr<Site2SitePeer> peer)
+ : logger_(logging::LoggerFactory<Site2SiteClientProtocol>::getLogger()) {
peer_ = std::move(peer);
_batchSize = 0;
_batchCount = 0;
@@ -500,8 +497,7 @@ class Site2SiteClientProtocol {
int writeRespond(RespondCode code, std::string message);
// getRespondCodeContext
RespondCodeContext *getRespondCodeContext(RespondCode code) {
- for (unsigned int i = 0;
- i < sizeof(respondCodeContext) / sizeof(RespondCodeContext); i++) {
+ for (unsigned int i = 0; i < sizeof(respondCodeContext) / sizeof(RespondCodeContext); i++) {
if (respondCodeContext[i].code == code) {
return &respondCodeContext[i];
}
@@ -511,16 +507,13 @@ class Site2SiteClientProtocol {
// Creation of a new transaction, return the transaction ID if success,
// Return NULL when any error occurs
- Transaction *createTransaction(std::string &transactionID,
- TransferDirection direction);
+ Transaction *createTransaction(std::string &transactionID, TransferDirection direction);
// Receive the data packet from the transaction
// Return false when any error occurs
bool receive(std::string transactionID, DataPacket *packet, bool &eof);
// Send the data packet from the transaction
// Return false when any error occurs
- bool send(std::string transactionID, DataPacket *packet,
- std::shared_ptr<FlowFileRecord> flowFile,
- core::ProcessSession *session);
+ bool send(std::string transactionID, DataPacket *packet, std::shared_ptr<FlowFileRecord> flowFile, core::ProcessSession *session);
// Confirm the data that was sent or received by comparing CRC32's of the data sent and the data received.
bool confirm(std::string transactionID);
// Cancel the transaction
@@ -530,14 +523,11 @@ class Site2SiteClientProtocol {
// Error the transaction
void error(std::string transactionID);
// Receive flow files for the process session
- void receiveFlowFiles(core::ProcessContext *context,
- core::ProcessSession *session);
+ void receiveFlowFiles(core::ProcessContext *context, core::ProcessSession *session);
// Transfer flow files for the process session
- void transferFlowFiles(core::ProcessContext *context,
- core::ProcessSession *session);
+ void transferFlowFiles(core::ProcessContext *context, core::ProcessSession *session);
//! Transfer string for the process session
- void transferString(core::ProcessContext *context, core::ProcessSession *session, std::string &payload,
- std::map<std::string, std::string> attributes);
+ void transferString(core::ProcessContext *context, core::ProcessSession *session, std::string &payload, std::map<std::string, std::string> attributes);
// deleteTransaction
void deleteTransaction(std::string transactionID);
// Nested callback class for write stream
@@ -554,8 +544,7 @@ class Site2SiteClientProtocol {
int size = std::min(len, (int) sizeof(buffer));
int ret = _packet->_transaction->getStream().readData(buffer, size);
if (ret != size) {
- _packet->_protocol->logger_->log_error(
- "Site2Site Receive Flow Size %d Failed %d", size, ret);
+ _packet->_protocol->logger_->log_error("Site2Site Receive Flow Size %d Failed %d", size, ret);
break;
}
stream->write((const char *) buffer, size);
@@ -579,11 +568,9 @@ class Site2SiteClientProtocol {
readSize = stream->gcount();
else
readSize = 8192;
- int ret = _packet->_transaction->getStream().writeData(buffer,
- readSize);
+ int ret = _packet->_transaction->getStream().writeData(buffer, readSize);
if (ret != readSize) {
- _packet->_protocol->logger_->log_error(
- "Site2Site Send Flow Size %d Failed %d", readSize, ret);
+ _packet->_protocol->logger_->log_error("Site2Site Send Flow Size %d Failed %d", readSize, ret);
break;
}
_packet->_size += readSize;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/Site2SitePeer.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Site2SitePeer.h b/libminifi/include/Site2SitePeer.h
index ab8d09b..65a5479 100644
--- a/libminifi/include/Site2SitePeer.h
+++ b/libminifi/include/Site2SitePeer.h
@@ -58,9 +58,7 @@ class Site2SitePeer : public org::apache::nifi::minifi::io::BaseStream {
/*
* Create a new site2site peer
*/
- explicit Site2SitePeer(
- std::unique_ptr<org::apache::nifi::minifi::io::DataStream> injected_socket,
- const std::string host_, uint16_t port_)
+ explicit Site2SitePeer(std::unique_ptr<org::apache::nifi::minifi::io::DataStream> injected_socket, const std::string host_, uint16_t port_)
: host_(host_),
port_(port_),
stream_(injected_socket.release()),
@@ -147,8 +145,7 @@ class Site2SitePeer : public org::apache::nifi::minifi::io::BaseStream {
// whether the given port still needs to yield (its yield expiration has not passed yet)
bool isYield(std::string portId) {
std::lock_guard<std::mutex> lock(mutex_);
- std::map<std::string, uint64_t>::iterator it = this
- ->_yieldExpirationPortIdMap.find(portId);
+ std::map<std::string, uint64_t>::iterator it = this->_yieldExpirationPortIdMap.find(portId);
if (it != _yieldExpirationPortIdMap.end()) {
uint64_t yieldExpiration = it->second;
return (yieldExpiration >= getTimeMillis());
@@ -159,8 +156,7 @@ class Site2SitePeer : public org::apache::nifi::minifi::io::BaseStream {
// clear yield expiration
void clearYield(std::string portId) {
std::lock_guard<std::mutex> lock(mutex_);
- std::map<std::string, uint64_t>::iterator it = this
- ->_yieldExpirationPortIdMap.find(portId);
+ std::map<std::string, uint64_t>::iterator it = this->_yieldExpirationPortIdMap.find(portId);
if (it != _yieldExpirationPortIdMap.end()) {
_yieldExpirationPortIdMap.erase(portId);
}
@@ -219,9 +215,7 @@ class Site2SitePeer : public org::apache::nifi::minifi::io::BaseStream {
return Serializable::read(value, stream_.get());
}
int readUTF(std::string &str, bool widen = false) {
- return org::apache::nifi::minifi::io::Serializable::readUTF(str,
- stream_.get(),
- widen);
+ return org::apache::nifi::minifi::io::Serializable::readUTF(str, stream_.get(), widen);
}
// open connection to the peer
bool Open();
@@ -232,8 +226,7 @@ class Site2SitePeer : public org::apache::nifi::minifi::io::BaseStream {
* Move assignment operator.
*/
Site2SitePeer& operator=(Site2SitePeer&& other) {
- stream_ = std::unique_ptr<org::apache::nifi::minifi::io::DataStream>(
- other.stream_.release());
+ stream_ = std::unique_ptr<org::apache::nifi::minifi::io::DataStream>(other.stream_.release());
host_ = std::move(other.host_);
port_ = std::move(other.port_);
_yieldExpiration = 0;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/ThreadedSchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/ThreadedSchedulingAgent.h b/libminifi/include/ThreadedSchedulingAgent.h
index 21bbbd0..50ab6c9 100644
--- a/libminifi/include/ThreadedSchedulingAgent.h
+++ b/libminifi/include/ThreadedSchedulingAgent.h
@@ -42,20 +42,16 @@ class ThreadedSchedulingAgent : public SchedulingAgent {
/*!
* Create a new threaded scheduling agent.
*/
- ThreadedSchedulingAgent(
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider,
- std::shared_ptr<core::Repository> repo,
- std::shared_ptr<Configure> configuration)
- : SchedulingAgent(controller_service_provider, repo, configuration), logger_(logging::LoggerFactory<ThreadedSchedulingAgent>::getLogger()) {
+ ThreadedSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo, std::shared_ptr<Configure> configuration)
+ : SchedulingAgent(controller_service_provider, repo, configuration),
+ logger_(logging::LoggerFactory<ThreadedSchedulingAgent>::getLogger()) {
}
// Destructor
virtual ~ThreadedSchedulingAgent() {
}
// Run function for the thread
- virtual void run(std::shared_ptr<core::Processor> processor,
- core::ProcessContext *processContext,
- core::ProcessSessionFactory *sessionFactory) = 0;
+ virtual void run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory) = 0;
public:
// schedule, overwritten by different DrivenTimerDrivenSchedulingAgent
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/TimerDrivenSchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/TimerDrivenSchedulingAgent.h b/libminifi/include/TimerDrivenSchedulingAgent.h
index 74096ee..597dc76 100644
--- a/libminifi/include/TimerDrivenSchedulingAgent.h
+++ b/libminifi/include/TimerDrivenSchedulingAgent.h
@@ -37,10 +37,7 @@ class TimerDrivenSchedulingAgent : public ThreadedSchedulingAgent {
/*!
* Create a new processor
*/
- TimerDrivenSchedulingAgent(
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider,
- std::shared_ptr<core::Repository> repo,
- std::shared_ptr<Configure> configure)
+ TimerDrivenSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo, std::shared_ptr<Configure> configure)
: ThreadedSchedulingAgent(controller_service_provider, repo, configure) {
}
// Destructor
@@ -49,16 +46,13 @@ class TimerDrivenSchedulingAgent : public ThreadedSchedulingAgent {
/**
* Run function that accepts the processor, context and session factory.
*/
- void run(std::shared_ptr<core::Processor> processor,
- core::ProcessContext *processContext,
- core::ProcessSessionFactory *sessionFactory);
+ void run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory);
private:
// Prevent default copy constructor and assignment operation
// Only support pass by reference or pointer
TimerDrivenSchedulingAgent(const TimerDrivenSchedulingAgent &parent);
- TimerDrivenSchedulingAgent &operator=(
- const TimerDrivenSchedulingAgent &parent);
+ TimerDrivenSchedulingAgent &operator=(const TimerDrivenSchedulingAgent &parent);
};
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/controllers/SSLContextService.h
----------------------------------------------------------------------
diff --git a/libminifi/include/controllers/SSLContextService.h b/libminifi/include/controllers/SSLContextService.h
index 7de26b4..9093d5f 100644
--- a/libminifi/include/controllers/SSLContextService.h
+++ b/libminifi/include/controllers/SSLContextService.h
@@ -63,13 +63,15 @@ class SSLContextService : public core::controller::ControllerService {
: ControllerService(name, id),
initialized_(false),
valid_(false),
- logger_(logging::LoggerFactory<SSLContextService>::getLogger()) {}
+ logger_(logging::LoggerFactory<SSLContextService>::getLogger()) {
+ }
explicit SSLContextService(const std::string &name, uuid_t uuid = 0)
: ControllerService(name, uuid),
initialized_(false),
valid_(false),
- logger_(logging::LoggerFactory<SSLContextService>::getLogger()) {}
+ logger_(logging::LoggerFactory<SSLContextService>::getLogger()) {
+ }
virtual void initialize();
@@ -97,43 +99,35 @@ class SSLContextService : public core::controller::ControllerService {
return false;
}
- bool configure_ssl_context(SSL_CTX *ctx)
- {
- if (SSL_CTX_use_certificate_file(ctx, certificate.c_str(), SSL_FILETYPE_PEM)
- <= 0) {
- logger_->log_error("Could not create load certificate, error : %s",
- std::strerror(errno));
- return false;
- }
- if (!IsNullOrEmpty(passphrase_)) {
- SSL_CTX_set_default_passwd_cb_userdata(ctx, &passphrase_);
- SSL_CTX_set_default_passwd_cb(ctx, pemPassWordCb);
- }
-
- int retp = SSL_CTX_use_PrivateKey_file(ctx, private_key_.c_str(),
- SSL_FILETYPE_PEM);
- if (retp != 1) {
- logger_->log_error("Could not create load private key,%i on %s error : %s",
- retp, private_key_, std::strerror(errno));
- return false;
- }
-
- if (!SSL_CTX_check_private_key(ctx)) {
- logger_->log_error(
- "Private key does not match the public certificate, error : %s",
- std::strerror(errno));
- return false;
- }
-
- retp = SSL_CTX_load_verify_locations(ctx, ca_certificate_.c_str(), 0);
- if (retp == 0) {
- logger_->log_error("Can not load CA certificate, Exiting, error : %s",
- std::strerror(errno));
- return false;
- }
-
- return true;
- }
+ /**
+  * Configures the given SSL context from this service's settings.
+  * Loads the PEM certificate, installs the passphrase callback when a
+  * passphrase is set, loads the PEM private key, checks that the key matches
+  * the certificate and finally loads the CA certificate.
+  * @param ctx SSL context to configure.
+  * @return true when every step succeeded; false (after logging an error)
+  * as soon as one step fails.
+  */
+ bool configure_ssl_context(SSL_CTX *ctx) {
+   if (SSL_CTX_use_certificate_file(ctx, certificate.c_str(), SSL_FILETYPE_PEM) <= 0) {
+     logger_->log_error("Could not create load certificate, error : %s", std::strerror(errno));
+     return false;
+   }
+   // Register the passphrase callback only when a passphrase was configured.
+   if (!IsNullOrEmpty(passphrase_)) {
+     SSL_CTX_set_default_passwd_cb_userdata(ctx, &passphrase_);
+     SSL_CTX_set_default_passwd_cb(ctx, pemPassWordCb);
+   }
+
+   int retp = SSL_CTX_use_PrivateKey_file(ctx, private_key_.c_str(), SSL_FILETYPE_PEM);
+   if (retp != 1) {
+     // %s expects a C string, so pass c_str() rather than the std::string object itself.
+     logger_->log_error("Could not create load private key,%i on %s error : %s", retp, private_key_.c_str(), std::strerror(errno));
+     return false;
+   }
+
+   if (!SSL_CTX_check_private_key(ctx)) {
+     logger_->log_error("Private key does not match the public certificate, error : %s", std::strerror(errno));
+     return false;
+   }
+
+   retp = SSL_CTX_load_verify_locations(ctx, ca_certificate_.c_str(), 0);
+   if (retp == 0) {
+     logger_->log_error("Can not load CA certificate, Exiting, error : %s", std::strerror(errno));
+     return false;
+   }
+
+   return true;
+ }
protected:
@@ -164,7 +158,7 @@ class SSLContextService : public core::controller::ControllerService {
std::string ca_certificate_;
private:
- std::shared_ptr<logging::Logger> logger_;
+ std::shared_ptr<logging::Logger> logger_;
};
typedef int (SSLContextService::*ptr)(char *, int, int, void *);
REGISTER_RESOURCE(SSLContextService);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/core/ClassLoader.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ClassLoader.h b/libminifi/include/core/ClassLoader.h
index 9f8fcae..31292b2 100644
--- a/libminifi/include/core/ClassLoader.h
+++ b/libminifi/include/core/ClassLoader.h
@@ -62,8 +62,7 @@ class ObjectFactory {
/**
* Create a shared pointer to a new processor.
*/
- virtual std::shared_ptr<Connectable> create(const std::string &name,
- uuid_t uuid) {
+ virtual std::shared_ptr<Connectable> create(const std::string &name, uuid_t uuid) {
return nullptr;
}
@@ -111,8 +110,7 @@ class DefautObjectFactory : public ObjectFactory {
/**
* Create a shared pointer to a new processor.
*/
- virtual std::shared_ptr<Connectable> create(const std::string &name,
- uuid_t uuid) {
+ virtual std::shared_ptr<Connectable> create(const std::string &name, uuid_t uuid) {
std::shared_ptr<T> ptr = std::make_shared<T>(name, uuid);
return std::static_pointer_cast<Connectable>(ptr);
}
@@ -177,15 +175,13 @@ class ClassLoader {
/**
* Register a class with the given ObjectFactory; a name that is already registered is left unchanged.
*/
- void registerClass(const std::string &name,
- std::unique_ptr<ObjectFactory> factory) {
- if (loaded_factories_.find(name) != loaded_factories_.end()){
+ void registerClass(const std::string &name, std::unique_ptr<ObjectFactory> factory) {
+ if (loaded_factories_.find(name) != loaded_factories_.end()) {
return;
}
std::lock_guard<std::mutex> lock(internal_mutex_);
-
loaded_factories_.insert(std::make_pair(name, std::move(factory)));
}
@@ -196,8 +192,7 @@ class ClassLoader {
* @return nullptr or object created from class_name definition.
*/
template<class T = Connectable>
- std::shared_ptr<T> instantiate(const std::string &class_name,
- const std::string &name);
+ std::shared_ptr<T> instantiate(const std::string &class_name, const std::string &name);
/**
* Instantiate object based on class_name
@@ -217,12 +212,11 @@ class ClassLoader {
std::vector<void *> dl_handles_;
private:
- std::shared_ptr<logging::Logger> logger_;
+ std::shared_ptr<logging::Logger> logger_;
};
template<class T>
-std::shared_ptr<T> ClassLoader::instantiate(const std::string &class_name,
- const std::string &name) {
+std::shared_ptr<T> ClassLoader::instantiate(const std::string &class_name, const std::string &name) {
std::lock_guard<std::mutex> lock(internal_mutex_);
auto factory_entry = loaded_factories_.find(class_name);
if (factory_entry != loaded_factories_.end()) {
@@ -234,8 +228,7 @@ std::shared_ptr<T> ClassLoader::instantiate(const std::string &class_name,
}
template<class T>
-std::shared_ptr<T> ClassLoader::instantiate(const std::string &class_name,
- uuid_t uuid) {
+std::shared_ptr<T> ClassLoader::instantiate(const std::string &class_name, uuid_t uuid) {
std::lock_guard<std::mutex> lock(internal_mutex_);
auto factory_entry = loaded_factories_.find(class_name);
if (factory_entry != loaded_factories_.end()) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/core/ConfigurationFactory.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ConfigurationFactory.h b/libminifi/include/core/ConfigurationFactory.h
index ed0bdb5..b58c170 100644
--- a/libminifi/include/core/ConfigurationFactory.h
+++ b/libminifi/include/core/ConfigurationFactory.h
@@ -29,19 +29,16 @@ namespace minifi {
namespace core {
template<typename T>
-typename std::enable_if<!class_operations<T>::value, T*>::type instantiate(
- const std::shared_ptr<core::Repository> &repo,
- const std::shared_ptr<core::Repository> &flow_file_repo,
- std::shared_ptr<Configure> configuration, const std::string path) {
+typename std::enable_if<!class_operations<T>::value, T*>::type instantiate(const std::shared_ptr<core::Repository> &repo, const std::shared_ptr<core::Repository> &flow_file_repo,
+ std::shared_ptr<Configure> configuration,
+ const std::string path) {
throw std::runtime_error("Cannot instantiate class");
}
template<typename T>
-typename std::enable_if<class_operations<T>::value, T*>::type instantiate(
- const std::shared_ptr<core::Repository> &repo,
- const std::shared_ptr<core::Repository> &flow_file_repo,
- const std::shared_ptr<io::StreamFactory> &stream_factory,
- std::shared_ptr<Configure> configuration, const std::string path) {
+typename std::enable_if<class_operations<T>::value, T*>::type instantiate(const std::shared_ptr<core::Repository> &repo, const std::shared_ptr<core::Repository> &flow_file_repo,
+ const std::shared_ptr<io::StreamFactory> &stream_factory,
+ std::shared_ptr<Configure> configuration, const std::string path) {
return new T(repo, flow_file_repo, stream_factory, configuration, path);
}
@@ -49,13 +46,10 @@ typename std::enable_if<class_operations<T>::value, T*>::type instantiate(
* Configuration factory is used to create a new FlowConfiguration
* object.
*/
-std::unique_ptr<core::FlowConfiguration> createFlowConfiguration(
- std::shared_ptr<core::Repository> repo,
- std::shared_ptr<core::Repository> flow_file_repo,
- std::shared_ptr<Configure> configure,
- std::shared_ptr<io::StreamFactory> stream_factory,
- const std::string configuration_class_name, const std::string path = "",
- bool fail_safe = false);
+std::unique_ptr<core::FlowConfiguration> createFlowConfiguration(std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<Configure> configure,
+ std::shared_ptr<io::StreamFactory> stream_factory,
+ const std::string configuration_class_name, const std::string path = "",
+ bool fail_safe = false);
} /* namespace core */
} /* namespace minifi */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/core/Connectable.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/Connectable.h b/libminifi/include/core/Connectable.h
index 76536f4..150b5fc 100644
--- a/libminifi/include/core/Connectable.h
+++ b/libminifi/include/core/Connectable.h
@@ -68,8 +68,7 @@ class Connectable : public CoreComponent {
* Get outgoing connection based on relationship
* @return set of outgoing connections.
*/
- std::set<std::shared_ptr<Connectable>> getOutGoingConnections(
- std::string relationship);
+ std::set<std::shared_ptr<Connectable>> getOutGoingConnections(std::string relationship);
/**
* Get next incoming connection
@@ -147,7 +146,7 @@ class Connectable : public CoreComponent {
// Incoming connections
std::set<std::shared_ptr<Connectable>> _incomingConnections;
// Outgoing connections map based on Relationship name
- std::map<std::string, std::set<std::shared_ptr<Connectable>>> out_going_connections_;
+ std::map<std::string, std::set<std::shared_ptr<Connectable>>>out_going_connections_;
// Mutex for protection
std::mutex relationship_mutex_;
@@ -163,7 +162,7 @@ class Connectable : public CoreComponent {
// Concurrent condition variable for whether there is incoming work to do
std::condition_variable work_condition_;
- private:
+private:
std::shared_ptr<logging::Logger> logger_;
};
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/core/Core.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/Core.h b/libminifi/include/core/Core.h
index a9fb435..f100d8b 100644
--- a/libminifi/include/core/Core.h
+++ b/libminifi/include/core/Core.h
@@ -74,10 +74,9 @@ typename std::enable_if<!class_operations<T>::value, std::shared_ptr<T>>::type i
template<typename T>
typename std::enable_if<class_operations<T>::value, std::shared_ptr<T>>::type instantiate(const std::string name = "") {
- if (name.length() == 0){
+ if (name.length() == 0) {
return std::make_shared<T>();
- }
- else{
+ } else {
return std::make_shared<T>(name);
}
}
@@ -107,7 +106,6 @@ class CoreComponent {
uuidStr_ = uuidStr;
}
-
/**
* Move Constructor.
*/
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/core/FlowConfiguration.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h
index 1fee8c5..edcb2b6 100644
--- a/libminifi/include/core/FlowConfiguration.h
+++ b/libminifi/include/core/FlowConfiguration.h
@@ -58,9 +58,7 @@ class FlowConfiguration : public CoreComponent {
* Constructor that will be used for configuring
* the flow controller.
*/
- explicit FlowConfiguration(std::shared_ptr<core::Repository> repo,
- std::shared_ptr<core::Repository> flow_file_repo,
- std::shared_ptr<io::StreamFactory> stream_factory,
+ explicit FlowConfiguration(std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<io::StreamFactory> stream_factory,
std::shared_ptr<Configure> configuration,
const std::string path)
: CoreComponent(core::getClassName<FlowConfiguration>()),
@@ -68,31 +66,23 @@ class FlowConfiguration : public CoreComponent {
config_path_(path),
stream_factory_(stream_factory),
logger_(logging::LoggerFactory<FlowConfiguration>::getLogger()) {
- controller_services_ = std::make_shared<
- core::controller::ControllerServiceMap>();
- service_provider_ = std::make_shared<
- core::controller::StandardControllerServiceProvider>(
- controller_services_, nullptr, configuration);
+ controller_services_ = std::make_shared<core::controller::ControllerServiceMap>();
+ service_provider_ = std::make_shared<core::controller::StandardControllerServiceProvider>(controller_services_, nullptr, configuration);
}
virtual ~FlowConfiguration();
// Create Processor (Node/Input/Output Port) based on the name
- std::shared_ptr<core::Processor> createProcessor(std::string name,
- uuid_t uuid);
+ std::shared_ptr<core::Processor> createProcessor(std::string name, uuid_t uuid);
// Create Root Processor Group
- std::unique_ptr<core::ProcessGroup> createRootProcessGroup(std::string name,
- uuid_t uuid);
+ std::unique_ptr<core::ProcessGroup> createRootProcessGroup(std::string name, uuid_t uuid);
- std::shared_ptr<core::controller::ControllerServiceNode> createControllerService(
- const std::string &class_name, const std::string &name, uuid_t uuid);
+ std::shared_ptr<core::controller::ControllerServiceNode> createControllerService(const std::string &class_name, const std::string &name, uuid_t uuid);
// Create Remote Processor Group
- std::unique_ptr<core::ProcessGroup> createRemoteProcessGroup(std::string name,
- uuid_t uuid);
+ std::unique_ptr<core::ProcessGroup> createRemoteProcessGroup(std::string name, uuid_t uuid);
// Create Connection
- std::shared_ptr<minifi::Connection> createConnection(std::string name,
- uuid_t uuid);
+ std::shared_ptr<minifi::Connection> createConnection(std::string name, uuid_t uuid);
// Create Provenance Report Task
std::shared_ptr<core::Processor> createProvenanceReportTask(void);
@@ -113,8 +103,7 @@ class FlowConfiguration : public CoreComponent {
* @return Extensions should return a non-null pointer in order to
* properly configure flow controller.
*/
- virtual std::unique_ptr<core::ProcessGroup> getRoot(
- const std::string &from_config) {
+ virtual std::unique_ptr<core::ProcessGroup> getRoot(const std::string &from_config) {
return nullptr;
}
@@ -134,7 +123,7 @@ class FlowConfiguration : public CoreComponent {
std::shared_ptr<core::Repository> flow_file_repo_;
// stream factory
std::shared_ptr<io::StreamFactory> stream_factory_;
-
+
private:
std::shared_ptr<logging::Logger> logger_;
};
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/core/FlowFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/FlowFile.h b/libminifi/include/core/FlowFile.h
index 050d15f..a0b8dec 100644
--- a/libminifi/include/core/FlowFile.h
+++ b/libminifi/include/core/FlowFile.h
@@ -191,9 +191,7 @@ class FlowFile {
// Check whether it is still being penalized
bool isPenalized() {
- return (
- penaltyExpiration_ms_ > 0 ?
- penaltyExpiration_ms_ > getTimeMillis() : false);
+ return (penaltyExpiration_ms_ > 0 ? penaltyExpiration_ms_ > getTimeMillis() : false);
}
/**
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/core/ProcessContext.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessContext.h b/libminifi/include/core/ProcessContext.h
index 8094e05..48e0108 100644
--- a/libminifi/include/core/ProcessContext.h
+++ b/libminifi/include/core/ProcessContext.h
@@ -46,12 +46,10 @@ class ProcessContext : public controller::ControllerServiceLookup {
/*!
* Create a new process context associated with the processor/controller service/state manager
*/
- ProcessContext(
- ProcessorNode &processor,
- std::shared_ptr<controller::ControllerServiceProvider> &controller_service_provider,
- std::shared_ptr<core::Repository> repo)
+ ProcessContext(ProcessorNode &processor, std::shared_ptr<controller::ControllerServiceProvider> &controller_service_provider, std::shared_ptr<core::Repository> repo)
: processor_node_(processor),
- controller_service_provider_(controller_service_provider), logger_(logging::LoggerFactory<ProcessContext>::getLogger()) {
+ controller_service_provider_(controller_service_provider),
+ logger_(logging::LoggerFactory<ProcessContext>::getLogger()) {
repo_ = repo;
}
// Destructor
@@ -106,10 +104,8 @@ class ProcessContext : public controller::ControllerServiceLookup {
* @return the ControllerService that is registered with the given
* identifier
*/
- std::shared_ptr<core::controller::ControllerService> getControllerService(
- const std::string &identifier) {
- return controller_service_provider_->getControllerServiceForComponent(
- identifier, processor_node_.getUUIDStr());
+ std::shared_ptr<core::controller::ControllerService> getControllerService(const std::string &identifier) {
+ return controller_service_provider_->getControllerServiceForComponent(identifier, processor_node_.getUUIDStr());
}
/**
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/core/ProcessGroup.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h
index ccf744e..f54f5b4 100644
--- a/libminifi/include/core/ProcessGroup.h
+++ b/libminifi/include/core/ProcessGroup.h
@@ -55,8 +55,7 @@ class ProcessGroup {
/*!
* Create a new process group
*/
- ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid = NULL,
- ProcessGroup *parent = NULL);
+ ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid = NULL, ProcessGroup *parent = NULL);
// Destructor
virtual ~ProcessGroup();
// Set Processor Name
@@ -111,11 +110,9 @@ class ProcessGroup {
return false;
}
// Start Processing
- void startProcessing(TimerDrivenSchedulingAgent *timeScheduler,
- EventDrivenSchedulingAgent *eventScheduler);
+ void startProcessing(TimerDrivenSchedulingAgent *timeScheduler, EventDrivenSchedulingAgent *eventScheduler);
// Stop Processing
- void stopProcessing(TimerDrivenSchedulingAgent *timeScheduler,
- EventDrivenSchedulingAgent *eventScheduler);
+ void stopProcessing(TimerDrivenSchedulingAgent *timeScheduler, EventDrivenSchedulingAgent *eventScheduler);
// Whether it is root process group
bool isRootProcessGroup();
// set parent process group
@@ -147,26 +144,21 @@ class ProcessGroup {
* @param nodeId node identifier
* @param node controller service node.
*/
- void addControllerService(
- const std::string &nodeId,
- std::shared_ptr<core::controller::ControllerServiceNode> &node);
+ void addControllerService(const std::string &nodeId, std::shared_ptr<core::controller::ControllerServiceNode> &node);
/**
* Find controllerservice node will search child groups until the nodeId is found.
* @param node node identifier
* @return controller service node, if it exists.
*/
- std::shared_ptr<core::controller::ControllerServiceNode> findControllerService(
- const std::string &nodeId);
+ std::shared_ptr<core::controller::ControllerServiceNode> findControllerService(const std::string &nodeId);
// removeConnection
void removeConnection(std::shared_ptr<Connection> connection);
// update property value
- void updatePropertyValue(std::string processorName, std::string propertyName,
- std::string propertyValue);
+ void updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue);
- void getConnections(
- std::map<std::string, std::shared_ptr<Connection>> &connectionMap);
+ void getConnections(std::map<std::string, std::shared_ptr<Connection>> &connectionMap);
protected:
// A global unique identifier
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/core/ProcessSession.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h
index 1f6f234..4d20f59 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -48,13 +48,11 @@ class ProcessSession {
* Create a new process session
*/
ProcessSession(ProcessContext *processContext = NULL)
- : process_context_(processContext), logger_(logging::LoggerFactory<ProcessSession>::getLogger()) {
- logger_->log_trace("ProcessSession created for %s",
- process_context_->getProcessorNode().getName().c_str());
+ : process_context_(processContext),
+ logger_(logging::LoggerFactory<ProcessSession>::getLogger()) {
+ logger_->log_trace("ProcessSession created for %s", process_context_->getProcessorNode().getName().c_str());
auto repo = processContext->getProvenanceRepository();
- provenance_report_ = new provenance::ProvenanceReporter(
- repo, process_context_->getProcessorNode().getUUIDStr(),
- process_context_->getProcessorNode().getName());
+ provenance_report_ = new provenance::ProvenanceReporter(repo, process_context_->getProcessorNode().getUUIDStr(), process_context_->getProcessorNode().getName());
}
// Destructor
@@ -64,9 +62,9 @@ class ProcessSession {
}
// Commit the session
void commit();
-// Roll Back the session
+ // Roll Back the session
void rollback();
-// Get Provenance Report
+ // Get Provenance Report
provenance::ProvenanceReporter *getProvenanceReporter() {
return provenance_report_;
}
@@ -76,54 +74,39 @@ class ProcessSession {
// Create a new UUID FlowFile with no content resource claim and without parent
std::shared_ptr<core::FlowFile> create();
// Create a new UUID FlowFile with no content resource claim and inherit all attributes from parent
- std::shared_ptr<core::FlowFile> create(
- std::shared_ptr<core::FlowFile> &&parent);
+ std::shared_ptr<core::FlowFile> create(std::shared_ptr<core::FlowFile> &&parent);
// Create a new UUID FlowFile with no content resource claim and inherit all attributes from parent
- std::shared_ptr<core::FlowFile> create(
- std::shared_ptr<core::FlowFile> &parent) {
+ std::shared_ptr<core::FlowFile> create(std::shared_ptr<core::FlowFile> &parent) {
return create(parent);
}
// Clone a new UUID FlowFile from parent both for content resource claim and attributes
- std::shared_ptr<core::FlowFile> clone(
- std::shared_ptr<core::FlowFile> &parent);
-// Clone a new UUID FlowFile from parent for attributes and sub set of parent content resource claim
- std::shared_ptr<core::FlowFile> clone(std::shared_ptr<core::FlowFile> &parent,
- int64_t offset, int64_t size);
-// Duplicate a FlowFile with the same UUID and all attributes and content resource claim for the roll back of the session
- std::shared_ptr<core::FlowFile> duplicate(
- std::shared_ptr<core::FlowFile> &original);
-// Transfer the FlowFile to the relationship
- void transfer(std::shared_ptr<core::FlowFile> &flow,
- Relationship relationship);
- void transfer(std::shared_ptr<core::FlowFile> &&flow,
- Relationship relationship);
-// Put Attribute
- void putAttribute(std::shared_ptr<core::FlowFile> &flow, std::string key,
- std::string value);
- void putAttribute(std::shared_ptr<core::FlowFile> &&flow, std::string key,
- std::string value);
-// Remove Attribute
+ std::shared_ptr<core::FlowFile> clone(std::shared_ptr<core::FlowFile> &parent);
+ // Clone a new UUID FlowFile from parent for attributes and sub set of parent content resource claim
+ std::shared_ptr<core::FlowFile> clone(std::shared_ptr<core::FlowFile> &parent, int64_t offset, int64_t size);
+ // Duplicate a FlowFile with the same UUID and all attributes and content resource claim for the roll back of the session
+ std::shared_ptr<core::FlowFile> duplicate(std::shared_ptr<core::FlowFile> &original);
+ // Transfer the FlowFile to the relationship
+ void transfer(std::shared_ptr<core::FlowFile> &flow, Relationship relationship);
+ void transfer(std::shared_ptr<core::FlowFile> &&flow, Relationship relationship);
+ // Put Attribute
+ void putAttribute(std::shared_ptr<core::FlowFile> &flow, std::string key, std::string value);
+ void putAttribute(std::shared_ptr<core::FlowFile> &&flow, std::string key, std::string value);
+ // Remove Attribute
void removeAttribute(std::shared_ptr<core::FlowFile> &flow, std::string key);
void removeAttribute(std::shared_ptr<core::FlowFile> &&flow, std::string key);
-// Remove Flow File
+ // Remove Flow File
void remove(std::shared_ptr<core::FlowFile> &flow);
void remove(std::shared_ptr<core::FlowFile> &&flow);
-// Execute the given read callback against the content
- void read(std::shared_ptr<core::FlowFile> &flow,
- InputStreamCallback *callback);
- void read(std::shared_ptr<core::FlowFile> &&flow,
- InputStreamCallback *callback);
-// Execute the given write callback against the content
- void write(std::shared_ptr<core::FlowFile> &flow,
- OutputStreamCallback *callback);
- void write(std::shared_ptr<core::FlowFile> &&flow,
- OutputStreamCallback *callback);
-// Execute the given write/append callback against the content
- void append(std::shared_ptr<core::FlowFile> &flow,
- OutputStreamCallback *callback);
- void append(std::shared_ptr<core::FlowFile> &&flow,
- OutputStreamCallback *callback);
-// Penalize the flow
+ // Execute the given read callback against the content
+ void read(std::shared_ptr<core::FlowFile> &flow, InputStreamCallback *callback);
+ void read(std::shared_ptr<core::FlowFile> &&flow, InputStreamCallback *callback);
+ // Execute the given write callback against the content
+ void write(std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback *callback);
+ void write(std::shared_ptr<core::FlowFile> &&flow, OutputStreamCallback *callback);
+ // Execute the given write/append callback against the content
+ void append(std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback *callback);
+ void append(std::shared_ptr<core::FlowFile> &&flow, OutputStreamCallback *callback);
+ // Penalize the flow
void penalize(std::shared_ptr<core::FlowFile> &flow);
void penalize(std::shared_ptr<core::FlowFile> &&flow);
@@ -132,13 +115,14 @@ class ProcessSession {
* @param stream incoming data stream that contains the data to store into a file
* @param flow flow file
*/
- void importFrom(io::DataStream &stream,
- std::shared_ptr<core::FlowFile> &&flow);
+ void importFrom(io::DataStream &stream, std::shared_ptr<core::FlowFile> &&flow);
// import from the data source.
void import(std::string source, std::shared_ptr<core::FlowFile> &flow,
- bool keepSource = true, uint64_t offset = 0);
+ bool keepSource = true,
+ uint64_t offset = 0);
void import(std::string source, std::shared_ptr<core::FlowFile> &&flow,
- bool keepSource = true, uint64_t offset = 0);
+ bool keepSource = true,
+ uint64_t offset = 0);
// Prevent default copy constructor and assignment operation
// Only support pass by reference or pointer
@@ -148,26 +132,25 @@ class ProcessSession {
protected:
// FlowFiles being modified by current process session
std::map<std::string, std::shared_ptr<core::FlowFile> > _updatedFlowFiles;
-// Copy of the original FlowFiles being modified by current process session as above
+ // Copy of the original FlowFiles being modified by current process session as above
std::map<std::string, std::shared_ptr<core::FlowFile> > _originalFlowFiles;
-// FlowFiles being added by current process session
+ // FlowFiles being added by current process session
std::map<std::string, std::shared_ptr<core::FlowFile> > _addedFlowFiles;
-// FlowFiles being deleted by current process session
+ // FlowFiles being deleted by current process session
std::map<std::string, std::shared_ptr<core::FlowFile> > _deletedFlowFiles;
-// FlowFiles being transfered to the relationship
+ // FlowFiles being transfered to the relationship
std::map<std::string, Relationship> _transferRelationship;
-// FlowFiles being cloned for multiple connections per relationship
+ // FlowFiles being cloned for multiple connections per relationship
std::map<std::string, std::shared_ptr<core::FlowFile> > _clonedFlowFiles;
private:
// Clone the flow file during transfer to multiple connections for a relationship
- std::shared_ptr<core::FlowFile> cloneDuringTransfer(
- std::shared_ptr<core::FlowFile> &parent);
-// ProcessContext
+ std::shared_ptr<core::FlowFile> cloneDuringTransfer(std::shared_ptr<core::FlowFile> &parent);
+ // ProcessContext
ProcessContext *process_context_;
-// Logger
+ // Logger
std::shared_ptr<logging::Logger> logger_;
-// Provenance Report
+ // Provenance Report
provenance::ProvenanceReporter *provenance_report_;
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/core/Processor.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h
index 9a19072..251ec47 100644
--- a/libminifi/include/core/Processor.h
+++ b/libminifi/include/core/Processor.h
@@ -62,8 +62,7 @@ namespace core {
#define DEFAULT_PENALIZATION_PERIOD_SECONDS 30
// Processor Class
-class Processor : public Connectable, public ConfigurableComponent,
- public std::enable_shared_from_this<Processor> {
+class Processor : public Connectable, public ConfigurableComponent, public std::enable_shared_from_this<Processor> {
public:
// Constructor
@@ -192,8 +191,7 @@ class Processor : public Connectable, public ConfigurableComponent,
bool flowFilesOutGoingFull();
// Get outgoing connections based on relationship name
- std::set<std::shared_ptr<Connection> > getOutGoingConnections(
- std::string relationship);
+ std::set<std::shared_ptr<Connection> > getOutGoingConnections(std::string relationship);
// Add connection
bool addConnection(std::shared_ptr<Connectable> connection);
// Remove connection
@@ -205,8 +203,7 @@ class Processor : public Connectable, public ConfigurableComponent,
// Get the Next RoundRobin incoming connection
std::shared_ptr<Connection> getNextIncomingConnection();
// On Trigger
- void onTrigger(ProcessContext *context,
- ProcessSessionFactory *sessionFactory);
+ void onTrigger(ProcessContext *context, ProcessSessionFactory *sessionFactory);
virtual bool canEdit() {
return !isRunning();
@@ -220,8 +217,7 @@ class Processor : public Connectable, public ConfigurableComponent,
virtual void initialize() {
}
// Scheduled event hook, overridden by NiFi Process Designer
- virtual void onSchedule(ProcessContext *context,
- ProcessSessionFactory *sessionFactory) {
+ virtual void onSchedule(ProcessContext *context, ProcessSessionFactory *sessionFactory) {
}
protected:
@@ -243,7 +239,7 @@ class Processor : public Connectable, public ConfigurableComponent,
// Trigger the Processor even if the incoming connection is empty
std::atomic<bool> _triggerWhenEmpty;
- private:
+ private:
// Mutex for protection
std::mutex mutex_;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/core/ProcessorNode.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessorNode.h b/libminifi/include/core/ProcessorNode.h
index 8836f62..9bf1f52 100644
--- a/libminifi/include/core/ProcessorNode.h
+++ b/libminifi/include/core/ProcessorNode.h
@@ -60,8 +60,7 @@ class ProcessorNode : public ConfigurableComponent, public Connectable {
* @return result of getting property.
*/
bool getProperty(const std::string name, std::string &value) {
- const std::shared_ptr<ConfigurableComponent> processor_cast =
- std::dynamic_pointer_cast<ConfigurableComponent>(processor_);
+ const std::shared_ptr<ConfigurableComponent> processor_cast = std::dynamic_pointer_cast<ConfigurableComponent>(processor_);
if (nullptr != processor_cast)
return processor_cast->getProperty(name, value);
else {
@@ -75,8 +74,7 @@ class ProcessorNode : public ConfigurableComponent, public Connectable {
* @return result of setting property.
*/
bool setProperty(const std::string name, std::string value) {
- const std::shared_ptr<ConfigurableComponent> processor_cast =
- std::dynamic_pointer_cast<ConfigurableComponent>(processor_);
+ const std::shared_ptr<ConfigurableComponent> processor_cast = std::dynamic_pointer_cast<ConfigurableComponent>(processor_);
bool ret = ConfigurableComponent::setProperty(name, value);
if (nullptr != processor_cast)
ret = processor_cast->setProperty(name, value);
@@ -92,8 +90,7 @@ class ProcessorNode : public ConfigurableComponent, public Connectable {
* @return whether property was set or not
*/
bool setProperty(Property &prop, std::string value) {
- const std::shared_ptr<ConfigurableComponent> processor_cast =
- std::dynamic_pointer_cast<ConfigurableComponent>(processor_);
+ const std::shared_ptr<ConfigurableComponent> processor_cast = std::dynamic_pointer_cast<ConfigurableComponent>(processor_);
bool ret = ConfigurableComponent::setProperty(prop, value);
if (nullptr != processor_cast)
ret = processor_cast->setProperty(prop, value);
@@ -107,8 +104,7 @@ class ProcessorNode : public ConfigurableComponent, public Connectable {
* @return result of set operation.
*/
bool setSupportedProperties(std::set<Property> properties) {
- const std::shared_ptr<ConfigurableComponent> processor_cast =
- std::dynamic_pointer_cast<ConfigurableComponent>(processor_);
+ const std::shared_ptr<ConfigurableComponent> processor_cast = std::dynamic_pointer_cast<ConfigurableComponent>(processor_);
bool ret = ConfigurableComponent::setSupportedProperties(properties);
if (nullptr != processor_cast)
ret = processor_cast->setSupportedProperties(properties);
@@ -164,8 +160,7 @@ class ProcessorNode : public ConfigurableComponent, public Connectable {
* Get outgoing connection based on relationship
* @return set of outgoing connections.
*/
- std::set<std::shared_ptr<Connectable>> getOutGoingConnections(
- std::string relationship) {
+ std::set<std::shared_ptr<Connectable>> getOutGoingConnections(std::string relationship) {
return processor_->getOutGoingConnections(relationship);
}