You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2022/01/25 14:22:16 UTC

[nifi-minifi-cpp] branch main updated: MINIFICPP-1224 Add module directory support to script execution

This is an automated email from the ASF dual-hosted git repository.

martinzink pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 436d2bf  MINIFICPP-1224 Add module directory support to script execution
436d2bf is described below

commit 436d2bfe2c819f0265bef4bafe68294262852a4b
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Tue Jan 25 14:59:00 2022 +0100

    MINIFICPP-1224 Add module directory support to script execution
    
    Signed-off-by: Martin Zink <ma...@apache.org>
    
    This closes #1237
---
 PROCESSORS.md                                      |  1 +
 extensions/librdkafka/ConsumeKafka.cpp             |  2 +-
 extensions/script/ExecuteScript.cpp                | 11 ++-
 extensions/script/ExecuteScript.h                  |  3 +-
 extensions/script/ScriptEngine.h                   |  9 +++
 extensions/script/lua/LuaScriptEngine.cpp          | 27 ++++++-
 extensions/script/lua/LuaScriptEngine.h            |  2 +
 .../script/python/ExecutePythonProcessor.cpp       | 18 ++---
 extensions/script/python/ExecutePythonProcessor.h  |  1 -
 extensions/script/python/PythonScriptEngine.cpp    | 22 ++++++
 extensions/script/python/PythonScriptEngine.h      |  2 +
 extensions/script/tests/CMakeLists.txt             |  3 +-
 .../script/tests/ExecutePythonProcessorTests.cpp   | 44 +++++++++--
 .../TestExecuteScriptProcessorWithLuaScript.cpp    | 90 +++++++++++++++++-----
 .../TestExecuteScriptProcessorWithPythonScript.cpp | 86 +++++++++++++++++----
 .../tests/test_lua_scripts/bar_modules/bar.lua     | 23 ++++++
 .../tests/test_lua_scripts/foo_bar_processor.lua   | 27 +++++++
 .../tests/test_lua_scripts/foo_modules/foo.lua     | 23 ++++++
 .../bar_modules/bar.py}                            | 12 +--
 .../foo_bar_processor.py}                          |  5 +-
 .../foo_modules/foo.py}                            | 12 +--
 .../non_transferring_processor.py                  |  0
 ...passthrough_processor_transfering_to_failure.py |  0
 ...passthrough_processor_transfering_to_success.py |  0
 .../stateful_processor.py                          |  0
 libminifi/include/utils/ProcessorConfigUtils.h     |  1 -
 libminifi/src/utils/ProcessorConfigUtils.cpp       |  8 --
 27 files changed, 343 insertions(+), 89 deletions(-)

diff --git a/PROCESSORS.md b/PROCESSORS.md
index d5aba85..d941df6 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -455,6 +455,7 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 | Name | Default Value | Allowable Values | Description |
 | - | - | - | - |
+|Module Directory|||Comma-separated list of paths to files and/or directories which contain modules required by the script|
 |**Reload on Script Change**|true||If true and Script File property is used, then script file will be reloaded if it has changed, otherwise the first loaded version will be used at all times.|
 |Script Body|||Script to execute|
 |Script File|||Path to script file to execute. Only one of Script File or Script Body may be used|
