You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2016/11/29 05:55:37 UTC

[1/2] nifi-minifi-cpp git commit: MINIFI-98 Providing logging information and exiting in case the specified configuration YAML cannot be located. MINIFI-99 Providing default config.yml and updating README with minor adjustments to reflect default build c

Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master c6b7ac0b0 -> f14f20064


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/f14f2006/libminifi/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index ab1fcf7..3598716 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -35,206 +35,207 @@ FlowController *FlowController::_flowController(NULL);
 FlowController::FlowController(std::string name)
 : _name(name)
 {
-	uuid_generate(_uuid);
+    uuid_generate(_uuid);
 
-	// Setup the default values
-	_configurationFileName = DEFAULT_FLOW_YAML_FILE_NAME;
-	_maxEventDrivenThreads = DEFAULT_MAX_EVENT_DRIVEN_THREAD;
-	_maxTimerDrivenThreads = DEFAULT_MAX_TIMER_DRIVEN_THREAD;
-	_running = false;
-	_initialized = false;
-	_root = NULL;
-	_logger = Logger::getLogger();
-	_protocol = new FlowControlProtocol(this);
+    // Setup the default values
+    _configurationFileName = DEFAULT_FLOW_YAML_FILE_NAME;
+    _maxEventDrivenThreads = DEFAULT_MAX_EVENT_DRIVEN_THREAD;
+    _maxTimerDrivenThreads = DEFAULT_MAX_TIMER_DRIVEN_THREAD;
+    _running = false;
+    _initialized = false;
+    _root = NULL;
+    _logger = Logger::getLogger();
+    _protocol = new FlowControlProtocol(this);
 
-	// NiFi config properties
-	_configure = Configure::getConfigure();
+    // NiFi config properties
+    _configure = Configure::getConfigure();
 
-	std::string rawConfigFileString;
-	_configure->get(Configure::nifi_flow_configuration_file, rawConfigFileString);
+    std::string rawConfigFileString;
+    _configure->get(Configure::nifi_flow_configuration_file, rawConfigFileString);
 
 	if (!rawConfigFileString.empty())
 	{
-		_configurationFileName = rawConfigFileString;
-	}
+        _configurationFileName = rawConfigFileString;
+    }
 
-	char *path = NULL;
-	char full_path[PATH_MAX];
+    char *path = NULL;
+    char full_path[PATH_MAX];
 
-	std::string adjustedFilename;
+    std::string adjustedFilename;
 	if (!_configurationFileName.empty())
 	{
-		// perform a naive determination if this is a relative path
+        // perform a naive determination if this is a relative path
 		if (_configurationFileName.c_str()[0] != '/')
 		{
-			adjustedFilename = adjustedFilename + _configure->getHome() + "/" + _configurationFileName;
+            adjustedFilename = adjustedFilename + _configure->getHome() + "/" + _configurationFileName;
 		}
 		else
 		{
-			adjustedFilename = _configurationFileName;
-		}
-	}
+            adjustedFilename = _configurationFileName;
+        }
+    }
 
-	path = realpath(adjustedFilename.c_str(), full_path);
+    path = realpath(adjustedFilename.c_str(), full_path);
 	if (!path)
 	{
-		_logger->log_error("Could not locate path from provided configuration file name.");
-	}
+        _logger->log_error("Could not locate path from provided configuration file name (%s).  Exiting.", full_path);
+        exit(1);
+    }
 
-	std::string pathString(path);
-	_configurationFileName = pathString;
-	_logger->log_info("FlowController NiFi Configuration file %s", pathString.c_str());
+    std::string pathString(path);
+    _configurationFileName = pathString;
+    _logger->log_info("FlowController NiFi Configuration file %s", pathString.c_str());
 
-	// Create repos for flow record and provenance
-	_provenanceRepo = new ProvenanceRepository();
-	_provenanceRepo->initialize();
+    // Create repos for flow record and provenance
+    _provenanceRepo = new ProvenanceRepository();
+    _provenanceRepo->initialize();
 
-	_logger->log_info("FlowController %s created", _name.c_str());
+    _logger->log_info("FlowController %s created", _name.c_str());
 }
 
 FlowController::~FlowController()
 {
-	stop(true);
-	unload();
-	delete _protocol;
-	delete _provenanceRepo;
+    stop(true);
+    unload();
+    delete _protocol;
+    delete _provenanceRepo;
 }
 
 bool FlowController::isRunning()
 {
-	return (_running);
+    return (_running);
 }
 
 bool FlowController::isInitialized()
 {
-	return (_initialized);
+    return (_initialized);
 }
 
 void FlowController::stop(bool force)
 {
 	if (_running)
 	{
-		_logger->log_info("Stop Flow Controller");
-		this->_timerScheduler.stop();
-		// Wait for sometime for thread stop
-		std::this_thread::sleep_for(std::chrono::milliseconds(1000));
-		if (this->_root)
-			this->_root->stopProcessing(&this->_timerScheduler);
-		_running = false;
-	}
+        _logger->log_info("Stop Flow Controller");
+        this->_timerScheduler.stop();
+        // Wait for sometime for thread stop
+        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+        if (this->_root)
+            this->_root->stopProcessing(&this->_timerScheduler);
+        _running = false;
+    }
 }
 
 void FlowController::unload()
 {
 	if (_running)
 	{
-		stop(true);
-	}
+        stop(true);
+    }
 	if (_initialized)
 	{
-		_logger->log_info("Unload Flow Controller");
-		if (_root)
-			delete _root;
-		_root = NULL;
-		_initialized = false;
-		_name = "";
-	}
+        _logger->log_info("Unload Flow Controller");
+        if (_root)
+            delete _root;
+        _root = NULL;
+        _initialized = false;
+        _name = "";
+    }
 
-	return;
+    return;
 }
 
 void FlowController::reload(std::string xmlFile)
 {
-	_logger->log_info("Starting to reload Flow Controller with xml %s", xmlFile.c_str());
-	stop(true);
-	unload();
-	std::string oldxmlFile = this->_configurationFileName;
-	this->_configurationFileName = xmlFile;
-	load(ConfigFormat::XML);
-	start();
+    _logger->log_info("Starting to reload Flow Controller with xml %s", xmlFile.c_str());
+    stop(true);
+    unload();
+    std::string oldxmlFile = this->_configurationFileName;
+    this->_configurationFileName = xmlFile;
+    load(ConfigFormat::XML);
+    start();
 	if (!this->_root)
 	{
-		this->_configurationFileName = oldxmlFile;
-		_logger->log_info("Rollback Flow Controller to xml %s", oldxmlFile.c_str());
-		stop(true);
-		unload();
-		load(ConfigFormat::XML);
-		start();
-	}
+        this->_configurationFileName = oldxmlFile;
+        _logger->log_info("Rollback Flow Controller to xml %s", oldxmlFile.c_str());
+        stop(true);
+        unload();
+        load(ConfigFormat::XML);
+        start();
+    }
 }
 
 Processor *FlowController::createProcessor(std::string name, uuid_t uuid)
 {
-	Processor *processor = NULL;
+    Processor *processor = NULL;
 	if (name == GenerateFlowFile::ProcessorName)
 	{
-		processor = new GenerateFlowFile(name, uuid);
+        processor = new GenerateFlowFile(name, uuid);
 	}
 	else if (name == LogAttribute::ProcessorName)
 	{
-		processor = new LogAttribute(name, uuid);
+        processor = new LogAttribute(name, uuid);
 	}
 	else if (name == RealTimeDataCollector::ProcessorName)
 	{
-		processor = new RealTimeDataCollector(name, uuid);
+        processor = new RealTimeDataCollector(name, uuid);
 	}
 	else if (name == GetFile::ProcessorName)
 	{
-		processor = new GetFile(name, uuid);
+        processor = new GetFile(name, uuid);
 	}
 	else if (name == TailFile::ProcessorName)
 	{
-		processor = new TailFile(name, uuid);
+        processor = new TailFile(name, uuid);
 	}
 	else if (name == ListenSyslog::ProcessorName)
 	{
-		processor = new ListenSyslog(name, uuid);
+        processor = new ListenSyslog(name, uuid);
 	}
 	else if (name == ExecuteProcess::ProcessorName)
 	{
-		processor = new ExecuteProcess(name, uuid);
+        processor = new ExecuteProcess(name, uuid);
 	}
 	else
 	{
-		_logger->log_error("No Processor defined for %s", name.c_str());
-		return NULL;
-	}
+        _logger->log_error("No Processor defined for %s", name.c_str());
+        return NULL;
+    }
 
-	//! initialize the processor
-	processor->initialize();
+    //! initialize the processor
+    processor->initialize();
 
-	return processor;
+    return processor;
 }
 
 ProcessGroup *FlowController::createRootProcessGroup(std::string name, uuid_t uuid)
 {
-	return new ProcessGroup(ROOT_PROCESS_GROUP, name, uuid);
+    return new ProcessGroup(ROOT_PROCESS_GROUP, name, uuid);
 }
 
 ProcessGroup *FlowController::createRemoteProcessGroup(std::string name, uuid_t uuid)
 {
-	return new ProcessGroup(REMOTE_PROCESS_GROUP, name, uuid);
+    return new ProcessGroup(REMOTE_PROCESS_GROUP, name, uuid);
 }
 
 Connection *FlowController::createConnection(std::string name, uuid_t uuid)
 {
-	return new Connection(name, uuid);
+    return new Connection(name, uuid);
 }
 
 void FlowController::parseConnection(xmlDoc *doc, xmlNode *node, ProcessGroup *parent)
 {
-	uuid_t uuid;
-	xmlNode *currentNode;
-	Connection *connection = NULL;
+    uuid_t uuid;
+    xmlNode *currentNode;
+    Connection *connection = NULL;
 
 	if (!parent)
 	{
-		_logger->log_error("parseProcessNode: no parent group existed");
-		return;
-	}
+        _logger->log_error("parseProcessNode: no parent group existed");
+        return;
+    }
 
