You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2021/08/25 15:52:05 UTC

[nifi-minifi-cpp] branch main updated: MINIFICPP-1355 Fix ExecutePythonProcessor

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 8bcd558  MINIFICPP-1355 Fix ExecutePythonProcessor
8bcd558 is described below

commit 8bcd558030ffbbf34c3b7b382296e2fceb698f24
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Wed Aug 25 17:47:05 2021 +0200

    MINIFICPP-1355 Fix ExecutePythonProcessor
    
    and enable scripting in docker CI
    
    Closes #1126
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 .github/workflows/ci.yml                           |  2 +-
 .../integration/MiNiFi_integration_test_driver.py  |  4 ++
 docker/test/integration/features/python.feature    | 13 ++++
 .../minifi/core/DockerTestDirectoryBindings.py     |  1 +
 .../minifi/processors/ExecutePythonProcessor.py    |  8 +++
 .../resources/python/add_attribute_to_flowfile.py  | 13 ++++
 docker/test/integration/steps/steps.py             |  5 ++
 .../script/python/ExecutePythonProcessor.cpp       | 82 ++++++----------------
 extensions/script/python/ExecutePythonProcessor.h  |  3 -
 .../script-tests/ExecutePythonProcessorTests.cpp   | 33 ++++-----
 10 files changed, 78 insertions(+), 86 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 79dae16..4862f5c 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -275,7 +275,7 @@ jobs:
           if [ -d ~/.ccache ]; then mv ~/.ccache .; fi
           mkdir build
           cd build
-          cmake -DUSE_SHARED_LIBS= -DSTRICT_GSL_CHECKS=AUDIT -DENABLE_JNI=OFF -DDISABLE_JEMALLOC=ON -DDISABLE_SCRIPTING=ON -DENABLE_AWS=ON -DENABLE_LIBRDKAFKA=ON -DENABLE_AZURE=ON -DENABLE_SQL=ON -DDOCKER_BUILD_ONLY=ON -DDOCKER_CCACHE_DUMP_LOCATION=$HOME/.ccache ..
+          cmake -DUSE_SHARED_LIBS= -DSTRICT_GSL_CHECKS=AUDIT -DENABLE_JNI=OFF -DDISABLE_JEMALLOC=ON -DENABLE_AWS=ON -DENABLE_LIBRDKAFKA=ON -DENABLE_AZURE=ON -DENABLE_SQL=ON -DDOCKER_BUILD_ONLY=ON -DDOCKER_CCACHE_DUMP_LOCATION=$HOME/.ccache ..
           make docker
       - id: install_deps
         run: |
diff --git a/docker/test/integration/MiNiFi_integration_test_driver.py b/docker/test/integration/MiNiFi_integration_test_driver.py
index dd086c0..ec721ba 100644
--- a/docker/test/integration/MiNiFi_integration_test_driver.py
+++ b/docker/test/integration/MiNiFi_integration_test_driver.py
@@ -285,3 +285,7 @@ class MiNiFi_integration_test():
             if cluster.get_engine() == "minifi-cpp":
                 line_found = cluster.wait_for_app_logs(line, 60)
         assert line_found
+
+    def check_minifi_logs_for_message(self, cluster_name, log_message, timeout_seconds):
+        cluster = self.acquire_cluster(cluster_name)
+        assert cluster.wait_for_app_logs(log_message, timeout_seconds)
diff --git a/docker/test/integration/features/python.feature b/docker/test/integration/features/python.feature
new file mode 100644
index 0000000..4742a23
--- /dev/null
+++ b/docker/test/integration/features/python.feature
@@ -0,0 +1,13 @@
+Feature: MiNiFi can use python processors in its flows
+  Background:
+    Given the content of "/tmp/output" is monitored
+
+  Scenario: A MiNiFi instance can update attributes through custom python processor
+    Given a GenerateFlowFile processor with the "File Size" property set to "0B"
+    And a ExecutePythonProcessor processor with the "Script File" property set to "/tmp/resources/python/add_attribute_to_flowfile.py"
+    And a LogAttribute processor in the "primary_cluster" flow
+    And the "success" relationship of the GenerateFlowFile processor is connected to the ExecutePythonProcessor
+    And the "success" relationship of the ExecutePythonProcessor processor is connected to the LogAttribute
+
+    When all instances start up
+    Then the Minifi logs in the "primary_cluster" contain the following message: "key:Python attribute value:attributevalue" in less than 60 seconds
diff --git a/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py b/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
index c66e62f..5ca3ecd 100644
--- a/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
+++ b/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
@@ -22,6 +22,7 @@ class DockerTestDirectoryBindings:
         # Add resources
         test_dir = os.environ['PYTHONPATH'].split(':')[-1]  # Based on DockerVerify.sh
         shutil.copytree(test_dir + "/resources/kafka_broker/conf/certs", self.data_directories[test_id]["resources_dir"] + "/certs")
