You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ad...@apache.org on 2022/12/05 12:49:50 UTC
[nifi-minifi-cpp] 03/03: MINIFICPP-1990 Add ProcessSession::remove to Python API
This is an automated email from the ASF dual-hosted git repository.
adebreceni pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit a3bf25359307cfd5cb9fdbfc0e48de8af494fe43
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Mon Dec 5 13:48:40 2022 +0100
MINIFICPP-1990 Add ProcessSession::remove to Python API
Signed-off-by: Adam Debreceni <ad...@apache.org>
This closes #1462
---
docker/test/integration/features/python.feature | 8 ++++++
docker/test/integration/minifi/core/ImageStore.py | 2 +-
.../minifi/processors/RemoveFlowFile.py | 22 ++++++++++++++++
.../pythonprocessors/examples/RemoveFlowFile.py | 29 ++++++++++++++++++++++
extensions/script/python/PyProcessSession.cpp | 14 +++++++++++
extensions/script/python/PyProcessSession.h | 1 +
extensions/script/python/PythonBindings.h | 3 ++-
libminifi/src/core/ProcessSession.cpp | 1 +
8 files changed, 78 insertions(+), 2 deletions(-)
diff --git a/docker/test/integration/features/python.feature b/docker/test/integration/features/python.feature
index 5f1c72a34..341199c43 100644
--- a/docker/test/integration/features/python.feature
+++ b/docker/test/integration/features/python.feature
@@ -46,3 +46,11 @@ Feature: MiNiFi can use python processors in its flows
When all instances start up
Then a flowfile with the content '{"content": ""}' is placed in the monitored directory in less than 60 seconds
+
+ Scenario: FlowFile can be removed from session
+ Given a GenerateFlowFile processor with the "File Size" property set to "0B"
+ And a RemoveFlowFile processor
+
+ When all instances start up
+ Then the Minifi logs contain the following message: "Removing flow file with UUID" in less than 30 seconds
+
diff --git a/docker/test/integration/minifi/core/ImageStore.py b/docker/test/integration/minifi/core/ImageStore.py
index 2700c89d0..c0c158852 100644
--- a/docker/test/integration/minifi/core/ImageStore.py
+++ b/docker/test/integration/minifi/core/ImageStore.py
@@ -106,7 +106,7 @@ class ImageStore:
echo "UserName = postgres" >> /etc/odbc.ini && \
echo "Password = password" >> /etc/odbc.ini && \
echo "Database = postgres" >> /etc/odbc.ini
- RUN sed -i -e 's/INFO/DEBUG/g' {minifi_root}/conf/minifi-log.properties
+ RUN sed -i -e 's/INFO/TRACE/g' {minifi_root}/conf/minifi-log.properties
RUN echo nifi.flow.engine.threads=5 >> {minifi_root}/conf/minifi.properties
RUN echo nifi.metrics.publisher.agent.identifier=Agent1 >> {minifi_root}/conf/minifi.properties
RUN echo nifi.metrics.publisher.class=PrometheusMetricsPublisher >> {minifi_root}/conf/minifi.properties
diff --git a/docker/test/integration/minifi/processors/RemoveFlowFile.py b/docker/test/integration/minifi/processors/RemoveFlowFile.py
new file mode 100644
index 000000000..3fc9b4da2
--- /dev/null
+++ b/docker/test/integration/minifi/processors/RemoveFlowFile.py
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ..core.Processor import Processor
+
+
+class RemoveFlowFile(Processor):
+ def __init__(self):
+ super(RemoveFlowFile, self).__init__('RemoveFlowFile', class_prefix='org.apache.nifi.minifi.processors.examples.')
diff --git a/extensions/pythonprocessors/examples/RemoveFlowFile.py b/extensions/pythonprocessors/examples/RemoveFlowFile.py
new file mode 100644
index 000000000..5235636b4
--- /dev/null
+++ b/extensions/pythonprocessors/examples/RemoveFlowFile.py
@@ -0,0 +1,29 @@
+#!/usr/bin/env python
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+def describe(processor):
+ processor.setDescription("Removes flow file from the session")
+
+
+def onInitialize(processor):
+ processor.setSupportsDynamicProperties()
+
+
+def onTrigger(context, session):
+ flow_file = session.get()
+ if flow_file is not None:
+ session.remove(flow_file)
diff --git a/extensions/script/python/PyProcessSession.cpp b/extensions/script/python/PyProcessSession.cpp
index 843d00323..f13a3c162 100644
--- a/extensions/script/python/PyProcessSession.cpp
+++ b/extensions/script/python/PyProcessSession.cpp
@@ -135,4 +135,18 @@ void PyProcessSession::releaseCoreResources() {
session_.reset();
}
+void PyProcessSession::remove(const std::shared_ptr<script::ScriptFlowFile>& script_flow_file) {
+ if (!session_) {
+ throw std::runtime_error("Access of ProcessSession after it has been released");
+ }
+
+ auto flow_file = script_flow_file->getFlowFile();
+
+ if (!flow_file) {
+ throw std::runtime_error("Access of FlowFile after it has been released");
+ }
+
+ session_->remove(flow_file);
+}
+
} // namespace org::apache::nifi::minifi::python
diff --git a/extensions/script/python/PyProcessSession.h b/extensions/script/python/PyProcessSession.h
index 898719854..2eea2bb58 100644
--- a/extensions/script/python/PyProcessSession.h
+++ b/extensions/script/python/PyProcessSession.h
@@ -46,6 +46,7 @@ class PyProcessSession {
void transfer(const std::shared_ptr<script::ScriptFlowFile>& flow_file, const core::Relationship& relationship);
void read(const std::shared_ptr<script::ScriptFlowFile>& flow_file, py::object input_stream_callback);
void write(const std::shared_ptr<script::ScriptFlowFile>& flow_file, py::object output_stream_callback);
+ void remove(const std::shared_ptr<script::ScriptFlowFile>& flow_file);
/**
* Sometimes we want to release shared pointers to core resources when
diff --git a/extensions/script/python/PythonBindings.h b/extensions/script/python/PythonBindings.h
index a406f02f8..004f6f415 100644
--- a/extensions/script/python/PythonBindings.h
+++ b/extensions/script/python/PythonBindings.h
@@ -54,7 +54,8 @@ PYBIND11_EMBEDDED_MODULE(minifi_native, m) { // NOLINT
static_cast<std::shared_ptr<script::ScriptFlowFile> (python::PyProcessSession::*)(const std::shared_ptr<script::ScriptFlowFile>&)>(&python::PyProcessSession::create))
.def("read", &python::PyProcessSession::read)
.def("write", &python::PyProcessSession::write)
- .def("transfer", &python::PyProcessSession::transfer);
+ .def("transfer", &python::PyProcessSession::transfer)
+ .def("remove", &python::PyProcessSession::remove);
py::class_<python::PythonProcessor, std::shared_ptr<python::PythonProcessor>>(m, "Processor")
.def("setSupportsDynamicProperties", &python::PythonProcessor::setSupportsDynamicProperties)
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index 0181f1e36..1d06b8da7 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -195,6 +195,7 @@ std::shared_ptr<core::FlowFile> ProcessSession::clone(const std::shared_ptr<core
}
void ProcessSession::remove(const std::shared_ptr<core::FlowFile> &flow) {
+ logger_->log_trace("Removing flow file with UUID: %s", flow->getUUIDStr());
flow->setDeleted(true);
deleted_flowfiles_.push_back(flow);
std::string reason = process_context_->getProcessorNode()->getName() + " drop flow record " + flow->getUUIDStr();