-	// generate the random UIID
-	uuid_generate(uuid);
+    // generate the random UIID
+    uuid_generate(uuid);
 
 	for (currentNode = node->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next)
 	{
@@ -242,836 +243,829 @@ void FlowController::parseConnection(xmlDoc *doc, xmlNode *node, ProcessGroup *p
 		{
 			if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0)
 			{
-				char *id = (char *) xmlNodeGetContent(currentNode);
-				if (id) {
-					_logger->log_debug("parseConnection: id => [%s]", id);
-					uuid_parse(id, uuid);
-					xmlFree(id);
-				}
-			} else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) {
-				char *name = (char *) xmlNodeGetContent(currentNode);
-				if (name) {
-					_logger->log_debug("parseConnection: name => [%s]", name);
-					connection = this->createConnection(name, uuid);
-					if (connection == NULL) {
-						xmlFree(name);
-						return;
-					}
-					xmlFree(name);
-				}
-			} else if (xmlStrcmp(currentNode->name, BAD_CAST "sourceId") == 0) {
-				char *id = (char *) xmlNodeGetContent(currentNode);
-				if (id) {
-					_logger->log_debug("parseConnection: sourceId => [%s]", id);
-					uuid_parse(id, uuid);
-					xmlFree(id);
-					if (connection)
-						connection->setSourceProcessorUUID(uuid);
-				}
-			} else if (xmlStrcmp(currentNode->name, BAD_CAST "destinationId") == 0) {
-				char *id = (char *) xmlNodeGetContent(currentNode);
-				if (id) {
-					_logger->log_debug("parseConnection: destinationId => [%s]", id);
-					uuid_parse(id, uuid);
-					xmlFree(id);
-					if (connection)
-						connection->setDestinationProcessorUUID(uuid);
-				}
-			} else if (xmlStrcmp(currentNode->name, BAD_CAST "maxWorkQueueSize") == 0) {
-				char *temp = (char *) xmlNodeGetContent(currentNode);
-				int64_t maxWorkQueueSize = 0;
-				if (temp) {
-					if (Property::StringToInt(temp, maxWorkQueueSize)) {
-						_logger->log_debug("parseConnection: maxWorkQueueSize => [%d]", maxWorkQueueSize);
-						if (connection)
-							connection->setMaxQueueSize(maxWorkQueueSize);
-
-					}
-					xmlFree(temp);
-				}
-			} else if (xmlStrcmp(currentNode->name, BAD_CAST "maxWorkQueueDataSize") == 0) {
-				char *temp = (char *) xmlNodeGetContent(currentNode);
-				int64_t maxWorkQueueDataSize = 0;
-				if (temp) {
-					if (Property::StringToInt(temp, maxWorkQueueDataSize)) {
-						_logger->log_debug("parseConnection: maxWorkQueueDataSize => [%d]", maxWorkQueueDataSize);
-						if (connection)
-							connection->setMaxQueueDataSize(maxWorkQueueDataSize);
-
-					}
-					xmlFree(temp);
-				}
-			} else if (xmlStrcmp(currentNode->name, BAD_CAST "relationship") == 0) {
-				char *temp = (char *) xmlNodeGetContent(currentNode);
-				if (temp) {
-					std::string relationshipName = temp;
-					if (!relationshipName.empty()) {
-						Relationship relationship(relationshipName, "");
-						_logger->log_debug("parseConnection: relationship => [%s]", relationshipName.c_str());
-						if (connection)
-							connection->setRelationship(relationship);
-					} else {
-						Relationship empty;
-						_logger->log_debug("parseConnection: relationship => [%s]", empty.getName().c_str());
-						if (connection)
-							connection->setRelationship(empty);
-					}
-					xmlFree(temp);
-				}
-			}
-		} // if (currentNode->type == XML_ELEMENT_NODE)
-	} // for node
-
-	if (connection)
-		parent->addConnection(connection);
-
-	return;
+                char *id = (char *) xmlNodeGetContent(currentNode);
+                if (id) {
+                    _logger->log_debug("parseConnection: id => [%s]", id);
+                    uuid_parse(id, uuid);
+                    xmlFree(id);
+                }
+            } else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) {
+                char *name = (char *) xmlNodeGetContent(currentNode);
+                if (name) {
+                    _logger->log_debug("parseConnection: name => [%s]", name);
+                    connection = this->createConnection(name, uuid);
+                    if (connection == NULL) {
+                        xmlFree(name);
+                        return;
+                    }
+                    xmlFree(name);
+                }
+            } else if (xmlStrcmp(currentNode->name, BAD_CAST "sourceId") == 0) {
+                char *id = (char *) xmlNodeGetContent(currentNode);
+                if (id) {
+                    _logger->log_debug("parseConnection: sourceId => [%s]", id);
+                    uuid_parse(id, uuid);
+                    xmlFree(id);
+                    if (connection)
+                        connection->setSourceProcessorUUID(uuid);
+                }
+            } else if (xmlStrcmp(currentNode->name, BAD_CAST "destinationId") == 0) {
+                char *id = (char *) xmlNodeGetContent(currentNode);
+                if (id) {
+                    _logger->log_debug("parseConnection: destinationId => [%s]", id);
+                    uuid_parse(id, uuid);
+                    xmlFree(id);
+                    if (connection)
+                        connection->setDestinationProcessorUUID(uuid);
+                }
+            } else if (xmlStrcmp(currentNode->name, BAD_CAST "maxWorkQueueSize") == 0) {
+                char *temp = (char *) xmlNodeGetContent(currentNode);
+                int64_t maxWorkQueueSize = 0;
+                if (temp) {
+                    if (Property::StringToInt(temp, maxWorkQueueSize)) {
+                        _logger->log_debug("parseConnection: maxWorkQueueSize => [%d]", maxWorkQueueSize);
+                        if (connection)
+                            connection->setMaxQueueSize(maxWorkQueueSize);
+
+                    }
+                    xmlFree(temp);
+                }
+            } else if (xmlStrcmp(currentNode->name, BAD_CAST "maxWorkQueueDataSize") == 0) {
+                char *temp = (char *) xmlNodeGetContent(currentNode);
+                int64_t maxWorkQueueDataSize = 0;
+                if (temp) {
+                    if (Property::StringToInt(temp, maxWorkQueueDataSize)) {
+                        _logger->log_debug("parseConnection: maxWorkQueueDataSize => [%d]", maxWorkQueueDataSize);
+                        if (connection)
+                            connection->setMaxQueueDataSize(maxWorkQueueDataSize);
+
+                    }
+                    xmlFree(temp);
+                }
+            } else if (xmlStrcmp(currentNode->name, BAD_CAST "relationship") == 0) {
+                char *temp = (char *) xmlNodeGetContent(currentNode);
+                if (temp) {
+                    std::string relationshipName = temp;
+                    if (!relationshipName.empty()) {
+                        Relationship relationship(relationshipName, "");
+                        _logger->log_debug("parseConnection: relationship => [%s]", relationshipName.c_str());
+                        if (connection)
+                            connection->setRelationship(relationship);
+                    } else {
+                        Relationship empty;
+                        _logger->log_debug("parseConnection: relationship => [%s]", empty.getName().c_str());
+                        if (connection)
+                            connection->setRelationship(empty);
+                    }
+                    xmlFree(temp);
+                }
+            }
+        } // if (currentNode->type == XML_ELEMENT_NODE)
+    } // for node
+
+    if (connection)
+        parent->addConnection(connection);
+
+    return;
 }
 
 void FlowController::parseRootProcessGroup(xmlDoc *doc, xmlNode *node) {
-	uuid_t uuid;
-	xmlNode *currentNode;
-	ProcessGroup *group = NULL;
-
-	// generate the random UIID
-	uuid_generate(uuid);
-
-	for (currentNode = node->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) {
-		if (currentNode->type == XML_ELEMENT_NODE) {
-			if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0) {
-				char *id = (char *) xmlNodeGetContent(currentNode);
-				if (id) {
-					_logger->log_debug("parseRootProcessGroup: id => [%s]", id);
-					uuid_parse(id, uuid);
-					xmlFree(id);
-				}
-			} else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) {
-				char *name = (char *) xmlNodeGetContent(currentNode);
-				if (name) {
-					_logger->log_debug("parseRootProcessGroup: name => [%s]", name);
-					group = this->createRootProcessGroup(name, uuid);
-					if (group == NULL) {
-						xmlFree(name);
-						return;
-					}
-					// Set the root process group
-					this->_root = group;
-					this->_name = name;
-					xmlFree(name);
-				}
-			} else if (xmlStrcmp(currentNode->name, BAD_CAST "processor") == 0) {
-				this->parseProcessorNode(doc, currentNode, group);
-			} else if (xmlStrcmp(currentNode->name, BAD_CAST "connection") == 0) {
-				this->parseConnection(doc, currentNode, group);
-			} else if (xmlStrcmp(currentNode->name, BAD_CAST "remoteProcessGroup") == 0) {
-				this->parseRemoteProcessGroup(doc, currentNode, group);
-			}
-		} // if (currentNode->type == XML_ELEMENT_NODE)
-	} // for node
+    uuid_t uuid;
+    xmlNode *currentNode;
+    ProcessGroup *group = NULL;
+
+    // generate the random UIID
+    uuid_generate(uuid);
+
+    for (currentNode = node->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) {
+        if (currentNode->type == XML_ELEMENT_NODE) {
+            if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0) {
+                char *id = (char *) xmlNodeGetContent(currentNode);
+                if (id) {
+                    _logger->log_debug("parseRootProcessGroup: id => [%s]", id);
+                    uuid_parse(id, uuid);
+                    xmlFree(id);
+                }
+            } else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) {
+                char *name = (char *) xmlNodeGetContent(currentNode);
+                if (name) {
+                    _logger->log_debug("parseRootProcessGroup: name => [%s]", name);
+                    group = this->createRootProcessGroup(name, uuid);
+                    if (group == NULL) {
+                        xmlFree(name);
+                        return;
+                    }
+                    // Set the root process group
+                    this->_root = group;
+                    this->_name = name;
+                    xmlFree(name);
+                }
+            } else if (xmlStrcmp(currentNode->name, BAD_CAST "processor") == 0) {
+                this->parseProcessorNode(doc, currentNode, group);
+            } else if (xmlStrcmp(currentNode->name, BAD_CAST "connection") == 0) {
+                this->parseConnection(doc, currentNode, group);
+            } else if (xmlStrcmp(currentNode->name, BAD_CAST "remoteProcessGroup") == 0) {
+                this->parseRemoteProcessGroup(doc, currentNode, group);
+            }
+        } // if (currentNode->type == XML_ELEMENT_NODE)
+    } // for node
 }
 
 void FlowController::parseRootProcessGroupYaml(YAML::Node rootFlowNode) {
-	uuid_t uuid;
-	ProcessGroup *group = NULL;
+    uuid_t uuid;
+    ProcessGroup *group = NULL;
 
-	// generate the random UIID
-	uuid_generate(uuid);
+    // generate the random UIID
+    uuid_generate(uuid);
 
-	std::string flowName = rootFlowNode["name"].as<std::string>();
+    std::string flowName = rootFlowNode["name"].as<std::string>();
 
-	char uuidStr[37];
-	uuid_unparse(_uuid, uuidStr);
-	_logger->log_debug("parseRootProcessGroup: id => [%s]", uuidStr);
-	_logger->log_debug("parseRootProcessGroup: name => [%s]", flowName.c_str());
-	group = this->createRootProcessGroup(flowName, uuid);
-	this->_root = group;
-	this->_name = flowName;
+    char uuidStr[37];
+    uuid_unparse(_uuid, uuidStr);
+    _logger->log_debug("parseRootProcessGroup: id => [%s]", uuidStr);
+    _logger->log_debug("parseRootProcessGroup: name => [%s]", flowName.c_str());
+    group = this->createRootProcessGroup(flowName, uuid);
+    this->_root = group;
+    this->_name = flowName;
 }
 
 void FlowController::parseProcessorNodeYaml(YAML::Node processorsNode, ProcessGroup *parentGroup) {
-	int64_t schedulingPeriod = -1;
-	int64_t penalizationPeriod = -1;
-	int64_t yieldPeriod = -1;
-	int64_t runDurationNanos = -1;
-	uuid_t uuid;
-	Processor *processor = NULL;
-
-	if (!parentGroup) {
-		_logger->log_error("parseProcessNodeYaml: no parent group exists");
-		return;
-	}
+    int64_t schedulingPeriod = -1;
+    int64_t penalizationPeriod = -1;
+    int64_t yieldPeriod = -1;
+    int64_t runDurationNanos = -1;
+    uuid_t uuid;
+    Processor *processor = NULL;
+
+    if (!parentGroup) {
+        _logger->log_error("parseProcessNodeYaml: no parent group exists");
+        return;
+    }
 
-	if (processorsNode) {
-		// Evaluate sequence of processors
-		int numProcessors = processorsNode.size();
-		if (numProcessors < 1) {
-			throw new std::invalid_argument("There must be at least one processor configured.");
-		}
+    if (processorsNode) {
 
-		std::vector<ProcessorConfig> processorConfigs;
+        if (processorsNode.IsSequence()) {
+            // Evaluate sequence of processors
+            int numProcessors = processorsNode.size();
 
-		if (processorsNode.IsSequence()) {
-			for (YAML::const_iterator iter = processorsNode.begin(); iter != processorsNode.end(); ++iter) {
-				ProcessorConfig procCfg;
-				YAML::Node procNode = iter->as<YAML::Node>();
 
-				procCfg.name = procNode["name"].as<std::string>();
-				_logger->log_debug("parseProcessorNode: name => [%s]", procCfg.name.c_str());
-				procCfg.javaClass = procNode["class"].as<std::string>();
-				_logger->log_debug("parseProcessorNode: class => [%s]", procCfg.javaClass.c_str());
+            for (YAML::const_iterator iter = processorsNode.begin(); iter != processorsNode.end(); ++iter) {
+                ProcessorConfig procCfg;
+                YAML::Node procNode = iter->as<YAML::Node>();
 
-				char uuidStr[37];
-				uuid_unparse(_uuid, uuidStr);
+                procCfg.name = procNode["name"].as<std::string>();
+                _logger->log_debug("parseProcessorNode: name => [%s]", procCfg.name.c_str());
+                procCfg.javaClass = procNode["class"].as<std::string>();
+                _logger->log_debug("parseProcessorNode: class => [%s]", procCfg.javaClass.c_str());
 
-				// generate the random UUID
-				uuid_generate(uuid);
+                char uuidStr[37];
+                uuid_unparse(_uuid, uuidStr);
 
-				// Determine the processor name only from the Java class
-				int lastOfIdx = procCfg.javaClass.find_last_of(".");
-				if (lastOfIdx != std::string::npos) {
-					lastOfIdx++; // if a value is found, increment to move beyond the .
-					int nameLength = procCfg.javaClass.length() - lastOfIdx;
-					std::string processorName = procCfg.javaClass.substr(lastOfIdx, nameLength);
-					processor = this->createProcessor(processorName, uuid);
-				}
+                // generate the random UUID
+                uuid_generate(uuid);
 
-				if (!processor) {
-					_logger->log_error("Could not create a processor %s with name %s", procCfg.name.c_str(), uuidStr);
-					throw std::invalid_argument("Could not create processor " + procCfg.name);
-				}
-				processor->setName(procCfg.name);
+                // Determine the processor name only from the Java class
+                int lastOfIdx = procCfg.javaClass.find_last_of(".");
+                if (lastOfIdx != std::string::npos) {
+                    lastOfIdx++; // if a value is found, increment to move beyond the .
+                    int nameLength = procCfg.javaClass.length() - lastOfIdx;
+                    std::string processorName = procCfg.javaClass.substr(lastOfIdx, nameLength);
+                    processor = this->createProcessor(processorName, uuid);
+                }
 
-				procCfg.maxConcurrentTasks = procNode["max concurrent tasks"].as<std::string>();
+                if (!processor) {
+                    _logger->log_error("Could not create a processor %s with name %s", procCfg.name.c_str(), uuidStr);
+                    throw std::invalid_argument("Could not create processor " + procCfg.name);
+                }
+                processor->setName(procCfg.name);
+
+                procCfg.maxConcurrentTasks = procNode["max concurrent tasks"].as<std::string>();
 				_logger->log_debug("parseProcessorNode: max concurrent tasks => [%s]", procCfg.maxConcurrentTasks.c_str());
-				procCfg.schedulingStrategy = procNode["scheduling strategy"].as<std::string>();
-				_logger->log_debug("parseProcessorNode: scheduling strategy => [%s]",
-						procCfg.schedulingStrategy.c_str());
-				procCfg.schedulingPeriod = procNode["scheduling period"].as<std::string>();
-				_logger->log_debug("parseProcessorNode: scheduling period => [%s]", procCfg.schedulingPeriod.c_str());
-				procCfg.penalizationPeriod = procNode["penalization period"].as<std::string>();
-				_logger->log_debug("parseProcessorNode: penalization period => [%s]",
-						procCfg.penalizationPeriod.c_str());
-				procCfg.yieldPeriod = procNode["yield period"].as<std::string>();
-				_logger->log_debug("parseProcessorNode: yield period => [%s]", procCfg.yieldPeriod.c_str());
-				procCfg.yieldPeriod = procNode["run duration nanos"].as<std::string>();
-				_logger->log_debug("parseProcessorNode: run duration nanos => [%s]", procCfg.runDurationNanos.c_str());
-
-				// handle auto-terminated relationships
-				YAML::Node autoTerminatedSequence = procNode["auto-terminated relationships list"];
-				std::vector<std::string> rawAutoTerminatedRelationshipValues;
-				if (autoTerminatedSequence.IsSequence() && !autoTerminatedSequence.IsNull()
-						&& autoTerminatedSequence.size() > 0) {
-					for (YAML::const_iterator relIter = autoTerminatedSequence.begin();
-							relIter != autoTerminatedSequence.end(); ++relIter) {
-						std::string autoTerminatedRel = relIter->as<std::string>();
-						rawAutoTerminatedRelationshipValues.push_back(autoTerminatedRel);
-					}
-				}
-				procCfg.autoTerminatedRelationships = rawAutoTerminatedRelationshipValues;
-
-				// handle processor properties
-				YAML::Node propertiesNode = procNode["Properties"];
-				parsePropertiesNodeYaml(&propertiesNode, processor);
-
-				// Take care of scheduling
-				TimeUnit unit;
-				if (Property::StringToTime(procCfg.schedulingPeriod, schedulingPeriod, unit)
-						&& Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) {
-					_logger->log_debug("convert: parseProcessorNode: schedulingPeriod => [%d] ns", schedulingPeriod);
-					processor->setSchedulingPeriodNano(schedulingPeriod);
-				}
-
-				if (Property::StringToTime(procCfg.penalizationPeriod, penalizationPeriod, unit)
-						&& Property::ConvertTimeUnitToMS(penalizationPeriod, unit, penalizationPeriod)) {
-					_logger->log_debug("convert: parseProcessorNode: penalizationPeriod => [%d] ms",
-							penalizationPeriod);
-					processor->setPenalizationPeriodMsec(penalizationPeriod);
-				}
-
-				if (Property::StringToTime(procCfg.yieldPeriod, yieldPeriod, unit)
-						&& Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod)) {
-					_logger->log_debug("convert: parseProcessorNode: yieldPeriod => [%d] ms", yieldPeriod);
-					processor->setYieldPeriodMsec(yieldPeriod);
-				}
-
-				// Default to running
-				processor->setScheduledState(RUNNING);
-
-				if (procCfg.schedulingStrategy == "TIMER_DRIVEN") {
-					processor->setSchedulingStrategy(TIMER_DRIVEN);
-					_logger->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy.c_str());
-				} else if (procCfg.schedulingStrategy == "EVENT_DRIVEN") {
-					processor->setSchedulingStrategy(EVENT_DRIVEN);
-					_logger->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy.c_str());
-				} else {
-					processor->setSchedulingStrategy(CRON_DRIVEN);
-					_logger->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy.c_str());
-
-				}
-
-				int64_t maxConcurrentTasks;
-				if (Property::StringToInt(procCfg.maxConcurrentTasks, maxConcurrentTasks)) {
-					_logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks);
-					processor->setMaxConcurrentTasks(maxConcurrentTasks);
-				}
-
-				if (Property::StringToInt(procCfg.runDurationNanos, runDurationNanos)) {
-					_logger->log_debug("parseProcessorNode: runDurationNanos => [%d]", runDurationNanos);
-					processor->setRunDurationNano(runDurationNanos);
-				}
-
-				std::set<Relationship> autoTerminatedRelationships;
-				for (auto&& relString : procCfg.autoTerminatedRelationships) {
-					Relationship relationship(relString, "");
-					_logger->log_debug("parseProcessorNode: autoTerminatedRelationship  => [%s]", relString.c_str());
-					autoTerminatedRelationships.insert(relationship);
-				}
-
-				processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
-				parentGroup->addProcessor(processor);
-			}
-		}
-	} else {
-		throw new std::invalid_argument(
-				"Cannot instantiate a MiNiFi instance without a defined Processors configuration node.");
-	}
+                procCfg.schedulingStrategy = procNode["scheduling strategy"].as<std::string>();
+                _logger->log_debug("parseProcessorNode: scheduling strategy => [%s]",
+                                   procCfg.schedulingStrategy.c_str());
+                procCfg.schedulingPeriod = procNode["scheduling period"].as<std::string>();
+                _logger->log_debug("parseProcessorNode: scheduling period => [%s]", procCfg.schedulingPeriod.c_str());
+                procCfg.penalizationPeriod = procNode["penalization period"].as<std::string>();
+                _logger->log_debug("parseProcessorNode: penalization period => [%s]",
+                                   procCfg.penalizationPeriod.c_str());
+                procCfg.yieldPeriod = procNode["yield period"].as<std::string>();
+                _logger->log_debug("parseProcessorNode: yield period => [%s]", procCfg.yieldPeriod.c_str());
+                procCfg.yieldPeriod = procNode["run duration nanos"].as<std::string>();
+                _logger->log_debug("parseProcessorNode: run duration nanos => [%s]", procCfg.runDurationNanos.c_str());
+
+                // handle auto-terminated relationships
+                YAML::Node autoTerminatedSequence = procNode["auto-terminated relationships list"];
+                std::vector<std::string> rawAutoTerminatedRelationshipValues;
+                if (autoTerminatedSequence.IsSequence() && !autoTerminatedSequence.IsNull()
+                    && autoTerminatedSequence.size() > 0) {
+                    for (YAML::const_iterator relIter = autoTerminatedSequence.begin();
+                         relIter != autoTerminatedSequence.end(); ++relIter) {
+                        std::string autoTerminatedRel = relIter->as<std::string>();
+                        rawAutoTerminatedRelationshipValues.push_back(autoTerminatedRel);
+                    }
+                }
+                procCfg.autoTerminatedRelationships = rawAutoTerminatedRelationshipValues;
+
+                // handle processor properties
+                YAML::Node propertiesNode = procNode["Properties"];
+                parsePropertiesNodeYaml(&propertiesNode, processor);
+
+                // Take care of scheduling
+                TimeUnit unit;
+                if (Property::StringToTime(procCfg.schedulingPeriod, schedulingPeriod, unit)
+                    && Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) {
+                    _logger->log_debug("convert: parseProcessorNode: schedulingPeriod => [%d] ns", schedulingPeriod);
+                    processor->setSchedulingPeriodNano(schedulingPeriod);
+                }
+
+                if (Property::StringToTime(procCfg.penalizationPeriod, penalizationPeriod, unit)
+                    && Property::ConvertTimeUnitToMS(penalizationPeriod, unit, penalizationPeriod)) {
+                    _logger->log_debug("convert: parseProcessorNode: penalizationPeriod => [%d] ms",
+                                       penalizationPeriod);
+                    processor->setPenalizationPeriodMsec(penalizationPeriod);
+                }
+
+                if (Property::StringToTime(procCfg.yieldPeriod, yieldPeriod, unit)
+                    && Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod)) {
+                    _logger->log_debug("convert: parseProcessorNode: yieldPeriod => [%d] ms", yieldPeriod);
+                    processor->setYieldPeriodMsec(yieldPeriod);
+                }
+
+                // Default to running
+                processor->setScheduledState(RUNNING);
+
+                if (procCfg.schedulingStrategy == "TIMER_DRIVEN") {
+                    processor->setSchedulingStrategy(TIMER_DRIVEN);
+                    _logger->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy.c_str());
+                } else if (procCfg.schedulingStrategy == "EVENT_DRIVEN") {
+                    processor->setSchedulingStrategy(EVENT_DRIVEN);
+                    _logger->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy.c_str());
+                } else {
+                    processor->setSchedulingStrategy(CRON_DRIVEN);
+                    _logger->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy.c_str());
+
+                }
+
+                int64_t maxConcurrentTasks;
+                if (Property::StringToInt(procCfg.maxConcurrentTasks, maxConcurrentTasks)) {
+                    _logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks);
+                    processor->setMaxConcurrentTasks(maxConcurrentTasks);
+                }
+
+                if (Property::StringToInt(procCfg.runDurationNanos, runDurationNanos)) {
+                    _logger->log_debug("parseProcessorNode: runDurationNanos => [%d]", runDurationNanos);
+                    processor->setRunDurationNano(runDurationNanos);
+                }
+
+                std::set<Relationship> autoTerminatedRelationships;
+                for (auto &&relString : procCfg.autoTerminatedRelationships) {
+                    Relationship relationship(relString, "");
+                    _logger->log_debug("parseProcessorNode: autoTerminatedRelationship  => [%s]", relString.c_str());
+                    autoTerminatedRelationships.insert(relationship);
+                }
+
+                processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
+
+                parentGroup->addProcessor(processor);
+            }
+        }
+    } else {
+        throw new std::invalid_argument(
+                "Cannot instantiate a MiNiFi instance without a defined Processors configuration node.");
+    }
 }
 
 void FlowController::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, ProcessGroup *parentGroup) {
-	uuid_t uuid;
+    uuid_t uuid;
 
-	if (!parentGroup) {
-		_logger->log_error("parseRemoteProcessGroupYaml: no parent group exists");
-		return;
-	}
+    if (!parentGroup) {
+        _logger->log_error("parseRemoteProcessGroupYaml: no parent group exists");
+        return;
+    }
 
-	if (rpgNode) {
-		if (rpgNode->IsSequence()) {
-			for (YAML::const_iterator iter = rpgNode->begin(); iter != rpgNode->end(); ++iter) {
-				YAML::Node rpgNode = iter->as<YAML::Node>();
+    if (rpgNode) {
+        if (rpgNode->IsSequence()) {
+            for (YAML::const_iterator iter = rpgNode->begin(); iter != rpgNode->end(); ++iter) {
+                YAML::Node rpgNode = iter->as<YAML::Node>();
 
-				auto name = rpgNode["name"].as<std::string>();
-				_logger->log_debug("parseRemoteProcessGroupYaml: name => [%s]", name.c_str());
+                auto name = rpgNode["name"].as<std::string>();
+                _logger->log_debug("parseRemoteProcessGroupYaml: name => [%s]", name.c_str());
 
-				std::string url = rpgNode["url"].as<std::string>();
-				_logger->log_debug("parseRemoteProcessGroupYaml: url => [%s]", url.c_str());
+                std::string url = rpgNode["url"].as<std::string>();
+                _logger->log_debug("parseRemoteProcessGroupYaml: url => [%s]", url.c_str());
 
-				std::string timeout = rpgNode["timeout"].as<std::string>();
-				_logger->log_debug("parseRemoteProcessGroupYaml: timeout => [%s]", timeout.c_str());
+                std::string timeout = rpgNode["timeout"].as<std::string>();
+                _logger->log_debug("parseRemoteProcessGroupYaml: timeout => [%s]", timeout.c_str());
 
-				std::string yieldPeriod = rpgNode["yield period"].as<std::string>();
-				_logger->log_debug("parseRemoteProcessGroupYaml: yield period => [%s]", yieldPeriod.c_str());
+                std::string yieldPeriod = rpgNode["yield period"].as<std::string>();
+                _logger->log_debug("parseRemoteProcessGroupYaml: yield period => [%s]", yieldPeriod.c_str());
 
-				YAML::Node inputPorts = rpgNode["Input Ports"].as<YAML::Node>();
-				ProcessGroup* group = NULL;
+                YAML::Node inputPorts = rpgNode["Input Ports"].as<YAML::Node>();
+                ProcessGroup *group = NULL;
 
-				// generate the random UUID
-				uuid_generate(uuid);
+                // generate the random UUID
+                uuid_generate(uuid);
 
-				char uuidStr[37];
-				uuid_unparse(_uuid, uuidStr);
+                char uuidStr[37];
+                uuid_unparse(_uuid, uuidStr);
 
-				int64_t timeoutValue = -1;
-				int64_t yieldPeriodValue = -1;
+                int64_t timeoutValue = -1;
+                int64_t yieldPeriodValue = -1;
 
-				group = this->createRemoteProcessGroup(name.c_str(), uuid);
-				group->setParent(parentGroup);
-				parentGroup->addProcessGroup(group);
+                group = this->createRemoteProcessGroup(name.c_str(), uuid);
+                group->setParent(parentGroup);
+                parentGroup->addProcessGroup(group);
 
-				TimeUnit unit;
+                TimeUnit unit;
 
-				if (Property::StringToTime(yieldPeriod, yieldPeriodValue, unit)
-							&& Property::ConvertTimeUnitToMS(yieldPeriodValue, unit, yieldPeriodValue) && group) {
-					_logger->log_debug("parseRemoteProcessGroupYaml: yieldPeriod => [%d] ms", yieldPeriodValue);
-					group->setYieldPeriodMsec(yieldPeriodValue);
-				}
+                if (Property::StringToTime(yieldPeriod, yieldPeriodValue, unit)
+                    && Property::ConvertTimeUnitToMS(yieldPeriodValue, unit, yieldPeriodValue) && group) {
+                    _logger->log_debug("parseRemoteProcessGroupYaml: yieldPeriod => [%d] ms", yieldPeriodValue);
+                    group->setYieldPeriodMsec(yieldPeriodValue);
+                }
 
-				if (Property::StringToTime(timeout, timeoutValue, unit)
-					&& Property::ConvertTimeUnitToMS(timeoutValue, unit, timeoutValue) && group) {
-					_logger->log_debug("parseRemoteProcessGroupYaml: timeoutValue => [%d] ms", timeoutValue);
-					group->setTimeOut(timeoutValue);
-				}
+                if (Property::StringToTime(timeout, timeoutValue, unit)
+                    && Property::ConvertTimeUnitToMS(timeoutValue, unit, timeoutValue) && group) {
+                    _logger->log_debug("parseRemoteProcessGroupYaml: timeoutValue => [%d] ms", timeoutValue);
+                    group->setTimeOut(timeoutValue);
+                }
 
-				group->setTransmitting(true);
-				group->setURL(url);
+                group->setTransmitting(true);
+                group->setURL(url);
 
-				if (inputPorts.IsSequence()) {
-					for (YAML::const_iterator portIter = inputPorts.begin(); portIter != inputPorts.end(); ++portIter) {
-						_logger->log_debug("Got a current port, iterating...");
+                if (inputPorts.IsSequence()) {
+                    for (YAML::const_iterator portIter = inputPorts.begin(); portIter != inputPorts.end(); ++portIter) {
+                        _logger->log_debug("Got a current port, iterating...");
 
-						YAML::Node currPort = portIter->as<YAML::Node>();
+                        YAML::Node currPort = portIter->as<YAML::Node>();
 
-						this->parsePortYaml(&currPort, group, SEND);
-					} // for node
-				}
-			}
-		}
-	}
+                        this->parsePortYaml(&currPort, group, SEND);
+                    } // for node
+                }
+            }
+        }
+    }
 }
 
 void FlowController::parseConnectionYaml(YAML::Node *connectionsNode, ProcessGroup *parent) {
-	uuid_t uuid;
-	Connection *connection = NULL;
-
-	if (!parent) {
-		_logger->log_error("parseProcessNode: no parent group was provided");
-		return;
-	}
+    uuid_t uuid;
+    Connection *connection = NULL;
 
-	if (connectionsNode) {
-		int numConnections = connectionsNode->size();
-		if (numConnections < 1) {
-			throw new std::invalid_argument("There must be at least one connection configured.");
-		}
+    if (!parent) {
+        _logger->log_error("parseProcessNode: no parent group was provided");
+        return;
+    }
 
-		if (connectionsNode->IsSequence()) {
-			for (YAML::const_iterator iter = connectionsNode->begin(); iter != connectionsNode->end(); ++iter) {
-				// generate the random UIID
-				uuid_generate(uuid);
-
-				YAML::Node connectionNode = iter->as<YAML::Node>();
-
-				std::string name = connectionNode["name"].as<std::string>();
-				std::string destName = connectionNode["destination name"].as<std::string>();
-
-				char uuidStr[37];
-				uuid_unparse(_uuid, uuidStr);
-
-				_logger->log_debug("Created connection with UUID %s and name %s", uuidStr, name.c_str());
-				connection = this->createConnection(name, uuid);
-				auto rawRelationship = connectionNode["source relationship name"].as<std::string>();
-				Relationship relationship(rawRelationship, "");
-				_logger->log_debug("parseConnection: relationship => [%s]", rawRelationship.c_str());
-				if (connection)
-					connection->setRelationship(relationship);
-				std::string connectionSrcProcName = connectionNode["source name"].as<std::string>();
-
-				Processor *srcProcessor = this->_root->findProcessor(connectionSrcProcName);
-
-				if (!srcProcessor) {
-					_logger->log_error("Could not locate a source with name %s to create a connection",
-							connectionSrcProcName.c_str());
-					throw std::invalid_argument(
-							"Could not locate a source with name %s to create a connection " + connectionSrcProcName);
-				}
-
-				Processor *destProcessor = this->_root->findProcessor(destName);
-				// If we could not find name, try by UUID
-				if (!destProcessor) {
-					uuid_t destUuid;
-					uuid_parse(destName.c_str(), destUuid);
-					destProcessor = this->_root->findProcessor(destUuid);
-				}
-				if (destProcessor) {
-					std::string destUuid = destProcessor->getUUIDStr();
-				}
-
-				uuid_t srcUuid;
-				uuid_t destUuid;
-				srcProcessor->getUUID(srcUuid);
-				connection->setSourceProcessorUUID(srcUuid);
-				destProcessor->getUUID(destUuid);
-				connection->setDestinationProcessorUUID(destUuid);
-
-				if (connection) {
-					parent->addConnection(connection);
-				}
-			}
-		}
+    if (connectionsNode) {
+
+        if (connectionsNode->IsSequence()) {
+            for (YAML::const_iterator iter = connectionsNode->begin(); iter != connectionsNode->end(); ++iter) {
+                // generate the random UUID
+                uuid_generate(uuid);
+
+                YAML::Node connectionNode = iter->as<YAML::Node>();
+
+                std::string name = connectionNode["name"].as<std::string>();
+                std::string destName = connectionNode["destination name"].as<std::string>();
+
+                char uuidStr[37];
+                uuid_unparse(_uuid, uuidStr);
+
+                _logger->log_debug("Created connection with UUID %s and name %s", uuidStr, name.c_str());
+                connection = this->createConnection(name, uuid);
+                auto rawRelationship = connectionNode["source relationship name"].as<std::string>();
+                Relationship relationship(rawRelationship, "");
+                _logger->log_debug("parseConnection: relationship => [%s]", rawRelationship.c_str());
+                if (connection)
+                    connection->setRelationship(relationship);
+                std::string connectionSrcProcName = connectionNode["source name"].as<std::string>();
+
+                Processor *srcProcessor = this->_root->findProcessor(connectionSrcProcName);
+
+                if (!srcProcessor) {
+                    _logger->log_error("Could not locate a source with name %s to create a connection",
+                                       connectionSrcProcName.c_str());
+                    throw std::invalid_argument(
+                            "Could not locate a source with name %s to create a connection " + connectionSrcProcName);
+                }
+
+                Processor *destProcessor = this->_root->findProcessor(destName);
+                // If we could not find name, try by UUID
+                if (!destProcessor) {
+                    uuid_t destUuid;
+                    uuid_parse(destName.c_str(), destUuid);
+                    destProcessor = this->_root->findProcessor(destUuid);
+                }
+                if (destProcessor) {
+                    std::string destUuid = destProcessor->getUUIDStr();
+                }
+
+                uuid_t srcUuid;
+                uuid_t destUuid;
+                srcProcessor->getUUID(srcUuid);
+                connection->setSourceProcessorUUID(srcUuid);
+                destProcessor->getUUID(destUuid);
+                connection->setDestinationProcessorUUID(destUuid);
+
+                if (connection) {
+                    parent->addConnection(connection);
+                }
+            }
+        }
 
-		if (connection)
-			parent->addConnection(connection);
+        if (connection)
+            parent->addConnection(connection);
 
-		return;
-	}
+        return;
+    }
 }
 
 void FlowController::parseRemoteProcessGroup(xmlDoc *doc, xmlNode *node, ProcessGroup *parent) {
-	uuid_t uuid;
-	xmlNode *currentNode;
-	ProcessGroup *group = NULL;
-	int64_t yieldPeriod = -1;
-	int64_t timeOut = -1;
+    uuid_t uuid;
+    xmlNode *currentNode;
+    ProcessGroup *group = NULL;
+    int64_t yieldPeriod = -1;
+    int64_t timeOut = -1;
 
 // generate the random UIID
-	uuid_generate(uuid);
-
-	for (currentNode = node->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) {
-		if (currentNode->type == XML_ELEMENT_NODE) {
-			if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0) {
-				char *id = (char *) xmlNodeGetContent(currentNode);
-				if (id) {
-					_logger->log_debug("parseRootProcessGroup: id => [%s]", id);
-					uuid_parse(id, uuid);
-					xmlFree(id);
-				}
-			} else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) {
-				char *name = (char *) xmlNodeGetContent(currentNode);
-				if (name) {
-					_logger->log_debug("parseRemoteProcessGroup: name => [%s]", name);
-					group = this->createRemoteProcessGroup(name, uuid);
-					if (group == NULL) {
-						xmlFree(name);
-						return;
-					}
-					group->setParent(parent);
-					parent->addProcessGroup(group);
-					xmlFree(name);
-				}
-			} else if (xmlStrcmp(currentNode->name, BAD_CAST "yieldPeriod") == 0) {
-				TimeUnit unit;
-				char *temp = (char *) xmlNodeGetContent(currentNode);
-				if (temp) {
-					if (Property::StringToTime(temp, yieldPeriod, unit)
-							&& Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod) && group) {
-						_logger->log_debug("parseRemoteProcessGroup: yieldPeriod => [%d] ms", yieldPeriod);
-						group->setYieldPeriodMsec(yieldPeriod);
-					}
-					xmlFree(temp);
-				}
-			} else if (xmlStrcmp(currentNode->name, BAD_CAST "timeout") == 0) {
-				TimeUnit unit;
-				char *temp = (char *) xmlNodeGetContent(currentNode);
-				if (temp) {
-					if (Property::StringToTime(temp, timeOut, unit)
-							&& Property::ConvertTimeUnitToMS(timeOut, unit, timeOut) && group) {
-						_logger->log_debug("parseRemoteProcessGroup: timeOut => [%d] ms", timeOut);
-						group->setTimeOut(timeOut);
-					}
-					xmlFree(temp);
-				}
-			} else if (xmlStrcmp(currentNode->name, BAD_CAST "transmitting") == 0) {
-				char *temp = (char *) xmlNodeGetContent(currentNode);
-				bool transmitting;
-				if (temp) {
-					if (Property::StringToBool(temp, transmitting) && group) {
-						_logger->log_debug("parseRemoteProcessGroup: transmitting => [%d]", transmitting);
-						group->setTransmitting(transmitting);
-					}
-					xmlFree(temp);
-				}
-			} else if (xmlStrcmp(currentNode->name, BAD_CAST "inputPort") == 0 && group) {
-				this->parsePort(doc, currentNode, group, SEND);
-			} else if (xmlStrcmp(currentNode->name, BAD_CAST "outputPort") == 0 && group) {
-				this->parsePort(doc, currentNode, group, RECEIVE);
-			}
-		} // if (currentNode->type == XML_ELEMENT_NODE)
-	} // for node
+    uuid_generate(uuid);
+
+    for (currentNode = node->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) {
+        if (currentNode->type == XML_ELEMENT_NODE) {
+            if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0) {
+                char *id = (char *) xmlNodeGetContent(currentNode);
+                if (id) {
+                    _logger->log_debug("parseRootProcessGroup: id => [%s]", id);
+                    uuid_parse(id, uuid);
+                    xmlFree(id);
+                }
+            } else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) {
+                char *name = (char *) xmlNodeGetContent(currentNode);
+                if (name) {
+                    _logger->log_debug("parseRemoteProcessGroup: name => [%s]", name);
+                    group = this->createRemoteProcessGroup(name, uuid);
+                    if (group == NULL) {
+                        xmlFree(name);
+                        return;
+                    }
+                    group->setParent(parent);
+                    parent->addProcessGroup(group);
+                    xmlFree(name);
+                }
+            } else if (xmlStrcmp(currentNode->name, BAD_CAST "yieldPeriod") == 0) {
+                TimeUnit unit;
+                char *temp = (char *) xmlNodeGetContent(currentNode);
+                if (temp) {
+                    if (Property::StringToTime(temp, yieldPeriod, unit)
+                        && Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod) && group) {
+                        _logger->log_debug("parseRemoteProcessGroup: yieldPeriod => [%d] ms", yieldPeriod);
+                        group->setYieldPeriodMsec(yieldPeriod);
+                    }
+                    xmlFree(temp);
+                }
+            } else if (xmlStrcmp(currentNode->name, BAD_CAST "timeout") == 0) {
+                TimeUnit unit;
+                char *temp = (char *) xmlNodeGetContent(currentNode);
+                if (temp) {
+                    if (Property::StringToTime(temp, timeOut, unit)
+                        && Property::ConvertTimeUnitToMS(timeOut, unit, timeOut) && group) {
+                        _logger->log_debug("parseRemoteProcessGroup: timeOut => [%d] ms", timeOut);
+                        group->setTimeOut(timeOut);
+                    }
+                    xmlFree(temp);
+                }
+            } else if (xmlStrcmp(currentNode->name, BAD_CAST "transmitting") == 0) {
+                char *temp = (char *) xmlNodeGetContent(currentNode);
+                bool transmitting;
+                if (temp) {
+                    if (Property::StringToBool(temp, transmitting) && group) {
+                        _logger->log_debug("parseRemoteProcessGroup: transmitting => [%d]", transmitting);
+                        group->setTransmitting(transmitting);
+                    }
+                    xmlFree(temp);
+                }
+            } else if (xmlStrcmp(currentNode->name, BAD_CAST "inputPort") == 0 && group) {
+                this->parsePort(doc, currentNode, group, SEND);
+            } else if (xmlStrcmp(currentNode->name, BAD_CAST "outputPort") == 0 && group) {
+                this->parsePort(doc, currentNode, group, RECEIVE);
+            }
+        } // if (currentNode->type == XML_ELEMENT_NODE)
+    } // for node
 }
 
 void FlowController::parseProcessorProperty(xmlDoc *doc, xmlNode *node, Processor *processor) {
-	xmlNode *currentNode;
-	std::string propertyValue;
-	std::string propertyName;
+    xmlNode *currentNode;
+    std::string propertyValue;
+    std::string propertyName;
 
-	if (!processor) {
-		_logger->log_error("parseProcessorProperty: no parent processor existed");
-		return;
-	}
+    if (!processor) {
+        _logger->log_error("parseProcessorProperty: no parent processor existed");
+        return;
+    }
 
-	for (currentNode = node->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) {
-		if (currentNode->type == XML_ELEMENT_NODE) {
-			if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) {
-				char *name = (char *) xmlNodeGetContent(currentNode);
-				if (name) {
-					_logger->log_debug("parseProcessorNode: name => [%s]", name);
-					propertyName = name;
-					xmlFree(name);
-				}
-			}
-			if (xmlStrcmp(currentNode->name, BAD_CAST "value") == 0) {
-				char *value = (char *) xmlNodeGetContent(currentNode);
-				if (value) {
-					_logger->log_debug("parseProcessorNode: value => [%s]", value);
-					propertyValue = value;
-					xmlFree(value);
-				}
-			}
-			if (!propertyName.empty() && !propertyValue.empty()) {
-				processor->setProperty(propertyName, propertyValue);
-			}
-		} // if (currentNode->type == XML_ELEMENT_NODE)
-	} // for node
+    for (currentNode = node->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) {
+        if (currentNode->type == XML_ELEMENT_NODE) {
+            if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) {
+                char *name = (char *) xmlNodeGetContent(currentNode);
+                if (name) {
+                    _logger->log_debug("parseProcessorNode: name => [%s]", name);
+                    propertyName = name;
+                    xmlFree(name);
+                }
+            }
+            if (xmlStrcmp(currentNode->name, BAD_CAST "value") == 0) {
+                char *value = (char *) xmlNodeGetContent(currentNode);
+                if (value) {
+                    _logger->log_debug("parseProcessorNode: value => [%s]", value);
+                    propertyValue = value;
+                    xmlFree(value);
+                }
+            }
+            if (!propertyName.empty() && !propertyValue.empty()) {
+                processor->setProperty(propertyName, propertyValue);
+            }
+        } // if (currentNode->type == XML_ELEMENT_NODE)
+    } // for node
 }
 
 void FlowController::parsePortYaml(YAML::Node *portNode, ProcessGroup *parent, TransferDirection direction) {
-	uuid_t uuid;
-	Processor *processor = NULL;
-	RemoteProcessorGroupPort *port = NULL;
+    uuid_t uuid;
+    Processor *processor = NULL;
+    RemoteProcessorGroupPort *port = NULL;
 
-	if (!parent) {
-		_logger->log_error("parseProcessNode: no parent group existed");
-		return;
-	}
+    if (!parent) {
+        _logger->log_error("parseProcessNode: no parent group existed");
+        return;
+    }
 
-	YAML::Node inputPortsObj = portNode->as<YAML::Node>();
+    YAML::Node inputPortsObj = portNode->as<YAML::Node>();
 
-	// generate the random UIID
-	uuid_generate(uuid);
+    // generate the random UIID
+    uuid_generate(uuid);
 
-	auto portId = inputPortsObj["id"].as<std::string>();
-	auto nameStr = inputPortsObj["name"].as<std::string>();
-	uuid_parse(portId.c_str(), uuid);
+    auto portId = inputPortsObj["id"].as<std::string>();
+    auto nameStr = inputPortsObj["name"].as<std::string>();
+    uuid_parse(portId.c_str(), uuid);
 
-	port = new RemoteProcessorGroupPort(nameStr.c_str(), uuid);
+    port = new RemoteProcessorGroupPort(nameStr.c_str(), uuid);
 
-	processor = (Processor *) port;
-	port->setDirection(direction);
-	port->setTimeOut(parent->getTimeOut());
-	port->setTransmitting(true);
-	processor->setYieldPeriodMsec(parent->getYieldPeriodMsec());
-	processor->initialize();
+    processor = (Processor *) port;
+    port->setDirection(direction);
+    port->setTimeOut(parent->getTimeOut());
+    port->setTransmitting(true);
+    processor->setYieldPeriodMsec(parent->getYieldPeriodMsec());
+    processor->initialize();
 
-	// handle port properties
-	YAML::Node nodeVal = portNode->as<YAML::Node>();
-	YAML::Node propertiesNode = nodeVal["Properties"];
+    // handle port properties
+    YAML::Node nodeVal = portNode->as<YAML::Node>();
+    YAML::Node propertiesNode = nodeVal["Properties"];
 
-	parsePropertiesNodeYaml(&propertiesNode, processor);
+    parsePropertiesNodeYaml(&propertiesNode, processor);
 
-	// add processor to parent
-	parent->addProcessor(processor);
-	processor->setScheduledState(RUNNING);
-	auto rawMaxConcurrentTasks = inputPortsObj["max concurrent tasks"].as<std::string>();
-	int64_t maxConcurrentTasks;
-	if (Property::StringToInt(rawMaxConcurrentTasks, maxConcurrentTasks)) {
-		processor->setMaxConcurrentTasks(maxConcurrentTasks);
-	}
-	_logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks);
-	processor->setMaxConcurrentTasks(maxConcurrentTasks);
+    // add processor to parent
+    parent->addProcessor(processor);
+    processor->setScheduledState(RUNNING);
+    auto rawMaxConcurrentTasks = inputPortsObj["max concurrent tasks"].as<std::string>();
+    int64_t maxConcurrentTasks;
+    if (Property::StringToInt(rawMaxConcurrentTasks, maxConcurrentTasks)) {
+        processor->setMaxConcurrentTasks(maxConcurrentTasks);
+    }
+    _logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks);
+    processor->setMaxConcurrentTasks(maxConcurrentTasks);
 
 }
 
 void FlowController::parsePort(xmlDoc *doc, xmlNode *processorNode, ProcessGroup *parent, TransferDirection direction) {
-	char *id = NULL;
-	char *name = NULL;
-	uuid_t uuid;
-	xmlNode *currentNode;
-	Processor *processor = NULL;
-	RemoteProcessorGroupPort *port = NULL;
-
-	if (!parent) {
-		_logger->log_error("parseProcessNode: no parent group existed");
-		return;
-	}
+    char *id = NULL;
+    char *name = NULL;
+    uuid_t uuid;
+    xmlNode *currentNode;
+    Processor *processor = NULL;
+    RemoteProcessorGroupPort *port = NULL;
+
+    if (!parent) {
+        _logger->log_error("parseProcessNode: no parent group existed");
+        return;
+    }
 // generate the random UIID
-	uuid_generate(uuid);
-
-	for (currentNode = processorNode->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) {
-		if (currentNode->type == XML_ELEMENT_NODE) {
-			if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0) {
-				id = (char *) xmlNodeGetContent(currentNode);
-				if (id) {
-					_logger->log_debug("parseProcessorNode: id => [%s]", id);
-					uuid_parse(id, uuid);
-					xmlFree(id);
-				}
-			} else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) {
-				name = (char *) xmlNodeGetContent(currentNode);
-				if (name) {
-					_logger->log_debug("parseProcessorNode: name => [%s]", name);
-					port = new RemoteProcessorGroupPort(name, uuid);
-					processor = (Processor *) port;
-					if (processor == NULL) {
-						xmlFree(name);
-						return;
-					}
-					port->setDirection(direction);
-					port->setTimeOut(parent->getTimeOut());
-					port->setTransmitting(parent->getTransmitting());
-					processor->setYieldPeriodMsec(parent->getYieldPeriodMsec());
-					processor->initialize();
-					// add processor to parent
-					parent->addProcessor(processor);
-					xmlFree(name);
-				}
-			} else if (xmlStrcmp(currentNode->name, BAD_CAST "scheduledState") == 0) {
-				char *temp = (char *) xmlNodeGetContent(currentNode);
-				if (temp) {
-					std::string state = temp;
-					if (state == "DISABLED") {
-						_logger->log_debug("parseProcessorNode: scheduledState  => [%s]", state.c_str());
-						processor->setScheduledState(DISABLED);
-					}
-					if (state == "STOPPED") {
-						_logger->log_debug("parseProcessorNode: scheduledState  => [%s]", state.c_str());
-						processor->setScheduledState(STOPPED);
-					}
-					if (state == "RUNNING") {
-						_logger->log_debug("parseProcessorNode: scheduledState  => [%s]", state.c_str());
-						processor->setScheduledState(RUNNING);
-					}
-					xmlFree(temp);
-				}
-			} else if (xmlStrcmp(currentNode->name, BAD_CAST "maxConcurrentTasks") == 0) {
-				char *temp = (char *) xmlNodeGetContent(currentNode);
-				if (temp) {
-					int64_t maxConcurrentTasks;
-					if (Property::StringToInt(temp, maxConcurrentTasks)) {
-						_logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks);
-						processor->setMaxConcurrentTasks(maxConcurrentTasks);
-					}
-					xmlFree(temp);
-				}
-			} else if (xmlStrcmp(currentNode->name, BAD_CAST "property") == 0) {
-				this->parseProcessorProperty(doc, currentNode, processor);
-			}
-		} // if (currentNode->type == XML_ELEMENT_NODE)
-	} // while node
+    uuid_generate(uuid);
+
+    for (currentNode = processorNode->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) {
+        if (currentNode->type == XML_ELEMENT_NODE) {
+            if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0) {
+                id = (char *) xmlNodeGetContent(currentNode);
+                if (id) {
+                    _logger->log_debug("parseProcessorNode: id => [%s]", id);
+                    uuid_parse(id, uuid);
+                    xmlFree(id);
+                }
+            } else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) {
+                name = (char *) xmlNodeGetContent(currentNode);
+                if (name) {
+                    _logger->log_debug("parseProcessorNode: name => [%s]", name);
+                    port = new RemoteProcessorGroupPort(name, uuid);
+                    processor = (Processor *) port;
+                    if (processor == NULL) {
+                        xmlFree(name);
+                        return;
+                    }
+                    port->setDirection(direction);
+                    port->setTimeOut(parent->getTimeOut());
+                    port->setTransmitting(parent->getTransmitting());
+                    processor->setYieldPeriodMsec(parent->getYieldPeriodMsec());
+                    processor->initialize();
+                    // add processor to parent
+                    parent->addProcessor(processor);
+                    xmlFree(name);
+                }
+            } else if (xmlStrcmp(currentNode->name, BAD_CAST "scheduledState") == 0) {
+                char *temp = (char *) xmlNodeGetContent(currentNode);
+                if (temp) {
+                    std::string state = temp;
+                    if (state == "DISABLED") {
+                        _logger->log_debug("parseProcessorNode: scheduledState  => [%s]", state.c_str());
+                        processor->setScheduledState(DISABLED);
+                    }
+                    if (state == "STOPPED") {
+                        _logger->log_debug("parseProcessorNode: scheduledState  => [%s]", state.c_str());
+                        processor->setScheduledState(STOPPED);
+                    }
+                    if (state == "RUNNING") {
+                        _logger->log_debug("parseProcessorNode: scheduledState  => [%s]", state.c_str());
+                        processor->setScheduledState(RUNNING);
+                    }
+                    xmlFree(temp);
+                }
+            } else if (xmlStrcmp(currentNode->name, BAD_CAST "maxConcurrentTasks") == 0) {
+                char *temp = (char *) xmlNodeGetContent(currentNode);
+                if (temp) {
+                    int64_t maxConcurrentTasks;
+                    if (Property::StringToInt(temp, maxConcurrentTasks)) {
+                        _logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks);
+                        processor->setMaxConcurrentTasks(maxConcurrentTasks);
+                    }
+                    xmlFree(temp);
+                }
+            } else if (xmlStrcmp(currentNode->name, BAD_CAST "property") == 0) {
+                this->parseProcessorProperty(doc, currentNode, processor);
+            }
+        } // if (currentNode->type == XML_ELEMENT_NODE)
+    } // while node
 }
 
 void FlowController::parseProcessorNode(xmlDoc *doc, xmlNode *processorNode, ProcessGroup *parent) {
-	char *id = NULL;
-	char *name = NULL;
-	int64_t schedulingPeriod = -1;
-	int64_t penalizationPeriod = -1;
-	int64_t yieldPeriod = -1;
-	bool lossTolerant = false;
-	int64_t runDurationNanos = -1;
-	uuid_t uuid;
-	xmlNode *currentNode;
-	Processor *processor = NULL;
-
-	if (!parent) {
-		_logger->log_error("parseProcessNode: no parent group existed");
-		return;
-	}
+    char *id = NULL;
+    char *name = NULL;
+    int64_t schedulingPeriod = -1;
+    int64_t penalizationPeriod = -1;
+    int64_t yieldPeriod = -1;
+    bool lossTolerant = false;
+    int64_t runDurationNanos = -1;
+    uuid_t uuid;
+    xmlNode *currentNode;
+    Processor *processor = NULL;
+
+    if (!parent) {
+        _logger->log_error("parseProcessNode: no parent group existed");
+        return;
+    }
 // generate the random UIID
-	uuid_generate(uuid);
-
-	for (currentNode = processorNode->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) {
-		if (currentNode->type == XML_ELEMENT_NODE) {
-			if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0) {
-				id = (char *) xmlNodeGetContent(currentNode);
-				if (id) {
-					_logger->log_debug("parseProcessorNode: id => [%s]", id);
-					uuid_parse(id, uuid);
-					xmlFree(id);
-				}
-			} else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) {
-				name = (char *) xmlNodeGetContent(currentNode);
-				if (name) {
-					_logger->log_debug("parseProcessorNode: name => [%s]", name);
-					processor = this->createProcessor(name, uuid);
-					if (processor == NULL) {
-						xmlFree(name);
-						return;
-					}
-					// add processor to parent
-					parent->addProcessor(processor);
-					xmlFree(name);
-				}
-			} else if (xmlStrcmp(currentNode->name, BAD_CAST "schedulingPeriod") == 0) {
-				TimeUnit unit;
-				char *temp = (char *) xmlNodeGetContent(currentNode);
-				if (temp) {
-					if (Property::StringToTime(temp, schedulingPeriod, unit)
-							&& Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) {
-						_logger->log_debug("parseProcessorNode: schedulingPeriod => [%d] ns", schedulingPeriod);
-						processor->setSchedulingPeriodNano(schedulingPeriod);
-					}
-					xmlFree(temp);
-				}
-			} else if (xmlStrcmp(currentNode->name, BAD_CAST "penalizationPeriod") == 0) {
-				TimeUnit unit;
-				char *temp = (char *) xmlNodeGetContent(currentNode);
-				if (temp) {
-					if (Property::StringToTime(temp, penalizationPeriod, unit)
-							&& Property::ConvertTimeUnitToMS(penalizationPeriod, unit, penalizationPeriod)) {
-						_logger->log_debug("parseProcessorNode: penalizationPeriod => [%d] ms", penalizationPeriod);
-						processor->setPenalizationPeriodMsec(penalizationPeriod);
-					}
-					xmlFree(temp);
-				}
-			} else if (xmlStrcmp(currentNode->name, BAD_CAST "yieldPeriod") == 0) {
-				TimeUnit unit;
-				char *temp = (char *) xmlNodeGetContent(currentNode);
-				if (temp) {
-					if (Property::StringToTime(temp, yieldPeriod, unit)
-							&& Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod)) {
-						_logger->log_debug("parseProcessorNode: yieldPeriod => [%d] ms", yieldPeriod);
-						processor->setYieldPeriodMsec(yieldPeriod);
-					}
-					xmlFree(temp);
-				}
-			} else if (xmlStrcmp(currentNode->name, BAD_CAST "lossTolerant") == 0) {
-				char *temp = (char *) xmlNodeGetContent(currentNode);
-				if (temp) {
-					if (Property::StringToBool(temp, lossTolerant)) {
-						_logger->log_debug("parseProcessorNode: lossTolerant => [%d]", lossTolerant);
-						processor->setlossTolerant(lossTolerant);
-					}
-					xmlFree(temp);
-				}
-			} else if (xmlStrcmp(currentNode->name, BAD_CAST "scheduledState") == 0) {
-				char *temp = (char *) xmlNodeGetContent(currentNode);
-				if (temp) {
-					std::string state = temp;
-					if (state == "DISABLED") {
-						_logger->log_debug("parseProcessorNode: scheduledState  => [%s]", state.c_str());
-						processor->setScheduledState(DISABLED);
-					}
-					if (state == "STOPPED") {
-						_logger->log_debug("parseProcessorNode: scheduledState  => [%s]", state.c_str());
-						processor->setScheduledState(STOPPED);
-					}
-					if (state == "RUNNING") {
-						_logger->log_debug("parseProcessorNode: scheduledState  => [%s]", state.c_str());
-						processor->setScheduledState(RUNNING);
-					}
-					xmlFree(temp);
-				}
-			} else if (xmlStrcmp(currentNode->name, BAD_CAST "schedulingStrategy") == 0) {
-				char *temp = (char *) xmlNodeGetContent(currentNode);
-				if (temp) {
-					std::string strategy = temp;
-					if (strategy == "TIMER_DRIVEN") {
-						_logger->log_debug("parseProcessorNode: scheduledStrategy  => [%s]", strategy.c_str());
-						processor->setSchedulingStrategy(TIMER_DRIVEN);
-					}
-					if (strategy == "EVENT_DRIVEN") {
-						_logger->log_debug("parseProcessorNode: scheduledStrategy  => [%s]", strategy.c_str());
-						processor->setSchedulingStrategy(EVENT_DRIVEN);
-					}
-					xmlFree(temp);
-				}
-			} else if (xmlStrcmp(currentNode->name, BAD_CAST "maxConcurrentTasks") == 0) {
-				char *temp = (char *) xmlNodeGetContent(currentNode);
-				if (temp) {
-					int64_t maxConcurrentTasks;
-					if (Property::StringToInt(temp, maxConcurrentTasks)) {
-						_logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks);
-						processor->setMaxConcurrentTasks(maxConcurrentTasks);
-					}
-					xmlFree(temp);
-				}
-			} else if (xmlStrcmp(currentNode->name, BAD_CAST "runDurationNanos") == 0) {
-				char *temp = (char *) xmlNodeGetContent(currentNode);
-				if (temp) {
-					if (Property::StringToInt(temp, runDurationNanos)) {
-						_logger->log_debug("parseProcessorNode: runDurationNanos => [%d]", runDurationNanos);
-						processor->setRunDurationNano(runDurationNanos);
-					}
-					xmlFree(temp);
-				}
-			} else if (xmlStrcmp(currentNode->name, BAD_CAST "autoTerminatedRelationship") == 0) {
-				char *temp = (char *) xmlNodeGetContent(currentNode);
-				if (temp) {
-					std::string relationshipName = temp;
-					Relationship relationship(relationshipName, "");
-					std::set<Relationship> relationships;
-
-					relationships.insert(relationship);
-					processor->setAutoTerminatedRelationships(relationships);
-					_logger->log_debug("parseProcessorNode: autoTerminatedRelationship  => [%s]",
-							relationshipName.c_str());
-					xmlFree(temp);
-				}
-			} else if (xmlStrcmp(currentNode->name, BAD_CAST "property") == 0) {
-				this->parseProcessorProperty(doc, currentNode, processor);
-			}
-		} // if (currentNode->type == XML_ELEMENT_NODE)
-	} // while node
+    uuid_generate(uuid);
+
+    for (currentNode = processorNode->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) {
+        if (currentNode->type == XML_ELEMENT_NODE) {
+            if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0) {
+                id = (char *) xmlNodeGetContent(currentNode);
+                if (id) {
+                    _logger->log_debug("parseProcessorNode: id => [%s]", id);
+                    uuid_parse(id, uuid);
+                    xmlFree(id);
+                }
+            } else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) {
+                name = (char *) xmlNodeGetContent(currentNode);
+                if (name) {
+                    _logger->log_debug("parseProcessorNode: name => [%s]", name);
+                    processor = this->createProcessor(name, uuid);
+                    if (processor == NULL) {
+                        xmlFree(name);
+                        return;
+                    }
+                    // add processor to parent
+                    parent->addProcessor(processor);
+                    xmlFree(name);
+                }
+            } else if (xmlStrcmp(currentNode->name, BAD_CAST "schedulingPeriod") == 0) {
+                TimeUnit unit;
+                char *temp = (char *) xmlNodeGetContent(currentNode);
+                if (temp) {
+                    if (Property::StringToTime(temp, schedulingPeriod, unit)
+                        && Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) {
+                        _logger->log_debug("parseProcessorNode: schedulingPeriod => [%d] ns", schedulingPeriod);
+                        processor->setSchedulingPeriodNano(schedulingPeriod);
+                    }
+                    xmlFree(temp);
+                }
+            } else if (xmlStrcmp(currentNode->name, BAD_CAST "penalizationPeriod") == 0) {
+                TimeUnit unit;
+                char *temp = (char *) xmlNodeGetContent(currentNode);
+                if (temp) {
+                    if (Property::StringToTime(temp, penalizationPeriod, unit)
+                        && Property::ConvertTimeUnitToMS(penalizationPeriod, unit, penalizationPeriod)) {
+                        _logger->log_debug("parseProcessorNode: penalizationPeriod => [%d] ms", penalizationPeriod);
+                        processor->setPenalizationPeriodMsec(penalizationPeriod);
+                    }
+                    xmlFree(temp);
+                }
+            } else if (xmlStrcmp(currentNode->name, BAD_CAST "yieldPeriod") == 0) {
+                TimeUnit unit;
+                char *temp = (char *) xmlNodeGetContent(currentNode);
+                if (temp) {
+                    if (Property::StringToTime(temp, yieldPeriod, unit)
+                        && Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod)) {
+                        _logger->log_debug("parseProcessorNode: yieldPeriod => [%d] ms", yieldPeriod);
+                        processor->setYieldPeriodMsec(yieldPeriod);
+                    }
+                    xmlFree(temp);
+                }
+            } else if (xmlStrcmp(currentNode->name, BAD_CAST "lossTolerant") == 0) {
+                char *temp = (char *) xmlNodeGetContent(currentNode);
+                if (temp) {
+                    if (Property::StringToBool(temp, lossTolerant)) {
+                        _logger->log_debug("parseProcessorNode: lossTolerant => [%d]", lossTolerant);
+                        processor->setlossTolerant(lossTolerant);
+                    }
+                    xmlFree(temp);
+                }
+            } else if (xmlStrcmp(currentNode->name, BAD_CAST "scheduledState") == 0) {
+                char *temp = (char *) xmlNodeGetContent(currentNode);
+                if (temp) {
+                    std::string state = temp;
+                    if (state == "DISABLED") {
+                        _logger->log_debug("parseProcessorNode: scheduledState  => [%s]", state.c_str());
+                        processor->setScheduledState(DISABLED);
+                    }
+                    if (state == "STOPPED") {
+                        _logger->log_debug("parseProcessorNode: scheduledState  => [%s]", state.c_str());
+                        processor->setScheduledState(STOPPED);
+                    }
+                    if (state == "RUNNING") {
+                        _logger->log_debug("parseProcessorNode: scheduledState  => [%s]", state.c_str());
+                        processor->setScheduledState(RUNNING);
+                    }
+                    xmlFree(temp);
+                }
+            } else if (xmlStrcmp(currentNode->name, BAD_CAST "schedulingStrategy") == 0) {
+                char *temp = (char *) xmlNodeGetContent(currentNode);
+                if (temp) {
+                    std::string strategy = temp;
+                    if (strategy == "TIMER_DRIVEN") {
+                        _logger->log_debug("parseProcessorNode: scheduledStrategy  => [%s]", strategy.c_str());
+                        processor->setSchedulingStrategy(TIMER_DRIVEN);
+                    }
+                    if (strategy == "EVENT_DRIVEN") {
+                        _logger->log_debug("parseProcessorNode: scheduledStrategy  => [%s]", strategy.c_str());
+                        processor->setSchedulingStrategy(EVENT_DRIVEN);
+                    }
+                    xmlFree(temp);
+                }
+            } else if (xmlStrcmp(currentNode->name, BAD_CAST "maxConcurrentTasks") == 0) {
+                char *temp = (char *) xmlNodeGetContent(currentNode);
+                if (temp) {
+                    int64_t maxConcurrentTasks;
+                    if (Property::StringToInt(temp, maxConcurrentTasks)) {
+                        _logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks);
+                        processor->setMaxConcurrentTasks(maxConcurrentTasks);
+                    }
+                    xmlFree(temp);
+                }
+            } else if (xmlStrcmp(currentNode->name, BAD_CAST "runDurationNanos") == 0) {
+                char *temp = (char *) xmlNodeGetContent(currentNode);
+                if (temp) {
+                    if (Property::StringToInt(temp, runDurationNanos)) {
+                        _logger->log_debug("parseProcessorNode: runDurationNanos => [%d]", runDurationNanos);
+                        processor->setRunDurationNano(runDurationNanos);
+                    }
+                    xmlFree(temp);
+                }
+            } else if (xmlStrcmp(currentNode->name, BAD_CAST "autoTerminatedRelationship") == 0) {
+                char *temp = (char *) xmlNodeGetContent(currentNode);
+                if (temp) {
+                    std::string relationshipName = temp;
+                    Relationship relationship(relationshipName, "");
+                    std::set<Relationship> relationships;
+
+                    relationships.insert(relationship);
+                    processor->setAutoTerminatedRelationships(relationships);
+                    _logger->log_debug("parseProcessorNode: autoTerminatedRelationship  => [%s]",
+                                       relationshipName.c_str());
+                    xmlFree(temp);
+                }
+            } else if (xmlStrcmp(currentNode->name, BAD_CAST "property") == 0) {
+                this->parseProcessorProperty(doc, currentNode, processor);
+            }
+        } // if (currentNode->type == XML_ELEMENT_NODE)
+    } // while node
 }
 
 void FlowController::parsePropertiesNodeYaml(YAML::Node *propertiesNode, Processor *processor)
