You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2022/01/04 15:28:25 UTC

[nifi-minifi-cpp] 05/05: MINIFICPP-1222 Remove engine queue from ExecutePythonProcessor

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 4f0dcd838f5afe46545aee2810d16308a9b5cfc5
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Tue Jan 4 16:23:06 2022 +0100

    MINIFICPP-1222 Remove engine queue from ExecutePythonProcessor
    
    Originally the Jira ticket was about reworking the engine queue so that at most the configured maximum number of concurrent tasks' worth of script engines could run at the same time. The problem with that was that even if the script engines were executed concurrently, they would all use the same Python interpreter, and with the global interpreter lock (GIL) in place no real concurrency can take place.
    
    The only way to achieve concurrency would be to use sub-interpreters, which would also be problematic. First of all, pybind11 does not provide an interface for sub-interpreters; they are only available through the CPython API, and the pybind11 documentation also warns about the caveats of sub-interpreters here: https://pybind11.readthedocs.io/en/stable/advanced/embedding.html
    Secondly, using the CPython API for sub-interpreters would require us to always know which thread we are running on and to create separate sub-interpreters and thread states for each thread (this might not even allow us to run the Python scripts from the main thread anymore). Managing the thread states and the sub-interpreters would be cumbersome in our architecture.
    
    Because of all the above problems, I removed the Python script engine queue from ExecutePythonProcessor, which now runs the Python scripts on a single thread.
    
    Closes #1227
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 .../script/python/ExecutePythonProcessor.cpp       | 75 ++++++++--------------
 extensions/script/python/ExecutePythonProcessor.h  | 30 +++------
 2 files changed, 37 insertions(+), 68 deletions(-)

diff --git a/extensions/script/python/ExecutePythonProcessor.cpp b/extensions/script/python/ExecutePythonProcessor.cpp
index 62e8c23..ef09a4e 100644
--- a/extensions/script/python/ExecutePythonProcessor.cpp
+++ b/extensions/script/python/ExecutePythonProcessor.cpp
@@ -79,8 +79,6 @@ void ExecutePythonProcessor::initialize() {
     return;
   }
 
-  python_logger_ = core::logging::LoggerFactory<ExecutePythonProcessor>::getAliasedLogger(getName());
-
   getProperty(ModuleDirectory.getName(), module_directory_);
   appendPathForImportModules();
 
@@ -93,74 +91,44 @@ void ExecutePythonProcessor::initialize() {
 
   // In case of native python processors we require initialization before onSchedule
   // so that we can provide manifest of processor identity on C2
-  auto engine = getScriptEngine();
-  initalizeThroughScriptEngine(*engine);
-  handleEngineNoLongerInUse(std::move(engine));
+  python_script_engine_ = createScriptEngine();
+  initalizeThroughScriptEngine();
 }
 
-void ExecutePythonProcessor::initalizeThroughScriptEngine(python::PythonScriptEngine& engine) {
-  engine.eval(script_to_exec_);
+void ExecutePythonProcessor::initalizeThroughScriptEngine() {
+  python_script_engine_->eval(script_to_exec_);
   auto shared_this = shared_from_this();
-  engine.describe(shared_this);
-  engine.onInitialize(shared_this);
+  python_script_engine_->describe(shared_this);
+  python_script_engine_->onInitialize(shared_this);
   processor_initialized_ = true;
 }
 
 void ExecutePythonProcessor::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory>& /*sessionFactory*/) {
-  std::shared_ptr<python::PythonScriptEngine> engine = getScriptEngine();
   if (!processor_initialized_) {
     loadScript();
-    initalizeThroughScriptEngine(*engine);
+    python_script_engine_ = createScriptEngine();
+    initalizeThroughScriptEngine();
   } else {
-    reloadScriptIfUsingScriptFileProperty(*engine);
+    reloadScriptIfUsingScriptFileProperty();
     if (script_to_exec_.empty()) {
       throw std::runtime_error("Neither Script Body nor Script File is available to execute");
     }
   }
 
-  engine->eval(script_to_exec_);
-  engine->onSchedule(context);
-
-  handleEngineNoLongerInUse(std::move(engine));
+  gsl_Expects(python_script_engine_);
+  python_script_engine_->eval(script_to_exec_);
+  python_script_engine_->onSchedule(context);
 
   getProperty(ReloadOnScriptChange.getName(), reload_on_script_change_);
 }
 
 void ExecutePythonProcessor::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
-  auto engine = getScriptEngine();
-  reloadScriptIfUsingScriptFileProperty(*engine);
+  reloadScriptIfUsingScriptFileProperty();
   if (script_to_exec_.empty()) {
     throw std::runtime_error("Neither Script Body nor Script File is available to execute");
   }
 