+        shutil.copytree(test_dir + "/resources/python", self.data_directories[test_id]["resources_dir"] + "/python")
 
     def get_data_directories(self, test_id):
         return self.data_directories[test_id]
diff --git a/docker/test/integration/minifi/processors/ExecutePythonProcessor.py b/docker/test/integration/minifi/processors/ExecutePythonProcessor.py
new file mode 100644
index 0000000..9e3aab2
--- /dev/null
+++ b/docker/test/integration/minifi/processors/ExecutePythonProcessor.py
@@ -0,0 +1,8 @@
+from ..core.Processor import Processor
+
+
+class ExecutePythonProcessor(Processor):
+    def __init__(self):
+        super(ExecutePythonProcessor, self).__init__(
+            'ExecutePythonProcessor',
+            auto_terminate=['success'])
diff --git a/docker/test/integration/resources/python/add_attribute_to_flowfile.py b/docker/test/integration/resources/python/add_attribute_to_flowfile.py
new file mode 100644
index 0000000..0ab4724
--- /dev/null
+++ b/docker/test/integration/resources/python/add_attribute_to_flowfile.py
@@ -0,0 +1,13 @@
+def describe(processor):
+    processor.setDescription("Adds an attribute to your flow files")
+
+
+def onInitialize(processor):
+    processor.setSupportsDynamicProperties()
+
+
+def onTrigger(context, session):
+    flow_file = session.get()
+    if flow_file is not None:
+        flow_file.addAttribute("Python attribute", "attributevalue")
+        session.transfer(flow_file, REL_SUCCESS)
diff --git a/docker/test/integration/steps/steps.py b/docker/test/integration/steps/steps.py
index 3c9aa24..8804bf6 100644
--- a/docker/test/integration/steps/steps.py
+++ b/docker/test/integration/steps/steps.py
@@ -594,3 +594,8 @@ def step_impl(context, cluster_name, query, number_of_rows, timeout_seconds):
 @then("the minifi log contains \"{line}\"")
 def step_impl(context, line):
     context.test.check_minifi_log_contents(line)