@@ -1093,102 +1087,102 @@ void FlowController::parsePropertiesNodeYaml(YAML::Node *propertiesNode, Process
 }
 
 void FlowController::load(ConfigFormat configFormat) {
-	if (_running) {
-		stop(true);
-	}
-	if (!_initialized) {
-		_logger->log_info("Load Flow Controller from file %s", _configurationFileName.c_str());
-
-		if (ConfigFormat::XML == configFormat) {
-			_logger->log_info("Detected an XML configuration file for processing.");
-
-			xmlDoc *doc = xmlReadFile(_configurationFileName.c_str(), NULL, XML_PARSE_NONET);
-			if (doc == NULL) {
-				_logger->log_error("xmlReadFile returned NULL when reading [%s]", _configurationFileName.c_str());
-				_initialized = true;
-				return;
-			}
-
-			xmlNode *root = xmlDocGetRootElement(doc);
-
-			if (root == NULL) {
-				_logger->log_error("Can not get root from XML doc %s", _configurationFileName.c_str());
-				xmlFreeDoc(doc);
-				xmlCleanupParser();
-			}
-
-			if (xmlStrcmp(root->name, BAD_CAST "flowController") != 0) {
-				_logger->log_error("Root name is not flowController for XML doc %s", _configurationFileName.c_str());
-				xmlFreeDoc(doc);
-				xmlCleanupParser();
-				return;
-			}
-
-			xmlNode *currentNode;
-
-			for (currentNode = root->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) {
-				if (currentNode->type == XML_ELEMENT_NODE) {
-					if (xmlStrcmp(currentNode->name, BAD_CAST "rootGroup") == 0) {
-						this->parseRootProcessGroup(doc, currentNode);
-					} else if (xmlStrcmp(currentNode->name, BAD_CAST "maxTimerDrivenThreadCount") == 0) {
-						char *temp = (char *) xmlNodeGetContent(currentNode);
-						int64_t maxTimerDrivenThreadCount;
-						if (temp) {
-							if (Property::StringToInt(temp, maxTimerDrivenThreadCount)) {
-								_logger->log_debug("maxTimerDrivenThreadCount => [%d]", maxTimerDrivenThreadCount);
-								this->_maxTimerDrivenThreads = maxTimerDrivenThreadCount;
-							}
-							xmlFree(temp);
-						}
-					} else if (xmlStrcmp(currentNode->name, BAD_CAST "maxEventDrivenThreadCount") == 0) {
-						char *temp = (char *) xmlNodeGetContent(currentNode);
-						int64_t maxEventDrivenThreadCount;
-						if (temp) {
-							if (Property::StringToInt(temp, maxEventDrivenThreadCount)) {
-								_logger->log_debug("maxEventDrivenThreadCount => [%d]", maxEventDrivenThreadCount);
-								this->_maxEventDrivenThreads = maxEventDrivenThreadCount;
-							}
-							xmlFree(temp);
-						}
-					}
-				} // type == XML_ELEMENT_NODE
-			} // for
-
-			xmlFreeDoc(doc);
-			xmlCleanupParser();
-			_initialized = true;
-		} else if (ConfigFormat::YAML == configFormat) {
-			YAML::Node flow = YAML::LoadFile(_configurationFileName);
-
-			YAML::Node flowControllerNode = flow["Flow Controller"];
-			YAML::Node processorsNode = flow[CONFIG_YAML_PROCESSORS_KEY];
-			YAML::Node connectionsNode = flow["Connections"];
-			YAML::Node remoteProcessingGroupNode = flow["Remote Processing Groups"];
-
-			// Create the root process group
-			parseRootProcessGroupYaml(flowControllerNode);
-			parseProcessorNodeYaml(processorsNode, this->_root);
-			parseRemoteProcessGroupYaml(&remoteProcessingGroupNode, this->_root);
-			parseConnectionYaml(&connectionsNode, this->_root);
-
-			_initialized = true;
-		}
-	}
+    if (_running) {
+        stop(true);
+    }
+    if (!_initialized) {
+        _logger->log_info("Load Flow Controller from file %s", _configurationFileName.c_str());
+
+        if (ConfigFormat::XML == configFormat) {
+            _logger->log_info("Detected an XML configuration file for processing.");
+
+            xmlDoc *doc = xmlReadFile(_configurationFileName.c_str(), NULL, XML_PARSE_NONET);
+            if (doc == NULL) {
+                _logger->log_error("xmlReadFile returned NULL when reading [%s]", _configurationFileName.c_str());
+                _initialized = true;
+                return;
+            }
+
+            xmlNode *root = xmlDocGetRootElement(doc);
+
+            if (root == NULL) {
+                _logger->log_error("Can not get root from XML doc %s", _configurationFileName.c_str());
+                xmlFreeDoc(doc);
+                xmlCleanupParser();
+            }
+
+            if (xmlStrcmp(root->name, BAD_CAST "flowController") != 0) {
+                _logger->log_error("Root name is not flowController for XML doc %s", _configurationFileName.c_str());
+                xmlFreeDoc(doc);
+                xmlCleanupParser();
+                return;
+            }
+
+            xmlNode *currentNode;
+
+            for (currentNode = root->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) {
+                if (currentNode->type == XML_ELEMENT_NODE) {
+                    if (xmlStrcmp(currentNode->name, BAD_CAST "rootGroup") == 0) {
+                        this->parseRootProcessGroup(doc, currentNode);
+                    } else if (xmlStrcmp(currentNode->name, BAD_CAST "maxTimerDrivenThreadCount") == 0) {
+                        char *temp = (char *) xmlNodeGetContent(currentNode);
+                        int64_t maxTimerDrivenThreadCount;
+                        if (temp) {
+                            if (Property::StringToInt(temp, maxTimerDrivenThreadCount)) {
+                                _logger->log_debug("maxTimerDrivenThreadCount => [%d]", maxTimerDrivenThreadCount);
+                                this->_maxTimerDrivenThreads = maxTimerDrivenThreadCount;
+                            }
+                            xmlFree(temp);
+                        }
+                    } else if (xmlStrcmp(currentNode->name, BAD_CAST "maxEventDrivenThreadCount") == 0) {
+                        char *temp = (char *) xmlNodeGetContent(currentNode);
+                        int64_t maxEventDrivenThreadCount;
+                        if (temp) {
+                            if (Property::StringToInt(temp, maxEventDrivenThreadCount)) {
+                                _logger->log_debug("maxEventDrivenThreadCount => [%d]", maxEventDrivenThreadCount);
+                                this->_maxEventDrivenThreads = maxEventDrivenThreadCount;
+                            }
+                            xmlFree(temp);
+                        }
+                    }
+                } // type == XML_ELEMENT_NODE
+            } // for
+
+            xmlFreeDoc(doc);
+            xmlCleanupParser();
+            _initialized = true;
+        } else if (ConfigFormat::YAML == configFormat) {
+            YAML::Node flow = YAML::LoadFile(_configurationFileName);
+
+            YAML::Node flowControllerNode = flow["Flow Controller"];
+            YAML::Node processorsNode = flow[CONFIG_YAML_PROCESSORS_KEY];
+            YAML::Node connectionsNode = flow["Connections"];
+            YAML::Node remoteProcessingGroupNode = flow["Remote Processing Groups"];
+
+            // Create the root process group
+            parseRootProcessGroupYaml(flowControllerNode);
+            parseProcessorNodeYaml(processorsNode, this->_root);
+            parseRemoteProcessGroupYaml(&remoteProcessingGroupNode, this->_root);
+            parseConnectionYaml(&connectionsNode, this->_root);
+
+            _initialized = true;
+        }
+    }
 }
 
 bool FlowController::start() {
-	if (!_initialized) {
-		_logger->log_error("Can not start Flow Controller because it has not been initialized");
-		return false;
-	} else {
-		if (!_running) {
-			_logger->log_info("Start Flow Controller");
-			this->_timerScheduler.start();
-			if (this->_root)
-				this->_root->startProcessing(&this->_timerScheduler);
-			_running = true;
-			this->_protocol->start();
-		}
-		return true;
-	}
+    if (!_initialized) {
+        _logger->log_error("Can not start Flow Controller because it has not been initialized");
+        return false;
+    } else {
+        if (!_running) {
+            _logger->log_info("Start Flow Controller");
+            this->_timerScheduler.start();
+            if (this->_root)
+                this->_root->startProcessing(&this->_timerScheduler);
+            _running = true;
+            this->_protocol->start();
+        }
+        return true;
+    }
 }


