You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2021/08/25 15:52:05 UTC
[nifi-minifi-cpp] branch main updated: MINIFICPP-1355 Fix ExecutePythonProcessor
This is an automated email from the ASF dual-hosted git repository.
szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 8bcd558 MINIFICPP-1355 Fix ExecutePythonProcessor
8bcd558 is described below
commit 8bcd558030ffbbf34c3b7b382296e2fceb698f24
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Wed Aug 25 17:47:05 2021 +0200
MINIFICPP-1355 Fix ExecutePythonProcessor
and enable scripting in docker CI
Closes #1126
Signed-off-by: Marton Szasz <sz...@apache.org>
---
.github/workflows/ci.yml | 2 +-
.../integration/MiNiFi_integration_test_driver.py | 4 ++
docker/test/integration/features/python.feature | 13 ++++
.../minifi/core/DockerTestDirectoryBindings.py | 1 +
.../minifi/processors/ExecutePythonProcessor.py | 8 +++
.../resources/python/add_attribute_to_flowfile.py | 13 ++++
docker/test/integration/steps/steps.py | 5 ++
.../script/python/ExecutePythonProcessor.cpp | 82 ++++++----------------
extensions/script/python/ExecutePythonProcessor.h | 3 -
.../script-tests/ExecutePythonProcessorTests.cpp | 33 ++++-----
10 files changed, 78 insertions(+), 86 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 79dae16..4862f5c 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -275,7 +275,7 @@ jobs:
if [ -d ~/.ccache ]; then mv ~/.ccache .; fi
mkdir build
cd build
- cmake -DUSE_SHARED_LIBS= -DSTRICT_GSL_CHECKS=AUDIT -DENABLE_JNI=OFF -DDISABLE_JEMALLOC=ON -DDISABLE_SCRIPTING=ON -DENABLE_AWS=ON -DENABLE_LIBRDKAFKA=ON -DENABLE_AZURE=ON -DENABLE_SQL=ON -DDOCKER_BUILD_ONLY=ON -DDOCKER_CCACHE_DUMP_LOCATION=$HOME/.ccache ..
+ cmake -DUSE_SHARED_LIBS= -DSTRICT_GSL_CHECKS=AUDIT -DENABLE_JNI=OFF -DDISABLE_JEMALLOC=ON -DENABLE_AWS=ON -DENABLE_LIBRDKAFKA=ON -DENABLE_AZURE=ON -DENABLE_SQL=ON -DDOCKER_BUILD_ONLY=ON -DDOCKER_CCACHE_DUMP_LOCATION=$HOME/.ccache ..
make docker
- id: install_deps
run: |
diff --git a/docker/test/integration/MiNiFi_integration_test_driver.py b/docker/test/integration/MiNiFi_integration_test_driver.py
index dd086c0..ec721ba 100644
--- a/docker/test/integration/MiNiFi_integration_test_driver.py
+++ b/docker/test/integration/MiNiFi_integration_test_driver.py
@@ -285,3 +285,7 @@ class MiNiFi_integration_test():
if cluster.get_engine() == "minifi-cpp":
line_found = cluster.wait_for_app_logs(line, 60)
assert line_found
+
+ def check_minifi_logs_for_message(self, cluster_name, log_message, timeout_seconds):
+ cluster = self.acquire_cluster(cluster_name)
+ assert cluster.wait_for_app_logs(log_message, timeout_seconds)
diff --git a/docker/test/integration/features/python.feature b/docker/test/integration/features/python.feature
new file mode 100644
index 0000000..4742a23
--- /dev/null
+++ b/docker/test/integration/features/python.feature
@@ -0,0 +1,13 @@
+Feature: MiNiFi can use python processors in its flows
+ Background:
+ Given the content of "/tmp/output" is monitored
+
+ Scenario: A MiNiFi instance can update attributes through custom python processor
+ Given a GenerateFlowFile processor with the "File Size" property set to "0B"
+ And a ExecutePythonProcessor processor with the "Script File" property set to "/tmp/resources/python/add_attribute_to_flowfile.py"
+ And a LogAttribute processor in the "primary_cluster" flow
+ And the "success" relationship of the GenerateFlowFile processor is connected to the ExecutePythonProcessor
+ And the "success" relationship of the ExecutePythonProcessor processor is connected to the LogAttribute
+
+ When all instances start up
+ Then the Minifi logs in the "primary_cluster" contain the following message: "key:Python attribute value:attributevalue" in less than 60 seconds
diff --git a/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py b/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
index c66e62f..5ca3ecd 100644
--- a/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
+++ b/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
@@ -22,6 +22,7 @@ class DockerTestDirectoryBindings:
# Add resources
test_dir = os.environ['PYTHONPATH'].split(':')[-1] # Based on DockerVerify.sh
shutil.copytree(test_dir + "/resources/kafka_broker/conf/certs", self.data_directories[test_id]["resources_dir"] + "/certs")
+ shutil.copytree(test_dir + "/resources/python", self.data_directories[test_id]["resources_dir"] + "/python")
def get_data_directories(self, test_id):
return self.data_directories[test_id]
diff --git a/docker/test/integration/minifi/processors/ExecutePythonProcessor.py b/docker/test/integration/minifi/processors/ExecutePythonProcessor.py
new file mode 100644
index 0000000..9e3aab2
--- /dev/null
+++ b/docker/test/integration/minifi/processors/ExecutePythonProcessor.py
@@ -0,0 +1,8 @@
+from ..core.Processor import Processor
+
+
+class ExecutePythonProcessor(Processor):
+ def __init__(self):
+ super(ExecutePythonProcessor, self).__init__(
+ 'ExecutePythonProcessor',
+ auto_terminate=['success'])
diff --git a/docker/test/integration/resources/python/add_attribute_to_flowfile.py b/docker/test/integration/resources/python/add_attribute_to_flowfile.py
new file mode 100644
index 0000000..0ab4724
--- /dev/null
+++ b/docker/test/integration/resources/python/add_attribute_to_flowfile.py
@@ -0,0 +1,13 @@
+def describe(processor):
+ processor.setDescription("Adds an attribute to your flow files")
+
+
+def onInitialize(processor):
+ processor.setSupportsDynamicProperties()
+
+
+def onTrigger(context, session):
+ flow_file = session.get()
+ if flow_file is not None:
+ flow_file.addAttribute("Python attribute", "attributevalue")
+ session.transfer(flow_file, REL_SUCCESS)
diff --git a/docker/test/integration/steps/steps.py b/docker/test/integration/steps/steps.py
index 3c9aa24..8804bf6 100644
--- a/docker/test/integration/steps/steps.py
+++ b/docker/test/integration/steps/steps.py
@@ -594,3 +594,8 @@ def step_impl(context, cluster_name, query, number_of_rows, timeout_seconds):
@then("the minifi log contains \"{line}\"")
def step_impl(context, line):
context.test.check_minifi_log_contents(line)
+
+
+@then("the Minifi logs in the \"{cluster_name}\" contain the following message: \"{log_message}\" in less than {duration}")
+def step_impl(context, cluster_name, log_message, duration):
+ context.test.check_minifi_logs_for_message(cluster_name, log_message, timeparse(duration))
diff --git a/extensions/script/python/ExecutePythonProcessor.cpp b/extensions/script/python/ExecutePythonProcessor.cpp
index ed65f93..a2eac7f 100644
--- a/extensions/script/python/ExecutePythonProcessor.cpp
+++ b/extensions/script/python/ExecutePythonProcessor.cpp
@@ -53,79 +53,39 @@ core::Relationship ExecutePythonProcessor::Success("success", "Script successes"
core::Relationship ExecutePythonProcessor::Failure("failure", "Script failures");
void ExecutePythonProcessor::initialize() {
- // initialization requires that we do a little leg work prior to onSchedule
- // so that we can provide manifest our processor identity
- if (getProperties().empty()) {
- setSupportedProperties({
- ScriptFile,
- ScriptBody,
- ModuleDirectory
- });
- setAcceptAllProperties();
- setSupportedRelationships({
- Success,
- Failure
- });
- valid_init_ = false;
- return;
- }
+ setSupportedProperties({
+ ScriptFile,
+ ScriptBody,
+ ModuleDirectory
+ });
+ setAcceptAllProperties();
+ setSupportedRelationships({
+ Success,
+ Failure
+ });
+}
+void ExecutePythonProcessor::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory>& /*sessionFactory*/) {
python_logger_ = logging::LoggerFactory<ExecutePythonProcessor>::getAliasedLogger(getName());
getProperty(ModuleDirectory.getName(), module_directory_);
- valid_init_ = false;
appendPathForImportModules();
loadScript();
- try {
- if (script_to_exec_.size()) {
- std::shared_ptr<python::PythonScriptEngine> engine = getScriptEngine();
- engine->eval(script_to_exec_);
- auto shared_this = shared_from_this();
- engine->describe(shared_this);
- engine->onInitialize(shared_this);
- handleEngineNoLongerInUse(std::move(engine));
- valid_init_ = true;
- }
- }
- catch (const std::exception& exception) {
- logger_->log_error("Caught Exception: %s", exception.what());
- std::rethrow_exception(std::current_exception());
- }
- catch (...) {
- logger_->log_error("Caught Exception");
- std::rethrow_exception(std::current_exception());
- }
-}
-void ExecutePythonProcessor::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory>& /*sessionFactory*/) {
- if (!valid_init_) {
- throw std::runtime_error("Could not correctly initialize " + getName());
- }
- try {
- reloadScriptIfUsingScriptFileProperty();
- if (script_to_exec_.empty()) {
- throw std::runtime_error("Neither Script Body nor Script File is available to execute");
- }
- std::shared_ptr<python::PythonScriptEngine> engine = getScriptEngine();
-
- engine->eval(script_to_exec_);
- engine->onSchedule(context);
-
- handleEngineNoLongerInUse(std::move(engine));
- }
- catch (const std::exception& exception) {
- logger_->log_error("Caught Exception: %s", exception.what());
- }
- catch (...) {
- logger_->log_error("Caught Exception");
+ if (script_to_exec_.empty()) {
+ throw std::runtime_error("Neither Script Body nor Script File is available to execute");
}
+ std::shared_ptr<python::PythonScriptEngine> engine = getScriptEngine();
+ engine->eval(script_to_exec_);
+ auto shared_this = shared_from_this();
+ engine->describe(shared_this);
+ engine->onInitialize(shared_this);
+ engine->onSchedule(context);
+ handleEngineNoLongerInUse(std::move(engine));
}
void ExecutePythonProcessor::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
- if (!valid_init_) {
- throw std::runtime_error("Could not correctly initialize " + getName());
- }
try {
// TODO(hunyadi): When using "Script File" property, we currently re-read the script file content every time the processor is triggered. This should change to single-read when we release 1.0.0
// https://issues.apache.org/jira/browse/MINIFICPP-1223
diff --git a/extensions/script/python/ExecutePythonProcessor.h b/extensions/script/python/ExecutePythonProcessor.h
index 0057b6f..3e50f8d 100644
--- a/extensions/script/python/ExecutePythonProcessor.h
+++ b/extensions/script/python/ExecutePythonProcessor.h
@@ -48,7 +48,6 @@ class ExecutePythonProcessor : public core::Processor {
explicit ExecutePythonProcessor(const std::string &name, const utils::Identifier &uuid = {})
: Processor(name, uuid),
python_dynamic_(false),
- valid_init_(false),
logger_(logging::LoggerFactory<ExecutePythonProcessor>::getLogger()),
script_engine_q_() {
}
@@ -100,8 +99,6 @@ class ExecutePythonProcessor : public core::Processor {
bool python_dynamic_;
- bool valid_init_;
-
std::shared_ptr<logging::Logger> logger_;
std::shared_ptr<logging::Logger> python_logger_;
diff --git a/libminifi/test/script-tests/ExecutePythonProcessorTests.cpp b/libminifi/test/script-tests/ExecutePythonProcessorTests.cpp
index c4692a9..ecb3f8c 100644
--- a/libminifi/test/script-tests/ExecutePythonProcessorTests.cpp
+++ b/libminifi/test/script-tests/ExecutePythonProcessorTests.cpp
@@ -78,8 +78,7 @@ class SimplePythonFlowFileTransferTest : public ExecutePythonProcessorTestBase {
public:
enum class Expectation {
OUTPUT_FILE_MATCHES_INPUT,
- RUNTIME_RELATIONSHIP_EXCEPTION,
- PROCESSOR_INITIALIZATION_EXCEPTION
+ RUNTIME_RELATIONSHIP_EXCEPTION
};
SimplePythonFlowFileTransferTest() : ExecutePythonProcessorTestBase{} {}
@@ -90,11 +89,6 @@ class SimplePythonFlowFileTransferTest : public ExecutePythonProcessorTestBase {
const std::string input_dir = testController_->createTempDirectory();
putFileToDir(input_dir, TEST_FILE_NAME, TEST_FILE_CONTENT);
addGetFileProcessorToPlan(input_dir);
-
- if (Expectation::PROCESSOR_INITIALIZATION_EXCEPTION == expectation) {
- REQUIRE_THROWS(addExecutePythonProcessorToPlan(used_as_script_file, used_as_script_body));
- return;
- }
REQUIRE_NOTHROW(addExecutePythonProcessorToPlan(used_as_script_file, used_as_script_body));
const std::string output_dir = testController_->createTempDirectory();
@@ -121,7 +115,6 @@ class SimplePythonFlowFileTransferTest : public ExecutePythonProcessorTestBase {
auto executePythonProcessor = plan_->addProcessor("ExecutePythonProcessor", "executePythonProcessor");
plan_->setProperty(executePythonProcessor, org::apache::nifi::minifi::python::processors::ExecutePythonProcessor::ScriptFile.getName(), getScriptFullPath("stateful_processor.py"));
- executePythonProcessor->initialize();
addPutFileProcessorToPlan(core::Relationship("success", "description"), output_dir);
plan_->runNextProcessor(); // ExecutePythonProcessor
@@ -154,7 +147,6 @@ class SimplePythonFlowFileTransferTest : public ExecutePythonProcessorTestBase {
if ("" != used_as_script_body) {
plan_->setProperty(executePythonProcessor, org::apache::nifi::minifi::python::processors::ExecutePythonProcessor::ScriptBody.getName(), getFileContent(getScriptFullPath(used_as_script_body)));
}
- executePythonProcessor->initialize();
return executePythonProcessor;
}
@@ -179,7 +171,6 @@ TEST_CASE_METHOD(SimplePythonFlowFileTransferTest, "Simple file passthrough", "[
// Expectations
const auto OUTPUT_FILE_MATCHES_INPUT = SimplePythonFlowFileTransferTest::Expectation::OUTPUT_FILE_MATCHES_INPUT;
const auto RUNTIME_RELATIONSHIP_EXCEPTION = SimplePythonFlowFileTransferTest::Expectation::RUNTIME_RELATIONSHIP_EXCEPTION;
- const auto PROCESSOR_INITIALIZATION_EXCEPTION = SimplePythonFlowFileTransferTest::Expectation::PROCESSOR_INITIALIZATION_EXCEPTION;
// ExecutePython outbound relationships
const core::Relationship SUCCESS {"success", "description"};
const core::Relationship FAILURE{"failure", "description"};
@@ -187,11 +178,11 @@ TEST_CASE_METHOD(SimplePythonFlowFileTransferTest, "Simple file passthrough", "[
// For the tests "" is treated as none-provided since no optional implementation was ported to the project yet
// 0. Neither valid script file nor script body provided
- // TEST EXPECTATION OUT_REL USE_AS_SCRIPT_FILE USE_AS_SCRIPT_BODY
- testSimpleFilePassthrough(PROCESSOR_INITIALIZATION_EXCEPTION, SUCCESS, "", ""); // NOLINT
- testSimpleFilePassthrough(PROCESSOR_INITIALIZATION_EXCEPTION, FAILURE, "", ""); // NOLINT
- testSimpleFilePassthrough(PROCESSOR_INITIALIZATION_EXCEPTION, SUCCESS, "non_existent_script.py", ""); // NOLINT
- testSimpleFilePassthrough(PROCESSOR_INITIALIZATION_EXCEPTION, FAILURE, "non_existent_script.py", ""); // NOLINT
+ // TEST EXPECTATION OUT_REL USE_AS_SCRIPT_FILE USE_AS_SCRIPT_BODY
+ testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, SUCCESS, "", ""); // NOLINT
+ testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, FAILURE, "", ""); // NOLINT
+ testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, SUCCESS, "non_existent_script.py", ""); // NOLINT
+ testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, FAILURE, "non_existent_script.py", ""); // NOLINT
// 1. Using script file as attribute
// TEST EXPECTATION OUT_REL USE_AS_SCRIPT_FILE USE_AS_SCRIPT_BODY
@@ -203,15 +194,15 @@ TEST_CASE_METHOD(SimplePythonFlowFileTransferTest, "Simple file passthrough", "[
// 2. Using script body as attribute
// TEST EXPECTATION OUT_REL SCRIPT_FILE USE_AS_SCRIPT_BODY
- testSimpleFilePassthrough( OUTPUT_FILE_MATCHES_INPUT, SUCCESS, "", "passthrough_processor_transfering_to_success.py"); // NOLINT
- testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, FAILURE, "", "passthrough_processor_transfering_to_success.py"); // NOLINT
- testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, SUCCESS, "", "passthrough_processor_transfering_to_failure.py"); // NOLINT
- testSimpleFilePassthrough( OUTPUT_FILE_MATCHES_INPUT, FAILURE, "", "passthrough_processor_transfering_to_failure.py"); // NOLINT
+ testSimpleFilePassthrough( OUTPUT_FILE_MATCHES_INPUT, SUCCESS, "", "passthrough_processor_transfering_to_success.py"); // NOLINT
+ testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, FAILURE, "", "passthrough_processor_transfering_to_success.py"); // NOLINT
+ testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, SUCCESS, "", "passthrough_processor_transfering_to_failure.py"); // NOLINT
+ testSimpleFilePassthrough( OUTPUT_FILE_MATCHES_INPUT, FAILURE, "", "passthrough_processor_transfering_to_failure.py"); // NOLINT
testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, FAILURE, "", "non_transferring_processor.py"); // NOLINT
// 3. Setting both attributes
- // TEST EXPECTATION OUT_REL SCRIPT_FILE USE_AS_SCRIPT_BODY
- testSimpleFilePassthrough(PROCESSOR_INITIALIZATION_EXCEPTION, SUCCESS, "passthrough_processor_transfering_to_success.py", "passthrough_processor_transfering_to_success.py"); // NOLINT
+ // TEST EXPECTATION OUT_REL SCRIPT_FILE USE_AS_SCRIPT_BODY
+ testSimpleFilePassthrough(RUNTIME_RELATIONSHIP_EXCEPTION, SUCCESS, "passthrough_processor_transfering_to_success.py", "passthrough_processor_transfering_to_success.py"); // NOLINT
}
TEST_CASE_METHOD(SimplePythonFlowFileTransferTest, "Stateful execution", "[executePythonProcessorStateful]") {