diff --git a/extensions/librdkafka/ConsumeKafka.cpp b/extensions/librdkafka/ConsumeKafka.cpp
index 608fa6f..7a41d0a 100644
--- a/extensions/librdkafka/ConsumeKafka.cpp
+++ b/extensions/librdkafka/ConsumeKafka.cpp
@@ -211,7 +211,7 @@ void ConsumeKafka::onSchedule(core::ProcessContext* context, core::ProcessSessio
   context->getProperty(DuplicateHeaderHandling.getName(), duplicate_header_handling_);
 
   headers_to_add_as_attributes_ = utils::listFromCommaSeparatedProperty(*context, HeadersToAddAsAttributes.getName());
-  max_poll_records_ = gsl::narrow<std::size_t>(utils::getOptionalUintProperty(*context, MaxPollRecords.getName()).value_or(DEFAULT_MAX_POLL_RECORDS));
+  max_poll_records_ = gsl::narrow<std::size_t>(context->getProperty<uint64_t>(MaxPollRecords).value_or(DEFAULT_MAX_POLL_RECORDS));
 
   if (!utils::StringUtils::equalsIgnoreCase(KEY_ATTR_ENCODING_UTF_8, key_attribute_encoding_) && !utils::StringUtils::equalsIgnoreCase(KEY_ATTR_ENCODING_HEX, key_attribute_encoding_)) {
     throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Unsupported key attribute encoding: " + key_attribute_encoding_);
diff --git a/extensions/script/ExecuteScript.cpp b/extensions/script/ExecuteScript.cpp
index 77029f1..88c92e1 100644
--- a/extensions/script/ExecuteScript.cpp
+++ b/extensions/script/ExecuteScript.cpp
@@ -30,6 +30,7 @@
 #include "ExecuteScript.h"
 #include "core/Resource.h"
 #include "utils/ProcessorConfigUtils.h"
+#include "utils/StringUtils.h"
 
 namespace org {
 namespace apache {
@@ -90,7 +91,7 @@ void ExecuteScript::onSchedule(core::ProcessContext *context, core::ProcessSessi
 
   context->getProperty(ScriptFile.getName(), script_file_);
   context->getProperty(ScriptBody.getName(), script_body_);
-  context->getProperty(ModuleDirectory.getName(), module_directory_);
+  module_directory_ = context->getProperty(ModuleDirectory);
 
   if (script_file_.empty() && script_body_.empty()) {
     throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Either Script Body or Script File must be defined");
@@ -99,6 +100,10 @@ void ExecuteScript::onSchedule(core::ProcessContext *context, core::ProcessSessi
   if (!script_file_.empty() && !script_body_.empty()) {
     throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Only one of Script File or Script Body may be defined!");
   }
+
+  if (!script_file_.empty() && !std::filesystem::is_regular_file(std::filesystem::status(script_file_))) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Script File set is not a regular file or does not exist: " + script_file_);
+  }
 }
 
 void ExecuteScript::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
@@ -123,6 +128,10 @@ void ExecuteScript::onTrigger(const std::shared_ptr<core::ProcessContext> &conte
     throw std::runtime_error("No script engine available");
   }
 
+  if (module_directory_) {
+    engine->setModulePaths(utils::StringUtils::splitAndTrimRemovingEmpty(*module_directory_, ","));
+  }
+
   if (!script_body_.empty()) {
     engine->eval(script_body_);
   } else if (!script_file_.empty()) {
diff --git a/extensions/script/ExecuteScript.h b/extensions/script/ExecuteScript.h
index b121967..deab0ca 100644
--- a/extensions/script/ExecuteScript.h
+++ b/extensions/script/ExecuteScript.h
@@ -23,6 +23,7 @@
 #include <string>
 #include <memory>
 #include <utility>
+#include <optional>
 
 #include "concurrentqueue.h"
 #include "core/Processor.h"
@@ -158,7 +159,7 @@ class ExecuteScript : public core::Processor {
   ScriptEngineOption script_engine_;
   std::string script_file_;
   std::string script_body_;
-  std::string module_directory_;
+  std::optional<std::string> module_directory_;
 
   ScriptEngineFactory engine_factory_;
 #ifdef LUA_SUPPORT
diff --git a/extensions/script/ScriptEngine.h b/extensions/script/ScriptEngine.h
index 24de218..79f9b1a 100644
--- a/extensions/script/ScriptEngine.h
+++ b/extensions/script/ScriptEngine.h
@@ -18,6 +18,8 @@
 #pragma once
 
 #include <string>
+#include <vector>
+#include <utility>
 
 namespace org {
 namespace apache {
@@ -41,7 +43,14 @@ class ScriptEngine {
    */
   virtual void evalFile(const std::string &file_name) = 0;
 
+  void setModulePaths(std::vector<std::string>&& module_paths) {
+    module_paths_ = std::move(module_paths);
+  }
+
   virtual ~ScriptEngine() = default;
+
+ protected:
+  std::vector<std::string> module_paths_;
 };
 
 } /* namespace script */
diff --git a/extensions/script/lua/LuaScriptEngine.cpp b/extensions/script/lua/LuaScriptEngine.cpp
index 4a8dc25..7884eeb 100644
--- a/extensions/script/lua/LuaScriptEngine.cpp
+++ b/extensions/script/lua/LuaScriptEngine.cpp
@@ -17,6 +17,7 @@
 
 #include <memory>
 #include <string>
+#include <filesystem>
 
 #include "../ScriptProcessContext.h"
 
@@ -63,9 +64,25 @@ LuaScriptEngine::LuaScriptEngine()
       "write", &lua::LuaBaseStream::write);
 }
 
+void LuaScriptEngine::executeScriptWithAppendedModulePaths(std::string& script) {
+  for (const auto& module_path : module_paths_) {
+    if (std::filesystem::is_regular_file(std::filesystem::status(module_path))) {
+      script = "package.path = package.path .. \";" + module_path + "\"\n" + script;
+    } else {
+      script = "package.path = package.path .. \";" + module_path + "/?.lua\"\n" + script;
+    }
+  }
+  lua_.script(script, sol::script_throw_on_error);
+}
+
 void LuaScriptEngine::eval(const std::string &script) {
   try {
-    lua_.script(script, sol::script_throw_on_error);
+    if (!module_paths_.empty()) {
+      auto appended_script = script;
+      executeScriptWithAppendedModulePaths(appended_script);
+    } else {
+      lua_.script(script, sol::script_throw_on_error);
+    }
   } catch (std::exception& e) {
     throw minifi::script::ScriptException(e.what());
   }
@@ -73,7 +90,13 @@ void LuaScriptEngine::eval(const std::string &script) {
 
 void LuaScriptEngine::evalFile(const std::string &file_name) {
   try {
-    lua_.script_file(file_name, sol::script_throw_on_error);
+    if (!module_paths_.empty()) {
+      std::ifstream stream(file_name);
+      std::string script((std::istreambuf_iterator<char>(stream)), std::istreambuf_iterator<char>());
+      executeScriptWithAppendedModulePaths(script);
+    } else {
+      lua_.script_file(file_name, sol::script_throw_on_error);
+    }
   } catch (std::exception& e) {
     throw minifi::script::ScriptException(e.what());
   }
diff --git a/extensions/script/lua/LuaScriptEngine.h b/extensions/script/lua/LuaScriptEngine.h
index 0e78f71..8873335 100644
--- a/extensions/script/lua/LuaScriptEngine.h
+++ b/extensions/script/lua/LuaScriptEngine.h
@@ -110,6 +110,8 @@ class LuaScriptEngine : public script::ScriptEngine {
   }
 
  private:
+  void executeScriptWithAppendedModulePaths(std::string& script);
+
   sol::state lua_;
 };
 
diff --git a/extensions/script/python/ExecutePythonProcessor.cpp b/extensions/script/python/ExecutePythonProcessor.cpp
index ef09a4e..6fbb66b 100644
--- a/extensions/script/python/ExecutePythonProcessor.cpp
+++ b/extensions/script/python/ExecutePythonProcessor.cpp
@@ -38,17 +38,14 @@ namespace processors {
 
 const core::Property ExecutePythonProcessor::ScriptFile(core::PropertyBuilder::createProperty("Script File")
     ->withDescription("Path to script file to execute. Only one of Script File or Script Body may be used")
-    ->withDefaultValue("")
     ->build());
 
 const core::Property ExecutePythonProcessor::ScriptBody(core::PropertyBuilder::createProperty("Script Body")
     ->withDescription("Script to execute. Only one of Script File or Script Body may be used")
-    ->withDefaultValue("")
     ->build());
 
 const core::Property ExecutePythonProcessor::ModuleDirectory(core::PropertyBuilder::createProperty("Module Directory")
   ->withDescription("Comma-separated list of paths to files and/or directories which contain modules required by the script")
-  ->withDefaultValue("")
   ->build());
 
 const core::Property ExecutePythonProcessor::ReloadOnScriptChange(core::PropertyBuilder::createProperty("Reload on Script Change")
@@ -79,9 +76,6 @@ void ExecutePythonProcessor::initialize() {
     return;
   }
 
-  getProperty(ModuleDirectory.getName(), module_directory_);
-  appendPathForImportModules();
-
   try {
     loadScript();
   } catch(const std::runtime_error&) {
@@ -96,6 +90,7 @@ void ExecutePythonProcessor::initialize() {
 }
 
 void ExecutePythonProcessor::initalizeThroughScriptEngine() {
+  appendPathForImportModules();
   python_script_engine_->eval(script_to_exec_);
   auto shared_this = shared_from_this();
   python_script_engine_->describe(shared_this);
@@ -132,11 +127,10 @@ void ExecutePythonProcessor::onTrigger(const std::shared_ptr<core::ProcessContex
 }
 
 void ExecutePythonProcessor::appendPathForImportModules() {
-  // TODO(hunyadi): I have spent some time trying to figure out pybind11, but
-  // could not get this working yet. It is up to be implemented later
-  // https://issues.apache.org/jira/browse/MINIFICPP-1224
-  if (module_directory_.size()) {
-    logger_->log_error("Not supported property: Module Directory.");
+  std::string module_directory;
+  getProperty(ModuleDirectory.getName(), module_directory);
+  if (module_directory.size()) {
+    python_script_engine_->setModulePaths(utils::StringUtils::splitAndTrimRemovingEmpty(module_directory, ","));
   }
 }
 
@@ -200,7 +194,7 @@ REGISTER_RESOURCE(
     "as well as any flow files created by the script. If the handling is incomplete or incorrect, the session will be rolled back.Scripts must define an onTrigger function which accepts NiFi Context"
     " and Property objects. For efficiency, scripts are executed once when the processor is run, then the onTrigger method is called for each incoming flowfile. This enables scripts to keep state "
     "if they wish, although there will be a script context per concurrent task of the processor. In order to, e.g., compute an arithmetic sum based on incoming flow file information, set the "
-    "concurrent tasks to 1.");  // NOLINT
+    "concurrent tasks to 1.");
 
 } /* namespace processors */
 } /* namespace python */
diff --git a/extensions/script/python/ExecutePythonProcessor.h b/extensions/script/python/ExecutePythonProcessor.h
index a3904ab..7e7afa1 100644
--- a/extensions/script/python/ExecutePythonProcessor.h
+++ b/extensions/script/python/ExecutePythonProcessor.h
@@ -107,7 +107,6 @@ class ExecutePythonProcessor : public core::Processor {
   std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ExecutePythonProcessor>::getLogger();
 
   std::string script_to_exec_;
-  std::string module_directory_;
   bool reload_on_script_change_;
   uint64_t last_script_write_time_;
   std::string script_file_path_;
diff --git a/extensions/script/python/PythonScriptEngine.cpp b/extensions/script/python/PythonScriptEngine.cpp
index 0e2a380..66e78be 100644
--- a/extensions/script/python/PythonScriptEngine.cpp
+++ b/extensions/script/python/PythonScriptEngine.cpp
@@ -17,8 +17,10 @@
 
 #include <memory>
 #include <string>
+#include <filesystem>
 
 #include "PythonScriptEngine.h"
+#include "utils/file/FileUtils.h"
 
 namespace org {
 namespace apache {
@@ -39,9 +41,28 @@ PythonScriptEngine::PythonScriptEngine() {
   (*bindings_) = py::globals().attr("copy")();
 }
 
+void PythonScriptEngine::evaluateModuleImports() {
+  if (module_paths_.empty()) {
+    return;
+  }
+
+  py::eval<py::eval_statements>("import sys", *bindings_, *bindings_);
+  for (const auto& module_path : module_paths_) {
+    if (std::filesystem::is_regular_file(std::filesystem::status(module_path))) {
+      std::string path;
+      std::string filename;
+      utils::file::getFileNameAndPath(module_path, path, filename);
+      py::eval<py::eval_statements>("sys.path.append('" + path + "')", *bindings_, *bindings_);
+    } else {
+      py::eval<py::eval_statements>("sys.path.append('" + module_path + "')", *bindings_, *bindings_);
+    }
+  }
+}
+
 void PythonScriptEngine::eval(const std::string &script) {
   py::gil_scoped_acquire gil { };
   try {
+    evaluateModuleImports();
     if (script[0] == '\n') {
       py::eval<py::eval_statements>(py::module::import("textwrap").attr("dedent")(script), *bindings_, *bindings_);
     } else {
@@ -55,6 +76,7 @@ void PythonScriptEngine::eval(const std::string &script) {
 void PythonScriptEngine::evalFile(const std::string &file_name) {
   py::gil_scoped_acquire gil { };
   try {
+    evaluateModuleImports();
     py::eval_file(file_name, *bindings_, *bindings_);
   } catch (const std::exception &e) {
     throw minifi::script::ScriptException(e.what());
diff --git a/extensions/script/python/PythonScriptEngine.h b/extensions/script/python/PythonScriptEngine.h
index 4337f52..cf3cbf8 100644
--- a/extensions/script/python/PythonScriptEngine.h
+++ b/extensions/script/python/PythonScriptEngine.h
@@ -228,6 +228,8 @@ class __attribute__((visibility("default"))) PythonScriptEngine : public script:
   }
 
  private:
+  void evaluateModuleImports();
+
   std::unique_ptr<py::dict> bindings_;
 };
 
diff --git a/extensions/script/tests/CMakeLists.txt b/extensions/script/tests/CMakeLists.txt
index cf4bdc8..43d86e2 100644
--- a/extensions/script/tests/CMakeLists.txt
+++ b/extensions/script/tests/CMakeLists.txt
@@ -25,7 +25,7 @@ if (NOT DISABLE_PYTHON_SCRIPTING)
 	if (NOT PYTHONLIBS_FOUND)
 		find_package(PythonLibs 3.0 REQUIRED)
 	endif()
-	file(COPY "${CMAKE_SOURCE_DIR}/extensions/script/tests/test_scripts" DESTINATION "${CMAKE_CURRENT_BINARY_DIR}/")
+	file(COPY "${CMAKE_SOURCE_DIR}/extensions/script/tests/test_python_scripts" DESTINATION "${CMAKE_BINARY_DIR}/bin/")
 endif()
 
 if (ENABLE_LUA_SCRIPTING)
@@ -90,6 +90,7 @@ FOREACH(testfile ${EXECUTESCRIPT_LUA_TESTS})
 	createTests("${testfilename}")
 	MATH(EXPR EXTENSIONS_TEST_COUNT "${EXTENSIONS_TEST_COUNT}+1")
 	add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY ${TEST_DIR})
+	file(COPY "${CMAKE_SOURCE_DIR}/extensions/script/tests/test_lua_scripts" DESTINATION "${CMAKE_BINARY_DIR}/bin/")
 ENDFOREACH()
 
 message("-- Finished building ${EXTENSIONS_TEST_COUNT} ExecuteScript related test file(s)...")
diff --git a/extensions/script/tests/ExecutePythonProcessorTests.cpp b/extensions/script/tests/ExecutePythonProcessorTests.cpp
index 83d7991..4e4cf83 100644
--- a/extensions/script/tests/ExecutePythonProcessorTests.cpp
+++ b/extensions/script/tests/ExecutePythonProcessorTests.cpp
@@ -34,6 +34,10 @@ namespace {
 using org::apache::nifi::minifi::utils::putFileToDir;
 using org::apache::nifi::minifi::utils::getFileContent;
 using org::apache::nifi::minifi::utils::file::getFileNameAndPath;
+using org::apache::nifi::minifi::utils::file::concat_path;
+using org::apache::nifi::minifi::utils::file::get_separator;
+using org::apache::nifi::minifi::utils::file::resolve;
+using org::apache::nifi::minifi::utils::file::getFullPath;
 
 class ExecutePythonProcessorTestBase {
  public:
@@ -41,9 +45,9 @@ class ExecutePythonProcessorTestBase {
       logTestController_(LogTestController::getInstance()),
       logger_(logging::LoggerFactory<ExecutePythonProcessorTestBase>::getLogger()) {
     std::string path;
-    std::string filename;;
+    std::string filename;
     getFileNameAndPath(__FILE__, path, filename);
-    SCRIPT_FILES_DIRECTORY = org::apache::nifi::minifi::utils::file::getFullPath(org::apache::nifi::minifi::utils::file::FileUtils::concat_path(path, "test_scripts"));
+    SCRIPT_FILES_DIRECTORY = getFullPath(concat_path(path, "test_python_scripts"));
     reInitialize();
   }
   virtual ~ExecutePythonProcessorTestBase() {
@@ -60,7 +64,7 @@ class ExecutePythonProcessorTestBase {
   }
 
   std::string getScriptFullPath(const std::string& script_file_name) {
-    return org::apache::nifi::minifi::utils::file::FileUtils::resolve(SCRIPT_FILES_DIRECTORY, script_file_name);
+    return resolve(SCRIPT_FILES_DIRECTORY, script_file_name);
   }
 
   static const std::string TEST_FILE_NAME;
@@ -126,7 +130,7 @@ class SimplePythonFlowFileTransferTest : public ExecutePythonProcessorTestBase {
     for (std::size_t i = 0; i < 10; ++i) {
       plan_->runCurrentProcessor();  // PutFile
       const std::string state_name = std::to_string(i);
-      const std::string output_file_path = org::apache::nifi::minifi::utils::file::FileUtils::concat_path(output_dir, state_name);
+      const std::string output_file_path = concat_path(output_dir, state_name);
       const std::string output_file_content{ getFileContent(output_file_path) };
       REQUIRE(output_file_content == state_name);
     }
@@ -164,7 +168,7 @@ class SimplePythonFlowFileTransferTest : public ExecutePythonProcessorTestBase {
     const std::string reloaded_script_dir = testController_->createTempDirectory();
     putFileToDir(reloaded_script_dir, "reloaded_script.py", script_content);
 
-    auto execute_python_processor = addExecutePythonProcessorToPlan(org::apache::nifi::minifi::utils::file::FileUtils::concat_path(reloaded_script_dir, "reloaded_script.py"), "");
+    auto execute_python_processor = addExecutePythonProcessorToPlan(concat_path(reloaded_script_dir, "reloaded_script.py"), "");
     if (reload_on_script_change) {
       plan_->setProperty(execute_python_processor, "Reload on Script Change", *reload_on_script_change ? "true" : "false");
     }
@@ -279,4 +283,34 @@ TEST_CASE_METHOD(SimplePythonFlowFileTransferTest, "Test the Reload On Script Ch
   }
 }
 
+TEST_CASE_METHOD(SimplePythonFlowFileTransferTest, "Test module load of processor", "[executePythonProcessorModuleLoad]") {
+  const std::string input_dir = testController_->createTempDirectory();
+  putFileToDir(input_dir, TEST_FILE_NAME, TEST_FILE_CONTENT);
+  addGetFileProcessorToPlan(input_dir);
+
+  auto execute_python_processor = addExecutePythonProcessorToPlan("foo_bar_processor.py", "");
+  plan_->setProperty(execute_python_processor, "Module Directory", getScriptFullPath("foo_modules/foo.py") + "," + getScriptFullPath("bar_modules"));
+
+  auto success_putfile = plan_->addProcessor("PutFile", "SuccessPutFile", { {"success", "d"} }, false);
+  plan_->addConnection(execute_python_processor, {"success", "d"}, success_putfile);
+  success_putfile->setAutoTerminatedRelationships({{"success", "d"}, {"failure", "d"}});
+  auto success_output_dir = testController_->createTempDirectory();
+  plan_->setProperty(success_putfile, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), success_output_dir);
+
+  testController_->runSession(plan_);
+  plan_->reset();
+
+  std::vector<std::string> file_contents;
+
+  auto lambda = [&file_contents](const std::string& path, const std::string& filename) -> bool {
+    std::ifstream is(path + utils::file::FileUtils::get_separator() + filename, std::ifstream::binary);
+    file_contents.push_back(std::string((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>()));
+    return true;
+  };
+
+  utils::file::FileUtils::list_dir(success_output_dir, lambda, plan_->getLogger(), false);
+
+  REQUIRE(file_contents.size() == 1);
+}
+
 }  // namespace
diff --git a/extensions/script/tests/TestExecuteScriptProcessorWithLuaScript.cpp b/extensions/script/tests/TestExecuteScriptProcessorWithLuaScript.cpp
index 4a3019a..018cf5b 100644
--- a/extensions/script/tests/TestExecuteScriptProcessorWithLuaScript.cpp
+++ b/extensions/script/tests/TestExecuteScriptProcessorWithLuaScript.cpp
@@ -26,6 +26,8 @@
 #include "processors/LogAttribute.h"
 #include "processors/GetFile.h"
 #include "processors/PutFile.h"
+#include "utils/file/FileUtils.h"
+#include "utils/TestUtils.h"
 
 TEST_CASE("Script engine is not set", "[executescriptMisconfiguration]") {
   TestController testController;
@@ -93,13 +95,7 @@ TEST_CASE("Lua: Test Log", "[executescriptLuaLog]") {
   auto getFileDir = testController.createTempDirectory();
   plan->setProperty(getFile, minifi::processors::GetFile::Directory.getName(), getFileDir);
 
-  std::fstream file;
-  std::stringstream ss;
-  ss << getFileDir << "/" << "tstFile.ext";
-  file.open(ss.str(), std::ios::out);
-  file << "tempFile";
-  file.close();
-  plan->reset();
+  utils::putFileToDir(getFileDir, "tstFile.ext", "tempFile");
 
   testController.runSession(plan, false);
   testController.runSession(plan, false);
@@ -322,13 +318,7 @@ TEST_CASE("Lua: Test Update Attribute", "[executescriptLuaUpdateAttribute]") {
   auto getFileDir = testController.createTempDirectory();
   plan->setProperty(getFile, minifi::processors::GetFile::Directory.getName(), getFileDir);
 
-  std::fstream file;
-  std::stringstream ss;
-  ss << getFileDir << "/" << "tstFile.ext";
-  file.open(ss.str(), std::ios::out);
-  file << "tempFile";
-  file.close();
-  plan->reset();
+  utils::putFileToDir(getFileDir, "tstFile.ext", "tempFile");
 
   REQUIRE_NOTHROW(testController.runSession(plan, false));
   REQUIRE_NOTHROW(testController.runSession(plan, false));
@@ -363,8 +353,6 @@ TEST_CASE("Lua: Test Create", "[executescriptLuaCreate]") {
     end
   )");
 
-  plan->reset();
-
   testController.runSession(plan, false);
 
   REQUIRE(LogTestController::getInstance().contains("[info] created flow file:"));
@@ -400,11 +388,77 @@ TEST_CASE("Lua: Test Require", "[executescriptLuaRequire]") {
     end
   )");
 
-  plan->reset();
-
   REQUIRE_NOTHROW(testController.runSession(plan, false));
 
   REQUIRE(LogTestController::getInstance().contains("[info] OK"));
 
   logTestController.reset();
 }
+
+TEST_CASE("Lua: Test Module Directory property", "[executescriptLuaModuleDirectoryProperty]") {
+  using org::apache::nifi::minifi::utils::file::concat_path;
+  using org::apache::nifi::minifi::utils::file::get_executable_dir;
+
+  TestController testController;
+
+  LogTestController &logTestController = LogTestController::getInstance();
+  logTestController.setDebug<TestPlan>();
+  logTestController.setDebug<minifi::processors::ExecuteScript>();
+
+  const std::string SCRIPT_FILES_DIRECTORY = concat_path(get_executable_dir(), "test_lua_scripts");
+
+  auto getScriptFullPath = [&SCRIPT_FILES_DIRECTORY](const std::string& script_file_name) {
+    return concat_path(SCRIPT_FILES_DIRECTORY, script_file_name);
+  };
+
+  auto plan = testController.createPlan();
+
+  auto getFile = plan->addProcessor("GetFile", "getFile");
+  auto executeScript = plan->addProcessor("ExecuteScript",
+                                          "executeScript",
+                                          core::Relationship("success", "description"),
+                                          true);
+
+  plan->setProperty(executeScript, minifi::processors::ExecuteScript::ScriptEngine.getName(), "lua");
+  plan->setProperty(executeScript, minifi::processors::ExecuteScript::ScriptFile.getName(), getScriptFullPath("foo_bar_processor.lua"));
+  plan->setProperty(executeScript, minifi::processors::ExecuteScript::ModuleDirectory.getName(), getScriptFullPath("foo_modules/foo.lua") + "," + getScriptFullPath("bar_modules"));
+
+  auto getFileDir = testController.createTempDirectory();
+  plan->setProperty(getFile, minifi::processors::GetFile::Directory.getName(), getFileDir);
+
+  utils::putFileToDir(getFileDir, "tstFile.ext", "tempFile");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("foobar"));
+
+  logTestController.reset();
+}
+
+TEST_CASE("Lua: Non existent script file should throw", "[executescriptLuaNonExistentScriptFile]") {
+  TestController testController;
+
+  LogTestController &logTestController = LogTestController::getInstance();
+  logTestController.setDebug<TestPlan>();
+  logTestController.setDebug<minifi::processors::ExecuteScript>();
+
+  auto plan = testController.createPlan();
+
+  auto getFile = plan->addProcessor("GetFile", "getFile");
+  auto executeScript = plan->addProcessor("ExecuteScript",
+                                          "executeScript",
+                                          core::Relationship("success", "description"),
+                                          true);
+
+  plan->setProperty(executeScript, minifi::processors::ExecuteScript::ScriptEngine.getName(), "lua");
+  plan->setProperty(executeScript, minifi::processors::ExecuteScript::ScriptFile.getName(), "/tmp/non-existent-file");
+
+  auto getFileDir = testController.createTempDirectory();
+  plan->setProperty(getFile, minifi::processors::GetFile::Directory.getName(), getFileDir);
+
+  utils::putFileToDir(getFileDir, "tstFile.ext", "tempFile");
+
+  REQUIRE_THROWS_AS(testController.runSession(plan), minifi::Exception);
+
+  logTestController.reset();
+}
diff --git a/extensions/script/tests/TestExecuteScriptProcessorWithPythonScript.cpp b/extensions/script/tests/TestExecuteScriptProcessorWithPythonScript.cpp
index 100d0c1..1ae7123 100644
--- a/extensions/script/tests/TestExecuteScriptProcessorWithPythonScript.cpp
+++ b/extensions/script/tests/TestExecuteScriptProcessorWithPythonScript.cpp
@@ -26,6 +26,8 @@
 #include "processors/LogAttribute.h"
 #include "processors/GetFile.h"
 #include "processors/PutFile.h"
+#include "utils/file/FileUtils.h"
+#include "utils/TestUtils.h"
 
 TEST_CASE("Script engine is not set", "[executescriptMisconfiguration]") {
   TestController testController;
@@ -252,8 +254,6 @@ TEST_CASE("Python: Test Create", "[executescriptPythonCreate]") {
         session.transfer(flow_file, REL_SUCCESS)
   )");
 
-  plan->reset();
-
   testController.runSession(plan, false);
 
   REQUIRE(LogTestController::getInstance().contains("[info] created flow file:"));
@@ -296,13 +296,7 @@ TEST_CASE("Python: Test Update Attribute", "[executescriptPythonUpdateAttribute]
   auto getFileDir = testController.createTempDirectory();
   plan->setProperty(getFile, minifi::processors::GetFile::Directory.getName(), getFileDir);
 
-  std::fstream file;
-  std::stringstream ss;
-  ss << getFileDir << "/" << "tstFile.ext";
-  file.open(ss.str(), std::ios::out);
-  file << "tempFile";
-  file.close();
-  plan->reset();
+  utils::putFileToDir(getFileDir, "tstFile.ext", "tempFile");
 
   testController.runSession(plan, false);
   testController.runSession(plan, false);
@@ -341,13 +335,7 @@ TEST_CASE("Python: Test Get Context Property", "[executescriptPythonGetContextPr
   auto getFileDir = testController.createTempDirectory();
   plan->setProperty(getFile, minifi::processors::GetFile::Directory.getName(), getFileDir);
 
-  std::fstream file;
-  std::stringstream ss;
-  ss << getFileDir << "/" << "tstFile.ext";
-  file.open(ss.str(), std::ios::out);
-  file << "tempFile";
-  file.close();
-  plan->reset();
+  utils::putFileToDir(getFileDir, "tstFile.ext", "tempFile");
 
   testController.runSession(plan, false);
   testController.runSession(plan, false);
@@ -357,3 +345,69 @@ TEST_CASE("Python: Test Get Context Property", "[executescriptPythonGetContextPr
 
   logTestController.reset();
 }
+
+TEST_CASE("Python: Test Module Directory property", "[executescriptPythonModuleDirectoryProperty]") {
+  using org::apache::nifi::minifi::utils::file::concat_path;
+  using org::apache::nifi::minifi::utils::file::get_executable_dir;
+
+  TestController testController;
+
+  LogTestController &logTestController = LogTestController::getInstance();
+  logTestController.setDebug<TestPlan>();
+  logTestController.setDebug<minifi::processors::ExecuteScript>();
+
+  const std::string SCRIPT_FILES_DIRECTORY = concat_path(get_executable_dir(), "test_python_scripts");
+
+  auto getScriptFullPath = [&SCRIPT_FILES_DIRECTORY](const std::string& script_file_name) {
+    return concat_path(SCRIPT_FILES_DIRECTORY, script_file_name);
+  };
+
+  auto plan = testController.createPlan();
+
+  auto getFile = plan->addProcessor("GetFile", "getFile");
+  auto executeScript = plan->addProcessor("ExecuteScript",
+                                          "executeScript",
+                                          core::Relationship("success", "description"),
+                                          true);
+
+  plan->setProperty(executeScript, minifi::processors::ExecuteScript::ScriptFile.getName(), getScriptFullPath("foo_bar_processor.py"));
+  plan->setProperty(executeScript, minifi::processors::ExecuteScript::ModuleDirectory.getName(), getScriptFullPath("foo_modules/foo.py") + "," + getScriptFullPath("bar_modules"));
+
+  auto getFileDir = testController.createTempDirectory();
+  plan->setProperty(getFile, minifi::processors::GetFile::Directory.getName(), getFileDir);
+
+  utils::putFileToDir(getFileDir, "tstFile.ext", "tempFile");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("foobar"));
+
+  logTestController.reset();
+}
+
+TEST_CASE("Python: Non existent script file should throw", "[executescriptPythonNonExistentScriptFile]") {
+  TestController testController;
+
+  LogTestController &logTestController = LogTestController::getInstance();
+  logTestController.setDebug<TestPlan>();
+  logTestController.setDebug<minifi::processors::ExecuteScript>();
+
+  auto plan = testController.createPlan();
+
+  auto getFile = plan->addProcessor("GetFile", "getFile");
+  auto executeScript = plan->addProcessor("ExecuteScript",
+                                          "executeScript",
+                                          core::Relationship("success", "description"),
+                                          true);
+
+  plan->setProperty(executeScript, minifi::processors::ExecuteScript::ScriptFile.getName(), "/tmp/non-existent-file");
+
+  auto getFileDir = testController.createTempDirectory();
+  plan->setProperty(getFile, minifi::processors::GetFile::Directory.getName(), getFileDir);
+
+  utils::putFileToDir(getFileDir, "tstFile.ext", "tempFile");
+
+  REQUIRE_THROWS_AS(testController.runSession(plan), minifi::Exception);
+
+  logTestController.reset();
+}
diff --git a/extensions/script/tests/test_lua_scripts/bar_modules/bar.lua b/extensions/script/tests/test_lua_scripts/bar_modules/bar.lua
new file mode 100644
index 0000000..768401c
--- /dev/null
+++ b/extensions/script/tests/test_lua_scripts/bar_modules/bar.lua
@@ -0,0 +1,23 @@
+---
+---  Licensed to the Apache Software Foundation (ASF) under one or more
+---  contributor license agreements.  See the NOTICE file distributed with
+---  this work for additional information regarding copyright ownership.
+---  The ASF licenses this file to You under the Apache License, Version 2.0
+---  (the "License"); you may not use this file except in compliance with
+---  the License.  You may obtain a copy of the License at
+---
+---      http://www.apache.org/licenses/LICENSE-2.0
+---
+---  Unless required by applicable law or agreed to in writing, software
+---  distributed under the License is distributed on an "AS IS" BASIS,
+---  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+---  See the License for the specific language governing permissions and
+---  limitations under the License.
+---
+local bar_module = {}
+
+function bar_module.bar()
+    return "bar"
+end
+
+return bar_module
diff --git a/extensions/script/tests/test_lua_scripts/foo_bar_processor.lua b/extensions/script/tests/test_lua_scripts/foo_bar_processor.lua
new file mode 100644
index 0000000..55af1bb
--- /dev/null
+++ b/extensions/script/tests/test_lua_scripts/foo_bar_processor.lua
@@ -0,0 +1,27 @@
+---
+---  Licensed to the Apache Software Foundation (ASF) under one or more
+---  contributor license agreements.  See the NOTICE file distributed with
+---  this work for additional information regarding copyright ownership.
+---  The ASF licenses this file to You under the Apache License, Version 2.0
+---  (the "License"); you may not use this file except in compliance with
+---  the License.  You may obtain a copy of the License at
+---
+---      http://www.apache.org/licenses/LICENSE-2.0
+---
+---  Unless required by applicable law or agreed to in writing, software
+---  distributed under the License is distributed on an "AS IS" BASIS,
+---  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+---  See the License for the specific language governing permissions and
+---  limitations under the License.
+---
+local foo_module = require "foo"
+local bar_module = require "bar"
+
+
+function onTrigger(context, session)
+    flow_file = session:get()
+    log:info(foo_module.foo() .. bar_module.bar())
+    if flow_file ~= nil then
+        session:transfer(flow_file, REL_SUCCESS)
+    end
+end
diff --git a/extensions/script/tests/test_lua_scripts/foo_modules/foo.lua b/extensions/script/tests/test_lua_scripts/foo_modules/foo.lua
new file mode 100644
index 0000000..8cc4db4
--- /dev/null
+++ b/extensions/script/tests/test_lua_scripts/foo_modules/foo.lua
@@ -0,0 +1,23 @@
+---
+---  Licensed to the Apache Software Foundation (ASF) under one or more
+---  contributor license agreements.  See the NOTICE file distributed with
+---  this work for additional information regarding copyright ownership.
+---  The ASF licenses this file to You under the Apache License, Version 2.0
+---  (the "License"); you may not use this file except in compliance with
+---  the License.  You may obtain a copy of the License at
+---
+---      http://www.apache.org/licenses/LICENSE-2.0
+---
+---  Unless required by applicable law or agreed to in writing, software
+---  distributed under the License is distributed on an "AS IS" BASIS,
+---  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+---  See the License for the specific language governing permissions and
+---  limitations under the License.
+---
+local foo_module = {}
+
+function foo_module.foo()
+    return "foo"
+end
+
+return foo_module
diff --git a/extensions/script/tests/test_scripts/non_transferring_processor.py b/extensions/script/tests/test_python_scripts/bar_modules/bar.py
similarity index 69%
copy from extensions/script/tests/test_scripts/non_transferring_processor.py
copy to extensions/script/tests/test_python_scripts/bar_modules/bar.py
index afb83d4..6bcbae9 100644
--- a/extensions/script/tests/test_scripts/non_transferring_processor.py
+++ b/extensions/script/tests/test_python_scripts/bar_modules/bar.py
@@ -17,13 +17,5 @@
 #
 
 
-def describe(processor):
-    processor.setDescription("Processor used for testing in ExecutePythonProcessorTests.cpp")
-
-
-def onTrigger(context, session):
-    flow_file = session.get()
-    log.info('Vrrm, vrrrm, processor is running, vrrrm!!')
-
-    if flow_file is not None:
-        log.info('created flow file: %s' % flow_file.getAttribute('filename'))
+def bar():
+    return "bar"
diff --git a/extensions/script/tests/test_scripts/passthrough_processor_transfering_to_success.py b/extensions/script/tests/test_python_scripts/foo_bar_processor.py
similarity index 88%
copy from extensions/script/tests/test_scripts/passthrough_processor_transfering_to_success.py
copy to extensions/script/tests/test_python_scripts/foo_bar_processor.py
index 5ef7f5f..199a5ce 100644
--- a/extensions/script/tests/test_scripts/passthrough_processor_transfering_to_success.py
+++ b/extensions/script/tests/test_python_scripts/foo_bar_processor.py
@@ -15,6 +15,8 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 #
+import foo
+import bar
 
 
 def describe(processor):
@@ -23,8 +25,7 @@ def describe(processor):
 
 def onTrigger(context, session):
     flow_file = session.get()
-    log.info('Vrrm, vrrrm, processor is running, vrrrm!!')
+    log.info(foo.foo() + bar.bar())
 
     if flow_file is not None:
-        log.info('created flow file: %s' % flow_file.getAttribute('filename'))
         session.transfer(flow_file, REL_SUCCESS)
diff --git a/extensions/script/tests/test_scripts/non_transferring_processor.py b/extensions/script/tests/test_python_scripts/foo_modules/foo.py
similarity index 69%
copy from extensions/script/tests/test_scripts/non_transferring_processor.py
copy to extensions/script/tests/test_python_scripts/foo_modules/foo.py
index afb83d4..8a199de 100644
--- a/extensions/script/tests/test_scripts/non_transferring_processor.py
+++ b/extensions/script/tests/test_python_scripts/foo_modules/foo.py
@@ -17,13 +17,5 @@
 #
 
 
-def describe(processor):
-    processor.setDescription("Processor used for testing in ExecutePythonProcessorTests.cpp")
-
-
-def onTrigger(context, session):
-    flow_file = session.get()
-    log.info('Vrrm, vrrrm, processor is running, vrrrm!!')
-
-    if flow_file is not None:
-        log.info('created flow file: %s' % flow_file.getAttribute('filename'))
+def foo():
+    return "foo"
diff --git a/extensions/script/tests/test_scripts/non_transferring_processor.py b/extensions/script/tests/test_python_scripts/non_transferring_processor.py
similarity index 100%
rename from extensions/script/tests/test_scripts/non_transferring_processor.py
rename to extensions/script/tests/test_python_scripts/non_transferring_processor.py
diff --git a/extensions/script/tests/test_scripts/passthrough_processor_transfering_to_failure.py b/extensions/script/tests/test_python_scripts/passthrough_processor_transfering_to_failure.py
similarity index 100%
rename from extensions/script/tests/test_scripts/passthrough_processor_transfering_to_failure.py
rename to extensions/script/tests/test_python_scripts/passthrough_processor_transfering_to_failure.py
diff --git a/extensions/script/tests/test_scripts/passthrough_processor_transfering_to_success.py b/extensions/script/tests/test_python_scripts/passthrough_processor_transfering_to_success.py
similarity index 100%
rename from extensions/script/tests/test_scripts/passthrough_processor_transfering_to_success.py
rename to extensions/script/tests/test_python_scripts/passthrough_processor_transfering_to_success.py
diff --git a/extensions/script/tests/test_scripts/stateful_processor.py b/extensions/script/tests/test_python_scripts/stateful_processor.py
similarity index 100%
rename from extensions/script/tests/test_scripts/stateful_processor.py
rename to extensions/script/tests/test_python_scripts/stateful_processor.py
diff --git a/libminifi/include/utils/ProcessorConfigUtils.h b/libminifi/include/utils/ProcessorConfigUtils.h
index 9f28de8..03b10ad 100644
--- a/libminifi/include/utils/ProcessorConfigUtils.h
+++ b/libminifi/include/utils/ProcessorConfigUtils.h
@@ -43,7 +43,6 @@ std::vector<std::string> listFromCommaSeparatedProperty(const core::ProcessConte
 std::vector<std::string> listFromRequiredCommaSeparatedProperty(const core::ProcessContext& context, const std::string& property_name);
 bool parseBooleanPropertyOrThrow(const core::ProcessContext& context, const std::string& property_name);
 std::chrono::milliseconds parseTimePropertyMSOrThrow(const core::ProcessContext& context, const std::string& property_name);
-std::optional<uint64_t> getOptionalUintProperty(const core::ProcessContext& context, const std::string& property_name);
 std::string parsePropertyWithAllowableValuesOrThrow(const core::ProcessContext& context, const std::string& property_name, const std::set<std::string>& allowable_values);
 
 template<typename T>
diff --git a/libminifi/src/utils/ProcessorConfigUtils.cpp b/libminifi/src/utils/ProcessorConfigUtils.cpp
index 4bd515a..8c128c3 100644
--- a/libminifi/src/utils/ProcessorConfigUtils.cpp
+++ b/libminifi/src/utils/ProcessorConfigUtils.cpp
@@ -52,14 +52,6 @@ std::chrono::milliseconds parseTimePropertyMSOrThrow(const core::ProcessContext&
   return time_property.getMilliseconds();
 }
 
-std::optional<uint64_t> getOptionalUintProperty(const core::ProcessContext& context, const std::string& property_name) {
-  uint64_t value;
-  if (context.getProperty(property_name, value)) {
-    return { value };
-  }
-  return std::nullopt;
-}
-
 std::string parsePropertyWithAllowableValuesOrThrow(const core::ProcessContext& context, const std::string& property_name, const std::set<std::string>& allowable_values) {
   std::string value;
   if (!context.getProperty(property_name, value)