You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2016/09/28 13:16:28 UTC
nifi-minifi-cpp git commit: MINIFI-109: Add ExecuteProcess
Repository: nifi-minifi-cpp
Updated Branches:
refs/heads/master eb1f268fb -> a20d82c78
MINIFI-109: Add ExecuteProcess
This closes #13.
Signed-off-by: Aldrin Piri <al...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/a20d82c7
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/a20d82c7
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/a20d82c7
Branch: refs/heads/master
Commit: a20d82c784944672b93bf578cdeda87567cf985b
Parents: eb1f268
Author: Bin Qiu <be...@gmail.com>
Authored: Mon Sep 26 10:42:54 2016 -0700
Committer: Aldrin Piri <al...@apache.org>
Committed: Wed Sep 28 09:16:01 2016 -0400
----------------------------------------------------------------------
inc/ExecuteProcess.h | 112 ++++++++++++++++
inc/FlowController.h | 1 +
inc/Site2SiteClientProtocol.h | 5 +
inc/Site2SitePeer.h | 5 +
src/ExecuteProcess.cpp | 243 ++++++++++++++++++++++++++++++++++
src/FlowController.cpp | 70 +++++-----
src/RemoteProcessorGroupPort.cpp | 1 +
src/Site2SitePeer.cpp | 1 +
8 files changed, 405 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a20d82c7/inc/ExecuteProcess.h
----------------------------------------------------------------------
diff --git a/inc/ExecuteProcess.h b/inc/ExecuteProcess.h
new file mode 100644
index 0000000..dce287a
--- /dev/null
+++ b/inc/ExecuteProcess.h
@@ -0,0 +1,122 @@
+/**
+ * @file ExecuteProcess.h
+ * ExecuteProcess class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __EXECUTE_PROCESS_H__
+#define __EXECUTE_PROCESS_H__
+
+#include <stdio.h>
+#include <unistd.h>
+#include <string>
+#include <fstream>
+#include <errno.h>
+#include <chrono>
+#include <thread>
+#include <sys/wait.h>
+#include <iostream>
+#include <sys/types.h>
+#include <signal.h>
+#include "FlowFileRecord.h"
+#include "Processor.h"
+#include "ProcessSession.h"
+
+//! ExecuteProcess Class
+//! Runs an external command in a forked child and turns what the child writes
+//! to stdout (and, if configured, stderr) into FlowFiles routed to Success.
+class ExecuteProcess : public Processor
+{
+public:
+	//! Constructor
+	/*!
+	 * Create a new processor
+	 */
+	ExecuteProcess(std::string name, uuid_t uuid = NULL)
+	: Processor(name, uuid)
+	{
+		_logger = Logger::getLogger();
+		_redirectErrorStream = false;
+		_batchDuration = 0;
+		_workingDir = ".";
+		_processRunning = false;
+		_pid = 0;
+		_pipefd[0] = -1;
+		_pipefd[1] = -1;
+	}
+	//! Destructor
+	//! Sends SIGTERM to a child that is still marked as running.
+	virtual ~ExecuteProcess()
+	{
+		if (_processRunning && _pid > 0)
+			kill(_pid, SIGTERM);
+	}
+	//! Processor Name
+	static const std::string ProcessorName;
+	//! Supported Properties
+	static Property Command;
+	static Property CommandArguments;
+	static Property WorkingDir;
+	static Property BatchDuration;
+	static Property RedirectErrorStream;
+	//! Supported Relationships
+	static Relationship Success;
+
+	//! Nested callback class that writes an in-memory buffer to a FlowFile.
+	//! The buffer is referenced, not copied, so it must stay valid for as long
+	//! as the session uses the callback.
+	class WriteCallback : public OutputStreamCallback
+	{
+	public:
+		WriteCallback(char *data, uint64_t size)
+		: _data(data), _dataSize(size) {}
+		char *_data;
+		uint64_t _dataSize;
+		void process(std::ofstream *stream) {
+			if (_data && _dataSize > 0)
+				stream->write(_data, _dataSize);
+		}
+	};
+
+public:
+	//! OnTrigger method, implemented by NiFi ExecuteProcess
+	virtual void onTrigger(ProcessContext *context, ProcessSession *session);
+	//! Initialize, overridden by NiFi ExecuteProcess
+	virtual void initialize(void);
+
+protected:
+
+private:
+	//! Logger
+	Logger *_logger;
+	//! Property values, read from the ProcessContext on each trigger
+	std::string _command;
+	std::string _commandArgument;
+	std::string _workingDir;
+	//! Batch duration in milliseconds; a value <= 0 collects output until the pipe reaches EOF
+	int64_t _batchDuration;
+	bool _redirectErrorStream;
+	//! Full command
+	std::string _fullCommand;
+	//! whether the process is running
+	bool _processRunning;
+	//! Pipe for the child's output: [0] is read by this processor, [1] is written by the child
+	int _pipefd[2];
+	//! Process id of the forked child (0 before the first launch)
+	pid_t _pid;
+};
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a20d82c7/inc/FlowController.h
----------------------------------------------------------------------
diff --git a/inc/FlowController.h b/inc/FlowController.h
index 0698c65..0d758df 100644
--- a/inc/FlowController.h
+++ b/inc/FlowController.h
@@ -50,6 +50,7 @@
#include "GetFile.h"
#include "TailFile.h"
#include "ListenSyslog.h"
+#include "ExecuteProcess.h"
//! Default NiFi Root Group Name
#define DEFAULT_ROOT_GROUP_NAME ""
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a20d82c7/inc/Site2SiteClientProtocol.h
----------------------------------------------------------------------
diff --git a/inc/Site2SiteClientProtocol.h b/inc/Site2SiteClientProtocol.h
index 2a517d7..5b72b11 100644
--- a/inc/Site2SiteClientProtocol.h
+++ b/inc/Site2SiteClientProtocol.h
@@ -454,6 +454,11 @@ public:
_peer->setTimeOut(time);
}
+ //! getTimeOut - return this protocol's _timeOut member (NOTE(review): confirm setTimeOut stores it as well as forwarding to _peer)
+ uint64_t getTimeOut()
+ {
+ return _timeOut;
+ }
//! setPortId
void setPortId(uuid_t id)
{
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a20d82c7/inc/Site2SitePeer.h
----------------------------------------------------------------------
diff --git a/inc/Site2SitePeer.h b/inc/Site2SitePeer.h
index e6972ad..ff11637 100644
--- a/inc/Site2SitePeer.h
+++ b/inc/Site2SitePeer.h
@@ -206,6 +206,11 @@ public:
{
_timeOut = time;
}
+ //! getTimeOut - return the _timeOut value last stored by setTimeOut()
+ uint64_t getTimeOut()
+ {
+ return _timeOut;
+ }
int write(uint8_t value, CRC32 *crc = NULL)
{
return sendData(&value, 1, crc);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a20d82c7/src/ExecuteProcess.cpp
----------------------------------------------------------------------
diff --git a/src/ExecuteProcess.cpp b/src/ExecuteProcess.cpp
new file mode 100644
index 0000000..7eb4524
--- /dev/null
+++ b/src/ExecuteProcess.cpp
@@ -0,0 +1,291 @@
+/**
+ * @file ExecuteProcess.cpp
+ * ExecuteProcess class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <cstring>
+#include <vector>
+#include "TimeUtil.h"
+#include "ExecuteProcess.h"
+#include "ProcessContext.h"
+#include "ProcessSession.h"
+
+const std::string ExecuteProcess::ProcessorName("ExecuteProcess");
+Property ExecuteProcess::Command("Command", "Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.", "");
+Property ExecuteProcess::CommandArguments("Command Arguments",
+	"The arguments to supply to the executable delimited by white space. White space can be escaped by enclosing it in double-quotes.", "");
+Property ExecuteProcess::WorkingDir("Working Directory",
+	"The directory to use as the current working directory when executing the command", "");
+Property ExecuteProcess::BatchDuration("Batch Duration",
+	"If the process is expected to be long-running and produce textual output, a batch duration can be specified.", "0");
+Property ExecuteProcess::RedirectErrorStream("Redirect Error Stream",
+	"If true will redirect any error stream output of the process to the output stream.", "false");
+Relationship ExecuteProcess::Success("success", "All created FlowFiles are routed to this relationship.");
+
+void ExecuteProcess::initialize()
+{
+	//! Set the supported properties
+	std::set<Property> properties;
+	properties.insert(Command);
+	properties.insert(CommandArguments);
+	properties.insert(WorkingDir);
+	properties.insert(BatchDuration);
+	properties.insert(RedirectErrorStream);
+	setSupportedProperties(properties);
+	//! Set the supported relationships
+	std::set<Relationship> relationships;
+	relationships.insert(Success);
+	setSupportedRelationships(relationships);
+}
+
+//! Split a command line into the argument vector handed to execvp().
+//! Arguments are separated by unquoted spaces or tabs. A double-quoted span
+//! may contain white space; the quote characters themselves are removed and
+//! "" yields an empty argument.
+static std::vector<std::string> splitCommandLine(const std::string &commandLine)
+{
+	std::vector<std::string> args;
+	std::string current;
+	bool inQuotes = false;
+	bool inToken = false;
+	for (std::string::size_type i = 0; i < commandLine.length(); i++)
+	{
+		char c = commandLine[i];
+		if (c == '"')
+		{
+			inQuotes = !inQuotes;
+			inToken = true;
+		}
+		else if (!inQuotes && (c == ' ' || c == '\t'))
+		{
+			if (inToken)
+			{
+				args.push_back(current);
+				current.clear();
+				inToken = false;
+			}
+		}
+		else
+		{
+			current += c;
+			inToken = true;
+		}
+	}
+	if (inToken)
+		args.push_back(current);
+	return args;
+}
+
+//! read() that retries when interrupted by a signal (EINTR).
+static ssize_t readRetry(int fd, char *buf, size_t count)
+{
+	ssize_t numRead;
+	do
+	{
+		numRead = read(fd, buf, count);
+	} while (numRead < 0 && errno == EINTR);
+	return numRead;
+}
+
+//! Launch the configured command (unless a previous launch is still marked
+//! running) and turn its standard output into FlowFiles routed to Success.
+//! With a positive Batch Duration one FlowFile of at most 4096 bytes is created
+//! per batch period and the session is committed each time; otherwise the
+//! whole output is collected into a single FlowFile.
+//! Blocks until the child closes its end of the pipe and has been reaped.
+void ExecuteProcess::onTrigger(ProcessContext *context, ProcessSession *session)
+{
+	std::string value;
+	if (context->getProperty(Command.getName(), value))
+	{
+		this->_command = value;
+	}
+	if (context->getProperty(CommandArguments.getName(), value))
+	{
+		this->_commandArgument = value;
+	}
+	if (context->getProperty(WorkingDir.getName(), value))
+	{
+		this->_workingDir = value;
+	}
+	if (context->getProperty(BatchDuration.getName(), value))
+	{
+		// parse into a local so a malformed value keeps the previous setting
+		int64_t duration = 0;
+		TimeUnit unit;
+		if (Property::StringToTime(value, duration, unit) &&
+			Property::ConvertTimeUnitToMS(duration, unit, duration))
+		{
+			_batchDuration = duration;
+		}
+	}
+	if (context->getProperty(RedirectErrorStream.getName(), value))
+	{
+		Property::StringToBool(value, _redirectErrorStream);
+	}
+	// test the command itself: "_command + ' ' + args" is never empty
+	if (_command.empty())
+	{
+		yield();
+		return;
+	}
+	this->_fullCommand = _command;
+	if (!_commandArgument.empty())
+		this->_fullCommand += " " + _commandArgument;
+	// build argv before fork(): the child must not allocate memory
+	std::vector<std::string> args = splitCommandLine(_fullCommand);
+	if (args.empty())
+	{
+		yield();
+		return;
+	}
+	std::vector<char *> argv;
+	for (std::vector<std::string>::size_type i = 0; i < args.size(); i++)
+		argv.push_back(const_cast<char *>(args[i].c_str()));
+	argv.push_back(NULL);
+	// only the child calls chdir(): doing it here would move the whole agent
+	// and make a relative working directory compound on every trigger
+	bool changeDir = _workingDir.length() > 0 && _workingDir != ".";
+	if (changeDir && access(_workingDir.c_str(), X_OK) != 0)
+	{
+		_logger->log_error("Execute Command can not chdir %s", _workingDir.c_str());
+		yield();
+		return;
+	}
+	_logger->log_info("Execute Command %s", _fullCommand.c_str());
+	if (_processRunning)
+		return;
+	_processRunning = true;
+	// create the pipe carrying the child's output back to this processor
+	if (pipe(_pipefd) == -1)
+	{
+		_logger->log_error("Execute Process pipe failed: %s", std::strerror(errno));
+		_processRunning = false;
+		yield();
+		return;
+	}
+	int status = 0;
+	switch (_pid = fork())
+	{
+	case -1:
+		_logger->log_error("Execute Process fork failed");
+		_processRunning = false;
+		close(_pipefd[0]);
+		close(_pipefd[1]);
+		yield();
+		break;
+	case 0: // child: only async-signal-safe calls until exec
+		if (changeDir && chdir(_workingDir.c_str()) != 0)
+			_exit(1);
+		dup2(_pipefd[1], STDOUT_FILENO);
+		if (_redirectErrorStream)
+			dup2(_pipefd[1], STDERR_FILENO);
+		// drop the original descriptors unless pipe() returned a standard one
+		if (_pipefd[0] > STDERR_FILENO)
+			close(_pipefd[0]);
+		if (_pipefd[1] > STDERR_FILENO)
+			close(_pipefd[1]);
+		execvp(argv[0], &argv[0]);
+		// _exit, not exit: must not run atexit handlers or flush stdio buffers copied from the agent
+		_exit(1);
+		break;
+	default: // parent
+		// the parent isn't going to write to the pipe
+		close(_pipefd[1]);
+		if (_batchDuration > 0)
+		{
+			// one FlowFile per batch period with whatever output is available
+			while (true)
+			{
+				std::this_thread::sleep_for(std::chrono::milliseconds(_batchDuration));
+				char buffer[4096];
+				ssize_t numRead = readRetry(_pipefd[0], buffer, sizeof(buffer));
+				if (numRead <= 0)
+					break;
+				_logger->log_info("Execute Command Respond %d", static_cast<int>(numRead));
+				ExecuteProcess::WriteCallback callback(buffer, numRead);
+				FlowFileRecord *flowFile = session->create();
+				if (!flowFile)
+					continue;
+				session->write(flowFile, &callback);
+				session->transfer(flowFile, Success);
+				session->commit();
+			}
+		}
+		else
+		{
+			// collect the whole output into one FlowFile, 4096 bytes at a time
+			char buffer[4096];
+			size_t buffered = 0;
+			FlowFileRecord *flowFile = NULL;
+			while (true)
+			{
+				ssize_t numRead = readRetry(_pipefd[0], buffer + buffered, sizeof(buffer) - buffered);
+				bool eof = numRead <= 0;
+				if (!eof)
+					buffered += static_cast<size_t>(numRead);
+				// flush a full buffer, or whatever remains once the pipe is closed
+				if (buffered == sizeof(buffer) || (eof && buffered > 0))
+				{
+					_logger->log_info("Execute Command Respond %d", static_cast<int>(buffered));
+					ExecuteProcess::WriteCallback callback(buffer, buffered);
+					if (!flowFile)
+					{
+						flowFile = session->create();
+						if (flowFile)
+							session->write(flowFile, &callback);
+						else
+							_logger->log_error("Execute Command could not create FlowFile, dropped %d bytes", static_cast<int>(buffered));
+					}
+					else
+					{
+						session->append(flowFile, &callback);
+					}
+					// keep draining after a failure so the child never blocks on a full pipe
+					buffered = 0;
+				}
+				if (eof)
+					break;
+			}
+			// transfer even when the output size is an exact multiple of the buffer size
+			if (flowFile)
+				session->transfer(flowFile, Success);
+		}
+		// reap our own child, not whichever child of the agent happens to exit first
+		pid_t waited;
+		do
+		{
+			waited = waitpid(_pid, &status, 0);
+		} while (waited == -1 && errno == EINTR);
+		if (waited == -1)
+		{
+			_logger->log_error("Execute Command %s waitpid failed pid %d", _fullCommand.c_str(), _pid);
+		}
+		else if (WIFEXITED(status))
+		{
+			_logger->log_info("Execute Command Complete %s status %d pid %d", _fullCommand.c_str(), WEXITSTATUS(status), _pid);
+		}
+		else
+		{
+			_logger->log_info("Execute Command Complete %s status %d pid %d", _fullCommand.c_str(), WTERMSIG(status), _pid);
+		}
+		close(_pipefd[0]);
+		_processRunning = false;
+		break;
+	}
+}
+
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a20d82c7/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/src/FlowController.cpp b/src/FlowController.cpp
index 2f16f47..8fbe3dc 100644
--- a/src/FlowController.cpp
+++ b/src/FlowController.cpp
@@ -44,6 +44,7 @@ FlowController::FlowController(std::string name)
_initialized = false;
_root = NULL;
_logger = Logger::getLogger();
+ _protocol = new FlowControlProtocol(this);
// NiFi config properties
_configure = Configure::getConfigure();
@@ -79,9 +80,6 @@ FlowController::FlowController(std::string name)
_logger->log_error("Could not locate path from provided configuration file name.");
}
- char *flowPath = NULL;
- char flow_full_path[PATH_MAX];
-
std::string pathString(path);
_configurationFileName = pathString;
_logger->log_info("FlowController NiFi Configuration file %s", pathString.c_str());
@@ -188,6 +186,10 @@ Processor *FlowController::createProcessor(std::string name, uuid_t uuid)
{
processor = new ListenSyslog(name, uuid);
}
+ else if (name == ExecuteProcess::ProcessorName)
+ {
+ processor = new ExecuteProcess(name, uuid);
+ }
else
{
_logger->log_error("No Processor defined for %s", name.c_str());
@@ -555,49 +557,49 @@ void FlowController::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, ProcessGro
_logger->log_debug("parseRemoteProcessGroupYaml: timeout => [%s]", timeout.c_str());
std::string yieldPeriod = rpgNode["yield period"].as<std::string>();
- _logger->log_debug("parseRemoteProcessGroupYaml: timeout => [%s]", yieldPeriod.c_str());
+ _logger->log_debug("parseRemoteProcessGroupYaml: yield period => [%s]", yieldPeriod.c_str());
YAML::Node inputPorts = rpgNode["Input Ports"].as<YAML::Node>();
+ ProcessGroup* group = NULL;
- if (inputPorts.IsSequence()) {
- for (YAML::const_iterator portIter = inputPorts.begin(); portIter != inputPorts.end(); ++portIter) {
- _logger->log_debug("Got a current port, iterating...");
-
- YAML::Node currPort = portIter->as<YAML::Node>();
+ // generate the random UUID
+ uuid_generate(uuid);
- this->parsePortYaml(&currPort, parentGroup, SEND);
+ char uuidStr[37];
+ uuid_unparse(_uuid, uuidStr);
- } // for node
- char uuidStr[37];
- uuid_unparse(_uuid, uuidStr);
+ int64_t timeoutValue = -1;
+ int64_t yieldPeriodValue = -1;
- // generate the random UUID
- uuid_generate(uuid);
+ group = this->createRemoteProcessGroup(name.c_str(), uuid);
+ group->setParent(parentGroup);
+ parentGroup->addProcessGroup(group);
- int64_t timeoutValue = -1;
- int64_t yieldPeriodValue = -1;
+ TimeUnit unit;
- ProcessGroup* group = this->createRemoteProcessGroup(name.c_str(), uuid);
- group->setParent(parentGroup);
- parentGroup->addProcessGroup(group);
+ if (Property::StringToTime(yieldPeriod, yieldPeriodValue, unit)
+ && Property::ConvertTimeUnitToMS(yieldPeriodValue, unit, yieldPeriodValue) && group) {
+ _logger->log_debug("parseRemoteProcessGroupYaml: yieldPeriod => [%d] ms", yieldPeriodValue);
+ group->setYieldPeriodMsec(yieldPeriodValue);
+ }
- TimeUnit unit;
+ if (Property::StringToTime(timeout, timeoutValue, unit)
+ && Property::ConvertTimeUnitToMS(timeoutValue, unit, timeoutValue) && group) {
+ _logger->log_debug("parseRemoteProcessGroupYaml: timeoutValue => [%d] ms", timeoutValue);
+ group->setTimeOut(timeoutValue);
+ }
- if (Property::StringToTime(yieldPeriod, yieldPeriodValue, unit)
- && Property::ConvertTimeUnitToMS(yieldPeriodValue, unit, yieldPeriodValue) && group) {
- _logger->log_debug("parseRemoteProcessGroup: yieldPeriod => [%d] ms", yieldPeriod.c_str());
- group->setYieldPeriodMsec(yieldPeriodValue);
- }
+ group->setTransmitting(true);
+ group->setURL(url);
- if (Property::StringToTime(timeout, timeoutValue, unit)
- && Property::ConvertTimeUnitToMS(timeoutValue, unit, timeoutValue) && group) {
- _logger->log_debug("parseRemoteProcessGroup: timeoutValue => [%d] ms", timeout.c_str());
- group->setTimeOut(yieldPeriodValue);
- }
+ if (inputPorts.IsSequence()) {
+ for (YAML::const_iterator portIter = inputPorts.begin(); portIter != inputPorts.end(); ++portIter) {
+ _logger->log_debug("Got a current port, iterating...");
- group->setTransmitting(true);
- group->setURL(url);
+ YAML::Node currPort = portIter->as<YAML::Node>();
+ this->parsePortYaml(&currPort, group, SEND);
+ } // for node
}
}
}
@@ -812,6 +814,7 @@ void FlowController::parsePortYaml(YAML::Node *portNode, ProcessGroup *parent, T
processor = (Processor *) port;
port->setDirection(direction);
+ port->setTimeOut(parent->getTimeOut());
port->setTransmitting(true);
processor->setYieldPeriodMsec(parent->getYieldPeriodMsec());
processor->initialize();
@@ -1180,6 +1183,7 @@ bool FlowController::start() {
if (this->_root)
this->_root->startProcessing(&this->_timerScheduler);
_running = true;
+ this->_protocol->start();
}
return true;
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a20d82c7/src/RemoteProcessorGroupPort.cpp
----------------------------------------------------------------------
diff --git a/src/RemoteProcessorGroupPort.cpp b/src/RemoteProcessorGroupPort.cpp
index 711e846..9d849ae 100644
--- a/src/RemoteProcessorGroupPort.cpp
+++ b/src/RemoteProcessorGroupPort.cpp
@@ -87,6 +87,7 @@ void RemoteProcessorGroupPort::onTrigger(ProcessContext *context, ProcessSession
{
// bootstrap the client protocol if needeed
context->yield();
+ _logger->log_error("Site2Site bootstrap failed yield period %d peer timeout %d", context->getProcessor()->getYieldPeriodMsec(), _protocol->getPeer()->getTimeOut());
return;
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a20d82c7/src/Site2SitePeer.cpp
----------------------------------------------------------------------
diff --git a/src/Site2SitePeer.cpp b/src/Site2SitePeer.cpp
index c844aa5..48e19d0 100644
--- a/src/Site2SitePeer.cpp
+++ b/src/Site2SitePeer.cpp
@@ -177,6 +177,7 @@ int Site2SitePeer::sendData(uint8_t *buf, int buflen, CRC32 *crc)
//check for errors
if (ret == -1)
{
+ _logger->log_error("Site2Site Peer socket %d send failed %s", _socket, strerror(errno));
Close();
// this->yield();
return ret;