You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ad...@apache.org on 2022/12/05 12:49:50 UTC

[nifi-minifi-cpp] 03/03: MINIFICPP-1990 Add ProcessSession::remove to Python API

This is an automated email from the ASF dual-hosted git repository.

adebreceni pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit a3bf25359307cfd5cb9fdbfc0e48de8af494fe43
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Mon Dec 5 13:48:40 2022 +0100

    MINIFICPP-1990 Add ProcessSession::remove to Python API
    
    Signed-off-by: Adam Debreceni <ad...@apache.org>
    
    This closes #1462
---
 docker/test/integration/features/python.feature    |  8 ++++++
 docker/test/integration/minifi/core/ImageStore.py  |  2 +-
 .../minifi/processors/RemoveFlowFile.py            | 22 ++++++++++++++++
 .../pythonprocessors/examples/RemoveFlowFile.py    | 29 ++++++++++++++++++++++
 extensions/script/python/PyProcessSession.cpp      | 14 +++++++++++
 extensions/script/python/PyProcessSession.h        |  1 +
 extensions/script/python/PythonBindings.h          |  3 ++-
 libminifi/src/core/ProcessSession.cpp              |  1 +
 8 files changed, 78 insertions(+), 2 deletions(-)

diff --git a/docker/test/integration/features/python.feature b/docker/test/integration/features/python.feature
index 5f1c72a34..341199c43 100644
--- a/docker/test/integration/features/python.feature
+++ b/docker/test/integration/features/python.feature
@@ -46,3 +46,11 @@ Feature: MiNiFi can use python processors in its flows
 
     When all instances start up
     Then a flowfile with the content '{"content": ""}' is placed in the monitored directory in less than 60 seconds
+
+  Scenario: FlowFile can be removed from session
+    Given a GenerateFlowFile processor with the "File Size" property set to "0B"
+    And a RemoveFlowFile processor
+
+    When all instances start up
+    Then the Minifi logs contain the following message: "Removing flow file with UUID" in less than 30 seconds
+
diff --git a/docker/test/integration/minifi/core/ImageStore.py b/docker/test/integration/minifi/core/ImageStore.py
index 2700c89d0..c0c158852 100644
--- a/docker/test/integration/minifi/core/ImageStore.py
+++ b/docker/test/integration/minifi/core/ImageStore.py
@@ -106,7 +106,7 @@ class ImageStore:
                     echo "UserName = postgres" >> /etc/odbc.ini && \
                     echo "Password = password" >> /etc/odbc.ini && \
                     echo "Database = postgres" >> /etc/odbc.ini
-                RUN sed -i -e 's/INFO/DEBUG/g' {minifi_root}/conf/minifi-log.properties
+                RUN sed -i -e 's/INFO/TRACE/g' {minifi_root}/conf/minifi-log.properties
                 RUN echo nifi.flow.engine.threads=5 >> {minifi_root}/conf/minifi.properties
                 RUN echo nifi.metrics.publisher.agent.identifier=Agent1 >> {minifi_root}/conf/minifi.properties
                 RUN echo nifi.metrics.publisher.class=PrometheusMetricsPublisher >> {minifi_root}/conf/minifi.properties
