You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ad...@apache.org on 2022/12/05 12:49:49 UTC
[nifi-minifi-cpp] 02/03: MINIFICPP-1993 Fix empty return value of PyInputStream
This is an automated email from the ASF dual-hosted git repository.
adebreceni pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit d16f7455c746538f948db7762ce0af751ad4f4ed
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Mon Dec 5 13:45:55 2022 +0100
MINIFICPP-1993 Fix empty return value of PyInputStream
Signed-off-by: Adam Debreceni <ad...@apache.org>
This closes #1463
---
docker/test/integration/features/python.feature | 10 +++++
.../minifi/processors/MoveContentToJson.py | 22 ++++++++++
.../pythonprocessors/examples/MoveContentToJson.py | 50 ++++++++++++++++++++++
extensions/script/python/PyInputStream.cpp | 2 +-
extensions/script/python/PythonScriptEngine.h | 2 +-
5 files changed, 84 insertions(+), 2 deletions(-)
diff --git a/docker/test/integration/features/python.feature b/docker/test/integration/features/python.feature
index 2877853ea..5f1c72a34 100644
--- a/docker/test/integration/features/python.feature
+++ b/docker/test/integration/features/python.feature
@@ -36,3 +36,13 @@ Feature: MiNiFi can use python processors in its flows
When all instances start up
Then the Minifi logs contain the following message: "key:Python attribute value:attributevalue" in less than 60 seconds
+
+ Scenario: Native python processor can read empty input stream
+ Given a GenerateFlowFile processor with the "File Size" property set to "0B"
+ And a MoveContentToJson processor
+ And a PutFile processor with the "Directory" property set to "/tmp/output"
+ And the "success" relationship of the GenerateFlowFile processor is connected to the MoveContentToJson
+ And the "success" relationship of the MoveContentToJson processor is connected to the PutFile
+
+ When all instances start up
+ Then a flowfile with the content '{"content": ""}' is placed in the monitored directory in less than 60 seconds
diff --git a/docker/test/integration/minifi/processors/MoveContentToJson.py b/docker/test/integration/minifi/processors/MoveContentToJson.py
new file mode 100644
index 000000000..4d4013ce4
--- /dev/null
+++ b/docker/test/integration/minifi/processors/MoveContentToJson.py
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ..core.Processor import Processor
+
+
+class MoveContentToJson(Processor):
+ def __init__(self):
+ super(MoveContentToJson, self).__init__('MoveContentToJson', class_prefix='org.apache.nifi.minifi.processors.examples.')
diff --git a/extensions/pythonprocessors/examples/MoveContentToJson.py b/extensions/pythonprocessors/examples/MoveContentToJson.py
new file mode 100644
index 000000000..1b3068b07
--- /dev/null
+++ b/extensions/pythonprocessors/examples/MoveContentToJson.py
@@ -0,0 +1,50 @@
+#!/usr/bin/env python
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import codecs
+import json
+
+
+class ReadCallback:
+ def process(self, input_stream):
+ self.content = codecs.getreader('utf-8')(input_stream).read()
+ return len(self.content)
+
+
+class WriteToJsonCallback:
+ def __init__(self, content):
+ self.content = content
+
+ def process(self, output_stream):
+ json_content = json.dumps({"content": self.content})
+ output_stream.write(json_content.encode('utf-8'))
+ return len(json_content)
+
+
+def describe(processor):
+ processor.setDescription("Moves content of flow file to JSON file under 'content' key")
+
+
+def onInitialize(processor):
+ processor.setSupportsDynamicProperties()
+
+
+def onTrigger(context, session):
+ flow_file = session.get()
+ if flow_file is not None:
+ read_callback = ReadCallback()
+ session.read(flow_file, read_callback)
+ session.write(flow_file, WriteToJsonCallback(read_callback.content))
+ session.transfer(flow_file, REL_SUCCESS)
diff --git a/extensions/script/python/PyInputStream.cpp b/extensions/script/python/PyInputStream.cpp
index 1bc74ceb3..9e4951f9c 100644
--- a/extensions/script/python/PyInputStream.cpp
+++ b/extensions/script/python/PyInputStream.cpp
@@ -41,7 +41,7 @@ py::bytes PyInputStream::read(size_t len) {
}
if (len <= 0) {
- return nullptr;
+ return {};
}
std::vector<std::byte> buffer(len);
diff --git a/extensions/script/python/PythonScriptEngine.h b/extensions/script/python/PythonScriptEngine.h
index 1a58b8672..4ba38b6ad 100644
--- a/extensions/script/python/PythonScriptEngine.h
+++ b/extensions/script/python/PythonScriptEngine.h
@@ -112,7 +112,7 @@ class PythonScriptEngine : public script::ScriptEngine {
void callRequiredFunction(const std::string &fn_name, Args &&...args) {
py::gil_scoped_acquire gil { };
if (!(*bindings_).contains(fn_name.c_str()))
- throw std::runtime_error("Required Function" + fn_name + " is not found within Python bindings");
+ throw std::runtime_error("Required Function " + fn_name + " is not found within Python bindings");
(*bindings_)[fn_name.c_str()](convert(args)...);
}