You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/10/24 15:45:24 UTC

[GitHub] [nifi-minifi-cpp] fgerlits commented on a diff in pull request #1414: MINIFICPP-1927 Fix ExecuteProcess command argument issue and refactor

fgerlits commented on code in PR #1414:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1414#discussion_r1003070951


##########
extensions/standard-processors/processors/ExecuteProcess.cpp:
##########
@@ -30,42 +30,36 @@
 #include "core/TypedValues.h"
 #include "utils/gsl.h"
 
-#if defined(__clang__)
-#pragma clang diagnostic push
-#pragma clang diagnostic ignored "-Wsign-compare"
-#pragma clang diagnostic ignored "-Wunused-result"
-#elif defined(__GNUC__) || defined(__GNUG__)
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Wsign-compare"
-#pragma GCC diagnostic ignored "-Wunused-result"
-#endif
-
 using namespace std::literals::chrono_literals;
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
+namespace org::apache::nifi::minifi::processors {
 #ifndef WIN32

Review Comment:
   This `#ifndef WIN32` could be moved to the top of the file, to avoid compiling the included headers on Windows.



##########
extensions/standard-processors/processors/ExecuteProcess.h:
##########
@@ -30,6 +29,7 @@
 #include <memory>
 #include <string>
 #include <thread>
+#include <vector>
 
 #ifndef WIN32

Review Comment:
   Since the whole thing is not compiled for Windows, this `#ifndef WIN32` is unnecessary.



##########
extensions/standard-processors/processors/ExecuteProcess.cpp:
##########
@@ -74,172 +68,226 @@ void ExecuteProcess::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+void ExecuteProcess::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*session_factory*/) {
+  gsl_Expects(context);
   std::string value;
-  std::shared_ptr<core::FlowFile> flow_file;
-  if (context->getProperty(Command, value, flow_file)) {
-    this->_command = value;
+  if (context->getProperty(Command.getName(), value)) {
+    command_ = value;
   }
-  if (context->getProperty(CommandArguments, value, flow_file)) {
-    this->_commandArgument = value;
+  if (context->getProperty(CommandArguments.getName(), value)) {
+    command_argument_ = value;
   }
-  if (context->getProperty(WorkingDir, value, flow_file)) {
-    this->_workingDir = value;
+  if (context->getProperty(WorkingDir.getName(), value)) {
+    working_dir_ = value;
   }
   if (auto batch_duration = context->getProperty<core::TimePeriodValue>(BatchDuration)) {
-    _batchDuration = batch_duration->getMilliseconds();
-    logger_->log_debug("Setting _batchDuration");
+    batch_duration_ = batch_duration->getMilliseconds();
+    logger_->log_debug("Setting batch duration to %d milliseconds", batch_duration_.count());
   }
   if (context->getProperty(RedirectErrorStream.getName(), value)) {
-    _redirectErrorStream =  org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
+    redirect_error_stream_ = org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
   }
-  this->_fullCommand = _command + " " + _commandArgument;
-  if (_fullCommand.length() == 0) {
-    yield();
+  full_command_ = command_ + " " + command_argument_;
+}
+
+bool ExecuteProcess::changeWorkdir() const {
+  if (working_dir_.length() > 0 && working_dir_ != ".") {
+    if (chdir(working_dir_.c_str()) != 0) {
+      logger_->log_error("Execute Command can not chdir %s", working_dir_);
+      return false;
+    }
+  }
+  return true;
+}
+
+std::vector<std::string> ExecuteProcess::readArgs() const {
+  std::vector<std::string> args;
+  std::string current_param;
+  bool in_escaped = false;
+  auto currentParamShouldBeAppended = [&](std::size_t i) {
+    bool current_char_is_escaped_apostrophe = full_command_[i] == '\"' && in_escaped && i > 0 && full_command_[i - 1] == '\\';
+    bool whitespace_in_escaped_block = full_command_[i] == ' ' && in_escaped;
+    bool non_special_character = full_command_[i] != '\\' && full_command_[i] != '\"' && full_command_[i] != ' ';
+    return current_char_is_escaped_apostrophe || whitespace_in_escaped_block || non_special_character;
+  };
+
+  for (std::size_t i = 0; i < full_command_.size(); ++i) {
+    if (currentParamShouldBeAppended(i)) {
+      current_param += full_command_[i];
+    } else if (full_command_[i] == '\"' && (!in_escaped || i == 0 || full_command_[i - 1] != '\\')) {
+      in_escaped = !in_escaped;
+    } else if (full_command_[i] == ' ' && !in_escaped) {
+      if (!current_param.empty()) {
+        args.push_back(current_param);
+      }
+      current_param.clear();
+    }
+  }

Review Comment:
   This logic is very complex, and it does not treat backslashes as I would expect.  For example, on an input of `a \" b \" c \\" d \\\" e " f \ \\ \g`, the output is 4 arguments, `a`, ` b " c " d " e `, `f` and `g`.
   
   Why don't we use `std::quoted` instead?  That seems to do what we need: https://godbolt.org/z/PzYPE71Gx



##########
extensions/standard-processors/processors/ExecuteProcess.cpp:
##########
@@ -74,172 +68,226 @@ void ExecuteProcess::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+void ExecuteProcess::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*session_factory*/) {
+  gsl_Expects(context);
   std::string value;
-  std::shared_ptr<core::FlowFile> flow_file;
-  if (context->getProperty(Command, value, flow_file)) {
-    this->_command = value;
+  if (context->getProperty(Command.getName(), value)) {
+    command_ = value;
   }
-  if (context->getProperty(CommandArguments, value, flow_file)) {
-    this->_commandArgument = value;
+  if (context->getProperty(CommandArguments.getName(), value)) {
+    command_argument_ = value;
   }
-  if (context->getProperty(WorkingDir, value, flow_file)) {
-    this->_workingDir = value;
+  if (context->getProperty(WorkingDir.getName(), value)) {
+    working_dir_ = value;
   }
   if (auto batch_duration = context->getProperty<core::TimePeriodValue>(BatchDuration)) {
-    _batchDuration = batch_duration->getMilliseconds();
-    logger_->log_debug("Setting _batchDuration");
+    batch_duration_ = batch_duration->getMilliseconds();
+    logger_->log_debug("Setting batch duration to %d milliseconds", batch_duration_.count());
   }
   if (context->getProperty(RedirectErrorStream.getName(), value)) {
-    _redirectErrorStream =  org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
+    redirect_error_stream_ = org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
   }
-  this->_fullCommand = _command + " " + _commandArgument;
-  if (_fullCommand.length() == 0) {
-    yield();
+  full_command_ = command_ + " " + command_argument_;
+}
+
+bool ExecuteProcess::changeWorkdir() const {
+  if (working_dir_.length() > 0 && working_dir_ != ".") {
+    if (chdir(working_dir_.c_str()) != 0) {
+      logger_->log_error("Execute Command can not chdir %s", working_dir_);
+      return false;
+    }
+  }
+  return true;
+}
+
+std::vector<std::string> ExecuteProcess::readArgs() const {
+  std::vector<std::string> args;
+  std::string current_param;
+  bool in_escaped = false;
+  auto currentParamShouldBeAppended = [&](std::size_t i) {
+    bool current_char_is_escaped_apostrophe = full_command_[i] == '\"' && in_escaped && i > 0 && full_command_[i - 1] == '\\';
+    bool whitespace_in_escaped_block = full_command_[i] == ' ' && in_escaped;
+    bool non_special_character = full_command_[i] != '\\' && full_command_[i] != '\"' && full_command_[i] != ' ';
+    return current_char_is_escaped_apostrophe || whitespace_in_escaped_block || non_special_character;
+  };
+
+  for (std::size_t i = 0; i < full_command_.size(); ++i) {
+    if (currentParamShouldBeAppended(i)) {
+      current_param += full_command_[i];
+    } else if (full_command_[i] == '\"' && (!in_escaped || i == 0 || full_command_[i - 1] != '\\')) {
+      in_escaped = !in_escaped;
+    } else if (full_command_[i] == ' ' && !in_escaped) {
+      if (!current_param.empty()) {
+        args.push_back(current_param);
+      }
+      current_param.clear();
+    }
+  }
+  if (!current_param.empty()) {
+    args.push_back(current_param);
+  }
+  return args;
+}
+
+void ExecuteProcess::executeProcessForkFailed() {
+  logger_->log_error("Execute Process fork failed");
+  close(pipefd_[0]);
+  close(pipefd_[1]);
+  yield();
+}
+
+void ExecuteProcess::executeChildProcess(const std::vector<char*>& argv) {
+  const int STDOUT = 1;
+  const int STDERR = 2;
+  close(STDOUT);
+  const auto guard = gsl::finally([]() {
+    exit(1);
+  });
+  if (dup(pipefd_[1]) < 0) {  // points pipefd at file descriptor
+    logger_->log_error("Failed to point pipe at file descriptor");
     return;
   }
-  if (_workingDir.length() > 0 && _workingDir != ".") {
-    // change to working directory
-    if (chdir(_workingDir.c_str()) != 0) {
-      logger_->log_error("Execute Command can not chdir %s", _workingDir);
-      yield();
-      return;
+  if (redirect_error_stream_ && dup2(pipefd_[1], STDERR) < 0) {
+    logger_->log_error("Failed to redirect error stream of the executed process to the output stream");
+    return;
+  }
+  close(pipefd_[0]);
+  if (execvp(argv[0], argv.data()) < 0) {
+    logger_->log_error("Failed to execute child process");
+  }
+}
+
+void ExecuteProcess::readOutputInBatches(core::ProcessSession& session) {
+  while (true) {
+    std::this_thread::sleep_for(batch_duration_);
+    char buffer[4096];
+    const auto num_read = read(pipefd_[0], buffer, sizeof(buffer));
+    if (num_read <= 0) {
+      break;
+    }
+    logger_->log_debug("Execute Command Respond %zd", num_read);
+    auto flow_file = session.create();
+    if (!flow_file) {
+      continue;
     }
+    flow_file->addAttribute("command", command_);
+    flow_file->addAttribute("command.arguments", command_argument_);
+    session.writeBuffer(flow_file, gsl::make_span(buffer, gsl::narrow<size_t>(num_read)));
+    session.transfer(flow_file, Success);
+    session.commit();
   }
-  logger_->log_info("Execute Command %s", _fullCommand);
-  // split the command into array
-  char *p = std::strtok(const_cast<char*>(_fullCommand.c_str()), " ");
-  int argc = 0;
-  char *argv[64];
-  while (p != 0 && argc < 64) {
-    argv[argc] = p;
-    p = std::strtok(NULL, " ");
-    argc++;
-  }
-  argv[argc] = NULL;
-  int status;
-  if (!_processRunning) {
-    _processRunning = true;
-    // if the process has not launched yet
-    // create the pipe
-    if (pipe(_pipefd) == -1) {
-      _processRunning = false;
-      yield();
-      return;
+}
+
+bool ExecuteProcess::writeToFlowFile(core::ProcessSession& session, std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const char> buffer) const {
+  if (!flow_file) {
+    flow_file = session.create();
+    if (!flow_file) {
+      return false;
     }
-    switch (_pid = fork()) {
-      case -1:
-        logger_->log_error("Execute Process fork failed");
-        _processRunning = false;
-        close(_pipefd[0]);
-        close(_pipefd[1]);
-        yield();
-        break;
-      case 0:  // this is the code the child runs
-        close(1);      // close stdout
-        dup(_pipefd[1]);  // points pipefd at file descriptor
-        if (_redirectErrorStream)
-          // redirect stderr
-          dup2(_pipefd[1], 2);
-        close(_pipefd[0]);
-        execvp(argv[0], argv);
-        exit(1);
-        break;
-      default:  // this is the code the parent runs
-        // the parent isn't going to write to the pipe
-        close(_pipefd[1]);
-        if (_batchDuration > 0ms) {
-          while (true) {
-            std::this_thread::sleep_for(_batchDuration);
-            char buffer[4096];
-            const auto  numRead = read(_pipefd[0], buffer, sizeof(buffer));
-            if (numRead <= 0)
-              break;
-            logger_->log_debug("Execute Command Respond %zd", numRead);
-            auto flowFile = session->create();
-            if (!flowFile)
-              continue;
-            flowFile->addAttribute("command", _command);
-            flowFile->addAttribute("command.arguments", _commandArgument);
-            session->writeBuffer(flowFile, gsl::make_span(buffer, gsl::narrow<size_t>(numRead)));
-            session->transfer(flowFile, Success);
-            session->commit();
-          }
-        } else {
-          char buffer[4096];
-          char *bufPtr = buffer;
-          size_t totalRead = 0;
-          std::shared_ptr<core::FlowFile> flowFile = nullptr;
-          while (true) {
-            const auto numRead = read(_pipefd[0], bufPtr, (sizeof(buffer) - totalRead));
-            if (numRead <= 0) {
-              if (totalRead > 0) {
-                logger_->log_debug("Execute Command Respond %zu", totalRead);
-                // child exits and close the pipe
-                const auto buffer_span = gsl::make_span(buffer, totalRead);
-                if (!flowFile) {
-                  flowFile = session->create();
-                  if (!flowFile)
-                    break;
-                  flowFile->addAttribute("command", _command);
-                  flowFile->addAttribute("command.arguments", _commandArgument);
-                  session->writeBuffer(flowFile, buffer_span);
-                } else {
-                  session->appendBuffer(flowFile, buffer_span);
-                }
-                session->transfer(flowFile, Success);
-              }
-              break;
-            } else {
-              if (numRead == static_cast<ssize_t>((sizeof(buffer) - totalRead))) {
-                // we reach the max buffer size
-                logger_->log_debug("Execute Command Max Respond %zu", sizeof(buffer));
-                if (!flowFile) {
-                  flowFile = session->create();
-                  if (!flowFile)
-                    continue;
-                  flowFile->addAttribute("command", _command);
-                  flowFile->addAttribute("command.arguments", _commandArgument);
-                  session->writeBuffer(flowFile, buffer);
-                } else {
-                  session->appendBuffer(flowFile, buffer);
-                }
-                // Rewind
-                totalRead = 0;
-                bufPtr = buffer;
-              } else {
-                totalRead += numRead;
-                bufPtr += numRead;
-              }
-            }
-          }
-        }
-
-        wait(&status);
-        if (WIFEXITED(status)) {
-          logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand, WEXITSTATUS(status), _pid);
-        } else {
-          logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand, WTERMSIG(status), _pid);
-        }
-
-        close(_pipefd[0]);
-        _processRunning = false;
-        break;
+    flow_file->addAttribute("command", command_);
+    flow_file->addAttribute("command.arguments", command_argument_);
+    session.writeBuffer(flow_file, buffer);
+  } else {
+    session.appendBuffer(flow_file, buffer);
+  }
+  return true;
+}
+
+void ExecuteProcess::readOutput(core::ProcessSession& session) {
+  char buffer[4096];
+  char *buf_ptr = buffer;
+  size_t total_read = 0;
+  std::shared_ptr<core::FlowFile> flow_file;
+  auto num_read = read(pipefd_[0], buf_ptr, (sizeof(buffer) - total_read));
+  while (num_read > 0) {
+    if (num_read == static_cast<ssize_t>((sizeof(buffer) - total_read))) {
+      // we reach the max buffer size
+      logger_->log_debug("Execute Command Max Respond %zu", sizeof(buffer));
+      if (!writeToFlowFile(session, flow_file, buffer)) {
+        continue;
+      }
+      // Rewind
+      total_read = 0;
+      buf_ptr = buffer;
+    } else {
+      total_read += num_read;
+      buf_ptr += num_read;
+    }
+    num_read = read(pipefd_[0], buf_ptr, (sizeof(buffer) - total_read));
+  }
+
+  if (total_read > 0) {
+    logger_->log_debug("Execute Command Respond %zu", total_read);
+    // child exits and close the pipe
+    const auto buffer_span = gsl::make_span(buffer, total_read);
+    if (!writeToFlowFile(session, flow_file, buffer_span)) {
+      return;
     }
+    session.transfer(flow_file, Success);
+  }
+}
+
+void ExecuteProcess::collectChildProcessOutput(core::ProcessSession& session) {
+  // the parent isn't going to write to the pipe
+  close(pipefd_[1]);
+  if (batch_duration_ > 0ms) {
+    readOutputInBatches(session);
+  } else {
+    readOutput(session);
+  }
+
+  int status = 0;
+  wait(&status);
+  if (WIFEXITED(status)) {
+    logger_->log_info("Execute Command Complete %s status %d pid %d", full_command_, WEXITSTATUS(status), pid_);
+  } else {
+    logger_->log_info("Execute Command Complete %s status %d pid %d", full_command_, WTERMSIG(status), pid_);
+  }
+
+  close(pipefd_[0]);
+  pid_ = 0;
+}
+
+void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+  gsl_Expects(context && session);
+  if (full_command_.length() == 0) {
+    yield();
+    return;
+  }
+  if (!changeWorkdir()) {
+    yield();
+    return;
+  }
+  logger_->log_info("Execute Command %s", full_command_);
+  std::vector<char*> argv;
+  auto args = readArgs();
+  argv.reserve(args.size() + 1);
+  for (auto& arg : args) {
+    argv.push_back(arg.data());
+  }
+  argv.push_back(nullptr);

Review Comment:
   I would move the creation of `argv` into `executeChildProcess()`.



##########
extensions/standard-processors/processors/ExecuteProcess.h:
##########
@@ -89,38 +85,35 @@ class ExecuteProcess : public core::Processor {
 
   EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
   EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
-  EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_ALLOWED;
-  EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
+  EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_FORBIDDEN;
+  EXTENSIONAPI static constexpr bool IsSingleThreaded = true;
   ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
 
- public:
-  // OnTrigger method, implemented by NiFi ExecuteProcess
   void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override;
-  // Initialize, over write by NiFi ExecuteProcess
+  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *session_factory) override;
   void initialize() override;
 
  private:
-  // Logger
+  bool changeWorkdir() const;
+  std::vector<std::string> readArgs() const;
+  void executeProcessForkFailed();
+  void executeChildProcess(const std::vector<char*>& argv);
+  void collectChildProcessOutput(core::ProcessSession& session);
+  void readOutputInBatches(core::ProcessSession& session);
+  void readOutput(core::ProcessSession& session);
+  bool writeToFlowFile(core::ProcessSession& session, std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const char> buffer) const;
+
   std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ExecuteProcess>::getLogger();
-  // Property
-  std::string _command;
-  std::string _commandArgument;
-  std::string _workingDir;
-  std::chrono::milliseconds _batchDuration  = std::chrono::milliseconds(0);
-  bool _redirectErrorStream;
-  // Full command
-  std::string _fullCommand;
-  // whether the process is running
-  bool _processRunning;
-  int _pipefd[2];
-  pid_t _pid;
+  std::string command_;
+  std::string command_argument_;
+  std::string working_dir_;
+  std::chrono::milliseconds batch_duration_  = std::chrono::milliseconds(0);
+  bool redirect_error_stream_;
+  std::string full_command_;
+  bool process_running_;
+  int pipefd_[2];

Review Comment:
   `pipefd_` is not initialized



##########
extensions/standard-processors/tests/unit/ExecuteProcessTests.cpp:
##########
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <string>
+
+#include "Catch.h"
+#include "processors/ExecuteProcess.h"
+#include "SingleProcessorTestController.h"
+#include "utils/file/FileUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::test {
+#ifndef WIN32
+
+class ExecuteProcessTestsFixture {
+ public:
+  ExecuteProcessTestsFixture()
+      : execute_process_(std::make_shared<processors::ExecuteProcess>("ExecuteProcess")),
+        controller_(execute_process_) {
+    LogTestController::getInstance().setTrace<processors::ExecuteProcess>();
+  }
+ protected:
+  std::shared_ptr<processors::ExecuteProcess> execute_process_;
+  test::SingleProcessorTestController controller_;
+};
+
+TEST_CASE_METHOD(ExecuteProcessTestsFixture, "ExecuteProcess can run a single command", "[ExecuteProcess]") {
+  REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::Command, "echo -n test"));
+
+  controller_.plan->scheduleProcessor(execute_process_);
+  auto result = controller_.trigger("data");

Review Comment:
   we don't have to create an input flow file:
   ```suggestion
     auto result = controller_.trigger();
   ```



##########
extensions/standard-processors/processors/ExecuteProcess.cpp:
##########
@@ -74,172 +68,226 @@ void ExecuteProcess::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+void ExecuteProcess::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*session_factory*/) {
+  gsl_Expects(context);
   std::string value;
-  std::shared_ptr<core::FlowFile> flow_file;
-  if (context->getProperty(Command, value, flow_file)) {
-    this->_command = value;
+  if (context->getProperty(Command.getName(), value)) {
+    command_ = value;
   }
-  if (context->getProperty(CommandArguments, value, flow_file)) {
-    this->_commandArgument = value;
+  if (context->getProperty(CommandArguments.getName(), value)) {
+    command_argument_ = value;
   }
-  if (context->getProperty(WorkingDir, value, flow_file)) {
-    this->_workingDir = value;
+  if (context->getProperty(WorkingDir.getName(), value)) {
+    working_dir_ = value;
   }
   if (auto batch_duration = context->getProperty<core::TimePeriodValue>(BatchDuration)) {
-    _batchDuration = batch_duration->getMilliseconds();
-    logger_->log_debug("Setting _batchDuration");
+    batch_duration_ = batch_duration->getMilliseconds();
+    logger_->log_debug("Setting batch duration to %d milliseconds", batch_duration_.count());
   }
   if (context->getProperty(RedirectErrorStream.getName(), value)) {
-    _redirectErrorStream =  org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
+    redirect_error_stream_ = org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
   }
-  this->_fullCommand = _command + " " + _commandArgument;
-  if (_fullCommand.length() == 0) {
-    yield();
+  full_command_ = command_ + " " + command_argument_;
+}
+
+bool ExecuteProcess::changeWorkdir() const {
+  if (working_dir_.length() > 0 && working_dir_ != ".") {
+    if (chdir(working_dir_.c_str()) != 0) {
+      logger_->log_error("Execute Command can not chdir %s", working_dir_);
+      return false;
+    }
+  }
+  return true;
+}
+
+std::vector<std::string> ExecuteProcess::readArgs() const {
+  std::vector<std::string> args;
+  std::string current_param;
+  bool in_escaped = false;
+  auto currentParamShouldBeAppended = [&](std::size_t i) {
+    bool current_char_is_escaped_apostrophe = full_command_[i] == '\"' && in_escaped && i > 0 && full_command_[i - 1] == '\\';
+    bool whitespace_in_escaped_block = full_command_[i] == ' ' && in_escaped;
+    bool non_special_character = full_command_[i] != '\\' && full_command_[i] != '\"' && full_command_[i] != ' ';
+    return current_char_is_escaped_apostrophe || whitespace_in_escaped_block || non_special_character;
+  };
+
+  for (std::size_t i = 0; i < full_command_.size(); ++i) {
+    if (currentParamShouldBeAppended(i)) {
+      current_param += full_command_[i];
+    } else if (full_command_[i] == '\"' && (!in_escaped || i == 0 || full_command_[i - 1] != '\\')) {
+      in_escaped = !in_escaped;
+    } else if (full_command_[i] == ' ' && !in_escaped) {
+      if (!current_param.empty()) {
+        args.push_back(current_param);
+      }
+      current_param.clear();
+    }
+  }
+  if (!current_param.empty()) {
+    args.push_back(current_param);
+  }
+  return args;
+}
+
+void ExecuteProcess::executeProcessForkFailed() {
+  logger_->log_error("Execute Process fork failed");
+  close(pipefd_[0]);
+  close(pipefd_[1]);
+  yield();
+}
+
+void ExecuteProcess::executeChildProcess(const std::vector<char*>& argv) {
+  const int STDOUT = 1;
+  const int STDERR = 2;

Review Comment:
   nitpicking: these could be `static constexpr int`



##########
extensions/standard-processors/processors/ExecuteProcess.cpp:
##########
@@ -74,172 +68,226 @@ void ExecuteProcess::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+void ExecuteProcess::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*session_factory*/) {
+  gsl_Expects(context);
   std::string value;
-  std::shared_ptr<core::FlowFile> flow_file;
-  if (context->getProperty(Command, value, flow_file)) {
-    this->_command = value;
+  if (context->getProperty(Command.getName(), value)) {
+    command_ = value;
   }
-  if (context->getProperty(CommandArguments, value, flow_file)) {
-    this->_commandArgument = value;
+  if (context->getProperty(CommandArguments.getName(), value)) {
+    command_argument_ = value;
   }
-  if (context->getProperty(WorkingDir, value, flow_file)) {
-    this->_workingDir = value;
+  if (context->getProperty(WorkingDir.getName(), value)) {
+    working_dir_ = value;
   }
   if (auto batch_duration = context->getProperty<core::TimePeriodValue>(BatchDuration)) {
-    _batchDuration = batch_duration->getMilliseconds();
-    logger_->log_debug("Setting _batchDuration");
+    batch_duration_ = batch_duration->getMilliseconds();
+    logger_->log_debug("Setting batch duration to %d milliseconds", batch_duration_.count());
   }
   if (context->getProperty(RedirectErrorStream.getName(), value)) {
-    _redirectErrorStream =  org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
+    redirect_error_stream_ = org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
   }
-  this->_fullCommand = _command + " " + _commandArgument;
-  if (_fullCommand.length() == 0) {
-    yield();
+  full_command_ = command_ + " " + command_argument_;
+}
+
+bool ExecuteProcess::changeWorkdir() const {
+  if (working_dir_.length() > 0 && working_dir_ != ".") {
+    if (chdir(working_dir_.c_str()) != 0) {
+      logger_->log_error("Execute Command can not chdir %s", working_dir_);
+      return false;
+    }
+  }
+  return true;
+}
+
+std::vector<std::string> ExecuteProcess::readArgs() const {
+  std::vector<std::string> args;
+  std::string current_param;
+  bool in_escaped = false;
+  auto currentParamShouldBeAppended = [&](std::size_t i) {
+    bool current_char_is_escaped_apostrophe = full_command_[i] == '\"' && in_escaped && i > 0 && full_command_[i - 1] == '\\';
+    bool whitespace_in_escaped_block = full_command_[i] == ' ' && in_escaped;
+    bool non_special_character = full_command_[i] != '\\' && full_command_[i] != '\"' && full_command_[i] != ' ';
+    return current_char_is_escaped_apostrophe || whitespace_in_escaped_block || non_special_character;
+  };
+
+  for (std::size_t i = 0; i < full_command_.size(); ++i) {
+    if (currentParamShouldBeAppended(i)) {
+      current_param += full_command_[i];
+    } else if (full_command_[i] == '\"' && (!in_escaped || i == 0 || full_command_[i - 1] != '\\')) {
+      in_escaped = !in_escaped;
+    } else if (full_command_[i] == ' ' && !in_escaped) {
+      if (!current_param.empty()) {
+        args.push_back(current_param);
+      }
+      current_param.clear();
+    }
+  }
+  if (!current_param.empty()) {
+    args.push_back(current_param);
+  }
+  return args;
+}
+
+void ExecuteProcess::executeProcessForkFailed() {
+  logger_->log_error("Execute Process fork failed");
+  close(pipefd_[0]);
+  close(pipefd_[1]);
+  yield();
+}
+
+void ExecuteProcess::executeChildProcess(const std::vector<char*>& argv) {
+  const int STDOUT = 1;
+  const int STDERR = 2;
+  close(STDOUT);
+  const auto guard = gsl::finally([]() {
+    exit(1);

Review Comment:
   I know it was like this before, but do you know why the child process exits with 1 instead of 0?



##########
extensions/standard-processors/processors/ExecuteProcess.cpp:
##########
@@ -74,172 +68,226 @@ void ExecuteProcess::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+void ExecuteProcess::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*session_factory*/) {
+  gsl_Expects(context);
   std::string value;
-  std::shared_ptr<core::FlowFile> flow_file;
-  if (context->getProperty(Command, value, flow_file)) {
-    this->_command = value;
+  if (context->getProperty(Command.getName(), value)) {
+    command_ = value;
   }
-  if (context->getProperty(CommandArguments, value, flow_file)) {
-    this->_commandArgument = value;
+  if (context->getProperty(CommandArguments.getName(), value)) {
+    command_argument_ = value;
   }
-  if (context->getProperty(WorkingDir, value, flow_file)) {
-    this->_workingDir = value;
+  if (context->getProperty(WorkingDir.getName(), value)) {
+    working_dir_ = value;
   }
   if (auto batch_duration = context->getProperty<core::TimePeriodValue>(BatchDuration)) {
-    _batchDuration = batch_duration->getMilliseconds();
-    logger_->log_debug("Setting _batchDuration");
+    batch_duration_ = batch_duration->getMilliseconds();
+    logger_->log_debug("Setting batch duration to %d milliseconds", batch_duration_.count());
   }
   if (context->getProperty(RedirectErrorStream.getName(), value)) {
-    _redirectErrorStream =  org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
+    redirect_error_stream_ = org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
   }
-  this->_fullCommand = _command + " " + _commandArgument;
-  if (_fullCommand.length() == 0) {
-    yield();
+  full_command_ = command_ + " " + command_argument_;
+}
+
+bool ExecuteProcess::changeWorkdir() const {
+  if (working_dir_.length() > 0 && working_dir_ != ".") {
+    if (chdir(working_dir_.c_str()) != 0) {
+      logger_->log_error("Execute Command can not chdir %s", working_dir_);
+      return false;
+    }
+  }
+  return true;
+}
+
+std::vector<std::string> ExecuteProcess::readArgs() const {
+  std::vector<std::string> args;
+  std::string current_param;
+  bool in_escaped = false;
+  auto currentParamShouldBeAppended = [&](std::size_t i) {
+    bool current_char_is_escaped_apostrophe = full_command_[i] == '\"' && in_escaped && i > 0 && full_command_[i - 1] == '\\';
+    bool whitespace_in_escaped_block = full_command_[i] == ' ' && in_escaped;
+    bool non_special_character = full_command_[i] != '\\' && full_command_[i] != '\"' && full_command_[i] != ' ';
+    return current_char_is_escaped_apostrophe || whitespace_in_escaped_block || non_special_character;
+  };
+
+  for (std::size_t i = 0; i < full_command_.size(); ++i) {
+    if (currentParamShouldBeAppended(i)) {
+      current_param += full_command_[i];
+    } else if (full_command_[i] == '\"' && (!in_escaped || i == 0 || full_command_[i - 1] != '\\')) {
+      in_escaped = !in_escaped;
+    } else if (full_command_[i] == ' ' && !in_escaped) {
+      if (!current_param.empty()) {
+        args.push_back(current_param);
+      }
+      current_param.clear();
+    }
+  }
+  if (!current_param.empty()) {
+    args.push_back(current_param);
+  }
+  return args;
+}
+
+void ExecuteProcess::executeProcessForkFailed() {
+  logger_->log_error("Execute Process fork failed");
+  close(pipefd_[0]);
+  close(pipefd_[1]);
+  yield();
+}
+
+void ExecuteProcess::executeChildProcess(const std::vector<char*>& argv) {
+  const int STDOUT = 1;
+  const int STDERR = 2;
+  close(STDOUT);
+  const auto guard = gsl::finally([]() {
+    exit(1);
+  });
+  if (dup(pipefd_[1]) < 0) {  // points pipefd at file descriptor
+    logger_->log_error("Failed to point pipe at file descriptor");
     return;
   }
-  if (_workingDir.length() > 0 && _workingDir != ".") {
-    // change to working directory
-    if (chdir(_workingDir.c_str()) != 0) {
-      logger_->log_error("Execute Command can not chdir %s", _workingDir);
-      yield();
-      return;
+  if (redirect_error_stream_ && dup2(pipefd_[1], STDERR) < 0) {
+    logger_->log_error("Failed to redirect error stream of the executed process to the output stream");
+    return;
+  }
+  close(pipefd_[0]);
+  if (execvp(argv[0], argv.data()) < 0) {
+    logger_->log_error("Failed to execute child process");
+  }
+}
+
+void ExecuteProcess::readOutputInBatches(core::ProcessSession& session) {
+  while (true) {
+    std::this_thread::sleep_for(batch_duration_);
+    char buffer[4096];
+    const auto num_read = read(pipefd_[0], buffer, sizeof(buffer));
+    if (num_read <= 0) {
+      break;
+    }
+    logger_->log_debug("Execute Command Respond %zd", num_read);
+    auto flow_file = session.create();
+    if (!flow_file) {
+      continue;
     }
+    flow_file->addAttribute("command", command_);
+    flow_file->addAttribute("command.arguments", command_argument_);
+    session.writeBuffer(flow_file, gsl::make_span(buffer, gsl::narrow<size_t>(num_read)));
+    session.transfer(flow_file, Success);
+    session.commit();
   }
-  logger_->log_info("Execute Command %s", _fullCommand);
-  // split the command into array
-  char *p = std::strtok(const_cast<char*>(_fullCommand.c_str()), " ");
-  int argc = 0;
-  char *argv[64];
-  while (p != 0 && argc < 64) {
-    argv[argc] = p;
-    p = std::strtok(NULL, " ");
-    argc++;
-  }
-  argv[argc] = NULL;
-  int status;
-  if (!_processRunning) {
-    _processRunning = true;
-    // if the process has not launched yet
-    // create the pipe
-    if (pipe(_pipefd) == -1) {
-      _processRunning = false;
-      yield();
-      return;
+}
+
+bool ExecuteProcess::writeToFlowFile(core::ProcessSession& session, std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const char> buffer) const {
+  if (!flow_file) {
+    flow_file = session.create();
+    if (!flow_file) {
+      return false;
     }
-    switch (_pid = fork()) {
-      case -1:
-        logger_->log_error("Execute Process fork failed");
-        _processRunning = false;
-        close(_pipefd[0]);
-        close(_pipefd[1]);
-        yield();
-        break;
-      case 0:  // this is the code the child runs
-        close(1);      // close stdout
-        dup(_pipefd[1]);  // points pipefd at file descriptor
-        if (_redirectErrorStream)
-          // redirect stderr
-          dup2(_pipefd[1], 2);
-        close(_pipefd[0]);
-        execvp(argv[0], argv);
-        exit(1);
-        break;
-      default:  // this is the code the parent runs
-        // the parent isn't going to write to the pipe
-        close(_pipefd[1]);
-        if (_batchDuration > 0ms) {
-          while (true) {
-            std::this_thread::sleep_for(_batchDuration);
-            char buffer[4096];
-            const auto  numRead = read(_pipefd[0], buffer, sizeof(buffer));
-            if (numRead <= 0)
-              break;
-            logger_->log_debug("Execute Command Respond %zd", numRead);
-            auto flowFile = session->create();
-            if (!flowFile)
-              continue;
-            flowFile->addAttribute("command", _command);
-            flowFile->addAttribute("command.arguments", _commandArgument);
-            session->writeBuffer(flowFile, gsl::make_span(buffer, gsl::narrow<size_t>(numRead)));
-            session->transfer(flowFile, Success);
-            session->commit();
-          }
-        } else {
-          char buffer[4096];
-          char *bufPtr = buffer;
-          size_t totalRead = 0;
-          std::shared_ptr<core::FlowFile> flowFile = nullptr;
-          while (true) {
-            const auto numRead = read(_pipefd[0], bufPtr, (sizeof(buffer) - totalRead));
-            if (numRead <= 0) {
-              if (totalRead > 0) {
-                logger_->log_debug("Execute Command Respond %zu", totalRead);
-                // child exits and close the pipe
-                const auto buffer_span = gsl::make_span(buffer, totalRead);
-                if (!flowFile) {
-                  flowFile = session->create();
-                  if (!flowFile)
-                    break;
-                  flowFile->addAttribute("command", _command);
-                  flowFile->addAttribute("command.arguments", _commandArgument);
-                  session->writeBuffer(flowFile, buffer_span);
-                } else {
-                  session->appendBuffer(flowFile, buffer_span);
-                }
-                session->transfer(flowFile, Success);
-              }
-              break;
-            } else {
-              if (numRead == static_cast<ssize_t>((sizeof(buffer) - totalRead))) {
-                // we reach the max buffer size
-                logger_->log_debug("Execute Command Max Respond %zu", sizeof(buffer));
-                if (!flowFile) {
-                  flowFile = session->create();
-                  if (!flowFile)
-                    continue;
-                  flowFile->addAttribute("command", _command);
-                  flowFile->addAttribute("command.arguments", _commandArgument);
-                  session->writeBuffer(flowFile, buffer);
-                } else {
-                  session->appendBuffer(flowFile, buffer);
-                }
-                // Rewind
-                totalRead = 0;
-                bufPtr = buffer;
-              } else {
-                totalRead += numRead;
-                bufPtr += numRead;
-              }
-            }
-          }
-        }
-
-        wait(&status);
-        if (WIFEXITED(status)) {
-          logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand, WEXITSTATUS(status), _pid);
-        } else {
-          logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand, WTERMSIG(status), _pid);
-        }
-
-        close(_pipefd[0]);
-        _processRunning = false;
-        break;
+    flow_file->addAttribute("command", command_);
+    flow_file->addAttribute("command.arguments", command_argument_);
+    session.writeBuffer(flow_file, buffer);
+  } else {
+    session.appendBuffer(flow_file, buffer);
+  }
+  return true;
+}
+
+void ExecuteProcess::readOutput(core::ProcessSession& session) {
+  char buffer[4096];
+  char *buf_ptr = buffer;
+  size_t total_read = 0;
+  std::shared_ptr<core::FlowFile> flow_file;
+  auto num_read = read(pipefd_[0], buf_ptr, (sizeof(buffer) - total_read));
+  while (num_read > 0) {
+    if (num_read == static_cast<ssize_t>((sizeof(buffer) - total_read))) {
+      // we reach the max buffer size
+      logger_->log_debug("Execute Command Max Respond %zu", sizeof(buffer));
+      if (!writeToFlowFile(session, flow_file, buffer)) {
+        continue;
+      }
+      // Rewind
+      total_read = 0;
+      buf_ptr = buffer;
+    } else {
+      total_read += num_read;
+      buf_ptr += num_read;
+    }
+    num_read = read(pipefd_[0], buf_ptr, (sizeof(buffer) - total_read));
+  }
+
+  if (total_read > 0) {
+    logger_->log_debug("Execute Command Respond %zu", total_read);
+    // child exits and close the pipe
+    const auto buffer_span = gsl::make_span(buffer, total_read);
+    if (!writeToFlowFile(session, flow_file, buffer_span)) {
+      return;
     }
+    session.transfer(flow_file, Success);
+  }
+}
+
+void ExecuteProcess::collectChildProcessOutput(core::ProcessSession& session) {
+  // the parent isn't going to write to the pipe
+  close(pipefd_[1]);
+  if (batch_duration_ > 0ms) {
+    readOutputInBatches(session);
+  } else {
+    readOutput(session);
+  }
+
+  int status = 0;
+  wait(&status);
+  if (WIFEXITED(status)) {
+    logger_->log_info("Execute Command Complete %s status %d pid %d", full_command_, WEXITSTATUS(status), pid_);
+  } else {
+    logger_->log_info("Execute Command Complete %s status %d pid %d", full_command_, WTERMSIG(status), pid_);
+  }
+
+  close(pipefd_[0]);
+  pid_ = 0;
+}
+
+void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+  gsl_Expects(context && session);
+  if (full_command_.length() == 0) {
+    yield();
+    return;
+  }
+  if (!changeWorkdir()) {
+    yield();
+    return;
+  }
+  logger_->log_info("Execute Command %s", full_command_);
+  std::vector<char*> argv;
+  auto args = readArgs();
+  argv.reserve(args.size() + 1);
+  for (auto& arg : args) {
+    argv.push_back(arg.data());
+  }
+  argv.push_back(nullptr);
+  if (process_running_) {

Review Comment:
   Do we still need `process_running_`? We don't seem to set it to `true` anywhere.



##########
extensions/standard-processors/processors/ExecuteProcess.cpp:
##########
@@ -74,172 +68,226 @@ void ExecuteProcess::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+void ExecuteProcess::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*session_factory*/) {
+  gsl_Expects(context);
   std::string value;
-  std::shared_ptr<core::FlowFile> flow_file;
-  if (context->getProperty(Command, value, flow_file)) {
-    this->_command = value;
+  if (context->getProperty(Command.getName(), value)) {
+    command_ = value;
   }
-  if (context->getProperty(CommandArguments, value, flow_file)) {
-    this->_commandArgument = value;
+  if (context->getProperty(CommandArguments.getName(), value)) {
+    command_argument_ = value;
   }
-  if (context->getProperty(WorkingDir, value, flow_file)) {
-    this->_workingDir = value;
+  if (context->getProperty(WorkingDir.getName(), value)) {
+    working_dir_ = value;
   }
   if (auto batch_duration = context->getProperty<core::TimePeriodValue>(BatchDuration)) {
-    _batchDuration = batch_duration->getMilliseconds();
-    logger_->log_debug("Setting _batchDuration");
+    batch_duration_ = batch_duration->getMilliseconds();
+    logger_->log_debug("Setting batch duration to %d milliseconds", batch_duration_.count());
   }
   if (context->getProperty(RedirectErrorStream.getName(), value)) {
-    _redirectErrorStream =  org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
+    redirect_error_stream_ = org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
   }
-  this->_fullCommand = _command + " " + _commandArgument;
-  if (_fullCommand.length() == 0) {
-    yield();
+  full_command_ = command_ + " " + command_argument_;
+}
+
+bool ExecuteProcess::changeWorkdir() const {
+  if (working_dir_.length() > 0 && working_dir_ != ".") {
+    if (chdir(working_dir_.c_str()) != 0) {
+      logger_->log_error("Execute Command can not chdir %s", working_dir_);
+      return false;
+    }
+  }
+  return true;
+}
+
+std::vector<std::string> ExecuteProcess::readArgs() const {
+  std::vector<std::string> args;
+  std::string current_param;
+  bool in_escaped = false;
+  auto currentParamShouldBeAppended = [&](std::size_t i) {
+    bool current_char_is_escaped_apostrophe = full_command_[i] == '\"' && in_escaped && i > 0 && full_command_[i - 1] == '\\';
+    bool whitespace_in_escaped_block = full_command_[i] == ' ' && in_escaped;
+    bool non_special_character = full_command_[i] != '\\' && full_command_[i] != '\"' && full_command_[i] != ' ';
+    return current_char_is_escaped_apostrophe || whitespace_in_escaped_block || non_special_character;
+  };
+
+  for (std::size_t i = 0; i < full_command_.size(); ++i) {
+    if (currentParamShouldBeAppended(i)) {
+      current_param += full_command_[i];
+    } else if (full_command_[i] == '\"' && (!in_escaped || i == 0 || full_command_[i - 1] != '\\')) {
+      in_escaped = !in_escaped;
+    } else if (full_command_[i] == ' ' && !in_escaped) {
+      if (!current_param.empty()) {
+        args.push_back(current_param);
+      }
+      current_param.clear();
+    }
+  }
+  if (!current_param.empty()) {
+    args.push_back(current_param);
+  }
+  return args;
+}
+
+void ExecuteProcess::executeProcessForkFailed() {
+  logger_->log_error("Execute Process fork failed");
+  close(pipefd_[0]);
+  close(pipefd_[1]);
+  yield();
+}
+
+void ExecuteProcess::executeChildProcess(const std::vector<char*>& argv) {
+  const int STDOUT = 1;
+  const int STDERR = 2;
+  close(STDOUT);
+  const auto guard = gsl::finally([]() {
+    exit(1);
+  });
+  if (dup(pipefd_[1]) < 0) {  // points pipefd at file descriptor
+    logger_->log_error("Failed to point pipe at file descriptor");
     return;
   }
-  if (_workingDir.length() > 0 && _workingDir != ".") {
-    // change to working directory
-    if (chdir(_workingDir.c_str()) != 0) {
-      logger_->log_error("Execute Command can not chdir %s", _workingDir);
-      yield();
-      return;
+  if (redirect_error_stream_ && dup2(pipefd_[1], STDERR) < 0) {
+    logger_->log_error("Failed to redirect error stream of the executed process to the output stream");
+    return;
+  }
+  close(pipefd_[0]);
+  if (execvp(argv[0], argv.data()) < 0) {
+    logger_->log_error("Failed to execute child process");
+  }
+}
+
+void ExecuteProcess::readOutputInBatches(core::ProcessSession& session) {
+  while (true) {
+    std::this_thread::sleep_for(batch_duration_);
+    char buffer[4096];
+    const auto num_read = read(pipefd_[0], buffer, sizeof(buffer));
+    if (num_read <= 0) {
+      break;
+    }
+    logger_->log_debug("Execute Command Respond %zd", num_read);
+    auto flow_file = session.create();
+    if (!flow_file) {
+      continue;
     }

Review Comment:
   Again, this is old code, but it is very bad: if we can't create a flow file, we drop the data and silently continue?
   
   I'm not sure what we should do here, but at least logging something would be an improvement (both here and in `readOutput()` or `writeToFlowFile()`).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org