You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/06/06 16:33:07 UTC
[4/9] nifi-minifi-cpp git commit: MINIFI-331: Apply formatter with
increased line length to source
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/yaml/YamlConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp
index 7bf4f58..a11db2b 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -29,8 +29,7 @@ namespace nifi {
namespace minifi {
namespace core {
-core::ProcessGroup *YamlConfiguration::parseRootProcessGroupYaml(
- YAML::Node rootFlowNode) {
+core::ProcessGroup *YamlConfiguration::parseRootProcessGroupYaml(YAML::Node rootFlowNode) {
uuid_t uuid;
checkRequiredField(&rootFlowNode, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
@@ -38,18 +37,15 @@ core::ProcessGroup *YamlConfiguration::parseRootProcessGroupYaml(
std::string id = getOrGenerateId(&rootFlowNode);
uuid_parse(id.c_str(), uuid);
- logger_->log_debug("parseRootProcessGroup: id => [%s], name => [%s]", id,
- flowName);
- std::unique_ptr<core::ProcessGroup> group =
- FlowConfiguration::createRootProcessGroup(flowName, uuid);
+ logger_->log_debug("parseRootProcessGroup: id => [%s], name => [%s]", id, flowName);
+ std::unique_ptr<core::ProcessGroup> group = FlowConfiguration::createRootProcessGroup(flowName, uuid);
this->name_ = flowName;
return group.release();
}
-void YamlConfiguration::parseProcessorNodeYaml(
- YAML::Node processorsNode, core::ProcessGroup * parentGroup) {
+void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode, core::ProcessGroup * parentGroup) {
int64_t schedulingPeriod = -1;
int64_t penalizationPeriod = -1;
int64_t yieldPeriod = -1;
@@ -65,8 +61,7 @@ void YamlConfiguration::parseProcessorNodeYaml(
if (processorsNode) {
if (processorsNode.IsSequence()) {
// Evaluate sequence of processors
- for (YAML::const_iterator iter = processorsNode.begin();
- iter != processorsNode.end(); ++iter) {
+ for (YAML::const_iterator iter = processorsNode.begin(); iter != processorsNode.end(); ++iter) {
core::ProcessorConfig procCfg;
YAML::Node procNode = iter->as<YAML::Node>();
@@ -74,28 +69,23 @@ void YamlConfiguration::parseProcessorNodeYaml(
procCfg.name = procNode["name"].as<std::string>();
procCfg.id = getOrGenerateId(&procNode);
uuid_parse(procCfg.id.c_str(), uuid);
- logger_->log_debug("parseProcessorNode: name => [%s] id => [%s]",
- procCfg.name, procCfg.id);
+ logger_->log_debug("parseProcessorNode: name => [%s] id => [%s]", procCfg.name, procCfg.id);
checkRequiredField(&procNode, "class", CONFIG_YAML_PROCESSORS_KEY);
procCfg.javaClass = procNode["class"].as<std::string>();
- logger_->log_debug("parseProcessorNode: class => [%s]",
- procCfg.javaClass);
+ logger_->log_debug("parseProcessorNode: class => [%s]", procCfg.javaClass);
// Determine the processor name only from the Java class
int lastOfIdx = procCfg.javaClass.find_last_of(".");
if (lastOfIdx != std::string::npos) {
lastOfIdx++; // if a value is found, increment to move beyond the .
int nameLength = procCfg.javaClass.length() - lastOfIdx;
- std::string processorName = procCfg.javaClass.substr(lastOfIdx,
- nameLength);
+ std::string processorName = procCfg.javaClass.substr(lastOfIdx, nameLength);
processor = this->createProcessor(processorName, uuid);
}
if (!processor) {
- logger_->log_error("Could not create a processor %s with id %s",
- procCfg.name, procCfg.id);
- throw std::invalid_argument(
- "Could not create processor " + procCfg.name);
+ logger_->log_error("Could not create a processor %s with id %s", procCfg.name, procCfg.id);
+ throw std::invalid_argument("Could not create processor " + procCfg.name);
}
processor->setName(procCfg.name);
@@ -131,11 +121,8 @@ void YamlConfiguration::parseProcessorNodeYaml(
if (procNode["auto-terminated relationships list"]) {
YAML::Node autoTerminatedSequence = procNode["auto-terminated relationships list"];
std::vector<std::string> rawAutoTerminatedRelationshipValues;
- if (autoTerminatedSequence.IsSequence()
- && !autoTerminatedSequence.IsNull()
- && autoTerminatedSequence.size() > 0) {
- for (YAML::const_iterator relIter = autoTerminatedSequence.begin();
- relIter != autoTerminatedSequence.end(); ++relIter) {
+ if (autoTerminatedSequence.IsSequence() && !autoTerminatedSequence.IsNull() && autoTerminatedSequence.size() > 0) {
+ for (YAML::const_iterator relIter = autoTerminatedSequence.begin(); relIter != autoTerminatedSequence.end(); ++relIter) {
std::string autoTerminatedRel = relIter->as<std::string>();
rawAutoTerminatedRelationshipValues.push_back(autoTerminatedRel);
}
@@ -151,20 +138,17 @@ void YamlConfiguration::parseProcessorNodeYaml(
// Take care of scheduling
core::TimeUnit unit;
- if (core::Property::StringToTime(procCfg.schedulingPeriod, schedulingPeriod, unit)
- && core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) {
+ if (core::Property::StringToTime(procCfg.schedulingPeriod, schedulingPeriod, unit) && core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) {
logger_->log_debug("convert: parseProcessorNode: schedulingPeriod => [%d] ns", schedulingPeriod);
processor->setSchedulingPeriodNano(schedulingPeriod);
}
- if (core::Property::StringToTime(procCfg.penalizationPeriod, penalizationPeriod, unit)
- && core::Property::ConvertTimeUnitToMS(penalizationPeriod, unit, penalizationPeriod)) {
+ if (core::Property::StringToTime(procCfg.penalizationPeriod, penalizationPeriod, unit) && core::Property::ConvertTimeUnitToMS(penalizationPeriod, unit, penalizationPeriod)) {
logger_->log_debug("convert: parseProcessorNode: penalizationPeriod => [%d] ms", penalizationPeriod);
processor->setPenalizationPeriodMsec(penalizationPeriod);
}
- if (core::Property::StringToTime(procCfg.yieldPeriod, yieldPeriod, unit)
- && core::Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod)) {
+ if (core::Property::StringToTime(procCfg.yieldPeriod, yieldPeriod, unit) && core::Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod)) {
logger_->log_debug("convert: parseProcessorNode: yieldPeriod => [%d] ms", yieldPeriod);
processor->setYieldPeriodMsec(yieldPeriod);
}
@@ -174,16 +158,13 @@ void YamlConfiguration::parseProcessorNodeYaml(
if (procCfg.schedulingStrategy == "TIMER_DRIVEN") {
processor->setSchedulingStrategy(core::TIMER_DRIVEN);
- logger_->log_debug("setting scheduling strategy as %s",
- procCfg.schedulingStrategy);
+ logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy);
} else if (procCfg.schedulingStrategy == "EVENT_DRIVEN") {
processor->setSchedulingStrategy(core::EVENT_DRIVEN);
- logger_->log_debug("setting scheduling strategy as %s",
- procCfg.schedulingStrategy);
+ logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy);
} else {
processor->setSchedulingStrategy(core::CRON_DRIVEN);
- logger_->log_debug("setting scheduling strategy as %s",
- procCfg.schedulingStrategy);
+ logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy);
}
int64_t maxConcurrentTasks;
@@ -209,18 +190,15 @@ void YamlConfiguration::parseProcessorNodeYaml(
parentGroup->addProcessor(processor);
}
} else {
- throw new std::invalid_argument(
- "Cannot instantiate a MiNiFi instance without a defined Processors configuration node.");
+ throw new std::invalid_argument("Cannot instantiate a MiNiFi instance without a defined Processors configuration node.");
}
} else {
- throw new std::invalid_argument(
- "Cannot instantiate a MiNiFi instance without a defined "
- "Processors configuration node.");
+ throw new std::invalid_argument("Cannot instantiate a MiNiFi instance without a defined "
+ "Processors configuration node.");
}
}
-void YamlConfiguration::parseRemoteProcessGroupYaml(
- YAML::Node *rpgNode, core::ProcessGroup * parentGroup) {
+void YamlConfiguration::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, core::ProcessGroup * parentGroup) {
uuid_t uuid;
std::string id;
@@ -231,8 +209,7 @@ void YamlConfiguration::parseRemoteProcessGroupYaml(
if (rpgNode) {
if (rpgNode->IsSequence()) {
- for (YAML::const_iterator iter = rpgNode->begin(); iter != rpgNode->end();
- ++iter) {
+ for (YAML::const_iterator iter = rpgNode->begin(); iter != rpgNode->end(); ++iter) {
YAML::Node currRpgNode = iter->as<YAML::Node>();
checkRequiredField(&currRpgNode, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
@@ -258,12 +235,8 @@ void YamlConfiguration::parseRemoteProcessGroupYaml(
std::string yieldPeriod = currRpgNode["yield period"].as<std::string>();
logger_->log_debug("parseRemoteProcessGroupYaml: yield period => [%s]", yieldPeriod);
- if (core::Property::StringToTime(yieldPeriod, yieldPeriodValue, unit)
- && core::Property::ConvertTimeUnitToMS(yieldPeriodValue, unit,
- yieldPeriodValue) && group) {
- logger_->log_debug(
- "parseRemoteProcessGroupYaml: yieldPeriod => [%d] ms",
- yieldPeriodValue);
+ if (core::Property::StringToTime(yieldPeriod, yieldPeriodValue, unit) && core::Property::ConvertTimeUnitToMS(yieldPeriodValue, unit, yieldPeriodValue) && group) {
+ logger_->log_debug("parseRemoteProcessGroupYaml: yieldPeriod => [%d] ms", yieldPeriodValue);
group->setYieldPeriodMsec(yieldPeriodValue);
}
}
@@ -272,12 +245,8 @@ void YamlConfiguration::parseRemoteProcessGroupYaml(
std::string timeout = currRpgNode["timeout"].as<std::string>();
logger_->log_debug("parseRemoteProcessGroupYaml: timeout => [%s]", timeout);
- if (core::Property::StringToTime(timeout, timeoutValue, unit)
- && core::Property::ConvertTimeUnitToMS(timeoutValue, unit,
- timeoutValue) && group) {
- logger_->log_debug(
- "parseRemoteProcessGroupYaml: timeoutValue => [%d] ms",
- timeoutValue);
+ if (core::Property::StringToTime(timeout, timeoutValue, unit) && core::Property::ConvertTimeUnitToMS(timeoutValue, unit, timeoutValue) && group) {
+ logger_->log_debug("parseRemoteProcessGroupYaml: timeoutValue => [%d] ms", timeoutValue);
group->setTimeOut(timeoutValue);
}
}
@@ -288,8 +257,7 @@ void YamlConfiguration::parseRemoteProcessGroupYaml(
checkRequiredField(&currRpgNode, "Input Ports", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
YAML::Node inputPorts = currRpgNode["Input Ports"].as<YAML::Node>();
if (inputPorts && inputPorts.IsSequence()) {
- for (YAML::const_iterator portIter = inputPorts.begin();
- portIter != inputPorts.end(); ++portIter) {
+ for (YAML::const_iterator portIter = inputPorts.begin(); portIter != inputPorts.end(); ++portIter) {
logger_->log_debug("Got a current port, iterating...");
YAML::Node currPort = portIter->as<YAML::Node>();
@@ -299,8 +267,7 @@ void YamlConfiguration::parseRemoteProcessGroupYaml(
}
YAML::Node outputPorts = currRpgNode["Output Ports"].as<YAML::Node>();
if (outputPorts && outputPorts.IsSequence()) {
- for (YAML::const_iterator portIter = outputPorts.begin();
- portIter != outputPorts.end(); ++portIter) {
+ for (YAML::const_iterator portIter = outputPorts.begin(); portIter != outputPorts.end(); ++portIter) {
logger_->log_debug("Got a current port, iterating...");
YAML::Node currPort = portIter->as<YAML::Node>();
@@ -313,8 +280,7 @@ void YamlConfiguration::parseRemoteProcessGroupYaml(
}
}
-void YamlConfiguration::parseProvenanceReportingYaml(
- YAML::Node *reportNode, core::ProcessGroup * parentGroup) {
+void YamlConfiguration::parseProvenanceReportingYaml(YAML::Node *reportNode, core::ProcessGroup * parentGroup) {
uuid_t port_uuid;
int64_t schedulingPeriod = -1;
@@ -330,9 +296,7 @@ void YamlConfiguration::parseProvenanceReportingYaml(
std::shared_ptr<core::Processor> processor = nullptr;
processor = createProvenanceReportTask();
- std::shared_ptr<core::reporting::SiteToSiteProvenanceReportingTask> reportTask =
- std::static_pointer_cast<
- core::reporting::SiteToSiteProvenanceReportingTask>(processor);
+ std::shared_ptr<core::reporting::SiteToSiteProvenanceReportingTask> reportTask = std::static_pointer_cast<core::reporting::SiteToSiteProvenanceReportingTask>(processor);
YAML::Node node = reportNode->as<YAML::Node>();
@@ -354,21 +318,16 @@ void YamlConfiguration::parseProvenanceReportingYaml(
processor->setScheduledState(core::RUNNING);
core::TimeUnit unit;
- if (core::Property::StringToTime(schedulingPeriodStr, schedulingPeriod, unit)
- && core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit,
- schedulingPeriod)) {
- logger_->log_debug("ProvenanceReportingTask schedulingPeriod %d ns",
- schedulingPeriod);
+ if (core::Property::StringToTime(schedulingPeriodStr, schedulingPeriod, unit) && core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) {
+ logger_->log_debug("ProvenanceReportingTask schedulingPeriod %d ns", schedulingPeriod);
processor->setSchedulingPeriodNano(schedulingPeriod);
}
if (schedulingStrategyStr == "TIMER_DRIVEN") {
processor->setSchedulingStrategy(core::TIMER_DRIVEN);
- logger_->log_debug("ProvenanceReportingTask scheduling strategy %s",
- schedulingStrategyStr);
+ logger_->log_debug("ProvenanceReportingTask scheduling strategy %s", schedulingStrategyStr);
} else {
- throw std::invalid_argument(
- "Invalid scheduling strategy " + schedulingStrategyStr);
+ throw std::invalid_argument("Invalid scheduling strategy " + schedulingStrategyStr);
}
reportTask->setHost(hostStr);
@@ -387,19 +346,18 @@ void YamlConfiguration::parseProvenanceReportingYaml(
}
}
-void YamlConfiguration::parseControllerServices(
- YAML::Node *controllerServicesNode) {
+void YamlConfiguration::parseControllerServices(YAML::Node *controllerServicesNode) {
if (!IsNullOrEmpty(controllerServicesNode)) {
if (controllerServicesNode->IsSequence()) {
for (auto iter : *controllerServicesNode) {
YAML::Node controllerServiceNode = iter.as<YAML::Node>();
try {
checkRequiredField(&controllerServiceNode, "name",
- CONFIG_YAML_CONTROLLER_SERVICES_KEY);
+ CONFIG_YAML_CONTROLLER_SERVICES_KEY);
checkRequiredField(&controllerServiceNode, "id",
- CONFIG_YAML_CONTROLLER_SERVICES_KEY);
+ CONFIG_YAML_CONTROLLER_SERVICES_KEY);
checkRequiredField(&controllerServiceNode, "class",
- CONFIG_YAML_CONTROLLER_SERVICES_KEY);
+ CONFIG_YAML_CONTROLLER_SERVICES_KEY);
auto name = controllerServiceNode["name"].as<std::string>();
auto id = controllerServiceNode["id"].as<std::string>();
@@ -407,42 +365,28 @@ void YamlConfiguration::parseControllerServices(
uuid_t uuid;
uuid_parse(id.c_str(), uuid);
- auto controller_service_node = createControllerService(type, name,
- uuid);
+ auto controller_service_node = createControllerService(type, name, uuid);
if (nullptr != controller_service_node) {
- logger_->log_debug(
- "Created Controller Service with UUID %s and name %s", id,
- name);
+ logger_->log_debug("Created Controller Service with UUID %s and name %s", id, name);
controller_service_node->initialize();
YAML::Node propertiesNode = controllerServiceNode["Properties"];
// we should propogate propertiets to the node and to the implementation
- parsePropertiesNodeYaml(
- &propertiesNode,
- std::static_pointer_cast<core::ConfigurableComponent>(
- controller_service_node));
- if (controller_service_node->getControllerServiceImplementation()
- != nullptr) {
- parsePropertiesNodeYaml(
- &propertiesNode,
- std::static_pointer_cast<core::ConfigurableComponent>(
- controller_service_node
- ->getControllerServiceImplementation()));
+ parsePropertiesNodeYaml(&propertiesNode, std::static_pointer_cast<core::ConfigurableComponent>(controller_service_node));
+ if (controller_service_node->getControllerServiceImplementation() != nullptr) {
+ parsePropertiesNodeYaml(&propertiesNode, std::static_pointer_cast<core::ConfigurableComponent>(controller_service_node->getControllerServiceImplementation()));
}
}
controller_services_->put(id, controller_service_node);
controller_services_->put(name, controller_service_node);
} catch (YAML::InvalidNode &in) {
- throw Exception(
- ExceptionType::GENERAL_EXCEPTION,
- "Name, id, and class must be specified for controller services");
+ throw Exception(ExceptionType::GENERAL_EXCEPTION, "Name, id, and class must be specified for controller services");
}
}
}
}
}
-void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode,
- core::ProcessGroup *parent) {
+void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::ProcessGroup *parent) {
if (!parent) {
logger_->log_error("parseProcessNode: no parent group was provided");
return;
@@ -450,8 +394,7 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode,
if (connectionsNode) {
if (connectionsNode->IsSequence()) {
- for (YAML::const_iterator iter = connectionsNode->begin();
- iter != connectionsNode->end(); ++iter) {
+ for (YAML::const_iterator iter = connectionsNode->begin(); iter != connectionsNode->end(); ++iter) {
YAML::Node connectionNode = iter->as<YAML::Node>();
std::shared_ptr<minifi::Connection> connection = nullptr;
@@ -462,15 +405,13 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode,
std::string id = getOrGenerateId(&connectionNode);
uuid_parse(id.c_str(), uuid);
connection = this->createConnection(name, uuid);
- logger_->log_debug("Created connection with UUID %s and name %s", id,
- name);
+ logger_->log_debug("Created connection with UUID %s and name %s", id, name);
// Configure connection source
checkRequiredField(&connectionNode, "source relationship name", CONFIG_YAML_CONNECTIONS_KEY);
auto rawRelationship = connectionNode["source relationship name"].as<std::string>();
core::Relationship relationship(rawRelationship, "");
- logger_->log_debug("parseConnection: relationship => [%s]",
- rawRelationship);
+ logger_->log_debug("parseConnection: relationship => [%s]", rawRelationship);
if (connection) {
connection->setRelationship(relationship);
}
@@ -481,7 +422,8 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode,
std::string connectionSrcProcId = connectionNode["source id"].as<std::string>();
uuid_parse(connectionSrcProcId.c_str(), srcUUID);
logger_->log_debug("Using 'source id' to match source with same id for "
- "connection '%s': source id => [%s]", name, connectionSrcProcId);
+ "connection '%s': source id => [%s]",
+ name, connectionSrcProcId);
} else {
// if we don't have a source id, try to resolve using source name. config schema v2 will make this unnecessary
checkRequiredField(&connectionNode, "source name", CONFIG_YAML_CONNECTIONS_KEY);
@@ -491,20 +433,20 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode,
// the source name is a remote port id, so use that as the source id
uuid_copy(srcUUID, tmpUUID);
logger_->log_debug("Using 'source name' containing a remote port id to match the source for "
- "connection '%s': source name => [%s]", name, connectionSrcProcName);
+ "connection '%s': source name => [%s]",
+ name, connectionSrcProcName);
} else {
// lastly, look the processor up by name
auto srcProcessor = parent->findProcessor(connectionSrcProcName);
if (NULL != srcProcessor) {
srcProcessor->getUUID(srcUUID);
logger_->log_debug("Using 'source name' to match source with same name for "
- "connection '%s': source name => [%s]", name, connectionSrcProcName);
+ "connection '%s': source name => [%s]",
+ name, connectionSrcProcName);
} else {
// we ran out of ways to discover the source processor
- logger_->log_error(
- "Could not locate a source with name %s to create a connection", connectionSrcProcName);
- throw std::invalid_argument(
- "Could not locate a source with name " + connectionSrcProcName + " to create a connection ");
+ logger_->log_error("Could not locate a source with name %s to create a connection", connectionSrcProcName);
+ throw std::invalid_argument("Could not locate a source with name " + connectionSrcProcName + " to create a connection ");
}
}
}
@@ -516,7 +458,8 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode,
std::string connectionDestProcId = connectionNode["destination id"].as<std::string>();
uuid_parse(connectionDestProcId.c_str(), destUUID);
logger_->log_debug("Using 'destination id' to match destination with same id for "
- "connection '%s': destination id => [%s]", name, connectionDestProcId);
+ "connection '%s': destination id => [%s]",
+ name, connectionDestProcId);
} else {
// we use the same logic as above for resolving the source processor
// for looking up the destination processor in absence of a processor id
@@ -524,24 +467,24 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode,
std::string connectionDestProcName = connectionNode["destination name"].as<std::string>();
uuid_t tmpUUID;
if (!uuid_parse(connectionDestProcName.c_str(), tmpUUID) &&
- NULL != parent->findProcessor(tmpUUID)) {
+ NULL != parent->findProcessor(tmpUUID)) {
// the destination name is a remote port id, so use that as the dest id
uuid_copy(destUUID, tmpUUID);
logger_->log_debug("Using 'destination name' containing a remote port id to match the destination for "
- "connection '%s': destination name => [%s]", name, connectionDestProcName);
+ "connection '%s': destination name => [%s]",
+ name, connectionDestProcName);
} else {
// look the processor up by name
auto destProcessor = parent->findProcessor(connectionDestProcName);
if (NULL != destProcessor) {
destProcessor->getUUID(destUUID);
logger_->log_debug("Using 'destination name' to match destination with same name for "
- "connection '%s': destination name => [%s]", name, connectionDestProcName);
+ "connection '%s': destination name => [%s]",
+ name, connectionDestProcName);
} else {
// we ran out of ways to discover the destination processor
- logger_->log_error(
- "Could not locate a destination with name %s to create a connection", connectionDestProcName);
- throw std::invalid_argument(
- "Could not locate a destination with name " + connectionDestProcName + " to create a connection");
+ logger_->log_error("Could not locate a destination with name %s to create a connection", connectionDestProcName);
+ throw std::invalid_argument("Could not locate a destination with name " + connectionDestProcName + " to create a connection");
}
}
}
@@ -555,9 +498,7 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode,
}
}
-void YamlConfiguration::parsePortYaml(YAML::Node *portNode,
- core::ProcessGroup *parent,
- TransferDirection direction) {
+void YamlConfiguration::parsePortYaml(YAML::Node *portNode, core::ProcessGroup *parent, TransferDirection direction) {
uuid_t uuid;
std::shared_ptr<core::Processor> processor = NULL;
std::shared_ptr<minifi::RemoteProcessorGroupPort> port = NULL;
@@ -572,16 +513,13 @@ void YamlConfiguration::parsePortYaml(YAML::Node *portNode,
// Check for required fields
checkRequiredField(&inputPortsObj, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
auto nameStr = inputPortsObj["name"].as<std::string>();
- checkRequiredField(
- &inputPortsObj,
- "id",
- CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY,
- "The field 'id' is required for "
- "the port named '" + nameStr
- + "' in the YAML Config. If this port "
- "is an input port for a NiFi Remote Process Group, the port "
- "id should match the corresponding id specified in the NiFi configuration. "
- "This is a UUID of the format XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX.");
+ checkRequiredField(&inputPortsObj, "id",
+ CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY,
+ "The field 'id' is required for "
+ "the port named '" + nameStr + "' in the YAML Config. If this port "
+ "is an input port for a NiFi Remote Process Group, the port "
+ "id should match the corresponding id specified in the NiFi configuration. "
+ "This is a UUID of the format XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX.");
auto portId = inputPortsObj["id"].as<std::string>();
uuid_parse(portId.c_str(), uuid);
@@ -597,9 +535,7 @@ void YamlConfiguration::parsePortYaml(YAML::Node *portNode,
// handle port properties
YAML::Node nodeVal = portNode->as<YAML::Node>();
YAML::Node propertiesNode = nodeVal["Properties"];
- parsePropertiesNodeYaml(
- &propertiesNode,
- std::static_pointer_cast<core::ConfigurableComponent>(processor));
+ parsePropertiesNodeYaml(&propertiesNode, std::static_pointer_cast<core::ConfigurableComponent>(processor));
// add processor to parent
parent->addProcessor(processor);
@@ -616,12 +552,9 @@ void YamlConfiguration::parsePortYaml(YAML::Node *portNode,
}
}
-void YamlConfiguration::parsePropertiesNodeYaml(
- YAML::Node *propertiesNode,
- std::shared_ptr<core::ConfigurableComponent> processor) {
+void YamlConfiguration::parsePropertiesNodeYaml(YAML::Node *propertiesNode, std::shared_ptr<core::ConfigurableComponent> processor) {
// Treat generically as a YAML node so we can perform inspection on entries to ensure they are populated
- for (YAML::const_iterator propsIter = propertiesNode->begin();
- propsIter != propertiesNode->end(); ++propsIter) {
+ for (YAML::const_iterator propsIter = propertiesNode->begin(); propsIter != propertiesNode->end(); ++propsIter) {
std::string propertyName = propsIter->first.as<std::string>();
YAML::Node propertyValueNode = propsIter->second;
if (!propertyValueNode.IsNull() && propertyValueNode.IsDefined()) {
@@ -634,12 +567,9 @@ void YamlConfiguration::parsePropertiesNodeYaml(
std::string rawValueString = propertiesNode.as<std::string>();
logger_->log_info("Found %s=%s", propertyName, rawValueString);
if (!processor->updateProperty(propertyName, rawValueString)) {
- std::shared_ptr<core::Connectable> proc =
- std::dynamic_pointer_cast<core::Connectable>(processor);
+ std::shared_ptr<core::Connectable> proc = std::dynamic_pointer_cast<core::Connectable>(processor);
if (proc != 0) {
- logger_->log_warn(
- "Received property %s with value %s but is not one of the properties for %s",
- propertyName, rawValueString, proc->getName());
+ logger_->log_warn("Received property %s with value %s but is not one of the properties for %s", propertyName, rawValueString, proc->getName());
}
}
}
@@ -647,12 +577,9 @@ void YamlConfiguration::parsePropertiesNodeYaml(
} else {
std::string rawValueString = propertyValueNode.as<std::string>();
if (!processor->setProperty(propertyName, rawValueString)) {
- std::shared_ptr<core::Connectable> proc = std::dynamic_pointer_cast<
- core::Connectable>(processor);
+ std::shared_ptr<core::Connectable> proc = std::dynamic_pointer_cast<core::Connectable>(processor);
if (proc != 0) {
- logger_->log_warn(
- "Received property %s with value %s but is not one of the properties for %s",
- propertyName, rawValueString, proc->getName());
+ logger_->log_warn("Received property %s with value %s but is not one of the properties for %s", propertyName, rawValueString, proc->getName());
}
}
}
@@ -660,8 +587,7 @@ void YamlConfiguration::parsePropertiesNodeYaml(
}
}
-std::string YamlConfiguration::getOrGenerateId(YAML::Node *yamlNode,
- const std::string &idField) {
+std::string YamlConfiguration::getOrGenerateId(YAML::Node *yamlNode, const std::string &idField) {
std::string id;
YAML::Node node = yamlNode->as<YAML::Node>();
@@ -669,9 +595,8 @@ std::string YamlConfiguration::getOrGenerateId(YAML::Node *yamlNode,
if (YAML::NodeType::Scalar == node[idField].Type()) {
id = node[idField].as<std::string>();
} else {
- throw std::invalid_argument(
- "getOrGenerateId: idField is expected to reference YAML::Node "
- "of YAML::NodeType::Scalar.");
+ throw std::invalid_argument("getOrGenerateId: idField is expected to reference YAML::Node "
+ "of YAML::NodeType::Scalar.");
}
} else {
uuid_t uuid;
@@ -684,10 +609,7 @@ std::string YamlConfiguration::getOrGenerateId(YAML::Node *yamlNode,
return id;
}
-void YamlConfiguration::checkRequiredField(YAML::Node *yamlNode,
- const std::string &fieldName,
- const std::string &yamlSection,
- const std::string &errorMessage) {
+void YamlConfiguration::checkRequiredField(YAML::Node *yamlNode, const std::string &fieldName, const std::string &yamlSection, const std::string &errorMessage) {
std::string errMsg = errorMessage;
if (!yamlNode->as<YAML::Node>()[fieldName]) {
if (errMsg.empty()) {
@@ -695,11 +617,8 @@ void YamlConfiguration::checkRequiredField(YAML::Node *yamlNode,
// invalid YAML config file, using the component name if present
errMsg =
yamlNode->as<YAML::Node>()["name"] ?
- "Unable to parse configuration file for component named '"
- + yamlNode->as<YAML::Node>()["name"].as<std::string>()
- + "' as required field '" + fieldName + "' is missing" :
- "Unable to parse configuration file as required field '"
- + fieldName + "' is missing";
+ "Unable to parse configuration file for component named '" + yamlNode->as<YAML::Node>()["name"].as<std::string>() + "' as required field '" + fieldName + "' is missing" :
+ "Unable to parse configuration file as required field '" + fieldName + "' is missing";
if (!yamlSection.empty()) {
errMsg += " [in '" + yamlSection + "' section of configuration file]";
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/io/BaseStream.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/BaseStream.cpp b/libminifi/src/io/BaseStream.cpp
index 307f41d..137bf1a 100644
--- a/libminifi/src/io/BaseStream.cpp
+++ b/libminifi/src/io/BaseStream.cpp
@@ -33,9 +33,7 @@ namespace io {
* @return resulting write size
**/
int BaseStream::write(uint32_t base_value, bool is_little_endian) {
- return Serializable::write(base_value,
- reinterpret_cast<DataStream*>(composable_stream_),
- is_little_endian);
+ return Serializable::write(base_value, reinterpret_cast<DataStream*>(composable_stream_), is_little_endian);
}
int BaseStream::writeData(uint8_t *value, int size) {
@@ -54,9 +52,7 @@ int BaseStream::writeData(uint8_t *value, int size) {
* @return resulting write size
**/
int BaseStream::write(uint16_t base_value, bool is_little_endian) {
- return Serializable::write(base_value,
- reinterpret_cast<DataStream*>(composable_stream_),
- is_little_endian);
+ return Serializable::write(base_value, reinterpret_cast<DataStream*>(composable_stream_), is_little_endian);
}
/**
@@ -67,8 +63,7 @@ int BaseStream::write(uint16_t base_value, bool is_little_endian) {
* @return resulting write size
**/
int BaseStream::write(uint8_t *value, int len) {
- return Serializable::write(value, len,
- reinterpret_cast<DataStream*>(composable_stream_));
+ return Serializable::write(value, len, reinterpret_cast<DataStream*>(composable_stream_));
}
/**
@@ -79,9 +74,7 @@ int BaseStream::write(uint8_t *value, int len) {
* @return resulting write size
**/
int BaseStream::write(uint64_t base_value, bool is_little_endian) {
- return Serializable::write(base_value,
- reinterpret_cast<DataStream*>(composable_stream_),
- is_little_endian);
+ return Serializable::write(base_value, reinterpret_cast<DataStream*>(composable_stream_), is_little_endian);
}
/**
@@ -100,8 +93,7 @@ int BaseStream::write(bool value) {
* @return resulting write size
**/
int BaseStream::writeUTF(std::string str, bool widen) {
- return Serializable::writeUTF(
- str, reinterpret_cast<DataStream*>(composable_stream_), widen);
+ return Serializable::writeUTF(str, reinterpret_cast<DataStream*>(composable_stream_), widen);
}
/**
@@ -111,8 +103,7 @@ int BaseStream::writeUTF(std::string str, bool widen) {
* @return resulting read size
**/
int BaseStream::read(uint8_t &value) {
- return Serializable::read(value,
- reinterpret_cast<DataStream*>(composable_stream_));
+ return Serializable::read(value, reinterpret_cast<DataStream*>(composable_stream_));
}
/**
@@ -122,8 +113,7 @@ int BaseStream::read(uint8_t &value) {
* @return resulting read size
**/
int BaseStream::read(uint16_t &base_value, bool is_little_endian) {
- return Serializable::read(base_value,
- reinterpret_cast<DataStream*>(composable_stream_));
+ return Serializable::read(base_value, reinterpret_cast<DataStream*>(composable_stream_));
}
/**
@@ -133,8 +123,7 @@ int BaseStream::read(uint16_t &base_value, bool is_little_endian) {
* @return resulting read size
**/
int BaseStream::read(char &value) {
- return Serializable::read(value,
- reinterpret_cast<DataStream*>(composable_stream_));
+ return Serializable::read(value, reinterpret_cast<DataStream*>(composable_stream_));
}
/**
@@ -145,8 +134,7 @@ int BaseStream::read(char &value) {
* @return resulting read size
**/
int BaseStream::read(uint8_t *value, int len) {
- return Serializable::read(value, len,
- reinterpret_cast<DataStream*>(composable_stream_));
+ return Serializable::read(value, len, reinterpret_cast<DataStream*>(composable_stream_));
}
/**
@@ -155,8 +143,7 @@ int BaseStream::read(uint8_t *value, int len) {
* @param buflen
*/
int BaseStream::readData(std::vector<uint8_t> &buf, int buflen) {
- return Serializable::read(&buf[0], buflen,
- reinterpret_cast<DataStream*>(composable_stream_));
+ return Serializable::read(&buf[0], buflen, reinterpret_cast<DataStream*>(composable_stream_));
}
/**
* Reads data and places it into buf
@@ -164,8 +151,7 @@ int BaseStream::readData(std::vector<uint8_t> &buf, int buflen) {
* @param buflen
*/
int BaseStream::readData(uint8_t *buf, int buflen) {
- return Serializable::read(buf, buflen,
- reinterpret_cast<DataStream*>(composable_stream_));
+ return Serializable::read(buf, buflen, reinterpret_cast<DataStream*>(composable_stream_));
}
/**
@@ -175,9 +161,7 @@ int BaseStream::readData(uint8_t *buf, int buflen) {
* @return resulting read size
**/
int BaseStream::read(uint32_t &value, bool is_little_endian) {
- return Serializable::read(value,
- reinterpret_cast<DataStream*>(composable_stream_),
- is_little_endian);
+ return Serializable::read(value, reinterpret_cast<DataStream*>(composable_stream_), is_little_endian);
}
/**
@@ -187,9 +171,7 @@ int BaseStream::read(uint32_t &value, bool is_little_endian) {
* @return resulting read size
**/
int BaseStream::read(uint64_t &value, bool is_little_endian) {
- return Serializable::read(value,
- reinterpret_cast<DataStream*>(composable_stream_),
- is_little_endian);
+ return Serializable::read(value, reinterpret_cast<DataStream*>(composable_stream_), is_little_endian);
}
/**
@@ -199,8 +181,7 @@ int BaseStream::read(uint64_t &value, bool is_little_endian) {
* @return resulting read size
**/
int BaseStream::readUTF(std::string &str, bool widen) {
- return Serializable::readUTF(
- str, reinterpret_cast<DataStream*>(composable_stream_), widen);
+ return Serializable::readUTF(str, reinterpret_cast<DataStream*>(composable_stream_), widen);
}
} /* namespace io */
} /* namespace minifi */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/io/ClientSocket.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/ClientSocket.cpp b/libminifi/src/io/ClientSocket.cpp
index bd99bc7..57d6f03 100644
--- a/libminifi/src/io/ClientSocket.cpp
+++ b/libminifi/src/io/ClientSocket.cpp
@@ -39,8 +39,7 @@ namespace nifi {
namespace minifi {
namespace io {
-Socket::Socket(const std::shared_ptr<SocketContext> &context, const std::string &hostname, const uint16_t port,
- const uint16_t listeners = -1)
+Socket::Socket(const std::shared_ptr<SocketContext> &context, const std::string &hostname, const uint16_t port, const uint16_t listeners = -1)
: requested_hostname_(hostname),
port_(port),
addr_info_(0),
@@ -86,8 +85,7 @@ void Socket::closeStream() {
}
int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) {
- if ((socket_file_descriptor_ = socket(p->ai_family, p->ai_socktype,
- p->ai_protocol)) == -1) {
+ if ((socket_file_descriptor_ = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1) {
logger_->log_error("error while connecting to server socket");
return -1;
}
@@ -111,8 +109,7 @@ int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) {
sa_loc->sin_port = htons(port_);
// use any address if you are connecting to the local machine for testing
// otherwise we must use the requested hostname
- if (IsNullOrEmpty(requested_hostname_)
- || requested_hostname_ == "localhost") {
+ if (IsNullOrEmpty(requested_hostname_) || requested_hostname_ == "localhost") {
sa_loc->sin_addr.s_addr = htonl(INADDR_ANY);
} else {
sa_loc->sin_addr.s_addr = addr;
@@ -149,12 +146,10 @@ int16_t Socket::initialize() {
hints.ai_flags |= AI_PASSIVE;
hints.ai_protocol = 0; /* any protocol */
- int errcode = getaddrinfo(requested_hostname_.c_str(), 0, &hints,
- &addr_info_);
+ int errcode = getaddrinfo(requested_hostname_.c_str(), 0, &hints, &addr_info_);
if (errcode != 0) {
- logger_->log_error("Saw error during getaddrinfo, error: %s",
- strerror(errno));
+ logger_->log_error("Saw error during getaddrinfo, error: %s", strerror(errno));
return -1;
}
@@ -210,8 +205,7 @@ int16_t Socket::select_descriptor(const uint16_t msec) {
retval = select(socket_max_ + 1, &read_fds_, NULL, NULL, NULL);
if (retval < 0) {
- logger_->log_error("Saw error during selection, error:%i %s", retval,
- strerror(errno));
+ logger_->log_error("Saw error during selection, error:%i %s", retval, strerror(errno));
return retval;
}
@@ -221,8 +215,7 @@ int16_t Socket::select_descriptor(const uint16_t msec) {
if (listeners_ > 0) {
struct sockaddr_storage remoteaddr; // client address
socklen_t addrlen = sizeof remoteaddr;
- int newfd = accept(socket_file_descriptor_,
- (struct sockaddr *) &remoteaddr, &addrlen);
+ int newfd = accept(socket_file_descriptor_, (struct sockaddr *) &remoteaddr, &addrlen);
FD_SET(newfd, &total_list_); // add to master set
if (newfd > socket_max_) { // keep track of the max
socket_max_ = newfd;
@@ -273,8 +266,7 @@ int16_t Socket::setSocketOptions(const int sock) {
#else
if (listeners_ > 0) {
// lose the pesky "address already in use" error message
- if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
- reinterpret_cast<char *>(&opt), sizeof(opt)) < 0) {
+ if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char *>(&opt), sizeof(opt)) < 0) {
logger_->log_error("setsockopt() SO_REUSEADDR failed");
close(sock);
return -1;
@@ -304,16 +296,14 @@ int Socket::writeData(uint8_t *value, int size) {
// check for errors
if (ret <= 0) {
close(socket_file_descriptor_);
- logger_->log_error("Could not send to %d, error: %s",
- socket_file_descriptor_, strerror(errno));
+ logger_->log_error("Could not send to %d, error: %s", socket_file_descriptor_, strerror(errno));
return ret;
}
bytes += ret;
}
if (ret)
- logger_->log_trace("Send data size %d over socket %d", size,
- socket_file_descriptor_);
+ logger_->log_trace("Send data size %d over socket %d", size, socket_file_descriptor_);
return bytes;
}
@@ -341,15 +331,11 @@ int Socket::read(uint64_t &value, bool is_little_endian) {
auto buf = readBuffer(value);
if (is_little_endian) {
- value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) << 48)
- | ((uint64_t) (buf[2] & 255) << 40) | ((uint64_t) (buf[3] & 255) << 32)
- | ((uint64_t) (buf[4] & 255) << 24) | ((uint64_t) (buf[5] & 255) << 16)
- | ((uint64_t) (buf[6] & 255) << 8) | ((uint64_t) (buf[7] & 255) << 0);
+ value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) << 48) | ((uint64_t) (buf[2] & 255) << 40) | ((uint64_t) (buf[3] & 255) << 32) | ((uint64_t) (buf[4] & 255) << 24)
+ | ((uint64_t) (buf[5] & 255) << 16) | ((uint64_t) (buf[6] & 255) << 8) | ((uint64_t) (buf[7] & 255) << 0);
} else {
- value = ((uint64_t) buf[0] << 0) | ((uint64_t) (buf[1] & 255) << 8)
- | ((uint64_t) (buf[2] & 255) << 16) | ((uint64_t) (buf[3] & 255) << 24)
- | ((uint64_t) (buf[4] & 255) << 32) | ((uint64_t) (buf[5] & 255) << 40)
- | ((uint64_t) (buf[6] & 255) << 48) | ((uint64_t) (buf[7] & 255) << 56);
+ value = ((uint64_t) buf[0] << 0) | ((uint64_t) (buf[1] & 255) << 8) | ((uint64_t) (buf[2] & 255) << 16) | ((uint64_t) (buf[3] & 255) << 24) | ((uint64_t) (buf[4] & 255) << 32)
+ | ((uint64_t) (buf[5] & 255) << 40) | ((uint64_t) (buf[6] & 255) << 48) | ((uint64_t) (buf[7] & 255) << 56);
}
return sizeof(value);
}
@@ -397,8 +383,7 @@ int Socket::readData(uint8_t *buf, int buflen) {
if (bytes_read == 0) {
logger_->log_info("Other side hung up on %d", fd);
} else {
- logger_->log_error("Could not recv on %d, error: %s", fd,
- strerror(errno));
+ logger_->log_error("Could not recv on %d, error: %s", fd, strerror(errno));
}
return -1;
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/io/DataStream.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/DataStream.cpp b/libminifi/src/io/DataStream.cpp
index 9e0dfce..92c7cda 100644
--- a/libminifi/src/io/DataStream.cpp
+++ b/libminifi/src/io/DataStream.cpp
@@ -44,15 +44,11 @@ int DataStream::read(uint64_t &value, bool is_little_endian) {
uint8_t *buf = &buffer[readBuffer];
if (is_little_endian) {
- value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) << 48)
- | ((uint64_t) (buf[2] & 255) << 40) | ((uint64_t) (buf[3] & 255) << 32)
- | ((uint64_t) (buf[4] & 255) << 24) | ((uint64_t) (buf[5] & 255) << 16)
- | ((uint64_t) (buf[6] & 255) << 8) | ((uint64_t) (buf[7] & 255) << 0);
+ value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) << 48) | ((uint64_t) (buf[2] & 255) << 40) | ((uint64_t) (buf[3] & 255) << 32) | ((uint64_t) (buf[4] & 255) << 24)
+ | ((uint64_t) (buf[5] & 255) << 16) | ((uint64_t) (buf[6] & 255) << 8) | ((uint64_t) (buf[7] & 255) << 0);
} else {
- value = ((uint64_t) buf[0] << 0) | ((uint64_t) (buf[1] & 255) << 8)
- | ((uint64_t) (buf[2] & 255) << 16) | ((uint64_t) (buf[3] & 255) << 24)
- | ((uint64_t) (buf[4] & 255) << 32) | ((uint64_t) (buf[5] & 255) << 40)
- | ((uint64_t) (buf[6] & 255) << 48) | ((uint64_t) (buf[7] & 255) << 56);
+ value = ((uint64_t) buf[0] << 0) | ((uint64_t) (buf[1] & 255) << 8) | ((uint64_t) (buf[2] & 255) << 16) | ((uint64_t) (buf[3] & 255) << 24) | ((uint64_t) (buf[4] & 255) << 32)
+ | ((uint64_t) (buf[5] & 255) << 40) | ((uint64_t) (buf[6] & 255) << 48) | ((uint64_t) (buf[7] & 255) << 56);
}
readBuffer += 8;
return 8;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/io/Serializable.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/Serializable.cpp b/libminifi/src/io/Serializable.cpp
index c3f74c7..5e57c80 100644
--- a/libminifi/src/io/Serializable.cpp
+++ b/libminifi/src/io/Serializable.cpp
@@ -35,26 +35,20 @@ namespace io {
template<typename T>
int Serializable::writeData(const T &t, DataStream *stream) {
uint8_t bytes[sizeof t];
- std::copy(static_cast<const char*>(static_cast<const void*>(&t)),
- static_cast<const char*>(static_cast<const void*>(&t)) + sizeof t,
- bytes);
+ std::copy(static_cast<const char*>(static_cast<const void*>(&t)), static_cast<const char*>(static_cast<const void*>(&t)) + sizeof t, bytes);
return stream->writeData(bytes, sizeof t);
}
template<typename T>
int Serializable::writeData(const T &t, uint8_t *to_vec) {
- std::copy(static_cast<const char*>(static_cast<const void*>(&t)),
- static_cast<const char*>(static_cast<const void*>(&t)) + sizeof t,
- to_vec);
+ std::copy(static_cast<const char*>(static_cast<const void*>(&t)), static_cast<const char*>(static_cast<const void*>(&t)) + sizeof t, to_vec);
return sizeof t;
}
template<typename T>
int Serializable::writeData(const T &t, std::vector<uint8_t> &to_vec) {
uint8_t bytes[sizeof t];
- std::copy(static_cast<const char*>(static_cast<const void*>(&t)),
- static_cast<const char*>(static_cast<const void*>(&t)) + sizeof t,
- bytes);
+ std::copy(static_cast<const char*>(static_cast<const void*>(&t)), static_cast<const char*>(static_cast<const void*>(&t)) + sizeof t, bytes);
to_vec.insert(to_vec.end(), &bytes[0], &bytes[sizeof t]);
return sizeof t;
}
@@ -97,36 +91,29 @@ int Serializable::read(uint8_t *value, int len, DataStream *stream) {
return stream->readData(value, len);
}
-int Serializable::read(uint16_t &value, DataStream *stream,
- bool is_little_endian) {
+int Serializable::read(uint16_t &value, DataStream *stream, bool is_little_endian) {
return stream->read(value, is_little_endian);
}
-int Serializable::read(uint32_t &value, DataStream *stream,
- bool is_little_endian) {
+int Serializable::read(uint32_t &value, DataStream *stream, bool is_little_endian) {
return stream->read(value, is_little_endian);
}
-int Serializable::read(uint64_t &value, DataStream *stream,
- bool is_little_endian) {
+int Serializable::read(uint64_t &value, DataStream *stream, bool is_little_endian) {
return stream->read(value, is_little_endian);
}
-int Serializable::write(uint32_t base_value, DataStream *stream,
- bool is_little_endian) {
+int Serializable::write(uint32_t base_value, DataStream *stream, bool is_little_endian) {
const uint32_t value = is_little_endian ? htonl(base_value) : base_value;
return writeData(value, stream);
}
-int Serializable::write(uint64_t base_value, DataStream *stream,
- bool is_little_endian) {
- const uint64_t value =
- is_little_endian == 1 ? htonll_r(base_value) : base_value;
+int Serializable::write(uint64_t base_value, DataStream *stream, bool is_little_endian) {
+ const uint64_t value = is_little_endian == 1 ? htonll_r(base_value) : base_value;
return writeData(value, stream);
}
-int Serializable::write(uint16_t base_value, DataStream *stream,
- bool is_little_endian) {
+int Serializable::write(uint16_t base_value, DataStream *stream, bool is_little_endian) {
const uint16_t value = is_little_endian == 1 ? htons(base_value) : base_value;
return writeData(value, stream);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/io/tls/TLSSocket.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/tls/TLSSocket.cpp b/libminifi/src/io/tls/TLSSocket.cpp
index b269cef..40cf1e9 100644
--- a/libminifi/src/io/tls/TLSSocket.cpp
+++ b/libminifi/src/io/tls/TLSSocket.cpp
@@ -57,9 +57,7 @@ int16_t TLSContext::initialize() {
std::string clientAuthStr;
bool needClientCert = true;
- if (!(configure_->get(Configure::nifi_security_need_ClientAuth, clientAuthStr)
- && org::apache::nifi::minifi::utils::StringUtils::StringToBool(
- clientAuthStr, needClientCert))) {
+ if (!(configure_->get(Configure::nifi_security_need_ClientAuth, clientAuthStr) && org::apache::nifi::minifi::utils::StringUtils::StringToBool(clientAuthStr, needClientCert))) {
needClientCert = true;
}
@@ -67,8 +65,7 @@ int16_t TLSContext::initialize() {
method = TLSv1_2_client_method();
ctx = SSL_CTX_new(method);
if (ctx == NULL) {
- logger_->log_error("Could not create SSL context, error: %s.",
- std::strerror(errno));
+ logger_->log_error("Could not create SSL context, error: %s.", std::strerror(errno));
error_value = TLS_ERROR_CONTEXT;
return error_value;
}
@@ -78,56 +75,40 @@ int16_t TLSContext::initialize() {
std::string passphrase;
std::string caCertificate;
- if (!(configure_->get(Configure::nifi_security_client_certificate,
- certificate)
- && configure_->get(Configure::nifi_security_client_private_key,
- privatekey))) {
- logger_->log_error(
- "Certificate and Private Key PEM file not configured, error: %s.",
- std::strerror(errno));
+ if (!(configure_->get(Configure::nifi_security_client_certificate, certificate) && configure_->get(Configure::nifi_security_client_private_key, privatekey))) {
+ logger_->log_error("Certificate and Private Key PEM file not configured, error: %s.", std::strerror(errno));
error_value = TLS_ERROR_PEM_MISSING;
return error_value;
}
// load certificates and private key in PEM format
- if (SSL_CTX_use_certificate_file(ctx, certificate.c_str(), SSL_FILETYPE_PEM)
- <= 0) {
- logger_->log_error("Could not create load certificate, error : %s",
- std::strerror(errno));
+ if (SSL_CTX_use_certificate_file(ctx, certificate.c_str(), SSL_FILETYPE_PEM) <= 0) {
+ logger_->log_error("Could not create load certificate, error : %s", std::strerror(errno));
error_value = TLS_ERROR_CERT_MISSING;
return error_value;
}
- if (configure_->get(Configure::nifi_security_client_pass_phrase,
- passphrase)) {
+ if (configure_->get(Configure::nifi_security_client_pass_phrase, passphrase)) {
// if the private key has passphase
SSL_CTX_set_default_passwd_cb(ctx, pemPassWordCb);
- SSL_CTX_set_default_passwd_cb_userdata(
- ctx, static_cast<void*>(configure_.get()));
+ SSL_CTX_set_default_passwd_cb_userdata(ctx, static_cast<void*>(configure_.get()));
}
- int retp = SSL_CTX_use_PrivateKey_file(ctx, privatekey.c_str(),
- SSL_FILETYPE_PEM);
+ int retp = SSL_CTX_use_PrivateKey_file(ctx, privatekey.c_str(), SSL_FILETYPE_PEM);
if (retp != 1) {
- logger_->log_error(
- "Could not create load private key,%i on %s error : %s", retp,
- privatekey.c_str(), std::strerror(errno));
+ logger_->log_error("Could not create load private key,%i on %s error : %s", retp, privatekey.c_str(), std::strerror(errno));
error_value = TLS_ERROR_KEY_ERROR;
return error_value;
}
// verify private key
if (!SSL_CTX_check_private_key(ctx)) {
- logger_->log_error(
- "Private key does not match the public certificate, error : %s",
- std::strerror(errno));
+ logger_->log_error("Private key does not match the public certificate, error : %s", std::strerror(errno));
error_value = TLS_ERROR_KEY_ERROR;
return error_value;
}
// load CA certificates
- if (configure_->get(Configure::nifi_security_client_ca_certificate,
- caCertificate)) {
+ if (configure_->get(Configure::nifi_security_client_ca_certificate, caCertificate)) {
retp = SSL_CTX_load_verify_locations(ctx, caCertificate.c_str(), 0);
if (retp == 0) {
- logger_->log_error("Can not load CA certificate, Exiting, error : %s",
- std::strerror(errno));
+ logger_->log_error("Can not load CA certificate, Exiting, error : %s", std::strerror(errno));
error_value = TLS_ERROR_CERT_ERROR;
return error_value;
}
@@ -149,24 +130,24 @@ TLSSocket::~TLSSocket() {
* @param port connecting port
* @param listeners number of listeners in the queue
*/
-TLSSocket::TLSSocket(const std::shared_ptr<TLSContext> &context,
- const std::string &hostname, const uint16_t port,
- const uint16_t listeners)
+TLSSocket::TLSSocket(const std::shared_ptr<TLSContext> &context, const std::string &hostname, const uint16_t port, const uint16_t listeners)
: Socket(context, hostname, port, listeners),
- ssl(0), logger_(logging::LoggerFactory<TLSSocket>::getLogger()) {
+ ssl(0),
+ logger_(logging::LoggerFactory<TLSSocket>::getLogger()) {
context_ = context;
}
-TLSSocket::TLSSocket(const std::shared_ptr<TLSContext> &context,
- const std::string &hostname, const uint16_t port)
+TLSSocket::TLSSocket(const std::shared_ptr<TLSContext> &context, const std::string &hostname, const uint16_t port)
: Socket(context, hostname, port, 0),
- ssl(0), logger_(logging::LoggerFactory<TLSSocket>::getLogger()) {
+ ssl(0),
+ logger_(logging::LoggerFactory<TLSSocket>::getLogger()) {
context_ = context;
}
TLSSocket::TLSSocket(const TLSSocket &&d)
: Socket(std::move(d)),
- ssl(0), logger_(std::move(d.logger_)) {
+ ssl(0),
+ logger_(std::move(d.logger_)) {
context_ = d.context_;
}
@@ -178,15 +159,13 @@ int16_t TLSSocket::initialize() {
ssl = SSL_new(context_->getContext());
SSL_set_fd(ssl, socket_file_descriptor_);
if (SSL_connect(ssl) == -1) {
- logger_->log_error("SSL socket connect failed to %s %d",
- requested_hostname_.c_str(), port_);
+ logger_->log_error("SSL socket connect failed to %s %d", requested_hostname_.c_str(), port_);
SSL_free(ssl);
ssl = NULL;
close(socket_file_descriptor_);
return -1;
} else {
- logger_->log_info("SSL socket connect success to %s %d",
- requested_hostname_.c_str(), port_);
+ logger_->log_info("SSL socket connect success to %s %d", requested_hostname_.c_str(), port_);
return 0;
}
}
@@ -213,8 +192,7 @@ int TLSSocket::writeData(uint8_t *value, int size) {
sent = SSL_write(ssl, value + bytes, size - bytes);
// check for errors
if (sent < 0) {
- logger_->log_error("Site2Site Peer socket %d send failed %s",
- socket_file_descriptor_, strerror(errno));
+ logger_->log_error("Site2Site Peer socket %d send failed %s", socket_file_descriptor_, strerror(errno));
return sent;
}
bytes += sent;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/processors/AppendHostInfo.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/AppendHostInfo.cpp b/libminifi/src/processors/AppendHostInfo.cpp
index b3c76db..bbc86a5 100644
--- a/libminifi/src/processors/AppendHostInfo.cpp
+++ b/libminifi/src/processors/AppendHostInfo.cpp
@@ -46,19 +46,10 @@ namespace processors {
#define HOST_NAME_MAX 255
#endif
-core::Property AppendHostInfo::InterfaceName(
- "Network Interface Name",
- "Network interface from which to read an IP v4 address", "eth0");
-core::Property AppendHostInfo::HostAttribute(
- "Hostname Attribute",
- "Flowfile attribute to used to record the agent's hostname",
- "source.hostname");
-core::Property AppendHostInfo::IPAttribute(
- "IP Attribute",
- "Flowfile attribute to used to record the agent's IP address",
- "source.ipv4");
-core::Relationship AppendHostInfo::Success(
- "success", "success operational on the flow record");
+core::Property AppendHostInfo::InterfaceName("Network Interface Name", "Network interface from which to read an IP v4 address", "eth0");
+core::Property AppendHostInfo::HostAttribute("Hostname Attribute", "Flowfile attribute to used to record the agent's hostname", "source.hostname");
+core::Property AppendHostInfo::IPAttribute("IP Attribute", "Flowfile attribute to used to record the agent's IP address", "source.ipv4");
+core::Relationship AppendHostInfo::Success("success", "success operational on the flow record");
void AppendHostInfo::initialize() {
// Set the supported properties
@@ -74,8 +65,7 @@ void AppendHostInfo::initialize() {
setSupportedRelationships(relationships);
}
-void AppendHostInfo::onTrigger(core::ProcessContext *context,
- core::ProcessSession *session) {
+void AppendHostInfo::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
std::shared_ptr<core::FlowFile> flow = session->get();
if (!flow)
return;
@@ -84,8 +74,7 @@ void AppendHostInfo::onTrigger(core::ProcessContext *context,
std::string hostAttribute = "";
context->getProperty(HostAttribute.getName(), hostAttribute);
- flow->addAttribute(hostAttribute.c_str(),
- org::apache::nifi::minifi::io::Socket::getMyHostName());
+ flow->addAttribute(hostAttribute.c_str(), org::apache::nifi::minifi::io::Socket::getMyHostName());
// Get IP address for the specified interface
std::string iface;
@@ -103,9 +92,7 @@ void AppendHostInfo::onTrigger(core::ProcessContext *context,
std::string ipAttribute;
context->getProperty(IPAttribute.getName(), ipAttribute);
- flow->addAttribute(
- ipAttribute.c_str(),
- inet_ntoa(((struct sockaddr_in *) &ifr.ifr_addr)->sin_addr));
+ flow->addAttribute(ipAttribute.c_str(), inet_ntoa(((struct sockaddr_in *) &ifr.ifr_addr)->sin_addr));
}
// Transfer to the relationship
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/processors/ExecuteProcess.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/ExecuteProcess.cpp b/libminifi/src/processors/ExecuteProcess.cpp
index 701c645..323d69a 100644
--- a/libminifi/src/processors/ExecuteProcess.cpp
+++ b/libminifi/src/processors/ExecuteProcess.cpp
@@ -33,31 +33,18 @@ namespace nifi {
namespace minifi {
namespace processors {
-core::Property ExecuteProcess::Command(
- "Command",
- "Specifies the command to be executed; if just the name of an executable"
- " is provided, it must be in the user's environment PATH.",
- "");
-core::Property ExecuteProcess::CommandArguments(
- "Command Arguments",
- "The arguments to supply to the executable delimited by white space. White "
- "space can be escaped by enclosing it in double-quotes.",
- "");
-core::Property ExecuteProcess::WorkingDir(
- "Working Directory",
- "The directory to use as the current working directory when executing the command",
- "");
-core::Property ExecuteProcess::BatchDuration(
- "Batch Duration",
- "If the process is expected to be long-running and produce textual output, a "
- "batch duration can be specified.",
- "0");
-core::Property ExecuteProcess::RedirectErrorStream(
- "Redirect Error Stream",
- "If true will redirect any error stream output of the process to the output stream.",
- "false");
-core::Relationship ExecuteProcess::Success(
- "success", "All created FlowFiles are routed to this relationship.");
+core::Property ExecuteProcess::Command("Command", "Specifies the command to be executed; if just the name of an executable"
+ " is provided, it must be in the user's environment PATH.",
+ "");
+core::Property ExecuteProcess::CommandArguments("Command Arguments", "The arguments to supply to the executable delimited by white space. White "
+ "space can be escaped by enclosing it in double-quotes.",
+ "");
+core::Property ExecuteProcess::WorkingDir("Working Directory", "The directory to use as the current working directory when executing the command", "");
+core::Property ExecuteProcess::BatchDuration("Batch Duration", "If the process is expected to be long-running and produce textual output, a "
+ "batch duration can be specified.",
+ "0");
+core::Property ExecuteProcess::RedirectErrorStream("Redirect Error Stream", "If true will redirect any error stream output of the process to the output stream.", "false");
+core::Relationship ExecuteProcess::Success("success", "All created FlowFiles are routed to this relationship.");
void ExecuteProcess::initialize() {
// Set the supported properties
@@ -74,8 +61,7 @@ void ExecuteProcess::initialize() {
setSupportedRelationships(relationships);
}
-void ExecuteProcess::onTrigger(core::ProcessContext *context,
- core::ProcessSession *session) {
+void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
std::string value;
if (context->getProperty(Command.getName(), value)) {
this->_command = value;
@@ -88,15 +74,12 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context,
}
if (context->getProperty(BatchDuration.getName(), value)) {
core::TimeUnit unit;
- if (core::Property::StringToTime(value, _batchDuration, unit)
- && core::Property::ConvertTimeUnitToMS(_batchDuration, unit,
- _batchDuration)) {
+ if (core::Property::StringToTime(value, _batchDuration, unit) && core::Property::ConvertTimeUnitToMS(_batchDuration, unit, _batchDuration)) {
logger_->log_info("Setting _batchDuration");
}
}
if (context->getProperty(RedirectErrorStream.getName(), value)) {
- org::apache::nifi::minifi::utils::StringUtils::StringToBool(
- value, _redirectErrorStream);
+ org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, _redirectErrorStream);
}
this->_fullCommand = _command + " " + _commandArgument;
if (_fullCommand.length() == 0) {
@@ -106,8 +89,7 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context,
if (_workingDir.length() > 0 && _workingDir != ".") {
// change to working directory
if (chdir(_workingDir.c_str()) != 0) {
- logger_->log_error("Execute Command can not chdir %s",
- _workingDir.c_str());
+ logger_->log_error("Execute Command can not chdir %s", _workingDir.c_str());
yield();
return;
}
@@ -156,21 +138,18 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context,
close(_pipefd[1]);
if (_batchDuration > 0) {
while (1) {
- std::this_thread::sleep_for(
- std::chrono::milliseconds(_batchDuration));
+ std::this_thread::sleep_for(std::chrono::milliseconds(_batchDuration));
char buffer[4096];
int numRead = read(_pipefd[0], buffer, sizeof(buffer));
if (numRead <= 0)
break;
logger_->log_info("Execute Command Respond %d", numRead);
ExecuteProcess::WriteCallback callback(buffer, numRead);
- std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<
- FlowFileRecord>(session->create());
+ std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
if (!flowFile)
continue;
flowFile->addAttribute("command", _command.c_str());
- flowFile->addAttribute("command.arguments",
- _commandArgument.c_str());
+ flowFile->addAttribute("command.arguments", _commandArgument.c_str());
session->write(flowFile, &callback);
session->transfer(flowFile, Success);
session->commit();
@@ -181,21 +160,18 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context,
int totalRead = 0;
std::shared_ptr<FlowFileRecord> flowFile = nullptr;
while (1) {
- int numRead = read(_pipefd[0], bufPtr,
- (sizeof(buffer) - totalRead));
+ int numRead = read(_pipefd[0], bufPtr, (sizeof(buffer) - totalRead));
if (numRead <= 0) {
if (totalRead > 0) {
logger_->log_info("Execute Command Respond %d", totalRead);
// child exits and close the pipe
ExecuteProcess::WriteCallback callback(buffer, totalRead);
if (!flowFile) {
- flowFile = std::static_pointer_cast<FlowFileRecord>(
- session->create());
+ flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
if (!flowFile)
break;
flowFile->addAttribute("command", _command.c_str());
- flowFile->addAttribute("command.arguments",
- _commandArgument.c_str());
+ flowFile->addAttribute("command.arguments", _commandArgument.c_str());
session->write(flowFile, &callback);
} else {
session->append(flowFile, &callback);
@@ -206,17 +182,14 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context,
} else {
if (numRead == (sizeof(buffer) - totalRead)) {
// we reach the max buffer size
- logger_->log_info("Execute Command Max Respond %d",
- sizeof(buffer));
+ logger_->log_info("Execute Command Max Respond %d", sizeof(buffer));
ExecuteProcess::WriteCallback callback(buffer, sizeof(buffer));
if (!flowFile) {
- flowFile = std::static_pointer_cast<FlowFileRecord>(
- session->create());
+ flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
if (!flowFile)
continue;
flowFile->addAttribute("command", _command.c_str());
- flowFile->addAttribute("command.arguments",
- _commandArgument.c_str());
+ flowFile->addAttribute("command.arguments", _commandArgument.c_str());
session->write(flowFile, &callback);
} else {
session->append(flowFile, &callback);
@@ -234,11 +207,9 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context,
died = wait(&status);
if (WIFEXITED(status)) {
- logger_->log_info("Execute Command Complete %s status %d pid %d",
- _fullCommand.c_str(), WEXITSTATUS(status), _pid);
+ logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand.c_str(), WEXITSTATUS(status), _pid);
} else {
- logger_->log_info("Execute Command Complete %s status %d pid %d",
- _fullCommand.c_str(), WTERMSIG(status), _pid);
+ logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand.c_str(), WTERMSIG(status), _pid);
}
close(_pipefd[0]);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/processors/GenerateFlowFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/GenerateFlowFile.cpp b/libminifi/src/processors/GenerateFlowFile.cpp
index 34c0ae2..3741a8f 100644
--- a/libminifi/src/processors/GenerateFlowFile.cpp
+++ b/libminifi/src/processors/GenerateFlowFile.cpp
@@ -40,20 +40,11 @@ namespace minifi {
namespace processors {
const char *GenerateFlowFile::DATA_FORMAT_BINARY = "Binary";
const char *GenerateFlowFile::DATA_FORMAT_TEXT = "Text";
-core::Property GenerateFlowFile::FileSize(
- "File Size", "The size of the file that will be used", "1 kB");
-core::Property GenerateFlowFile::BatchSize(
- "Batch Size",
- "The number of FlowFiles to be transferred in each invocation", "1");
-core::Property GenerateFlowFile::DataFormat(
- "Data Format", "Specifies whether the data should be Text or Binary",
- GenerateFlowFile::DATA_FORMAT_BINARY);
-core::Property GenerateFlowFile::UniqueFlowFiles(
- "Unique FlowFiles",
- "If true, each FlowFile that is generated will be unique. If false, a random value will be generated and all FlowFiles",
- "true");
-core::Relationship GenerateFlowFile::Success(
- "success", "success operational on the flow record");
+core::Property GenerateFlowFile::FileSize("File Size", "The size of the file that will be used", "1 kB");
+core::Property GenerateFlowFile::BatchSize("Batch Size", "The number of FlowFiles to be transferred in each invocation", "1");
+core::Property GenerateFlowFile::DataFormat("Data Format", "Specifies whether the data should be Text or Binary", GenerateFlowFile::DATA_FORMAT_BINARY);
+core::Property GenerateFlowFile::UniqueFlowFiles("Unique FlowFiles", "If true, each FlowFile that is generated will be unique. If false, a random value will be generated and all FlowFiles", "true");
+core::Relationship GenerateFlowFile::Success("success", "success operational on the flow record");
void GenerateFlowFile::initialize() {
// Set the supported properties
@@ -69,8 +60,7 @@ void GenerateFlowFile::initialize() {
setSupportedRelationships(relationships);
}
-void GenerateFlowFile::onTrigger(core::ProcessContext *context,
- core::ProcessSession *session) {
+void GenerateFlowFile::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
int64_t batchSize = 1;
bool uniqueFlowFile = true;
int64_t fileSize = 1024;
@@ -83,8 +73,7 @@ void GenerateFlowFile::onTrigger(core::ProcessContext *context,
core::Property::StringToInt(value, batchSize);
}
if (context->getProperty(UniqueFlowFiles.getName(), value)) {
- org::apache::nifi::minifi::utils::StringUtils::StringToBool(value,
- uniqueFlowFile);
+ org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, uniqueFlowFile);
}
if (!uniqueFlowFile) {
@@ -102,8 +91,7 @@ void GenerateFlowFile::onTrigger(core::ProcessContext *context,
}
for (int i = 0; i < batchSize; i++) {
// For each batch
- std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<
- FlowFileRecord>(session->create());
+ std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
if (!flowFile)
return;
if (fileSize > 0)
@@ -126,8 +114,7 @@ void GenerateFlowFile::onTrigger(core::ProcessContext *context,
GenerateFlowFile::WriteCallback callback(_data, _dataSize);
for (int i = 0; i < batchSize; i++) {
// For each batch
- std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<
- FlowFileRecord>(session->create());
+ std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
if (!flowFile)
return;
if (fileSize > 0)
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/processors/GetFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/GetFile.cpp b/libminifi/src/processors/GetFile.cpp
index e39afec..f1dbb21 100644
--- a/libminifi/src/processors/GetFile.cpp
+++ b/libminifi/src/processors/GetFile.cpp
@@ -44,50 +44,22 @@ namespace nifi {
namespace minifi {
namespace processors {
-
-
-
-core::Property GetFile::BatchSize(
- "Batch Size", "The maximum number of files to pull in each iteration",
- "10");
-core::Property GetFile::Directory(
- "Input Directory", "The input directory from which to pull files", ".");
-core::Property GetFile::IgnoreHiddenFile(
- "Ignore Hidden Files",
- "Indicates whether or not hidden files should be ignored", "true");
-core::Property GetFile::KeepSourceFile(
- "Keep Source File",
- "If true, the file is not deleted after it has been copied to the Content Repository",
- "false");
-core::Property GetFile::MaxAge(
- "Maximum File Age",
- "The minimum age that a file must be in order to be pulled;"
- " any file younger than this amount of time (according to last modification date) will be ignored",
- "0 sec");
-core::Property GetFile::MinAge(
- "Minimum File Age",
- "The maximum age that a file must be in order to be pulled; any file"
- "older than this amount of time (according to last modification date) will be ignored",
- "0 sec");
-core::Property GetFile::MaxSize(
- "Maximum File Size",
- "The maximum size that a file can be in order to be pulled", "0 B");
-core::Property GetFile::MinSize(
- "Minimum File Size",
- "The minimum size that a file must be in order to be pulled", "0 B");
-core::Property GetFile::PollInterval(
- "Polling Interval",
- "Indicates how long to wait before performing a directory listing",
- "0 sec");
-core::Property GetFile::Recurse(
- "Recurse Subdirectories",
- "Indicates whether or not to pull files from subdirectories", "true");
-core::Property GetFile::FileFilter(
- "File Filter",
- "Only files whose names match the given regular expression will be picked up",
- "[^\\.].*");
-core::Relationship GetFile::Success("success",
- "All files are routed to success");
+core::Property GetFile::BatchSize("Batch Size", "The maximum number of files to pull in each iteration", "10");
+core::Property GetFile::Directory("Input Directory", "The input directory from which to pull files", ".");
+core::Property GetFile::IgnoreHiddenFile("Ignore Hidden Files", "Indicates whether or not hidden files should be ignored", "true");
+core::Property GetFile::KeepSourceFile("Keep Source File", "If true, the file is not deleted after it has been copied to the Content Repository", "false");
+core::Property GetFile::MaxAge("Maximum File Age", "The minimum age that a file must be in order to be pulled;"
+ " any file younger than this amount of time (according to last modification date) will be ignored",
+ "0 sec");
+core::Property GetFile::MinAge("Minimum File Age", "The maximum age that a file must be in order to be pulled; any file"
+ "older than this amount of time (according to last modification date) will be ignored",
+ "0 sec");
+core::Property GetFile::MaxSize("Maximum File Size", "The maximum size that a file can be in order to be pulled", "0 B");
+core::Property GetFile::MinSize("Minimum File Size", "The minimum size that a file must be in order to be pulled", "0 B");
+core::Property GetFile::PollInterval("Polling Interval", "Indicates how long to wait before performing a directory listing", "0 sec");
+core::Property GetFile::Recurse("Recurse Subdirectories", "Indicates whether or not to pull files from subdirectories", "true");
+core::Property GetFile::FileFilter("File Filter", "Only files whose names match the given regular expression will be picked up", "[^\\.].*");
+core::Relationship GetFile::Success("success", "All files are routed to success");
void GetFile::initialize() {
// Set the supported properties
@@ -110,8 +82,7 @@ void GetFile::initialize() {
setSupportedRelationships(relationships);
}
-void GetFile::onSchedule(core::ProcessContext *context,
- core::ProcessSessionFactory *sessionFactory) {
+void GetFile::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
std::string value;
if (context->getProperty(Directory.getName(), value)) {
@@ -121,27 +92,21 @@ void GetFile::onSchedule(core::ProcessContext *context,
core::Property::StringToInt(value, request_.batchSize);
}
if (context->getProperty(IgnoreHiddenFile.getName(), value)) {
- org::apache::nifi::minifi::utils::StringUtils::StringToBool(
- value, request_.ignoreHiddenFile);
+ org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, request_.ignoreHiddenFile);
}
if (context->getProperty(KeepSourceFile.getName(), value)) {
- org::apache::nifi::minifi::utils::StringUtils::StringToBool(
- value, request_.keepSourceFile);
+ org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, request_.keepSourceFile);
}
if (context->getProperty(MaxAge.getName(), value)) {
core::TimeUnit unit;
- if (core::Property::StringToTime(value, request_.maxAge, unit)
- && core::Property::ConvertTimeUnitToMS(request_.maxAge, unit,
- request_.maxAge)) {
+ if (core::Property::StringToTime(value, request_.maxAge, unit) && core::Property::ConvertTimeUnitToMS(request_.maxAge, unit, request_.maxAge)) {
logger_->log_debug("successfully applied _maxAge");
}
}
if (context->getProperty(MinAge.getName(), value)) {
core::TimeUnit unit;
- if (core::Property::StringToTime(value, request_.minAge, unit)
- && core::Property::ConvertTimeUnitToMS(request_.minAge, unit,
- request_.minAge)) {
+ if (core::Property::StringToTime(value, request_.minAge, unit) && core::Property::ConvertTimeUnitToMS(request_.minAge, unit, request_.minAge)) {
logger_->log_debug("successfully applied _minAge");
}
}
@@ -153,15 +118,12 @@ void GetFile::onSchedule(core::ProcessContext *context,
}
if (context->getProperty(PollInterval.getName(), value)) {
core::TimeUnit unit;
- if (core::Property::StringToTime(value, request_.pollInterval, unit)
- && core::Property::ConvertTimeUnitToMS(request_.pollInterval, unit,
- request_.pollInterval)) {
+ if (core::Property::StringToTime(value, request_.pollInterval, unit) && core::Property::ConvertTimeUnitToMS(request_.pollInterval, unit, request_.pollInterval)) {
logger_->log_debug("successfully applied _pollInterval");
}
}
if (context->getProperty(Recurse.getName(), value)) {
- org::apache::nifi::minifi::utils::StringUtils::StringToBool(
- value, request_.recursive);
+ org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, request_.recursive);
}
if (context->getProperty(FileFilter.getName(), value)) {
@@ -169,13 +131,11 @@ void GetFile::onSchedule(core::ProcessContext *context,
}
}
-void GetFile::onTrigger(core::ProcessContext *context,
- core::ProcessSession *session) {
+void GetFile::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
// Perform directory list
logger_->log_info("Is listing empty %i", isListingEmpty());
if (isListingEmpty()) {
- if (request_.pollInterval == 0
- || (getTimeMillis() - last_listing_time_) > request_.pollInterval) {
+ if (request_.pollInterval == 0 || (getTimeMillis() - last_listing_time_) > request_.pollInterval) {
performListing(request_.directory, request_);
last_listing_time_.store(getTimeMillis());
}
@@ -190,8 +150,7 @@ void GetFile::onTrigger(core::ProcessContext *context,
std::string fileName = list.front();
list.pop();
logger_->log_info("GetFile process %s", fileName.c_str());
- std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<
- FlowFileRecord>(session->create());
+ std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
if (flowFile == nullptr)
return;
std::size_t found = fileName.find_last_of("/\\");
@@ -224,12 +183,10 @@ void GetFile::putListing(std::string fileName) {
_dirList.push(fileName);
}
-void GetFile::pollListing(std::queue<std::string> &list,
- const GetFileRequest &request) {
+void GetFile::pollListing(std::queue<std::string> &list, const GetFileRequest &request) {
std::lock_guard<std::mutex> lock(mutex_);
- while (!_dirList.empty()
- && (request.maxSize == 0 || list.size() < request.maxSize)) {
+ while (!_dirList.empty() && (request.maxSize == 0 || list.size() < request.maxSize)) {
std::string fileName = _dirList.front();
_dirList.pop();
list.push(fileName);
@@ -238,8 +195,7 @@ void GetFile::pollListing(std::queue<std::string> &list,
return;
}
-bool GetFile::acceptFile(std::string fullName, std::string name,
- const GetFileRequest &request) {
+bool GetFile::acceptFile(std::string fullName, std::string name, const GetFileRequest &request) {
struct stat statbuf;
if (stat(fullName.c_str(), &statbuf) == 0) {
@@ -296,8 +252,7 @@ void GetFile::performListing(std::string dir, const GetFileRequest &request) {
std::string d_name = entry->d_name;
if ((entry->d_type & DT_DIR)) {
// if this is a directory
- if (request.recursive && strcmp(d_name.c_str(), "..") != 0
- && strcmp(d_name.c_str(), ".") != 0) {
+ if (request.recursive && strcmp(d_name.c_str(), "..") != 0 && strcmp(d_name.c_str(), ".") != 0) {
std::string path = dir + "/" + d_name;
performListing(path, request);
}