+
+
+@then("the Minifi logs in the \"{cluster_name}\" contain the following message: \"{log_message}\" in less than {duration}")
+def step_impl(context, cluster_name, log_message, duration):
+    context.test.check_minifi_logs_for_message(cluster_name, log_message, timeparse(duration))
diff --git a/extensions/script/python/ExecutePythonProcessor.cpp b/extensions/script/python/ExecutePythonProcessor.cpp
index ed65f93..a2eac7f 100644
--- a/extensions/script/python/ExecutePythonProcessor.cpp
+++ b/extensions/script/python/ExecutePythonProcessor.cpp
@@ -53,79 +53,39 @@ core::Relationship ExecutePythonProcessor::Success("success", "Script successes"
 core::Relationship ExecutePythonProcessor::Failure("failure", "Script failures");
 
 void ExecutePythonProcessor::initialize() {
-  // initialization requires that we do a little leg work prior to onSchedule
-  // so that we can provide manifest our processor identity
-  if (getProperties().empty()) {
-    setSupportedProperties({
-      ScriptFile,
-      ScriptBody,
-      ModuleDirectory
-    });
-    setAcceptAllProperties();
-    setSupportedRelationships({
-      Success,
-      Failure
-    });
-    valid_init_ = false;
-    return;
-  }
+  setSupportedProperties({
+    ScriptFile,
+    ScriptBody,
+    ModuleDirectory
+  });
+  setAcceptAllProperties();
+  setSupportedRelationships({
+    Success,
+    Failure
+  });
+}
 
+void ExecutePythonProcessor::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory>& /*sessionFactory*/) {
   python_logger_ = logging::LoggerFactory<ExecutePythonProcessor>::getAliasedLogger(getName());
 
   getProperty(ModuleDirectory.getName(), module_directory_);
 
-  valid_init_ = false;
   appendPathForImportModules();
   loadScript();
-  try {
-    if (script_to_exec_.size()) {
-      std::shared_ptr<python::PythonScriptEngine> engine = getScriptEngine();
-      engine->eval(script_to_exec_);
-      auto shared_this = shared_from_this();
-      engine->describe(shared_this);
-      engine->onInitialize(shared_this);
-      handleEngineNoLongerInUse(std::move(engine));
-      valid_init_ = true;
-    }
-  }
-  catch (const std::exception& exception) {
-    logger_->log_error("Caught Exception: %s", exception.what());
-    std::rethrow_exception(std::current_exception());
-  }
-  catch (...) {
-    logger_->log_error("Caught Exception");
-    std::rethrow_exception(std::current_exception());
-  }
-}
 