[2/2] nifi-minifi-cpp git commit: MINIFI-98 Providing logging information and exiting in case the specified configuration YAML cannot be located. MINIFI-99 Providing default config.yml and updating README with minor adjustments to reflect default build c

Posted by al...@apache.org.
MINIFI-98 Providing logging information and exiting in case the specified configuration YAML cannot be located.
MINIFI-99 Providing default config.yml and updating README with minor adjustments to reflect default build
configuration.


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/f14f2006
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/f14f2006
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/f14f2006

Branch: refs/heads/master
Commit: f14f200649077cc61ac4ac5e96562dd6a802eebb
Parents: c6b7ac0
Author: Aldrin Piri <al...@apache.org>
Authored: Mon Nov 28 16:50:43 2016 -0500
Committer: Aldrin Piri <al...@apache.org>
Committed: Tue Nov 29 00:54:38 2016 -0500

----------------------------------------------------------------------
 CMakeLists.txt                   |    3 +-
 README.md                        |  140 ++-
 conf/config.yml                  |   20 +
 conf/minifi.properties           |    2 +-
 libminifi/src/FlowController.cpp | 1876 ++++++++++++++++-----------------
 5 files changed, 1026 insertions(+), 1015 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/f14f2006/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 082e994..d99a8d9 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -52,7 +52,6 @@ find_package(Threads REQUIRED)
 # Provide custom modules for the project
 list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake")
 
