You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2016/09/28 13:16:28 UTC

nifi-minifi-cpp git commit: MINIFI-109: Add ExecuteProcess

Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master eb1f268fb -> a20d82c78


MINIFI-109: Add ExecuteProcess

This closes #13.

Signed-off-by: Aldrin Piri <al...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/a20d82c7
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/a20d82c7
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/a20d82c7

Branch: refs/heads/master
Commit: a20d82c784944672b93bf578cdeda87567cf985b
Parents: eb1f268
Author: Bin Qiu <be...@gmail.com>
Authored: Mon Sep 26 10:42:54 2016 -0700
Committer: Aldrin Piri <al...@apache.org>
Committed: Wed Sep 28 09:16:01 2016 -0400

----------------------------------------------------------------------
 inc/ExecuteProcess.h             | 112 ++++++++++++++++
 inc/FlowController.h             |   1 +
 inc/Site2SiteClientProtocol.h    |   5 +
 inc/Site2SitePeer.h              |   5 +
 src/ExecuteProcess.cpp           | 243 ++++++++++++++++++++++++++++++++++
 src/FlowController.cpp           |  70 +++++-----
 src/RemoteProcessorGroupPort.cpp |   1 +
 src/Site2SitePeer.cpp            |   1 +
 8 files changed, 405 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a20d82c7/inc/ExecuteProcess.h
----------------------------------------------------------------------
diff --git a/inc/ExecuteProcess.h b/inc/ExecuteProcess.h
new file mode 100644
index 0000000..dce287a
--- /dev/null
+++ b/inc/ExecuteProcess.h
@@ -0,0 +1,112 @@
+/**
+ * @file ExecuteProcess.h
+ * ExecuteProcess class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __EXECUTE_PROCESS_H__
+#define __EXECUTE_PROCESS_H__
+
+#include <stdio.h>
+#include <unistd.h>
+#include <string>
+#include <errno.h>
+#include <chrono>
+#include <thread>
+#include <unistd.h>
+#include <sys/wait.h>
+#include <iostream>
+#include <sys/types.h>
+#include <signal.h>
+#include "FlowFileRecord.h"
+#include "Processor.h"
+#include "ProcessSession.h"
+
+//! ExecuteProcess Class: Processor that runs an external command and routes its output to Success
+class ExecuteProcess : public Processor
+{
+public:
+	//! Constructor
+	/*!
+	 * Create a new processor: working dir ".", batch duration 0, no stderr redirect, no child running
+	 */
+	ExecuteProcess(std::string name, uuid_t uuid = NULL)
+	: Processor(name, uuid)
+	{
+		_logger = Logger::getLogger();
+		_redirectErrorStream = false;
+		_batchDuration = 0;
+		_workingDir = ".";
+		_processRunning = false;
+		_pid = 0;
+	}
+	//! Destructor: sends SIGTERM to the child if one is still marked as running
+	virtual ~ExecuteProcess()
+	{
+		if (_processRunning && _pid > 0)
+			kill(_pid, SIGTERM);
+	}
+	//! Processor Name
+	static const std::string ProcessorName;
+	//! Supported Properties (defined in ExecuteProcess.cpp)
+	static Property Command;
+	static Property CommandArguments;
+	static Property WorkingDir;
+	static Property BatchDuration;
+	static Property RedirectErrorStream;
+	//! Supported Relationships
+	static Relationship Success;
+
+	//! Nested callback class that writes a memory buffer to the stream handed to process()
+	class WriteCallback : public OutputStreamCallback
+	{
+		public:
+		WriteCallback(char *data, uint64_t size)
+		: _data(data), _dataSize(size) {}
+		char *_data; //!< buffer to write; only the pointer is stored, the bytes are not copied
+		uint64_t _dataSize; //!< number of bytes of _data to write
+		void process(std::ofstream *stream) { // writes nothing for a NULL or empty buffer
+			if (_data && _dataSize > 0)
+				stream->write(_data, _dataSize);
+		}
+	};
+
+public:
+	//! OnTrigger method: runs the configured command and turns its output into FlowFiles
+	virtual void onTrigger(ProcessContext *context, ProcessSession *session);
+	//! Initialize: registers the supported properties and relationships
+	virtual void initialize(void);
+
+protected:
+
+private:
+	//! Logger
+	Logger *_logger;
+	//! Property values, refreshed from the ProcessContext on every onTrigger
+	std::string _command;
+	std::string _commandArgument;
+	std::string _workingDir;
+	int64_t _batchDuration; //!< in ms once onTrigger has converted the configured value
+	bool _redirectErrorStream;
+	//! Full command: _command + " " + _commandArgument
+	std::string _fullCommand;
+	//! whether a child process is currently running (set and cleared inside onTrigger)
+	bool _processRunning;
+	int _pipefd[2]; //!< pipe to the child: [0] is read by the parent, [1] becomes the child's stdout
+	pid_t _pid; //!< return value of the last fork(); 0 before the first launch
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a20d82c7/inc/FlowController.h
----------------------------------------------------------------------
diff --git a/inc/FlowController.h b/inc/FlowController.h
index 0698c65..0d758df 100644
--- a/inc/FlowController.h
+++ b/inc/FlowController.h
@@ -50,6 +50,7 @@
 #include "GetFile.h"
 #include "TailFile.h"
 #include "ListenSyslog.h"