diff --git a/docker/test/integration/minifi/processors/RemoveFlowFile.py b/docker/test/integration/minifi/processors/RemoveFlowFile.py
new file mode 100644
index 000000000..3fc9b4da2
--- /dev/null
+++ b/docker/test/integration/minifi/processors/RemoveFlowFile.py
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ..core.Processor import Processor
+
+
+class RemoveFlowFile(Processor):
+    def __init__(self):
+        super(RemoveFlowFile, self).__init__('RemoveFlowFile', class_prefix='org.apache.nifi.minifi.processors.examples.')
diff --git a/extensions/pythonprocessors/examples/RemoveFlowFile.py b/extensions/pythonprocessors/examples/RemoveFlowFile.py
new file mode 100644
index 000000000..5235636b4
--- /dev/null
+++ b/extensions/pythonprocessors/examples/RemoveFlowFile.py
@@ -0,0 +1,29 @@
+#!/usr/bin/env python
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+def describe(processor):
+    processor.setDescription("Removes flow file from the session")
+
+
+def onInitialize(processor):
+    processor.setSupportsDynamicProperties()
+
+
+def onTrigger(context, session):
+    flow_file = session.get()
+    if flow_file is not None:
+        session.remove(flow_file)
diff --git a/extensions/script/python/PyProcessSession.cpp b/extensions/script/python/PyProcessSession.cpp
index 843d00323..f13a3c162 100644
--- a/extensions/script/python/PyProcessSession.cpp
+++ b/extensions/script/python/PyProcessSession.cpp
@@ -135,4 +135,18 @@ void PyProcessSession::releaseCoreResources() {
   session_.reset();
 }
 
+void PyProcessSession::remove(const std::shared_ptr<script::ScriptFlowFile>& script_flow_file) {
+  if (!session_) {
+    throw std::runtime_error("Access of ProcessSession after it has been released");
+  }
+
+  auto flow_file = script_flow_file->getFlowFile();
+
+  if (!flow_file) {
+    throw std::runtime_error("Access of FlowFile after it has been released");
+  }
+
+  session_->remove(flow_file);
+}
+
 }  // namespace org::apache::nifi::minifi::python
diff --git a/extensions/script/python/PyProcessSession.h b/extensions/script/python/PyProcessSession.h
index 898719854..2eea2bb58 100644
--- a/extensions/script/python/PyProcessSession.h
+++ b/extensions/script/python/PyProcessSession.h
@@ -46,6 +46,7 @@ class PyProcessSession {
   void transfer(const std::shared_ptr<script::ScriptFlowFile>& flow_file, const core::Relationship& relationship);
   void read(const std::shared_ptr<script::ScriptFlowFile>& flow_file, py::object input_stream_callback);
   void write(const std::shared_ptr<script::ScriptFlowFile>& flow_file, py::object output_stream_callback);
+  void remove(const std::shared_ptr<script::ScriptFlowFile>& flow_file);
 
   /**
    * Sometimes we want to release shared pointers to core resources when
diff --git a/extensions/script/python/PythonBindings.h b/extensions/script/python/PythonBindings.h
index a406f02f8..004f6f415 100644
--- a/extensions/script/python/PythonBindings.h
+++ b/extensions/script/python/PythonBindings.h
@@ -54,7 +54,8 @@ PYBIND11_EMBEDDED_MODULE(minifi_native, m) { // NOLINT
            static_cast<std::shared_ptr<script::ScriptFlowFile> (python::PyProcessSession::*)(const std::shared_ptr<script::ScriptFlowFile>&)>(&python::PyProcessSession::create))
       .def("read", &python::PyProcessSession::read)
       .def("write", &python::PyProcessSession::write)
-      .def("transfer", &python::PyProcessSession::transfer);
+      .def("transfer", &python::PyProcessSession::transfer)
+      .def("remove", &python::PyProcessSession::remove);
 
   py::class_<python::PythonProcessor, std::shared_ptr<python::PythonProcessor>>(m, "Processor")
         .def("setSupportsDynamicProperties", &python::PythonProcessor::setSupportsDynamicProperties)
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index 0181f1e36..1d06b8da7 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -195,6 +195,7 @@ std::shared_ptr<core::FlowFile> ProcessSession::clone(const std::shared_ptr<core
 }
 
 void ProcessSession::remove(const std::shared_ptr<core::FlowFile> &flow) {
+  logger_->log_trace("Removing flow file with UUID: %s", flow->getUUIDStr());
   flow->setDeleted(true);
   deleted_flowfiles_.push_back(flow);
   std::string reason = process_context_->getProcessorNode()->getName() + " drop flow record " + flow->getUUIDStr();