-  engine->onTrigger(context, session);
-  handleEngineNoLongerInUse(std::move(engine));
-}
-
-// TODO(hunyadi): This is potentially not what we want. See https://issues.apache.org/jira/browse/MINIFICPP-1222
-std::shared_ptr<python::PythonScriptEngine> ExecutePythonProcessor::getScriptEngine() {
-  std::shared_ptr<python::PythonScriptEngine> engine;
-  // Use an existing engine, if one is available
-  if (script_engine_q_.try_dequeue(engine)) {
-    logger_->log_debug("Using available [%p] script engine instance", engine.get());
-    return engine;
-  }
-  engine = createEngine<python::PythonScriptEngine>();
-  logger_->log_info("Created new [%p] script engine instance. Number of instances: approx. %d / %d.", engine.get(), script_engine_q_.size_approx(), getMaxConcurrentTasks());
-  if (engine == nullptr) {
-    throw std::runtime_error("No script engine available");
-  }
-  return engine;
-}
-
-void ExecutePythonProcessor::handleEngineNoLongerInUse(std::shared_ptr<python::PythonScriptEngine>&& engine) {
-  // Make engine available for use again
-  if (script_engine_q_.size_approx() < getMaxConcurrentTasks()) {
-    logger_->log_debug("Releasing [%p] script engine", engine.get());
-    script_engine_q_.enqueue(engine);
-  } else {
-    logger_->log_info("Destroying script engine because it is no longer needed");
-  }
+  python_script_engine_->onTrigger(context, session);
 }
 
 void ExecutePythonProcessor::appendPathForImportModules() {
@@ -203,7 +171,7 @@ void ExecutePythonProcessor::loadScript() {
   return;
 }
 
-void ExecutePythonProcessor::reloadScriptIfUsingScriptFileProperty(python::PythonScriptEngine& engine) {
+void ExecutePythonProcessor::reloadScriptIfUsingScriptFileProperty() {
   if (script_file_path_.empty() || !reload_on_script_change_) {
     return;
   }
@@ -212,10 +180,21 @@ void ExecutePythonProcessor::reloadScriptIfUsingScriptFileProperty(python::Pytho
     logger_->log_debug("Script file has changed since last time, reloading...");
     loadScriptFromFile();
     last_script_write_time_ = file_write_time;
-    engine.eval(script_to_exec_);
+    python_script_engine_->eval(script_to_exec_);
   }
 }
 
+std::unique_ptr<PythonScriptEngine> ExecutePythonProcessor::createScriptEngine() {
+  auto engine = std::make_unique<PythonScriptEngine>();
+
+  python_logger_ = core::logging::LoggerFactory<ExecutePythonProcessor>::getAliasedLogger(getName());
+  engine->bind("log", python_logger_);
+  engine->bind("REL_SUCCESS", Success);
+  engine->bind("REL_FAILURE", Failure);
+
+  return engine;
+}
+
 REGISTER_RESOURCE(
     ExecutePythonProcessor, "Executes a script given the flow file and a process session. The script is responsible for handling the incoming flow file (transfer to SUCCESS or remove, e.g.) "
     "as well as any flow files created by the script. If the handling is incomplete or incorrect, the session will be rolled back.Scripts must define an onTrigger function which accepts NiFi Context"
diff --git a/extensions/script/python/ExecutePythonProcessor.h b/extensions/script/python/ExecutePythonProcessor.h
index 14dc48f..a3904ab 100644
--- a/extensions/script/python/ExecutePythonProcessor.h
+++ b/extensions/script/python/ExecutePythonProcessor.h
@@ -48,8 +48,7 @@ class ExecutePythonProcessor : public core::Processor {
         processor_initialized_(false),
         python_dynamic_(false),
         reload_on_script_change_(true),
-        last_script_write_time_(0),
-        script_engine_q_() {
+        last_script_write_time_(0) {
   }
 
   EXTENSIONAPI static const core::Property ScriptFile;
@@ -93,6 +92,10 @@ class ExecutePythonProcessor : public core::Processor {
     return false;
   }
 
+  bool isSingleThreaded() const override {
+    return true;
+  }
+
  private:
   std::vector<core::Property> python_properties_;
 
@@ -102,35 +105,22 @@ class ExecutePythonProcessor : public core::Processor {
   bool python_dynamic_;
 
   std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ExecutePythonProcessor>::getLogger();
-  std::shared_ptr<core::logging::Logger> python_logger_;
 
   std::string script_to_exec_;
   std::string module_directory_;
   bool reload_on_script_change_;
   uint64_t last_script_write_time_;
   std::string script_file_path_;
+  std::shared_ptr<core::logging::Logger> python_logger_;
+  std::unique_ptr<PythonScriptEngine> python_script_engine_;
 
-  moodycamel::ConcurrentQueue<std::shared_ptr<python::PythonScriptEngine>> script_engine_q_;
-
-  void initalizeThroughScriptEngine(python::PythonScriptEngine& engine);
-  std::shared_ptr<python::PythonScriptEngine> getScriptEngine();
-  void handleEngineNoLongerInUse(std::shared_ptr<python::PythonScriptEngine>&& engine);
   void appendPathForImportModules();
   void loadScriptFromFile();
   void loadScript();
-  void reloadScriptIfUsingScriptFileProperty(python::PythonScriptEngine& engine);
+  void reloadScriptIfUsingScriptFileProperty();
+  void initalizeThroughScriptEngine();
 
-
-  template<typename T>
-  std::shared_ptr<T> createEngine() const {
-    auto engine = std::make_shared<T>();
-
-    engine->bind("log", python_logger_);
-    engine->bind("REL_SUCCESS", Success);
-    engine->bind("REL_FAILURE", Failure);
-
-    return engine;
-  }
+  std::unique_ptr<PythonScriptEngine> createScriptEngine();
 };
 
 } /* namespace processors */