+#include "ExecuteProcess.h"
 
 //! Default NiFi Root Group Name
 #define DEFAULT_ROOT_GROUP_NAME ""

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a20d82c7/inc/Site2SiteClientProtocol.h
----------------------------------------------------------------------
diff --git a/inc/Site2SiteClientProtocol.h b/inc/Site2SiteClientProtocol.h
index 2a517d7..5b72b11 100644
--- a/inc/Site2SiteClientProtocol.h
+++ b/inc/Site2SiteClientProtocol.h
@@ -454,6 +454,11 @@ public:
 			_peer->setTimeOut(time);
 
 	}
+	//! getTimeOut: returns the current value of _timeOut
+	uint64_t getTimeOut()
+	{
+		return _timeOut;
+	}
 	//! setPortId
 	void setPortId(uuid_t id)
 	{

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a20d82c7/inc/Site2SitePeer.h
----------------------------------------------------------------------
diff --git a/inc/Site2SitePeer.h b/inc/Site2SitePeer.h
index e6972ad..ff11637 100644
--- a/inc/Site2SitePeer.h
+++ b/inc/Site2SitePeer.h
@@ -206,6 +206,11 @@ public:
 	{
 		_timeOut = time;
 	}
+	//! getTimeOut: returns the current value of _timeOut
+	uint64_t getTimeOut()
+	{
+		return _timeOut;
+	}
 	int write(uint8_t value, CRC32 *crc = NULL)
 	{
 		return sendData(&value, 1, crc);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a20d82c7/src/ExecuteProcess.cpp
----------------------------------------------------------------------
diff --git a/src/ExecuteProcess.cpp b/src/ExecuteProcess.cpp
new file mode 100644
index 0000000..7eb4524
--- /dev/null
+++ b/src/ExecuteProcess.cpp
@@ -0,0 +1,243 @@
+/**
+ * @file ExecuteProcess.cpp
+ * ExecuteProcess class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "TimeUtil.h"
+#include "ExecuteProcess.h"
+#include "ProcessContext.h"
+#include "ProcessSession.h"
+
+const std::string ExecuteProcess::ProcessorName("ExecuteProcess"); // FlowController::createProcessor compares the requested name against this
+Property ExecuteProcess::Command("Command", "Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.", ""); // onTrigger joins this with Command Arguments using a single space
+Property ExecuteProcess::CommandArguments("Command Arguments",
+		"The arguments to supply to the executable delimited by white space. White space can be escaped by enclosing it in double-quotes.", ""); // NOTE(review): onTrigger splits on spaces only; the double-quote escaping described here is not implemented
+Property ExecuteProcess::WorkingDir("Working Directory",
+		"The directory to use as the current working directory when executing the command", ""); // onTrigger does not chdir when the value is empty or "."
+Property ExecuteProcess::BatchDuration("Batch Duration",
+		"If the process is expected to be long-running and produce textual output, a batch duration can be specified.", "0"); // parsed with Property::StringToTime; a value > 0 selects the batched read loop in onTrigger
+Property ExecuteProcess::RedirectErrorStream("Redirect Error Stream",
+		"If true will redirect any error stream output of the process to the output stream.", "false"); // parsed with Property::StringToBool; when true the child's stderr is dup2()'ed onto the pipe
+Relationship ExecuteProcess::Success("success", "All created FlowFiles are routed to this relationship."); // the only relationship registered in initialize()
+
+void ExecuteProcess::initialize()
+{
+	//! Register the supported properties, driven by a table of the static members
+	Property *const all[] = {
+		&Command, &CommandArguments, &WorkingDir, &BatchDuration, &RedirectErrorStream
+	};
+	std::set<Property> supportedProperties;
+	for (size_t i = 0; i < sizeof(all) / sizeof(all[0]); i++)
+		supportedProperties.insert(*all[i]);
+	setSupportedProperties(supportedProperties);
+	//! Register the supported relationships: Success is the only one
+	std::set<Relationship> supportedRelationships;
+	supportedRelationships.insert(Success);
+	setSupportedRelationships(supportedRelationships);
+}
+
+
+//! OnTrigger: run the configured command in a child process and emit its output as FlowFiles
+/*!
+ * Starts the command with fork() + execvp() and blocks until the child has closed
+ * its end of the pipe and has been reaped. With Batch Duration > 0 each read of up
+ * to 4096 bytes becomes its own FlowFile, otherwise all output goes into a single
+ * FlowFile. The working directory is changed in the child only.
+ * @param context provides the property values
+ * @param session used to create, write, append, transfer and commit FlowFiles
+ */
+void ExecuteProcess::onTrigger(ProcessContext *context, ProcessSession *session)
+{
+	std::string value;
+	if (context->getProperty(Command.getName(), value))
+		this->_command = value;
+	if (context->getProperty(CommandArguments.getName(), value))
+		this->_commandArgument = value;
+	if (context->getProperty(WorkingDir.getName(), value))
+		this->_workingDir = value;
+	if (context->getProperty(BatchDuration.getName(), value))
+	{
+		// keep the previous duration unless parsing AND conversion to ms both succeed
+		TimeUnit unit;
+		int64_t duration = 0;
+		if (Property::StringToTime(value, duration, unit) &&
+			Property::ConvertTimeUnitToMS(duration, unit, duration))
+			_batchDuration = duration;
+	}
+	if (context->getProperty(RedirectErrorStream.getName(), value))
+		Property::StringToBool(value, _redirectErrorStream);
+	this->_fullCommand = _command + " " + _commandArgument;
+	// Split the command line on spaces before fork(), so the child does not have to
+	// allocate. The tokens point into cmdBuf, which stays alive until execvp().
+	const int maxArgs = 63; // argv keeps one extra slot for the NULL terminator
+	std::string cmdBuf(_fullCommand);
+	char *argv[maxArgs + 1];
+	int argc = 0;
+	bool inToken = false;
+	for (std::string::size_type i = 0; i < cmdBuf.length(); i++)
+	{
+		if (cmdBuf[i] == ' ')
+		{
+			cmdBuf[i] = '\0';
+			inToken = false;
+		}
+		else if (!inToken)
+		{
+			if (argc == maxArgs)
+			{
+				_logger->log_error("Execute Command %s has more than %d arguments", _fullCommand.c_str(), maxArgs);
+				yield();
+				return;
+			}
+			argv[argc++] = &cmdBuf[i];
+			inToken = true;
+		}
+	}
+	argv[argc] = NULL;
+	if (argc == 0)
+	{
+		// _fullCommand always contains the separating space, so test the tokens instead
+		yield();
+		return;
+	}
+	const bool changeDir = (_workingDir.length() > 0 && _workingDir != ".");
+	if (changeDir && access(_workingDir.c_str(), X_OK) != 0)
+	{
+		_logger->log_error("Execute Command can not chdir %s", _workingDir.c_str());
+		yield();
+		return;
+	}
+	_logger->log_info("Execute Command %s", _fullCommand.c_str());
+	if (_processRunning)
+		return;
+	// create the pipe that carries the child's output to this process
+	if (pipe(_pipefd) == -1)
+	{
+		_logger->log_error("Execute Process pipe failed errno %d", errno);
+		yield();
+		return;
+	}
+	_processRunning = true;
+	switch (_pid = fork())
+	{
+	case -1:
+		_logger->log_error("Execute Process fork failed");
+		_processRunning = false;
+		close(_pipefd[0]);
+		close(_pipefd[1]);
+		yield();
+		break;
+	case 0: // this is the code the child runs
+		close(_pipefd[0]);
+		if (changeDir && chdir(_workingDir.c_str()) != 0)
+			_exit(1);
+		// dup2() retargets stdout/stderr in one step instead of close() + dup()
+		if (dup2(_pipefd[1], STDOUT_FILENO) == -1)
+			_exit(1);
+		if (_redirectErrorStream && dup2(_pipefd[1], STDERR_FILENO) == -1)
+			_exit(1);
+		if (_pipefd[1] != STDOUT_FILENO && _pipefd[1] != STDERR_FILENO)
+			close(_pipefd[1]);
+		execvp(argv[0], argv);
+		// only reached when execvp() failed; _exit() skips the parent's atexit handlers and stdio flush
+		_exit(1);
+	default: // this is the code the parent runs
+		// the parent isn't going to write to the pipe
+		close(_pipefd[1]);
+		if (_batchDuration > 0)
+		{
+			while (1)
+			{
+				std::this_thread::sleep_for(std::chrono::milliseconds(_batchDuration));
+				char buffer[4096];
+				int numRead;
+				do
+					numRead = read(_pipefd[0], buffer, sizeof(buffer));
+				while (numRead < 0 && errno == EINTR);
+				if (numRead <= 0)
+					break;
+				_logger->log_info("Execute Command Respond %d", numRead);
+				ExecuteProcess::WriteCallback callback(buffer, numRead);
+				FlowFileRecord *flowFile = session->create();
+				if (!flowFile)
+					continue;
+				session->write(flowFile, &callback);
+				session->transfer(flowFile, Success);
+				session->commit();
+			}
+		}
+		else
+		{
+			char buffer[4096];
+			const int capacity = sizeof(buffer);
+			int pending = 0; // bytes in buffer that are not in the flow file yet
+			FlowFileRecord *flowFile = NULL;
+			bool done = false;
+			while (!done)
+			{
+				int numRead;
+				do
+					numRead = read(_pipefd[0], buffer + pending, capacity - pending);
+				while (numRead < 0 && errno == EINTR);
+				if (numRead > 0)
+					pending += numRead;
+				else
+					done = true; // the child closed the pipe, or the read failed
+				if (pending == 0 || (!done && pending < capacity))
+					continue;
+				// flush when the buffer is full, and once more for the remainder at the end
+				if (done)
+					_logger->log_info("Execute Command Respond %d", pending);
+				else
+					_logger->log_info("Execute Command Max Respond %d", pending);
+				ExecuteProcess::WriteCallback callback(buffer, pending);
+				pending = 0; // rewind, also when the chunk has to be dropped below
+				if (flowFile)
+				{
+					session->append(flowFile, &callback);
+					continue;
+				}
+				flowFile = session->create();
+				if (flowFile)
+					session->write(flowFile, &callback);
+				else
+					_logger->log_error("Execute Command can not create flow file, output dropped");
+			}
+			// transfer even when the output ended exactly on a buffer boundary
+			if (flowFile)
+				session->transfer(flowFile, Success);
+		}
+		{
+			// reap this child only; wait() would return whichever child exits first
+			int status = 0;
+			pid_t reaped;
+			do
+				reaped = waitpid(_pid, &status, 0);
+			while (reaped == -1 && errno == EINTR);
+			if (reaped != _pid)
+				_logger->log_error("Execute Command %s waitpid failed pid %d", _fullCommand.c_str(), _pid);
+			else if (WIFEXITED(status))
+				_logger->log_info("Execute Command Complete %s status %d pid %d", _fullCommand.c_str(), WEXITSTATUS(status), _pid);
+			else
+				_logger->log_info("Execute Command Complete %s status %d pid %d", _fullCommand.c_str(), WTERMSIG(status), _pid);
+		}
+		close(_pipefd[0]);
+		_processRunning = false;
+		break;
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a20d82c7/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/src/FlowController.cpp b/src/FlowController.cpp
index 2f16f47..8fbe3dc 100644
--- a/src/FlowController.cpp
+++ b/src/FlowController.cpp
@@ -44,6 +44,7 @@ FlowController::FlowController(std::string name)
 	_initialized = false;
 	_root = NULL;
 	_logger = Logger::getLogger();
+	_protocol = new FlowControlProtocol(this);
 
 	// NiFi config properties
 	_configure = Configure::getConfigure();
@@ -79,9 +80,6 @@ FlowController::FlowController(std::string name)
 		_logger->log_error("Could not locate path from provided configuration file name.");
 	}
 
-	char *flowPath = NULL;
-	char flow_full_path[PATH_MAX];
-
 	std::string pathString(path);
 	_configurationFileName = pathString;
 	_logger->log_info("FlowController NiFi Configuration file %s", pathString.c_str());
@@ -188,6 +186,10 @@ Processor *FlowController::createProcessor(std::string name, uuid_t uuid)
 	{
 		processor = new ListenSyslog(name, uuid);
 	}
+	else if (name == ExecuteProcess::ProcessorName)
+	{
+		processor = new ExecuteProcess(name, uuid);
+	}
 	else
 	{
 		_logger->log_error("No Processor defined for %s", name.c_str());
@@ -555,49 +557,49 @@ void FlowController::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, ProcessGro
 				_logger->log_debug("parseRemoteProcessGroupYaml: timeout => [%s]", timeout.c_str());
 
 				std::string yieldPeriod = rpgNode["yield period"].as<std::string>();
-				_logger->log_debug("parseRemoteProcessGroupYaml: timeout => [%s]", yieldPeriod.c_str());
+				_logger->log_debug("parseRemoteProcessGroupYaml: yield period => [%s]", yieldPeriod.c_str());
 
 				YAML::Node inputPorts = rpgNode["Input Ports"].as<YAML::Node>();
+				ProcessGroup* group = NULL;
 
-				if (inputPorts.IsSequence()) {
-					for (YAML::const_iterator portIter = inputPorts.begin(); portIter != inputPorts.end(); ++portIter) {
-						_logger->log_debug("Got a current port, iterating...");
-
-						YAML::Node currPort = portIter->as<YAML::Node>();
+				// generate the random UUID
+				uuid_generate(uuid);
 
-						this->parsePortYaml(&currPort, parentGroup, SEND);
+				char uuidStr[37];
+				uuid_unparse(_uuid, uuidStr);
 
-					} // for node
-					char uuidStr[37];
-					uuid_unparse(_uuid, uuidStr);
+				int64_t timeoutValue = -1;
+				int64_t yieldPeriodValue = -1;
 
-					// generate the random UUID
-					uuid_generate(uuid);
+				group = this->createRemoteProcessGroup(name.c_str(), uuid);
+				group->setParent(parentGroup);
+				parentGroup->addProcessGroup(group);
 
-					int64_t timeoutValue = -1;
-					int64_t yieldPeriodValue = -1;
+				TimeUnit unit;
 
-					ProcessGroup* group = this->createRemoteProcessGroup(name.c_str(), uuid);
-					group->setParent(parentGroup);
-					parentGroup->addProcessGroup(group);
+				if (Property::StringToTime(yieldPeriod, yieldPeriodValue, unit)
+							&& Property::ConvertTimeUnitToMS(yieldPeriodValue, unit, yieldPeriodValue) && group) {
+					_logger->log_debug("parseRemoteProcessGroupYaml: yieldPeriod => [%d] ms", yieldPeriodValue);
+					group->setYieldPeriodMsec(yieldPeriodValue);
+				}
 
-					TimeUnit unit;
+				if (Property::StringToTime(timeout, timeoutValue, unit)
+					&& Property::ConvertTimeUnitToMS(timeoutValue, unit, timeoutValue) && group) {
+					_logger->log_debug("parseRemoteProcessGroupYaml: timeoutValue => [%d] ms", timeoutValue);
+					group->setTimeOut(timeoutValue);
+				}
 
-					if (Property::StringToTime(yieldPeriod, yieldPeriodValue, unit)
-							&& Property::ConvertTimeUnitToMS(yieldPeriodValue, unit, yieldPeriodValue) && group) {
-						_logger->log_debug("parseRemoteProcessGroup: yieldPeriod => [%d] ms", yieldPeriod.c_str());
-						group->setYieldPeriodMsec(yieldPeriodValue);
-					}
+				group->setTransmitting(true);
+				group->setURL(url);
 
-					if (Property::StringToTime(timeout, timeoutValue, unit)
-							&& Property::ConvertTimeUnitToMS(timeoutValue, unit, timeoutValue) && group) {
-						_logger->log_debug("parseRemoteProcessGroup: timeoutValue => [%d] ms", timeout.c_str());
-						group->setTimeOut(yieldPeriodValue);
-					}
+				if (inputPorts.IsSequence()) {
+					for (YAML::const_iterator portIter = inputPorts.begin(); portIter != inputPorts.end(); ++portIter) {
+						_logger->log_debug("Got a current port, iterating...");
 
-					group->setTransmitting(true);
-					group->setURL(url);
+						YAML::Node currPort = portIter->as<YAML::Node>();
 
+						this->parsePortYaml(&currPort, group, SEND);
+					} // for node
 				}
 			}
 		}
@@ -812,6 +814,7 @@ void FlowController::parsePortYaml(YAML::Node *portNode, ProcessGroup *parent, T
 
 	processor = (Processor *) port;
 	port->setDirection(direction);
+	port->setTimeOut(parent->getTimeOut());
 	port->setTransmitting(true);
 	processor->setYieldPeriodMsec(parent->getYieldPeriodMsec());
 	processor->initialize();
@@ -1180,6 +1183,7 @@ bool FlowController::start() {
 			if (this->_root)
 				this->_root->startProcessing(&this->_timerScheduler);
 			_running = true;
+			this->_protocol->start();
 		}
 		return true;
 	}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a20d82c7/src/RemoteProcessorGroupPort.cpp
----------------------------------------------------------------------
diff --git a/src/RemoteProcessorGroupPort.cpp b/src/RemoteProcessorGroupPort.cpp
index 711e846..9d849ae 100644
--- a/src/RemoteProcessorGroupPort.cpp
+++ b/src/RemoteProcessorGroupPort.cpp
@@ -87,6 +87,7 @@ void RemoteProcessorGroupPort::onTrigger(ProcessContext *context, ProcessSession
 	{
 		// bootstrap the client protocol if needeed
 		context->yield();
+		_logger->log_error("Site2Site bootstrap failed yield period %d peer timeout %d", context->getProcessor()->getYieldPeriodMsec(), _protocol->getPeer()->getTimeOut());
 		return;
 	}
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a20d82c7/src/Site2SitePeer.cpp
----------------------------------------------------------------------
diff --git a/src/Site2SitePeer.cpp b/src/Site2SitePeer.cpp
index c844aa5..48e19d0 100644
--- a/src/Site2SitePeer.cpp
+++ b/src/Site2SitePeer.cpp
@@ -177,6 +177,7 @@ int Site2SitePeer::sendData(uint8_t *buf, int buflen, CRC32 *crc)
 		//check for errors
 		if (ret == -1)
 		{
+			_logger->log_error("Site2Site Peer socket %d send failed %s", _socket, strerror(errno));
 			Close();
 			// this->yield();
 			return ret;