You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ad...@apache.org on 2022/12/05 12:49:49 UTC

[nifi-minifi-cpp] 02/03: MINIFICPP-1993 Fix empty return value of PyInputStream

This is an automated email from the ASF dual-hosted git repository.

adebreceni pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit d16f7455c746538f948db7762ce0af751ad4f4ed
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Mon Dec 5 13:45:55 2022 +0100

    MINIFICPP-1993 Fix empty return value of PyInputStream
    
    Signed-off-by: Adam Debreceni <ad...@apache.org>
    
    This closes #1463
---
 docker/test/integration/features/python.feature    | 10 +++++
 .../minifi/processors/MoveContentToJson.py         | 22 ++++++++++
 .../pythonprocessors/examples/MoveContentToJson.py | 50 ++++++++++++++++++++++
 extensions/script/python/PyInputStream.cpp         |  2 +-
 extensions/script/python/PythonScriptEngine.h      |  2 +-
 5 files changed, 84 insertions(+), 2 deletions(-)

diff --git a/docker/test/integration/features/python.feature b/docker/test/integration/features/python.feature
index 2877853ea..5f1c72a34 100644
--- a/docker/test/integration/features/python.feature
+++ b/docker/test/integration/features/python.feature
@@ -36,3 +36,13 @@ Feature: MiNiFi can use python processors in its flows
 
     When all instances start up
     Then the Minifi logs contain the following message: "key:Python attribute value:attributevalue" in less than 60 seconds
+
+  Scenario: Native python processor can read empty input stream
+    Given a GenerateFlowFile processor with the "File Size" property set to "0B"
+    And a MoveContentToJson processor
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GenerateFlowFile processor is connected to the MoveContentToJson
+    And the "success" relationship of the MoveContentToJson processor is connected to the PutFile
+
+    When all instances start up
+    Then a flowfile with the content '{"content": ""}' is placed in the monitored directory in less than 60 seconds
diff --git a/docker/test/integration/minifi/processors/MoveContentToJson.py b/docker/test/integration/minifi/processors/MoveContentToJson.py
new file mode 100644
index 000000000..4d4013ce4
--- /dev/null
+++ b/docker/test/integration/minifi/processors/MoveContentToJson.py
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ..core.Processor import Processor
+
+
+class MoveContentToJson(Processor):  # integration-test wrapper for the MoveContentToJson python processor
+    def __init__(self):  # hands the processor name and the examples class prefix to the base Processor
+        super(MoveContentToJson, self).__init__('MoveContentToJson', class_prefix='org.apache.nifi.minifi.processors.examples.')
diff --git a/extensions/pythonprocessors/examples/MoveContentToJson.py b/extensions/pythonprocessors/examples/MoveContentToJson.py
new file mode 100644
index 000000000..1b3068b07
--- /dev/null
+++ b/extensions/pythonprocessors/examples/MoveContentToJson.py
@@ -0,0 +1,50 @@
+#!/usr/bin/env python
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import codecs
+import json
+
+
+class ReadCallback:  # callback passed to session.read(); keeps the flow file content as text
+    def process(self, input_stream):  # input_stream: stream object handed over by session.read()
+        self.content = codecs.getreader('utf-8')(input_stream).read()  # decode the entire stream as UTF-8
+        return len(self.content)  # length in decoded characters (not bytes)
+
+
+class WriteToJsonCallback:  # callback passed to session.write(); emits {"content": <text>} as JSON
+    def __init__(self, content):
+        self.content = content  # text that will be stored under the "content" key
+
+    def process(self, output_stream):  # output_stream: stream object handed over by session.write()
+        json_content = json.dumps({"content": self.content})  # non-ASCII is escaped by default (ensure_ascii)
+        output_stream.write(json_content.encode('utf-8'))
+        return len(json_content)  # equals the encoded byte count because the JSON text is pure ASCII
+
+
+def describe(processor):  # sets the description text on the given processor object
+    processor.setDescription("Moves content of flow file to JSON file under 'content' key")
+
+
+def onInitialize(processor):  # declares dynamic property support via setSupportsDynamicProperties()
+    processor.setSupportsDynamicProperties()
+
+
+def onTrigger(context, session):  # rewrites one flow file's content as {"content": <original text>}
+    flow_file = session.get()
+    if flow_file is not None:  # do nothing when session.get() returned no flow file
+        read_callback = ReadCallback()
+        session.read(flow_file, read_callback)  # populates read_callback.content
+        session.write(flow_file, WriteToJsonCallback(read_callback.content))  # overwrite with the JSON form
+        session.transfer(flow_file, REL_SUCCESS)  # REL_SUCCESS is not defined here; presumably injected by the script engine
diff --git a/extensions/script/python/PyInputStream.cpp b/extensions/script/python/PyInputStream.cpp
index 1bc74ceb3..9e4951f9c 100644
--- a/extensions/script/python/PyInputStream.cpp
+++ b/extensions/script/python/PyInputStream.cpp
@@ -41,7 +41,7 @@ py::bytes PyInputStream::read(size_t len) {
   }
 
   if (len <= 0) {
-    return nullptr;
+    return {};
   }
 
   std::vector<std::byte> buffer(len);
diff --git a/extensions/script/python/PythonScriptEngine.h b/extensions/script/python/PythonScriptEngine.h
index 1a58b8672..4ba38b6ad 100644
--- a/extensions/script/python/PythonScriptEngine.h
+++ b/extensions/script/python/PythonScriptEngine.h
@@ -112,7 +112,7 @@ class PythonScriptEngine : public script::ScriptEngine {
   void callRequiredFunction(const std::string &fn_name, Args &&...args) {
     py::gil_scoped_acquire gil { };
     if (!(*bindings_).contains(fn_name.c_str()))
-      throw std::runtime_error("Required Function" + fn_name + " is not found within Python bindings");
+      throw std::runtime_error("Required Function " + fn_name + " is not found within Python bindings");
     (*bindings_)[fn_name.c_str()](convert(args)...);
   }