You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2016/11/29 05:55:37 UTC
[1/2] nifi-minifi-cpp git commit: MINIFI-98 Providing logging
information and exiting in case the specified configuration YAML cannot be
located. MINIFI-99 Providing default config.yml and updating README with
minor adjustments to reflect default build c[... subject truncated in the archive]
Repository: nifi-minifi-cpp
Updated Branches:
refs/heads/master c6b7ac0b0 -> f14f20064
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/f14f2006/libminifi/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index ab1fcf7..3598716 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -35,206 +35,207 @@ FlowController *FlowController::_flowController(NULL);
FlowController::FlowController(std::string name)
: _name(name)
{
- uuid_generate(_uuid);
+ uuid_generate(_uuid);
- // Setup the default values
- _configurationFileName = DEFAULT_FLOW_YAML_FILE_NAME;
- _maxEventDrivenThreads = DEFAULT_MAX_EVENT_DRIVEN_THREAD;
- _maxTimerDrivenThreads = DEFAULT_MAX_TIMER_DRIVEN_THREAD;
- _running = false;
- _initialized = false;
- _root = NULL;
- _logger = Logger::getLogger();
- _protocol = new FlowControlProtocol(this);
+ // Setup the default values
+ _configurationFileName = DEFAULT_FLOW_YAML_FILE_NAME;
+ _maxEventDrivenThreads = DEFAULT_MAX_EVENT_DRIVEN_THREAD;
+ _maxTimerDrivenThreads = DEFAULT_MAX_TIMER_DRIVEN_THREAD;
+ _running = false;
+ _initialized = false;
+ _root = NULL;
+ _logger = Logger::getLogger();
+ _protocol = new FlowControlProtocol(this);
- // NiFi config properties
- _configure = Configure::getConfigure();
+ // NiFi config properties
+ _configure = Configure::getConfigure();
- std::string rawConfigFileString;
- _configure->get(Configure::nifi_flow_configuration_file, rawConfigFileString);
+ std::string rawConfigFileString;
+ _configure->get(Configure::nifi_flow_configuration_file, rawConfigFileString);
if (!rawConfigFileString.empty())
{
- _configurationFileName = rawConfigFileString;
- }
+ _configurationFileName = rawConfigFileString;
+ }
- char *path = NULL;
- char full_path[PATH_MAX];
+ char *path = NULL;
+ char full_path[PATH_MAX];
- std::string adjustedFilename;
+ std::string adjustedFilename;
if (!_configurationFileName.empty())
{
- // perform a naive determination if this is a relative path
+ // perform a naive determination if this is a relative path
if (_configurationFileName.c_str()[0] != '/')
{
- adjustedFilename = adjustedFilename + _configure->getHome() + "/" + _configurationFileName;
+ adjustedFilename = adjustedFilename + _configure->getHome() + "/" + _configurationFileName;
}
else
{
- adjustedFilename = _configurationFileName;
- }
- }
+ adjustedFilename = _configurationFileName;
+ }
+ }
- path = realpath(adjustedFilename.c_str(), full_path);
+ path = realpath(adjustedFilename.c_str(), full_path);
if (!path)
{
- _logger->log_error("Could not locate path from provided configuration file name.");
- }
+ _logger->log_error("Could not locate path from provided configuration file name (%s). Exiting.", full_path);
+ exit(1);
+ }
- std::string pathString(path);
- _configurationFileName = pathString;
- _logger->log_info("FlowController NiFi Configuration file %s", pathString.c_str());
+ std::string pathString(path);
+ _configurationFileName = pathString;
+ _logger->log_info("FlowController NiFi Configuration file %s", pathString.c_str());
- // Create repos for flow record and provenance
- _provenanceRepo = new ProvenanceRepository();
- _provenanceRepo->initialize();
+ // Create repos for flow record and provenance
+ _provenanceRepo = new ProvenanceRepository();
+ _provenanceRepo->initialize();
- _logger->log_info("FlowController %s created", _name.c_str());
+ _logger->log_info("FlowController %s created", _name.c_str());
}
FlowController::~FlowController()
{
- stop(true);
- unload();
- delete _protocol;
- delete _provenanceRepo;
+ stop(true);
+ unload();
+ delete _protocol;
+ delete _provenanceRepo;
}
bool FlowController::isRunning()
{
- return (_running);
+ return (_running);
}
bool FlowController::isInitialized()
{
- return (_initialized);
+ return (_initialized);
}
void FlowController::stop(bool force)
{
if (_running)
{
- _logger->log_info("Stop Flow Controller");
- this->_timerScheduler.stop();
- // Wait for sometime for thread stop
- std::this_thread::sleep_for(std::chrono::milliseconds(1000));
- if (this->_root)
- this->_root->stopProcessing(&this->_timerScheduler);
- _running = false;
- }
+ _logger->log_info("Stop Flow Controller");
+ this->_timerScheduler.stop();
+ // Wait for sometime for thread stop
+ std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+ if (this->_root)
+ this->_root->stopProcessing(&this->_timerScheduler);
+ _running = false;
+ }
}
void FlowController::unload()
{
if (_running)
{
- stop(true);
- }
+ stop(true);
+ }
if (_initialized)
{
- _logger->log_info("Unload Flow Controller");
- if (_root)
- delete _root;
- _root = NULL;
- _initialized = false;
- _name = "";
- }
+ _logger->log_info("Unload Flow Controller");
+ if (_root)
+ delete _root;
+ _root = NULL;
+ _initialized = false;
+ _name = "";
+ }
- return;
+ return;
}
void FlowController::reload(std::string xmlFile)
{
- _logger->log_info("Starting to reload Flow Controller with xml %s", xmlFile.c_str());
- stop(true);
- unload();
- std::string oldxmlFile = this->_configurationFileName;
- this->_configurationFileName = xmlFile;
- load(ConfigFormat::XML);
- start();
+ _logger->log_info("Starting to reload Flow Controller with xml %s", xmlFile.c_str());
+ stop(true);
+ unload();
+ std::string oldxmlFile = this->_configurationFileName;
+ this->_configurationFileName = xmlFile;
+ load(ConfigFormat::XML);
+ start();
if (!this->_root)
{
- this->_configurationFileName = oldxmlFile;
- _logger->log_info("Rollback Flow Controller to xml %s", oldxmlFile.c_str());
- stop(true);
- unload();
- load(ConfigFormat::XML);
- start();
- }
+ this->_configurationFileName = oldxmlFile;
+ _logger->log_info("Rollback Flow Controller to xml %s", oldxmlFile.c_str());
+ stop(true);
+ unload();
+ load(ConfigFormat::XML);
+ start();
+ }
}
Processor *FlowController::createProcessor(std::string name, uuid_t uuid)
{
- Processor *processor = NULL;
+ Processor *processor = NULL;
if (name == GenerateFlowFile::ProcessorName)
{
- processor = new GenerateFlowFile(name, uuid);
+ processor = new GenerateFlowFile(name, uuid);
}
else if (name == LogAttribute::ProcessorName)
{
- processor = new LogAttribute(name, uuid);
+ processor = new LogAttribute(name, uuid);
}
else if (name == RealTimeDataCollector::ProcessorName)
{
- processor = new RealTimeDataCollector(name, uuid);
+ processor = new RealTimeDataCollector(name, uuid);
}
else if (name == GetFile::ProcessorName)
{
- processor = new GetFile(name, uuid);
+ processor = new GetFile(name, uuid);
}
else if (name == TailFile::ProcessorName)
{
- processor = new TailFile(name, uuid);
+ processor = new TailFile(name, uuid);
}
else if (name == ListenSyslog::ProcessorName)
{
- processor = new ListenSyslog(name, uuid);
+ processor = new ListenSyslog(name, uuid);
}
else if (name == ExecuteProcess::ProcessorName)
{
- processor = new ExecuteProcess(name, uuid);
+ processor = new ExecuteProcess(name, uuid);
}
else
{
- _logger->log_error("No Processor defined for %s", name.c_str());
- return NULL;
- }
+ _logger->log_error("No Processor defined for %s", name.c_str());
+ return NULL;
+ }
- //! initialize the processor
- processor->initialize();
+ //! initialize the processor
+ processor->initialize();
- return processor;
+ return processor;
}
ProcessGroup *FlowController::createRootProcessGroup(std::string name, uuid_t uuid)
{
- return new ProcessGroup(ROOT_PROCESS_GROUP, name, uuid);
+ return new ProcessGroup(ROOT_PROCESS_GROUP, name, uuid);
}
ProcessGroup *FlowController::createRemoteProcessGroup(std::string name, uuid_t uuid)
{
- return new ProcessGroup(REMOTE_PROCESS_GROUP, name, uuid);
+ return new ProcessGroup(REMOTE_PROCESS_GROUP, name, uuid);
}
Connection *FlowController::createConnection(std::string name, uuid_t uuid)
{
- return new Connection(name, uuid);
+ return new Connection(name, uuid);
}
void FlowController::parseConnection(xmlDoc *doc, xmlNode *node, ProcessGroup *parent)
{
- uuid_t uuid;
- xmlNode *currentNode;
- Connection *connection = NULL;
+ uuid_t uuid;
+ xmlNode *currentNode;
+ Connection *connection = NULL;
if (!parent)
{
- _logger->log_error("parseProcessNode: no parent group existed");
- return;
- }
+ _logger->log_error("parseProcessNode: no parent group existed");
+ return;
+ }
- // generate the random UIID
- uuid_generate(uuid);
+ // generate the random UIID
+ uuid_generate(uuid);
for (currentNode = node->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next)
{
@@ -242,836 +243,829 @@ void FlowController::parseConnection(xmlDoc *doc, xmlNode *node, ProcessGroup *p
{
if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0)
{
- char *id = (char *) xmlNodeGetContent(currentNode);
- if (id) {
- _logger->log_debug("parseConnection: id => [%s]", id);
- uuid_parse(id, uuid);
- xmlFree(id);
- }
- } else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) {
- char *name = (char *) xmlNodeGetContent(currentNode);
- if (name) {
- _logger->log_debug("parseConnection: name => [%s]", name);
- connection = this->createConnection(name, uuid);
- if (connection == NULL) {
- xmlFree(name);
- return;
- }
- xmlFree(name);
- }
- } else if (xmlStrcmp(currentNode->name, BAD_CAST "sourceId") == 0) {
- char *id = (char *) xmlNodeGetContent(currentNode);
- if (id) {
- _logger->log_debug("parseConnection: sourceId => [%s]", id);
- uuid_parse(id, uuid);
- xmlFree(id);
- if (connection)
- connection->setSourceProcessorUUID(uuid);
- }
- } else if (xmlStrcmp(currentNode->name, BAD_CAST "destinationId") == 0) {
- char *id = (char *) xmlNodeGetContent(currentNode);
- if (id) {
- _logger->log_debug("parseConnection: destinationId => [%s]", id);
- uuid_parse(id, uuid);
- xmlFree(id);
- if (connection)
- connection->setDestinationProcessorUUID(uuid);
- }
- } else if (xmlStrcmp(currentNode->name, BAD_CAST "maxWorkQueueSize") == 0) {
- char *temp = (char *) xmlNodeGetContent(currentNode);
- int64_t maxWorkQueueSize = 0;
- if (temp) {
- if (Property::StringToInt(temp, maxWorkQueueSize)) {
- _logger->log_debug("parseConnection: maxWorkQueueSize => [%d]", maxWorkQueueSize);
- if (connection)
- connection->setMaxQueueSize(maxWorkQueueSize);
-
- }
- xmlFree(temp);
- }
- } else if (xmlStrcmp(currentNode->name, BAD_CAST "maxWorkQueueDataSize") == 0) {
- char *temp = (char *) xmlNodeGetContent(currentNode);
- int64_t maxWorkQueueDataSize = 0;
- if (temp) {
- if (Property::StringToInt(temp, maxWorkQueueDataSize)) {
- _logger->log_debug("parseConnection: maxWorkQueueDataSize => [%d]", maxWorkQueueDataSize);
- if (connection)
- connection->setMaxQueueDataSize(maxWorkQueueDataSize);
-
- }
- xmlFree(temp);
- }
- } else if (xmlStrcmp(currentNode->name, BAD_CAST "relationship") == 0) {
- char *temp = (char *) xmlNodeGetContent(currentNode);
- if (temp) {
- std::string relationshipName = temp;
- if (!relationshipName.empty()) {
- Relationship relationship(relationshipName, "");
- _logger->log_debug("parseConnection: relationship => [%s]", relationshipName.c_str());
- if (connection)
- connection->setRelationship(relationship);
- } else {
- Relationship empty;
- _logger->log_debug("parseConnection: relationship => [%s]", empty.getName().c_str());
- if (connection)
- connection->setRelationship(empty);
- }
- xmlFree(temp);
- }
- }
- } // if (currentNode->type == XML_ELEMENT_NODE)
- } // for node
-
- if (connection)
- parent->addConnection(connection);
-
- return;
+ char *id = (char *) xmlNodeGetContent(currentNode);
+ if (id) {
+ _logger->log_debug("parseConnection: id => [%s]", id);
+ uuid_parse(id, uuid);
+ xmlFree(id);
+ }
+ } else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) {
+ char *name = (char *) xmlNodeGetContent(currentNode);
+ if (name) {
+ _logger->log_debug("parseConnection: name => [%s]", name);
+ connection = this->createConnection(name, uuid);
+ if (connection == NULL) {
+ xmlFree(name);
+ return;
+ }
+ xmlFree(name);
+ }
+ } else if (xmlStrcmp(currentNode->name, BAD_CAST "sourceId") == 0) {
+ char *id = (char *) xmlNodeGetContent(currentNode);
+ if (id) {
+ _logger->log_debug("parseConnection: sourceId => [%s]", id);
+ uuid_parse(id, uuid);
+ xmlFree(id);
+ if (connection)
+ connection->setSourceProcessorUUID(uuid);
+ }
+ } else if (xmlStrcmp(currentNode->name, BAD_CAST "destinationId") == 0) {
+ char *id = (char *) xmlNodeGetContent(currentNode);
+ if (id) {
+ _logger->log_debug("parseConnection: destinationId => [%s]", id);
+ uuid_parse(id, uuid);
+ xmlFree(id);
+ if (connection)
+ connection->setDestinationProcessorUUID(uuid);
+ }
+ } else if (xmlStrcmp(currentNode->name, BAD_CAST "maxWorkQueueSize") == 0) {
+ char *temp = (char *) xmlNodeGetContent(currentNode);
+ int64_t maxWorkQueueSize = 0;
+ if (temp) {
+ if (Property::StringToInt(temp, maxWorkQueueSize)) {
+ _logger->log_debug("parseConnection: maxWorkQueueSize => [%d]", maxWorkQueueSize);
+ if (connection)
+ connection->setMaxQueueSize(maxWorkQueueSize);
+
+ }
+ xmlFree(temp);
+ }
+ } else if (xmlStrcmp(currentNode->name, BAD_CAST "maxWorkQueueDataSize") == 0) {
+ char *temp = (char *) xmlNodeGetContent(currentNode);
+ int64_t maxWorkQueueDataSize = 0;
+ if (temp) {
+ if (Property::StringToInt(temp, maxWorkQueueDataSize)) {
+ _logger->log_debug("parseConnection: maxWorkQueueDataSize => [%d]", maxWorkQueueDataSize);
+ if (connection)
+ connection->setMaxQueueDataSize(maxWorkQueueDataSize);
+
+ }
+ xmlFree(temp);
+ }
+ } else if (xmlStrcmp(currentNode->name, BAD_CAST "relationship") == 0) {
+ char *temp = (char *) xmlNodeGetContent(currentNode);
+ if (temp) {
+ std::string relationshipName = temp;
+ if (!relationshipName.empty()) {
+ Relationship relationship(relationshipName, "");
+ _logger->log_debug("parseConnection: relationship => [%s]", relationshipName.c_str());
+ if (connection)
+ connection->setRelationship(relationship);
+ } else {
+ Relationship empty;
+ _logger->log_debug("parseConnection: relationship => [%s]", empty.getName().c_str());
+ if (connection)
+ connection->setRelationship(empty);
+ }
+ xmlFree(temp);
+ }
+ }
+ } // if (currentNode->type == XML_ELEMENT_NODE)
+ } // for node
+
+ if (connection)
+ parent->addConnection(connection);
+
+ return;
}
void FlowController::parseRootProcessGroup(xmlDoc *doc, xmlNode *node) {
- uuid_t uuid;
- xmlNode *currentNode;
- ProcessGroup *group = NULL;
-
- // generate the random UIID
- uuid_generate(uuid);
-
- for (currentNode = node->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) {
- if (currentNode->type == XML_ELEMENT_NODE) {
- if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0) {
- char *id = (char *) xmlNodeGetContent(currentNode);
- if (id) {
- _logger->log_debug("parseRootProcessGroup: id => [%s]", id);
- uuid_parse(id, uuid);
- xmlFree(id);
- }
- } else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) {
- char *name = (char *) xmlNodeGetContent(currentNode);
- if (name) {
- _logger->log_debug("parseRootProcessGroup: name => [%s]", name);
- group = this->createRootProcessGroup(name, uuid);
- if (group == NULL) {
- xmlFree(name);
- return;
- }
- // Set the root process group
- this->_root = group;
- this->_name = name;
- xmlFree(name);
- }
- } else if (xmlStrcmp(currentNode->name, BAD_CAST "processor") == 0) {
- this->parseProcessorNode(doc, currentNode, group);
- } else if (xmlStrcmp(currentNode->name, BAD_CAST "connection") == 0) {
- this->parseConnection(doc, currentNode, group);
- } else if (xmlStrcmp(currentNode->name, BAD_CAST "remoteProcessGroup") == 0) {
- this->parseRemoteProcessGroup(doc, currentNode, group);
- }
- } // if (currentNode->type == XML_ELEMENT_NODE)
- } // for node
+ uuid_t uuid;
+ xmlNode *currentNode;
+ ProcessGroup *group = NULL;
+
+ // generate the random UIID
+ uuid_generate(uuid);
+
+ for (currentNode = node->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) {
+ if (currentNode->type == XML_ELEMENT_NODE) {
+ if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0) {
+ char *id = (char *) xmlNodeGetContent(currentNode);
+ if (id) {
+ _logger->log_debug("parseRootProcessGroup: id => [%s]", id);
+ uuid_parse(id, uuid);
+ xmlFree(id);
+ }
+ } else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) {
+ char *name = (char *) xmlNodeGetContent(currentNode);
+ if (name) {
+ _logger->log_debug("parseRootProcessGroup: name => [%s]", name);
+ group = this->createRootProcessGroup(name, uuid);
+ if (group == NULL) {
+ xmlFree(name);
+ return;
+ }
+ // Set the root process group
+ this->_root = group;
+ this->_name = name;
+ xmlFree(name);
+ }
+ } else if (xmlStrcmp(currentNode->name, BAD_CAST "processor") == 0) {
+ this->parseProcessorNode(doc, currentNode, group);
+ } else if (xmlStrcmp(currentNode->name, BAD_CAST "connection") == 0) {
+ this->parseConnection(doc, currentNode, group);
+ } else if (xmlStrcmp(currentNode->name, BAD_CAST "remoteProcessGroup") == 0) {
+ this->parseRemoteProcessGroup(doc, currentNode, group);
+ }
+ } // if (currentNode->type == XML_ELEMENT_NODE)
+ } // for node
}
void FlowController::parseRootProcessGroupYaml(YAML::Node rootFlowNode) {
- uuid_t uuid;
- ProcessGroup *group = NULL;
+ uuid_t uuid;
+ ProcessGroup *group = NULL;
- // generate the random UIID
- uuid_generate(uuid);
+ // generate the random UIID
+ uuid_generate(uuid);
- std::string flowName = rootFlowNode["name"].as<std::string>();
+ std::string flowName = rootFlowNode["name"].as<std::string>();
- char uuidStr[37];
- uuid_unparse(_uuid, uuidStr);
- _logger->log_debug("parseRootProcessGroup: id => [%s]", uuidStr);
- _logger->log_debug("parseRootProcessGroup: name => [%s]", flowName.c_str());
- group = this->createRootProcessGroup(flowName, uuid);
- this->_root = group;
- this->_name = flowName;
+ char uuidStr[37];
+ uuid_unparse(_uuid, uuidStr);
+ _logger->log_debug("parseRootProcessGroup: id => [%s]", uuidStr);
+ _logger->log_debug("parseRootProcessGroup: name => [%s]", flowName.c_str());
+ group = this->createRootProcessGroup(flowName, uuid);
+ this->_root = group;
+ this->_name = flowName;
}
void FlowController::parseProcessorNodeYaml(YAML::Node processorsNode, ProcessGroup *parentGroup) {
- int64_t schedulingPeriod = -1;
- int64_t penalizationPeriod = -1;
- int64_t yieldPeriod = -1;
- int64_t runDurationNanos = -1;
- uuid_t uuid;
- Processor *processor = NULL;
-
- if (!parentGroup) {
- _logger->log_error("parseProcessNodeYaml: no parent group exists");
- return;
- }
+ int64_t schedulingPeriod = -1;
+ int64_t penalizationPeriod = -1;
+ int64_t yieldPeriod = -1;
+ int64_t runDurationNanos = -1;
+ uuid_t uuid;
+ Processor *processor = NULL;
+
+ if (!parentGroup) {
+ _logger->log_error("parseProcessNodeYaml: no parent group exists");
+ return;
+ }
- if (processorsNode) {
- // Evaluate sequence of processors
- int numProcessors = processorsNode.size();
- if (numProcessors < 1) {
- throw new std::invalid_argument("There must be at least one processor configured.");
- }
+ if (processorsNode) {
- std::vector<ProcessorConfig> processorConfigs;
+ if (processorsNode.IsSequence()) {
+ // Evaluate sequence of processors
+ int numProcessors = processorsNode.size();
- if (processorsNode.IsSequence()) {
- for (YAML::const_iterator iter = processorsNode.begin(); iter != processorsNode.end(); ++iter) {
- ProcessorConfig procCfg;
- YAML::Node procNode = iter->as<YAML::Node>();
- procCfg.name = procNode["name"].as<std::string>();
- _logger->log_debug("parseProcessorNode: name => [%s]", procCfg.name.c_str());
- procCfg.javaClass = procNode["class"].as<std::string>();
- _logger->log_debug("parseProcessorNode: class => [%s]", procCfg.javaClass.c_str());
+ for (YAML::const_iterator iter = processorsNode.begin(); iter != processorsNode.end(); ++iter) {
+ ProcessorConfig procCfg;
+ YAML::Node procNode = iter->as<YAML::Node>();
- char uuidStr[37];
- uuid_unparse(_uuid, uuidStr);
+ procCfg.name = procNode["name"].as<std::string>();
+ _logger->log_debug("parseProcessorNode: name => [%s]", procCfg.name.c_str());
+ procCfg.javaClass = procNode["class"].as<std::string>();
+ _logger->log_debug("parseProcessorNode: class => [%s]", procCfg.javaClass.c_str());
- // generate the random UUID
- uuid_generate(uuid);
+ char uuidStr[37];
+ uuid_unparse(_uuid, uuidStr);
- // Determine the processor name only from the Java class
- int lastOfIdx = procCfg.javaClass.find_last_of(".");
- if (lastOfIdx != std::string::npos) {
- lastOfIdx++; // if a value is found, increment to move beyond the .
- int nameLength = procCfg.javaClass.length() - lastOfIdx;
- std::string processorName = procCfg.javaClass.substr(lastOfIdx, nameLength);
- processor = this->createProcessor(processorName, uuid);
- }
+ // generate the random UUID
+ uuid_generate(uuid);
- if (!processor) {
- _logger->log_error("Could not create a processor %s with name %s", procCfg.name.c_str(), uuidStr);
- throw std::invalid_argument("Could not create processor " + procCfg.name);
- }
- processor->setName(procCfg.name);
+ // Determine the processor name only from the Java class
+ int lastOfIdx = procCfg.javaClass.find_last_of(".");
+ if (lastOfIdx != std::string::npos) {
+ lastOfIdx++; // if a value is found, increment to move beyond the .
+ int nameLength = procCfg.javaClass.length() - lastOfIdx;
+ std::string processorName = procCfg.javaClass.substr(lastOfIdx, nameLength);
+ processor = this->createProcessor(processorName, uuid);
+ }
- procCfg.maxConcurrentTasks = procNode["max concurrent tasks"].as<std::string>();
+ if (!processor) {
+ _logger->log_error("Could not create a processor %s with name %s", procCfg.name.c_str(), uuidStr);
+ throw std::invalid_argument("Could not create processor " + procCfg.name);
+ }
+ processor->setName(procCfg.name);
+
+ procCfg.maxConcurrentTasks = procNode["max concurrent tasks"].as<std::string>();
_logger->log_debug("parseProcessorNode: max concurrent tasks => [%s]", procCfg.maxConcurrentTasks.c_str());
- procCfg.schedulingStrategy = procNode["scheduling strategy"].as<std::string>();
- _logger->log_debug("parseProcessorNode: scheduling strategy => [%s]",
- procCfg.schedulingStrategy.c_str());
- procCfg.schedulingPeriod = procNode["scheduling period"].as<std::string>();
- _logger->log_debug("parseProcessorNode: scheduling period => [%s]", procCfg.schedulingPeriod.c_str());
- procCfg.penalizationPeriod = procNode["penalization period"].as<std::string>();
- _logger->log_debug("parseProcessorNode: penalization period => [%s]",
- procCfg.penalizationPeriod.c_str());
- procCfg.yieldPeriod = procNode["yield period"].as<std::string>();
- _logger->log_debug("parseProcessorNode: yield period => [%s]", procCfg.yieldPeriod.c_str());
- procCfg.yieldPeriod = procNode["run duration nanos"].as<std::string>();
- _logger->log_debug("parseProcessorNode: run duration nanos => [%s]", procCfg.runDurationNanos.c_str());
-
- // handle auto-terminated relationships
- YAML::Node autoTerminatedSequence = procNode["auto-terminated relationships list"];
- std::vector<std::string> rawAutoTerminatedRelationshipValues;
- if (autoTerminatedSequence.IsSequence() && !autoTerminatedSequence.IsNull()
- && autoTerminatedSequence.size() > 0) {
- for (YAML::const_iterator relIter = autoTerminatedSequence.begin();
- relIter != autoTerminatedSequence.end(); ++relIter) {
- std::string autoTerminatedRel = relIter->as<std::string>();
- rawAutoTerminatedRelationshipValues.push_back(autoTerminatedRel);
- }
- }
- procCfg.autoTerminatedRelationships = rawAutoTerminatedRelationshipValues;
-
- // handle processor properties
- YAML::Node propertiesNode = procNode["Properties"];
- parsePropertiesNodeYaml(&propertiesNode, processor);
-
- // Take care of scheduling
- TimeUnit unit;
- if (Property::StringToTime(procCfg.schedulingPeriod, schedulingPeriod, unit)
- && Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) {
- _logger->log_debug("convert: parseProcessorNode: schedulingPeriod => [%d] ns", schedulingPeriod);
- processor->setSchedulingPeriodNano(schedulingPeriod);
- }
-
- if (Property::StringToTime(procCfg.penalizationPeriod, penalizationPeriod, unit)
- && Property::ConvertTimeUnitToMS(penalizationPeriod, unit, penalizationPeriod)) {
- _logger->log_debug("convert: parseProcessorNode: penalizationPeriod => [%d] ms",
- penalizationPeriod);
- processor->setPenalizationPeriodMsec(penalizationPeriod);
- }
-
- if (Property::StringToTime(procCfg.yieldPeriod, yieldPeriod, unit)
- && Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod)) {
- _logger->log_debug("convert: parseProcessorNode: yieldPeriod => [%d] ms", yieldPeriod);
- processor->setYieldPeriodMsec(yieldPeriod);
- }
-
- // Default to running
- processor->setScheduledState(RUNNING);
-
- if (procCfg.schedulingStrategy == "TIMER_DRIVEN") {
- processor->setSchedulingStrategy(TIMER_DRIVEN);
- _logger->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy.c_str());
- } else if (procCfg.schedulingStrategy == "EVENT_DRIVEN") {
- processor->setSchedulingStrategy(EVENT_DRIVEN);
- _logger->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy.c_str());
- } else {
- processor->setSchedulingStrategy(CRON_DRIVEN);
- _logger->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy.c_str());
-
- }
-
- int64_t maxConcurrentTasks;
- if (Property::StringToInt(procCfg.maxConcurrentTasks, maxConcurrentTasks)) {
- _logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks);
- processor->setMaxConcurrentTasks(maxConcurrentTasks);
- }
-
- if (Property::StringToInt(procCfg.runDurationNanos, runDurationNanos)) {
- _logger->log_debug("parseProcessorNode: runDurationNanos => [%d]", runDurationNanos);
- processor->setRunDurationNano(runDurationNanos);
- }
-
- std::set<Relationship> autoTerminatedRelationships;
- for (auto&& relString : procCfg.autoTerminatedRelationships) {
- Relationship relationship(relString, "");
- _logger->log_debug("parseProcessorNode: autoTerminatedRelationship => [%s]", relString.c_str());
- autoTerminatedRelationships.insert(relationship);
- }
-
- processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
- parentGroup->addProcessor(processor);
- }
- }
- } else {
- throw new std::invalid_argument(
- "Cannot instantiate a MiNiFi instance without a defined Processors configuration node.");
- }
+ procCfg.schedulingStrategy = procNode["scheduling strategy"].as<std::string>();
+ _logger->log_debug("parseProcessorNode: scheduling strategy => [%s]",
+ procCfg.schedulingStrategy.c_str());
+ procCfg.schedulingPeriod = procNode["scheduling period"].as<std::string>();
+ _logger->log_debug("parseProcessorNode: scheduling period => [%s]", procCfg.schedulingPeriod.c_str());
+ procCfg.penalizationPeriod = procNode["penalization period"].as<std::string>();
+ _logger->log_debug("parseProcessorNode: penalization period => [%s]",
+ procCfg.penalizationPeriod.c_str());
+ procCfg.yieldPeriod = procNode["yield period"].as<std::string>();
+ _logger->log_debug("parseProcessorNode: yield period => [%s]", procCfg.yieldPeriod.c_str());
+ procCfg.yieldPeriod = procNode["run duration nanos"].as<std::string>();
+ _logger->log_debug("parseProcessorNode: run duration nanos => [%s]", procCfg.runDurationNanos.c_str());
+
+ // handle auto-terminated relationships
+ YAML::Node autoTerminatedSequence = procNode["auto-terminated relationships list"];
+ std::vector<std::string> rawAutoTerminatedRelationshipValues;
+ if (autoTerminatedSequence.IsSequence() && !autoTerminatedSequence.IsNull()
+ && autoTerminatedSequence.size() > 0) {
+ for (YAML::const_iterator relIter = autoTerminatedSequence.begin();
+ relIter != autoTerminatedSequence.end(); ++relIter) {
+ std::string autoTerminatedRel = relIter->as<std::string>();
+ rawAutoTerminatedRelationshipValues.push_back(autoTerminatedRel);
+ }
+ }
+ procCfg.autoTerminatedRelationships = rawAutoTerminatedRelationshipValues;
+
+ // handle processor properties
+ YAML::Node propertiesNode = procNode["Properties"];
+ parsePropertiesNodeYaml(&propertiesNode, processor);
+
+ // Take care of scheduling
+ TimeUnit unit;
+ if (Property::StringToTime(procCfg.schedulingPeriod, schedulingPeriod, unit)
+ && Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) {
+ _logger->log_debug("convert: parseProcessorNode: schedulingPeriod => [%d] ns", schedulingPeriod);
+ processor->setSchedulingPeriodNano(schedulingPeriod);
+ }
+
+ if (Property::StringToTime(procCfg.penalizationPeriod, penalizationPeriod, unit)
+ && Property::ConvertTimeUnitToMS(penalizationPeriod, unit, penalizationPeriod)) {
+ _logger->log_debug("convert: parseProcessorNode: penalizationPeriod => [%d] ms",
+ penalizationPeriod);
+ processor->setPenalizationPeriodMsec(penalizationPeriod);
+ }
+
+ if (Property::StringToTime(procCfg.yieldPeriod, yieldPeriod, unit)
+ && Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod)) {
+ _logger->log_debug("convert: parseProcessorNode: yieldPeriod => [%d] ms", yieldPeriod);
+ processor->setYieldPeriodMsec(yieldPeriod);
+ }
+
+ // Default to running
+ processor->setScheduledState(RUNNING);
+
+ if (procCfg.schedulingStrategy == "TIMER_DRIVEN") {
+ processor->setSchedulingStrategy(TIMER_DRIVEN);
+ _logger->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy.c_str());
+ } else if (procCfg.schedulingStrategy == "EVENT_DRIVEN") {
+ processor->setSchedulingStrategy(EVENT_DRIVEN);
+ _logger->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy.c_str());
+ } else {
+ processor->setSchedulingStrategy(CRON_DRIVEN);
+ _logger->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy.c_str());
+
+ }
+
+ int64_t maxConcurrentTasks;
+ if (Property::StringToInt(procCfg.maxConcurrentTasks, maxConcurrentTasks)) {
+ _logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks);
+ processor->setMaxConcurrentTasks(maxConcurrentTasks);
+ }
+
+ if (Property::StringToInt(procCfg.runDurationNanos, runDurationNanos)) {
+ _logger->log_debug("parseProcessorNode: runDurationNanos => [%d]", runDurationNanos);
+ processor->setRunDurationNano(runDurationNanos);
+ }
+
+ std::set<Relationship> autoTerminatedRelationships;
+ for (auto &&relString : procCfg.autoTerminatedRelationships) {
+ Relationship relationship(relString, "");
+ _logger->log_debug("parseProcessorNode: autoTerminatedRelationship => [%s]", relString.c_str());
+ autoTerminatedRelationships.insert(relationship);
+ }
+
+ processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
+
+ parentGroup->addProcessor(processor);
+ }
+ }
+ } else {
+ throw new std::invalid_argument(
+ "Cannot instantiate a MiNiFi instance without a defined Processors configuration node.");
+ }
}
void FlowController::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, ProcessGroup *parentGroup) {
- uuid_t uuid;
+ uuid_t uuid;
- if (!parentGroup) {
- _logger->log_error("parseRemoteProcessGroupYaml: no parent group exists");
- return;
- }
+ if (!parentGroup) {
+ _logger->log_error("parseRemoteProcessGroupYaml: no parent group exists");
+ return;
+ }
- if (rpgNode) {
- if (rpgNode->IsSequence()) {
- for (YAML::const_iterator iter = rpgNode->begin(); iter != rpgNode->end(); ++iter) {
- YAML::Node rpgNode = iter->as<YAML::Node>();
+ if (rpgNode) {
+ if (rpgNode->IsSequence()) {
+ for (YAML::const_iterator iter = rpgNode->begin(); iter != rpgNode->end(); ++iter) {
+ YAML::Node rpgNode = iter->as<YAML::Node>();
- auto name = rpgNode["name"].as<std::string>();
- _logger->log_debug("parseRemoteProcessGroupYaml: name => [%s]", name.c_str());
+ auto name = rpgNode["name"].as<std::string>();
+ _logger->log_debug("parseRemoteProcessGroupYaml: name => [%s]", name.c_str());
- std::string url = rpgNode["url"].as<std::string>();
- _logger->log_debug("parseRemoteProcessGroupYaml: url => [%s]", url.c_str());
+ std::string url = rpgNode["url"].as<std::string>();
+ _logger->log_debug("parseRemoteProcessGroupYaml: url => [%s]", url.c_str());
- std::string timeout = rpgNode["timeout"].as<std::string>();
- _logger->log_debug("parseRemoteProcessGroupYaml: timeout => [%s]", timeout.c_str());
+ std::string timeout = rpgNode["timeout"].as<std::string>();
+ _logger->log_debug("parseRemoteProcessGroupYaml: timeout => [%s]", timeout.c_str());
- std::string yieldPeriod = rpgNode["yield period"].as<std::string>();
- _logger->log_debug("parseRemoteProcessGroupYaml: yield period => [%s]", yieldPeriod.c_str());
+ std::string yieldPeriod = rpgNode["yield period"].as<std::string>();
+ _logger->log_debug("parseRemoteProcessGroupYaml: yield period => [%s]", yieldPeriod.c_str());
- YAML::Node inputPorts = rpgNode["Input Ports"].as<YAML::Node>();
- ProcessGroup* group = NULL;
+ YAML::Node inputPorts = rpgNode["Input Ports"].as<YAML::Node>();
+ ProcessGroup *group = NULL;
- // generate the random UUID
- uuid_generate(uuid);
+ // generate the random UUID
+ uuid_generate(uuid);
- char uuidStr[37];
- uuid_unparse(_uuid, uuidStr);
+ char uuidStr[37];
+ uuid_unparse(_uuid, uuidStr);
- int64_t timeoutValue = -1;
- int64_t yieldPeriodValue = -1;
+ int64_t timeoutValue = -1;
+ int64_t yieldPeriodValue = -1;
- group = this->createRemoteProcessGroup(name.c_str(), uuid);
- group->setParent(parentGroup);
- parentGroup->addProcessGroup(group);
+ group = this->createRemoteProcessGroup(name.c_str(), uuid);
+ group->setParent(parentGroup);
+ parentGroup->addProcessGroup(group);
- TimeUnit unit;
+ TimeUnit unit;
- if (Property::StringToTime(yieldPeriod, yieldPeriodValue, unit)
- && Property::ConvertTimeUnitToMS(yieldPeriodValue, unit, yieldPeriodValue) && group) {
- _logger->log_debug("parseRemoteProcessGroupYaml: yieldPeriod => [%d] ms", yieldPeriodValue);
- group->setYieldPeriodMsec(yieldPeriodValue);
- }
+ if (Property::StringToTime(yieldPeriod, yieldPeriodValue, unit)
+ && Property::ConvertTimeUnitToMS(yieldPeriodValue, unit, yieldPeriodValue) && group) {
+ _logger->log_debug("parseRemoteProcessGroupYaml: yieldPeriod => [%d] ms", yieldPeriodValue);
+ group->setYieldPeriodMsec(yieldPeriodValue);
+ }
- if (Property::StringToTime(timeout, timeoutValue, unit)
- && Property::ConvertTimeUnitToMS(timeoutValue, unit, timeoutValue) && group) {
- _logger->log_debug("parseRemoteProcessGroupYaml: timeoutValue => [%d] ms", timeoutValue);
- group->setTimeOut(timeoutValue);
- }
+ if (Property::StringToTime(timeout, timeoutValue, unit)
+ && Property::ConvertTimeUnitToMS(timeoutValue, unit, timeoutValue) && group) {
+ _logger->log_debug("parseRemoteProcessGroupYaml: timeoutValue => [%d] ms", timeoutValue);
+ group->setTimeOut(timeoutValue);
+ }
- group->setTransmitting(true);
- group->setURL(url);
+ group->setTransmitting(true);
+ group->setURL(url);
- if (inputPorts.IsSequence()) {
- for (YAML::const_iterator portIter = inputPorts.begin(); portIter != inputPorts.end(); ++portIter) {
- _logger->log_debug("Got a current port, iterating...");
+ if (inputPorts.IsSequence()) {
+ for (YAML::const_iterator portIter = inputPorts.begin(); portIter != inputPorts.end(); ++portIter) {
+ _logger->log_debug("Got a current port, iterating...");
- YAML::Node currPort = portIter->as<YAML::Node>();
+ YAML::Node currPort = portIter->as<YAML::Node>();
- this->parsePortYaml(&currPort, group, SEND);
- } // for node
- }
- }
- }
- }
+ this->parsePortYaml(&currPort, group, SEND);
+ } // for node
+ }
+ }
+ }
+ }
}
void FlowController::parseConnectionYaml(YAML::Node *connectionsNode, ProcessGroup *parent) {
- uuid_t uuid;
- Connection *connection = NULL;
-
- if (!parent) {
- _logger->log_error("parseProcessNode: no parent group was provided");
- return;
- }
+ uuid_t uuid;
+ Connection *connection = NULL;
- if (connectionsNode) {
- int numConnections = connectionsNode->size();
- if (numConnections < 1) {
- throw new std::invalid_argument("There must be at least one connection configured.");
- }
+ if (!parent) {
+ _logger->log_error("parseProcessNode: no parent group was provided");
+ return;
+ }
- if (connectionsNode->IsSequence()) {
- for (YAML::const_iterator iter = connectionsNode->begin(); iter != connectionsNode->end(); ++iter) {
- // generate the random UIID
- uuid_generate(uuid);
-
- YAML::Node connectionNode = iter->as<YAML::Node>();
-
- std::string name = connectionNode["name"].as<std::string>();
- std::string destName = connectionNode["destination name"].as<std::string>();
-
- char uuidStr[37];
- uuid_unparse(_uuid, uuidStr);
-
- _logger->log_debug("Created connection with UUID %s and name %s", uuidStr, name.c_str());
- connection = this->createConnection(name, uuid);
- auto rawRelationship = connectionNode["source relationship name"].as<std::string>();
- Relationship relationship(rawRelationship, "");
- _logger->log_debug("parseConnection: relationship => [%s]", rawRelationship.c_str());
- if (connection)
- connection->setRelationship(relationship);
- std::string connectionSrcProcName = connectionNode["source name"].as<std::string>();
-
- Processor *srcProcessor = this->_root->findProcessor(connectionSrcProcName);
-
- if (!srcProcessor) {
- _logger->log_error("Could not locate a source with name %s to create a connection",
- connectionSrcProcName.c_str());
- throw std::invalid_argument(
- "Could not locate a source with name %s to create a connection " + connectionSrcProcName);
- }
-
- Processor *destProcessor = this->_root->findProcessor(destName);
- // If we could not find name, try by UUID
- if (!destProcessor) {
- uuid_t destUuid;
- uuid_parse(destName.c_str(), destUuid);
- destProcessor = this->_root->findProcessor(destUuid);
- }
- if (destProcessor) {
- std::string destUuid = destProcessor->getUUIDStr();
- }
-
- uuid_t srcUuid;
- uuid_t destUuid;
- srcProcessor->getUUID(srcUuid);
- connection->setSourceProcessorUUID(srcUuid);
- destProcessor->getUUID(destUuid);
- connection->setDestinationProcessorUUID(destUuid);
-
- if (connection) {
- parent->addConnection(connection);
- }
- }
- }
+ if (connectionsNode) {
+
+ if (connectionsNode->IsSequence()) {
+ for (YAML::const_iterator iter = connectionsNode->begin(); iter != connectionsNode->end(); ++iter) {
+ // generate the random UUID
+ uuid_generate(uuid);
+
+ YAML::Node connectionNode = iter->as<YAML::Node>();
+
+ std::string name = connectionNode["name"].as<std::string>();
+ std::string destName = connectionNode["destination name"].as<std::string>();
+
+ char uuidStr[37];
+ uuid_unparse(_uuid, uuidStr);
+
+ _logger->log_debug("Created connection with UUID %s and name %s", uuidStr, name.c_str());
+ connection = this->createConnection(name, uuid);
+ auto rawRelationship = connectionNode["source relationship name"].as<std::string>();
+ Relationship relationship(rawRelationship, "");
+ _logger->log_debug("parseConnection: relationship => [%s]", rawRelationship.c_str());
+ if (connection)
+ connection->setRelationship(relationship);
+ std::string connectionSrcProcName = connectionNode["source name"].as<std::string>();
+
+ Processor *srcProcessor = this->_root->findProcessor(connectionSrcProcName);
+
+ if (!srcProcessor) {
+ _logger->log_error("Could not locate a source with name %s to create a connection",
+ connectionSrcProcName.c_str());
+ throw std::invalid_argument(
+ "Could not locate a source with name %s to create a connection " + connectionSrcProcName);
+ }
+
+ Processor *destProcessor = this->_root->findProcessor(destName);
+ // If we could not find name, try by UUID
+ if (!destProcessor) {
+ uuid_t destUuid;
+ uuid_parse(destName.c_str(), destUuid);
+ destProcessor = this->_root->findProcessor(destUuid);
+ }
+ if (destProcessor) {
+ std::string destUuid = destProcessor->getUUIDStr();
+ }
+
+ uuid_t srcUuid;
+ uuid_t destUuid;
+ srcProcessor->getUUID(srcUuid);
+ connection->setSourceProcessorUUID(srcUuid);
+ destProcessor->getUUID(destUuid);
+ connection->setDestinationProcessorUUID(destUuid);
+
+ if (connection) {
+ parent->addConnection(connection);
+ }
+ }
+ }
- if (connection)
- parent->addConnection(connection);
+ if (connection)
+ parent->addConnection(connection);
- return;
- }
+ return;
+ }
}
void FlowController::parseRemoteProcessGroup(xmlDoc *doc, xmlNode *node, ProcessGroup *parent) {
- uuid_t uuid;
- xmlNode *currentNode;
- ProcessGroup *group = NULL;
- int64_t yieldPeriod = -1;
- int64_t timeOut = -1;
+ uuid_t uuid;
+ xmlNode *currentNode;
+ ProcessGroup *group = NULL;
+ int64_t yieldPeriod = -1;
+ int64_t timeOut = -1;
// generate the random UIID
- uuid_generate(uuid);
-
- for (currentNode = node->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) {
- if (currentNode->type == XML_ELEMENT_NODE) {
- if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0) {
- char *id = (char *) xmlNodeGetContent(currentNode);
- if (id) {
- _logger->log_debug("parseRootProcessGroup: id => [%s]", id);
- uuid_parse(id, uuid);
- xmlFree(id);
- }
- } else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) {
- char *name = (char *) xmlNodeGetContent(currentNode);
- if (name) {
- _logger->log_debug("parseRemoteProcessGroup: name => [%s]", name);
- group = this->createRemoteProcessGroup(name, uuid);
- if (group == NULL) {
- xmlFree(name);
- return;
- }
- group->setParent(parent);
- parent->addProcessGroup(group);
- xmlFree(name);
- }
- } else if (xmlStrcmp(currentNode->name, BAD_CAST "yieldPeriod") == 0) {
- TimeUnit unit;
- char *temp = (char *) xmlNodeGetContent(currentNode);
- if (temp) {
- if (Property::StringToTime(temp, yieldPeriod, unit)
- && Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod) && group) {
- _logger->log_debug("parseRemoteProcessGroup: yieldPeriod => [%d] ms", yieldPeriod);
- group->setYieldPeriodMsec(yieldPeriod);
- }
- xmlFree(temp);
- }
- } else if (xmlStrcmp(currentNode->name, BAD_CAST "timeout") == 0) {
- TimeUnit unit;
- char *temp = (char *) xmlNodeGetContent(currentNode);
- if (temp) {
- if (Property::StringToTime(temp, timeOut, unit)
- && Property::ConvertTimeUnitToMS(timeOut, unit, timeOut) && group) {
- _logger->log_debug("parseRemoteProcessGroup: timeOut => [%d] ms", timeOut);
- group->setTimeOut(timeOut);
- }
- xmlFree(temp);
- }
- } else if (xmlStrcmp(currentNode->name, BAD_CAST "transmitting") == 0) {
- char *temp = (char *) xmlNodeGetContent(currentNode);
- bool transmitting;
- if (temp) {
- if (Property::StringToBool(temp, transmitting) && group) {
- _logger->log_debug("parseRemoteProcessGroup: transmitting => [%d]", transmitting);
- group->setTransmitting(transmitting);
- }
- xmlFree(temp);
- }
- } else if (xmlStrcmp(currentNode->name, BAD_CAST "inputPort") == 0 && group) {
- this->parsePort(doc, currentNode, group, SEND);
- } else if (xmlStrcmp(currentNode->name, BAD_CAST "outputPort") == 0 && group) {
- this->parsePort(doc, currentNode, group, RECEIVE);
- }
- } // if (currentNode->type == XML_ELEMENT_NODE)
- } // for node
+ uuid_generate(uuid);
+
+ for (currentNode = node->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) {
+ if (currentNode->type == XML_ELEMENT_NODE) {
+ if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0) {
+ char *id = (char *) xmlNodeGetContent(currentNode);
+ if (id) {
+ _logger->log_debug("parseRootProcessGroup: id => [%s]", id);
+ uuid_parse(id, uuid);
+ xmlFree(id);
+ }
+ } else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) {
+ char *name = (char *) xmlNodeGetContent(currentNode);
+ if (name) {
+ _logger->log_debug("parseRemoteProcessGroup: name => [%s]", name);
+ group = this->createRemoteProcessGroup(name, uuid);
+ if (group == NULL) {
+ xmlFree(name);
+ return;
+ }
+ group->setParent(parent);
+ parent->addProcessGroup(group);
+ xmlFree(name);
+ }
+ } else if (xmlStrcmp(currentNode->name, BAD_CAST "yieldPeriod") == 0) {
+ TimeUnit unit;
+ char *temp = (char *) xmlNodeGetContent(currentNode);
+ if (temp) {
+ if (Property::StringToTime(temp, yieldPeriod, unit)
+ && Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod) && group) {
+ _logger->log_debug("parseRemoteProcessGroup: yieldPeriod => [%d] ms", yieldPeriod);
+ group->setYieldPeriodMsec(yieldPeriod);
+ }
+ xmlFree(temp);
+ }
+ } else if (xmlStrcmp(currentNode->name, BAD_CAST "timeout") == 0) {
+ TimeUnit unit;
+ char *temp = (char *) xmlNodeGetContent(currentNode);
+ if (temp) {
+ if (Property::StringToTime(temp, timeOut, unit)
+ && Property::ConvertTimeUnitToMS(timeOut, unit, timeOut) && group) {
+ _logger->log_debug("parseRemoteProcessGroup: timeOut => [%d] ms", timeOut);
+ group->setTimeOut(timeOut);
+ }
+ xmlFree(temp);
+ }
+ } else if (xmlStrcmp(currentNode->name, BAD_CAST "transmitting") == 0) {
+ char *temp = (char *) xmlNodeGetContent(currentNode);
+ bool transmitting;
+ if (temp) {
+ if (Property::StringToBool(temp, transmitting) && group) {
+ _logger->log_debug("parseRemoteProcessGroup: transmitting => [%d]", transmitting);
+ group->setTransmitting(transmitting);
+ }
+ xmlFree(temp);
+ }
+ } else if (xmlStrcmp(currentNode->name, BAD_CAST "inputPort") == 0 && group) {
+ this->parsePort(doc, currentNode, group, SEND);
+ } else if (xmlStrcmp(currentNode->name, BAD_CAST "outputPort") == 0 && group) {
+ this->parsePort(doc, currentNode, group, RECEIVE);
+ }
+ } // if (currentNode->type == XML_ELEMENT_NODE)
+ } // for node
}
void FlowController::parseProcessorProperty(xmlDoc *doc, xmlNode *node, Processor *processor) {
- xmlNode *currentNode;
- std::string propertyValue;
- std::string propertyName;
+ xmlNode *currentNode;
+ std::string propertyValue;
+ std::string propertyName;
- if (!processor) {
- _logger->log_error("parseProcessorProperty: no parent processor existed");
- return;
- }
+ if (!processor) {
+ _logger->log_error("parseProcessorProperty: no parent processor existed");
+ return;
+ }
- for (currentNode = node->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) {
- if (currentNode->type == XML_ELEMENT_NODE) {
- if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) {
- char *name = (char *) xmlNodeGetContent(currentNode);
- if (name) {
- _logger->log_debug("parseProcessorNode: name => [%s]", name);
- propertyName = name;
- xmlFree(name);
- }
- }
- if (xmlStrcmp(currentNode->name, BAD_CAST "value") == 0) {
- char *value = (char *) xmlNodeGetContent(currentNode);
- if (value) {
- _logger->log_debug("parseProcessorNode: value => [%s]", value);
- propertyValue = value;
- xmlFree(value);
- }
- }
- if (!propertyName.empty() && !propertyValue.empty()) {
- processor->setProperty(propertyName, propertyValue);
- }
- } // if (currentNode->type == XML_ELEMENT_NODE)
- } // for node
+ for (currentNode = node->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) {
+ if (currentNode->type == XML_ELEMENT_NODE) {
+ if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) {
+ char *name = (char *) xmlNodeGetContent(currentNode);
+ if (name) {
+ _logger->log_debug("parseProcessorNode: name => [%s]", name);
+ propertyName = name;
+ xmlFree(name);
+ }
+ }
+ if (xmlStrcmp(currentNode->name, BAD_CAST "value") == 0) {
+ char *value = (char *) xmlNodeGetContent(currentNode);
+ if (value) {
+ _logger->log_debug("parseProcessorNode: value => [%s]", value);
+ propertyValue = value;
+ xmlFree(value);
+ }
+ }
+ if (!propertyName.empty() && !propertyValue.empty()) {
+ processor->setProperty(propertyName, propertyValue);
+ }
+ } // if (currentNode->type == XML_ELEMENT_NODE)
+ } // for node
}
void FlowController::parsePortYaml(YAML::Node *portNode, ProcessGroup *parent, TransferDirection direction) {
- uuid_t uuid;
- Processor *processor = NULL;
- RemoteProcessorGroupPort *port = NULL;
+ uuid_t uuid;
+ Processor *processor = NULL;
+ RemoteProcessorGroupPort *port = NULL;
- if (!parent) {
- _logger->log_error("parseProcessNode: no parent group existed");
- return;
- }
+ if (!parent) {
+ _logger->log_error("parseProcessNode: no parent group existed");
+ return;
+ }
- YAML::Node inputPortsObj = portNode->as<YAML::Node>();
+ YAML::Node inputPortsObj = portNode->as<YAML::Node>();
- // generate the random UIID
- uuid_generate(uuid);
+ // generate the random UIID
+ uuid_generate(uuid);
- auto portId = inputPortsObj["id"].as<std::string>();
- auto nameStr = inputPortsObj["name"].as<std::string>();
- uuid_parse(portId.c_str(), uuid);
+ auto portId = inputPortsObj["id"].as<std::string>();
+ auto nameStr = inputPortsObj["name"].as<std::string>();
+ uuid_parse(portId.c_str(), uuid);
- port = new RemoteProcessorGroupPort(nameStr.c_str(), uuid);
+ port = new RemoteProcessorGroupPort(nameStr.c_str(), uuid);
- processor = (Processor *) port;
- port->setDirection(direction);
- port->setTimeOut(parent->getTimeOut());
- port->setTransmitting(true);
- processor->setYieldPeriodMsec(parent->getYieldPeriodMsec());
- processor->initialize();
+ processor = (Processor *) port;
+ port->setDirection(direction);
+ port->setTimeOut(parent->getTimeOut());
+ port->setTransmitting(true);
+ processor->setYieldPeriodMsec(parent->getYieldPeriodMsec());
+ processor->initialize();
- // handle port properties
- YAML::Node nodeVal = portNode->as<YAML::Node>();
- YAML::Node propertiesNode = nodeVal["Properties"];
+ // handle port properties
+ YAML::Node nodeVal = portNode->as<YAML::Node>();
+ YAML::Node propertiesNode = nodeVal["Properties"];
- parsePropertiesNodeYaml(&propertiesNode, processor);
+ parsePropertiesNodeYaml(&propertiesNode, processor);
- // add processor to parent
- parent->addProcessor(processor);
- processor->setScheduledState(RUNNING);
- auto rawMaxConcurrentTasks = inputPortsObj["max concurrent tasks"].as<std::string>();
- int64_t maxConcurrentTasks;
- if (Property::StringToInt(rawMaxConcurrentTasks, maxConcurrentTasks)) {
- processor->setMaxConcurrentTasks(maxConcurrentTasks);
- }
- _logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks);
- processor->setMaxConcurrentTasks(maxConcurrentTasks);
+ // add processor to parent
+ parent->addProcessor(processor);
+ processor->setScheduledState(RUNNING);
+ auto rawMaxConcurrentTasks = inputPortsObj["max concurrent tasks"].as<std::string>();
+ int64_t maxConcurrentTasks;
+ if (Property::StringToInt(rawMaxConcurrentTasks, maxConcurrentTasks)) {
+ processor->setMaxConcurrentTasks(maxConcurrentTasks);
+ }
+ _logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks);
+ processor->setMaxConcurrentTasks(maxConcurrentTasks);
}
void FlowController::parsePort(xmlDoc *doc, xmlNode *processorNode, ProcessGroup *parent, TransferDirection direction) {
- char *id = NULL;
- char *name = NULL;
- uuid_t uuid;
- xmlNode *currentNode;
- Processor *processor = NULL;
- RemoteProcessorGroupPort *port = NULL;
-
- if (!parent) {
- _logger->log_error("parseProcessNode: no parent group existed");
- return;
- }
+ char *id = NULL;
+ char *name = NULL;
+ uuid_t uuid;
+ xmlNode *currentNode;
+ Processor *processor = NULL;
+ RemoteProcessorGroupPort *port = NULL;
+
+ if (!parent) {
+ _logger->log_error("parseProcessNode: no parent group existed");
+ return;
+ }
// generate the random UIID
- uuid_generate(uuid);
-
- for (currentNode = processorNode->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) {
- if (currentNode->type == XML_ELEMENT_NODE) {
- if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0) {
- id = (char *) xmlNodeGetContent(currentNode);
- if (id) {
- _logger->log_debug("parseProcessorNode: id => [%s]", id);
- uuid_parse(id, uuid);
- xmlFree(id);
- }
- } else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) {
- name = (char *) xmlNodeGetContent(currentNode);
- if (name) {
- _logger->log_debug("parseProcessorNode: name => [%s]", name);
- port = new RemoteProcessorGroupPort(name, uuid);
- processor = (Processor *) port;
- if (processor == NULL) {
- xmlFree(name);
- return;
- }
- port->setDirection(direction);
- port->setTimeOut(parent->getTimeOut());
- port->setTransmitting(parent->getTransmitting());
- processor->setYieldPeriodMsec(parent->getYieldPeriodMsec());
- processor->initialize();
- // add processor to parent
- parent->addProcessor(processor);
- xmlFree(name);
- }
- } else if (xmlStrcmp(currentNode->name, BAD_CAST "scheduledState") == 0) {
- char *temp = (char *) xmlNodeGetContent(currentNode);
- if (temp) {
- std::string state = temp;
- if (state == "DISABLED") {
- _logger->log_debug("parseProcessorNode: scheduledState => [%s]", state.c_str());
- processor->setScheduledState(DISABLED);
- }
- if (state == "STOPPED") {
- _logger->log_debug("parseProcessorNode: scheduledState => [%s]", state.c_str());
- processor->setScheduledState(STOPPED);
- }
- if (state == "RUNNING") {
- _logger->log_debug("parseProcessorNode: scheduledState => [%s]", state.c_str());
- processor->setScheduledState(RUNNING);
- }
- xmlFree(temp);
- }
- } else if (xmlStrcmp(currentNode->name, BAD_CAST "maxConcurrentTasks") == 0) {
- char *temp = (char *) xmlNodeGetContent(currentNode);
- if (temp) {
- int64_t maxConcurrentTasks;
- if (Property::StringToInt(temp, maxConcurrentTasks)) {
- _logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks);
- processor->setMaxConcurrentTasks(maxConcurrentTasks);
- }
- xmlFree(temp);
- }
- } else if (xmlStrcmp(currentNode->name, BAD_CAST "property") == 0) {
- this->parseProcessorProperty(doc, currentNode, processor);
- }
- } // if (currentNode->type == XML_ELEMENT_NODE)
- } // while node
+ uuid_generate(uuid);
+
+ for (currentNode = processorNode->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) {
+ if (currentNode->type == XML_ELEMENT_NODE) {
+ if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0) {
+ id = (char *) xmlNodeGetContent(currentNode);
+ if (id) {
+ _logger->log_debug("parseProcessorNode: id => [%s]", id);
+ uuid_parse(id, uuid);
+ xmlFree(id);
+ }
+ } else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) {
+ name = (char *) xmlNodeGetContent(currentNode);
+ if (name) {
+ _logger->log_debug("parseProcessorNode: name => [%s]", name);
+ port = new RemoteProcessorGroupPort(name, uuid);
+ processor = (Processor *) port;
+ if (processor == NULL) {
+ xmlFree(name);
+ return;
+ }
+ port->setDirection(direction);
+ port->setTimeOut(parent->getTimeOut());
+ port->setTransmitting(parent->getTransmitting());
+ processor->setYieldPeriodMsec(parent->getYieldPeriodMsec());
+ processor->initialize();
+ // add processor to parent
+ parent->addProcessor(processor);
+ xmlFree(name);
+ }
+ } else if (xmlStrcmp(currentNode->name, BAD_CAST "scheduledState") == 0) {
+ char *temp = (char *) xmlNodeGetContent(currentNode);
+ if (temp) {
+ std::string state = temp;
+ if (state == "DISABLED") {
+ _logger->log_debug("parseProcessorNode: scheduledState => [%s]", state.c_str());
+ processor->setScheduledState(DISABLED);
+ }
+ if (state == "STOPPED") {
+ _logger->log_debug("parseProcessorNode: scheduledState => [%s]", state.c_str());
+ processor->setScheduledState(STOPPED);
+ }
+ if (state == "RUNNING") {
+ _logger->log_debug("parseProcessorNode: scheduledState => [%s]", state.c_str());
+ processor->setScheduledState(RUNNING);
+ }
+ xmlFree(temp);
+ }
+ } else if (xmlStrcmp(currentNode->name, BAD_CAST "maxConcurrentTasks") == 0) {
+ char *temp = (char *) xmlNodeGetContent(currentNode);
+ if (temp) {
+ int64_t maxConcurrentTasks;
+ if (Property::StringToInt(temp, maxConcurrentTasks)) {
+ _logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks);
+ processor->setMaxConcurrentTasks(maxConcurrentTasks);
+ }
+ xmlFree(temp);
+ }
+ } else if (xmlStrcmp(currentNode->name, BAD_CAST "property") == 0) {
+ this->parseProcessorProperty(doc, currentNode, processor);
+ }
+ } // if (currentNode->type == XML_ELEMENT_NODE)
+ } // while node
}
void FlowController::parseProcessorNode(xmlDoc *doc, xmlNode *processorNode, ProcessGroup *parent) {
- char *id = NULL;
- char *name = NULL;
- int64_t schedulingPeriod = -1;
- int64_t penalizationPeriod = -1;
- int64_t yieldPeriod = -1;
- bool lossTolerant = false;
- int64_t runDurationNanos = -1;
- uuid_t uuid;
- xmlNode *currentNode;
- Processor *processor = NULL;
-
- if (!parent) {
- _logger->log_error("parseProcessNode: no parent group existed");
- return;
- }
+ char *id = NULL;
+ char *name = NULL;
+ int64_t schedulingPeriod = -1;
+ int64_t penalizationPeriod = -1;
+ int64_t yieldPeriod = -1;
+ bool lossTolerant = false;
+ int64_t runDurationNanos = -1;
+ uuid_t uuid;
+ xmlNode *currentNode;
+ Processor *processor = NULL;
+
+ if (!parent) {
+ _logger->log_error("parseProcessNode: no parent group existed");
+ return;
+ }
// generate the random UIID
- uuid_generate(uuid);
-
- for (currentNode = processorNode->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) {
- if (currentNode->type == XML_ELEMENT_NODE) {
- if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0) {
- id = (char *) xmlNodeGetContent(currentNode);
- if (id) {
- _logger->log_debug("parseProcessorNode: id => [%s]", id);
- uuid_parse(id, uuid);
- xmlFree(id);
- }
- } else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) {
- name = (char *) xmlNodeGetContent(currentNode);
- if (name) {
- _logger->log_debug("parseProcessorNode: name => [%s]", name);
- processor = this->createProcessor(name, uuid);
- if (processor == NULL) {
- xmlFree(name);
- return;
- }
- // add processor to parent
- parent->addProcessor(processor);
- xmlFree(name);
- }
- } else if (xmlStrcmp(currentNode->name, BAD_CAST "schedulingPeriod") == 0) {
- TimeUnit unit;
- char *temp = (char *) xmlNodeGetContent(currentNode);
- if (temp) {
- if (Property::StringToTime(temp, schedulingPeriod, unit)
- && Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) {
- _logger->log_debug("parseProcessorNode: schedulingPeriod => [%d] ns", schedulingPeriod);
- processor->setSchedulingPeriodNano(schedulingPeriod);
- }
- xmlFree(temp);
- }
- } else if (xmlStrcmp(currentNode->name, BAD_CAST "penalizationPeriod") == 0) {
- TimeUnit unit;
- char *temp = (char *) xmlNodeGetContent(currentNode);
- if (temp) {
- if (Property::StringToTime(temp, penalizationPeriod, unit)
- && Property::ConvertTimeUnitToMS(penalizationPeriod, unit, penalizationPeriod)) {
- _logger->log_debug("parseProcessorNode: penalizationPeriod => [%d] ms", penalizationPeriod);
- processor->setPenalizationPeriodMsec(penalizationPeriod);
- }
- xmlFree(temp);
- }
- } else if (xmlStrcmp(currentNode->name, BAD_CAST "yieldPeriod") == 0) {
- TimeUnit unit;
- char *temp = (char *) xmlNodeGetContent(currentNode);
- if (temp) {
- if (Property::StringToTime(temp, yieldPeriod, unit)
- && Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod)) {
- _logger->log_debug("parseProcessorNode: yieldPeriod => [%d] ms", yieldPeriod);
- processor->setYieldPeriodMsec(yieldPeriod);
- }
- xmlFree(temp);
- }
- } else if (xmlStrcmp(currentNode->name, BAD_CAST "lossTolerant") == 0) {
- char *temp = (char *) xmlNodeGetContent(currentNode);
- if (temp) {
- if (Property::StringToBool(temp, lossTolerant)) {
- _logger->log_debug("parseProcessorNode: lossTolerant => [%d]", lossTolerant);
- processor->setlossTolerant(lossTolerant);
- }
- xmlFree(temp);
- }
- } else if (xmlStrcmp(currentNode->name, BAD_CAST "scheduledState") == 0) {
- char *temp = (char *) xmlNodeGetContent(currentNode);
- if (temp) {
- std::string state = temp;
- if (state == "DISABLED") {
- _logger->log_debug("parseProcessorNode: scheduledState => [%s]", state.c_str());
- processor->setScheduledState(DISABLED);
- }
- if (state == "STOPPED") {
- _logger->log_debug("parseProcessorNode: scheduledState => [%s]", state.c_str());
- processor->setScheduledState(STOPPED);
- }
- if (state == "RUNNING") {
- _logger->log_debug("parseProcessorNode: scheduledState => [%s]", state.c_str());
- processor->setScheduledState(RUNNING);
- }
- xmlFree(temp);
- }
- } else if (xmlStrcmp(currentNode->name, BAD_CAST "schedulingStrategy") == 0) {
- char *temp = (char *) xmlNodeGetContent(currentNode);
- if (temp) {
- std::string strategy = temp;
- if (strategy == "TIMER_DRIVEN") {
- _logger->log_debug("parseProcessorNode: scheduledStrategy => [%s]", strategy.c_str());
- processor->setSchedulingStrategy(TIMER_DRIVEN);
- }
- if (strategy == "EVENT_DRIVEN") {
- _logger->log_debug("parseProcessorNode: scheduledStrategy => [%s]", strategy.c_str());
- processor->setSchedulingStrategy(EVENT_DRIVEN);
- }
- xmlFree(temp);
- }
- } else if (xmlStrcmp(currentNode->name, BAD_CAST "maxConcurrentTasks") == 0) {
- char *temp = (char *) xmlNodeGetContent(currentNode);
- if (temp) {
- int64_t maxConcurrentTasks;
- if (Property::StringToInt(temp, maxConcurrentTasks)) {
- _logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks);
- processor->setMaxConcurrentTasks(maxConcurrentTasks);
- }
- xmlFree(temp);
- }
- } else if (xmlStrcmp(currentNode->name, BAD_CAST "runDurationNanos") == 0) {
- char *temp = (char *) xmlNodeGetContent(currentNode);
- if (temp) {
- if (Property::StringToInt(temp, runDurationNanos)) {
- _logger->log_debug("parseProcessorNode: runDurationNanos => [%d]", runDurationNanos);
- processor->setRunDurationNano(runDurationNanos);
- }
- xmlFree(temp);
- }
- } else if (xmlStrcmp(currentNode->name, BAD_CAST "autoTerminatedRelationship") == 0) {
- char *temp = (char *) xmlNodeGetContent(currentNode);
- if (temp) {
- std::string relationshipName = temp;
- Relationship relationship(relationshipName, "");
- std::set<Relationship> relationships;
-
- relationships.insert(relationship);
- processor->setAutoTerminatedRelationships(relationships);
- _logger->log_debug("parseProcessorNode: autoTerminatedRelationship => [%s]",
- relationshipName.c_str());
- xmlFree(temp);
- }
- } else if (xmlStrcmp(currentNode->name, BAD_CAST "property") == 0) {
- this->parseProcessorProperty(doc, currentNode, processor);
- }
- } // if (currentNode->type == XML_ELEMENT_NODE)
- } // while node
+ uuid_generate(uuid);
+
+ for (currentNode = processorNode->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) {
+ if (currentNode->type == XML_ELEMENT_NODE) {
+ if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0) {
+ id = (char *) xmlNodeGetContent(currentNode);
+ if (id) {
+ _logger->log_debug("parseProcessorNode: id => [%s]", id);
+ uuid_parse(id, uuid);
+ xmlFree(id);
+ }
+ } else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) {
+ name = (char *) xmlNodeGetContent(currentNode);
+ if (name) {
+ _logger->log_debug("parseProcessorNode: name => [%s]", name);
+ processor = this->createProcessor(name, uuid);
+ if (processor == NULL) {
+ xmlFree(name);
+ return;
+ }
+ // add processor to parent
+ parent->addProcessor(processor);
+ xmlFree(name);
+ }
+ } else if (xmlStrcmp(currentNode->name, BAD_CAST "schedulingPeriod") == 0) {
+ TimeUnit unit;
+ char *temp = (char *) xmlNodeGetContent(currentNode);
+ if (temp) {
+ if (Property::StringToTime(temp, schedulingPeriod, unit)
+ && Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) {
+ _logger->log_debug("parseProcessorNode: schedulingPeriod => [%d] ns", schedulingPeriod);
+ processor->setSchedulingPeriodNano(schedulingPeriod);
+ }
+ xmlFree(temp);
+ }
+ } else if (xmlStrcmp(currentNode->name, BAD_CAST "penalizationPeriod") == 0) {
+ TimeUnit unit;
+ char *temp = (char *) xmlNodeGetContent(currentNode);
+ if (temp) {
+ if (Property::StringToTime(temp, penalizationPeriod, unit)
+ && Property::ConvertTimeUnitToMS(penalizationPeriod, unit, penalizationPeriod)) {
+ _logger->log_debug("parseProcessorNode: penalizationPeriod => [%d] ms", penalizationPeriod);
+ processor->setPenalizationPeriodMsec(penalizationPeriod);
+ }
+ xmlFree(temp);
+ }
+ } else if (xmlStrcmp(currentNode->name, BAD_CAST "yieldPeriod") == 0) {
+ TimeUnit unit;
+ char *temp = (char *) xmlNodeGetContent(currentNode);
+ if (temp) {
+ if (Property::StringToTime(temp, yieldPeriod, unit)
+ && Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod)) {
+ _logger->log_debug("parseProcessorNode: yieldPeriod => [%d] ms", yieldPeriod);
+ processor->setYieldPeriodMsec(yieldPeriod);
+ }
+ xmlFree(temp);
+ }
+ } else if (xmlStrcmp(currentNode->name, BAD_CAST "lossTolerant") == 0) {
+ char *temp = (char *) xmlNodeGetContent(currentNode);
+ if (temp) {
+ if (Property::StringToBool(temp, lossTolerant)) {
+ _logger->log_debug("parseProcessorNode: lossTolerant => [%d]", lossTolerant);
+ processor->setlossTolerant(lossTolerant);
+ }
+ xmlFree(temp);
+ }
+ } else if (xmlStrcmp(currentNode->name, BAD_CAST "scheduledState") == 0) {
+ char *temp = (char *) xmlNodeGetContent(currentNode);
+ if (temp) {
+ std::string state = temp;
+ if (state == "DISABLED") {
+ _logger->log_debug("parseProcessorNode: scheduledState => [%s]", state.c_str());
+ processor->setScheduledState(DISABLED);
+ }
+ if (state == "STOPPED") {
+ _logger->log_debug("parseProcessorNode: scheduledState => [%s]", state.c_str());
+ processor->setScheduledState(STOPPED);
+ }
+ if (state == "RUNNING") {
+ _logger->log_debug("parseProcessorNode: scheduledState => [%s]", state.c_str());
+ processor->setScheduledState(RUNNING);
+ }
+ xmlFree(temp);
+ }
+ } else if (xmlStrcmp(currentNode->name, BAD_CAST "schedulingStrategy") == 0) {
+ char *temp = (char *) xmlNodeGetContent(currentNode);
+ if (temp) {
+ std::string strategy = temp;
+ if (strategy == "TIMER_DRIVEN") {
+ _logger->log_debug("parseProcessorNode: scheduledStrategy => [%s]", strategy.c_str());
+ processor->setSchedulingStrategy(TIMER_DRIVEN);
+ }
+ if (strategy == "EVENT_DRIVEN") {
+ _logger->log_debug("parseProcessorNode: scheduledStrategy => [%s]", strategy.c_str());
+ processor->setSchedulingStrategy(EVENT_DRIVEN);
+ }
+ xmlFree(temp);
+ }
+ } else if (xmlStrcmp(currentNode->name, BAD_CAST "maxConcurrentTasks") == 0) {
+ char *temp = (char *) xmlNodeGetContent(currentNode);
+ if (temp) {
+ int64_t maxConcurrentTasks;
+ if (Property::StringToInt(temp, maxConcurrentTasks)) {
+ _logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks);
+ processor->setMaxConcurrentTasks(maxConcurrentTasks);
+ }
+ xmlFree(temp);
+ }
+ } else if (xmlStrcmp(currentNode->name, BAD_CAST "runDurationNanos") == 0) {
+ char *temp = (char *) xmlNodeGetContent(currentNode);
+ if (temp) {
+ if (Property::StringToInt(temp, runDurationNanos)) {
+ _logger->log_debug("parseProcessorNode: runDurationNanos => [%d]", runDurationNanos);
+ processor->setRunDurationNano(runDurationNanos);
+ }
+ xmlFree(temp);
+ }
+ } else if (xmlStrcmp(currentNode->name, BAD_CAST "autoTerminatedRelationship") == 0) {
+ char *temp = (char *) xmlNodeGetContent(currentNode);
+ if (temp) {
+ std::string relationshipName = temp;
+ Relationship relationship(relationshipName, "");
+ std::set<Relationship> relationships;
+
+ relationships.insert(relationship);
+ processor->setAutoTerminatedRelationships(relationships);
+ _logger->log_debug("parseProcessorNode: autoTerminatedRelationship => [%s]",
+ relationshipName.c_str());
+ xmlFree(temp);
+ }
+ } else if (xmlStrcmp(currentNode->name, BAD_CAST "property") == 0) {
+ this->parseProcessorProperty(doc, currentNode, processor);
+ }
+ } // if (currentNode->type == XML_ELEMENT_NODE)
+ } // while node
}
void FlowController::parsePropertiesNodeYaml(YAML::Node *propertiesNode, Processor *processor)
@@ -1093,102 +1087,102 @@ void FlowController::parsePropertiesNodeYaml(YAML::Node *propertiesNode, Process
}
void FlowController::load(ConfigFormat configFormat) {
- if (_running) {
- stop(true);
- }
- if (!_initialized) {
- _logger->log_info("Load Flow Controller from file %s", _configurationFileName.c_str());
-
- if (ConfigFormat::XML == configFormat) {
- _logger->log_info("Detected an XML configuration file for processing.");
-
- xmlDoc *doc = xmlReadFile(_configurationFileName.c_str(), NULL, XML_PARSE_NONET);
- if (doc == NULL) {
- _logger->log_error("xmlReadFile returned NULL when reading [%s]", _configurationFileName.c_str());
- _initialized = true;
- return;
- }
-
- xmlNode *root = xmlDocGetRootElement(doc);
-
- if (root == NULL) {
- _logger->log_error("Can not get root from XML doc %s", _configurationFileName.c_str());
- xmlFreeDoc(doc);
- xmlCleanupParser();
- }
-
- if (xmlStrcmp(root->name, BAD_CAST "flowController") != 0) {
- _logger->log_error("Root name is not flowController for XML doc %s", _configurationFileName.c_str());
- xmlFreeDoc(doc);
- xmlCleanupParser();
- return;
- }
-
- xmlNode *currentNode;
-
- for (currentNode = root->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) {
- if (currentNode->type == XML_ELEMENT_NODE) {
- if (xmlStrcmp(currentNode->name, BAD_CAST "rootGroup") == 0) {
- this->parseRootProcessGroup(doc, currentNode);
- } else if (xmlStrcmp(currentNode->name, BAD_CAST "maxTimerDrivenThreadCount") == 0) {
- char *temp = (char *) xmlNodeGetContent(currentNode);
- int64_t maxTimerDrivenThreadCount;
- if (temp) {
- if (Property::StringToInt(temp, maxTimerDrivenThreadCount)) {
- _logger->log_debug("maxTimerDrivenThreadCount => [%d]", maxTimerDrivenThreadCount);
- this->_maxTimerDrivenThreads = maxTimerDrivenThreadCount;
- }
- xmlFree(temp);
- }
- } else if (xmlStrcmp(currentNode->name, BAD_CAST "maxEventDrivenThreadCount") == 0) {
- char *temp = (char *) xmlNodeGetContent(currentNode);
- int64_t maxEventDrivenThreadCount;
- if (temp) {
- if (Property::StringToInt(temp, maxEventDrivenThreadCount)) {
- _logger->log_debug("maxEventDrivenThreadCount => [%d]", maxEventDrivenThreadCount);
- this->_maxEventDrivenThreads = maxEventDrivenThreadCount;
- }
- xmlFree(temp);
- }
- }
- } // type == XML_ELEMENT_NODE
- } // for
-
- xmlFreeDoc(doc);
- xmlCleanupParser();
- _initialized = true;
- } else if (ConfigFormat::YAML == configFormat) {
- YAML::Node flow = YAML::LoadFile(_configurationFileName);
-
- YAML::Node flowControllerNode = flow["Flow Controller"];
- YAML::Node processorsNode = flow[CONFIG_YAML_PROCESSORS_KEY];
- YAML::Node connectionsNode = flow["Connections"];
- YAML::Node remoteProcessingGroupNode = flow["Remote Processing Groups"];
-
- // Create the root process group
- parseRootProcessGroupYaml(flowControllerNode);
- parseProcessorNodeYaml(processorsNode, this->_root);
- parseRemoteProcessGroupYaml(&remoteProcessingGroupNode, this->_root);
- parseConnectionYaml(&connectionsNode, this->_root);
-
- _initialized = true;
- }
- }
+ if (_running) {
+ stop(true);
+ }
+ if (!_initialized) {
+ _logger->log_info("Load Flow Controller from file %s", _configurationFileName.c_str());
+
+ if (ConfigFormat::XML == configFormat) {
+ _logger->log_info("Detected an XML configuration file for processing.");
+
+ xmlDoc *doc = xmlReadFile(_configurationFileName.c_str(), NULL, XML_PARSE_NONET);
+ if (doc == NULL) {
+ _logger->log_error("xmlReadFile returned NULL when reading [%s]", _configurationFileName.c_str());
+ _initialized = true;
+ return;
+ }
+
+ xmlNode *root = xmlDocGetRootElement(doc);
+
+ if (root == NULL) {
+ _logger->log_error("Can not get root from XML doc %s", _configurationFileName.c_str());
+ xmlFreeDoc(doc);
+ xmlCleanupParser();
+ }
+
+ if (xmlStrcmp(root->name, BAD_CAST "flowController") != 0) {
+ _logger->log_error("Root name is not flowController for XML doc %s", _configurationFileName.c_str());
+ xmlFreeDoc(doc);
+ xmlCleanupParser();
+ return;
+ }
+
+ xmlNode *currentNode;
+
+ for (currentNode = root->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) {
+ if (currentNode->type == XML_ELEMENT_NODE) {
+ if (xmlStrcmp(currentNode->name, BAD_CAST "rootGroup") == 0) {
+ this->parseRootProcessGroup(doc, currentNode);
+ } else if (xmlStrcmp(currentNode->name, BAD_CAST "maxTimerDrivenThreadCount") == 0) {
+ char *temp = (char *) xmlNodeGetContent(currentNode);
+ int64_t maxTimerDrivenThreadCount;
+ if (temp) {
+ if (Property::StringToInt(temp, maxTimerDrivenThreadCount)) {
+ _logger->log_debug("maxTimerDrivenThreadCount => [%d]", maxTimerDrivenThreadCount);
+ this->_maxTimerDrivenThreads = maxTimerDrivenThreadCount;
+ }
+ xmlFree(temp);
+ }
+ } else if (xmlStrcmp(currentNode->name, BAD_CAST "maxEventDrivenThreadCount") == 0) {
+ char *temp = (char *) xmlNodeGetContent(currentNode);
+ int64_t maxEventDrivenThreadCount;
+ if (temp) {
+ if (Property::StringToInt(temp, maxEventDrivenThreadCount)) {
+ _logger->log_debug("maxEventDrivenThreadCount => [%d]", maxEventDrivenThreadCount);
+ this->_maxEventDrivenThreads = maxEventDrivenThreadCount;
+ }
+ xmlFree(temp);
+ }
+ }
+ } // type == XML_ELEMENT_NODE
+ } // for
+
+ xmlFreeDoc(doc);
+ xmlCleanupParser();
+ _initialized = true;
+ } else if (ConfigFormat::YAML == configFormat) {
+ YAML::Node flow = YAML::LoadFile(_configurationFileName);
+
+ YAML::Node flowControllerNode = flow["Flow Controller"];
+ YAML::Node processorsNode = flow[CONFIG_YAML_PROCESSORS_KEY];
+ YAML::Node connectionsNode = flow["Connections"];
+ YAML::Node remoteProcessingGroupNode = flow["Remote Processing Groups"];
+
+ // Create the root process group
+ parseRootProcessGroupYaml(flowControllerNode);
+ parseProcessorNodeYaml(processorsNode, this->_root);
+ parseRemoteProcessGroupYaml(&remoteProcessingGroupNode, this->_root);
+ parseConnectionYaml(&connectionsNode, this->_root);
+
+ _initialized = true;
+ }
+ }
}
bool FlowController::start() {
- if (!_initialized) {
- _logger->log_error("Can not start Flow Controller because it has not been initialized");
- return false;
- } else {
- if (!_running) {
- _logger->log_info("Start Flow Controller");
- this->_timerScheduler.start();
- if (this->_root)
- this->_root->startProcessing(&this->_timerScheduler);
- _running = true;
- this->_protocol->start();
- }
- return true;
- }
+ if (!_initialized) {
+ _logger->log_error("Can not start Flow Controller because it has not been initialized");
+ return false;
+ } else {
+ if (!_running) {
+ _logger->log_info("Start Flow Controller");
+ this->_timerScheduler.start();
+ if (this->_root)
+ this->_root->startProcessing(&this->_timerScheduler);
+ _running = true;
+ this->_protocol->start();
+ }
+ return true;
+ }
}
[2/2] nifi-minifi-cpp git commit: MINIFI-98 Providing logging
information and exiting in case the specified configuration YAML cannot be
located. MINIFI-99 Providing default config.yml and updating README with
minor adjustments to reflect default build c
Posted by al...@apache.org.
MINIFI-98 Providing logging information and exiting in case the specified configuration YAML cannot be located.
MINIFI-99 Providing default config.yml and updating README with minor adjustments to reflect default build
configuration.
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/f14f2006
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/f14f2006
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/f14f2006
Branch: refs/heads/master
Commit: f14f200649077cc61ac4ac5e96562dd6a802eebb
Parents: c6b7ac0
Author: Aldrin Piri <al...@apache.org>
Authored: Mon Nov 28 16:50:43 2016 -0500
Committer: Aldrin Piri <al...@apache.org>
Committed: Tue Nov 29 00:54:38 2016 -0500
----------------------------------------------------------------------
CMakeLists.txt | 3 +-
README.md | 140 ++-
conf/config.yml | 20 +
conf/minifi.properties | 2 +-
libminifi/src/FlowController.cpp | 1876 ++++++++++++++++-----------------
5 files changed, 1026 insertions(+), 1015 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/f14f2006/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 082e994..d99a8d9 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -52,7 +52,6 @@ find_package(Threads REQUIRED)
# Provide custom modules for the project
list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake")
-#file(GLOB SOURCES "libminifi/src/*.cpp")
add_subdirectory(thirdparty/yaml-cpp-yaml-cpp-0.5.3)
add_subdirectory(libminifi)
add_subdirectory(main)
@@ -64,7 +63,7 @@ set(CPACK_SOURCE_PACKAGE_FILE_NAME "${ASSEMBLY_BASE_NAME}-source")
set(CPACK_SOURCE_IGNORE_FILES "/build/;/.bzr/;~$;${CPACK_SOURCE_IGNORE_FILES}")
# Generate binary assembly
-install(FILES conf/minifi.properties
+install(FILES conf/minifi.properties conf/config.yml
DESTINATION conf
COMPONENT bin)
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/f14f2006/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 388fde2..165a572 100644
--- a/README.md
+++ b/README.md
@@ -51,7 +51,8 @@ Perspectives of the role of MiNiFi should be from the perspective of the agent a
* GenerateFlowFile
* LogAttribute
* ListenSyslog
-* Provenance events generation is supported and they are persistent using levelDB.
+ * ExecuteProcess
+* Provenance event generation is supported, and events are persisted using levelDB.
## System Requirements
@@ -69,7 +70,7 @@ Perspectives of the role of MiNiFi should be from the perspective of the agent a
* libboost and boost-devel
* 1.48.0 or greater
* libxml2 and libxml2-devel
-* leveldb
+* leveldb and leveldb-devel
### To run
@@ -79,89 +80,86 @@ Perspectives of the role of MiNiFi should be from the perspective of the agent a
## Getting Started
### Building
-- From your source checkout, create a directory to perform the build (e.g. build) and cd into that directory.
-
-
- # ~/Development/code/apache/nifi-minifi-cpp on git:master
- $ mkdir build
- # ~/Development/code/apache/nifi-minifi-cpp on git:master
- $ cd build
+- From your source checkout, create a directory to perform the build (e.g. build) and cd into that directory.
+ ```
+ # ~/Development/code/apache/nifi-minifi-cpp on git:master
+ $ mkdir build
+ # ~/Development/code/apache/nifi-minifi-cpp on git:master
+ $ cd build
+ ```
- Perform a `cmake ..` to generate the project files
-
-
- # ~/Development/code/apache/nifi-minifi-cpp on git:master
- $ cmake ..
- ...
- -- Configuring done
- -- Generating done
- -- Build files have been written to: /Users/apiri/Development/code/apache/nifi-minifi-cpp/build
-
+ ```
+ # ~/Development/code/apache/nifi-minifi-cpp on git:master
+ $ cmake ..
+ ...
+ -- Configuring done
+ -- Generating done
+ -- Build files have been written to: /Users/apiri/Development/code/apache/nifi-minifi-cpp/build
+ ```
- Perform a build
-
-
- # ~/Development/code/apache/nifi-minifi-cpp on git:master
- $ make
- Scanning dependencies of target gmock_main
- Scanning dependencies of target gmock
- Scanning dependencies of target minifi
- Scanning dependencies of target gtest
- Scanning dependencies of target yaml-cpp
- [ 1%] Building CXX object thirdparty/yaml-cpp-yaml-cpp-0.5.3/test/gmock-1.7.0/gtest/CMakeFiles/gtest.dir/src/gtest-all.cc.o
- [ 3%] Building CXX object thirdparty/yaml-cpp-yaml-cpp-0.5.3/test/gmock-1.7.0/CMakeFiles/gmock.dir/gtest/src/gtest-all.cc.o
- [ 3%] Building CXX object thirdparty/yaml-cpp-yaml-cpp-0.5.3/test/gmock-1.7.0/CMakeFiles/gmock.dir/src/gmock-all.cc.o
- [ 6%] Building CXX object thirdparty/yaml-cpp-yaml-cpp-0.5.3/test/gmock-1.7.0/CMakeFiles/gmock_main.dir/gtest/src/gtest-all.cc.o
- [ 6%] Building CXX object thirdparty/yaml-cpp-yaml-cpp-0.5.3/test/gmock-1.7.0/CMakeFiles/gmock_main.dir/src/gmock-all.cc.o
- [ 7%] Building CXX object libminifi/CMakeFiles/minifi.dir/src/Configure.cpp.o
-
- ...
-
- [ 97%] Linking CXX executable minifi
- [ 97%] Built target minifiexe
- [ 98%] Building CXX object thirdparty/yaml-cpp-yaml-cpp-0.5.3/test/CMakeFiles/run-tests.dir/node/node_test.cpp.o
- [100%] Linking CXX executable run-tests
- [100%] Built target run-tests
-
-
+ ```
+ # ~/Development/code/apache/nifi-minifi-cpp on git:master
+ $ make
+ Scanning dependencies of target gmock_main
+ Scanning dependencies of target gmock
+ Scanning dependencies of target minifi
+ Scanning dependencies of target gtest
+ Scanning dependencies of target yaml-cpp
+ [ 1%] Building CXX object thirdparty/yaml-cpp-yaml-cpp-0.5.3/test/gmock-1.7.0/gtest/CMakeFiles/gtest.dir/src/gtest-all.cc.o
+ [ 3%] Building CXX object thirdparty/yaml-cpp-yaml-cpp-0.5.3/test/gmock-1.7.0/CMakeFiles/gmock.dir/gtest/src/gtest-all.cc.o
+ [ 3%] Building CXX object thirdparty/yaml-cpp-yaml-cpp-0.5.3/test/gmock-1.7.0/CMakeFiles/gmock.dir/src/gmock-all.cc.o
+ [ 6%] Building CXX object thirdparty/yaml-cpp-yaml-cpp-0.5.3/test/gmock-1.7.0/CMakeFiles/gmock_main.dir/gtest/src/gtest-all.cc.o
+ [ 6%] Building CXX object thirdparty/yaml-cpp-yaml-cpp-0.5.3/test/gmock-1.7.0/CMakeFiles/gmock_main.dir/src/gmock-all.cc.o
+ [ 7%] Building CXX object libminifi/CMakeFiles/minifi.dir/src/Configure.cpp.o
+
+ ...
+
+ [ 97%] Linking CXX executable minifi
+ [ 97%] Built target minifiexe
+ [ 98%] Building CXX object thirdparty/yaml-cpp-yaml-cpp-0.5.3/test/CMakeFiles/run-tests.dir/node/node_test.cpp.o
+ [100%] Linking CXX executable run-tests
+ [100%] Built target run-tests
+ ```
- Create a binary assembly located in your build directory with suffix -bin.tar.gz
-
-
- ~/Development/code/apache/nifi-minifi-cpp/build
- $ make package
- Run CPack packaging tool for source...
- CPack: Create package using TGZ
- CPack: Install projects
- CPack: - Install directory: ~/Development/code/apache/nifi-minifi-cpp
- CPack: Create package
- CPack: - package: ~/Development/code/apache/nifi-minifi-cpp/build/nifi-minifi-cpp-0.1.0-bin.tar.gz generated.
-
+ ```
+ ~/Development/code/apache/nifi-minifi-cpp/build
+ $ make package
+ Run CPack packaging tool for source...
+ CPack: Create package using TGZ
+ CPack: Install projects
+ CPack: - Install directory: ~/Development/code/apache/nifi-minifi-cpp
+ CPack: Create package
+ CPack: - package: ~/Development/code/apache/nifi-minifi-cpp/build/nifi-minifi-cpp-0.1.0-bin.tar.gz generated.
+ ```
- Create a source assembly located in your build directory with suffix -source.tar.gz
-
-
- ~/Development/code/apache/nifi-minifi-cpp/build
- $ make package_source
- Run CPack packaging tool for source...
- CPack: Create package using TGZ
- CPack: Install projects
- CPack: - Install directory: ~/Development/code/apache/nifi-minifi-cpp
- CPack: Create package
- CPack: - package: ~/Development/code/apache/nifi-minifi-cpp/build/nifi-minifi-cpp-0.1.0-source.tar.gz generated.
+ ```
+ ~/Development/code/apache/nifi-minifi-cpp/build
+ $ make package_source
+ Run CPack packaging tool for source...
+ CPack: Create package using TGZ
+ CPack: Install projects
+ CPack: - Install directory: ~/Development/code/apache/nifi-minifi-cpp
+ CPack: Create package
+ CPack: - package: ~/Development/code/apache/nifi-minifi-cpp/build/nifi-minifi-cpp-0.1.0-source.tar.gz generated.
+ ```
### Cleaning
Remove the build directory created above.
-
- # ~/Development/code/apache/nifi-minifi-cpp on git:master
- $ rm -rf ./build
+```
+# ~/Development/code/apache/nifi-minifi-cpp on git:master
+$ rm -rf ./build
+```
### Configuring
-The 'conf' directory in the root contains a template flow.yml document.
+The 'conf' directory in the root contains a template config.yml document.
-This is compatible with the format used with the Java MiNiFi application. Currently, a subset of the configuration is supported. Additional information on the YAML format for the flow.yml can be found in the [MiNiFi System Administrator Guide](https://nifi.apache.org/minifi/system-admin-guide.html).
+This is compatible with the format used with the Java MiNiFi application. Currently, a subset of the configuration is supported. Additional information on the YAML format for the config.yml can be found in the [MiNiFi System Administrator Guide](https://nifi.apache.org/minifi/system-admin-guide.html).
Additionally, users can utilize the MiNiFi Toolkit Converter to aid in creating a flow configuration from a generated template exported from a NiFi instance. The MiNiFi Toolkit Converter tool can be downloaded from http://nifi.apache.org/minifi/download.html under the `MiNiFi Toolkit Binaries` section. Information on its usage is available at https://nifi.apache.org/minifi/minifi-toolkit.html.
@@ -206,11 +204,11 @@ Additionally, users can utilize the MiNiFi Toolkit Converter to aid in creating
Host Name: localhost
### Running
-After completing a [build](#building), the application can be run by issuing:
+After completing a [build](#building), the application can be run by issuing the following:
$ ./bin/minifi.sh start
-By default, this will make use of a flow.yml located in the conf directory. This configuration file location can be altered by adjusting the property `nifi.flow.configuration.file` in minifi.properties located in the conf directory.
+By default, this will make use of a config.yml located in the conf directory. This configuration file location can be altered by adjusting the property `nifi.flow.configuration.file` in minifi.properties located in the conf directory.
### Stopping
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/f14f2006/conf/config.yml
----------------------------------------------------------------------
diff --git a/conf/config.yml b/conf/config.yml
new file mode 100644
index 0000000..b50c609
--- /dev/null
+++ b/conf/config.yml
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+Flow Controller:
+ name: MiNiFi Flow
+Processors: []
+Connections: []
+Remote Processing Groups: []
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/f14f2006/conf/minifi.properties
----------------------------------------------------------------------
diff --git a/conf/minifi.properties b/conf/minifi.properties
index f9b2d9f..da46653 100644
--- a/conf/minifi.properties
+++ b/conf/minifi.properties
@@ -15,7 +15,7 @@
# Core Properties #
nifi.version=0.1.0
-nifi.flow.configuration.file=./conf/flow.yml
+nifi.flow.configuration.file=./conf/config.yml
nifi.administrative.yield.duration=30 sec
# If a component has no work to do (is "bored"), how long should we wait before checking again for work?
nifi.bored.yield.duration=10 millis