-void ExecutePythonProcessor::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory>& /*sessionFactory*/) {
-  if (!valid_init_) {
-    throw std::runtime_error("Could not correctly initialize " + getName());
-  }
-  try {
-    reloadScriptIfUsingScriptFileProperty();
-    if (script_to_exec_.empty()) {
-      throw std::runtime_error("Neither Script Body nor Script File is available to execute");
-    }
-    std::shared_ptr<python::PythonScriptEngine> engine = getScriptEngine();
-
-    engine->eval(script_to_exec_);
-    engine->onSchedule(context);
-
-    handleEngineNoLongerInUse(std::move(engine));
-  }
-  catch (const std::exception& exception) {
-    logger_->log_error("Caught Exception: %s", exception.what());
-  }
-  catch (...) {
-    logger_->log_error("Caught Exception");
+  if (script_to_exec_.empty()) {
+    throw std::runtime_error("Neither Script Body nor Script File is available to execute");
   }
+  std::shared_ptr<python::PythonScriptEngine> engine = getScriptEngine();
+  engine->eval(script_to_exec_);
+  auto shared_this = shared_from_this();
+  engine->describe(shared_this);
+  engine->onInitialize(shared_this);
+  engine->onSchedule(context);
+  handleEngineNoLongerInUse(std::move(engine));
 }
 
 void ExecutePythonProcessor::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
-  if (!valid_init_) {
-    throw std::runtime_error("Could not correctly initialize " + getName());
-  }
   try {
     // TODO(hunyadi): When using "Script File" property, we currently re-read the script file content every time the processor is triggered. This should change to single-read when we release 1.0.0
     // https://issues.apache.org/jira/browse/MINIFICPP-1223
diff --git a/extensions/script/python/ExecutePythonProcessor.h b/extensions/script/python/ExecutePythonProcessor.h
index 0057b6f..3e50f8d 100644
--- a/extensions/script/python/ExecutePythonProcessor.h
+++ b/extensions/script/python/ExecutePythonProcessor.h
@@ -48,7 +48,6 @@ class ExecutePythonProcessor : public core::Processor {
   explicit ExecutePythonProcessor(const std::string &name, const utils::Identifier &uuid = {})
       : Processor(name, uuid),
         python_dynamic_(false),
-        valid_init_(false),
         logger_(logging::LoggerFactory<ExecutePythonProcessor>::getLogger()),
         script_engine_q_() {
   }
@@ -100,8 +99,6 @@ class ExecutePythonProcessor : public core::Processor {
 
   bool python_dynamic_;
 
-  bool valid_init_;
-
   std::shared_ptr<logging::Logger> logger_;
   std::shared_ptr<logging::Logger> python_logger_;
 
diff --git a/libminifi/test/script-tests/ExecutePythonProcessorTests.cpp b/libminifi/test/script-tests/ExecutePythonProcessorTests.cpp
index c4692a9..ecb3f8c 100644
--- a/libminifi/test/script-tests/ExecutePythonProcessorTests.cpp
+++ b/libminifi/test/script-tests/ExecutePythonProcessorTests.cpp
@@ -78,8 +78,7 @@ class SimplePythonFlowFileTransferTest : public ExecutePythonProcessorTestBase {
  public:
   enum class Expectation {
     OUTPUT_FILE_MATCHES_INPUT,
-    RUNTIME_RELATIONSHIP_EXCEPTION,
-    PROCESSOR_INITIALIZATION_EXCEPTION
+    RUNTIME_RELATIONSHIP_EXCEPTION
   };
   SimplePythonFlowFileTransferTest() : ExecutePythonProcessorTestBase{} {}
 
@@ -90,11 +89,6 @@ class SimplePythonFlowFileTransferTest : public ExecutePythonProcessorTestBase {
     const std::string input_dir = testController_->createTempDirectory();
     putFileToDir(input_dir, TEST_FILE_NAME, TEST_FILE_CONTENT);
     addGetFileProcessorToPlan(input_dir);
-
-    if (Expectation::PROCESSOR_INITIALIZATION_EXCEPTION == expectation) {
-      REQUIRE_THROWS(addExecutePythonProcessorToPlan(used_as_script_file, used_as_script_body));
-      return;
-    }
     REQUIRE_NOTHROW(addExecutePythonProcessorToPlan(used_as_script_file, used_as_script_body));
 
     const std::string output_dir = testController_->createTempDirectory();
@@ -121,7 +115,6 @@ class SimplePythonFlowFileTransferTest : public ExecutePythonProcessorTestBase {
 
     auto executePythonProcessor = plan_->addProcessor("ExecutePythonProcessor", "executePythonProcessor");
     plan_->setProperty(executePythonProcessor, org::apache::nifi::minifi::python::processors::ExecutePythonProcessor::ScriptFile.getName(), getScriptFullPath("stateful_processor.py"));
-    executePythonProcessor->initialize();
 
     addPutFileProcessorToPlan(core::Relationship("success", "description"), output_dir);
     plan_->runNextProcessor();  // ExecutePythonProcessor
@@ -154,7 +147,6 @@ class SimplePythonFlowFileTransferTest : public ExecutePythonProcessorTestBase {
     if ("" != used_as_script_body) {
         plan_->setProperty(executePythonProcessor, org::apache::nifi::minifi::python::processors::ExecutePythonProcessor::ScriptBody.getName(), getFileContent(getScriptFullPath(used_as_script_body)));
     }
-    executePythonProcessor->initialize();
     return executePythonProcessor;
   }
 
@@ -179,7 +171,6 @@ TEST_CASE_METHOD(SimplePythonFlowFileTransferTest, "Simple file passthrough", "[
   // Expectations
   const auto OUTPUT_FILE_MATCHES_INPUT          = SimplePythonFlowFileTransferTest::Expectation::OUTPUT_FILE_MATCHES_INPUT;
   const auto RUNTIME_RELATIONSHIP_EXCEPTION    = SimplePythonFlowFileTransferTest::Expectation::RUNTIME_RELATIONSHIP_EXCEPTION;
-  const auto PROCESSOR_INITIALIZATION_EXCEPTION = SimplePythonFlowFileTransferTest::Expectation::PROCESSOR_INITIALIZATION_EXCEPTION;
   // ExecutePython outbound relationships
   const core::Relationship SUCCESS {"success", "description"};
   const core::Relationship FAILURE{"failure", "description"};
@@ -187,11 +178,11 @@ TEST_CASE_METHOD(SimplePythonFlowFileTransferTest, "Simple file passthrough", "[
   // For the tests "" is treated as none-provided since no optional implementation was ported to the project yet
 
   // 0. Neither valid script file nor script body provided
-  //                                          TEST EXPECTATION  OUT_REL        USE_AS_SCRIPT_FILE  USE_AS_SCRIPT_BODY
-  testSimpleFilePassthrough(PROCESSOR_INITIALIZATION_EXCEPTION, SUCCESS,                       "", "");  // NOLINT
-  testSimpleFilePassthrough(PROCESSOR_INITIALIZATION_EXCEPTION, FAILURE,                       "", "");  // NOLINT
-  testSimpleFilePassthrough(PROCESSOR_INITIALIZATION_EXCEPTION, SUCCESS, "non_existent_script.py", "");  // NOLINT
-  testSimpleFilePassthrough(PROCESSOR_INITIALIZATION_EXCEPTION, FAILURE, "non_existent_script.py", "");  // NOLINT
+  //                                      TEST EXPECTATION  OUT_REL        USE_AS_SCRIPT_FILE  USE_AS_SCRIPT_BODY
+  testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, SUCCESS,                       "", "");  // NOLINT
+  testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, FAILURE,                       "", "");  // NOLINT
+  testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, SUCCESS, "non_existent_script.py", "");  // NOLINT
+  testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, FAILURE, "non_existent_script.py", "");  // NOLINT
 
   // 1. Using script file as attribute
   //                                      TEST EXPECTATION  OUT_REL                                 USE_AS_SCRIPT_FILE  USE_AS_SCRIPT_BODY
@@ -203,15 +194,15 @@ TEST_CASE_METHOD(SimplePythonFlowFileTransferTest, "Simple file passthrough", "[
 
   // 2. Using script body as attribute
   //                                      TEST EXPECTATION  OUT_REL  SCRIPT_FILE                        USE_AS_SCRIPT_BODY
-  testSimpleFilePassthrough(     OUTPUT_FILE_MATCHES_INPUT, SUCCESS, "", "passthrough_processor_transfering_to_success.py");  // NOLINT 
-  testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, FAILURE, "", "passthrough_processor_transfering_to_success.py");  // NOLINT 
-  testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, SUCCESS, "", "passthrough_processor_transfering_to_failure.py");  // NOLINT 
-  testSimpleFilePassthrough(     OUTPUT_FILE_MATCHES_INPUT, FAILURE, "", "passthrough_processor_transfering_to_failure.py");  // NOLINT 
+  testSimpleFilePassthrough(     OUTPUT_FILE_MATCHES_INPUT, SUCCESS, "", "passthrough_processor_transfering_to_success.py");  // NOLINT
+  testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, FAILURE, "", "passthrough_processor_transfering_to_success.py");  // NOLINT
+  testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, SUCCESS, "", "passthrough_processor_transfering_to_failure.py");  // NOLINT
+  testSimpleFilePassthrough(     OUTPUT_FILE_MATCHES_INPUT, FAILURE, "", "passthrough_processor_transfering_to_failure.py");  // NOLINT
   testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, FAILURE, "",                   "non_transferring_processor.py");  // NOLINT
 
   // 3. Setting both attributes
-  //                                          TEST EXPECTATION  OUT_REL                                        SCRIPT_FILE                                 USE_AS_SCRIPT_BODY
-  testSimpleFilePassthrough(PROCESSOR_INITIALIZATION_EXCEPTION, SUCCESS, "passthrough_processor_transfering_to_success.py", "passthrough_processor_transfering_to_success.py");  // NOLINT
+  //                                      TEST EXPECTATION  OUT_REL                                        SCRIPT_FILE                                 USE_AS_SCRIPT_BODY
+  testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, SUCCESS, "passthrough_processor_transfering_to_success.py", "passthrough_processor_transfering_to_success.py");  // NOLINT
 }
 
 TEST_CASE_METHOD(SimplePythonFlowFileTransferTest, "Stateful execution", "[executePythonProcessorStateful]") {