-#file(GLOB SOURCES "libminifi/src/*.cpp")
 add_subdirectory(thirdparty/yaml-cpp-yaml-cpp-0.5.3)
 add_subdirectory(libminifi)
 add_subdirectory(main)
@@ -64,7 +63,7 @@ set(CPACK_SOURCE_PACKAGE_FILE_NAME "${ASSEMBLY_BASE_NAME}-source")
 set(CPACK_SOURCE_IGNORE_FILES "/build/;/.bzr/;~$;${CPACK_SOURCE_IGNORE_FILES}")
 
 # Generate binary assembly
-install(FILES conf/minifi.properties
+install(FILES conf/minifi.properties conf/config.yml
         DESTINATION conf
         COMPONENT bin)
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/f14f2006/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 388fde2..165a572 100644
--- a/README.md
+++ b/README.md
@@ -51,7 +51,8 @@ Perspectives of the role of MiNiFi should be from the perspective of the agent a
   * GenerateFlowFile
   * LogAttribute
   * ListenSyslog
-* Provenance events generation is supported and they are persistent using levelDB.
+  * ExecuteProcess
+* Provenance events generation is supported and are persisted using levelDB.
 
 ## System Requirements
 
@@ -69,7 +70,7 @@ Perspectives of the role of MiNiFi should be from the perspective of the agent a
 * libboost and boost-devel
   * 1.48.0 or greater
 * libxml2 and libxml2-devel
-* leveldb 
+* leveldb and leveldb-devel
 
 ### To run
 
@@ -79,89 +80,86 @@ Perspectives of the role of MiNiFi should be from the perspective of the agent a
 ## Getting Started
 
 ### Building
-- From your source checkout, create a directory to perform the build (e.g. build) and cd into that directory.
-
-
-    # ~/Development/code/apache/nifi-minifi-cpp on git:master
-    $ mkdir build
-    # ~/Development/code/apache/nifi-minifi-cpp on git:master
-    $ cd build
 
+- From your source checkout, create a directory to perform the build (e.g. build) and cd into that directory.
+  ```
+  # ~/Development/code/apache/nifi-minifi-cpp on git:master
+  $ mkdir build
+  # ~/Development/code/apache/nifi-minifi-cpp on git:master
+  $ cd build
+  ```
 
 - Perform a `cmake ..` to generate the project files
-
-
-    # ~/Development/code/apache/nifi-minifi-cpp on git:master
-    $ cmake ..
-    ...
-    -- Configuring done
-    -- Generating done
-    -- Build files have been written to: /Users/apiri/Development/code/apache/nifi-minifi-cpp/build
-
+  ```
+  # ~/Development/code/apache/nifi-minifi-cpp on git:master
+  $ cmake ..
+  ...
+  -- Configuring done
+  -- Generating done
+  -- Build files have been written to: /Users/apiri/Development/code/apache/nifi-minifi-cpp/build
+  ```
 
 - Perform a build
-
-
-    # ~/Development/code/apache/nifi-minifi-cpp on git:master
-    $ make
-    Scanning dependencies of target gmock_main
-    Scanning dependencies of target gmock
-    Scanning dependencies of target minifi
-    Scanning dependencies of target gtest
-    Scanning dependencies of target yaml-cpp
-    [  1%] Building CXX object thirdparty/yaml-cpp-yaml-cpp-0.5.3/test/gmock-1.7.0/gtest/CMakeFiles/gtest.dir/src/gtest-all.cc.o
-    [  3%] Building CXX object thirdparty/yaml-cpp-yaml-cpp-0.5.3/test/gmock-1.7.0/CMakeFiles/gmock.dir/gtest/src/gtest-all.cc.o
-    [  3%] Building CXX object thirdparty/yaml-cpp-yaml-cpp-0.5.3/test/gmock-1.7.0/CMakeFiles/gmock.dir/src/gmock-all.cc.o
-    [  6%] Building CXX object thirdparty/yaml-cpp-yaml-cpp-0.5.3/test/gmock-1.7.0/CMakeFiles/gmock_main.dir/gtest/src/gtest-all.cc.o
-    [  6%] Building CXX object thirdparty/yaml-cpp-yaml-cpp-0.5.3/test/gmock-1.7.0/CMakeFiles/gmock_main.dir/src/gmock-all.cc.o
-    [  7%] Building CXX object libminifi/CMakeFiles/minifi.dir/src/Configure.cpp.o
-
-    ...
-
-    [ 97%] Linking CXX executable minifi
-    [ 97%] Built target minifiexe
-    [ 98%] Building CXX object thirdparty/yaml-cpp-yaml-cpp-0.5.3/test/CMakeFiles/run-tests.dir/node/node_test.cpp.o
-    [100%] Linking CXX executable run-tests
-    [100%] Built target run-tests
-
-
+  ```
+  # ~/Development/code/apache/nifi-minifi-cpp on git:master
+  $ make
+  Scanning dependencies of target gmock_main
+  Scanning dependencies of target gmock
+  Scanning dependencies of target minifi
+  Scanning dependencies of target gtest
+  Scanning dependencies of target yaml-cpp
+  [  1%] Building CXX object thirdparty/yaml-cpp-yaml-cpp-0.5.3/test/gmock-1.7.0/gtest/CMakeFiles/gtest.dir/src/gtest-all.cc.o
+  [  3%] Building CXX object thirdparty/yaml-cpp-yaml-cpp-0.5.3/test/gmock-1.7.0/CMakeFiles/gmock.dir/gtest/src/gtest-all.cc.o
+  [  3%] Building CXX object thirdparty/yaml-cpp-yaml-cpp-0.5.3/test/gmock-1.7.0/CMakeFiles/gmock.dir/src/gmock-all.cc.o
+  [  6%] Building CXX object thirdparty/yaml-cpp-yaml-cpp-0.5.3/test/gmock-1.7.0/CMakeFiles/gmock_main.dir/gtest/src/gtest-all.cc.o
+  [  6%] Building CXX object thirdparty/yaml-cpp-yaml-cpp-0.5.3/test/gmock-1.7.0/CMakeFiles/gmock_main.dir/src/gmock-all.cc.o
+  [  7%] Building CXX object libminifi/CMakeFiles/minifi.dir/src/Configure.cpp.o
+
+  ...
+
+  [ 97%] Linking CXX executable minifi
+  [ 97%] Built target minifiexe
+  [ 98%] Building CXX object thirdparty/yaml-cpp-yaml-cpp-0.5.3/test/CMakeFiles/run-tests.dir/node/node_test.cpp.o
+  [100%] Linking CXX executable run-tests
+  [100%] Built target run-tests
+  ```
 
 - Create a binary assembly located in your build directory with suffix -bin.tar.gz
-
-
-    ~/Development/code/apache/nifi-minifi-cpp/build
-    $ make package
-    Run CPack packaging tool for source...
-    CPack: Create package using TGZ
-    CPack: Install projects
-    CPack: - Install directory: ~/Development/code/apache/nifi-minifi-cpp
-    CPack: Create package
-    CPack: - package: ~/Development/code/apache/nifi-minifi-cpp/build/nifi-minifi-cpp-0.1.0-bin.tar.gz generated.
-
+  ```
+  ~/Development/code/apache/nifi-minifi-cpp/build
+  $ make package
+  Run CPack packaging tool for source...
+  CPack: Create package using TGZ
+  CPack: Install projects
+  CPack: - Install directory: ~/Development/code/apache/nifi-minifi-cpp
+  CPack: Create package
+  CPack: - package: ~/Development/code/apache/nifi-minifi-cpp/build/nifi-minifi-cpp-0.1.0-bin.tar.gz generated.
+  ```
 
 - Create a source assembly located in your build directory with suffix -source.tar.gz
-
-
-    ~/Development/code/apache/nifi-minifi-cpp/build
-    $ make package_source
-    Run CPack packaging tool for source...
-    CPack: Create package using TGZ
-    CPack: Install projects
-    CPack: - Install directory: ~/Development/code/apache/nifi-minifi-cpp
-    CPack: Create package
-    CPack: - package: ~/Development/code/apache/nifi-minifi-cpp/build/nifi-minifi-cpp-0.1.0-source.tar.gz generated.
+  ```
+  ~/Development/code/apache/nifi-minifi-cpp/build
+  $ make package_source
+  Run CPack packaging tool for source...
+  CPack: Create package using TGZ
+  CPack: Install projects
+  CPack: - Install directory: ~/Development/code/apache/nifi-minifi-cpp
+  CPack: Create package
+  CPack: - package: ~/Development/code/apache/nifi-minifi-cpp/build/nifi-minifi-cpp-0.1.0-source.tar.gz generated.
+  ```
 
 
 ### Cleaning
 Remove the build directory created above.
-
-    # ~/Development/code/apache/nifi-minifi-cpp on git:master
-    $ rm -rf ./build
+```
+# ~/Development/code/apache/nifi-minifi-cpp on git:master
+$ rm -rf ./build
+```
 
 ### Configuring
-The 'conf' directory in the root contains a template flow.yml document.  
+The 'conf' directory in the root contains a template config.yml document.  
 
-This is compatible with the format used with the Java MiNiFi application.  Currently, a subset of the configuration is supported.  Additional information on the YAML format for the flow.yml can be found in the [MiNiFi System Administrator Guide](https://nifi.apache.org/minifi/system-admin-guide.html).  
+This is compatible with the format used with the Java MiNiFi application.  Currently, a subset of the configuration is supported.  Additional information on the YAML format for the config.yml can be found in the [MiNiFi System Administrator Guide](https://nifi.apache.org/minifi/system-admin-guide.html).  
 
 Additionally, users can utilize the MiNiFi Toolkit Converter to aid in creating a flow configuration from a generated template exported from a NiFi instance.  The MiNiFi Toolkit Converter tool can be downloaded from http://nifi.apache.org/minifi/download.html under the `MiNiFi Toolkit Binaries` section.  Information on its usage is available at https://nifi.apache.org/minifi/minifi-toolkit.html.
 
@@ -206,11 +204,11 @@ Additionally, users can utilize the MiNiFi Toolkit Converter to aid in creating
                     Host Name: localhost
 
 ### Running
-After completing a [build](#building), the application can be run by issuing:
+After completing a [build](#building), the application can be run by issuing the following from :
 
     $ ./bin/minifi.sh start
 
-By default, this will make use of a flow.yml located in the conf directory.  This configuration file location can be altered by adjusting the property `nifi.flow.configuration.file` in minifi.properties located in the conf directory.
+By default, this will make use of a config.yml located in the conf directory.  This configuration file location can be altered by adjusting the property `nifi.flow.configuration.file` in minifi.properties located in the conf directory.
 
 ### Stopping  
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/f14f2006/conf/config.yml
----------------------------------------------------------------------
diff --git a/conf/config.yml b/conf/config.yml
new file mode 100644
index 0000000..b50c609
--- /dev/null
+++ b/conf/config.yml
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the \"License\"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an \"AS IS\" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+Flow Controller:
+  name: MiNiFi Flow
+Processors: []
+Connections: []
+Remote Processing Groups: []

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/f14f2006/conf/minifi.properties
----------------------------------------------------------------------
diff --git a/conf/minifi.properties b/conf/minifi.properties
index f9b2d9f..da46653 100644
--- a/conf/minifi.properties
+++ b/conf/minifi.properties
@@ -15,7 +15,7 @@
 
 # Core Properties #
 nifi.version=0.1.0
-nifi.flow.configuration.file=./conf/flow.yml
+nifi.flow.configuration.file=./conf/config.yml
 nifi.administrative.yield.duration=30 sec
 # If a component has no work to do (is "bored"), how long should we wait before checking again for work?
 nifi.bored.yield.duration=10 millis