You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/09/01 14:49:27 UTC

[GitHub] [nifi-minifi-cpp] lordgamez opened a new pull request, #1414: MINIFICPP-1927 Fix ExecuteProcess command argument issue and refactor

lordgamez opened a new pull request, #1414:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1414

   - Rewrite tests for ExecuteProcess processor
   - Fix issue where escaping whitespace in the command argument list does not work
   - Refactor ExecuteProcess processor
   
   https://issues.apache.org/jira/browse/MINIFICPP-1927
   
   ----------------------------------
   Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced
        in the commit message?
   
   - [ ] Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically main)?
   
   - [ ] Is your initial contribution a single, squashed commit?
   
   ### For code changes:
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [ ] If applicable, have you updated the LICENSE file?
   - [ ] If applicable, have you updated the NOTICE file?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI results for build issues and submit an update to your PR as soon as possible.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1414: MINIFICPP-1927 Fix ExecuteProcess command argument issue and refactor

Posted by GitBox <gi...@apache.org>.
adam-markovics commented on code in PR #1414:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1414#discussion_r1015629660


##########
extensions/standard-processors/processors/ExecuteProcess.cpp:
##########
@@ -74,172 +70,202 @@ void ExecuteProcess::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+void ExecuteProcess::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*session_factory*/) {
+  gsl_Expects(context);
   std::string value;
-  std::shared_ptr<core::FlowFile> flow_file;
-  if (context->getProperty(Command, value, flow_file)) {
-    this->_command = value;
+  if (context->getProperty(Command.getName(), value)) {
+    command_ = value;
   }
-  if (context->getProperty(CommandArguments, value, flow_file)) {
-    this->_commandArgument = value;
+  if (context->getProperty(CommandArguments.getName(), value)) {
+    command_argument_ = value;
   }
-  if (context->getProperty(WorkingDir, value, flow_file)) {
-    this->_workingDir = value;
+  if (context->getProperty(WorkingDir.getName(), value)) {
+    working_dir_ = value;
   }
   if (auto batch_duration = context->getProperty<core::TimePeriodValue>(BatchDuration)) {
-    _batchDuration = batch_duration->getMilliseconds();
-    logger_->log_debug("Setting _batchDuration");
+    batch_duration_ = batch_duration->getMilliseconds();
+    logger_->log_debug("Setting batch duration to %d milliseconds", batch_duration_.count());
   }
   if (context->getProperty(RedirectErrorStream.getName(), value)) {
-    _redirectErrorStream =  org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
+    redirect_error_stream_ = org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
   }
-  this->_fullCommand = _command + " " + _commandArgument;
-  if (_fullCommand.length() == 0) {
-    yield();
-    return;
+  full_command_ = command_ + " " + command_argument_;
+}
+
+std::vector<std::string> ExecuteProcess::readArgs() const {
+  std::vector<std::string> args;
+  std::stringstream input_stream{full_command_};
+  while (input_stream) {
+    std::string word;
+    input_stream >> std::quoted(word);
+    if (!word.empty()) {
+      args.push_back(word);
+    }
   }
-  if (_workingDir.length() > 0 && _workingDir != ".") {
-    // change to working directory
-    if (chdir(_workingDir.c_str()) != 0) {
-      logger_->log_error("Execute Command can not chdir %s", _workingDir);
-      yield();
-      return;
+
+  return args;
+}
+
+void ExecuteProcess::executeProcessForkFailed() {
+  logger_->log_error("Execute Process fork failed");
+  close(pipefd_[0]);
+  close(pipefd_[1]);
+  yield();
+}
+
+void ExecuteProcess::executeChildProcess() {
+  std::vector<char*> argv;
+  auto args = readArgs();
+  argv.reserve(args.size() + 1);
+  for (auto& arg : args) {
+    argv.push_back(arg.data());
+  }
+  argv.push_back(nullptr);
+
+  static constexpr int STDOUT = 1;
+  static constexpr int STDERR = 2;
+  close(STDOUT);
+  if (dup(pipefd_[1]) < 0) {  // points pipefd at file descriptor
+    logger_->log_error("Failed to point pipe at file descriptor");
+    exit(1);
+  }
+  if (redirect_error_stream_ && dup2(pipefd_[1], STDERR) < 0) {
+    logger_->log_error("Failed to redirect error stream of the executed process to the output stream");
+    exit(1);
+  }
+  close(pipefd_[0]);
+  if (execvp(argv[0], argv.data()) < 0) {
+    logger_->log_error("Failed to execute child process");
+    exit(1);
+  }
+  exit(0);
+}
+
+void ExecuteProcess::readOutputInBatches(core::ProcessSession& session) {
+  while (true) {
+    std::this_thread::sleep_for(batch_duration_);
+    char buffer[4096];
+    const auto num_read = read(pipefd_[0], buffer, sizeof(buffer));
+    if (num_read <= 0) {
+      break;
+    }
+    logger_->log_debug("Execute Command Respond %zd", num_read);
+    auto flow_file = session.create();
+    if (!flow_file) {
+      logger_->log_error("Flow file could not be created!");
+      continue;
     }
+    flow_file->addAttribute("command", command_);
+    flow_file->addAttribute("command.arguments", command_argument_);
+    session.writeBuffer(flow_file, gsl::make_span(buffer, gsl::narrow<size_t>(num_read)));
+    session.transfer(flow_file, Success);
+    session.commit();
   }
-  logger_->log_info("Execute Command %s", _fullCommand);
-  // split the command into array
-  char *p = std::strtok(const_cast<char*>(_fullCommand.c_str()), " ");
-  int argc = 0;
-  char *argv[64];
-  while (p != 0 && argc < 64) {
-    argv[argc] = p;
-    p = std::strtok(NULL, " ");
-    argc++;
-  }
-  argv[argc] = NULL;
-  int status;
-  if (!_processRunning) {
-    _processRunning = true;
-    // if the process has not launched yet
-    // create the pipe
-    if (pipe(_pipefd) == -1) {
-      _processRunning = false;
-      yield();
-      return;
+}
+
+bool ExecuteProcess::writeToFlowFile(core::ProcessSession& session, std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const char> buffer) const {
+  if (!flow_file) {
+    flow_file = session.create();
+    if (!flow_file) {
+      logger_->log_error("Flow file could not be created!");
+      return false;
     }
-    switch (_pid = fork()) {
-      case -1:
-        logger_->log_error("Execute Process fork failed");
-        _processRunning = false;
-        close(_pipefd[0]);
-        close(_pipefd[1]);
-        yield();
-        break;
-      case 0:  // this is the code the child runs
-        close(1);      // close stdout
-        dup(_pipefd[1]);  // points pipefd at file descriptor
-        if (_redirectErrorStream)
-          // redirect stderr
-          dup2(_pipefd[1], 2);
-        close(_pipefd[0]);
-        execvp(argv[0], argv);
-        exit(1);
-        break;
-      default:  // this is the code the parent runs
-        // the parent isn't going to write to the pipe
-        close(_pipefd[1]);
-        if (_batchDuration > 0ms) {
-          while (true) {
-            std::this_thread::sleep_for(_batchDuration);
-            char buffer[4096];
-            const auto  numRead = read(_pipefd[0], buffer, sizeof(buffer));
-            if (numRead <= 0)
-              break;
-            logger_->log_debug("Execute Command Respond %zd", numRead);
-            auto flowFile = session->create();
-            if (!flowFile)
-              continue;
-            flowFile->addAttribute("command", _command);
-            flowFile->addAttribute("command.arguments", _commandArgument);
-            session->writeBuffer(flowFile, gsl::make_span(buffer, gsl::narrow<size_t>(numRead)));
-            session->transfer(flowFile, Success);
-            session->commit();
-          }
-        } else {
-          char buffer[4096];
-          char *bufPtr = buffer;
-          size_t totalRead = 0;
-          std::shared_ptr<core::FlowFile> flowFile = nullptr;
-          while (true) {
-            const auto numRead = read(_pipefd[0], bufPtr, (sizeof(buffer) - totalRead));
-            if (numRead <= 0) {
-              if (totalRead > 0) {
-                logger_->log_debug("Execute Command Respond %zu", totalRead);
-                // child exits and close the pipe
-                const auto buffer_span = gsl::make_span(buffer, totalRead);
-                if (!flowFile) {
-                  flowFile = session->create();
-                  if (!flowFile)
-                    break;
-                  flowFile->addAttribute("command", _command);
-                  flowFile->addAttribute("command.arguments", _commandArgument);
-                  session->writeBuffer(flowFile, buffer_span);
-                } else {
-                  session->appendBuffer(flowFile, buffer_span);
-                }
-                session->transfer(flowFile, Success);
-              }
-              break;
-            } else {
-              if (numRead == static_cast<ssize_t>((sizeof(buffer) - totalRead))) {
-                // we reach the max buffer size
-                logger_->log_debug("Execute Command Max Respond %zu", sizeof(buffer));
-                if (!flowFile) {
-                  flowFile = session->create();
-                  if (!flowFile)
-                    continue;
-                  flowFile->addAttribute("command", _command);
-                  flowFile->addAttribute("command.arguments", _commandArgument);
-                  session->writeBuffer(flowFile, buffer);
-                } else {
-                  session->appendBuffer(flowFile, buffer);
-                }
-                // Rewind
-                totalRead = 0;
-                bufPtr = buffer;
-              } else {
-                totalRead += numRead;
-                bufPtr += numRead;
-              }
-            }
-          }
-        }
-
-        wait(&status);
-        if (WIFEXITED(status)) {
-          logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand, WEXITSTATUS(status), _pid);
-        } else {
-          logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand, WTERMSIG(status), _pid);
-        }
-
-        close(_pipefd[0]);
-        _processRunning = false;
-        break;
+    flow_file->addAttribute("command", command_);
+    flow_file->addAttribute("command.arguments", command_argument_);
+    session.writeBuffer(flow_file, buffer);
+  } else {
+    session.appendBuffer(flow_file, buffer);
+  }
+  return true;
+}
+
+void ExecuteProcess::readOutput(core::ProcessSession& session) {
+  char buffer[4096];
+  char *buf_ptr = buffer;
+  size_t read_to_buffer = 0;
+  std::shared_ptr<core::FlowFile> flow_file;
+  auto num_read = read(pipefd_[0], buf_ptr, (sizeof(buffer) - read_to_buffer));

Review Comment:
   The third parameter could be just `sizeof(buffer)`, since `read_to_buffer` is still 0 at this point.



##########
extensions/standard-processors/processors/ExecuteProcess.cpp:
##########
@@ -74,172 +70,202 @@ void ExecuteProcess::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+void ExecuteProcess::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*session_factory*/) {
+  gsl_Expects(context);
   std::string value;
-  std::shared_ptr<core::FlowFile> flow_file;
-  if (context->getProperty(Command, value, flow_file)) {
-    this->_command = value;
+  if (context->getProperty(Command.getName(), value)) {
+    command_ = value;
   }
-  if (context->getProperty(CommandArguments, value, flow_file)) {
-    this->_commandArgument = value;
+  if (context->getProperty(CommandArguments.getName(), value)) {
+    command_argument_ = value;
   }
-  if (context->getProperty(WorkingDir, value, flow_file)) {
-    this->_workingDir = value;
+  if (context->getProperty(WorkingDir.getName(), value)) {
+    working_dir_ = value;
   }
   if (auto batch_duration = context->getProperty<core::TimePeriodValue>(BatchDuration)) {
-    _batchDuration = batch_duration->getMilliseconds();
-    logger_->log_debug("Setting _batchDuration");
+    batch_duration_ = batch_duration->getMilliseconds();
+    logger_->log_debug("Setting batch duration to %d milliseconds", batch_duration_.count());
   }
   if (context->getProperty(RedirectErrorStream.getName(), value)) {
-    _redirectErrorStream =  org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
+    redirect_error_stream_ = org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
   }
-  this->_fullCommand = _command + " " + _commandArgument;
-  if (_fullCommand.length() == 0) {
-    yield();
-    return;
+  full_command_ = command_ + " " + command_argument_;
+}
+
+std::vector<std::string> ExecuteProcess::readArgs() const {
+  std::vector<std::string> args;
+  std::stringstream input_stream{full_command_};
+  while (input_stream) {
+    std::string word;
+    input_stream >> std::quoted(word);
+    if (!word.empty()) {
+      args.push_back(word);
+    }
   }
-  if (_workingDir.length() > 0 && _workingDir != ".") {
-    // change to working directory
-    if (chdir(_workingDir.c_str()) != 0) {
-      logger_->log_error("Execute Command can not chdir %s", _workingDir);
-      yield();
-      return;
+
+  return args;
+}
+
+void ExecuteProcess::executeProcessForkFailed() {
+  logger_->log_error("Execute Process fork failed");
+  close(pipefd_[0]);
+  close(pipefd_[1]);
+  yield();
+}
+
+void ExecuteProcess::executeChildProcess() {
+  std::vector<char*> argv;
+  auto args = readArgs();
+  argv.reserve(args.size() + 1);
+  for (auto& arg : args) {
+    argv.push_back(arg.data());
+  }
+  argv.push_back(nullptr);
+
+  static constexpr int STDOUT = 1;
+  static constexpr int STDERR = 2;
+  close(STDOUT);
+  if (dup(pipefd_[1]) < 0) {  // points pipefd at file descriptor
+    logger_->log_error("Failed to point pipe at file descriptor");
+    exit(1);
+  }
+  if (redirect_error_stream_ && dup2(pipefd_[1], STDERR) < 0) {
+    logger_->log_error("Failed to redirect error stream of the executed process to the output stream");
+    exit(1);
+  }
+  close(pipefd_[0]);
+  if (execvp(argv[0], argv.data()) < 0) {
+    logger_->log_error("Failed to execute child process");
+    exit(1);
+  }
+  exit(0);
+}
+
+void ExecuteProcess::readOutputInBatches(core::ProcessSession& session) {
+  while (true) {
+    std::this_thread::sleep_for(batch_duration_);
+    char buffer[4096];
+    const auto num_read = read(pipefd_[0], buffer, sizeof(buffer));
+    if (num_read <= 0) {
+      break;
+    }
+    logger_->log_debug("Execute Command Respond %zd", num_read);
+    auto flow_file = session.create();
+    if (!flow_file) {
+      logger_->log_error("Flow file could not be created!");
+      continue;
     }
+    flow_file->addAttribute("command", command_);
+    flow_file->addAttribute("command.arguments", command_argument_);
+    session.writeBuffer(flow_file, gsl::make_span(buffer, gsl::narrow<size_t>(num_read)));
+    session.transfer(flow_file, Success);
+    session.commit();
   }
-  logger_->log_info("Execute Command %s", _fullCommand);
-  // split the command into array
-  char *p = std::strtok(const_cast<char*>(_fullCommand.c_str()), " ");
-  int argc = 0;
-  char *argv[64];
-  while (p != 0 && argc < 64) {
-    argv[argc] = p;
-    p = std::strtok(NULL, " ");
-    argc++;
-  }
-  argv[argc] = NULL;
-  int status;
-  if (!_processRunning) {
-    _processRunning = true;
-    // if the process has not launched yet
-    // create the pipe
-    if (pipe(_pipefd) == -1) {
-      _processRunning = false;
-      yield();
-      return;
+}
+
+bool ExecuteProcess::writeToFlowFile(core::ProcessSession& session, std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const char> buffer) const {
+  if (!flow_file) {
+    flow_file = session.create();
+    if (!flow_file) {
+      logger_->log_error("Flow file could not be created!");
+      return false;
     }
-    switch (_pid = fork()) {
-      case -1:
-        logger_->log_error("Execute Process fork failed");
-        _processRunning = false;
-        close(_pipefd[0]);
-        close(_pipefd[1]);
-        yield();
-        break;
-      case 0:  // this is the code the child runs
-        close(1);      // close stdout
-        dup(_pipefd[1]);  // points pipefd at file descriptor
-        if (_redirectErrorStream)
-          // redirect stderr
-          dup2(_pipefd[1], 2);
-        close(_pipefd[0]);
-        execvp(argv[0], argv);
-        exit(1);
-        break;
-      default:  // this is the code the parent runs
-        // the parent isn't going to write to the pipe
-        close(_pipefd[1]);
-        if (_batchDuration > 0ms) {
-          while (true) {
-            std::this_thread::sleep_for(_batchDuration);
-            char buffer[4096];
-            const auto  numRead = read(_pipefd[0], buffer, sizeof(buffer));
-            if (numRead <= 0)
-              break;
-            logger_->log_debug("Execute Command Respond %zd", numRead);
-            auto flowFile = session->create();
-            if (!flowFile)
-              continue;
-            flowFile->addAttribute("command", _command);
-            flowFile->addAttribute("command.arguments", _commandArgument);
-            session->writeBuffer(flowFile, gsl::make_span(buffer, gsl::narrow<size_t>(numRead)));
-            session->transfer(flowFile, Success);
-            session->commit();
-          }
-        } else {
-          char buffer[4096];
-          char *bufPtr = buffer;
-          size_t totalRead = 0;
-          std::shared_ptr<core::FlowFile> flowFile = nullptr;
-          while (true) {
-            const auto numRead = read(_pipefd[0], bufPtr, (sizeof(buffer) - totalRead));
-            if (numRead <= 0) {
-              if (totalRead > 0) {
-                logger_->log_debug("Execute Command Respond %zu", totalRead);
-                // child exits and close the pipe
-                const auto buffer_span = gsl::make_span(buffer, totalRead);
-                if (!flowFile) {
-                  flowFile = session->create();
-                  if (!flowFile)
-                    break;
-                  flowFile->addAttribute("command", _command);
-                  flowFile->addAttribute("command.arguments", _commandArgument);
-                  session->writeBuffer(flowFile, buffer_span);
-                } else {
-                  session->appendBuffer(flowFile, buffer_span);
-                }
-                session->transfer(flowFile, Success);
-              }
-              break;
-            } else {
-              if (numRead == static_cast<ssize_t>((sizeof(buffer) - totalRead))) {
-                // we reach the max buffer size
-                logger_->log_debug("Execute Command Max Respond %zu", sizeof(buffer));
-                if (!flowFile) {
-                  flowFile = session->create();
-                  if (!flowFile)
-                    continue;
-                  flowFile->addAttribute("command", _command);
-                  flowFile->addAttribute("command.arguments", _commandArgument);
-                  session->writeBuffer(flowFile, buffer);
-                } else {
-                  session->appendBuffer(flowFile, buffer);
-                }
-                // Rewind
-                totalRead = 0;
-                bufPtr = buffer;
-              } else {
-                totalRead += numRead;
-                bufPtr += numRead;
-              }
-            }
-          }
-        }
-
-        wait(&status);
-        if (WIFEXITED(status)) {
-          logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand, WEXITSTATUS(status), _pid);
-        } else {
-          logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand, WTERMSIG(status), _pid);
-        }
-
-        close(_pipefd[0]);
-        _processRunning = false;
-        break;
+    flow_file->addAttribute("command", command_);
+    flow_file->addAttribute("command.arguments", command_argument_);
+    session.writeBuffer(flow_file, buffer);
+  } else {
+    session.appendBuffer(flow_file, buffer);
+  }
+  return true;
+}
+
+void ExecuteProcess::readOutput(core::ProcessSession& session) {
+  char buffer[4096];
+  char *buf_ptr = buffer;
+  size_t read_to_buffer = 0;
+  std::shared_ptr<core::FlowFile> flow_file;
+  auto num_read = read(pipefd_[0], buf_ptr, (sizeof(buffer) - read_to_buffer));
+  while (num_read > 0) {
+    if (num_read == static_cast<ssize_t>((sizeof(buffer) - read_to_buffer))) {

Review Comment:
   There is one redundant level of parentheses here: `static_cast<ssize_t>(sizeof(buffer) - read_to_buffer)` is enough.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1414: MINIFICPP-1927 Fix ExecuteProcess command argument issue and refactor

Posted by GitBox <gi...@apache.org>.
lordgamez commented on code in PR #1414:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1414#discussion_r1011971031


##########
extensions/standard-processors/processors/ExecuteProcess.cpp:
##########
@@ -30,42 +30,36 @@
 #include "core/TypedValues.h"
 #include "utils/gsl.h"
 
-#if defined(__clang__)
-#pragma clang diagnostic push
-#pragma clang diagnostic ignored "-Wsign-compare"
-#pragma clang diagnostic ignored "-Wunused-result"
-#elif defined(__GNUC__) || defined(__GNUG__)
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Wsign-compare"
-#pragma GCC diagnostic ignored "-Wunused-result"
-#endif
-
 using namespace std::literals::chrono_literals;
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
+namespace org::apache::nifi::minifi::processors {
 #ifndef WIN32

Review Comment:
   Updated in b8a9a3297a4c6e56c02a1b5767f13596ddf7b853



##########
extensions/standard-processors/processors/ExecuteProcess.h:
##########
@@ -30,6 +29,7 @@
 #include <memory>
 #include <string>
 #include <thread>
+#include <vector>
 
 #ifndef WIN32

Review Comment:
   Updated in b8a9a3297a4c6e56c02a1b5767f13596ddf7b853



##########
extensions/standard-processors/processors/ExecuteProcess.cpp:
##########
@@ -74,172 +68,226 @@ void ExecuteProcess::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+void ExecuteProcess::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*session_factory*/) {
+  gsl_Expects(context);
   std::string value;
-  std::shared_ptr<core::FlowFile> flow_file;
-  if (context->getProperty(Command, value, flow_file)) {
-    this->_command = value;
+  if (context->getProperty(Command.getName(), value)) {
+    command_ = value;
   }
-  if (context->getProperty(CommandArguments, value, flow_file)) {
-    this->_commandArgument = value;
+  if (context->getProperty(CommandArguments.getName(), value)) {
+    command_argument_ = value;
   }
-  if (context->getProperty(WorkingDir, value, flow_file)) {
-    this->_workingDir = value;
+  if (context->getProperty(WorkingDir.getName(), value)) {
+    working_dir_ = value;
   }
   if (auto batch_duration = context->getProperty<core::TimePeriodValue>(BatchDuration)) {
-    _batchDuration = batch_duration->getMilliseconds();
-    logger_->log_debug("Setting _batchDuration");
+    batch_duration_ = batch_duration->getMilliseconds();
+    logger_->log_debug("Setting batch duration to %d milliseconds", batch_duration_.count());
   }
   if (context->getProperty(RedirectErrorStream.getName(), value)) {
-    _redirectErrorStream =  org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
+    redirect_error_stream_ = org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
   }
-  this->_fullCommand = _command + " " + _commandArgument;
-  if (_fullCommand.length() == 0) {
-    yield();
+  full_command_ = command_ + " " + command_argument_;
+}
+
+bool ExecuteProcess::changeWorkdir() const {
+  if (working_dir_.length() > 0 && working_dir_ != ".") {
+    if (chdir(working_dir_.c_str()) != 0) {
+      logger_->log_error("Execute Command can not chdir %s", working_dir_);
+      return false;
+    }
+  }
+  return true;
+}
+
+std::vector<std::string> ExecuteProcess::readArgs() const {
+  std::vector<std::string> args;
+  std::string current_param;
+  bool in_escaped = false;
+  auto currentParamShouldBeAppended = [&](std::size_t i) {
+    bool current_char_is_escaped_apostrophe = full_command_[i] == '\"' && in_escaped && i > 0 && full_command_[i - 1] == '\\';
+    bool whitespace_in_escaped_block = full_command_[i] == ' ' && in_escaped;
+    bool non_special_character = full_command_[i] != '\\' && full_command_[i] != '\"' && full_command_[i] != ' ';
+    return current_char_is_escaped_apostrophe || whitespace_in_escaped_block || non_special_character;
+  };
+
+  for (std::size_t i = 0; i < full_command_.size(); ++i) {
+    if (currentParamShouldBeAppended(i)) {
+      current_param += full_command_[i];
+    } else if (full_command_[i] == '\"' && (!in_escaped || i == 0 || full_command_[i - 1] != '\\')) {
+      in_escaped = !in_escaped;
+    } else if (full_command_[i] == ' ' && !in_escaped) {
+      if (!current_param.empty()) {
+        args.push_back(current_param);
+      }
+      current_param.clear();
+    }
+  }
+  if (!current_param.empty()) {
+    args.push_back(current_param);
+  }
+  return args;
+}
+
+void ExecuteProcess::executeProcessForkFailed() {
+  logger_->log_error("Execute Process fork failed");
+  close(pipefd_[0]);
+  close(pipefd_[1]);
+  yield();
+}
+
+void ExecuteProcess::executeChildProcess(const std::vector<char*>& argv) {
+  const int STDOUT = 1;
+  const int STDERR = 2;
+  close(STDOUT);
+  const auto guard = gsl::finally([]() {
+    exit(1);
+  });
+  if (dup(pipefd_[1]) < 0) {  // points pipefd at file descriptor
+    logger_->log_error("Failed to point pipe at file descriptor");
     return;
   }
-  if (_workingDir.length() > 0 && _workingDir != ".") {
-    // change to working directory
-    if (chdir(_workingDir.c_str()) != 0) {
-      logger_->log_error("Execute Command can not chdir %s", _workingDir);
-      yield();
-      return;
+  if (redirect_error_stream_ && dup2(pipefd_[1], STDERR) < 0) {
+    logger_->log_error("Failed to redirect error stream of the executed process to the output stream");
+    return;
+  }
+  close(pipefd_[0]);
+  if (execvp(argv[0], argv.data()) < 0) {
+    logger_->log_error("Failed to execute child process");
+  }
+}
+
+void ExecuteProcess::readOutputInBatches(core::ProcessSession& session) {
+  while (true) {
+    std::this_thread::sleep_for(batch_duration_);
+    char buffer[4096];
+    const auto num_read = read(pipefd_[0], buffer, sizeof(buffer));
+    if (num_read <= 0) {
+      break;
+    }
+    logger_->log_debug("Execute Command Respond %zd", num_read);
+    auto flow_file = session.create();
+    if (!flow_file) {
+      continue;
     }
+    flow_file->addAttribute("command", command_);
+    flow_file->addAttribute("command.arguments", command_argument_);
+    session.writeBuffer(flow_file, gsl::make_span(buffer, gsl::narrow<size_t>(num_read)));
+    session.transfer(flow_file, Success);
+    session.commit();
   }
-  logger_->log_info("Execute Command %s", _fullCommand);
-  // split the command into array
-  char *p = std::strtok(const_cast<char*>(_fullCommand.c_str()), " ");
-  int argc = 0;
-  char *argv[64];
-  while (p != 0 && argc < 64) {
-    argv[argc] = p;
-    p = std::strtok(NULL, " ");
-    argc++;
-  }
-  argv[argc] = NULL;
-  int status;
-  if (!_processRunning) {
-    _processRunning = true;
-    // if the process has not launched yet
-    // create the pipe
-    if (pipe(_pipefd) == -1) {
-      _processRunning = false;
-      yield();
-      return;
+}
+
+bool ExecuteProcess::writeToFlowFile(core::ProcessSession& session, std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const char> buffer) const {
+  if (!flow_file) {
+    flow_file = session.create();
+    if (!flow_file) {
+      return false;
     }
-    switch (_pid = fork()) {
-      case -1:
-        logger_->log_error("Execute Process fork failed");
-        _processRunning = false;
-        close(_pipefd[0]);
-        close(_pipefd[1]);
-        yield();
-        break;
-      case 0:  // this is the code the child runs
-        close(1);      // close stdout
-        dup(_pipefd[1]);  // points pipefd at file descriptor
-        if (_redirectErrorStream)
-          // redirect stderr
-          dup2(_pipefd[1], 2);
-        close(_pipefd[0]);
-        execvp(argv[0], argv);
-        exit(1);
-        break;
-      default:  // this is the code the parent runs
-        // the parent isn't going to write to the pipe
-        close(_pipefd[1]);
-        if (_batchDuration > 0ms) {
-          while (true) {
-            std::this_thread::sleep_for(_batchDuration);
-            char buffer[4096];
-            const auto  numRead = read(_pipefd[0], buffer, sizeof(buffer));
-            if (numRead <= 0)
-              break;
-            logger_->log_debug("Execute Command Respond %zd", numRead);
-            auto flowFile = session->create();
-            if (!flowFile)
-              continue;
-            flowFile->addAttribute("command", _command);
-            flowFile->addAttribute("command.arguments", _commandArgument);
-            session->writeBuffer(flowFile, gsl::make_span(buffer, gsl::narrow<size_t>(numRead)));
-            session->transfer(flowFile, Success);
-            session->commit();
-          }
-        } else {
-          char buffer[4096];
-          char *bufPtr = buffer;
-          size_t totalRead = 0;
-          std::shared_ptr<core::FlowFile> flowFile = nullptr;
-          while (true) {
-            const auto numRead = read(_pipefd[0], bufPtr, (sizeof(buffer) - totalRead));
-            if (numRead <= 0) {
-              if (totalRead > 0) {
-                logger_->log_debug("Execute Command Respond %zu", totalRead);
-                // child exits and close the pipe
-                const auto buffer_span = gsl::make_span(buffer, totalRead);
-                if (!flowFile) {
-                  flowFile = session->create();
-                  if (!flowFile)
-                    break;
-                  flowFile->addAttribute("command", _command);
-                  flowFile->addAttribute("command.arguments", _commandArgument);
-                  session->writeBuffer(flowFile, buffer_span);
-                } else {
-                  session->appendBuffer(flowFile, buffer_span);
-                }
-                session->transfer(flowFile, Success);
-              }
-              break;
-            } else {
-              if (numRead == static_cast<ssize_t>((sizeof(buffer) - totalRead))) {
-                // we reach the max buffer size
-                logger_->log_debug("Execute Command Max Respond %zu", sizeof(buffer));
-                if (!flowFile) {
-                  flowFile = session->create();
-                  if (!flowFile)
-                    continue;
-                  flowFile->addAttribute("command", _command);
-                  flowFile->addAttribute("command.arguments", _commandArgument);
-                  session->writeBuffer(flowFile, buffer);
-                } else {
-                  session->appendBuffer(flowFile, buffer);
-                }
-                // Rewind
-                totalRead = 0;
-                bufPtr = buffer;
-              } else {
-                totalRead += numRead;
-                bufPtr += numRead;
-              }
-            }
-          }
-        }
-
-        wait(&status);
-        if (WIFEXITED(status)) {
-          logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand, WEXITSTATUS(status), _pid);
-        } else {
-          logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand, WTERMSIG(status), _pid);
-        }
-
-        close(_pipefd[0]);
-        _processRunning = false;
-        break;
+    flow_file->addAttribute("command", command_);
+    flow_file->addAttribute("command.arguments", command_argument_);
+    session.writeBuffer(flow_file, buffer);
+  } else {
+    session.appendBuffer(flow_file, buffer);
+  }
+  return true;
+}
+
+void ExecuteProcess::readOutput(core::ProcessSession& session) {
+  char buffer[4096];
+  char *buf_ptr = buffer;
+  size_t total_read = 0;
+  std::shared_ptr<core::FlowFile> flow_file;
+  auto num_read = read(pipefd_[0], buf_ptr, (sizeof(buffer) - total_read));
+  while (num_read > 0) {
+    if (num_read == static_cast<ssize_t>((sizeof(buffer) - total_read))) {
+      // we reach the max buffer size
+      logger_->log_debug("Execute Command Max Respond %zu", sizeof(buffer));
+      if (!writeToFlowFile(session, flow_file, buffer)) {
+        continue;
+      }
+      // Rewind
+      total_read = 0;
+      buf_ptr = buffer;
+    } else {
+      total_read += num_read;
+      buf_ptr += num_read;
+    }
+    num_read = read(pipefd_[0], buf_ptr, (sizeof(buffer) - total_read));
+  }
+
+  if (total_read > 0) {
+    logger_->log_debug("Execute Command Respond %zu", total_read);
+    // child exits and close the pipe
+    const auto buffer_span = gsl::make_span(buffer, total_read);
+    if (!writeToFlowFile(session, flow_file, buffer_span)) {
+      return;
     }
+    session.transfer(flow_file, Success);
+  }
+}
+
+void ExecuteProcess::collectChildProcessOutput(core::ProcessSession& session) {
+  // the parent isn't going to write to the pipe
+  close(pipefd_[1]);
+  if (batch_duration_ > 0ms) {
+    readOutputInBatches(session);
+  } else {
+    readOutput(session);
+  }
+
+  int status = 0;
+  wait(&status);
+  if (WIFEXITED(status)) {
+    logger_->log_info("Execute Command Complete %s status %d pid %d", full_command_, WEXITSTATUS(status), pid_);
+  } else {
+    logger_->log_info("Execute Command Complete %s status %d pid %d", full_command_, WTERMSIG(status), pid_);
+  }
+
+  close(pipefd_[0]);
+  pid_ = 0;
+}
+
+void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+  gsl_Expects(context && session);
+  if (full_command_.length() == 0) {
+    yield();
+    return;
+  }
+  if (!changeWorkdir()) {
+    yield();
+    return;
+  }
+  logger_->log_info("Execute Command %s", full_command_);
+  std::vector<char*> argv;
+  auto args = readArgs();
+  argv.reserve(args.size() + 1);
+  for (auto& arg : args) {
+    argv.push_back(arg.data());
+  }
+  argv.push_back(nullptr);

Review Comment:
   Updated in b8a9a3297a4c6e56c02a1b5767f13596ddf7b853



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1414: MINIFICPP-1927 Fix ExecuteProcess command argument issue and refactor

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1414:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1414#discussion_r1007945951


##########
extensions/standard-processors/processors/ExecuteProcess.cpp:
##########
@@ -74,172 +68,226 @@ void ExecuteProcess::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+void ExecuteProcess::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*session_factory*/) {
+  gsl_Expects(context);
   std::string value;
-  std::shared_ptr<core::FlowFile> flow_file;
-  if (context->getProperty(Command, value, flow_file)) {
-    this->_command = value;
+  if (context->getProperty(Command.getName(), value)) {
+    command_ = value;
   }
-  if (context->getProperty(CommandArguments, value, flow_file)) {
-    this->_commandArgument = value;
+  if (context->getProperty(CommandArguments.getName(), value)) {
+    command_argument_ = value;
   }
-  if (context->getProperty(WorkingDir, value, flow_file)) {
-    this->_workingDir = value;
+  if (context->getProperty(WorkingDir.getName(), value)) {
+    working_dir_ = value;
   }
   if (auto batch_duration = context->getProperty<core::TimePeriodValue>(BatchDuration)) {
-    _batchDuration = batch_duration->getMilliseconds();
-    logger_->log_debug("Setting _batchDuration");
+    batch_duration_ = batch_duration->getMilliseconds();
+    logger_->log_debug("Setting batch duration to %d milliseconds", batch_duration_.count());
   }
   if (context->getProperty(RedirectErrorStream.getName(), value)) {
-    _redirectErrorStream =  org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
+    redirect_error_stream_ = org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
   }
-  this->_fullCommand = _command + " " + _commandArgument;
-  if (_fullCommand.length() == 0) {
-    yield();
+  full_command_ = command_ + " " + command_argument_;
+}
+
+bool ExecuteProcess::changeWorkdir() const {
+  if (working_dir_.length() > 0 && working_dir_ != ".") {
+    if (chdir(working_dir_.c_str()) != 0) {
+      logger_->log_error("Execute Command can not chdir %s", working_dir_);
+      return false;
+    }
+  }
+  return true;
+}
+
+std::vector<std::string> ExecuteProcess::readArgs() const {
+  std::vector<std::string> args;
+  std::string current_param;
+  bool in_escaped = false;
+  auto currentParamShouldBeAppended = [&](std::size_t i) {
+    bool current_char_is_escaped_apostrophe = full_command_[i] == '\"' && in_escaped && i > 0 && full_command_[i - 1] == '\\';
+    bool whitespace_in_escaped_block = full_command_[i] == ' ' && in_escaped;
+    bool non_special_character = full_command_[i] != '\\' && full_command_[i] != '\"' && full_command_[i] != ' ';
+    return current_char_is_escaped_apostrophe || whitespace_in_escaped_block || non_special_character;
+  };
+
+  for (std::size_t i = 0; i < full_command_.size(); ++i) {
+    if (currentParamShouldBeAppended(i)) {
+      current_param += full_command_[i];
+    } else if (full_command_[i] == '\"' && (!in_escaped || i == 0 || full_command_[i - 1] != '\\')) {
+      in_escaped = !in_escaped;
+    } else if (full_command_[i] == ' ' && !in_escaped) {
+      if (!current_param.empty()) {
+        args.push_back(current_param);
+      }
+      current_param.clear();
+    }
+  }
+  if (!current_param.empty()) {
+    args.push_back(current_param);
+  }
+  return args;
+}
+
+void ExecuteProcess::executeProcessForkFailed() {
+  logger_->log_error("Execute Process fork failed");
+  close(pipefd_[0]);
+  close(pipefd_[1]);
+  yield();
+}
+
+void ExecuteProcess::executeChildProcess(const std::vector<char*>& argv) {
+  const int STDOUT = 1;
+  const int STDERR = 2;
+  close(STDOUT);
+  const auto guard = gsl::finally([]() {
+    exit(1);
+  });
+  if (dup(pipefd_[1]) < 0) {  // points pipefd at file descriptor
+    logger_->log_error("Failed to point pipe at file descriptor");
     return;
   }
-  if (_workingDir.length() > 0 && _workingDir != ".") {
-    // change to working directory
-    if (chdir(_workingDir.c_str()) != 0) {
-      logger_->log_error("Execute Command can not chdir %s", _workingDir);
-      yield();
-      return;
+  if (redirect_error_stream_ && dup2(pipefd_[1], STDERR) < 0) {
+    logger_->log_error("Failed to redirect error stream of the executed process to the output stream");
+    return;
+  }
+  close(pipefd_[0]);
+  if (execvp(argv[0], argv.data()) < 0) {
+    logger_->log_error("Failed to execute child process");
+  }
+}
+
+void ExecuteProcess::readOutputInBatches(core::ProcessSession& session) {
+  while (true) {
+    std::this_thread::sleep_for(batch_duration_);
+    char buffer[4096];
+    const auto num_read = read(pipefd_[0], buffer, sizeof(buffer));
+    if (num_read <= 0) {
+      break;
+    }
+    logger_->log_debug("Execute Command Respond %zd", num_read);
+    auto flow_file = session.create();
+    if (!flow_file) {
+      continue;
     }
+    flow_file->addAttribute("command", command_);
+    flow_file->addAttribute("command.arguments", command_argument_);
+    session.writeBuffer(flow_file, gsl::make_span(buffer, gsl::narrow<size_t>(num_read)));
+    session.transfer(flow_file, Success);
+    session.commit();
   }
-  logger_->log_info("Execute Command %s", _fullCommand);
-  // split the command into array
-  char *p = std::strtok(const_cast<char*>(_fullCommand.c_str()), " ");
-  int argc = 0;
-  char *argv[64];
-  while (p != 0 && argc < 64) {
-    argv[argc] = p;
-    p = std::strtok(NULL, " ");
-    argc++;
-  }
-  argv[argc] = NULL;
-  int status;
-  if (!_processRunning) {
-    _processRunning = true;
-    // if the process has not launched yet
-    // create the pipe
-    if (pipe(_pipefd) == -1) {
-      _processRunning = false;
-      yield();
-      return;
+}
+
+bool ExecuteProcess::writeToFlowFile(core::ProcessSession& session, std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const char> buffer) const {
+  if (!flow_file) {
+    flow_file = session.create();
+    if (!flow_file) {
+      return false;
     }
-    switch (_pid = fork()) {
-      case -1:
-        logger_->log_error("Execute Process fork failed");
-        _processRunning = false;
-        close(_pipefd[0]);
-        close(_pipefd[1]);
-        yield();
-        break;
-      case 0:  // this is the code the child runs
-        close(1);      // close stdout
-        dup(_pipefd[1]);  // points pipefd at file descriptor
-        if (_redirectErrorStream)
-          // redirect stderr
-          dup2(_pipefd[1], 2);
-        close(_pipefd[0]);
-        execvp(argv[0], argv);
-        exit(1);
-        break;
-      default:  // this is the code the parent runs
-        // the parent isn't going to write to the pipe
-        close(_pipefd[1]);
-        if (_batchDuration > 0ms) {
-          while (true) {
-            std::this_thread::sleep_for(_batchDuration);
-            char buffer[4096];
-            const auto  numRead = read(_pipefd[0], buffer, sizeof(buffer));
-            if (numRead <= 0)
-              break;
-            logger_->log_debug("Execute Command Respond %zd", numRead);
-            auto flowFile = session->create();
-            if (!flowFile)
-              continue;
-            flowFile->addAttribute("command", _command);
-            flowFile->addAttribute("command.arguments", _commandArgument);
-            session->writeBuffer(flowFile, gsl::make_span(buffer, gsl::narrow<size_t>(numRead)));
-            session->transfer(flowFile, Success);
-            session->commit();
-          }
-        } else {
-          char buffer[4096];
-          char *bufPtr = buffer;
-          size_t totalRead = 0;
-          std::shared_ptr<core::FlowFile> flowFile = nullptr;
-          while (true) {
-            const auto numRead = read(_pipefd[0], bufPtr, (sizeof(buffer) - totalRead));
-            if (numRead <= 0) {
-              if (totalRead > 0) {
-                logger_->log_debug("Execute Command Respond %zu", totalRead);
-                // child exits and close the pipe
-                const auto buffer_span = gsl::make_span(buffer, totalRead);
-                if (!flowFile) {
-                  flowFile = session->create();
-                  if (!flowFile)
-                    break;
-                  flowFile->addAttribute("command", _command);
-                  flowFile->addAttribute("command.arguments", _commandArgument);
-                  session->writeBuffer(flowFile, buffer_span);
-                } else {
-                  session->appendBuffer(flowFile, buffer_span);
-                }
-                session->transfer(flowFile, Success);
-              }
-              break;
-            } else {
-              if (numRead == static_cast<ssize_t>((sizeof(buffer) - totalRead))) {
-                // we reach the max buffer size
-                logger_->log_debug("Execute Command Max Respond %zu", sizeof(buffer));
-                if (!flowFile) {
-                  flowFile = session->create();
-                  if (!flowFile)
-                    continue;
-                  flowFile->addAttribute("command", _command);
-                  flowFile->addAttribute("command.arguments", _commandArgument);
-                  session->writeBuffer(flowFile, buffer);
-                } else {
-                  session->appendBuffer(flowFile, buffer);
-                }
-                // Rewind
-                totalRead = 0;
-                bufPtr = buffer;
-              } else {
-                totalRead += numRead;
-                bufPtr += numRead;
-              }
-            }
-          }
-        }
-
-        wait(&status);
-        if (WIFEXITED(status)) {
-          logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand, WEXITSTATUS(status), _pid);
-        } else {
-          logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand, WTERMSIG(status), _pid);
-        }
-
-        close(_pipefd[0]);
-        _processRunning = false;
-        break;
+    flow_file->addAttribute("command", command_);
+    flow_file->addAttribute("command.arguments", command_argument_);
+    session.writeBuffer(flow_file, buffer);
+  } else {
+    session.appendBuffer(flow_file, buffer);
+  }
+  return true;
+}
+
+void ExecuteProcess::readOutput(core::ProcessSession& session) {
+  char buffer[4096];
+  char *buf_ptr = buffer;
+  size_t total_read = 0;
+  std::shared_ptr<core::FlowFile> flow_file;
+  auto num_read = read(pipefd_[0], buf_ptr, (sizeof(buffer) - total_read));
+  while (num_read > 0) {
+    if (num_read == static_cast<ssize_t>((sizeof(buffer) - total_read))) {
+      // we reach the max buffer size
+      logger_->log_debug("Execute Command Max Respond %zu", sizeof(buffer));
+      if (!writeToFlowFile(session, flow_file, buffer)) {
+        continue;
+      }
+      // Rewind
+      total_read = 0;

Review Comment:
   It seems that resetting `total_read` would cause the flow file not to be transferred if the output size is an exact multiple of the buffer size, since the final transfer is guarded by `if (total_read > 0)`. Transferring based on the existence of the flow file (`if (flow_file) ...`) seems a safer bet.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1414: MINIFICPP-1927 Fix ExecuteProcess command argument issue and refactor

Posted by GitBox <gi...@apache.org>.
lordgamez commented on code in PR #1414:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1414#discussion_r1011877660


##########
extensions/standard-processors/tests/resource_apps/EchoParameters.cpp:
##########
@@ -0,0 +1,34 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <iostream>
+#include <chrono>
+#include <thread>
+
+int main(int argc, char** argv) {
+  if (argc < 3) {
+    std::cerr << "Usage: ./EchoParameters <delay milliseconds> <text to write>" << std::endl;
+    return 1;
+  }
+
+  std::cout << argv[2] << std::endl;

Review Comment:
   Yes, the first parameter can be instant; only the following parameters should be delayed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] fgerlits commented on a diff in pull request #1414: MINIFICPP-1927 Fix ExecuteProcess command argument issue and refactor

Posted by GitBox <gi...@apache.org>.
fgerlits commented on code in PR #1414:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1414#discussion_r1003070951


##########
extensions/standard-processors/processors/ExecuteProcess.cpp:
##########
@@ -30,42 +30,36 @@
 #include "core/TypedValues.h"
 #include "utils/gsl.h"
 
-#if defined(__clang__)
-#pragma clang diagnostic push
-#pragma clang diagnostic ignored "-Wsign-compare"
-#pragma clang diagnostic ignored "-Wunused-result"
-#elif defined(__GNUC__) || defined(__GNUG__)
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Wsign-compare"
-#pragma GCC diagnostic ignored "-Wunused-result"
-#endif
-
 using namespace std::literals::chrono_literals;
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
+namespace org::apache::nifi::minifi::processors {
 #ifndef WIN32

Review Comment:
   This `#ifndef` could be moved to the top, to avoid compiling the included headers on Windows.



##########
extensions/standard-processors/processors/ExecuteProcess.h:
##########
@@ -30,6 +29,7 @@
 #include <memory>
 #include <string>
 #include <thread>
+#include <vector>
 
 #ifndef WIN32

Review Comment:
   Since the whole thing is not compiled for Windows, this `#ifndef` is unnecessary.



##########
extensions/standard-processors/processors/ExecuteProcess.cpp:
##########
@@ -74,172 +68,226 @@ void ExecuteProcess::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+void ExecuteProcess::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*session_factory*/) {
+  gsl_Expects(context);
   std::string value;
-  std::shared_ptr<core::FlowFile> flow_file;
-  if (context->getProperty(Command, value, flow_file)) {
-    this->_command = value;
+  if (context->getProperty(Command.getName(), value)) {
+    command_ = value;
   }
-  if (context->getProperty(CommandArguments, value, flow_file)) {
-    this->_commandArgument = value;
+  if (context->getProperty(CommandArguments.getName(), value)) {
+    command_argument_ = value;
   }
-  if (context->getProperty(WorkingDir, value, flow_file)) {
-    this->_workingDir = value;
+  if (context->getProperty(WorkingDir.getName(), value)) {
+    working_dir_ = value;
   }
   if (auto batch_duration = context->getProperty<core::TimePeriodValue>(BatchDuration)) {
-    _batchDuration = batch_duration->getMilliseconds();
-    logger_->log_debug("Setting _batchDuration");
+    batch_duration_ = batch_duration->getMilliseconds();
+    logger_->log_debug("Setting batch duration to %d milliseconds", batch_duration_.count());
   }
   if (context->getProperty(RedirectErrorStream.getName(), value)) {
-    _redirectErrorStream =  org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
+    redirect_error_stream_ = org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
   }
-  this->_fullCommand = _command + " " + _commandArgument;
-  if (_fullCommand.length() == 0) {
-    yield();
+  full_command_ = command_ + " " + command_argument_;
+}
+
+bool ExecuteProcess::changeWorkdir() const {
+  if (working_dir_.length() > 0 && working_dir_ != ".") {
+    if (chdir(working_dir_.c_str()) != 0) {
+      logger_->log_error("Execute Command can not chdir %s", working_dir_);
+      return false;
+    }
+  }
+  return true;
+}
+
+std::vector<std::string> ExecuteProcess::readArgs() const {
+  std::vector<std::string> args;
+  std::string current_param;
+  bool in_escaped = false;
+  auto currentParamShouldBeAppended = [&](std::size_t i) {
+    bool current_char_is_escaped_apostrophe = full_command_[i] == '\"' && in_escaped && i > 0 && full_command_[i - 1] == '\\';
+    bool whitespace_in_escaped_block = full_command_[i] == ' ' && in_escaped;
+    bool non_special_character = full_command_[i] != '\\' && full_command_[i] != '\"' && full_command_[i] != ' ';
+    return current_char_is_escaped_apostrophe || whitespace_in_escaped_block || non_special_character;
+  };
+
+  for (std::size_t i = 0; i < full_command_.size(); ++i) {
+    if (currentParamShouldBeAppended(i)) {
+      current_param += full_command_[i];
+    } else if (full_command_[i] == '\"' && (!in_escaped || i == 0 || full_command_[i - 1] != '\\')) {
+      in_escaped = !in_escaped;
+    } else if (full_command_[i] == ' ' && !in_escaped) {
+      if (!current_param.empty()) {
+        args.push_back(current_param);
+      }
+      current_param.clear();
+    }
+  }

Review Comment:
   This logic is very complex, and it does not treat backslashes as I would expect.  For example, on an input of `a \" b \" c \\" d \\\" e " f \ \\ \g`, the output is 4 arguments, `a`, ` b " c " d " e `, `f` and `g`.
   
   Why don't we use `std::quoted` instead?  That seems to do what we need: https://godbolt.org/z/PzYPE71Gx



##########
extensions/standard-processors/processors/ExecuteProcess.cpp:
##########
@@ -74,172 +68,226 @@ void ExecuteProcess::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+void ExecuteProcess::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*session_factory*/) {
+  gsl_Expects(context);
   std::string value;
-  std::shared_ptr<core::FlowFile> flow_file;
-  if (context->getProperty(Command, value, flow_file)) {
-    this->_command = value;
+  if (context->getProperty(Command.getName(), value)) {
+    command_ = value;
   }
-  if (context->getProperty(CommandArguments, value, flow_file)) {
-    this->_commandArgument = value;
+  if (context->getProperty(CommandArguments.getName(), value)) {
+    command_argument_ = value;
   }
-  if (context->getProperty(WorkingDir, value, flow_file)) {
-    this->_workingDir = value;
+  if (context->getProperty(WorkingDir.getName(), value)) {
+    working_dir_ = value;
   }
   if (auto batch_duration = context->getProperty<core::TimePeriodValue>(BatchDuration)) {
-    _batchDuration = batch_duration->getMilliseconds();
-    logger_->log_debug("Setting _batchDuration");
+    batch_duration_ = batch_duration->getMilliseconds();
+    logger_->log_debug("Setting batch duration to %d milliseconds", batch_duration_.count());
   }
   if (context->getProperty(RedirectErrorStream.getName(), value)) {
-    _redirectErrorStream =  org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
+    redirect_error_stream_ = org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
   }
-  this->_fullCommand = _command + " " + _commandArgument;
-  if (_fullCommand.length() == 0) {
-    yield();
+  full_command_ = command_ + " " + command_argument_;
+}
+
+bool ExecuteProcess::changeWorkdir() const {
+  if (working_dir_.length() > 0 && working_dir_ != ".") {
+    if (chdir(working_dir_.c_str()) != 0) {
+      logger_->log_error("Execute Command can not chdir %s", working_dir_);
+      return false;
+    }
+  }
+  return true;
+}
+
+std::vector<std::string> ExecuteProcess::readArgs() const {
+  std::vector<std::string> args;
+  std::string current_param;
+  bool in_escaped = false;
+  auto currentParamShouldBeAppended = [&](std::size_t i) {
+    bool current_char_is_escaped_apostrophe = full_command_[i] == '\"' && in_escaped && i > 0 && full_command_[i - 1] == '\\';
+    bool whitespace_in_escaped_block = full_command_[i] == ' ' && in_escaped;
+    bool non_special_character = full_command_[i] != '\\' && full_command_[i] != '\"' && full_command_[i] != ' ';
+    return current_char_is_escaped_apostrophe || whitespace_in_escaped_block || non_special_character;
+  };
+
+  for (std::size_t i = 0; i < full_command_.size(); ++i) {
+    if (currentParamShouldBeAppended(i)) {
+      current_param += full_command_[i];
+    } else if (full_command_[i] == '\"' && (!in_escaped || i == 0 || full_command_[i - 1] != '\\')) {
+      in_escaped = !in_escaped;
+    } else if (full_command_[i] == ' ' && !in_escaped) {
+      if (!current_param.empty()) {
+        args.push_back(current_param);
+      }
+      current_param.clear();
+    }
+  }
+  if (!current_param.empty()) {
+    args.push_back(current_param);
+  }
+  return args;
+}
+
+void ExecuteProcess::executeProcessForkFailed() {
+  logger_->log_error("Execute Process fork failed");
+  close(pipefd_[0]);
+  close(pipefd_[1]);
+  yield();
+}
+
+void ExecuteProcess::executeChildProcess(const std::vector<char*>& argv) {
+  const int STDOUT = 1;
+  const int STDERR = 2;
+  close(STDOUT);
+  const auto guard = gsl::finally([]() {
+    exit(1);
+  });
+  if (dup(pipefd_[1]) < 0) {  // points pipefd at file descriptor
+    logger_->log_error("Failed to point pipe at file descriptor");
     return;
   }
-  if (_workingDir.length() > 0 && _workingDir != ".") {
-    // change to working directory
-    if (chdir(_workingDir.c_str()) != 0) {
-      logger_->log_error("Execute Command can not chdir %s", _workingDir);
-      yield();
-      return;
+  if (redirect_error_stream_ && dup2(pipefd_[1], STDERR) < 0) {
+    logger_->log_error("Failed to redirect error stream of the executed process to the output stream");
+    return;
+  }
+  close(pipefd_[0]);
+  if (execvp(argv[0], argv.data()) < 0) {
+    logger_->log_error("Failed to execute child process");
+  }
+}
+
+void ExecuteProcess::readOutputInBatches(core::ProcessSession& session) {
+  while (true) {
+    std::this_thread::sleep_for(batch_duration_);
+    char buffer[4096];
+    const auto num_read = read(pipefd_[0], buffer, sizeof(buffer));
+    if (num_read <= 0) {
+      break;
+    }
+    logger_->log_debug("Execute Command Respond %zd", num_read);
+    auto flow_file = session.create();
+    if (!flow_file) {
+      continue;
     }
+    flow_file->addAttribute("command", command_);
+    flow_file->addAttribute("command.arguments", command_argument_);
+    session.writeBuffer(flow_file, gsl::make_span(buffer, gsl::narrow<size_t>(num_read)));
+    session.transfer(flow_file, Success);
+    session.commit();
   }
-  logger_->log_info("Execute Command %s", _fullCommand);
-  // split the command into array
-  char *p = std::strtok(const_cast<char*>(_fullCommand.c_str()), " ");
-  int argc = 0;
-  char *argv[64];
-  while (p != 0 && argc < 64) {
-    argv[argc] = p;
-    p = std::strtok(NULL, " ");
-    argc++;
-  }
-  argv[argc] = NULL;
-  int status;
-  if (!_processRunning) {
-    _processRunning = true;
-    // if the process has not launched yet
-    // create the pipe
-    if (pipe(_pipefd) == -1) {
-      _processRunning = false;
-      yield();
-      return;
+}
+
+bool ExecuteProcess::writeToFlowFile(core::ProcessSession& session, std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const char> buffer) const {
+  if (!flow_file) {
+    flow_file = session.create();
+    if (!flow_file) {
+      return false;
     }
-    switch (_pid = fork()) {
-      case -1:
-        logger_->log_error("Execute Process fork failed");
-        _processRunning = false;
-        close(_pipefd[0]);
-        close(_pipefd[1]);
-        yield();
-        break;
-      case 0:  // this is the code the child runs
-        close(1);      // close stdout
-        dup(_pipefd[1]);  // points pipefd at file descriptor
-        if (_redirectErrorStream)
-          // redirect stderr
-          dup2(_pipefd[1], 2);
-        close(_pipefd[0]);
-        execvp(argv[0], argv);
-        exit(1);
-        break;
-      default:  // this is the code the parent runs
-        // the parent isn't going to write to the pipe
-        close(_pipefd[1]);
-        if (_batchDuration > 0ms) {
-          while (true) {
-            std::this_thread::sleep_for(_batchDuration);
-            char buffer[4096];
-            const auto  numRead = read(_pipefd[0], buffer, sizeof(buffer));
-            if (numRead <= 0)
-              break;
-            logger_->log_debug("Execute Command Respond %zd", numRead);
-            auto flowFile = session->create();
-            if (!flowFile)
-              continue;
-            flowFile->addAttribute("command", _command);
-            flowFile->addAttribute("command.arguments", _commandArgument);
-            session->writeBuffer(flowFile, gsl::make_span(buffer, gsl::narrow<size_t>(numRead)));
-            session->transfer(flowFile, Success);
-            session->commit();
-          }
-        } else {
-          char buffer[4096];
-          char *bufPtr = buffer;
-          size_t totalRead = 0;
-          std::shared_ptr<core::FlowFile> flowFile = nullptr;
-          while (true) {
-            const auto numRead = read(_pipefd[0], bufPtr, (sizeof(buffer) - totalRead));
-            if (numRead <= 0) {
-              if (totalRead > 0) {
-                logger_->log_debug("Execute Command Respond %zu", totalRead);
-                // child exits and close the pipe
-                const auto buffer_span = gsl::make_span(buffer, totalRead);
-                if (!flowFile) {
-                  flowFile = session->create();
-                  if (!flowFile)
-                    break;
-                  flowFile->addAttribute("command", _command);
-                  flowFile->addAttribute("command.arguments", _commandArgument);
-                  session->writeBuffer(flowFile, buffer_span);
-                } else {
-                  session->appendBuffer(flowFile, buffer_span);
-                }
-                session->transfer(flowFile, Success);
-              }
-              break;
-            } else {
-              if (numRead == static_cast<ssize_t>((sizeof(buffer) - totalRead))) {
-                // we reach the max buffer size
-                logger_->log_debug("Execute Command Max Respond %zu", sizeof(buffer));
-                if (!flowFile) {
-                  flowFile = session->create();
-                  if (!flowFile)
-                    continue;
-                  flowFile->addAttribute("command", _command);
-                  flowFile->addAttribute("command.arguments", _commandArgument);
-                  session->writeBuffer(flowFile, buffer);
-                } else {
-                  session->appendBuffer(flowFile, buffer);
-                }
-                // Rewind
-                totalRead = 0;
-                bufPtr = buffer;
-              } else {
-                totalRead += numRead;
-                bufPtr += numRead;
-              }
-            }
-          }
-        }
-
-        wait(&status);
-        if (WIFEXITED(status)) {
-          logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand, WEXITSTATUS(status), _pid);
-        } else {
-          logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand, WTERMSIG(status), _pid);
-        }
-
-        close(_pipefd[0]);
-        _processRunning = false;
-        break;
+    flow_file->addAttribute("command", command_);
+    flow_file->addAttribute("command.arguments", command_argument_);
+    session.writeBuffer(flow_file, buffer);
+  } else {
+    session.appendBuffer(flow_file, buffer);
+  }
+  return true;
+}
+
+void ExecuteProcess::readOutput(core::ProcessSession& session) {
+  char buffer[4096];
+  char *buf_ptr = buffer;
+  size_t total_read = 0;
+  std::shared_ptr<core::FlowFile> flow_file;
+  auto num_read = read(pipefd_[0], buf_ptr, (sizeof(buffer) - total_read));
+  while (num_read > 0) {
+    if (num_read == static_cast<ssize_t>((sizeof(buffer) - total_read))) {
+      // we reach the max buffer size
+      logger_->log_debug("Execute Command Max Respond %zu", sizeof(buffer));
+      if (!writeToFlowFile(session, flow_file, buffer)) {
+        continue;
+      }
+      // Rewind
+      total_read = 0;
+      buf_ptr = buffer;
+    } else {
+      total_read += num_read;
+      buf_ptr += num_read;
+    }
+    num_read = read(pipefd_[0], buf_ptr, (sizeof(buffer) - total_read));
+  }
+
+  if (total_read > 0) {
+    logger_->log_debug("Execute Command Respond %zu", total_read);
+    // child exits and close the pipe
+    const auto buffer_span = gsl::make_span(buffer, total_read);
+    if (!writeToFlowFile(session, flow_file, buffer_span)) {
+      return;
     }
+    session.transfer(flow_file, Success);
+  }
+}
+
+void ExecuteProcess::collectChildProcessOutput(core::ProcessSession& session) {
+  // the parent isn't going to write to the pipe
+  close(pipefd_[1]);
+  if (batch_duration_ > 0ms) {
+    readOutputInBatches(session);
+  } else {
+    readOutput(session);
+  }
+
+  int status = 0;
+  wait(&status);
+  if (WIFEXITED(status)) {
+    logger_->log_info("Execute Command Complete %s status %d pid %d", full_command_, WEXITSTATUS(status), pid_);
+  } else {
+    logger_->log_info("Execute Command Complete %s status %d pid %d", full_command_, WTERMSIG(status), pid_);
+  }
+
+  close(pipefd_[0]);
+  pid_ = 0;
+}
+
+void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+  gsl_Expects(context && session);
+  if (full_command_.length() == 0) {
+    yield();
+    return;
+  }
+  if (!changeWorkdir()) {
+    yield();
+    return;
+  }
+  logger_->log_info("Execute Command %s", full_command_);
+  std::vector<char*> argv;
+  auto args = readArgs();
+  argv.reserve(args.size() + 1);
+  for (auto& arg : args) {
+    argv.push_back(arg.data());
+  }
+  argv.push_back(nullptr);

Review Comment:
   I would move the creation of `argv` to inside `executeChildProcess()`



##########
extensions/standard-processors/processors/ExecuteProcess.h:
##########
@@ -89,38 +85,35 @@ class ExecuteProcess : public core::Processor {
 
   EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
   EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
-  EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_ALLOWED;
-  EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
+  EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_FORBIDDEN;
+  EXTENSIONAPI static constexpr bool IsSingleThreaded = true;
   ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
 
- public:
-  // OnTrigger method, implemented by NiFi ExecuteProcess
   void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override;
-  // Initialize, over write by NiFi ExecuteProcess
+  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *session_factory) override;
   void initialize() override;
 
  private:
-  // Logger
+  bool changeWorkdir() const;
+  std::vector<std::string> readArgs() const;
+  void executeProcessForkFailed();
+  void executeChildProcess(const std::vector<char*>& argv);
+  void collectChildProcessOutput(core::ProcessSession& session);
+  void readOutputInBatches(core::ProcessSession& session);
+  void readOutput(core::ProcessSession& session);
+  bool writeToFlowFile(core::ProcessSession& session, std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const char> buffer) const;
+
   std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ExecuteProcess>::getLogger();
-  // Property
-  std::string _command;
-  std::string _commandArgument;
-  std::string _workingDir;
-  std::chrono::milliseconds _batchDuration  = std::chrono::milliseconds(0);
-  bool _redirectErrorStream;
-  // Full command
-  std::string _fullCommand;
-  // whether the process is running
-  bool _processRunning;
-  int _pipefd[2];
-  pid_t _pid;
+  std::string command_;
+  std::string command_argument_;
+  std::string working_dir_;
+  std::chrono::milliseconds batch_duration_  = std::chrono::milliseconds(0);
+  bool redirect_error_stream_;
+  std::string full_command_;
+  bool process_running_;
+  int pipefd_[2];

Review Comment:
   `pipefd_` is not initialized



##########
extensions/standard-processors/tests/unit/ExecuteProcessTests.cpp:
##########
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <string>
+
+#include "Catch.h"
+#include "processors/ExecuteProcess.h"
+#include "SingleProcessorTestController.h"
+#include "utils/file/FileUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::test {
+#ifndef WIN32
+
+class ExecuteProcessTestsFixture {
+ public:
+  ExecuteProcessTestsFixture()
+      : execute_process_(std::make_shared<processors::ExecuteProcess>("ExecuteProcess")),
+        controller_(execute_process_) {
+    LogTestController::getInstance().setTrace<processors::ExecuteProcess>();
+  }
+ protected:
+  std::shared_ptr<processors::ExecuteProcess> execute_process_;
+  test::SingleProcessorTestController controller_;
+};
+
+TEST_CASE_METHOD(ExecuteProcessTestsFixture, "ExecuteProcess can run a single command", "[ExecuteProcess]") {
+  REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::Command, "echo -n test"));
+
+  controller_.plan->scheduleProcessor(execute_process_);
+  auto result = controller_.trigger("data");

Review Comment:
   we don't have to create an input flow file:
   ```suggestion
     auto result = controller_.trigger();
   ```



##########
extensions/standard-processors/processors/ExecuteProcess.cpp:
##########
@@ -74,172 +68,226 @@ void ExecuteProcess::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+void ExecuteProcess::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*session_factory*/) {
+  gsl_Expects(context);
   std::string value;
-  std::shared_ptr<core::FlowFile> flow_file;
-  if (context->getProperty(Command, value, flow_file)) {
-    this->_command = value;
+  if (context->getProperty(Command.getName(), value)) {
+    command_ = value;
   }
-  if (context->getProperty(CommandArguments, value, flow_file)) {
-    this->_commandArgument = value;
+  if (context->getProperty(CommandArguments.getName(), value)) {
+    command_argument_ = value;
   }
-  if (context->getProperty(WorkingDir, value, flow_file)) {
-    this->_workingDir = value;
+  if (context->getProperty(WorkingDir.getName(), value)) {
+    working_dir_ = value;
   }
   if (auto batch_duration = context->getProperty<core::TimePeriodValue>(BatchDuration)) {
-    _batchDuration = batch_duration->getMilliseconds();
-    logger_->log_debug("Setting _batchDuration");
+    batch_duration_ = batch_duration->getMilliseconds();
+    logger_->log_debug("Setting batch duration to %d milliseconds", batch_duration_.count());
   }
   if (context->getProperty(RedirectErrorStream.getName(), value)) {
-    _redirectErrorStream =  org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
+    redirect_error_stream_ = org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
   }
-  this->_fullCommand = _command + " " + _commandArgument;
-  if (_fullCommand.length() == 0) {
-    yield();
+  full_command_ = command_ + " " + command_argument_;
+}
+
+bool ExecuteProcess::changeWorkdir() const {
+  if (working_dir_.length() > 0 && working_dir_ != ".") {
+    if (chdir(working_dir_.c_str()) != 0) {
+      logger_->log_error("Execute Command can not chdir %s", working_dir_);
+      return false;
+    }
+  }
+  return true;
+}
+
+std::vector<std::string> ExecuteProcess::readArgs() const {
+  std::vector<std::string> args;
+  std::string current_param;
+  bool in_escaped = false;
+  auto currentParamShouldBeAppended = [&](std::size_t i) {
+    bool current_char_is_escaped_apostrophe = full_command_[i] == '\"' && in_escaped && i > 0 && full_command_[i - 1] == '\\';
+    bool whitespace_in_escaped_block = full_command_[i] == ' ' && in_escaped;
+    bool non_special_character = full_command_[i] != '\\' && full_command_[i] != '\"' && full_command_[i] != ' ';
+    return current_char_is_escaped_apostrophe || whitespace_in_escaped_block || non_special_character;
+  };
+
+  for (std::size_t i = 0; i < full_command_.size(); ++i) {
+    if (currentParamShouldBeAppended(i)) {
+      current_param += full_command_[i];
+    } else if (full_command_[i] == '\"' && (!in_escaped || i == 0 || full_command_[i - 1] != '\\')) {
+      in_escaped = !in_escaped;
+    } else if (full_command_[i] == ' ' && !in_escaped) {
+      if (!current_param.empty()) {
+        args.push_back(current_param);
+      }
+      current_param.clear();
+    }
+  }
+  if (!current_param.empty()) {
+    args.push_back(current_param);
+  }
+  return args;
+}
+
+void ExecuteProcess::executeProcessForkFailed() {
+  logger_->log_error("Execute Process fork failed");
+  close(pipefd_[0]);
+  close(pipefd_[1]);
+  yield();
+}
+
+void ExecuteProcess::executeChildProcess(const std::vector<char*>& argv) {
+  const int STDOUT = 1;
+  const int STDERR = 2;

Review Comment:
   nitpicking: these could be `static constexpr int`



##########
extensions/standard-processors/processors/ExecuteProcess.cpp:
##########
@@ -74,172 +68,226 @@ void ExecuteProcess::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+void ExecuteProcess::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*session_factory*/) {
+  gsl_Expects(context);
   std::string value;
-  std::shared_ptr<core::FlowFile> flow_file;
-  if (context->getProperty(Command, value, flow_file)) {
-    this->_command = value;
+  if (context->getProperty(Command.getName(), value)) {
+    command_ = value;
   }
-  if (context->getProperty(CommandArguments, value, flow_file)) {
-    this->_commandArgument = value;
+  if (context->getProperty(CommandArguments.getName(), value)) {
+    command_argument_ = value;
   }
-  if (context->getProperty(WorkingDir, value, flow_file)) {
-    this->_workingDir = value;
+  if (context->getProperty(WorkingDir.getName(), value)) {
+    working_dir_ = value;
   }
   if (auto batch_duration = context->getProperty<core::TimePeriodValue>(BatchDuration)) {
-    _batchDuration = batch_duration->getMilliseconds();
-    logger_->log_debug("Setting _batchDuration");
+    batch_duration_ = batch_duration->getMilliseconds();
+    logger_->log_debug("Setting batch duration to %d milliseconds", batch_duration_.count());
   }
   if (context->getProperty(RedirectErrorStream.getName(), value)) {
-    _redirectErrorStream =  org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
+    redirect_error_stream_ = org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
   }
-  this->_fullCommand = _command + " " + _commandArgument;
-  if (_fullCommand.length() == 0) {
-    yield();
+  full_command_ = command_ + " " + command_argument_;
+}
+
+bool ExecuteProcess::changeWorkdir() const {
+  if (working_dir_.length() > 0 && working_dir_ != ".") {
+    if (chdir(working_dir_.c_str()) != 0) {
+      logger_->log_error("Execute Command can not chdir %s", working_dir_);
+      return false;
+    }
+  }
+  return true;
+}
+
+std::vector<std::string> ExecuteProcess::readArgs() const {
+  std::vector<std::string> args;
+  std::string current_param;
+  bool in_escaped = false;
+  auto currentParamShouldBeAppended = [&](std::size_t i) {
+    bool current_char_is_escaped_apostrophe = full_command_[i] == '\"' && in_escaped && i > 0 && full_command_[i - 1] == '\\';
+    bool whitespace_in_escaped_block = full_command_[i] == ' ' && in_escaped;
+    bool non_special_character = full_command_[i] != '\\' && full_command_[i] != '\"' && full_command_[i] != ' ';
+    return current_char_is_escaped_apostrophe || whitespace_in_escaped_block || non_special_character;
+  };
+
+  for (std::size_t i = 0; i < full_command_.size(); ++i) {
+    if (currentParamShouldBeAppended(i)) {
+      current_param += full_command_[i];
+    } else if (full_command_[i] == '\"' && (!in_escaped || i == 0 || full_command_[i - 1] != '\\')) {
+      in_escaped = !in_escaped;
+    } else if (full_command_[i] == ' ' && !in_escaped) {
+      if (!current_param.empty()) {
+        args.push_back(current_param);
+      }
+      current_param.clear();
+    }
+  }
+  if (!current_param.empty()) {
+    args.push_back(current_param);
+  }
+  return args;
+}
+
+void ExecuteProcess::executeProcessForkFailed() {
+  logger_->log_error("Execute Process fork failed");
+  close(pipefd_[0]);
+  close(pipefd_[1]);
+  yield();
+}
+
+void ExecuteProcess::executeChildProcess(const std::vector<char*>& argv) {
+  const int STDOUT = 1;
+  const int STDERR = 2;
+  close(STDOUT);
+  const auto guard = gsl::finally([]() {
+    exit(1);

Review Comment:
   I know it was like this before, but do you know why the child process exits with 1 instead of 0?



##########
extensions/standard-processors/processors/ExecuteProcess.cpp:
##########
@@ -74,172 +68,226 @@ void ExecuteProcess::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+void ExecuteProcess::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*session_factory*/) {
+  gsl_Expects(context);
   std::string value;
-  std::shared_ptr<core::FlowFile> flow_file;
-  if (context->getProperty(Command, value, flow_file)) {
-    this->_command = value;
+  if (context->getProperty(Command.getName(), value)) {
+    command_ = value;
   }
-  if (context->getProperty(CommandArguments, value, flow_file)) {
-    this->_commandArgument = value;
+  if (context->getProperty(CommandArguments.getName(), value)) {
+    command_argument_ = value;
   }
-  if (context->getProperty(WorkingDir, value, flow_file)) {
-    this->_workingDir = value;
+  if (context->getProperty(WorkingDir.getName(), value)) {
+    working_dir_ = value;
   }
   if (auto batch_duration = context->getProperty<core::TimePeriodValue>(BatchDuration)) {
-    _batchDuration = batch_duration->getMilliseconds();
-    logger_->log_debug("Setting _batchDuration");
+    batch_duration_ = batch_duration->getMilliseconds();
+    logger_->log_debug("Setting batch duration to %d milliseconds", batch_duration_.count());
   }
   if (context->getProperty(RedirectErrorStream.getName(), value)) {
-    _redirectErrorStream =  org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
+    redirect_error_stream_ = org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
   }
-  this->_fullCommand = _command + " " + _commandArgument;
-  if (_fullCommand.length() == 0) {
-    yield();
+  full_command_ = command_ + " " + command_argument_;
+}
+
+bool ExecuteProcess::changeWorkdir() const {
+  if (working_dir_.length() > 0 && working_dir_ != ".") {
+    if (chdir(working_dir_.c_str()) != 0) {
+      logger_->log_error("Execute Command can not chdir %s", working_dir_);
+      return false;
+    }
+  }
+  return true;
+}
+
+std::vector<std::string> ExecuteProcess::readArgs() const {
+  std::vector<std::string> args;
+  std::string current_param;
+  bool in_escaped = false;
+  auto currentParamShouldBeAppended = [&](std::size_t i) {
+    bool current_char_is_escaped_apostrophe = full_command_[i] == '\"' && in_escaped && i > 0 && full_command_[i - 1] == '\\';
+    bool whitespace_in_escaped_block = full_command_[i] == ' ' && in_escaped;
+    bool non_special_character = full_command_[i] != '\\' && full_command_[i] != '\"' && full_command_[i] != ' ';
+    return current_char_is_escaped_apostrophe || whitespace_in_escaped_block || non_special_character;
+  };
+
+  for (std::size_t i = 0; i < full_command_.size(); ++i) {
+    if (currentParamShouldBeAppended(i)) {
+      current_param += full_command_[i];
+    } else if (full_command_[i] == '\"' && (!in_escaped || i == 0 || full_command_[i - 1] != '\\')) {
+      in_escaped = !in_escaped;
+    } else if (full_command_[i] == ' ' && !in_escaped) {
+      if (!current_param.empty()) {
+        args.push_back(current_param);
+      }
+      current_param.clear();
+    }
+  }
+  if (!current_param.empty()) {
+    args.push_back(current_param);
+  }
+  return args;
+}
+
+void ExecuteProcess::executeProcessForkFailed() {
+  logger_->log_error("Execute Process fork failed");
+  close(pipefd_[0]);
+  close(pipefd_[1]);
+  yield();
+}
+
+void ExecuteProcess::executeChildProcess(const std::vector<char*>& argv) {
+  const int STDOUT = 1;
+  const int STDERR = 2;
+  close(STDOUT);
+  const auto guard = gsl::finally([]() {
+    exit(1);
+  });
+  if (dup(pipefd_[1]) < 0) {  // points pipefd at file descriptor
+    logger_->log_error("Failed to point pipe at file descriptor");
     return;
   }
-  if (_workingDir.length() > 0 && _workingDir != ".") {
-    // change to working directory
-    if (chdir(_workingDir.c_str()) != 0) {
-      logger_->log_error("Execute Command can not chdir %s", _workingDir);
-      yield();
-      return;
+  if (redirect_error_stream_ && dup2(pipefd_[1], STDERR) < 0) {
+    logger_->log_error("Failed to redirect error stream of the executed process to the output stream");
+    return;
+  }
+  close(pipefd_[0]);
+  if (execvp(argv[0], argv.data()) < 0) {
+    logger_->log_error("Failed to execute child process");
+  }
+}
+
+void ExecuteProcess::readOutputInBatches(core::ProcessSession& session) {
+  while (true) {
+    std::this_thread::sleep_for(batch_duration_);
+    char buffer[4096];
+    const auto num_read = read(pipefd_[0], buffer, sizeof(buffer));
+    if (num_read <= 0) {
+      break;
+    }
+    logger_->log_debug("Execute Command Respond %zd", num_read);
+    auto flow_file = session.create();
+    if (!flow_file) {
+      continue;
     }
+    flow_file->addAttribute("command", command_);
+    flow_file->addAttribute("command.arguments", command_argument_);
+    session.writeBuffer(flow_file, gsl::make_span(buffer, gsl::narrow<size_t>(num_read)));
+    session.transfer(flow_file, Success);
+    session.commit();
   }
-  logger_->log_info("Execute Command %s", _fullCommand);
-  // split the command into array
-  char *p = std::strtok(const_cast<char*>(_fullCommand.c_str()), " ");
-  int argc = 0;
-  char *argv[64];
-  while (p != 0 && argc < 64) {
-    argv[argc] = p;
-    p = std::strtok(NULL, " ");
-    argc++;
-  }
-  argv[argc] = NULL;
-  int status;
-  if (!_processRunning) {
-    _processRunning = true;
-    // if the process has not launched yet
-    // create the pipe
-    if (pipe(_pipefd) == -1) {
-      _processRunning = false;
-      yield();
-      return;
+}
+
+bool ExecuteProcess::writeToFlowFile(core::ProcessSession& session, std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const char> buffer) const {
+  if (!flow_file) {
+    flow_file = session.create();
+    if (!flow_file) {
+      return false;
     }
-    switch (_pid = fork()) {
-      case -1:
-        logger_->log_error("Execute Process fork failed");
-        _processRunning = false;
-        close(_pipefd[0]);
-        close(_pipefd[1]);
-        yield();
-        break;
-      case 0:  // this is the code the child runs
-        close(1);      // close stdout
-        dup(_pipefd[1]);  // points pipefd at file descriptor
-        if (_redirectErrorStream)
-          // redirect stderr
-          dup2(_pipefd[1], 2);
-        close(_pipefd[0]);
-        execvp(argv[0], argv);
-        exit(1);
-        break;
-      default:  // this is the code the parent runs
-        // the parent isn't going to write to the pipe
-        close(_pipefd[1]);
-        if (_batchDuration > 0ms) {
-          while (true) {
-            std::this_thread::sleep_for(_batchDuration);
-            char buffer[4096];
-            const auto  numRead = read(_pipefd[0], buffer, sizeof(buffer));
-            if (numRead <= 0)
-              break;
-            logger_->log_debug("Execute Command Respond %zd", numRead);
-            auto flowFile = session->create();
-            if (!flowFile)
-              continue;
-            flowFile->addAttribute("command", _command);
-            flowFile->addAttribute("command.arguments", _commandArgument);
-            session->writeBuffer(flowFile, gsl::make_span(buffer, gsl::narrow<size_t>(numRead)));
-            session->transfer(flowFile, Success);
-            session->commit();
-          }
-        } else {
-          char buffer[4096];
-          char *bufPtr = buffer;
-          size_t totalRead = 0;
-          std::shared_ptr<core::FlowFile> flowFile = nullptr;
-          while (true) {
-            const auto numRead = read(_pipefd[0], bufPtr, (sizeof(buffer) - totalRead));
-            if (numRead <= 0) {
-              if (totalRead > 0) {
-                logger_->log_debug("Execute Command Respond %zu", totalRead);
-                // child exits and close the pipe
-                const auto buffer_span = gsl::make_span(buffer, totalRead);
-                if (!flowFile) {
-                  flowFile = session->create();
-                  if (!flowFile)
-                    break;
-                  flowFile->addAttribute("command", _command);
-                  flowFile->addAttribute("command.arguments", _commandArgument);
-                  session->writeBuffer(flowFile, buffer_span);
-                } else {
-                  session->appendBuffer(flowFile, buffer_span);
-                }
-                session->transfer(flowFile, Success);
-              }
-              break;
-            } else {
-              if (numRead == static_cast<ssize_t>((sizeof(buffer) - totalRead))) {
-                // we reach the max buffer size
-                logger_->log_debug("Execute Command Max Respond %zu", sizeof(buffer));
-                if (!flowFile) {
-                  flowFile = session->create();
-                  if (!flowFile)
-                    continue;
-                  flowFile->addAttribute("command", _command);
-                  flowFile->addAttribute("command.arguments", _commandArgument);
-                  session->writeBuffer(flowFile, buffer);
-                } else {
-                  session->appendBuffer(flowFile, buffer);
-                }
-                // Rewind
-                totalRead = 0;
-                bufPtr = buffer;
-              } else {
-                totalRead += numRead;
-                bufPtr += numRead;
-              }
-            }
-          }
-        }
-
-        wait(&status);
-        if (WIFEXITED(status)) {
-          logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand, WEXITSTATUS(status), _pid);
-        } else {
-          logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand, WTERMSIG(status), _pid);
-        }
-
-        close(_pipefd[0]);
-        _processRunning = false;
-        break;
+    flow_file->addAttribute("command", command_);
+    flow_file->addAttribute("command.arguments", command_argument_);
+    session.writeBuffer(flow_file, buffer);
+  } else {
+    session.appendBuffer(flow_file, buffer);
+  }
+  return true;
+}
+
+void ExecuteProcess::readOutput(core::ProcessSession& session) {
+  char buffer[4096];
+  char *buf_ptr = buffer;
+  size_t total_read = 0;
+  std::shared_ptr<core::FlowFile> flow_file;
+  auto num_read = read(pipefd_[0], buf_ptr, (sizeof(buffer) - total_read));
+  while (num_read > 0) {
+    if (num_read == static_cast<ssize_t>((sizeof(buffer) - total_read))) {
+      // we reach the max buffer size
+      logger_->log_debug("Execute Command Max Respond %zu", sizeof(buffer));
+      if (!writeToFlowFile(session, flow_file, buffer)) {
+        continue;
+      }
+      // Rewind
+      total_read = 0;
+      buf_ptr = buffer;
+    } else {
+      total_read += num_read;
+      buf_ptr += num_read;
+    }
+    num_read = read(pipefd_[0], buf_ptr, (sizeof(buffer) - total_read));
+  }
+
+  if (total_read > 0) {
+    logger_->log_debug("Execute Command Respond %zu", total_read);
+    // child exits and close the pipe
+    const auto buffer_span = gsl::make_span(buffer, total_read);
+    if (!writeToFlowFile(session, flow_file, buffer_span)) {
+      return;
     }
+    session.transfer(flow_file, Success);
+  }
+}
+
+void ExecuteProcess::collectChildProcessOutput(core::ProcessSession& session) {
+  // the parent isn't going to write to the pipe
+  close(pipefd_[1]);
+  if (batch_duration_ > 0ms) {
+    readOutputInBatches(session);
+  } else {
+    readOutput(session);
+  }
+
+  int status = 0;
+  wait(&status);
+  if (WIFEXITED(status)) {
+    logger_->log_info("Execute Command Complete %s status %d pid %d", full_command_, WEXITSTATUS(status), pid_);
+  } else {
+    logger_->log_info("Execute Command Complete %s status %d pid %d", full_command_, WTERMSIG(status), pid_);
+  }
+
+  close(pipefd_[0]);
+  pid_ = 0;
+}
+
+void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+  gsl_Expects(context && session);
+  if (full_command_.length() == 0) {
+    yield();
+    return;
+  }
+  if (!changeWorkdir()) {
+    yield();
+    return;
+  }
+  logger_->log_info("Execute Command %s", full_command_);
+  std::vector<char*> argv;
+  auto args = readArgs();
+  argv.reserve(args.size() + 1);
+  for (auto& arg : args) {
+    argv.push_back(arg.data());
+  }
+  argv.push_back(nullptr);
+  if (process_running_) {

Review Comment:
   do we still need `process_running_`?  we don't seem to set it to `true` anywhere



##########
extensions/standard-processors/processors/ExecuteProcess.cpp:
##########
@@ -74,172 +68,226 @@ void ExecuteProcess::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+void ExecuteProcess::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*session_factory*/) {
+  gsl_Expects(context);
   std::string value;
-  std::shared_ptr<core::FlowFile> flow_file;
-  if (context->getProperty(Command, value, flow_file)) {
-    this->_command = value;
+  if (context->getProperty(Command.getName(), value)) {
+    command_ = value;
   }
-  if (context->getProperty(CommandArguments, value, flow_file)) {
-    this->_commandArgument = value;
+  if (context->getProperty(CommandArguments.getName(), value)) {
+    command_argument_ = value;
   }
-  if (context->getProperty(WorkingDir, value, flow_file)) {
-    this->_workingDir = value;
+  if (context->getProperty(WorkingDir.getName(), value)) {
+    working_dir_ = value;
   }
   if (auto batch_duration = context->getProperty<core::TimePeriodValue>(BatchDuration)) {
-    _batchDuration = batch_duration->getMilliseconds();
-    logger_->log_debug("Setting _batchDuration");
+    batch_duration_ = batch_duration->getMilliseconds();
+    logger_->log_debug("Setting batch duration to %d milliseconds", batch_duration_.count());
   }
   if (context->getProperty(RedirectErrorStream.getName(), value)) {
-    _redirectErrorStream =  org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
+    redirect_error_stream_ = org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
   }
-  this->_fullCommand = _command + " " + _commandArgument;
-  if (_fullCommand.length() == 0) {
-    yield();
+  full_command_ = command_ + " " + command_argument_;
+}
+
+bool ExecuteProcess::changeWorkdir() const {
+  if (working_dir_.length() > 0 && working_dir_ != ".") {
+    if (chdir(working_dir_.c_str()) != 0) {
+      logger_->log_error("Execute Command can not chdir %s", working_dir_);
+      return false;
+    }
+  }
+  return true;
+}
+
+std::vector<std::string> ExecuteProcess::readArgs() const {
+  std::vector<std::string> args;
+  std::string current_param;
+  bool in_escaped = false;
+  auto currentParamShouldBeAppended = [&](std::size_t i) {
+    bool current_char_is_escaped_apostrophe = full_command_[i] == '\"' && in_escaped && i > 0 && full_command_[i - 1] == '\\';
+    bool whitespace_in_escaped_block = full_command_[i] == ' ' && in_escaped;
+    bool non_special_character = full_command_[i] != '\\' && full_command_[i] != '\"' && full_command_[i] != ' ';
+    return current_char_is_escaped_apostrophe || whitespace_in_escaped_block || non_special_character;
+  };
+
+  for (std::size_t i = 0; i < full_command_.size(); ++i) {
+    if (currentParamShouldBeAppended(i)) {
+      current_param += full_command_[i];
+    } else if (full_command_[i] == '\"' && (!in_escaped || i == 0 || full_command_[i - 1] != '\\')) {
+      in_escaped = !in_escaped;
+    } else if (full_command_[i] == ' ' && !in_escaped) {
+      if (!current_param.empty()) {
+        args.push_back(current_param);
+      }
+      current_param.clear();
+    }
+  }
+  if (!current_param.empty()) {
+    args.push_back(current_param);
+  }
+  return args;
+}
+
+void ExecuteProcess::executeProcessForkFailed() {
+  logger_->log_error("Execute Process fork failed");
+  close(pipefd_[0]);
+  close(pipefd_[1]);
+  yield();
+}
+
+void ExecuteProcess::executeChildProcess(const std::vector<char*>& argv) {
+  const int STDOUT = 1;
+  const int STDERR = 2;
+  close(STDOUT);
+  const auto guard = gsl::finally([]() {
+    exit(1);
+  });
+  if (dup(pipefd_[1]) < 0) {  // points pipefd at file descriptor
+    logger_->log_error("Failed to point pipe at file descriptor");
     return;
   }
-  if (_workingDir.length() > 0 && _workingDir != ".") {
-    // change to working directory
-    if (chdir(_workingDir.c_str()) != 0) {
-      logger_->log_error("Execute Command can not chdir %s", _workingDir);
-      yield();
-      return;
+  if (redirect_error_stream_ && dup2(pipefd_[1], STDERR) < 0) {
+    logger_->log_error("Failed to redirect error stream of the executed process to the output stream");
+    return;
+  }
+  close(pipefd_[0]);
+  if (execvp(argv[0], argv.data()) < 0) {
+    logger_->log_error("Failed to execute child process");
+  }
+}
+
+void ExecuteProcess::readOutputInBatches(core::ProcessSession& session) {
+  while (true) {
+    std::this_thread::sleep_for(batch_duration_);
+    char buffer[4096];
+    const auto num_read = read(pipefd_[0], buffer, sizeof(buffer));
+    if (num_read <= 0) {
+      break;
+    }
+    logger_->log_debug("Execute Command Respond %zd", num_read);
+    auto flow_file = session.create();
+    if (!flow_file) {
+      continue;
     }

Review Comment:
   again this is old code, but this is very bad: if we can't create a flow file, we drop the data and silently continue?
   
   not sure what we should do here, but at least logging something would be an improvement (both here and in `readOutput()` or `writeToFlowFile()`)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1414: MINIFICPP-1927 Fix ExecuteProcess command argument issue and refactor

Posted by GitBox <gi...@apache.org>.
lordgamez commented on code in PR #1414:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1414#discussion_r1015514169


##########
extensions/standard-processors/processors/ExecuteProcess.cpp:
##########
@@ -74,172 +70,202 @@ void ExecuteProcess::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+void ExecuteProcess::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*session_factory*/) {
+  gsl_Expects(context);
   std::string value;
-  std::shared_ptr<core::FlowFile> flow_file;
-  if (context->getProperty(Command, value, flow_file)) {
-    this->_command = value;
+  if (context->getProperty(Command.getName(), value)) {
+    command_ = value;
   }
-  if (context->getProperty(CommandArguments, value, flow_file)) {
-    this->_commandArgument = value;
+  if (context->getProperty(CommandArguments.getName(), value)) {
+    command_argument_ = value;
   }
-  if (context->getProperty(WorkingDir, value, flow_file)) {
-    this->_workingDir = value;
+  if (context->getProperty(WorkingDir.getName(), value)) {
+    working_dir_ = value;

Review Comment:
   The input requirement was changed to INPUT_FORBIDDEN on this processor, as per NiFi's implementation. Because of this, expression language support was removed from those properties, so there is no point in having them in the onTrigger call.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] szaszm commented on a diff in pull request #1414: MINIFICPP-1927 Fix ExecuteProcess command argument issue and refactor

Posted by GitBox <gi...@apache.org>.
szaszm commented on code in PR #1414:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1414#discussion_r1014233587


##########
extensions/standard-processors/processors/ExecuteProcess.cpp:
##########
@@ -74,172 +70,202 @@ void ExecuteProcess::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+void ExecuteProcess::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*session_factory*/) {
+  gsl_Expects(context);
   std::string value;
-  std::shared_ptr<core::FlowFile> flow_file;
-  if (context->getProperty(Command, value, flow_file)) {
-    this->_command = value;
+  if (context->getProperty(Command.getName(), value)) {
+    command_ = value;
   }
-  if (context->getProperty(CommandArguments, value, flow_file)) {
-    this->_commandArgument = value;
+  if (context->getProperty(CommandArguments.getName(), value)) {
+    command_argument_ = value;
   }
-  if (context->getProperty(WorkingDir, value, flow_file)) {
-    this->_workingDir = value;
+  if (context->getProperty(WorkingDir.getName(), value)) {
+    working_dir_ = value;

Review Comment:
   Is there a reason for moving these to onSchedule? Using this overload of getProperty means no expression language support for these properties.



##########
extensions/standard-processors/processors/ExecuteProcess.cpp:
##########
@@ -74,172 +70,202 @@ void ExecuteProcess::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+void ExecuteProcess::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*session_factory*/) {
+  gsl_Expects(context);
   std::string value;
-  std::shared_ptr<core::FlowFile> flow_file;
-  if (context->getProperty(Command, value, flow_file)) {
-    this->_command = value;
+  if (context->getProperty(Command.getName(), value)) {
+    command_ = value;
   }
-  if (context->getProperty(CommandArguments, value, flow_file)) {
-    this->_commandArgument = value;
+  if (context->getProperty(CommandArguments.getName(), value)) {
+    command_argument_ = value;
   }
-  if (context->getProperty(WorkingDir, value, flow_file)) {
-    this->_workingDir = value;
+  if (context->getProperty(WorkingDir.getName(), value)) {
+    working_dir_ = value;
   }
   if (auto batch_duration = context->getProperty<core::TimePeriodValue>(BatchDuration)) {
-    _batchDuration = batch_duration->getMilliseconds();
-    logger_->log_debug("Setting _batchDuration");
+    batch_duration_ = batch_duration->getMilliseconds();
+    logger_->log_debug("Setting batch duration to %d milliseconds", batch_duration_.count());
   }
   if (context->getProperty(RedirectErrorStream.getName(), value)) {
-    _redirectErrorStream =  org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
+    redirect_error_stream_ = org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
   }
-  this->_fullCommand = _command + " " + _commandArgument;
-  if (_fullCommand.length() == 0) {
-    yield();
-    return;
+  full_command_ = command_ + " " + command_argument_;
+}
+
+std::vector<std::string> ExecuteProcess::readArgs() const {
+  std::vector<std::string> args;
+  std::stringstream input_stream{full_command_};
+  while (input_stream) {
+    std::string word;
+    input_stream >> std::quoted(word);
+    if (!word.empty()) {
+      args.push_back(word);
+    }
   }
-  if (_workingDir.length() > 0 && _workingDir != ".") {
-    // change to working directory
-    if (chdir(_workingDir.c_str()) != 0) {
-      logger_->log_error("Execute Command can not chdir %s", _workingDir);
-      yield();
-      return;
+
+  return args;
+}
+
+void ExecuteProcess::executeProcessForkFailed() {
+  logger_->log_error("Execute Process fork failed");
+  close(pipefd_[0]);
+  close(pipefd_[1]);
+  yield();
+}
+
+void ExecuteProcess::executeChildProcess() {
+  std::vector<char*> argv;
+  auto args = readArgs();
+  argv.reserve(args.size() + 1);
+  for (auto& arg : args) {
+    argv.push_back(arg.data());
+  }
+  argv.push_back(nullptr);
+
+  static constexpr int STDOUT = 1;
+  static constexpr int STDERR = 2;
+  close(STDOUT);
+  if (dup(pipefd_[1]) < 0) {  // points pipefd at file descriptor

Review Comment:
   You can do the same in a single call using `dup2`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1414: MINIFICPP-1927 Fix ExecuteProcess command argument issue and refactor

Posted by GitBox <gi...@apache.org>.
lordgamez commented on code in PR #1414:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1414#discussion_r1016279852


##########
extensions/standard-processors/processors/ExecuteProcess.cpp:
##########
@@ -74,172 +70,202 @@ void ExecuteProcess::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+void ExecuteProcess::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*session_factory*/) {
+  gsl_Expects(context);
   std::string value;
-  std::shared_ptr<core::FlowFile> flow_file;
-  if (context->getProperty(Command, value, flow_file)) {
-    this->_command = value;
+  if (context->getProperty(Command.getName(), value)) {
+    command_ = value;
   }
-  if (context->getProperty(CommandArguments, value, flow_file)) {
-    this->_commandArgument = value;
+  if (context->getProperty(CommandArguments.getName(), value)) {
+    command_argument_ = value;
   }
-  if (context->getProperty(WorkingDir, value, flow_file)) {
-    this->_workingDir = value;
+  if (context->getProperty(WorkingDir.getName(), value)) {
+    working_dir_ = value;
   }
   if (auto batch_duration = context->getProperty<core::TimePeriodValue>(BatchDuration)) {
-    _batchDuration = batch_duration->getMilliseconds();
-    logger_->log_debug("Setting _batchDuration");
+    batch_duration_ = batch_duration->getMilliseconds();
+    logger_->log_debug("Setting batch duration to %d milliseconds", batch_duration_.count());
   }
   if (context->getProperty(RedirectErrorStream.getName(), value)) {
-    _redirectErrorStream =  org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
+    redirect_error_stream_ = org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
   }
-  this->_fullCommand = _command + " " + _commandArgument;
-  if (_fullCommand.length() == 0) {
-    yield();
-    return;
+  full_command_ = command_ + " " + command_argument_;
+}
+
+std::vector<std::string> ExecuteProcess::readArgs() const {
+  std::vector<std::string> args;
+  std::stringstream input_stream{full_command_};
+  while (input_stream) {
+    std::string word;
+    input_stream >> std::quoted(word);
+    if (!word.empty()) {
+      args.push_back(word);
+    }
   }
-  if (_workingDir.length() > 0 && _workingDir != ".") {
-    // change to working directory
-    if (chdir(_workingDir.c_str()) != 0) {
-      logger_->log_error("Execute Command can not chdir %s", _workingDir);
-      yield();
-      return;
+
+  return args;
+}
+
+void ExecuteProcess::executeProcessForkFailed() {
+  logger_->log_error("Execute Process fork failed");
+  close(pipefd_[0]);
+  close(pipefd_[1]);
+  yield();
+}
+
+void ExecuteProcess::executeChildProcess() {
+  std::vector<char*> argv;
+  auto args = readArgs();
+  argv.reserve(args.size() + 1);
+  for (auto& arg : args) {
+    argv.push_back(arg.data());
+  }
+  argv.push_back(nullptr);
+
+  static constexpr int STDOUT = 1;
+  static constexpr int STDERR = 2;
+  close(STDOUT);
+  if (dup(pipefd_[1]) < 0) {  // points pipefd at file descriptor

Review Comment:
   Thanks for the clarification; the official `dup2` description was not quite clear to me on this. Updated in 4ef7c6d01164c907defe70995b632e085911e83f



##########
extensions/standard-processors/processors/ExecuteProcess.cpp:
##########
@@ -74,172 +70,202 @@ void ExecuteProcess::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+void ExecuteProcess::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*session_factory*/) {
+  gsl_Expects(context);
   std::string value;
-  std::shared_ptr<core::FlowFile> flow_file;
-  if (context->getProperty(Command, value, flow_file)) {
-    this->_command = value;
+  if (context->getProperty(Command.getName(), value)) {
+    command_ = value;
   }
-  if (context->getProperty(CommandArguments, value, flow_file)) {
-    this->_commandArgument = value;
+  if (context->getProperty(CommandArguments.getName(), value)) {
+    command_argument_ = value;
   }
-  if (context->getProperty(WorkingDir, value, flow_file)) {
-    this->_workingDir = value;
+  if (context->getProperty(WorkingDir.getName(), value)) {
+    working_dir_ = value;
   }
   if (auto batch_duration = context->getProperty<core::TimePeriodValue>(BatchDuration)) {
-    _batchDuration = batch_duration->getMilliseconds();
-    logger_->log_debug("Setting _batchDuration");
+    batch_duration_ = batch_duration->getMilliseconds();
+    logger_->log_debug("Setting batch duration to %d milliseconds", batch_duration_.count());
   }
   if (context->getProperty(RedirectErrorStream.getName(), value)) {
-    _redirectErrorStream =  org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
+    redirect_error_stream_ = org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
   }
-  this->_fullCommand = _command + " " + _commandArgument;
-  if (_fullCommand.length() == 0) {
-    yield();
-    return;
+  full_command_ = command_ + " " + command_argument_;
+}
+
+std::vector<std::string> ExecuteProcess::readArgs() const {
+  std::vector<std::string> args;
+  std::stringstream input_stream{full_command_};
+  while (input_stream) {
+    std::string word;
+    input_stream >> std::quoted(word);
+    if (!word.empty()) {
+      args.push_back(word);
+    }
   }
-  if (_workingDir.length() > 0 && _workingDir != ".") {
-    // change to working directory
-    if (chdir(_workingDir.c_str()) != 0) {
-      logger_->log_error("Execute Command can not chdir %s", _workingDir);
-      yield();
-      return;
+
+  return args;
+}
+
+void ExecuteProcess::executeProcessForkFailed() {
+  logger_->log_error("Execute Process fork failed");
+  close(pipefd_[0]);
+  close(pipefd_[1]);
+  yield();
+}
+
+void ExecuteProcess::executeChildProcess() {
+  std::vector<char*> argv;
+  auto args = readArgs();
+  argv.reserve(args.size() + 1);
+  for (auto& arg : args) {
+    argv.push_back(arg.data());
+  }
+  argv.push_back(nullptr);
+
+  static constexpr int STDOUT = 1;
+  static constexpr int STDERR = 2;
+  close(STDOUT);
+  if (dup(pipefd_[1]) < 0) {  // points pipefd at file descriptor
+    logger_->log_error("Failed to point pipe at file descriptor");
+    exit(1);
+  }
+  if (redirect_error_stream_ && dup2(pipefd_[1], STDERR) < 0) {
+    logger_->log_error("Failed to redirect error stream of the executed process to the output stream");
+    exit(1);
+  }
+  close(pipefd_[0]);
+  if (execvp(argv[0], argv.data()) < 0) {
+    logger_->log_error("Failed to execute child process");
+    exit(1);
+  }
+  exit(0);
+}
+
+void ExecuteProcess::readOutputInBatches(core::ProcessSession& session) {
+  while (true) {
+    std::this_thread::sleep_for(batch_duration_);
+    char buffer[4096];
+    const auto num_read = read(pipefd_[0], buffer, sizeof(buffer));
+    if (num_read <= 0) {
+      break;
+    }
+    logger_->log_debug("Execute Command Respond %zd", num_read);
+    auto flow_file = session.create();
+    if (!flow_file) {
+      logger_->log_error("Flow file could not be created!");
+      continue;
     }
+    flow_file->addAttribute("command", command_);
+    flow_file->addAttribute("command.arguments", command_argument_);
+    session.writeBuffer(flow_file, gsl::make_span(buffer, gsl::narrow<size_t>(num_read)));
+    session.transfer(flow_file, Success);
+    session.commit();
   }
-  logger_->log_info("Execute Command %s", _fullCommand);
-  // split the command into array
-  char *p = std::strtok(const_cast<char*>(_fullCommand.c_str()), " ");
-  int argc = 0;
-  char *argv[64];
-  while (p != 0 && argc < 64) {
-    argv[argc] = p;
-    p = std::strtok(NULL, " ");
-    argc++;
-  }
-  argv[argc] = NULL;
-  int status;
-  if (!_processRunning) {
-    _processRunning = true;
-    // if the process has not launched yet
-    // create the pipe
-    if (pipe(_pipefd) == -1) {
-      _processRunning = false;
-      yield();
-      return;
+}
+
+bool ExecuteProcess::writeToFlowFile(core::ProcessSession& session, std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const char> buffer) const {
+  if (!flow_file) {
+    flow_file = session.create();
+    if (!flow_file) {
+      logger_->log_error("Flow file could not be created!");
+      return false;
     }
-    switch (_pid = fork()) {
-      case -1:
-        logger_->log_error("Execute Process fork failed");
-        _processRunning = false;
-        close(_pipefd[0]);
-        close(_pipefd[1]);
-        yield();
-        break;
-      case 0:  // this is the code the child runs
-        close(1);      // close stdout
-        dup(_pipefd[1]);  // points pipefd at file descriptor
-        if (_redirectErrorStream)
-          // redirect stderr
-          dup2(_pipefd[1], 2);
-        close(_pipefd[0]);
-        execvp(argv[0], argv);
-        exit(1);
-        break;
-      default:  // this is the code the parent runs
-        // the parent isn't going to write to the pipe
-        close(_pipefd[1]);
-        if (_batchDuration > 0ms) {
-          while (true) {
-            std::this_thread::sleep_for(_batchDuration);
-            char buffer[4096];
-            const auto  numRead = read(_pipefd[0], buffer, sizeof(buffer));
-            if (numRead <= 0)
-              break;
-            logger_->log_debug("Execute Command Respond %zd", numRead);
-            auto flowFile = session->create();
-            if (!flowFile)
-              continue;
-            flowFile->addAttribute("command", _command);
-            flowFile->addAttribute("command.arguments", _commandArgument);
-            session->writeBuffer(flowFile, gsl::make_span(buffer, gsl::narrow<size_t>(numRead)));
-            session->transfer(flowFile, Success);
-            session->commit();
-          }
-        } else {
-          char buffer[4096];
-          char *bufPtr = buffer;
-          size_t totalRead = 0;
-          std::shared_ptr<core::FlowFile> flowFile = nullptr;
-          while (true) {
-            const auto numRead = read(_pipefd[0], bufPtr, (sizeof(buffer) - totalRead));
-            if (numRead <= 0) {
-              if (totalRead > 0) {
-                logger_->log_debug("Execute Command Respond %zu", totalRead);
-                // child exits and close the pipe
-                const auto buffer_span = gsl::make_span(buffer, totalRead);
-                if (!flowFile) {
-                  flowFile = session->create();
-                  if (!flowFile)
-                    break;
-                  flowFile->addAttribute("command", _command);
-                  flowFile->addAttribute("command.arguments", _commandArgument);
-                  session->writeBuffer(flowFile, buffer_span);
-                } else {
-                  session->appendBuffer(flowFile, buffer_span);
-                }
-                session->transfer(flowFile, Success);
-              }
-              break;
-            } else {
-              if (numRead == static_cast<ssize_t>((sizeof(buffer) - totalRead))) {
-                // we reach the max buffer size
-                logger_->log_debug("Execute Command Max Respond %zu", sizeof(buffer));
-                if (!flowFile) {
-                  flowFile = session->create();
-                  if (!flowFile)
-                    continue;
-                  flowFile->addAttribute("command", _command);
-                  flowFile->addAttribute("command.arguments", _commandArgument);
-                  session->writeBuffer(flowFile, buffer);
-                } else {
-                  session->appendBuffer(flowFile, buffer);
-                }
-                // Rewind
-                totalRead = 0;
-                bufPtr = buffer;
-              } else {
-                totalRead += numRead;
-                bufPtr += numRead;
-              }
-            }
-          }
-        }
-
-        wait(&status);
-        if (WIFEXITED(status)) {
-          logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand, WEXITSTATUS(status), _pid);
-        } else {
-          logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand, WTERMSIG(status), _pid);
-        }
-
-        close(_pipefd[0]);
-        _processRunning = false;
-        break;
+    flow_file->addAttribute("command", command_);
+    flow_file->addAttribute("command.arguments", command_argument_);
+    session.writeBuffer(flow_file, buffer);
+  } else {
+    session.appendBuffer(flow_file, buffer);
+  }
+  return true;
+}
+
+void ExecuteProcess::readOutput(core::ProcessSession& session) {
+  char buffer[4096];
+  char *buf_ptr = buffer;
+  size_t read_to_buffer = 0;
+  std::shared_ptr<core::FlowFile> flow_file;
+  auto num_read = read(pipefd_[0], buf_ptr, (sizeof(buffer) - read_to_buffer));

Review Comment:
   Good catch, updated in 4ef7c6d01164c907defe70995b632e085911e83f



##########
extensions/standard-processors/processors/ExecuteProcess.cpp:
##########
@@ -74,172 +70,202 @@ void ExecuteProcess::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+void ExecuteProcess::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*session_factory*/) {
+  gsl_Expects(context);
   std::string value;
-  std::shared_ptr<core::FlowFile> flow_file;
-  if (context->getProperty(Command, value, flow_file)) {
-    this->_command = value;
+  if (context->getProperty(Command.getName(), value)) {
+    command_ = value;
   }
-  if (context->getProperty(CommandArguments, value, flow_file)) {
-    this->_commandArgument = value;
+  if (context->getProperty(CommandArguments.getName(), value)) {
+    command_argument_ = value;
   }
-  if (context->getProperty(WorkingDir, value, flow_file)) {
-    this->_workingDir = value;
+  if (context->getProperty(WorkingDir.getName(), value)) {
+    working_dir_ = value;
   }
   if (auto batch_duration = context->getProperty<core::TimePeriodValue>(BatchDuration)) {
-    _batchDuration = batch_duration->getMilliseconds();
-    logger_->log_debug("Setting _batchDuration");
+    batch_duration_ = batch_duration->getMilliseconds();
+    logger_->log_debug("Setting batch duration to %d milliseconds", batch_duration_.count());
   }
   if (context->getProperty(RedirectErrorStream.getName(), value)) {
-    _redirectErrorStream =  org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
+    redirect_error_stream_ = org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
   }
-  this->_fullCommand = _command + " " + _commandArgument;
-  if (_fullCommand.length() == 0) {
-    yield();
-    return;
+  full_command_ = command_ + " " + command_argument_;
+}
+
+std::vector<std::string> ExecuteProcess::readArgs() const {
+  std::vector<std::string> args;
+  std::stringstream input_stream{full_command_};
+  while (input_stream) {
+    std::string word;
+    input_stream >> std::quoted(word);
+    if (!word.empty()) {
+      args.push_back(word);
+    }
   }
-  if (_workingDir.length() > 0 && _workingDir != ".") {
-    // change to working directory
-    if (chdir(_workingDir.c_str()) != 0) {
-      logger_->log_error("Execute Command can not chdir %s", _workingDir);
-      yield();
-      return;
+
+  return args;
+}
+
+void ExecuteProcess::executeProcessForkFailed() {
+  logger_->log_error("Execute Process fork failed");
+  close(pipefd_[0]);
+  close(pipefd_[1]);
+  yield();
+}
+
+void ExecuteProcess::executeChildProcess() {
+  std::vector<char*> argv;
+  auto args = readArgs();
+  argv.reserve(args.size() + 1);
+  for (auto& arg : args) {
+    argv.push_back(arg.data());
+  }
+  argv.push_back(nullptr);
+
+  static constexpr int STDOUT = 1;
+  static constexpr int STDERR = 2;
+  close(STDOUT);
+  if (dup(pipefd_[1]) < 0) {  // points pipefd at file descriptor
+    logger_->log_error("Failed to point pipe at file descriptor");
+    exit(1);
+  }
+  if (redirect_error_stream_ && dup2(pipefd_[1], STDERR) < 0) {
+    logger_->log_error("Failed to redirect error stream of the executed process to the output stream");
+    exit(1);
+  }
+  close(pipefd_[0]);
+  if (execvp(argv[0], argv.data()) < 0) {
+    logger_->log_error("Failed to execute child process");
+    exit(1);
+  }
+  exit(0);
+}
+
+void ExecuteProcess::readOutputInBatches(core::ProcessSession& session) {
+  while (true) {
+    std::this_thread::sleep_for(batch_duration_);
+    char buffer[4096];
+    const auto num_read = read(pipefd_[0], buffer, sizeof(buffer));
+    if (num_read <= 0) {
+      break;
+    }
+    logger_->log_debug("Execute Command Respond %zd", num_read);
+    auto flow_file = session.create();
+    if (!flow_file) {
+      logger_->log_error("Flow file could not be created!");
+      continue;
     }
+    flow_file->addAttribute("command", command_);
+    flow_file->addAttribute("command.arguments", command_argument_);
+    session.writeBuffer(flow_file, gsl::make_span(buffer, gsl::narrow<size_t>(num_read)));
+    session.transfer(flow_file, Success);
+    session.commit();
   }
-  logger_->log_info("Execute Command %s", _fullCommand);
-  // split the command into array
-  char *p = std::strtok(const_cast<char*>(_fullCommand.c_str()), " ");
-  int argc = 0;
-  char *argv[64];
-  while (p != 0 && argc < 64) {
-    argv[argc] = p;
-    p = std::strtok(NULL, " ");
-    argc++;
-  }
-  argv[argc] = NULL;
-  int status;
-  if (!_processRunning) {
-    _processRunning = true;
-    // if the process has not launched yet
-    // create the pipe
-    if (pipe(_pipefd) == -1) {
-      _processRunning = false;
-      yield();
-      return;
+}
+
+bool ExecuteProcess::writeToFlowFile(core::ProcessSession& session, std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const char> buffer) const {
+  if (!flow_file) {
+    flow_file = session.create();
+    if (!flow_file) {
+      logger_->log_error("Flow file could not be created!");
+      return false;
     }
-    switch (_pid = fork()) {
-      case -1:
-        logger_->log_error("Execute Process fork failed");
-        _processRunning = false;
-        close(_pipefd[0]);
-        close(_pipefd[1]);
-        yield();
-        break;
-      case 0:  // this is the code the child runs
-        close(1);      // close stdout
-        dup(_pipefd[1]);  // points pipefd at file descriptor
-        if (_redirectErrorStream)
-          // redirect stderr
-          dup2(_pipefd[1], 2);
-        close(_pipefd[0]);
-        execvp(argv[0], argv);
-        exit(1);
-        break;
-      default:  // this is the code the parent runs
-        // the parent isn't going to write to the pipe
-        close(_pipefd[1]);
-        if (_batchDuration > 0ms) {
-          while (true) {
-            std::this_thread::sleep_for(_batchDuration);
-            char buffer[4096];
-            const auto  numRead = read(_pipefd[0], buffer, sizeof(buffer));
-            if (numRead <= 0)
-              break;
-            logger_->log_debug("Execute Command Respond %zd", numRead);
-            auto flowFile = session->create();
-            if (!flowFile)
-              continue;
-            flowFile->addAttribute("command", _command);
-            flowFile->addAttribute("command.arguments", _commandArgument);
-            session->writeBuffer(flowFile, gsl::make_span(buffer, gsl::narrow<size_t>(numRead)));
-            session->transfer(flowFile, Success);
-            session->commit();
-          }
-        } else {
-          char buffer[4096];
-          char *bufPtr = buffer;
-          size_t totalRead = 0;
-          std::shared_ptr<core::FlowFile> flowFile = nullptr;
-          while (true) {
-            const auto numRead = read(_pipefd[0], bufPtr, (sizeof(buffer) - totalRead));
-            if (numRead <= 0) {
-              if (totalRead > 0) {
-                logger_->log_debug("Execute Command Respond %zu", totalRead);
-                // child exits and close the pipe
-                const auto buffer_span = gsl::make_span(buffer, totalRead);
-                if (!flowFile) {
-                  flowFile = session->create();
-                  if (!flowFile)
-                    break;
-                  flowFile->addAttribute("command", _command);
-                  flowFile->addAttribute("command.arguments", _commandArgument);
-                  session->writeBuffer(flowFile, buffer_span);
-                } else {
-                  session->appendBuffer(flowFile, buffer_span);
-                }
-                session->transfer(flowFile, Success);
-              }
-              break;
-            } else {
-              if (numRead == static_cast<ssize_t>((sizeof(buffer) - totalRead))) {
-                // we reach the max buffer size
-                logger_->log_debug("Execute Command Max Respond %zu", sizeof(buffer));
-                if (!flowFile) {
-                  flowFile = session->create();
-                  if (!flowFile)
-                    continue;
-                  flowFile->addAttribute("command", _command);
-                  flowFile->addAttribute("command.arguments", _commandArgument);
-                  session->writeBuffer(flowFile, buffer);
-                } else {
-                  session->appendBuffer(flowFile, buffer);
-                }
-                // Rewind
-                totalRead = 0;
-                bufPtr = buffer;
-              } else {
-                totalRead += numRead;
-                bufPtr += numRead;
-              }
-            }
-          }
-        }
-
-        wait(&status);
-        if (WIFEXITED(status)) {
-          logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand, WEXITSTATUS(status), _pid);
-        } else {
-          logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand, WTERMSIG(status), _pid);
-        }
-
-        close(_pipefd[0]);
-        _processRunning = false;
-        break;
+    flow_file->addAttribute("command", command_);
+    flow_file->addAttribute("command.arguments", command_argument_);
+    session.writeBuffer(flow_file, buffer);
+  } else {
+    session.appendBuffer(flow_file, buffer);
+  }
+  return true;
+}
+
+void ExecuteProcess::readOutput(core::ProcessSession& session) {
+  char buffer[4096];
+  char *buf_ptr = buffer;
+  size_t read_to_buffer = 0;
+  std::shared_ptr<core::FlowFile> flow_file;
+  auto num_read = read(pipefd_[0], buf_ptr, (sizeof(buffer) - read_to_buffer));
+  while (num_read > 0) {
+    if (num_read == static_cast<ssize_t>((sizeof(buffer) - read_to_buffer))) {

Review Comment:
   Good point, updated in 4ef7c6d01164c907defe70995b632e085911e83f



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1414: MINIFICPP-1927 Fix ExecuteProcess command argument issue and refactor

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1414:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1414#discussion_r1007788280


##########
extensions/standard-processors/tests/resource_apps/EchoParameters.cpp:
##########
@@ -0,0 +1,34 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <iostream>
+#include <chrono>
+#include <thread>
+
+int main(int argc, char** argv) {
+  if (argc < 3) {
+    std::cerr << "Usage: ./EchoParameters <delay milliseconds> <text to write>" << std::endl;
+    return 1;
+  }
+
+  std::cout << argv[2] << std::endl;

Review Comment:
   Does the delay only apply to the texts after the first one? E.g. `./EchoParameters 500 "hello" "banana"` would not actually delay `"hello"`, only `"banana"`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1414: MINIFICPP-1927 Fix ExecuteProcess command argument issue and refactor

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1414:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1414#discussion_r1007817964


##########
PROCESSORS.md:
##########
@@ -550,18 +550,18 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ### Description
 
-Runs an operating system command specified by the user and writes the output of that command to a FlowFile. If the command is expected to be long-running,the Processor can output the partial data on a specified interval. When this option is used, the output is expected to be in textual format,as it typically does not make sense to split binary data on arbitrary time-based intervals.
+Runs an operating system command specified by the user and writes the output of that command to a FlowFile. If the command is expected to be long-running,the Processor can output the partial data on a specified interval. When this option is used, the output is expected to be in textual format,as it typically does not make sense to split binary data on arbitrary time-based intervals. This processor is not available on Windows systems.

Review Comment:
   Should this new sentence also be added to `ExecuteProcess::Description`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1414: MINIFICPP-1927 Fix ExecuteProcess command argument issue and refactor

Posted by GitBox <gi...@apache.org>.
lordgamez commented on code in PR #1414:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1414#discussion_r1011972454


##########
extensions/standard-processors/processors/ExecuteProcess.cpp:
##########
@@ -74,172 +68,226 @@ void ExecuteProcess::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+void ExecuteProcess::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*session_factory*/) {
+  gsl_Expects(context);
   std::string value;
-  std::shared_ptr<core::FlowFile> flow_file;
-  if (context->getProperty(Command, value, flow_file)) {
-    this->_command = value;
+  if (context->getProperty(Command.getName(), value)) {
+    command_ = value;
   }
-  if (context->getProperty(CommandArguments, value, flow_file)) {
-    this->_commandArgument = value;
+  if (context->getProperty(CommandArguments.getName(), value)) {
+    command_argument_ = value;
   }
-  if (context->getProperty(WorkingDir, value, flow_file)) {
-    this->_workingDir = value;
+  if (context->getProperty(WorkingDir.getName(), value)) {
+    working_dir_ = value;
   }
   if (auto batch_duration = context->getProperty<core::TimePeriodValue>(BatchDuration)) {
-    _batchDuration = batch_duration->getMilliseconds();
-    logger_->log_debug("Setting _batchDuration");
+    batch_duration_ = batch_duration->getMilliseconds();
+    logger_->log_debug("Setting batch duration to %d milliseconds", batch_duration_.count());
   }
   if (context->getProperty(RedirectErrorStream.getName(), value)) {
-    _redirectErrorStream =  org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
+    redirect_error_stream_ = org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
   }
-  this->_fullCommand = _command + " " + _commandArgument;
-  if (_fullCommand.length() == 0) {
-    yield();
+  full_command_ = command_ + " " + command_argument_;
+}
+
+bool ExecuteProcess::changeWorkdir() const {
+  if (working_dir_.length() > 0 && working_dir_ != ".") {
+    if (chdir(working_dir_.c_str()) != 0) {
+      logger_->log_error("Execute Command can not chdir %s", working_dir_);
+      return false;
+    }
+  }
+  return true;
+}
+
+std::vector<std::string> ExecuteProcess::readArgs() const {
+  std::vector<std::string> args;
+  std::string current_param;
+  bool in_escaped = false;
+  auto currentParamShouldBeAppended = [&](std::size_t i) {
+    bool current_char_is_escaped_apostrophe = full_command_[i] == '\"' && in_escaped && i > 0 && full_command_[i - 1] == '\\';
+    bool whitespace_in_escaped_block = full_command_[i] == ' ' && in_escaped;
+    bool non_special_character = full_command_[i] != '\\' && full_command_[i] != '\"' && full_command_[i] != ' ';
+    return current_char_is_escaped_apostrophe || whitespace_in_escaped_block || non_special_character;
+  };
+
+  for (std::size_t i = 0; i < full_command_.size(); ++i) {
+    if (currentParamShouldBeAppended(i)) {
+      current_param += full_command_[i];
+    } else if (full_command_[i] == '\"' && (!in_escaped || i == 0 || full_command_[i - 1] != '\\')) {
+      in_escaped = !in_escaped;
+    } else if (full_command_[i] == ' ' && !in_escaped) {
+      if (!current_param.empty()) {
+        args.push_back(current_param);
+      }
+      current_param.clear();
+    }
+  }
+  if (!current_param.empty()) {
+    args.push_back(current_param);
+  }
+  return args;
+}
+
+void ExecuteProcess::executeProcessForkFailed() {
+  logger_->log_error("Execute Process fork failed");
+  close(pipefd_[0]);
+  close(pipefd_[1]);
+  yield();
+}
+
+void ExecuteProcess::executeChildProcess(const std::vector<char*>& argv) {
+  const int STDOUT = 1;
+  const int STDERR = 2;
+  close(STDOUT);
+  const auto guard = gsl::finally([]() {
+    exit(1);

Review Comment:
   Updated in b8a9a3297a4c6e56c02a1b5767f13596ddf7b853 so that it only returns 1 in error cases



##########
extensions/standard-processors/processors/ExecuteProcess.cpp:
##########
@@ -74,172 +68,226 @@ void ExecuteProcess::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+void ExecuteProcess::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*session_factory*/) {
+  gsl_Expects(context);
   std::string value;
-  std::shared_ptr<core::FlowFile> flow_file;
-  if (context->getProperty(Command, value, flow_file)) {
-    this->_command = value;
+  if (context->getProperty(Command.getName(), value)) {
+    command_ = value;
   }
-  if (context->getProperty(CommandArguments, value, flow_file)) {
-    this->_commandArgument = value;
+  if (context->getProperty(CommandArguments.getName(), value)) {
+    command_argument_ = value;
   }
-  if (context->getProperty(WorkingDir, value, flow_file)) {
-    this->_workingDir = value;
+  if (context->getProperty(WorkingDir.getName(), value)) {
+    working_dir_ = value;
   }
   if (auto batch_duration = context->getProperty<core::TimePeriodValue>(BatchDuration)) {
-    _batchDuration = batch_duration->getMilliseconds();
-    logger_->log_debug("Setting _batchDuration");
+    batch_duration_ = batch_duration->getMilliseconds();
+    logger_->log_debug("Setting batch duration to %d milliseconds", batch_duration_.count());
   }
   if (context->getProperty(RedirectErrorStream.getName(), value)) {
-    _redirectErrorStream =  org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
+    redirect_error_stream_ = org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
   }
-  this->_fullCommand = _command + " " + _commandArgument;
-  if (_fullCommand.length() == 0) {
-    yield();
+  full_command_ = command_ + " " + command_argument_;
+}
+
+bool ExecuteProcess::changeWorkdir() const {
+  if (working_dir_.length() > 0 && working_dir_ != ".") {
+    if (chdir(working_dir_.c_str()) != 0) {
+      logger_->log_error("Execute Command can not chdir %s", working_dir_);
+      return false;
+    }
+  }
+  return true;
+}
+
+std::vector<std::string> ExecuteProcess::readArgs() const {
+  std::vector<std::string> args;
+  std::string current_param;
+  bool in_escaped = false;
+  auto currentParamShouldBeAppended = [&](std::size_t i) {
+    bool current_char_is_escaped_apostrophe = full_command_[i] == '\"' && in_escaped && i > 0 && full_command_[i - 1] == '\\';
+    bool whitespace_in_escaped_block = full_command_[i] == ' ' && in_escaped;
+    bool non_special_character = full_command_[i] != '\\' && full_command_[i] != '\"' && full_command_[i] != ' ';
+    return current_char_is_escaped_apostrophe || whitespace_in_escaped_block || non_special_character;
+  };
+
+  for (std::size_t i = 0; i < full_command_.size(); ++i) {
+    if (currentParamShouldBeAppended(i)) {
+      current_param += full_command_[i];
+    } else if (full_command_[i] == '\"' && (!in_escaped || i == 0 || full_command_[i - 1] != '\\')) {
+      in_escaped = !in_escaped;
+    } else if (full_command_[i] == ' ' && !in_escaped) {
+      if (!current_param.empty()) {
+        args.push_back(current_param);
+      }
+      current_param.clear();
+    }
+  }
+  if (!current_param.empty()) {
+    args.push_back(current_param);
+  }
+  return args;
+}
+
+void ExecuteProcess::executeProcessForkFailed() {
+  logger_->log_error("Execute Process fork failed");
+  close(pipefd_[0]);
+  close(pipefd_[1]);
+  yield();
+}
+
+void ExecuteProcess::executeChildProcess(const std::vector<char*>& argv) {
+  const int STDOUT = 1;
+  const int STDERR = 2;
+  close(STDOUT);
+  const auto guard = gsl::finally([]() {
+    exit(1);
+  });
+  if (dup(pipefd_[1]) < 0) {  // points pipefd at file descriptor
+    logger_->log_error("Failed to point pipe at file descriptor");
     return;
   }
-  if (_workingDir.length() > 0 && _workingDir != ".") {
-    // change to working directory
-    if (chdir(_workingDir.c_str()) != 0) {
-      logger_->log_error("Execute Command can not chdir %s", _workingDir);
-      yield();
-      return;
+  if (redirect_error_stream_ && dup2(pipefd_[1], STDERR) < 0) {
+    logger_->log_error("Failed to redirect error stream of the executed process to the output stream");
+    return;
+  }
+  close(pipefd_[0]);
+  if (execvp(argv[0], argv.data()) < 0) {
+    logger_->log_error("Failed to execute child process");
+  }
+}
+
+void ExecuteProcess::readOutputInBatches(core::ProcessSession& session) {
+  while (true) {
+    std::this_thread::sleep_for(batch_duration_);
+    char buffer[4096];
+    const auto num_read = read(pipefd_[0], buffer, sizeof(buffer));
+    if (num_read <= 0) {
+      break;
+    }
+    logger_->log_debug("Execute Command Respond %zd", num_read);
+    auto flow_file = session.create();
+    if (!flow_file) {
+      continue;
     }

Review Comment:
   Updated in b8a9a3297a4c6e56c02a1b5767f13596ddf7b853



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1414: MINIFICPP-1927 Fix ExecuteProcess command argument issue and refactor

Posted by GitBox <gi...@apache.org>.
lordgamez commented on code in PR #1414:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1414#discussion_r1011973468


##########
extensions/standard-processors/processors/ExecuteProcess.cpp:
##########
@@ -74,172 +68,226 @@ void ExecuteProcess::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+void ExecuteProcess::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*session_factory*/) {
+  gsl_Expects(context);
   std::string value;
-  std::shared_ptr<core::FlowFile> flow_file;
-  if (context->getProperty(Command, value, flow_file)) {
-    this->_command = value;
+  if (context->getProperty(Command.getName(), value)) {
+    command_ = value;
   }
-  if (context->getProperty(CommandArguments, value, flow_file)) {
-    this->_commandArgument = value;
+  if (context->getProperty(CommandArguments.getName(), value)) {
+    command_argument_ = value;
   }
-  if (context->getProperty(WorkingDir, value, flow_file)) {
-    this->_workingDir = value;
+  if (context->getProperty(WorkingDir.getName(), value)) {
+    working_dir_ = value;
   }
   if (auto batch_duration = context->getProperty<core::TimePeriodValue>(BatchDuration)) {
-    _batchDuration = batch_duration->getMilliseconds();
-    logger_->log_debug("Setting _batchDuration");
+    batch_duration_ = batch_duration->getMilliseconds();
+    logger_->log_debug("Setting batch duration to %d milliseconds", batch_duration_.count());
   }
   if (context->getProperty(RedirectErrorStream.getName(), value)) {
-    _redirectErrorStream =  org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
+    redirect_error_stream_ = org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
   }
-  this->_fullCommand = _command + " " + _commandArgument;
-  if (_fullCommand.length() == 0) {
-    yield();
+  full_command_ = command_ + " " + command_argument_;
+}
+
+bool ExecuteProcess::changeWorkdir() const {
+  if (working_dir_.length() > 0 && working_dir_ != ".") {
+    if (chdir(working_dir_.c_str()) != 0) {
+      logger_->log_error("Execute Command can not chdir %s", working_dir_);
+      return false;
+    }
+  }
+  return true;
+}
+
+std::vector<std::string> ExecuteProcess::readArgs() const {
+  std::vector<std::string> args;
+  std::string current_param;
+  bool in_escaped = false;
+  auto currentParamShouldBeAppended = [&](std::size_t i) {
+    bool current_char_is_escaped_apostrophe = full_command_[i] == '\"' && in_escaped && i > 0 && full_command_[i - 1] == '\\';
+    bool whitespace_in_escaped_block = full_command_[i] == ' ' && in_escaped;
+    bool non_special_character = full_command_[i] != '\\' && full_command_[i] != '\"' && full_command_[i] != ' ';
+    return current_char_is_escaped_apostrophe || whitespace_in_escaped_block || non_special_character;
+  };
+
+  for (std::size_t i = 0; i < full_command_.size(); ++i) {
+    if (currentParamShouldBeAppended(i)) {
+      current_param += full_command_[i];
+    } else if (full_command_[i] == '\"' && (!in_escaped || i == 0 || full_command_[i - 1] != '\\')) {
+      in_escaped = !in_escaped;
+    } else if (full_command_[i] == ' ' && !in_escaped) {
+      if (!current_param.empty()) {
+        args.push_back(current_param);
+      }
+      current_param.clear();
+    }
+  }
+  if (!current_param.empty()) {
+    args.push_back(current_param);
+  }
+  return args;
+}
+
+void ExecuteProcess::executeProcessForkFailed() {
+  logger_->log_error("Execute Process fork failed");
+  close(pipefd_[0]);
+  close(pipefd_[1]);
+  yield();
+}
+
+void ExecuteProcess::executeChildProcess(const std::vector<char*>& argv) {
+  const int STDOUT = 1;
+  const int STDERR = 2;
+  close(STDOUT);
+  const auto guard = gsl::finally([]() {
+    exit(1);
+  });
+  if (dup(pipefd_[1]) < 0) {  // points pipefd at file descriptor
+    logger_->log_error("Failed to point pipe at file descriptor");
     return;
   }
-  if (_workingDir.length() > 0 && _workingDir != ".") {
-    // change to working directory
-    if (chdir(_workingDir.c_str()) != 0) {
-      logger_->log_error("Execute Command can not chdir %s", _workingDir);
-      yield();
-      return;
+  if (redirect_error_stream_ && dup2(pipefd_[1], STDERR) < 0) {
+    logger_->log_error("Failed to redirect error stream of the executed process to the output stream");
+    return;
+  }
+  close(pipefd_[0]);
+  if (execvp(argv[0], argv.data()) < 0) {
+    logger_->log_error("Failed to execute child process");
+  }
+}
+
+void ExecuteProcess::readOutputInBatches(core::ProcessSession& session) {
+  while (true) {
+    std::this_thread::sleep_for(batch_duration_);
+    char buffer[4096];
+    const auto num_read = read(pipefd_[0], buffer, sizeof(buffer));
+    if (num_read <= 0) {
+      break;
+    }
+    logger_->log_debug("Execute Command Respond %zd", num_read);
+    auto flow_file = session.create();
+    if (!flow_file) {
+      continue;
     }
+    flow_file->addAttribute("command", command_);
+    flow_file->addAttribute("command.arguments", command_argument_);
+    session.writeBuffer(flow_file, gsl::make_span(buffer, gsl::narrow<size_t>(num_read)));
+    session.transfer(flow_file, Success);
+    session.commit();
   }
-  logger_->log_info("Execute Command %s", _fullCommand);
-  // split the command into array
-  char *p = std::strtok(const_cast<char*>(_fullCommand.c_str()), " ");
-  int argc = 0;
-  char *argv[64];
-  while (p != 0 && argc < 64) {
-    argv[argc] = p;
-    p = std::strtok(NULL, " ");
-    argc++;
-  }
-  argv[argc] = NULL;
-  int status;
-  if (!_processRunning) {
-    _processRunning = true;
-    // if the process has not launched yet
-    // create the pipe
-    if (pipe(_pipefd) == -1) {
-      _processRunning = false;
-      yield();
-      return;
+}
+
+bool ExecuteProcess::writeToFlowFile(core::ProcessSession& session, std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const char> buffer) const {
+  if (!flow_file) {
+    flow_file = session.create();
+    if (!flow_file) {
+      return false;
     }
-    switch (_pid = fork()) {
-      case -1:
-        logger_->log_error("Execute Process fork failed");
-        _processRunning = false;
-        close(_pipefd[0]);
-        close(_pipefd[1]);
-        yield();
-        break;
-      case 0:  // this is the code the child runs
-        close(1);      // close stdout
-        dup(_pipefd[1]);  // points pipefd at file descriptor
-        if (_redirectErrorStream)
-          // redirect stderr
-          dup2(_pipefd[1], 2);
-        close(_pipefd[0]);
-        execvp(argv[0], argv);
-        exit(1);
-        break;
-      default:  // this is the code the parent runs
-        // the parent isn't going to write to the pipe
-        close(_pipefd[1]);
-        if (_batchDuration > 0ms) {
-          while (true) {
-            std::this_thread::sleep_for(_batchDuration);
-            char buffer[4096];
-            const auto  numRead = read(_pipefd[0], buffer, sizeof(buffer));
-            if (numRead <= 0)
-              break;
-            logger_->log_debug("Execute Command Respond %zd", numRead);
-            auto flowFile = session->create();
-            if (!flowFile)
-              continue;
-            flowFile->addAttribute("command", _command);
-            flowFile->addAttribute("command.arguments", _commandArgument);
-            session->writeBuffer(flowFile, gsl::make_span(buffer, gsl::narrow<size_t>(numRead)));
-            session->transfer(flowFile, Success);
-            session->commit();
-          }
-        } else {
-          char buffer[4096];
-          char *bufPtr = buffer;
-          size_t totalRead = 0;
-          std::shared_ptr<core::FlowFile> flowFile = nullptr;
-          while (true) {
-            const auto numRead = read(_pipefd[0], bufPtr, (sizeof(buffer) - totalRead));
-            if (numRead <= 0) {
-              if (totalRead > 0) {
-                logger_->log_debug("Execute Command Respond %zu", totalRead);
-                // child exits and close the pipe
-                const auto buffer_span = gsl::make_span(buffer, totalRead);
-                if (!flowFile) {
-                  flowFile = session->create();
-                  if (!flowFile)
-                    break;
-                  flowFile->addAttribute("command", _command);
-                  flowFile->addAttribute("command.arguments", _commandArgument);
-                  session->writeBuffer(flowFile, buffer_span);
-                } else {
-                  session->appendBuffer(flowFile, buffer_span);
-                }
-                session->transfer(flowFile, Success);
-              }
-              break;
-            } else {
-              if (numRead == static_cast<ssize_t>((sizeof(buffer) - totalRead))) {
-                // we reach the max buffer size
-                logger_->log_debug("Execute Command Max Respond %zu", sizeof(buffer));
-                if (!flowFile) {
-                  flowFile = session->create();
-                  if (!flowFile)
-                    continue;
-                  flowFile->addAttribute("command", _command);
-                  flowFile->addAttribute("command.arguments", _commandArgument);
-                  session->writeBuffer(flowFile, buffer);
-                } else {
-                  session->appendBuffer(flowFile, buffer);
-                }
-                // Rewind
-                totalRead = 0;
-                bufPtr = buffer;
-              } else {
-                totalRead += numRead;
-                bufPtr += numRead;
-              }
-            }
-          }
-        }
-
-        wait(&status);
-        if (WIFEXITED(status)) {
-          logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand, WEXITSTATUS(status), _pid);
-        } else {
-          logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand, WTERMSIG(status), _pid);
-        }
-
-        close(_pipefd[0]);
-        _processRunning = false;
-        break;
+    flow_file->addAttribute("command", command_);
+    flow_file->addAttribute("command.arguments", command_argument_);
+    session.writeBuffer(flow_file, buffer);
+  } else {
+    session.appendBuffer(flow_file, buffer);
+  }
+  return true;
+}
+
+void ExecuteProcess::readOutput(core::ProcessSession& session) {
+  char buffer[4096];
+  char *buf_ptr = buffer;
+  size_t total_read = 0;
+  std::shared_ptr<core::FlowFile> flow_file;
+  auto num_read = read(pipefd_[0], buf_ptr, (sizeof(buffer) - total_read));
+  while (num_read > 0) {
+    if (num_read == static_cast<ssize_t>((sizeof(buffer) - total_read))) {
+      // we reach the max buffer size
+      logger_->log_debug("Execute Command Max Respond %zu", sizeof(buffer));
+      if (!writeToFlowFile(session, flow_file, buffer)) {
+        continue;
+      }
+      // Rewind
+      total_read = 0;

Review Comment:
   Good catch! Updated in b8a9a3297a4c6e56c02a1b5767f13596ddf7b853



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] szaszm commented on a diff in pull request #1414: MINIFICPP-1927 Fix ExecuteProcess command argument issue and refactor

Posted by GitBox <gi...@apache.org>.
szaszm commented on code in PR #1414:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1414#discussion_r1015821223


##########
extensions/standard-processors/processors/ExecuteProcess.cpp:
##########
@@ -74,172 +70,202 @@ void ExecuteProcess::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+void ExecuteProcess::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*session_factory*/) {
+  gsl_Expects(context);
   std::string value;
-  std::shared_ptr<core::FlowFile> flow_file;
-  if (context->getProperty(Command, value, flow_file)) {
-    this->_command = value;
+  if (context->getProperty(Command.getName(), value)) {
+    command_ = value;
   }
-  if (context->getProperty(CommandArguments, value, flow_file)) {
-    this->_commandArgument = value;
+  if (context->getProperty(CommandArguments.getName(), value)) {
+    command_argument_ = value;
   }
-  if (context->getProperty(WorkingDir, value, flow_file)) {
-    this->_workingDir = value;
+  if (context->getProperty(WorkingDir.getName(), value)) {
+    working_dir_ = value;
   }
   if (auto batch_duration = context->getProperty<core::TimePeriodValue>(BatchDuration)) {
-    _batchDuration = batch_duration->getMilliseconds();
-    logger_->log_debug("Setting _batchDuration");
+    batch_duration_ = batch_duration->getMilliseconds();
+    logger_->log_debug("Setting batch duration to %d milliseconds", batch_duration_.count());
   }
   if (context->getProperty(RedirectErrorStream.getName(), value)) {
-    _redirectErrorStream =  org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
+    redirect_error_stream_ = org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
   }
-  this->_fullCommand = _command + " " + _commandArgument;
-  if (_fullCommand.length() == 0) {
-    yield();
-    return;
+  full_command_ = command_ + " " + command_argument_;
+}
+
+std::vector<std::string> ExecuteProcess::readArgs() const {
+  std::vector<std::string> args;
+  std::stringstream input_stream{full_command_};
+  while (input_stream) {
+    std::string word;
+    input_stream >> std::quoted(word);
+    if (!word.empty()) {
+      args.push_back(word);
+    }
   }
-  if (_workingDir.length() > 0 && _workingDir != ".") {
-    // change to working directory
-    if (chdir(_workingDir.c_str()) != 0) {
-      logger_->log_error("Execute Command can not chdir %s", _workingDir);
-      yield();
-      return;
+
+  return args;
+}
+
+void ExecuteProcess::executeProcessForkFailed() {
+  logger_->log_error("Execute Process fork failed");
+  close(pipefd_[0]);
+  close(pipefd_[1]);
+  yield();
+}
+
+void ExecuteProcess::executeChildProcess() {
+  std::vector<char*> argv;
+  auto args = readArgs();
+  argv.reserve(args.size() + 1);
+  for (auto& arg : args) {
+    argv.push_back(arg.data());
+  }
+  argv.push_back(nullptr);
+
+  static constexpr int STDOUT = 1;
+  static constexpr int STDERR = 2;
+  close(STDOUT);
+  if (dup(pipefd_[1]) < 0) {  // points pipefd at file descriptor

Review Comment:
   ```suggestion
     if (dup2(pipefd_[1], STDOUT) < 0) {  // points pipefd at file descriptor
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1414: MINIFICPP-1927 Fix ExecuteProcess command argument issue and refactor

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1414:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1414#discussion_r1007788280


##########
extensions/standard-processors/tests/resource_apps/EchoParameters.cpp:
##########
@@ -0,0 +1,34 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <iostream>
+#include <chrono>
+#include <thread>
+
+int main(int argc, char** argv) {
+  if (argc < 3) {
+    std::cerr << "Usage: ./EchoParameters <delay milliseconds> <text to write>" << std::endl;
+    return 1;
+  }
+
+  std::cout << argv[2] << std::endl;

Review Comment:
   Does the delay only apply to the `<text to write>` arguments after the first one? E.g. `./EchoParameters 500 "hello" "banana"` would not actually delay `"hello"`, only `"banana"`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1414: MINIFICPP-1927 Fix ExecuteProcess command argument issue and refactor

Posted by GitBox <gi...@apache.org>.
lordgamez commented on code in PR #1414:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1414#discussion_r1011972702


##########
extensions/standard-processors/processors/ExecuteProcess.h:
##########
@@ -89,38 +85,35 @@ class ExecuteProcess : public core::Processor {
 
   EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
   EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
-  EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_ALLOWED;
-  EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
+  EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_FORBIDDEN;
+  EXTENSIONAPI static constexpr bool IsSingleThreaded = true;
   ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
 
- public:
-  // OnTrigger method, implemented by NiFi ExecuteProcess
   void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override;
-  // Initialize, over write by NiFi ExecuteProcess
+  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *session_factory) override;
   void initialize() override;
 
  private:
-  // Logger
+  bool changeWorkdir() const;
+  std::vector<std::string> readArgs() const;
+  void executeProcessForkFailed();
+  void executeChildProcess(const std::vector<char*>& argv);
+  void collectChildProcessOutput(core::ProcessSession& session);
+  void readOutputInBatches(core::ProcessSession& session);
+  void readOutput(core::ProcessSession& session);
+  bool writeToFlowFile(core::ProcessSession& session, std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const char> buffer) const;
+
   std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ExecuteProcess>::getLogger();
-  // Property
-  std::string _command;
-  std::string _commandArgument;
-  std::string _workingDir;
-  std::chrono::milliseconds _batchDuration  = std::chrono::milliseconds(0);
-  bool _redirectErrorStream;
-  // Full command
-  std::string _fullCommand;
-  // whether the process is running
-  bool _processRunning;
-  int _pipefd[2];
-  pid_t _pid;
+  std::string command_;
+  std::string command_argument_;
+  std::string working_dir_;
+  std::chrono::milliseconds batch_duration_  = std::chrono::milliseconds(0);
+  bool redirect_error_stream_;
+  std::string full_command_;
+  bool process_running_;
+  int pipefd_[2];

Review Comment:
   Updated in b8a9a3297a4c6e56c02a1b5767f13596ddf7b853



##########
extensions/standard-processors/tests/unit/ExecuteProcessTests.cpp:
##########
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <string>
+
+#include "Catch.h"
+#include "processors/ExecuteProcess.h"
+#include "SingleProcessorTestController.h"
+#include "utils/file/FileUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::test {
+#ifndef WIN32
+
+class ExecuteProcessTestsFixture {
+ public:
+  ExecuteProcessTestsFixture()
+      : execute_process_(std::make_shared<processors::ExecuteProcess>("ExecuteProcess")),
+        controller_(execute_process_) {
+    LogTestController::getInstance().setTrace<processors::ExecuteProcess>();
+  }
+ protected:
+  std::shared_ptr<processors::ExecuteProcess> execute_process_;
+  test::SingleProcessorTestController controller_;
+};
+
+TEST_CASE_METHOD(ExecuteProcessTestsFixture, "ExecuteProcess can run a single command", "[ExecuteProcess]") {
+  REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::Command, "echo -n test"));
+
+  controller_.plan->scheduleProcessor(execute_process_);
+  auto result = controller_.trigger("data");

Review Comment:
   Updated in b8a9a3297a4c6e56c02a1b5767f13596ddf7b853



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1414: MINIFICPP-1927 Fix ExecuteProcess command argument issue and refactor

Posted by GitBox <gi...@apache.org>.
lordgamez commented on code in PR #1414:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1414#discussion_r1011971604


##########
extensions/standard-processors/processors/ExecuteProcess.cpp:
##########
@@ -74,172 +68,226 @@ void ExecuteProcess::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+void ExecuteProcess::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*session_factory*/) {
+  gsl_Expects(context);
   std::string value;
-  std::shared_ptr<core::FlowFile> flow_file;
-  if (context->getProperty(Command, value, flow_file)) {
-    this->_command = value;
+  if (context->getProperty(Command.getName(), value)) {
+    command_ = value;
   }
-  if (context->getProperty(CommandArguments, value, flow_file)) {
-    this->_commandArgument = value;
+  if (context->getProperty(CommandArguments.getName(), value)) {
+    command_argument_ = value;
   }
-  if (context->getProperty(WorkingDir, value, flow_file)) {
-    this->_workingDir = value;
+  if (context->getProperty(WorkingDir.getName(), value)) {
+    working_dir_ = value;
   }
   if (auto batch_duration = context->getProperty<core::TimePeriodValue>(BatchDuration)) {
-    _batchDuration = batch_duration->getMilliseconds();
-    logger_->log_debug("Setting _batchDuration");
+    batch_duration_ = batch_duration->getMilliseconds();
+    logger_->log_debug("Setting batch duration to %d milliseconds", batch_duration_.count());
   }
   if (context->getProperty(RedirectErrorStream.getName(), value)) {
-    _redirectErrorStream =  org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
+    redirect_error_stream_ = org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
   }
-  this->_fullCommand = _command + " " + _commandArgument;
-  if (_fullCommand.length() == 0) {
-    yield();
+  full_command_ = command_ + " " + command_argument_;
+}
+
+bool ExecuteProcess::changeWorkdir() const {
+  if (working_dir_.length() > 0 && working_dir_ != ".") {
+    if (chdir(working_dir_.c_str()) != 0) {
+      logger_->log_error("Execute Command can not chdir %s", working_dir_);
+      return false;
+    }
+  }
+  return true;
+}
+
+std::vector<std::string> ExecuteProcess::readArgs() const {
+  std::vector<std::string> args;
+  std::string current_param;
+  bool in_escaped = false;
+  auto currentParamShouldBeAppended = [&](std::size_t i) {
+    bool current_char_is_escaped_apostrophe = full_command_[i] == '\"' && in_escaped && i > 0 && full_command_[i - 1] == '\\';
+    bool whitespace_in_escaped_block = full_command_[i] == ' ' && in_escaped;
+    bool non_special_character = full_command_[i] != '\\' && full_command_[i] != '\"' && full_command_[i] != ' ';
+    return current_char_is_escaped_apostrophe || whitespace_in_escaped_block || non_special_character;
+  };
+
+  for (std::size_t i = 0; i < full_command_.size(); ++i) {
+    if (currentParamShouldBeAppended(i)) {
+      current_param += full_command_[i];
+    } else if (full_command_[i] == '\"' && (!in_escaped || i == 0 || full_command_[i - 1] != '\\')) {
+      in_escaped = !in_escaped;
+    } else if (full_command_[i] == ' ' && !in_escaped) {
+      if (!current_param.empty()) {
+        args.push_back(current_param);
+      }
+      current_param.clear();
+    }
+  }

Review Comment:
   Good idea, didn't know about this, updated in b8a9a3297a4c6e56c02a1b5767f13596ddf7b853



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1414: MINIFICPP-1927 Fix ExecuteProcess command argument issue and refactor

Posted by GitBox <gi...@apache.org>.
lordgamez commented on code in PR #1414:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1414#discussion_r1015514868


##########
extensions/standard-processors/processors/ExecuteProcess.cpp:
##########
@@ -74,172 +70,202 @@ void ExecuteProcess::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+void ExecuteProcess::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*session_factory*/) {
+  gsl_Expects(context);
   std::string value;
-  std::shared_ptr<core::FlowFile> flow_file;
-  if (context->getProperty(Command, value, flow_file)) {
-    this->_command = value;
+  if (context->getProperty(Command.getName(), value)) {
+    command_ = value;
   }
-  if (context->getProperty(CommandArguments, value, flow_file)) {
-    this->_commandArgument = value;
+  if (context->getProperty(CommandArguments.getName(), value)) {
+    command_argument_ = value;
   }
-  if (context->getProperty(WorkingDir, value, flow_file)) {
-    this->_workingDir = value;
+  if (context->getProperty(WorkingDir.getName(), value)) {
+    working_dir_ = value;
   }
   if (auto batch_duration = context->getProperty<core::TimePeriodValue>(BatchDuration)) {
-    _batchDuration = batch_duration->getMilliseconds();
-    logger_->log_debug("Setting _batchDuration");
+    batch_duration_ = batch_duration->getMilliseconds();
+    logger_->log_debug("Setting batch duration to %d milliseconds", batch_duration_.count());
   }
   if (context->getProperty(RedirectErrorStream.getName(), value)) {
-    _redirectErrorStream =  org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
+    redirect_error_stream_ = org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
   }
-  this->_fullCommand = _command + " " + _commandArgument;
-  if (_fullCommand.length() == 0) {
-    yield();
-    return;
+  full_command_ = command_ + " " + command_argument_;
+}
+
+std::vector<std::string> ExecuteProcess::readArgs() const {
+  std::vector<std::string> args;
+  std::stringstream input_stream{full_command_};
+  while (input_stream) {
+    std::string word;
+    input_stream >> std::quoted(word);
+    if (!word.empty()) {
+      args.push_back(word);
+    }
   }
-  if (_workingDir.length() > 0 && _workingDir != ".") {
-    // change to working directory
-    if (chdir(_workingDir.c_str()) != 0) {
-      logger_->log_error("Execute Command can not chdir %s", _workingDir);
-      yield();
-      return;
+
+  return args;
+}
+
+void ExecuteProcess::executeProcessForkFailed() {
+  logger_->log_error("Execute Process fork failed");
+  close(pipefd_[0]);
+  close(pipefd_[1]);
+  yield();
+}
+
+void ExecuteProcess::executeChildProcess() {
+  std::vector<char*> argv;
+  auto args = readArgs();
+  argv.reserve(args.size() + 1);
+  for (auto& arg : args) {
+    argv.push_back(arg.data());
+  }
+  argv.push_back(nullptr);
+
+  static constexpr int STDOUT = 1;
+  static constexpr int STDERR = 2;
+  close(STDOUT);
+  if (dup(pipefd_[1]) < 0) {  // points pipefd at file descriptor

Review Comment:
   What do you mean by this? Which calls could be merged with `dup2`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1414: MINIFICPP-1927 Fix ExecuteProcess command argument issue and refactor

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on code in PR #1414:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1414#discussion_r1007825845


##########
extensions/standard-processors/processors/ExecuteProcess.cpp:
##########
@@ -74,172 +68,226 @@ void ExecuteProcess::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+void ExecuteProcess::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*session_factory*/) {
+  gsl_Expects(context);
   std::string value;
-  std::shared_ptr<core::FlowFile> flow_file;
-  if (context->getProperty(Command, value, flow_file)) {
-    this->_command = value;
+  if (context->getProperty(Command.getName(), value)) {
+    command_ = value;
   }
-  if (context->getProperty(CommandArguments, value, flow_file)) {
-    this->_commandArgument = value;
+  if (context->getProperty(CommandArguments.getName(), value)) {
+    command_argument_ = value;
   }
-  if (context->getProperty(WorkingDir, value, flow_file)) {
-    this->_workingDir = value;
+  if (context->getProperty(WorkingDir.getName(), value)) {
+    working_dir_ = value;
   }
   if (auto batch_duration = context->getProperty<core::TimePeriodValue>(BatchDuration)) {
-    _batchDuration = batch_duration->getMilliseconds();
-    logger_->log_debug("Setting _batchDuration");
+    batch_duration_ = batch_duration->getMilliseconds();
+    logger_->log_debug("Setting batch duration to %d milliseconds", batch_duration_.count());
   }
   if (context->getProperty(RedirectErrorStream.getName(), value)) {
-    _redirectErrorStream =  org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
+    redirect_error_stream_ = org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
   }
-  this->_fullCommand = _command + " " + _commandArgument;
-  if (_fullCommand.length() == 0) {
-    yield();
+  full_command_ = command_ + " " + command_argument_;
+}
+
+bool ExecuteProcess::changeWorkdir() const {
+  if (working_dir_.length() > 0 && working_dir_ != ".") {
+    if (chdir(working_dir_.c_str()) != 0) {

Review Comment:
   `Environment.h` now contains a `setCurrentWorkingDirectory` utility; should we migrate to that one?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1414: MINIFICPP-1927 Fix ExecuteProcess command argument issue and refactor

Posted by GitBox <gi...@apache.org>.
lordgamez commented on code in PR #1414:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1414#discussion_r1015514169


##########
extensions/standard-processors/processors/ExecuteProcess.cpp:
##########
@@ -74,172 +70,202 @@ void ExecuteProcess::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+void ExecuteProcess::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*session_factory*/) {
+  gsl_Expects(context);
   std::string value;
-  std::shared_ptr<core::FlowFile> flow_file;
-  if (context->getProperty(Command, value, flow_file)) {
-    this->_command = value;
+  if (context->getProperty(Command.getName(), value)) {
+    command_ = value;
   }
-  if (context->getProperty(CommandArguments, value, flow_file)) {
-    this->_commandArgument = value;
+  if (context->getProperty(CommandArguments.getName(), value)) {
+    command_argument_ = value;
   }
-  if (context->getProperty(WorkingDir, value, flow_file)) {
-    this->_workingDir = value;
+  if (context->getProperty(WorkingDir.getName(), value)) {
+    working_dir_ = value;

Review Comment:
   The input requirement was changed to INPUT_FORBIDDEN on this processor as per NiFi's implementation. Due to this, expression language support was removed from those properties, so there is no point in having them in the onTrigger call.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1414: MINIFICPP-1927 Fix ExecuteProcess command argument issue and refactor

Posted by GitBox <gi...@apache.org>.
lordgamez commented on code in PR #1414:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1414#discussion_r1011971961


##########
extensions/standard-processors/processors/ExecuteProcess.cpp:
##########
@@ -74,172 +68,226 @@ void ExecuteProcess::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+void ExecuteProcess::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*session_factory*/) {
+  gsl_Expects(context);
   std::string value;
-  std::shared_ptr<core::FlowFile> flow_file;
-  if (context->getProperty(Command, value, flow_file)) {
-    this->_command = value;
+  if (context->getProperty(Command.getName(), value)) {
+    command_ = value;
   }
-  if (context->getProperty(CommandArguments, value, flow_file)) {
-    this->_commandArgument = value;
+  if (context->getProperty(CommandArguments.getName(), value)) {
+    command_argument_ = value;
   }
-  if (context->getProperty(WorkingDir, value, flow_file)) {
-    this->_workingDir = value;
+  if (context->getProperty(WorkingDir.getName(), value)) {
+    working_dir_ = value;
   }
   if (auto batch_duration = context->getProperty<core::TimePeriodValue>(BatchDuration)) {
-    _batchDuration = batch_duration->getMilliseconds();
-    logger_->log_debug("Setting _batchDuration");
+    batch_duration_ = batch_duration->getMilliseconds();
+    logger_->log_debug("Setting batch duration to %d milliseconds", batch_duration_.count());
   }
   if (context->getProperty(RedirectErrorStream.getName(), value)) {
-    _redirectErrorStream =  org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
+    redirect_error_stream_ = org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
   }
-  this->_fullCommand = _command + " " + _commandArgument;
-  if (_fullCommand.length() == 0) {
-    yield();
+  full_command_ = command_ + " " + command_argument_;
+}
+
+bool ExecuteProcess::changeWorkdir() const {
+  if (working_dir_.length() > 0 && working_dir_ != ".") {
+    if (chdir(working_dir_.c_str()) != 0) {
+      logger_->log_error("Execute Command can not chdir %s", working_dir_);
+      return false;
+    }
+  }
+  return true;
+}
+
+std::vector<std::string> ExecuteProcess::readArgs() const {
+  std::vector<std::string> args;
+  std::string current_param;
+  bool in_escaped = false;
+  auto currentParamShouldBeAppended = [&](std::size_t i) {
+    bool current_char_is_escaped_apostrophe = full_command_[i] == '\"' && in_escaped && i > 0 && full_command_[i - 1] == '\\';
+    bool whitespace_in_escaped_block = full_command_[i] == ' ' && in_escaped;
+    bool non_special_character = full_command_[i] != '\\' && full_command_[i] != '\"' && full_command_[i] != ' ';
+    return current_char_is_escaped_apostrophe || whitespace_in_escaped_block || non_special_character;
+  };
+
+  for (std::size_t i = 0; i < full_command_.size(); ++i) {
+    if (currentParamShouldBeAppended(i)) {
+      current_param += full_command_[i];
+    } else if (full_command_[i] == '\"' && (!in_escaped || i == 0 || full_command_[i - 1] != '\\')) {
+      in_escaped = !in_escaped;
+    } else if (full_command_[i] == ' ' && !in_escaped) {
+      if (!current_param.empty()) {
+        args.push_back(current_param);
+      }
+      current_param.clear();
+    }
+  }
+  if (!current_param.empty()) {
+    args.push_back(current_param);
+  }
+  return args;
+}
+
+void ExecuteProcess::executeProcessForkFailed() {
+  logger_->log_error("Execute Process fork failed");
+  close(pipefd_[0]);
+  close(pipefd_[1]);
+  yield();
+}
+
+void ExecuteProcess::executeChildProcess(const std::vector<char*>& argv) {
+  const int STDOUT = 1;
+  const int STDERR = 2;
+  close(STDOUT);
+  const auto guard = gsl::finally([]() {
+    exit(1);
+  });
+  if (dup(pipefd_[1]) < 0) {  // points pipefd at file descriptor
+    logger_->log_error("Failed to point pipe at file descriptor");
     return;
   }
-  if (_workingDir.length() > 0 && _workingDir != ".") {
-    // change to working directory
-    if (chdir(_workingDir.c_str()) != 0) {
-      logger_->log_error("Execute Command can not chdir %s", _workingDir);
-      yield();
-      return;
+  if (redirect_error_stream_ && dup2(pipefd_[1], STDERR) < 0) {
+    logger_->log_error("Failed to redirect error stream of the executed process to the output stream");
+    return;
+  }
+  close(pipefd_[0]);
+  if (execvp(argv[0], argv.data()) < 0) {
+    logger_->log_error("Failed to execute child process");
+  }
+}
+
+void ExecuteProcess::readOutputInBatches(core::ProcessSession& session) {
+  while (true) {
+    std::this_thread::sleep_for(batch_duration_);
+    char buffer[4096];
+    const auto num_read = read(pipefd_[0], buffer, sizeof(buffer));
+    if (num_read <= 0) {
+      break;
+    }
+    logger_->log_debug("Execute Command Respond %zd", num_read);
+    auto flow_file = session.create();
+    if (!flow_file) {
+      continue;
     }
+    flow_file->addAttribute("command", command_);
+    flow_file->addAttribute("command.arguments", command_argument_);
+    session.writeBuffer(flow_file, gsl::make_span(buffer, gsl::narrow<size_t>(num_read)));
+    session.transfer(flow_file, Success);
+    session.commit();
   }
-  logger_->log_info("Execute Command %s", _fullCommand);
-  // split the command into array
-  char *p = std::strtok(const_cast<char*>(_fullCommand.c_str()), " ");
-  int argc = 0;
-  char *argv[64];
-  while (p != 0 && argc < 64) {
-    argv[argc] = p;
-    p = std::strtok(NULL, " ");
-    argc++;
-  }
-  argv[argc] = NULL;
-  int status;
-  if (!_processRunning) {
-    _processRunning = true;
-    // if the process has not launched yet
-    // create the pipe
-    if (pipe(_pipefd) == -1) {
-      _processRunning = false;
-      yield();
-      return;
+}
+
+bool ExecuteProcess::writeToFlowFile(core::ProcessSession& session, std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const char> buffer) const {
+  if (!flow_file) {
+    flow_file = session.create();
+    if (!flow_file) {
+      return false;
     }
-    switch (_pid = fork()) {
-      case -1:
-        logger_->log_error("Execute Process fork failed");
-        _processRunning = false;
-        close(_pipefd[0]);
-        close(_pipefd[1]);
-        yield();
-        break;
-      case 0:  // this is the code the child runs
-        close(1);      // close stdout
-        dup(_pipefd[1]);  // points pipefd at file descriptor
-        if (_redirectErrorStream)
-          // redirect stderr
-          dup2(_pipefd[1], 2);
-        close(_pipefd[0]);
-        execvp(argv[0], argv);
-        exit(1);
-        break;
-      default:  // this is the code the parent runs
-        // the parent isn't going to write to the pipe
-        close(_pipefd[1]);
-        if (_batchDuration > 0ms) {
-          while (true) {
-            std::this_thread::sleep_for(_batchDuration);
-            char buffer[4096];
-            const auto  numRead = read(_pipefd[0], buffer, sizeof(buffer));
-            if (numRead <= 0)
-              break;
-            logger_->log_debug("Execute Command Respond %zd", numRead);
-            auto flowFile = session->create();
-            if (!flowFile)
-              continue;
-            flowFile->addAttribute("command", _command);
-            flowFile->addAttribute("command.arguments", _commandArgument);
-            session->writeBuffer(flowFile, gsl::make_span(buffer, gsl::narrow<size_t>(numRead)));
-            session->transfer(flowFile, Success);
-            session->commit();
-          }
-        } else {
-          char buffer[4096];
-          char *bufPtr = buffer;
-          size_t totalRead = 0;
-          std::shared_ptr<core::FlowFile> flowFile = nullptr;
-          while (true) {
-            const auto numRead = read(_pipefd[0], bufPtr, (sizeof(buffer) - totalRead));
-            if (numRead <= 0) {
-              if (totalRead > 0) {
-                logger_->log_debug("Execute Command Respond %zu", totalRead);
-                // child exits and close the pipe
-                const auto buffer_span = gsl::make_span(buffer, totalRead);
-                if (!flowFile) {
-                  flowFile = session->create();
-                  if (!flowFile)
-                    break;
-                  flowFile->addAttribute("command", _command);
-                  flowFile->addAttribute("command.arguments", _commandArgument);
-                  session->writeBuffer(flowFile, buffer_span);
-                } else {
-                  session->appendBuffer(flowFile, buffer_span);
-                }
-                session->transfer(flowFile, Success);
-              }
-              break;
-            } else {
-              if (numRead == static_cast<ssize_t>((sizeof(buffer) - totalRead))) {
-                // we reach the max buffer size
-                logger_->log_debug("Execute Command Max Respond %zu", sizeof(buffer));
-                if (!flowFile) {
-                  flowFile = session->create();
-                  if (!flowFile)
-                    continue;
-                  flowFile->addAttribute("command", _command);
-                  flowFile->addAttribute("command.arguments", _commandArgument);
-                  session->writeBuffer(flowFile, buffer);
-                } else {
-                  session->appendBuffer(flowFile, buffer);
-                }
-                // Rewind
-                totalRead = 0;
-                bufPtr = buffer;
-              } else {
-                totalRead += numRead;
-                bufPtr += numRead;
-              }
-            }
-          }
-        }
-
-        wait(&status);
-        if (WIFEXITED(status)) {
-          logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand, WEXITSTATUS(status), _pid);
-        } else {
-          logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand, WTERMSIG(status), _pid);
-        }
-
-        close(_pipefd[0]);
-        _processRunning = false;
-        break;
+    flow_file->addAttribute("command", command_);
+    flow_file->addAttribute("command.arguments", command_argument_);
+    session.writeBuffer(flow_file, buffer);
+  } else {
+    session.appendBuffer(flow_file, buffer);
+  }
+  return true;
+}
+
+void ExecuteProcess::readOutput(core::ProcessSession& session) {
+  char buffer[4096];
+  char *buf_ptr = buffer;
+  size_t total_read = 0;
+  std::shared_ptr<core::FlowFile> flow_file;
+  auto num_read = read(pipefd_[0], buf_ptr, (sizeof(buffer) - total_read));
+  while (num_read > 0) {
+    if (num_read == static_cast<ssize_t>((sizeof(buffer) - total_read))) {
+      // we reach the max buffer size
+      logger_->log_debug("Execute Command Max Respond %zu", sizeof(buffer));
+      if (!writeToFlowFile(session, flow_file, buffer)) {
+        continue;
+      }
+      // Rewind
+      total_read = 0;
+      buf_ptr = buffer;
+    } else {
+      total_read += num_read;
+      buf_ptr += num_read;
+    }
+    num_read = read(pipefd_[0], buf_ptr, (sizeof(buffer) - total_read));
+  }
+
+  if (total_read > 0) {
+    logger_->log_debug("Execute Command Respond %zu", total_read);
+    // child exits and close the pipe
+    const auto buffer_span = gsl::make_span(buffer, total_read);
+    if (!writeToFlowFile(session, flow_file, buffer_span)) {
+      return;
     }
+    session.transfer(flow_file, Success);
+  }
+}
+
+void ExecuteProcess::collectChildProcessOutput(core::ProcessSession& session) {
+  // the parent isn't going to write to the pipe
+  close(pipefd_[1]);
+  if (batch_duration_ > 0ms) {
+    readOutputInBatches(session);
+  } else {
+    readOutput(session);
+  }
+
+  int status = 0;
+  wait(&status);
+  if (WIFEXITED(status)) {
+    logger_->log_info("Execute Command Complete %s status %d pid %d", full_command_, WEXITSTATUS(status), pid_);
+  } else {
+    logger_->log_info("Execute Command Complete %s status %d pid %d", full_command_, WTERMSIG(status), pid_);
+  }
+
+  close(pipefd_[0]);
+  pid_ = 0;
+}
+
+void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+  gsl_Expects(context && session);
+  if (full_command_.length() == 0) {
+    yield();
+    return;
+  }
+  if (!changeWorkdir()) {
+    yield();
+    return;
+  }
+  logger_->log_info("Execute Command %s", full_command_);
+  std::vector<char*> argv;
+  auto args = readArgs();
+  argv.reserve(args.size() + 1);
+  for (auto& arg : args) {
+    argv.push_back(arg.data());
+  }
+  argv.push_back(nullptr);
+  if (process_running_) {

Review Comment:
   It shouldn't be needed, removed in b8a9a3297a4c6e56c02a1b5767f13596ddf7b853



##########
extensions/standard-processors/processors/ExecuteProcess.cpp:
##########
@@ -74,172 +68,226 @@ void ExecuteProcess::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+void ExecuteProcess::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*session_factory*/) {
+  gsl_Expects(context);
   std::string value;
-  std::shared_ptr<core::FlowFile> flow_file;
-  if (context->getProperty(Command, value, flow_file)) {
-    this->_command = value;
+  if (context->getProperty(Command.getName(), value)) {
+    command_ = value;
   }
-  if (context->getProperty(CommandArguments, value, flow_file)) {
-    this->_commandArgument = value;
+  if (context->getProperty(CommandArguments.getName(), value)) {
+    command_argument_ = value;
   }
-  if (context->getProperty(WorkingDir, value, flow_file)) {
-    this->_workingDir = value;
+  if (context->getProperty(WorkingDir.getName(), value)) {
+    working_dir_ = value;
   }
   if (auto batch_duration = context->getProperty<core::TimePeriodValue>(BatchDuration)) {
-    _batchDuration = batch_duration->getMilliseconds();
-    logger_->log_debug("Setting _batchDuration");
+    batch_duration_ = batch_duration->getMilliseconds();
+    logger_->log_debug("Setting batch duration to %d milliseconds", batch_duration_.count());
   }
   if (context->getProperty(RedirectErrorStream.getName(), value)) {
-    _redirectErrorStream =  org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
+    redirect_error_stream_ = org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
   }
-  this->_fullCommand = _command + " " + _commandArgument;
-  if (_fullCommand.length() == 0) {
-    yield();
+  full_command_ = command_ + " " + command_argument_;
+}
+
+bool ExecuteProcess::changeWorkdir() const {
+  if (working_dir_.length() > 0 && working_dir_ != ".") {
+    if (chdir(working_dir_.c_str()) != 0) {
+      logger_->log_error("Execute Command can not chdir %s", working_dir_);
+      return false;
+    }
+  }
+  return true;
+}
+
+std::vector<std::string> ExecuteProcess::readArgs() const {
+  std::vector<std::string> args;
+  std::string current_param;
+  bool in_escaped = false;
+  auto currentParamShouldBeAppended = [&](std::size_t i) {
+    bool current_char_is_escaped_apostrophe = full_command_[i] == '\"' && in_escaped && i > 0 && full_command_[i - 1] == '\\';
+    bool whitespace_in_escaped_block = full_command_[i] == ' ' && in_escaped;
+    bool non_special_character = full_command_[i] != '\\' && full_command_[i] != '\"' && full_command_[i] != ' ';
+    return current_char_is_escaped_apostrophe || whitespace_in_escaped_block || non_special_character;
+  };
+
+  for (std::size_t i = 0; i < full_command_.size(); ++i) {
+    if (currentParamShouldBeAppended(i)) {
+      current_param += full_command_[i];
+    } else if (full_command_[i] == '\"' && (!in_escaped || i == 0 || full_command_[i - 1] != '\\')) {
+      in_escaped = !in_escaped;
+    } else if (full_command_[i] == ' ' && !in_escaped) {
+      if (!current_param.empty()) {
+        args.push_back(current_param);
+      }
+      current_param.clear();
+    }
+  }
+  if (!current_param.empty()) {
+    args.push_back(current_param);
+  }
+  return args;
+}
+
+void ExecuteProcess::executeProcessForkFailed() {
+  logger_->log_error("Execute Process fork failed");
+  close(pipefd_[0]);
+  close(pipefd_[1]);
+  yield();
+}
+
+void ExecuteProcess::executeChildProcess(const std::vector<char*>& argv) {
+  const int STDOUT = 1;
+  const int STDERR = 2;

Review Comment:
   Updated in b8a9a3297a4c6e56c02a1b5767f13596ddf7b853



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1414: MINIFICPP-1927 Fix ExecuteProcess command argument issue and refactor

Posted by GitBox <gi...@apache.org>.
lordgamez commented on code in PR #1414:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1414#discussion_r1011973043


##########
PROCESSORS.md:
##########
@@ -550,18 +550,18 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ### Description
 
-Runs an operating system command specified by the user and writes the output of that command to a FlowFile. If the command is expected to be long-running,the Processor can output the partial data on a specified interval. When this option is used, the output is expected to be in textual format,as it typically does not make sense to split binary data on arbitrary time-based intervals.
+Runs an operating system command specified by the user and writes the output of that command to a FlowFile. If the command is expected to be long-running, the Processor can output the partial data on a specified interval. When this option is used, the output is expected to be in textual format, as it typically does not make sense to split binary data on arbitrary time-based intervals. This processor is not available on Windows systems.

Review Comment:
   Updated in b8a9a3297a4c6e56c02a1b5767f13596ddf7b853



##########
extensions/standard-processors/processors/ExecuteProcess.cpp:
##########
@@ -74,172 +68,226 @@ void ExecuteProcess::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+void ExecuteProcess::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*session_factory*/) {
+  gsl_Expects(context);
   std::string value;
-  std::shared_ptr<core::FlowFile> flow_file;
-  if (context->getProperty(Command, value, flow_file)) {
-    this->_command = value;
+  if (context->getProperty(Command.getName(), value)) {
+    command_ = value;
   }
-  if (context->getProperty(CommandArguments, value, flow_file)) {
-    this->_commandArgument = value;
+  if (context->getProperty(CommandArguments.getName(), value)) {
+    command_argument_ = value;
   }
-  if (context->getProperty(WorkingDir, value, flow_file)) {
-    this->_workingDir = value;
+  if (context->getProperty(WorkingDir.getName(), value)) {
+    working_dir_ = value;
   }
   if (auto batch_duration = context->getProperty<core::TimePeriodValue>(BatchDuration)) {
-    _batchDuration = batch_duration->getMilliseconds();
-    logger_->log_debug("Setting _batchDuration");
+    batch_duration_ = batch_duration->getMilliseconds();
+    logger_->log_debug("Setting batch duration to %d milliseconds", batch_duration_.count());
   }
   if (context->getProperty(RedirectErrorStream.getName(), value)) {
-    _redirectErrorStream =  org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
+    redirect_error_stream_ = org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
   }
-  this->_fullCommand = _command + " " + _commandArgument;
-  if (_fullCommand.length() == 0) {
-    yield();
+  full_command_ = command_ + " " + command_argument_;
+}
+
+bool ExecuteProcess::changeWorkdir() const {
+  if (working_dir_.length() > 0 && working_dir_ != ".") {
+    if (chdir(working_dir_.c_str()) != 0) {

Review Comment:
   Good point, updated in b8a9a3297a4c6e56c02a1b5767f13596ddf7b853



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] szaszm closed pull request #1414: MINIFICPP-1927 Fix ExecuteProcess command argument issue and refactor

Posted by GitBox <gi...@apache.org>.
szaszm closed pull request #1414: MINIFICPP-1927 Fix ExecuteProcess command argument issue and refactor
URL: https://github.com/apache/nifi-minifi-cpp/pull/1414


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org